I am having a lot of issues handling concurrent runs of a state machine (Step Function) that has a Glue job task in it.
The state machine is started by a Lambda that gets triggered by a FIFO SQS queue.
The Lambda gets the message, checks how many state machine executions are currently running and, if this number is below the Glue job's concurrent-runs threshold, starts the state machine.
The problem I am having is that this check fails most of the time: the state machine starts even though there is not enough concurrency available for my Glue job. Since the SQS message has already been consumed by the Lambda at that point, if the state machine fails for this reason the message is gone forever (unless I catch the exception and send a new message back to the queue).
I believe this behavior is due to the speed at which messages get processed by my Lambda (even though it's a FIFO queue, so one message at a time) and the fact that my checker cannot keep up.
I have added some time.sleep() calls here and there to see if things get better, but there is no substantial improvement.
I would like to ask if you have ever had issues like this one and how you solved them programmatically.
Thanks in advance!
This is my checker:
def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
    """Count the currently RUNNING executions of the state machine, paging through results."""
    if next_token:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
    else:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')
    cnt += len(response['executions'])
    if 'nextToken' in response:
        # Recurse with the pagination token, carrying the running total
        return get_running_workflows(sf_client, sfn_arn, cnt, response['nextToken'])
    return cnt
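For completeness, the surrounding Lambda handler is roughly like this (a simplified sketch, not my exact code; the environment variable names are illustrative):

import os
import boto3

sf_client = boto3.client('stepfunctions')
SFN_ARN = os.environ['STATE_MACHINE_ARN']
GLUE_MAX_CONCURRENT_RUNS = int(os.environ['GLUE_MAX_CONCURRENT_RUNS'])

def handler(event, context):
    # FIFO queue with batch size 1, so a single record per invocation;
    # the body is expected to be the JSON input for the state machine
    message = event['Records'][0]['body']
    running = get_running_workflows(sf_client, SFN_ARN)
    if running < GLUE_MAX_CONCURRENT_RUNS:
        sf_client.start_execution(stateMachineArn=SFN_ARN, input=message)
    else:
        # Raising leaves the message on the queue so it can be retried later
        raise Exception("Not enough Glue concurrency available")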
EDIT 24-01-2022:
Here's the implementation based on @Mark-B's answer.
I've created a DynamoDB table as follows:
resources:
  Resources:
    concurrencyHandler:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: concurrencyHandler
        AttributeDefinitions:
          - AttributeName: concurrencyProcess
            AttributeType: S
        KeySchema:
          - AttributeName: concurrencyProcess
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
The primary key stores the name of the process I am handling concurrency for. This way I can reuse the same table to handle concurrency for other processes.
Then, in the Lambda that consumes the messages from SQS and triggers my Step Function, I have implemented an atomic counter with a conditional check, as follows:
try:
    response = concurrency_table.update_item(
        Key={'concurrencyProcess': processName},
        ExpressionAttributeNames={
            '#C': 'concurrencyCount',
            '#O': step_function_name,
        },
        ExpressionAttributeValues={
            ':start': 0,
            ':inc': 1,
            ':limit': int(glue_max_concurrent_runs),
            ':time': datetime.datetime.utcnow().isoformat()
        },
        # Increment the counter (creating it at 0 if missing) and record this
        # execution's lock attribute with its start time as the value
        UpdateExpression="SET #C = if_not_exists(#C, :start) + :inc, #O = :time",
        # Fail if the limit is already reached or this execution already holds a lock
        ConditionExpression="#C < :limit AND attribute_not_exists(#O)",
        ReturnValues="UPDATED_NEW"
    )
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
        raise
    else:
        raise Exception("Either max concurrency number reached or "
                        "Step Function is requesting a second lock")
Any time a new message arrives, this code increments the counter. If the counter is already at the limit, the conditional check fails, the Lambda raises, and the message goes back to the queue to be processed again later.
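To put it together, the consuming Lambda roughly does the following once the lock is acquired (a simplified sketch: acquire_lock() is just the update_item call above wrapped in a function, and the environment variable names are illustrative):

import os
import uuid
import boto3

sfn_client = boto3.client('stepfunctions')

def handler(event, context):
    message = event['Records'][0]['body']
    # Unique execution name, also used as the lock attribute (#O) in the table
    step_function_name = f"my-glue-flow-{uuid.uuid4()}"
    # The conditional update shown above; if it raises, the message is not
    # deleted from the FIFO queue and will be redelivered later
    acquire_lock(step_function_name)
    sfn_client.start_execution(
        stateMachineArn=os.environ['STATE_MACHINE_ARN'],
        name=step_function_name,
        input=message
    )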
I have also set up a final clean-up step in my Step Function workflow: one for when the workflow is successful, and one for when the workflow fails for any reason. Here's the YAML:
UpdateConcurrencyCountSuccess:
  Type: Task
  Resource: arn:aws:states:::dynamodb:updateItem
  Parameters:
    TableName: ${self:custom.concurrencyTable}
    Key:
      concurrencyProcess:
        S: {processName}
    ExpressionAttributeNames:
      '#C': concurrencyCount
      '#O.$': $$.Execution.Name
    ExpressionAttributeValues:
      ':dec':
        N: '1'
    UpdateExpression: "SET #C = #C - :dec REMOVE #O"
    ConditionExpression: attribute_exists(#O)
    ReturnValues: UPDATED_NEW
  Retry:
    - ErrorEquals:
        - DynamoDB.ConditionalCheckFailedException
      MaxAttempts: 0
    - ErrorEquals:
        - States.ALL
      IntervalSeconds: 5
      MaxAttempts: 20
      BackoffRate: 1.4
  ResultPath: $.update_concurrency_success
  End: true
  Catch:
    - ErrorEquals:
        - States.ALL
      Next: PublishToSNSFailed
      ResultPath: $.error
# Update Concurrency Count table, failed flow
UpdateConcurrencyCountFail:
  Type: Task
  Resource: arn:aws:states:::dynamodb:updateItem
  Parameters:
    TableName: ${self:custom.concurrencyTable}
    Key:
      concurrencyProcess:
        S: {processName}
    ExpressionAttributeNames:
      '#C': concurrencyCount
      '#O.$': $$.Execution.Name
    ExpressionAttributeValues:
      ':dec':
        N: '1'
    UpdateExpression: "SET #C = #C - :dec REMOVE #O"
    ConditionExpression: attribute_exists(#O)
    ReturnValues: UPDATED_NEW
  Retry:
    - ErrorEquals:
        - DynamoDB.ConditionalCheckFailedException
      MaxAttempts: 0
    - ErrorEquals:
        - States.ALL
      IntervalSeconds: 5
      MaxAttempts: 20
      BackoffRate: 1.4
  ResultPath: $.update_concurrency_failed
  Next: ExecutionFailed
  Catch:
    - ErrorEquals:
        - States.ALL
      Next: ExecutionFailed
      ResultPath: $.error
Note how the #O attribute in DynamoDB is the Step Function execution name, and its value is the execution start time. The execution name is generated inside my trigger Lambda (it is unique). Within my workflow I can access that name via $$.Execution.Name, which is how the clean-up step removes the attribute.
Having the unique execution name as an attribute also helps with debugging in the (very remote) case where an execution starts but fails to decrease the counter.
EDIT 02-02-2022:
After stress-testing this solution, I've come to the conclusion that it fails to keep an accurate count of running state machines: when the max is set to 10, somehow an 11th lock is granted.
On top of that, Glue seems to have some sort of invisible "state" after "SUCCESS", which was leading to less available concurrency than free locks (for instance: 0 Glue concurrency available, but 2-3 free locks).
Hopefully AWS will implement some built-in concurrency control and a direct trigger from SQS to Step Functions.
You are going to run into problems with this approach because the call to start a new flow may not immediately cause list_executions() to show a new number. There may be several seconds between requesting that a new workflow start and the workflow actually starting. As far as I'm aware, there are no strong consistency guarantees for the list_executions() API call.
You need something that is strongly consistent, and DynamoDB atomic counters are a great solution for this problem. Amazon published a blog post detailing the use of DynamoDB for this exact scenario. The gist is that you attempt to increment an atomic counter in DynamoDB, with a condition expression that causes the increment to fail if it would push the counter above a certain value. Catching that failure/exception is how your Lambda function knows to send the message back to the queue. Then, at the end of the workflow, you call another Lambda function to decrement the counter.
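A decrement function along those lines might look roughly like this, reusing the table layout from the question (the event keys and environment variable name are illustrative; they would be passed in or configured by the state machine):

import os
import boto3

concurrency_table = boto3.resource('dynamodb').Table(os.environ['CONCURRENCY_TABLE'])

def handler(event, context):
    # Decrement the shared counter and remove this execution's lock attribute
    concurrency_table.update_item(
        Key={'concurrencyProcess': event['processName']},
        ExpressionAttributeNames={
            '#C': 'concurrencyCount',
            '#O': event['executionName'],
        },
        ExpressionAttributeValues={':dec': 1},
        UpdateExpression='SET #C = #C - :dec REMOVE #O',
        # Only decrement if this execution still holds its lock
        ConditionExpression='attribute_exists(#O)',
    )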