From a060c2b22e6f71ad058c7cb8b7e2e9a4edf47b83 Mon Sep 17 00:00:00 2001
From: ford prior <108086978+ford-at-aws@users.noreply.github.com>
Date: Wed, 1 Nov 2023 12:59:56 -0400
Subject: [PATCH] Tools: Weathertop - Update testing stack (#5579)

---
 .../eventbridge_rule_with_sns_fanout/cdk.json |   3 -
 .../producer_stack/producer_stack.py          |   5 +-
 .../requirements.txt                          |   4 +-
 .../test/sqs_lambda_to_batch_fargate/app.py   |   2 +-
 .../consumer_stack/consumer_stack.py          | 161 ++++++++----------
 .../lambda/export_logs.py                     |  34 ++--
 .../requirements.txt                          |   1 +
 7 files changed, 102 insertions(+), 108 deletions(-)
 delete mode 100644 .tools/test/eventbridge_rule_with_sns_fanout/cdk.json

diff --git a/.tools/test/eventbridge_rule_with_sns_fanout/cdk.json b/.tools/test/eventbridge_rule_with_sns_fanout/cdk.json
deleted file mode 100644
index 787a71dd6e8..00000000000
--- a/.tools/test/eventbridge_rule_with_sns_fanout/cdk.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-  "app": "python3 app.py"
-}
diff --git a/.tools/test/eventbridge_rule_with_sns_fanout/producer_stack/producer_stack.py b/.tools/test/eventbridge_rule_with_sns_fanout/producer_stack/producer_stack.py
index 0896be3dda4..629c4f73882 100644
--- a/.tools/test/eventbridge_rule_with_sns_fanout/producer_stack/producer_stack.py
+++ b/.tools/test/eventbridge_rule_with_sns_fanout/producer_stack/producer_stack.py
@@ -14,8 +14,8 @@ class ProducerStack(Stack):
     def __init__(self, scope: Construct, id: str, **kwargs) -> None:
         super().__init__(scope, id, **kwargs)
-        acct_config = self.get_yaml_config("../../config/targets.yaml")
-        resource_config = self.get_yaml_config("../../config/resources.yaml")
+        acct_config = self.get_yaml_config("../config/targets.yaml")
+        resource_config = self.get_yaml_config("../config/resources.yaml")
         topic_name = resource_config["topic_name"]
         bucket_name = resource_config["bucket_name"]
         topic = self.init_get_topic(topic_name)
@@ -112,4 +112,5 @@ def init_cross_account_log_role(self, target_accts, bucket):
             statement.add_arn_principal(
                 f"arn:aws:iam::{str(target_accts[language]['account_id'])}:role/LogsLambdaExecutionRole"
             )
+        statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
         bucket.add_to_resource_policy(statement)
diff --git a/.tools/test/eventbridge_rule_with_sns_fanout/requirements.txt b/.tools/test/eventbridge_rule_with_sns_fanout/requirements.txt
index 2c97bd3e5c2..c6102758f8c 100644
--- a/.tools/test/eventbridge_rule_with_sns_fanout/requirements.txt
+++ b/.tools/test/eventbridge_rule_with_sns_fanout/requirements.txt
@@ -3,5 +3,5 @@ aws-cdk-lib>=2.0.0
 constructs>=10.0.0
 types-boto3
 types-setuptools
-random
-pyyaml
\ No newline at end of file
+pyyaml
+random2
\ No newline at end of file
diff --git a/.tools/test/sqs_lambda_to_batch_fargate/app.py b/.tools/test/sqs_lambda_to_batch_fargate/app.py
index 1c386307b61..4bbc9424745 100644
--- a/.tools/test/sqs_lambda_to_batch_fargate/app.py
+++ b/.tools/test/sqs_lambda_to_batch_fargate/app.py
@@ -9,7 +9,7 @@
 app = App()
 ConsumerStack(
     app,
-    f"ConsumerStack-{os.getenv('LANGUAGE_NAME')}",
+    f"ConsumerStack-{os.getenv('LANGUAGE_NAME').replace('_', '-')}",
     env=cdk.Environment(
         account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
     ),
diff --git a/.tools/test/sqs_lambda_to_batch_fargate/consumer_stack/consumer_stack.py b/.tools/test/sqs_lambda_to_batch_fargate/consumer_stack/consumer_stack.py
index c5af9b62054..627b47b1c93 100644
--- a/.tools/test/sqs_lambda_to_batch_fargate/consumer_stack/consumer_stack.py
+++ b/.tools/test/sqs_lambda_to_batch_fargate/consumer_stack/consumer_stack.py
@@ -3,7 +3,8 @@
 import os
 
-from aws_cdk import Aws, Duration, Size, Stack
+import yaml
+from aws_cdk import Aws, Duration, Size, Stack, aws_batch
 from aws_cdk import aws_batch_alpha as batch_alpha
 from aws_cdk import aws_ec2 as ec2
 from aws_cdk import aws_ecs as ecs
@@ -26,78 +27,27 @@ class ConsumerStack(Stack):
     def __init__(self, scope: Construct, id: str, **kwargs) -> None:
         super().__init__(scope, id, **kwargs)
-
-        #####################################################
-        ##                                                 ##
-        ##               FANOUT COMPONENTS                 ##
-        ##         (SQS, SNS, and Subscriptions)           ##
-        ##                                                 ##
-        #####################################################
-
-        # Locate Amazon Simple Notification Service (Amazon SNS) topic in the producer account.
-        fanout_topic_name = "aws-weathertop-central-sns-fanout-topic"
-        fanout_topic_arn = (
-            f"arn:aws:sns:us-east-1:{producer_account_id}:{fanout_topic_name}"
-        )
-        sns_topic = sns.Topic.from_topic_arn(
-            self, fanout_topic_name, topic_arn=fanout_topic_arn
-        )
-
-        # Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
+        resource_config = self.get_yaml_config("../config/resources.yaml")
+        topic_name = resource_config["topic_name"]
+        producer_bucket_name = resource_config["bucket_name"]
+        sns_topic = self.init_get_topic(topic_name)
         sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")
-
-        # Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
-        sns_topic_role = iam.Role(
-            self,
-            f"SNSTopicRole-{language_name}",
-            assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
-            description="Allows the SNS topic to send messages to the SQS queue in this account",
-            role_name=f"SNSTopicRole-{language_name}",
-        )
-
-        # Policy to allow existing SNS topic to publish to new SQS queue.
-        sns_topic_policy = iam.PolicyStatement(
-            effect=iam.Effect.ALLOW,
-            actions=["sqs:SendMessage"],
-            resources=[sqs_queue.queue_arn],
-            conditions={"ArnEquals": {"aws:SourceArn": fanout_topic_arn}},
-        )
-
-        # Create an SNS subscription for the SQS queue.
-        subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
-        sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))
-
-        # Add the Amazon SNS and Amazon SQS policy to the IAM role.
-        sns_topic_role.add_to_policy(sns_topic_policy)
-
-        # Define policy that allows cross-account Amazon SNS and Amazon SQS access.
-        statement = iam.PolicyStatement()
-        statement.add_resources(sqs_queue.queue_arn)
-        statement.add_actions("sqs:*")
-        statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
-        statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
-        statement.add_condition("ArnLike", {"aws:SourceArn": fanout_topic_arn})
-        sqs_queue.add_to_resource_policy(statement)
-
-        ################################################
-        ##                                            ##
-        ##             S3 BUCKET FOR LOGS             ##
-        ##                                            ##
-        ################################################
-
-        bucket = s3.Bucket(
-            self,
-            "LogBucket",
-            versioned=False,
-            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
-        )
-
-        ################################################
-        ##                                            ##
-        ##             BATCH FARGATE JOBS             ##
-        ##                                            ##
-        ################################################
-
+        self.init_subscribe_sns(sqs_queue, sns_topic)
+        job_definition, job_queue = self.init_batch_fargate()
+        batch_function = self.init_batch_lambda(job_queue, job_definition)
+        self.init_sqs_lambda_integration(batch_function, sqs_queue)
+        self.init_log_function(producer_bucket_name)
+
+    def get_yaml_config(self, filepath):
+        with open(filepath, "r") as file:
+            data = yaml.safe_load(file)
+        return data
+
+    def init_get_topic(self, topic_name):
+        topic = sns.Topic(self, "fanout-topic", topic_name=topic_name)
+        return topic
+
+    def init_batch_fargate(self):
         batch_execution_role = iam.Role(
             self,
             f"BatchExecutionRole-{language_name}",
@@ -157,13 +107,48 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
         job_queue.add_compute_environment(fargate_environment, 1)
 
-        #################################################
-        ##                                             ##
-        ##            BATCH LAMBDA FUNCTION            ##
-        ##     (Triggers Batch job from SQS queue)     ##
-        ##                                             ##
-        #################################################
+        return job_definition, job_queue
+
+    def init_sqs_queue(self):
+        # Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
+        sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")
+        return sqs_queue
+
+    def init_subscribe_sns(self, sqs_queue, sns_topic):
+        # Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
+        sns_topic_role = iam.Role(
+            self,
+            f"SNSTopicRole-{language_name}",
+            assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
+            description="Allows the SNS topic to send messages to the SQS queue in this account",
+            role_name=f"SNSTopicRole-{language_name}",
+        )
+
+        # Policy to allow existing SNS topic to publish to new SQS queue.
+        sns_topic_policy = iam.PolicyStatement(
+            effect=iam.Effect.ALLOW,
+            actions=["sqs:SendMessage"],
+            resources=[sqs_queue.queue_arn],
+            conditions={"ArnEquals": {"aws:SourceArn": sns_topic.topic_arn}},
+        )
+        # Create an SNS subscription for the SQS queue.
+        subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
+        sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))
+
+        # Add the Amazon SNS and Amazon SQS policy to the IAM role.
+        sns_topic_role.add_to_policy(sns_topic_policy)
+
+        # Define policy that allows cross-account Amazon SNS and Amazon SQS access.
+        statement = iam.PolicyStatement()
+        statement.add_resources(sqs_queue.queue_arn)
+        statement.add_actions("sqs:*")
+        statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
+        statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
+        statement.add_condition("ArnLike", {"aws:SourceArn": sns_topic.topic_arn})
+        sqs_queue.add_to_resource_policy(statement)
+
+    def init_batch_lambda(self, job_queue, job_definition):
         # Execution role for AWS Lambda function to use.
         execution_role = iam.Role(
             self,
@@ -199,7 +184,9 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
                 "JOB_NAME": f"job-{language_name}",
             },
         )
+        return function
 
+    def init_sqs_lambda_integration(self, function, sqs_queue):
         # Add the SQS queue as an event source for the Lambda function.
         function.add_event_source(_event_sources.SqsEventSource(sqs_queue))
@@ -221,12 +208,14 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             )
         )
 
-        #################################################
-        ##                                             ##
-        ##             LOG LAMBDA FUNCTION             ##
-        ##     (Processes logs and puts them to S3)    ##
-        ##                                             ##
-        #################################################
+    def init_log_function(self, producer_bucket_name):
+        # S3 Bucket to store logs within this account.
+        bucket = s3.Bucket(
+            self,
+            "LogBucket",
+            versioned=False,
+            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
+        )
 
         # Execution role for AWS Lambda function to use.
         execution_role = iam.Role(
             self,
@@ -264,7 +253,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
         execution_role.add_to_policy(
             statement=iam.PolicyStatement(
                 actions=["s3:PutObject", "s3:PutObjectAcl"],
-                resources=["arn:aws:s3:::aws-weathertop-central-log-bucket/*"],
+                resources=[f"arn:aws:s3:::{producer_bucket_name}/*"],
             )
         )
@@ -279,7 +268,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             environment={
                 "LANGUAGE_NAME": language_name,
                 "BUCKET_NAME": bucket.bucket_name,
-                "PRODUCER_BUCKET_NAME": "aws-weathertop-central-log-bucket",
+                "PRODUCER_BUCKET_NAME": producer_bucket_name,
             },
         )
diff --git a/.tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py b/.tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py
index fdd08a0440a..5e070f45415 100644
--- a/.tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py
+++ b/.tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py
@@ -19,28 +19,27 @@ def handler(event, context):
     logger.debug(f"BUCKET_NAME: {os.environ['BUCKET_NAME']}")
     logger.debug(f"INCOMING EVENT: {event}")
 
-    status = event["detail"]["status"]
-
     if "Batch Job State Change" not in event["detail-type"]:
         logger.info(f"Non-triggering Batch event: {event['detail-type']}")
         return
 
-    if "TIMED_OUT" in status:
+    if "TIMED_OUT" in event["detail"]["status"]:
         raise Exception(
             "Job timed out. Contact application owner or increase time out threshold"
         )
 
-    if status not in ["FAILED", "SUCCEEDED"]:
-        logger.info(f"Non-triggering Batch status: STATUS: {status}")
+    if event["detail"]["status"] not in ["FAILED", "SUCCEEDED"]:
+        logger.info(f"Non-triggering Batch status: STATUS: {event['detail']['status']}")
         return
 
     try:
-        get_and_put_logs()
+        job_id = event["detail"]["jobId"]
+        get_and_put_logs(job_id)
     except Exception as e:
         logger.error(json.dumps(f"Error: {str(e)}"))
         raise e
 
 
-def get_and_put_logs():
-    # Get most recent stream
+def get_and_put_logs(job_id):
+    # Get most recent log stream
     log_streams = logs_client.describe_log_streams(
         logGroupName=log_group_name,
         orderBy="LastEventTime",
@@ -55,17 +54,24 @@
         startFromHead=True,
     )
 
+    log_file_name = f"{job_id}.log"
+
     log_file = "\n".join(
         [f"{e['timestamp']}, {e['message']}" for e in log_events["events"]]
     )
 
-    file_identifier = str(random.randint(10**7, 10**8 - 1))
-    s3_client.upload_fileobj(
-        log_file,
-        os.environ["PRODUCER_BUCKET_NAME"],
-        f"{os.environ['LANGUAGE_NAME']}/{file_identifier}",
+    # Put logs to cross-account bucket
+    s3_client.put_object(
+        Body=log_file,
+        Bucket=os.environ["PRODUCER_BUCKET_NAME"],
+        Key=f"{os.environ['LANGUAGE_NAME']}/{log_file_name}",
+    )
+
+    # Back up logs to local bucket
+    s3_client.put_object(
+        Body=log_file, Bucket=os.environ["BUCKET_NAME"], Key=log_file_name
     )
 
     logger.info(
-        f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{file_identifier}"
+        f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{log_file_name}"
     )
diff --git a/.tools/test/sqs_lambda_to_batch_fargate/requirements.txt b/.tools/test/sqs_lambda_to_batch_fargate/requirements.txt
index d61aa2e8182..d753af0860b 100644
--- a/.tools/test/sqs_lambda_to_batch_fargate/requirements.txt
+++ b/.tools/test/sqs_lambda_to_batch_fargate/requirements.txt
@@ -3,3 +3,4 @@ constructs>=10.0.0
 types-boto3
 types-setuptools
 aws_cdk.aws_batch_alpha
+pyyaml
\ No newline at end of file
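
Note on the topic-to-queue wiring in consumer_stack.py: with aws-cdk-lib v2, calling Topic.add_subscription(SqsSubscription(...)) is sufficient on its own; it both registers the subscription and attaches the queue policy that lets the topic publish to the queue. A minimal, self-contained sketch of the same-account pattern used above (the stack, construct IDs, and topic name here are illustrative, not taken from the patch):

    from aws_cdk import App, Stack
    from aws_cdk import aws_sns as sns
    from aws_cdk import aws_sns_subscriptions as subs
    from aws_cdk import aws_sqs as sqs
    from constructs import Construct


    class FanoutSketchStack(Stack):
        def __init__(self, scope: Construct, id: str, **kwargs) -> None:
            super().__init__(scope, id, **kwargs)
            # Same-account topic and queue, mirroring ConsumerStack above.
            topic = sns.Topic(self, "fanout-topic", topic_name="example-fanout-topic")
            queue = sqs.Queue(self, "BatchJobQueue-python")
            # add_subscription creates the subscription and grants the topic
            # sqs:SendMessage on the queue via the queue resource policy.
            topic.add_subscription(
                subs.SqsSubscription(queue, raw_message_delivery=True)
            )


    app = App()
    FanoutSketchStack(app, "FanoutSketchStack")
    app.synth()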
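
Note on the export_logs.py change: boto3's upload_fileobj expects a binary file-like object, so passing the joined log string would fail at runtime; put_object accepts str or bytes for Body, which is why the handler above can write the log text directly. A minimal sketch of the call pattern (bucket name, key, and log contents are placeholders):

    import boto3

    s3_client = boto3.client("s3")
    log_text = "1698860396000, example log line\n"  # placeholder contents

    # put_object accepts str/bytes for Body; upload_fileobj would instead
    # need a file-like object such as io.BytesIO(log_text.encode()).
    s3_client.put_object(
        Body=log_text,
        Bucket="example-log-bucket",  # placeholder bucket
        Key="python/example-job-id.log",
    )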