From fac2bd5872b11b151ddf1836fa4afab3ef9da7f9 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Mon, 28 Aug 2023 17:10:50 -0600 Subject: [PATCH 1/4] Initial checkin --- sam/app-concurrency-control/.gitignore | 4 + sam/app-concurrency-control/README.md | 22 ++ .../concurrency_controller/__init__.py | 0 .../functions/concurrency_controller/app.py | 48 ++++ .../concurrency_controller/requirements.txt | 0 .../functions/do_work_function/__init__.py | 0 .../functions/do_work_function/app.py | 8 + .../do_work_function/requirements.txt | 0 .../functions/test_runner/__init__.py | 0 .../functions/test_runner/app.py | 14 + .../functions/test_runner/requirements.txt | 0 .../concurrency-control-flow-old.asl.json | 72 +++++ .../concurrency-control-flow.asl.json | 60 +++++ .../statemachines/do-some-work.asl.json | 29 ++ ...odb-semaphore-cleanfromincomplete.asl.json | 107 ++++++++ .../statemachines/dynamodb-semaphore.asl.json | 252 ++++++++++++++++++ .../statemachines/test-run-semaphore.asl.json | 71 +++++ sam/app-concurrency-control/template.yaml | 193 ++++++++++++++ 18 files changed, 880 insertions(+) create mode 100644 sam/app-concurrency-control/.gitignore create mode 100644 sam/app-concurrency-control/README.md create mode 100644 sam/app-concurrency-control/functions/concurrency_controller/__init__.py create mode 100644 sam/app-concurrency-control/functions/concurrency_controller/app.py create mode 100644 sam/app-concurrency-control/functions/concurrency_controller/requirements.txt create mode 100644 sam/app-concurrency-control/functions/do_work_function/__init__.py create mode 100644 sam/app-concurrency-control/functions/do_work_function/app.py create mode 100644 sam/app-concurrency-control/functions/do_work_function/requirements.txt create mode 100644 sam/app-concurrency-control/functions/test_runner/__init__.py create mode 100644 sam/app-concurrency-control/functions/test_runner/app.py create mode 100644 sam/app-concurrency-control/functions/test_runner/requirements.txt create 
mode 100644 sam/app-concurrency-control/statemachines/concurrency-control-flow-old.asl.json create mode 100644 sam/app-concurrency-control/statemachines/concurrency-control-flow.asl.json create mode 100644 sam/app-concurrency-control/statemachines/do-some-work.asl.json create mode 100644 sam/app-concurrency-control/statemachines/dynamodb-semaphore-cleanfromincomplete.asl.json create mode 100644 sam/app-concurrency-control/statemachines/dynamodb-semaphore.asl.json create mode 100644 sam/app-concurrency-control/statemachines/test-run-semaphore.asl.json create mode 100644 sam/app-concurrency-control/template.yaml diff --git a/sam/app-concurrency-control/.gitignore b/sam/app-concurrency-control/.gitignore new file mode 100644 index 0000000..86ebae9 --- /dev/null +++ b/sam/app-concurrency-control/.gitignore @@ -0,0 +1,4 @@ +samconfig.toml +.aws-sam +.DS_Store +*.code-workspace diff --git a/sam/app-concurrency-control/README.md b/sam/app-concurrency-control/README.md new file mode 100644 index 0000000..0dc1ca9 --- /dev/null +++ b/sam/app-concurrency-control/README.md @@ -0,0 +1,22 @@ +## Description + +This example demonstrates the implementation of cross-execution concurrency control for AWS Step Functions workflows, by utilizing the listExecutions() API (https://docs.aws.amazon.com/step-functions/latest/apireference/API_ListExecutions.html). + +Within a single flow, one can utilize Map or Distributed Map state to control how many concurrent flows can be launched within the same execution. However, there are use cases where one may want to limit the number of concurrent executions of the same workflow, for example, due to downstream API limitations or tasks that require human intervention. + +## Implementation + +### Concurrency Controller function: + +The concurrency controller Lambda function will check, for a given SFN ARN, the current number of executions using the listExecutions API. 
It then compares that against a preset concurrency threshold, a static value stored in SSM Parameter Store (for simplicity), and returns a “proceed” or “wait” flag. + +### Other considerations + +* Single SAM template is used to create all resources +* Test runner: Lambda function that generates test messages to SQS (e.g., 1k - 10k) +* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 4 (to avoid ThrottlingException for the API call and racing condition) +* A random delay up to 1 sec (jitter) is introduced when listExecutions is called to avoid racing condition +* Concurrency Threshold set to 10 in SSM Param Store +* listExecution() API call is eventual consistency and results are best effort (no SLA) → Concurrency can exceed threshold value on occasion +* Concurrency can be tracked using CloudWatch Log Insight + diff --git a/sam/app-concurrency-control/functions/concurrency_controller/__init__.py b/sam/app-concurrency-control/functions/concurrency_controller/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/functions/concurrency_controller/app.py b/sam/app-concurrency-control/functions/concurrency_controller/app.py new file mode 100644 index 0000000..029a52f --- /dev/null +++ b/sam/app-concurrency-control/functions/concurrency_controller/app.py @@ -0,0 +1,48 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: MIT-0
+import time
+import random
+import boto3
+import os
+from botocore.config import Config
+
+# Retry Step Functions API calls using botocore's standard retry mode.
+config = Config(
+    retries = {
+        'max_attempts': 10,
+        'mode': 'standard'
+    }
+)
+
+try:
+    # Read the concurrency threshold from SSM Parameter Store once, at module import.
+    ssm = boto3.client('ssm')
+    concurrency_limit_threshold = int(ssm.get_parameter(Name='concurrencyLimit', WithDecryption=True)['Parameter']['Value'])
+except Exception:
+    # The handler cannot make a decision without the threshold. Re-raise so the
+    # failure is reported here instead of as a NameError inside lambda_handler.
+    print('Failed to get failure threshold from SSM')
+    raise
+
+
+def lambda_handler(event, context):
+    """Start one workflow execution per record while the running count is under the limit.
+
+    Args:
+        event: SQS-style event; event['Records'] is iterated and each record's
+            'body' is logged before an execution is started.
+        context: Lambda context; its invoked_function_arn supplies the partition,
+            region and account id used to build the state machine ARN.
+
+    Raises:
+        Exception: when the number of RUNNING executions has reached
+            concurrency_limit_threshold; no execution is started for that record.
+    """
+    # ARN layout: arn:<partition>:lambda:<region>:<account-id>:function:<name>
+    partition, _, region, account_id = context.invoked_function_arn.split(":")[1:5]
+    arn = 'arn:' + partition + ':states:' + region + ':' + account_id + ':stateMachine:CC-WorkStateMachine'
+    print('stepfunction arn:' + str(arn))
+    stepfunctions = boto3.client('stepfunctions', config=config)
+    # Paginate so the count is not capped at a single page of results.
+    paginator = stepfunctions.get_paginator('list_executions')
+    for record in event['Records']:
+        # Jitter (0.1 - 1.0 s) before counting, to spread out concurrent invocations.
+        time.sleep(random.randint(1,10)*0.1)
+        pages = paginator.paginate(stateMachineArn=arn, statusFilter='RUNNING')
+        execution_count = sum(len(page['executions']) for page in pages)
+        print('current execution count:' + str(execution_count))
+        print('concurrency limit threshold:' + str(concurrency_limit_threshold))
+
+        # Fail the invocation when the running executions have reached the threshold.
+        if execution_count >= concurrency_limit_threshold:
+            raise Exception('Concurrent workflow reaching limit!')
+
+        print('Processing ' + str(record["body"]))
+        stepfunctions.start_execution(stateMachineArn=arn)
\ No newline at end of file diff --git a/sam/app-concurrency-control/functions/concurrency_controller/requirements.txt b/sam/app-concurrency-control/functions/concurrency_controller/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/functions/do_work_function/__init__.py 
b/sam/app-concurrency-control/functions/do_work_function/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/functions/do_work_function/app.py b/sam/app-concurrency-control/functions/do_work_function/app.py new file mode 100644 index 0000000..3bbc12a --- /dev/null +++ b/sam/app-concurrency-control/functions/do_work_function/app.py @@ -0,0 +1,8 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: MIT-0
+import time
+import random
+
+def lambda_handler(event, context):
+    """Simulate a unit of work: block for 10 seconds, then return 1.
+
+    Neither event nor context is read.
+    """
+    time.sleep(10)
+    return 1
\ No newline at end of file diff --git a/sam/app-concurrency-control/functions/do_work_function/requirements.txt b/sam/app-concurrency-control/functions/do_work_function/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/functions/test_runner/__init__.py b/sam/app-concurrency-control/functions/test_runner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/functions/test_runner/app.py b/sam/app-concurrency-control/functions/test_runner/app.py new file mode 100644 index 0000000..e17ab11 --- /dev/null +++ b/sam/app-concurrency-control/functions/test_runner/app.py @@ -0,0 +1,14 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: MIT-0
+import time
+import random
+import boto3
+import os
+
+def lambda_handler(event, context):
+    """Enqueue numbered test messages on the CC-Test-Queue SQS queue.
+
+    Args:
+        event: Invocation payload. An optional integer 'message_count' key sets
+            how many messages to send; any other payload sends the default of 100.
+        context: Lambda context object (unused).
+    """
+    message_count = 100
+    if isinstance(event, dict) and 'message_count' in event:
+        message_count = int(event['message_count'])
+    sqs = boto3.resource('sqs')
+    queue = sqs.get_queue_by_name(QueueName='CC-Test-Queue')
+    # Bodies are 'test message 0' ... 'test message <message_count - 1>'.
+    for i in range(message_count):
+        queue.send_message(MessageBody='test message ' + str(i))
\ No newline at end of file diff --git a/sam/app-concurrency-control/functions/test_runner/requirements.txt b/sam/app-concurrency-control/functions/test_runner/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/sam/app-concurrency-control/statemachines/concurrency-control-flow-old.asl.json b/sam/app-concurrency-control/statemachines/concurrency-control-flow-old.asl.json new file mode 100644 index 0000000..d5b10da --- /dev/null +++ b/sam/app-concurrency-control/statemachines/concurrency-control-flow-old.asl.json @@ -0,0 +1,72 @@ +{ + "Comment": "Check current number of execution. Proceed if it's lower than threshold", + "StartAt": "Check current running tasks", + "States": { + "Check current running tasks": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload.$": "$", + "FunctionName": "${LambdaConcurrencyController}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "Continue or wait", + "ResultPath": "$.flag" + }, + "Continue or wait": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.flag.Payload", + "StringEquals": "proceed", + "Next": "Do some work" + }, + { + "Variable": "$.flag.Payload", + "StringEquals": "wait", + "Next": "Wait" + } + ] + }, + "Do some work": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "${LambdaDoWorkFunction}" 
}, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + }, + "Wait": { + "Type": "Wait", + "Seconds": 5, + "Next": "Check current running tasks" + } + } + } \ No newline at end of file diff --git a/sam/app-concurrency-control/statemachines/concurrency-control-flow.asl.json b/sam/app-concurrency-control/statemachines/concurrency-control-flow.asl.json new file mode 100644 index 0000000..fade20c --- /dev/null +++ b/sam/app-concurrency-control/statemachines/concurrency-control-flow.asl.json @@ -0,0 +1,60 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Check current running executions", + "States": { + "Check current running executions": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.flag", + "Parameters": { + "Payload.$": "$", + "FunctionName": "${LambdaConcurrencyController}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "Over limit?" 
+ }, + "Over limit?": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.flag.Payload", + "StringEquals": "proceed", + "Next": "Step Functions StartExecution" + }, + { + "Variable": "$.flag.Payload", + "StringEquals": "wait", + "Next": "Fail" + } + ] + }, + "Step Functions StartExecution": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution", + "Parameters": { + "StateMachineArn": "${DoSomeWorkFlow}", + "Input": { + "StatePayload": "Hello from Step Functions!", + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "End": true + }, + "Fail": { + "Type": "Fail", + "Error": "\"Reaching maximum allowed executions\"" + } + } + } diff --git a/sam/app-concurrency-control/statemachines/do-some-work.asl.json b/sam/app-concurrency-control/statemachines/do-some-work.asl.json new file mode 100644 index 0000000..667049c --- /dev/null +++ b/sam/app-concurrency-control/statemachines/do-some-work.asl.json @@ -0,0 +1,29 @@ +{ + "Comment": "Check current number of execution. 
Proceed if it's lower than threshold", + "StartAt": "Do some work", + "States": { + "Do some work": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "${LambdaDoWorkFunction}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + } + } + } \ No newline at end of file diff --git a/sam/app-concurrency-control/statemachines/dynamodb-semaphore-cleanfromincomplete.asl.json b/sam/app-concurrency-control/statemachines/dynamodb-semaphore-cleanfromincomplete.asl.json new file mode 100644 index 0000000..17c77b9 --- /dev/null +++ b/sam/app-concurrency-control/statemachines/dynamodb-semaphore-cleanfromincomplete.asl.json @@ -0,0 +1,107 @@ +{ + "Comment": "A state machine that will react to completion events and clean up orphaned locks", + "StartAt": "Get Current Lock Item", + "States": { + "Get Current Lock Item": { + "Comment": "Get info from DDB for the lock item to look and see if this specific owner is still holding a lock", + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "ExpressionAttributeNames": { + "#lockownerid.$": "$.detail.executionArn" + }, + "Key": { + "LockName": { + "S": "${LockName}" + } + }, + "ProjectionExpression": "#lockownerid" + }, + "Retry": [ + { + "ErrorEquals": [ "States.ALL" ], + "IntervalSeconds": 5, + "MaxAttempts": 20, + "BackoffRate": 1.4 + } ], + "ResultSelector": { + "Item.$": "$.Item", + "ItemString.$": "States.JsonToString($.Item)" + }, + "ResultPath": "$.lockinfo.currentlockitem", + "Next": "Check If Lock Is Held" + }, + "Check If Lock Is Held": { + "Comment": "This state checks to see if the execution in question holds a lock. 
It can tell that by looking for Z, which will be indicative of the timestamp value. That will only be there in the stringified version of the data returned from DDB if this execution holds a lock", + "Type": "Choice", + "Choices": [ + { + "And": [ + { + "Variable": "$.lockinfo.currentlockitem.ItemString", + "IsPresent": true + }, + { + "Variable": "$.lockinfo.currentlockitem.ItemString", + "StringMatches": "*Z*" + } + ], + "Next": "Clean Up Lock" + } + ], + "Default": "Success State" + }, + "Clean Up Lock": { + "Comment": "If this lockownerid is still there, then clean it up and release the lock", + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "Key": { + "LockName": { + "S": "${LockName}" + } + }, + "ExpressionAttributeNames": { + "#currentlockcount": "currentlockcount", + "#lockownerid.$": "$.detail.executionArn" + }, + "ExpressionAttributeValues": { + ":decrease": { + "N": "1" + } + }, + "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid", + "ConditionExpression": "attribute_exists(#lockownerid)", + "ReturnValues": "UPDATED_NEW" + }, + "Retry": [ + { + "ErrorEquals": [ "DynamoDB.ConditionalCheckFailedException" ], + "MaxAttempts": 0 + }, + { + "ErrorEquals": [ "States.ALL" ], + "IntervalSeconds": 5, + "MaxAttempts": 20, + "BackoffRate": 1.4 + } ], + "Catch": [ + { + "ErrorEquals": [ + "DynamoDB.ConditionalCheckFailedException" + ], + "Next": "Success State", + "ResultPath": null + } + ], + "ResultPath": null, + "Next": "Success State" + }, + "Success State": { + "Type": "Succeed" + } + } + + } \ No newline at end of file diff --git a/sam/app-concurrency-control/statemachines/dynamodb-semaphore.asl.json b/sam/app-concurrency-control/statemachines/dynamodb-semaphore.asl.json new file mode 100644 index 0000000..c082367 --- /dev/null +++ b/sam/app-concurrency-control/statemachines/dynamodb-semaphore.asl.json @@ -0,0 +1,252 @@ +{ + "Comment": "A 
state machine to demonstrate using DynamoDB to implement a semaphore", + "StartAt": "Get Lock", + "States": { + "Get Lock": { + "Comment": "This parallel state contains the logic to acquire a lock and to handle the cases where a lock cannot be Acquired. Containing this in a parallel allows for visual separation when viewing the state machine and makes it easier to reuse this same logic elsewhere if desired. Because this state sets ResultPath: null, it will not manipulate the execution input that is passed on to the subsequent part of your statemachine that is responsible for doing the work.", + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Acquire Lock", + "States": { + "Acquire Lock": { + "Comment": "acquire a lock using a conditional update to DynamoDB. This update will do two things: 1) increment a counter for the number of held locks and 2) add an attribute to the DynamoDB Item with a unique key for this execution and with a value of the time when the lock was Acquired. The Update includes a conditional expression that will fail under two circumstances: 1) if the maximum number of locks have already been distributed or 2) if the current execution already owns a lock. The latter check is important to ensure the same execution doesn't increase the counter more than once. If either of these conditions are not met, then the task will fail with a DynamoDB.ConditionalCheckFailedException error, retry a few times, then if it is still not successful, it will move off to another branch of the workflow. If this is the first time that a given lockname has been used, there will not be a row in DynamoDB, so the update will fail with DynamoDB.AmazonDynamoDBException. 
In that case, this state sends the workflow to state that will create that row to initialize.", + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "Key": { + "LockName": { + "S": "${LockName}" + } + }, + "ExpressionAttributeNames": { + "#currentlockcount": "currentlockcount", + "#lockownerid.$": "$$.Execution.Id" + }, + "ExpressionAttributeValues": { + ":increase": { + "N": "1" + }, + ":limit": { + "N": "${ConcurrentAccessLimit}" + }, + ":lockacquiredtime": { + "S.$": "$$.State.EnteredTime" + } + }, + "UpdateExpression": "SET #currentlockcount = #currentlockcount + :increase, #lockownerid = :lockacquiredtime", + "ConditionExpression": "currentlockcount <> :limit and attribute_not_exists(#lockownerid)", + "ReturnValues": "UPDATED_NEW" + }, + "Retry": [ + { + "ErrorEquals": ["DynamoDB.AmazonDynamoDBException"], + "MaxAttempts": 0 + }, + { + "ErrorEquals": [ "States.ALL" ], + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Catch": [ + { + "ErrorEquals": ["DynamoDB.AmazonDynamoDBException"], + "Next": "Initialize Lock Item", + "ResultPath": "$.lockinfo.acquisitionerror" + }, + { + "ErrorEquals": [ + "DynamoDB.ConditionalCheckFailedException" + ], + "Next": "Get Current Lock Record", + "ResultPath": "$.lockinfo.acquisitionerror" + } + ], + "End": true + }, + "Initialize Lock Item": { + "Comment": "This state handles the case where an item hasn't been created for this lock yet. In that case, it will insert an initial item that includes the lock name as the key and currentlockcount of 0. The Put to DynamoDB includes a conditonal expression to fail if the an item with that key already exists, which avoids a race condition if multiple executions start at the same time. 
There are other reasons that the previous state could fail and end up here, so this is safe in those cases too.", + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "Item": { + "LockName": { + "S": "${LockName}" + }, + "currentlockcount": { + "N": "0" + } + }, + "ConditionExpression": "LockName <> :lockname", + "ExpressionAttributeValues": { + ":lockname": {"S": "${LockName}"} + } + }, + "Catch": [ + { + "ErrorEquals": [ "States.ALL" ], + "Next": "Acquire Lock", + "ResultPath": null + } + ], + "Next": "Acquire Lock", + "ResultPath": null + }, + "Get Current Lock Record": { + "Comment": "This state is called when the execution is unable to acquire a lock because there limit has either been exceeded or because this execution already holds a lock. I that case, this task loads info from DDB for the current lock item so that the right decision can be made in subsequent states.", + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "ExpressionAttributeNames": { + "#lockownerid.$": "$$.Execution.Id" + }, + "Key": { + "LockName": { + "S": "${LockName}" + } + }, + "ProjectionExpression": "#lockownerid" + }, + "ResultSelector": { + "Item.$": "$.Item", + "ItemString.$": "States.JsonToString($.Item)" + }, + "ResultPath": "$.lockinfo.currentlockitem", + "Next": "Check If Lock Already Acquired" + }, + "Check If Lock Already Acquired": { + "Comment": "This state checks to see if the current execution already holds a lock. It can tell that by looking for Z, which will be indicative of the timestamp value. 
That will only be there in the stringified version of the data returned from DDB if this execution holds a lock.", + "Type": "Choice", + "Choices": [ + { + "And": [ + { + "Variable": "$.lockinfo.currentlockitem.ItemString", + "IsPresent": true + }, + { + "Variable": "$.lockinfo.currentlockitem.ItemString", + "StringMatches": "*Z*" + } + ], + "Next": "Continue Because Lock Was Already Acquired" + } + ], + "Default": "Wait to Get Lock" + }, + "Continue Because Lock Was Already Acquired": { + "Comment": "In this state, we have confimed that lock is already held, so we pass the original execution input into the the function that does the work.", + "Type": "Pass", + "End": true + }, + "Wait to Get Lock": { + "Comment": "If the lock indeed not been succesfully Acquired, then wait for a bit before trying again.", + "Type": "Wait", + "Seconds": 3, + "Next": "Acquire Lock" + } + + + + } + } + ], + "ResultPath": null, + "Next": "Do Work" + }, + "Do Work": { + "Comment": "This is a placeholder for the actual logic of your workflow. By wrapping this in a parallel state, you should be able to paste in any statemachine defined elsewhere. 
In this case, to illustrate the behavior, this one will run through some pass states and then call a Lambda function that will sleep for a period before it returns.", + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Here", + "States": { + "Here": { + "Type": "Pass", + "Next": "You" + }, + "You": { + "Type": "Pass", + "Next": "Do" + }, + "Do": { + "Type": "Pass", + "Next": "Work" + }, + "Work": { + "Type": "Pass", + "Next": "Run Lambda Function With Controlled Concurrency" + }, + "Run Lambda Function With Controlled Concurrency": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${LambdaDoWorkFunction}", + "Payload": { + "Input.$": "$" + } + }, + "ResultSelector": { + "payload.$": "$.Payload" + }, + "End": true + } + } + } + ], + "Next": "Release Lock" + }, + "Release Lock": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${TableSemaphore}", + "Key": { + "LockName": { + "S": "${LockName}" + } + }, + "ExpressionAttributeNames": { + "#currentlockcount": "currentlockcount", + "#lockownerid.$": "$$.Execution.Id" + }, + "ExpressionAttributeValues": { + ":decrease": { + "N": "1" + } + }, + "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid", + "ConditionExpression": "attribute_exists(#lockownerid)", + "ReturnValues": "UPDATED_NEW" + }, + "Retry": [ + { + "ErrorEquals": [ "DynamoDB.ConditionalCheckFailedException" ], + "MaxAttempts": 0 + }, + { + "ErrorEquals": [ "States.ALL" ], + "MaxAttempts": 5, + "BackoffRate": 1.5 + } ], + "Catch": [ + { + "ErrorEquals": [ + "DynamoDB.ConditionalCheckFailedException" + ], + "Next": "Success State", + "ResultPath": null + } + ], + "ResultPath": null, + "Next": "Success State" + }, + "Success State": { + "Type": "Succeed" + } + } + } \ No newline at end of file diff --git a/sam/app-concurrency-control/statemachines/test-run-semaphore.asl.json 
b/sam/app-concurrency-control/statemachines/test-run-semaphore.asl.json new file mode 100644 index 0000000..57f7ff8 --- /dev/null +++ b/sam/app-concurrency-control/statemachines/test-run-semaphore.asl.json @@ -0,0 +1,71 @@ +{ + "Comment": "A statemachine for running the main loop of tests ", + "StartAt": "GenerateDefaultInput", + "States": { + "GenerateDefaultInput": { + "Type": "Pass", + "Parameters": { + "iterations": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100] + }, + "Next": "StartInParallel" + }, + + "StartInParallel": { + "Type": "Map", + "ItemsPath": "$.iterations", + "MaxConcurrency": 0, + "End": true, + "Iterator": { + "StartAt": "RunChildStateMachine", + "States": { + "RunChildStateMachine": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution", + "Parameters": { + "Input": { + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + }, + "StateMachineArn": "${StateMachineSemaphore}" + }, + "ResultSelector": {"Nothing": "Nothing"}, + "Next": "ClearResults", + "Retry": [ + { + "ErrorEquals": [ + "StepFunctions.ExecutionAlreadyExistsException" + ], + "IntervalSeconds": 1, + "BackoffRate": 5, + "MaxAttempts": 1 + }, + { + "ErrorEquals": [ + "States.ALL" + ], + "IntervalSeconds": 1, + "BackoffRate": 2, + "MaxAttempts": 12 + } + ], + "Catch": [ + { + "ErrorEquals": [ + "States.TaskFailed" + ], + "ResultPath": "$.stateoutput.RunChildStateMachine", + "Next": "ClearResults" + } + ] + }, + "ClearResults": { + "Type": "Pass", + "Result": "Done", + "End": true + } + } + } + + } + + } + } \ No newline at end of file diff --git a/sam/app-concurrency-control/template.yaml b/sam/app-concurrency-control/template.yaml new file mode 100644 index 0000000..50579af --- /dev/null +++ 
b/sam/app-concurrency-control/template.yaml @@ -0,0 +1,193 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: MIT-0 +AWSTemplateFormatVersion: "2010-09-09" +Transform: AWS::Serverless-2016-10-31 +Description: "Sfn-test-runner: An example app that demonstrates how to provide rate limiting/concurrency control for step function" + +Globals: + Function: + Timeout: 900 + Tags: + auto-delete: "no" +Parameters: + ParameterInstancePrefix: + Type: String + Default: "CC" + Description: "Prefix to be used in names of the things created by this stack." + +Resources: +############### Test Infrastructure ############################################### + # Define a common IAM role to be used for all components of this app + ApplicationRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: "2012-10-17" + Statement: + - + Effect: "Allow" + Principal: + Service: + - "states.amazonaws.com" + - "lambda.amazonaws.com" + Action: + - "sts:AssumeRole" + Policies: + - PolicyName: AppPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - + Effect: Allow + Action: + - events:PutTargets + - events:PutRule + - events:DescribeRule + - states:StartExecution + - xray:PutTraceSegments + - xray:PutTelemetryRecords + - xray:GetSamplingRules + - xray:GetSamplingTargets + - logs:CreateLogDelivery + - logs:GetLogDelivery + - logs:UpdateLogDelivery + - logs:DeleteLogDelivery + - logs:ListLogDeliveries + - logs:PutResourcePolicy + - logs:DescribeResourcePolicies + - logs:DescribeLogGroups + - cloudwatch:PutMetricData + Resource: '*' + - + Effect: Allow + Action: + - lambda:InvokeFunction + Resource: '*' + - + Effect: Allow + Action: + - states:ListExecutions + Resource: '*' + - + Effect: Allow + Action: + - sqs:* + Resource: '*' + - + Effect: Allow + Action: + - ssm:Get* + - ssm:Describe* + - ssm:List* + Resource: '*' + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + + # Main 
statemachine that runs the tests + StateMachineSempaphore: + Type: AWS::Serverless::StateMachine + Properties: + DefinitionUri: statemachines/do-some-work.asl.json + DefinitionSubstitutions: + LambdaDoWorkFunction: !GetAtt LambdaDoWorkFunction.Arn + Tracing: + Enabled: true + Role: !GetAtt ApplicationRole.Arn + Logging: + Destinations: + - CloudWatchLogsLogGroup: + LogGroupArn: !GetAtt LogGroupStateMachines.Arn + IncludeExecutionData: TRUE + Level: "ALL" + Type: "STANDARD" + Name: !Join ["",[!Ref ParameterInstancePrefix,'-',"WorkStateMachine"]] + Tags: + auto-delete: "no" + + # Lambda function that performs the (simulated) work for each execution + LambdaDoWorkFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/do_work_function/ + Handler: app.lambda_handler + Runtime: python3.8 + Timeout: 60 + Role: !GetAtt ApplicationRole.Arn + Tags: + auto-delete: "no" + + LogGroupStateMachines: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Join [ "", ["/aws/states/",!Ref ParameterInstancePrefix,"-StateMachineLogs"]] + # Lambda function that checks if current execution count is under threshold + LambdaConcurrencyController: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/concurrency_controller/ + Handler: app.lambda_handler + Runtime: python3.8 + Timeout: 900 + Role: !GetAtt ApplicationRole.Arn + Events: + TestQueueEvent: + Type: SQS + Properties: + Queue: !GetAtt TestQueue.Arn + BatchSize: 1 + Enabled: true + ScalingConfig: + MaximumConcurrency: 4 + Tags: + auto-delete: "no" + TestRunner: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/test_runner/ + Handler: app.lambda_handler + Runtime: python3.8 + Timeout: 900 + Role: !GetAtt ApplicationRole.Arn + Tags: + auto-delete: "no" + # SSM param for concurrency threshold + SSMParamConcurrencyLimit: + Type: AWS::SSM::Parameter + Properties: + Type: String + Description: "Step Function Concurrency 
Limit" + Name: "concurrencyLimit" + Value: "10" + Tags: + auto-delete: "no" + # test queue + TestQueue: + Type: AWS::SQS::Queue + Properties: + VisibilityTimeout: 900 + QueueName: CC-Test-Queue + RedrivePolicy: + deadLetterTargetArn: !GetAtt CCDeadLetterQueue.Arn + maxReceiveCount: 10 + # DLQ + CCDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + VisibilityTimeout: 900 + QueueName: CC-DLQ + +Outputs: + StateMachineLogGroup: + Description: "Log group for statemachine logs" + Value: !GetAtt LogGroupStateMachines.Arn + StateMachineMain: + Description: "Main statemachine that is the entry point for this application" + Value: !Ref StateMachineSempaphore + + + From 3bd0c7b393f88f837623e7e39190db426133a436 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Mon, 28 Aug 2023 22:13:47 -0600 Subject: [PATCH 2/4] change readme --- sam/app-concurrency-control/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sam/app-concurrency-control/README.md b/sam/app-concurrency-control/README.md index 0dc1ca9..e36dbd1 100644 --- a/sam/app-concurrency-control/README.md +++ b/sam/app-concurrency-control/README.md @@ -14,8 +14,8 @@ The concurrency controller Lambda function will check, for a given SFN ARN, the * Single SAM template is used to create all resources * Test runner: Lambda function that generates test messages to SQS (e.g., 1k - 10k) -* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 4 (to avoid ThrottlingException for the API call and racing condition) -* A random delay up to 1 sec (jitter) is introduced when listExecutions is called to avoid racing condition +* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 2 +* A random delay up to 1 sec (jitter) is added when listExecutions is called to avoid racing condition * Concurrency Threshold set to 10 in SSM Param Store * listExecution() API call is 
eventual consistency and results are best effort (no SLA) → Concurrency can exceed threshold value on occasion * Concurrency can be tracked using CloudWatch Log Insight From 8e3a252403adbc35dded3b9593495c39bb99c322 Mon Sep 17 00:00:00 2001 From: junguo Date: Mon, 28 Aug 2023 22:16:57 -0600 Subject: [PATCH 3/4] Update README.md --- sam/app-concurrency-control/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sam/app-concurrency-control/README.md b/sam/app-concurrency-control/README.md index 0dc1ca9..8d47aac 100644 --- a/sam/app-concurrency-control/README.md +++ b/sam/app-concurrency-control/README.md @@ -14,8 +14,8 @@ The concurrency controller Lambda function will check, for a given SFN ARN, the * Single SAM template is used to create all resources * Test runner: Lambda function that generates test messages to SQS (e.g., 1k - 10k) -* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 4 (to avoid ThrottlingException for the API call and racing condition) -* A random delay up to 1 sec (jitter) is introduced when listExecutions is called to avoid racing condition +* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 2 +* A random delay up to 1 sec (jitter) is added when listExecutions is called to avoid racing condition * Concurrency Threshold set to 10 in SSM Param Store * listExecution() API call is eventual consistency and results are best effort (no SLA) → Concurrency can exceed threshold value on occasion * Concurrency can be tracked using CloudWatch Log Insight From e268eec52bcf132c616f44ed9023664de33dccf3 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Mon, 28 Aug 2023 22:25:06 -0600 Subject: [PATCH 4/4] change readme --- sam/app-concurrency-control/README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sam/app-concurrency-control/README.md b/sam/app-concurrency-control/README.md index 
d2cebc3..e36dbd1 100644 --- a/sam/app-concurrency-control/README.md +++ b/sam/app-concurrency-control/README.md @@ -14,11 +14,7 @@ The concurrency controller Lambda function will check, for a given SFN ARN, the * Single SAM template is used to create all resources * Test runner: Lambda function that generates test messages to SQS (e.g., 1k - 10k) -<<<<<<< HEAD * SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 2 -======= -* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 2 ->>>>>>> 8e3a252403adbc35dded3b9593495c39bb99c322 * A random delay up to 1 sec (jitter) is added when listExecutions is called to avoid racing condition * Concurrency Threshold set to 10 in SSM Param Store * listExecution() API call is eventual consistency and results are best effort (no SLA) → Concurrency can exceed threshold value on occasion