Skip to content

Commit

Permalink
Tools: Weathertop - Update testing stack (#5579)
Browse files Browse the repository at this point in the history
  • Loading branch information
ford-at-aws committed Dec 15, 2023
1 parent 30362e8 commit a060c2b
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 108 deletions.
3 changes: 0 additions & 3 deletions .tools/test/eventbridge_rule_with_sns_fanout/cdk.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
class ProducerStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
acct_config = self.get_yaml_config("../../config/targets.yaml")
resource_config = self.get_yaml_config("../../config/resources.yaml")
acct_config = self.get_yaml_config("../config/targets.yaml")
resource_config = self.get_yaml_config("../config/resources.yaml")
topic_name = resource_config["topic_name"]
bucket_name = resource_config["bucket_name"]
topic = self.init_get_topic(topic_name)
Expand Down Expand Up @@ -112,4 +112,5 @@ def init_cross_account_log_role(self, target_accts, bucket):
statement.add_arn_principal(
f"arn:aws:iam::{str(target_accts[language]['account_id'])}:role/LogsLambdaExecutionRole"
)
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
bucket.add_to_resource_policy(statement)
4 changes: 2 additions & 2 deletions .tools/test/eventbridge_rule_with_sns_fanout/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ aws-cdk-lib>=2.0.0
constructs>=10.0.0
types-boto3
types-setuptools
random
pyyaml
pyyaml
random2
2 changes: 1 addition & 1 deletion .tools/test/sqs_lambda_to_batch_fargate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
app = App()
ConsumerStack(
app,
f"ConsumerStack-{os.getenv('LANGUAGE_NAME')}",
f"ConsumerStack-{os.getenv('LANGUAGE_NAME').replace('_', '-')}",
env=cdk.Environment(
account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import os

from aws_cdk import Aws, Duration, Size, Stack
import yaml
from aws_cdk import Aws, Duration, Size, Stack, aws_batch
from aws_cdk import aws_batch_alpha as batch_alpha
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecs as ecs
Expand All @@ -26,78 +27,27 @@
class ConsumerStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

#####################################################
## ##
## FANOUT COMPONENTS ##
## (SQS, SNS, and Subscriptions) ##
## ##
#####################################################

# Locate Amazon Simple Notification Service (Amazon SNS) topic in the producer account.
fanout_topic_name = "aws-weathertop-central-sns-fanout-topic"
fanout_topic_arn = (
f"arn:aws:sns:us-east-1:{producer_account_id}:{fanout_topic_name}"
)
sns_topic = sns.Topic.from_topic_arn(
self, fanout_topic_name, topic_arn=fanout_topic_arn
)

# Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
resource_config = self.get_yaml_config("../config/resources.yaml")
topic_name = resource_config["topic_name"]
producer_bucket_name = resource_config["bucket_name"]
sns_topic = self.init_get_topic(topic_name)
sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")

# Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
sns_topic_role = iam.Role(
self,
f"SNSTopicRole-{language_name}",
assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
description="Allows the SNS topic to send messages to the SQS queue in this account",
role_name=f"SNSTopicRole-{language_name}",
)

# Policy to allow existing SNS topic to publish to new SQS queue.
sns_topic_policy = iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["sqs:SendMessage"],
resources=[sqs_queue.queue_arn],
conditions={"ArnEquals": {"aws:SourceArn": fanout_topic_arn}},
)

# Create an SNS subscription for the SQS queue.
subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))

# Add the Amazon SNS and Amazon SQS policy to the IAM role.
sns_topic_role.add_to_policy(sns_topic_policy)

# Define policy that allows cross-account Amazon SNS and Amazon SQS access.
statement = iam.PolicyStatement()
statement.add_resources(sqs_queue.queue_arn)
statement.add_actions("sqs:*")
statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
statement.add_condition("ArnLike", {"aws:SourceArn": fanout_topic_arn})
sqs_queue.add_to_resource_policy(statement)

################################################
## ##
## S3 BUCKET FOR LOGS ##
## ##
################################################

bucket = s3.Bucket(
self,
"LogBucket",
versioned=False,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)

################################################
## ##
## BATCH FARGATE JOBS ##
## ##
################################################

self.init_subscribe_sns(sqs_queue, sns_topic)
job_definition, job_queue = self.init_batch_fargte()
batch_function = self.init_batch_lambda(job_queue, job_definition)
self.init_sqs_lambda_integration(batch_function, sqs_queue)
self.init_log_function(producer_bucket_name)

def get_yaml_config(self, filepath):
with open(filepath, "r") as file:
data = yaml.safe_load(file)
return data

def init_get_topic(self, topic_name):
topic = sns.Topic(self, "fanout-topic", topic_name=topic_name)
return topic

def init_batch_fargte(self):
batch_execution_role = iam.Role(
self,
f"BatchExecutionRole-{language_name}",
Expand Down Expand Up @@ -157,13 +107,48 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:

job_queue.add_compute_environment(fargate_environment, 1)

#################################################
## ##
## BATCH LAMBDA FUNCTION ##
## (Triggers Batch job from SQS queue) ##
## ##
#################################################
return job_definition, job_queue

def init_sqs_queue(self):
# Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")
return sqs_queue

def init_subscribe_sns(self, sqs_queue, sns_topic):
# Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
sns_topic_role = iam.Role(
self,
f"SNSTopicRole-{language_name}",
assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
description="Allows the SNS topic to send messages to the SQS queue in this account",
role_name=f"SNSTopicRole-{language_name}",
)

# Policy to allow existing SNS topic to publish to new SQS queue.
sns_topic_policy = iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["sqs:SendMessage"],
resources=[sqs_queue.queue_arn],
conditions={"ArnEquals": {"aws:SourceArn": sns_topic.topic_arn}},
)

