Tools: Weathertop - Update testing stack #5579

Merged · 9 commits · Nov 1, 2023
3 changes: 0 additions & 3 deletions .tools/test/eventbridge_rule_with_sns_fanout/cdk.json

This file was deleted.

@@ -14,8 +14,8 @@
class ProducerStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
acct_config = self.get_yaml_config("../../config/targets.yaml")
resource_config = self.get_yaml_config("../../config/resources.yaml")
acct_config = self.get_yaml_config("../config/targets.yaml")
resource_config = self.get_yaml_config("../config/resources.yaml")
topic_name = resource_config["topic_name"]
bucket_name = resource_config["bucket_name"]
topic = self.init_get_topic(topic_name)
@@ -112,4 +112,5 @@ def init_cross_account_log_role(self, target_accts, bucket):
statement.add_arn_principal(
f"arn:aws:iam::{str(target_accts[language]['account_id'])}:role/LogsLambdaExecutionRole"
)
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
bucket.add_to_resource_policy(statement)
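
For reference, a minimal sketch of how the amended init_cross_account_log_role method reads as a whole. The loop over target_accts and the final root-principal line come from the visible hunk; the statement's actions and resources are assumptions, not taken from the PR:

    # Hypothetical reconstruction based on the visible hunk; actions/resources are assumed.
    def init_cross_account_log_role(self, target_accts, bucket):
        statement = iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=["s3:PutObject", "s3:PutObjectAcl"],
            resources=[f"{bucket.bucket_arn}/*"],
        )
        # Allow each target account's Lambda execution role to write logs.
        for language in target_accts:
            statement.add_arn_principal(
                f"arn:aws:iam::{str(target_accts[language]['account_id'])}:role/LogsLambdaExecutionRole"
            )
        # New in this change: also trust the producer account root.
        statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
        bucket.add_to_resource_policy(statement)
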
4 changes: 2 additions & 2 deletions .tools/test/eventbridge_rule_with_sns_fanout/requirements.txt
@@ -3,5 +3,5 @@ aws-cdk-lib>=2.0.0
constructs>=10.0.0
types-boto3
types-setuptools
random
pyyaml
pyyaml
random2
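
A note on the dependency swap: random is a Python standard-library module rather than a PyPI distribution, so pinning it in requirements.txt is not normally installable, while random2 is a published package that mirrors the stdlib interface. A tiny, purely illustrative sanity check:

    import random2  # provided by the random2 requirement; the stdlib random module needs no pin

    print(random2.randint(10**7, 10**8 - 1))  # prints an 8-digit number, e.g. 57203941
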
2 changes: 1 addition & 1 deletion .tools/test/sqs_lambda_to_batch_fargate/app.py
@@ -9,7 +9,7 @@
app = App()
ConsumerStack(
app,
f"ConsumerStack-{os.getenv('LANGUAGE_NAME')}",
f"ConsumerStack-{os.getenv('LANGUAGE_NAME').replace('_', '-')}",
env=cdk.Environment(
account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
),
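
The added .replace('_', '-') keeps the stack id valid when LANGUAGE_NAME contains an underscore, since CloudFormation stack names may only contain alphanumeric characters and hyphens. A small illustration (the "rust_dev_preview" value is a hypothetical LANGUAGE_NAME, not taken from the PR):

    import os

    os.environ["LANGUAGE_NAME"] = "rust_dev_preview"  # hypothetical value containing underscores
    stack_id = f"ConsumerStack-{os.getenv('LANGUAGE_NAME').replace('_', '-')}"
    print(stack_id)  # ConsumerStack-rust-dev-preview, a name CloudFormation accepts
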
@@ -3,7 +3,8 @@

import os

from aws_cdk import Aws, Duration, Size, Stack
import yaml
from aws_cdk import Aws, Duration, Size, Stack, aws_batch
from aws_cdk import aws_batch_alpha as batch_alpha
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecs as ecs
@@ -26,78 +27,27 @@
class ConsumerStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

#####################################################
## ##
## FANOUT COMPONENTS ##
## (SQS, SNS, and Subscriptions) ##
## ##
#####################################################

# Locate Amazon Simple Notification Service (Amazon SNS) topic in the producer account.
fanout_topic_name = "aws-weathertop-central-sns-fanout-topic"
fanout_topic_arn = (
f"arn:aws:sns:us-east-1:{producer_account_id}:{fanout_topic_name}"
)
sns_topic = sns.Topic.from_topic_arn(
self, fanout_topic_name, topic_arn=fanout_topic_arn
)

# Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
resource_config = self.get_yaml_config("../config/resources.yaml")
topic_name = resource_config["topic_name"]
producer_bucket_name = resource_config["bucket_name"]
sns_topic = self.init_get_topic(topic_name)
sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")

# Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
sns_topic_role = iam.Role(
self,
f"SNSTopicRole-{language_name}",
assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
description="Allows the SNS topic to send messages to the SQS queue in this account",
role_name=f"SNSTopicRole-{language_name}",
)

# Policy to allow existing SNS topic to publish to new SQS queue.
sns_topic_policy = iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["sqs:SendMessage"],
resources=[sqs_queue.queue_arn],
conditions={"ArnEquals": {"aws:SourceArn": fanout_topic_arn}},
)

# Create an SNS subscription for the SQS queue.
subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))

# Add the Amazon SNS and Amazon SQS policy to the IAM role.
sns_topic_role.add_to_policy(sns_topic_policy)

# Define policy that allows cross-account Amazon SNS and Amazon SQS access.
statement = iam.PolicyStatement()
statement.add_resources(sqs_queue.queue_arn)
statement.add_actions("sqs:*")
statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
statement.add_condition("ArnLike", {"aws:SourceArn": fanout_topic_arn})
sqs_queue.add_to_resource_policy(statement)

################################################
## ##
## S3 BUCKET FOR LOGS ##
## ##
################################################

bucket = s3.Bucket(
self,
"LogBucket",
versioned=False,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)

################################################
## ##
## BATCH FARGATE JOBS ##
## ##
################################################

self.init_subscribe_sns(sqs_queue, sns_topic)
job_definition, job_queue = self.init_batch_fargte()
batch_function = self.init_batch_lambda(job_queue, job_definition)
self.init_sqs_lambda_integration(batch_function, sqs_queue)
self.init_log_function(producer_bucket_name)

def get_yaml_config(self, filepath):
with open(filepath, "r") as file:
data = yaml.safe_load(file)
return data

def init_get_topic(self, topic_name):
topic = sns.Topic(self, "fanout-topic", topic_name=topic_name)
return topic

def init_batch_fargte(self):
batch_execution_role = iam.Role(
self,
f"BatchExecutionRole-{language_name}",
@@ -157,13 +107,48 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:

job_queue.add_compute_environment(fargate_environment, 1)

#################################################
## ##
## BATCH LAMBDA FUNCTION ##
## (Triggers Batch job from SQS queue) ##
## ##
#################################################
return job_definition, job_queue

def init_sqs_queue(self):
# Define the Amazon Simple Queue Service (Amazon SQS) queue in this account.
sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")
return sqs_queue

def init_subscribe_sns(self, sqs_queue, sns_topic):
# Create an AWS Identity and Access Management (IAM) role for the SNS topic to send messages to the SQS queue.
sns_topic_role = iam.Role(
self,
f"SNSTopicRole-{language_name}",
assumed_by=iam.ServicePrincipal("sns.amazonaws.com"),
description="Allows the SNS topic to send messages to the SQS queue in this account",
role_name=f"SNSTopicRole-{language_name}",
)

# Policy to allow existing SNS topic to publish to new SQS queue.
sns_topic_policy = iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["sqs:SendMessage"],
resources=[sqs_queue.queue_arn],
conditions={"ArnEquals": {"aws:SourceArn": sns_topic.topic_arn}},
)

# Create an SNS subscription for the SQS queue.
subs.SqsSubscription(sqs_queue, raw_message_delivery=True).bind(sns_topic)
sns_topic.add_subscription(subs.SqsSubscription(sqs_queue))

# Add the Amazon SNS and Amazon SQS policy to the IAM role.
sns_topic_role.add_to_policy(sns_topic_policy)

# Define policy that allows cross-account Amazon SNS and Amazon SQS access.
statement = iam.PolicyStatement()
statement.add_resources(sqs_queue.queue_arn)
statement.add_actions("sqs:*")
statement.add_arn_principal(f"arn:aws:iam::{producer_account_id}:root")
statement.add_arn_principal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root")
statement.add_condition("ArnLike", {"aws:SourceArn": sns_topic.topic_arn})
sqs_queue.add_to_resource_policy(statement)

def init_batch_lambda(self, job_queue, job_definition):
# Execution role for AWS Lambda function to use.
execution_role = iam.Role(
self,
@@ -199,7 +184,9 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
"JOB_NAME": f"job-{language_name}",
},
)
return function

def init_sqs_lambda_integration(self, function, sqs_queue):
# Add the SQS queue as an event source for the Lambda function.
function.add_event_source(_event_sources.SqsEventSource(sqs_queue))

@@ -221,12 +208,14 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
)
)

#################################################
## ##
## LOG LAMBDA FUNCTION ##
## (Processes logs and puts them to S3) ##
## ##
#################################################
def init_log_function(self, producer_bucket_name):
# S3 Bucket to store logs within this account.
bucket = s3.Bucket(
self,
"LogBucket",
versioned=False,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)

# Execution role for AWS Lambda function to use.
execution_role = iam.Role(
@@ -264,7 +253,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
execution_role.add_to_policy(
statement=iam.PolicyStatement(
actions=["s3:PutObject", "s3:PutObjectAcl"],
resources=["arn:aws:s3:::aws-weathertop-central-log-bucket/*"],
resources=[f"arn:aws:s3:::{producer_bucket_name}/*"],
)
)

@@ -279,7 +268,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
environment={
"LANGUAGE_NAME": language_name,
"BUCKET_NAME": bucket.bucket_name,
"PRODUCER_BUCKET_NAME": "aws-weathertop-central-log-bucket",
"PRODUCER_BUCKET_NAME": f"{producer_bucket_name}",
},
)
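
Read together, the added lines turn the previously monolithic constructor into a short pipeline of helper calls. A condensed view of the new __init__, assembled from the added lines above (comments are editorial; method bodies omitted):

    class ConsumerStack(Stack):
        def __init__(self, scope: Construct, id: str, **kwargs) -> None:
            super().__init__(scope, id, **kwargs)
            resource_config = self.get_yaml_config("../config/resources.yaml")
            topic_name = resource_config["topic_name"]
            producer_bucket_name = resource_config["bucket_name"]
            sns_topic = self.init_get_topic(topic_name)
            sqs_queue = sqs.Queue(self, f"BatchJobQueue-{language_name}")
            self.init_subscribe_sns(sqs_queue, sns_topic)                 # SNS -> SQS fanout
            job_definition, job_queue = self.init_batch_fargte()          # Batch-on-Fargate resources ("fargte" spelling as in the source)
            batch_function = self.init_batch_lambda(job_queue, job_definition)
            self.init_sqs_lambda_integration(batch_function, sqs_queue)   # SQS triggers the Batch-submitting Lambda
            self.init_log_function(producer_bucket_name)                  # log-export Lambda and S3 buckets
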

34 changes: 20 additions & 14 deletions .tools/test/sqs_lambda_to_batch_fargate/lambda/export_logs.py
@@ -19,28 +19,27 @@ def handler(event, context):
logger.debug(f"BUCKET_NAME: {os.environ['BUCKET_NAME']}")
logger.debug(f"INCOMING EVENT: {event}")

status = event["detail"]["status"]

if "Batch Job State Change" not in event["detail-type"]:
logger.info(f"Non-triggering Batch event: {event['detail-type']}")
return
if "TIMED_OUT" in status:
if "TIMED_OUT" in event["detail"]["status"]:
raise Exception(
"Job timed out. Contact application owner or increase time out threshold"
)
if status not in ["FAILED", "SUCCEEDED"]:
logger.info(f"Non-triggering Batch status: STATUS: {status}")
if event["detail"]["status"] not in ["FAILED", "SUCCEEDED"]:
logger.info(f"Non-triggering Batch status: STATUS: {event['detail']['status']}")
return

try:
get_and_put_logs()
job_id = event["detail"]["jobId"]
get_and_put_logs(job_id)
except Exception as e:
logger.error(json.dumps(f"Error: {str(e)}"))
raise e


def get_and_put_logs():
# Get most recent stream
def get_and_put_logs(job_id):
# Get most recent log stream
log_streams = logs_client.describe_log_streams(
logGroupName=log_group_name,
orderBy="LastEventTime",
@@ -55,17 +54,24 @@ def get_and_put_logs():
startFromHead=True,
)

log_file_name = f"{job_id}.log"

log_file = "\n".join(
[f"{e['timestamp']}, {e['message']}" for e in log_events["events"]]
)
file_identifier = str(random.randint(10**7, 10**8 - 1))

s3_client.upload_fileobj(
log_file,
os.environ["PRODUCER_BUCKET_NAME"],
f"{os.environ['LANGUAGE_NAME']}/{file_identifier}",
# Put logs to cross-account bucket
s3_client.put_object(
Body=log_file,
Bucket=os.environ["PRODUCER_BUCKET_NAME"],
Key=f"{os.environ['LANGUAGE_NAME']}/{log_file_name}",
)

# Back up logs to local bucket
s3_client.put_object(
Body=log_file, Bucket=os.environ["BUCKET_NAME"], Key=f"{log_file_name}"
)

logger.info(
f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{file_identifier}"
f"Log data saved successfully: {os.environ['LANGUAGE_NAME']}/{log_file_name}"
)
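
For a sense of the handler's contract, here is a minimal local invocation with a fabricated EventBridge "Batch Job State Change" event. The status and jobId values are made up, only the keys the handler actually reads are included, and it assumes the LANGUAGE_NAME, BUCKET_NAME, and PRODUCER_BUCKET_NAME environment variables are set:

    sample_event = {
        "detail-type": "Batch Job State Change",
        "detail": {
            "status": "SUCCEEDED",            # FAILED also triggers the export; TIMED_OUT raises
            "jobId": "example-job-id-1234",   # hypothetical Batch job id, used as the log file name
        },
    }

    handler(sample_event, None)  # copies the newest log stream to the producer and local buckets
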
1 change: 1 addition & 0 deletions .tools/test/sqs_lambda_to_batch_fargate/requirements.txt
@@ -3,3 +3,4 @@ constructs>=10.0.0
types-boto3
types-setuptools
aws_cdk.aws_batch_alpha
pyyaml
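
The pyyaml addition lines up with the new yaml.safe_load call in the consumer stack's get_yaml_config helper. A standalone sketch of the same pattern, assuming a resources.yaml with the topic_name and bucket_name keys read above (the path mirrors the one the stack uses):

    import yaml

    with open("../config/resources.yaml", "r") as file:
        resource_config = yaml.safe_load(file)

    print(resource_config["topic_name"], resource_config["bucket_name"])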