Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial checkin - Step Function cross-execution concurrency control pattern #49

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sam/app-concurrency-control/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
samconfig.toml
.aws-sam
.DS_Store
*.code-workspace
22 changes: 22 additions & 0 deletions sam/app-concurrency-control/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Description

This example demonstrates the implementation of cross-execution concurrency control for AWS Step Functions workflows by utilizing the ListExecutions API (https://docs.aws.amazon.com/step-functions/latest/apireference/API_ListExecutions.html).

Within a single flow, one can utilize a Map or Distributed Map state to control how many concurrent flows can be launched within the same execution. However, there are use cases where one may want to limit the number of concurrent executions of the same workflow, for example, due to downstream API limitations or tasks that require human intervention.

## Implementation

### Concurrency Controller function:

The concurrency controller Lambda function checks, for a given SFN ARN, the current number of executions using the listExecutions API. It then compares that against a preset concurrency threshold, a static value stored in SSM Parameter Store (for simplicity), and returns a “proceed” or “wait” flag.

### Other considerations

* Single SAM template is used to create all resources
* Test runner: Lambda function that generates test messages to SQS (e.g., 1k - 10k)
* SQS provides trigger for Concurrency controller Lambda function, with batch size of 1 and maximum concurrency set to 2
* A random delay of up to 1 sec (jitter) is added before listExecutions is called to reduce race conditions
* Concurrency Threshold set to 10 in SSM Param Store
* The listExecutions() API is eventually consistent and results are best effort (no SLA) → concurrency can exceed the threshold value on occasion
* Concurrency can be tracked using CloudWatch Logs Insights

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time
import random
import boto3
import os
from botocore.config import Config

# botocore client configuration used for the Step Functions client:
# "standard" retry mode with max_attempts set to 10.
config = Config(retries={'max_attempts': 10, 'mode': 'standard'})

# Load the concurrency threshold from SSM Parameter Store at module import
# time so the handler can reuse it without calling SSM on every invocation.
try:
    ssm = boto3.client('ssm')
    concurrency_limit_threshold = int(
        ssm.get_parameter(Name='concurrencyLimit', WithDecryption=True)['Parameter']['Value']
    )
except Exception as exc:
    # Fail fast: without the threshold the handler cannot decide anything and
    # would otherwise fail later with an unrelated NameError on
    # `concurrency_limit_threshold`.
    print('Failed to get concurrency limit threshold from SSM: ' + repr(exc))
    raise


def lambda_handler(event, context):
    """Start one Step Functions execution per record while under the limit.

    For every entry in ``event['Records']`` the handler sleeps for a random
    0.1-1.0 second jitter, counts the RUNNING executions of the
    ``CC-WorkStateMachine`` state machine, and starts a new execution only
    when that count is below ``concurrency_limit_threshold``.

    Args:
        event: invocation payload; must contain a ``Records`` list whose
            items each carry a ``body`` entry.
        context: Lambda context; ``invoked_function_arn`` supplies the region
            and account id used to build the state machine ARN.

    Raises:
        Exception: when the number of running executions has reached the
            threshold.
    """
    # Fields 3 and 4 of the colon-separated function ARN are used as the
    # region and the account id of the target state machine.
    arn_parts = context.invoked_function_arn.split(":")
    region, account_id = arn_parts[3], arn_parts[4]
    arn = f'arn:aws:states:{region}:{account_id}:stateMachine:CC-WorkStateMachine'
    print(f'stepfunction arn:{arn}')

    sfn_client = boto3.client('stepfunctions', config=config)

    for message in event['Records']:
        # Jitter before counting, so concurrent invocations do not all
        # sample the execution list at the same moment.
        time.sleep(random.randint(1, 10) * 0.1)

        running = sfn_client.list_executions(stateMachineArn=arn, statusFilter='RUNNING')
        execution_count = len(running['executions'])
        print(f'current execution count:{execution_count}')
        print(f'concurrency limit threshold:{concurrency_limit_threshold}')

        # Refuse to start more work once the running count hits the limit.
        if execution_count >= concurrency_limit_threshold:
            raise Exception('Concurrent workflow reaching limit!')

        print(f'Processing {message["body"]!s}')
        sfn_client.start_execution(stateMachineArn=arn)
Empty file.
8 changes: 8 additions & 0 deletions sam/app-concurrency-control/functions/do_work_function/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time
import random

def lambda_handler(event, context):
    """Simulate a unit of work by blocking for ten seconds, then return 1.

    Neither ``event`` nor ``context`` is read.
    """
    work_duration_seconds = 10
    time.sleep(work_duration_seconds)
    return 1
Empty file.
Empty file.
14 changes: 14 additions & 0 deletions sam/app-concurrency-control/functions/test_runner/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time
import random
import boto3
import os

def lambda_handler(event, context):
    """Enqueue 100 numbered test messages on the ``CC-Test-Queue`` SQS queue.

    Neither ``event`` nor ``context`` is read. Messages are sent one at a
    time with bodies ``test message 0`` through ``test message 99``.
    """
    # Resolve the queue by name once, then reuse it for every send.
    queue = boto3.resource('sqs').get_queue_by_name(QueueName='CC-Test-Queue')
    for seq in range(100):
        queue.send_message(MessageBody=f'test message {seq}')
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"Comment": "Check the current number of executions. Proceed if it is lower than the threshold; otherwise wait and check again",
"StartAt": "Check current running tasks",
"States": {
"Check current running tasks": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${LambdaConcurrencyController}"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Continue or wait",
"ResultPath": "$.flag"
},
"Continue or wait": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.flag.Payload",
"StringEquals": "proceed",
"Next": "Do some work"
},
{
"Variable": "$.flag.Payload",
"StringEquals": "wait",
"Next": "Wait"
}
]
},
"Do some work": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${LambdaDoWorkFunction}"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"Wait": {
"Type": "Wait",
"Seconds": 5,
"Next": "Check current running tasks"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"Comment": "Check the number of running executions and start the work state machine only when it is below the limit",
"StartAt": "Check current running executions",
"States": {
"Check current running executions": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.flag",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${LambdaConcurrencyController}"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "Over limit?"
},
"Over limit?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.flag.Payload",
"StringEquals": "proceed",
"Next": "Step Functions StartExecution"
},
{
"Variable": "$.flag.Payload",
"StringEquals": "wait",
"Next": "Fail"
}
]
},
"Step Functions StartExecution": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution",
"Parameters": {
"StateMachineArn": "${DoSomeWorkFlow}",
"Input": {
"StatePayload": "Hello from Step Functions!",
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
}
},
"End": true
},
"Fail": {
"Type": "Fail",
"Error": "\"Reaching maximum allowed executions\""
}
}
}
29 changes: 29 additions & 0 deletions sam/app-concurrency-control/statemachines/do-some-work.asl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"Comment": "Invoke the do-work Lambda function",
"StartAt": "Do some work",
"States": {
"Do some work": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${LambdaDoWorkFunction}"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"Comment": "A state machine that will react to completion events and clean up orphaned locks",
"StartAt": "Get Current Lock Item",
"States": {
"Get Current Lock Item": {
"Comment": "Get info from DDB for the lock item to look and see if this specific owner is still holding a lock",
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${TableSemaphore}",
"ExpressionAttributeNames": {
"#lockownerid.$": "$.detail.executionArn"
},
"Key": {
"LockName": {
"S": "${LockName}"
}
},
"ProjectionExpression": "#lockownerid"
},
"Retry": [
{
"ErrorEquals": [ "States.ALL" ],
"IntervalSeconds": 5,
"MaxAttempts": 20,
"BackoffRate": 1.4
} ],
"ResultSelector": {
"Item.$": "$.Item",
"ItemString.$": "States.JsonToString($.Item)"
},
"ResultPath": "$.lockinfo.currentlockitem",
"Next": "Check If Lock Is Held"
},
"Check If Lock Is Held": {
"Comment": "This state checks to see if the execution in question holds a lock. It can tell that by looking for Z, which will be indicative of the timestamp value. That will only be there in the stringified version of the data returned from DDB if this execution holds a lock",
"Type": "Choice",
"Choices": [
{
"And": [
{
"Variable": "$.lockinfo.currentlockitem.ItemString",
"IsPresent": true
},
{
"Variable": "$.lockinfo.currentlockitem.ItemString",
"StringMatches": "*Z*"
}
],
"Next": "Clean Up Lock"
}
],
"Default": "Success State"
},
"Clean Up Lock": {
"Comment": "If this lockownerid is still there, then clean it up and release the lock",
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${TableSemaphore}",
"Key": {
"LockName": {
"S": "${LockName}"
}
},
"ExpressionAttributeNames": {
"#currentlockcount": "currentlockcount",
"#lockownerid.$": "$.detail.executionArn"
},
"ExpressionAttributeValues": {
":decrease": {
"N": "1"
}
},
"UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid",
"ConditionExpression": "attribute_exists(#lockownerid)",
"ReturnValues": "UPDATED_NEW"
},
"Retry": [
{
"ErrorEquals": [ "DynamoDB.ConditionalCheckFailedException" ],
"MaxAttempts": 0
},
{
"ErrorEquals": [ "States.ALL" ],
"IntervalSeconds": 5,
"MaxAttempts": 20,
"BackoffRate": 1.4
} ],
"Catch": [
{
"ErrorEquals": [
"DynamoDB.ConditionalCheckFailedException"
],
"Next": "Success State",
"ResultPath": null
}
],
"ResultPath": null,
"Next": "Success State"
},
"Success State": {
"Type": "Succeed"
}
}

}
Loading