Skip to content
This repository has been archived by the owner on Aug 14, 2024. It is now read-only.

Commit

Permalink
fix integration test assertions to accommodate recent CloudWatch metr…
Browse files Browse the repository at this point in the history
…ics changes
  • Loading branch information
whummer committed May 15, 2021
1 parent c236bf2 commit 57172fd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
5 changes: 5 additions & 0 deletions tests/integration/lambdas/lambda_integration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import json
import base64
import logging
Expand All @@ -15,6 +16,10 @@
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

# TODO: should be injected by Lambda executor! (Doesn't seem to be the case for default python executor?)
os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID') or 'test'
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY') or 'test'


# Subclass of boto's TypeDeserializer for DynamoDB
# to adjust for DynamoDB Stream format.
Expand Down
57 changes: 27 additions & 30 deletions tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
TEST_LAMBDA_SOURCE_STREAM_NAME = 'test_source_stream'
TEST_TABLE_NAME = 'test_stream_table'
TEST_LAMBDA_NAME_DDB = 'test_lambda_ddb'
TEST_LAMBDA_NAME_STREAM = 'test_lambda_stream'
TEST_LAMBDA_NAME_QUEUE = 'test_lambda_queue'
TEST_FIREHOSE_NAME = 'test_firehose'
TEST_BUCKET_NAME = lambda_integration.TEST_BUCKET_NAME
Expand Down Expand Up @@ -157,6 +156,7 @@ def create_kinesis_stream(name, delete=False):
ddb_lease_table_suffix = '-kclapp'
table_name = TEST_TABLE_NAME + 'klsdss' + ddb_lease_table_suffix
stream_name = TEST_STREAM_NAME
lambda_stream_name = 'lambda-stream-%s' % short_uid()
dynamodb = aws_stack.connect_to_resource('dynamodb')
dynamodb_service = aws_stack.connect_to_service('dynamodb')
dynamodbstreams = aws_stack.connect_to_service('dynamodbstreams')
Expand Down Expand Up @@ -197,23 +197,23 @@ def process_records(records, shard_id):

# deploy test lambda connected to DynamoDB Stream
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
libs=TEST_LAMBDA_LIBS)
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_DDB,
zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27, delete=True)
zip_file=zip_file, event_source_arn=ddb_event_source_arn, delete=True)
# make sure we cannot create Lambda with same name twice
assert_raises(Exception, testutil.create_lambda_function, func_name=TEST_LAMBDA_NAME_DDB,
zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)
zip_file=zip_file, event_source_arn=ddb_event_source_arn)

# deploy test lambda connected to Kinesis Stream
kinesis_event_source_arn = kinesis.describe_stream(
StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)['StreamDescription']['StreamARN']
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_STREAM,
zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=lambda_stream_name,
zip_file=zip_file, event_source_arn=kinesis_event_source_arn)

# deploy test lambda connected to SQS queue
sqs_queue_info = testutil.create_sqs_queue(TEST_LAMBDA_NAME_QUEUE)
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_QUEUE,
zip_file=zip_file, event_source_arn=sqs_queue_info['QueueArn'], runtime=LAMBDA_RUNTIME_PYTHON27)
zip_file=zip_file, event_source_arn=sqs_queue_info['QueueArn'])

# set number of items to update/put to table
num_events_ddb = 15
Expand Down Expand Up @@ -252,27 +252,30 @@ def process_records(records, shard_id):
}})

# put items to stream
num_events_kinesis = 10
LOGGER.info('Putting %s items to stream...' % num_events_kinesis)
num_events_kinesis = 1
num_kinesis_records = 10
LOGGER.info('Putting %s records in %s event to stream...' % (num_kinesis_records, num_events_kinesis))
kinesis.put_records(
Records=[
{
'Data': '{}',
'PartitionKey': 'testId%s' % i
} for i in range(0, num_events_kinesis)
} for i in range(0, num_kinesis_records)
], StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME
)

# put 1 item to stream that will trigger an error in the Lambda
kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG,
PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)
num_events_kinesis_err = 1
for i in range(num_events_kinesis_err):
kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG,
PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)

# create SNS topic, connect it to the Lambda, publish test messages
num_events_sns = 3
response = sns.create_topic(Name=TEST_TOPIC_NAME)
sns.subscribe(TopicArn=response['TopicArn'], Protocol='lambda',
Endpoint=aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_STREAM))
for i in range(0, num_events_sns):
Endpoint=aws_stack.lambda_function_arn(lambda_stream_name))
for i in range(num_events_sns):
sns.publish(TopicArn=response['TopicArn'], Subject='test_subject', Message='test message %s' % i)

# get latest records
Expand All @@ -289,7 +292,7 @@ def process_records(records, shard_id):
time.sleep(2)

num_events_lambda = num_events_ddb + num_events_sns + num_events_sqs
num_events = num_events_lambda + num_events_kinesis
num_events = num_events_lambda + num_kinesis_records

def check_events():
if len(events) != num_events:
Expand All @@ -308,22 +311,16 @@ def check_events():

# check cloudwatch notifications
def check_cw_invocations():
num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM)
# TODO: It seems that CloudWatch is currently reporting an incorrect number of
# invocations, namely the sum over *all* lambdas, not the single one we're asking for.
# Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single
# Lambda invocation may happen with a set of Kinesis records, hence we cannot simply
# add num_events_ddb to num_events_lambda above!
# self.assertEqual(num_invocations, 2 + num_events_lambda)
self.assertGreater(num_invocations, num_events_sns + num_events_sqs)
num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors', 15)
self.assertEqual(num_error_invocations, 1)
num_invocations = get_lambda_invocations_count(lambda_stream_name)
self.assertEqual(num_invocations, num_events_kinesis + num_events_kinesis_err + num_events_sns)
num_error_invocations = get_lambda_invocations_count(lambda_stream_name, 'Errors')
self.assertEqual(num_error_invocations, num_events_kinesis_err)

# Lambda invocations are running asynchronously, hence sleep some time here to wait for results
retry(check_cw_invocations, retries=5, sleep=2)

# clean up
testutil.delete_lambda_function(TEST_LAMBDA_NAME_STREAM)
testutil.delete_lambda_function(lambda_stream_name)
testutil.delete_lambda_function(TEST_LAMBDA_NAME_DDB)
testutil.delete_lambda_function(TEST_LAMBDA_NAME_QUEUE)
sqs.delete_queue(QueueUrl=sqs_queue_info['QueueUrl'])
Expand Down Expand Up @@ -626,18 +623,18 @@ def get_event_source_arn(stream_name):
return kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['StreamARN']


def get_lambda_invocations_count(lambda_name, metric=None, period=60, start_time=None, end_time=None):
def get_lambda_invocations_count(lambda_name, metric=None, period=None, start_time=None, end_time=None):
metric = get_lambda_metrics(lambda_name, metric, period, start_time, end_time)
if not metric['Datapoints']:
return 0
return metric['Datapoints'][-1]['Sum']


def get_lambda_metrics(func_name, metric=None, period=60, start_time=None, end_time=None):
def get_lambda_metrics(func_name, metric=None, period=None, start_time=None, end_time=None):
metric = metric or 'Invocations'
cloudwatch = aws_stack.connect_to_service('cloudwatch')
if end_time is None:
end_time = datetime.now()
period = period or 600
end_time = end_time or datetime.now()
if start_time is None:
start_time = end_time - timedelta(seconds=period)
return cloudwatch.get_metric_statistics(
Expand Down

0 comments on commit 57172fd

Please sign in to comment.