# Create an SNS subscription for the SQS queue.
subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))

# Add the Amazon SNS and Amazon SQS policy to the IAM role.
sns_topic_role.add_to_policy(sns_topic_policy)

# Define policy that allows cross-account Amazon SNS and Amazon SQS access.
statement = iam.PolicyStatement()
statement.add_resources(sqs_queue.queue_arn)
statement.add_actions("sqs:*")
statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
statement.add_condition("ArnLike", {"aws:SourceArn": sns_topic.topic_arn})
sqs_queue.add_to_resource_policy(statement)

def init_batch_lambda(self, job_queue, job_definition):
# Execution role for AWS Lambda function to use.
execution_role = iam.Role(
self,
Expand Down Expand Up @@ -199,7 +184,9 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
"JOB_NAME": f"job-{language_name}",
},
)
return function

def init_sqs_lambda_integration(self, function, sqs_queue):
# Add the SQS queue as an event source for the Lambda function.
function.add_event_source(_event_sources.SqsEventSource(sqs_queue))

Expand All @@ -221,12 +208,14 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
)
)

#################################################
## ##
## LOG LAMBDA FUNCTION ##
## (Processes logs and puts them to S3) ##
## ##
#################################################
def init_log_function(self, producer_bucket_name):
# S3 Bucket to store logs within this account.
bucket = s3.Bucket(
self,
"LogBucket",
versioned=False,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)

# Execution role for AWS Lambda function to use.
execution_role = iam.Role(
Expand Down Expand Up @@ -264,7 +253,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
execution_role.add_to_policy(
statement=iam.PolicyStatement(
actions=["s3:PutObject", "s3:PutObjectAcl"],
resources=["arn:aws:s3:::aws-weathertop-central-log-bucket/*"],
resources=[f"arn:aws:s3:::{producer_bucket_name}/*"],
)
)

Expand All @@ -279,7 +268,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
environment={
"LANGUAGE_NAME": language_name,
"BUCKET_NAME": bucket.bucket_name,
"PRODUCER_BUCKET_NAME": "aws-weathertop-central-log-bucket",
"PRODUCER_BUCKET_NAME": f"{producer_bucket_name}",
},
)

Expand Down
34 changes: 20 additions & 14 deletions .tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@ def handler(event, context):
logger.debug(f"BUCKET_NAME: {os.environ['BUCKET_NAME']}")
logger.debug(f"INCOMING EVENT: {event}")

status = event["detail"]["status"]

if "Batch Job State Change" not in event["detail-type"]:
logger.info(f"Non-triggering Batch event: {event['detail-type']}")
return
if "TIMED_OUT" in status:
if "TIMED_OUT" in event["detail"]["status"]:
raise Exception(
"Job timed out. Contact application owner or increase time out threshold"
)
if status not in ["FAILED", "SUCCEEDED"]:
logger.info(f"Non-triggering Batch status: STATUS: {status}")
if event["detail"]["status"] not in ["FAILED", "SUCCEEDED"]:
logger.info(f"Non-triggering Batch status: STATUS: {event['detail']['status']}")
return

try:
get_and_put_logs()
job_id = event["detail"]["jobId"]
get_and_put_logs(job_id)
except Exception as e:
logger.error(json.dumps(f"Error: {str(e)}"))
raise e


def get_and_put_logs():
# Get most recent stream
def get_and_put_logs(job_id):
# Get most recent log stream
log_streams = logs_client.describe_log_streams(
logGroupName=log_group_name,
orderBy="LastEventTime",
Expand All @@ -55,17 +54,24 @@ def get_and_put_logs():
startFromHead=True,
)

log_file_name = f"{job_id}.log"

log_file = "\n".join(
[f"{e['timestamp']}, {e['message']}" for e in log_events["events"]]
)
file_identifier = str(random.randint(10**7, 10**8 - 1))

s3_client.upload_fileobj(
log_file,
os.environ["PRODUCER_BUCKET_NAME"],
f"{os.environ['LANGUAGE_NAME']}/{file_identifier}",
# Put logs to cross-account bucket
s3_client.put_object(
Body=log_file,
Bucket=os.environ["PRODUCER_BUCKET_NAME"],
Key=f"{os.environ['LANGUAGE_NAME']}/{log_file_name}",
)

# Back up logs to local bucket
s3_client.put_object(
Body=log_file, Bucket=os.environ["BUCKET_NAME"], Key=f"{log_file_name}"
)

logger.info(
f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{file_identifier}"
f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{log_file_name}"
)
1 change: 1 addition & 0 deletions .tools/test/sqs_lambda_to_batch_fargate/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ constructs>=10.0.0
types-boto3
types-setuptools
aws_cdk.aws_batch_alpha
pyyaml

0 comments on commit a060c2b

Please sign in to comment.