
AWS Checking StateMachines/StepFunctions concurrent runs

I am having a lot of issues handling concurrent runs of a State Machine (Step Function) that has a GlueJob task in it.

The state machine is initiated by a Lambda that gets triggered by a FIFO SQS queue.

The Lambda gets the message, checks how many state machine executions are running, and if this number is below the GlueJob concurrent-runs threshold, it starts the State Machine.

The problem I am having is that this check fails most of the time. The state machine starts even though there is not enough concurrency available for my GlueJob. Obviously, the message the SQS queue passes to the Lambda gets consumed, so if the state machine fails for this reason, that message is gone forever (unless I catch the exception and send a new message back to the queue).

I believe this behavior is due to the speed at which messages get processed by my Lambda (even though it's a FIFO queue, so one message at a time), and the fact that my checker cannot keep up.

I have added some time.sleep() calls here and there to see if things get better, but saw no substantial improvement.

I would like to ask whether you have ever had issues like this one and how you solved them programmatically.

Thanks in advance!

This is my checker:

def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
    # Count the RUNNING executions of the state machine, following pagination

    if next_token:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
    else:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')

    cnt += len(response['executions'])

    if 'nextToken' in response:
        return get_running_workflows(sf_client, sfn_arn, cnt, response['nextToken'])

    return cnt
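
For context, the trigger Lambda calls this checker roughly as sketched below before starting an execution (the variable names sfn_arn, glue_max_concurrent_runs and message_body are placeholders, not the original code):

import boto3

sf_client = boto3.client('stepfunctions')

# Only start a new execution if we are below the GlueJob concurrency threshold
running = get_running_workflows(sf_client, sfn_arn)
if running < int(glue_max_concurrent_runs):
    sf_client.start_execution(stateMachineArn=sfn_arn, input=message_body)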

EDIT 24-01-2022:

Here's the implementation based on @Mark B's answer.

I've created a DynamoDB table as follows:

resources:
  Resources:
    concurrencyHandler:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: concurrencyHandler
        AttributeDefinitions:
          - AttributeName: concurrencyProcess
            AttributeType: S
        KeySchema:
          - AttributeName: concurrencyProcess
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST

The primary key stores the name of the process I am handling concurrency for. This helps if I want to reuse the same table to handle concurrency for other processes.
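
One caveat worth noting (not covered in the original post): the conditional update below compares #C against :limit, and in DynamoDB a comparison against an attribute that does not exist evaluates to false, so the counter item has to exist before the first lock can be acquired. A possible one-off seed, assuming a hypothetical process name of myGlueProcess:

import boto3

concurrency_table = boto3.resource('dynamodb').Table('concurrencyHandler')

# Create the counter item once; the condition raises ConditionalCheckFailedException
# instead of overwriting the item if it already exists
concurrency_table.put_item(
    Item={'concurrencyProcess': 'myGlueProcess', 'concurrencyCount': 0},
    ConditionExpression='attribute_not_exists(concurrencyProcess)'
)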

Then, in the Lambda that consumes the messages from SQS and triggers my Step Function, I have written an atomic counter with a conditional check as follows:

    try:
        # Atomically increment the counter and register this execution's lock,
        # but only if the counter is still below the limit and this execution
        # does not already hold a lock.
        response = concurrency_table.update_item(
            Key={'concurrencyProcess': processName},
            ExpressionAttributeNames={
                '#C': 'concurrencyCount',
                '#O': step_function_name,
            },
            ExpressionAttributeValues={
                ':start': 0,
                ':inc': 1,
                ':limit': int(glue_max_concurrent_runs),
                ':time': datetime.datetime.utcnow().isoformat()
            },
            UpdateExpression="SET #C = if_not_exists(#C, :start) + :inc, #O = :time",
            ConditionExpression="#C < :limit AND attribute_not_exists(#O)",
            ReturnValues="UPDATED_NEW"
        )

    except botocore.exceptions.ClientError as e:
        # Re-raise anything that is not a failed conditional check
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        else:
            raise Exception("Either the max concurrency has been reached or "
                            "this Step Function execution is requesting a second lock")

Any time a new message arrives, this code increments the counter. If the counter is already at the limit, the conditional check fails, so the message goes back to the queue and is processed again later.
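
Tying this together, the trigger Lambda roughly does the following once the lock is acquired (a sketch only; handle_record and acquire_lock are hypothetical helper names wrapping the code above):

import uuid
import boto3

sf_client = boto3.client('stepfunctions')

def handle_record(record, state_machine_arn):
    # Unique execution name; the same string is stored as the lock attribute (#O)
    step_function_name = f"glue-flow-{uuid.uuid4()}"

    # acquire_lock() wraps the conditional update_item shown above and raises
    # when no concurrency is available, so the Lambda invocation fails and SQS
    # makes the message visible again after the visibility timeout
    acquire_lock(step_function_name)

    sf_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=step_function_name,
        input=record['body']
    )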

I have also set up a final clean-up step in my Step Function workflow: one for when the workflow is successful, one for when it fails for any reason. Here's the .yaml:

  UpdateConcurrencyCountSuccess:
    Type: Task
    Resource: arn:aws:states:::dynamodb:updateItem
    Parameters:
      TableName: ${self:custom.concurrencyTable}
      Key:
        concurrencyProcess:
          S: {processName}
      ExpressionAttributeNames:
        '#C': concurrencyCount
        '#O.$': $$.Execution.Name
      ExpressionAttributeValues:
        ':dec':
          N: '1'
      UpdateExpression: "SET #C = #C - :dec REMOVE #O"
      ConditionExpression: attribute_exists(#O)
      ReturnValues: UPDATED_NEW
    Retry:
      - ErrorEquals:
          - DynamoDB.ConditionalCheckFailedException
        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 5
        MaxAttempts: 20
        BackoffRate: 1.4
    ResultPath: $.update_concurrency_success 
    End: true
    Catch:
      - ErrorEquals:
          - States.ALL
        Next: PublishToSNSFailed
        ResultPath: $.error

  # Update Concurrency Count table, failed flow
  UpdateConcurrencyCountFail:
    Type: Task
    Resource: arn:aws:states:::dynamodb:updateItem
    Parameters:
      TableName: ${self:custom.concurrencyTable}
      Key:
        concurrencyProcess:
          S: {processName}
      ExpressionAttributeNames:
        '#C': concurrencyCount
        '#O.$': $$.Execution.Name
      ExpressionAttributeValues:
        ':dec':
          N: '1'
      UpdateExpression: "SET #C = #C - :dec REMOVE #O"
      ConditionExpression: attribute_exists(#O)
      ReturnValues: UPDATED_NEW
    Retry:
      - ErrorEquals:
          - DynamoDB.ConditionalCheckFailedException
        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 5
        MaxAttempts: 20
        BackoffRate: 1.4
    ResultPath: $.update_concurrency_failed     
    Next: ExecutionFailed
    Catch:
      - ErrorEquals:
          - States.ALL
        Next: ExecutionFailed
        ResultPath: $.error

Note how the attribute #O in DynamoDB is the Step Function execution name, with the execution start time as its value. The execution name is generated inside my trigger Lambda (it is unique). Within my workflow I can access the name via $$.Execution.Name, so I can remove that attribute in the clean-up step.

Having the unique execution name as an attribute also helps with debugging in the (very remote) case where an execution starts but fails to decrement the counter.
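
For completeness, a boto3 equivalent of that clean-up step, for instance to release a lock manually if an execution ever dies without decrementing the counter (the function name is mine; the expressions mirror the ASL states above):

import boto3

concurrency_table = boto3.resource('dynamodb').Table('concurrencyHandler')

def release_lock(process_name, execution_name):
    # Decrement the counter and drop the execution's lock attribute,
    # but only if that execution actually holds a lock
    concurrency_table.update_item(
        Key={'concurrencyProcess': process_name},
        ExpressionAttributeNames={'#C': 'concurrencyCount', '#O': execution_name},
        ExpressionAttributeValues={':dec': 1},
        UpdateExpression="SET #C = #C - :dec REMOVE #O",
        ConditionExpression="attribute_exists(#O)",
        ReturnValues="UPDATED_NEW"
    )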

EDIT 02-02-2022:

After stress-testing this solution, I've come to the conclusion that it fails to keep an accurate count of running state machines: when the maximum is set to 10, somehow an 11th lock is granted.

On top of that, Glue seems to have some sort of invisible "state" after "SUCCESS" that leaves less concurrency available than there are free locks (for instance: 0 Glue concurrency available, but 2 or 3 free locks).

Hopefully AWS will implement some native concurrency control and a direct trigger from SQS to Step Functions.

Asked by E. Faslo
1 Answer

You are going to run into problems with this approach because a call to start a new execution may not be reflected immediately in list_executions(). There can be a few seconds between requesting that a new workflow start and the workflow actually starting, and as far as I'm aware there are no strong consistency guarantees for the list_executions() API call.

You need something that is strongly consistent, and DynamoDB atomic counters are a great solution for this problem. Amazon published a blog post detailing the use of DynamoDB for this exact scenario. The gist is that you attempt to increment an atomic counter in DynamoDB, with a condition expression that causes the increment to fail if it would push the counter above a certain value. Catching that failure/exception is how your Lambda function knows to send the message back to the queue. Then, at the end of the workflow, you call another Lambda function to decrement the counter.
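
In outline, the pattern looks roughly like this (a minimal sketch, not the exact code from the blog post; the table and attribute names are made up):

import boto3
from botocore.exceptions import ClientError

table = boto3.resource('dynamodb').Table('WorkflowSemaphore')  # assumed table name

def try_acquire(limit):
    """Increment the counter unless it would exceed the limit."""
    try:
        table.update_item(
            Key={'pk': 'glue-workflow'},
            UpdateExpression='SET cnt = if_not_exists(cnt, :zero) + :one',
            ConditionExpression='attribute_not_exists(cnt) OR cnt < :limit',
            ExpressionAttributeValues={':zero': 0, ':one': 1, ':limit': limit},
        )
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False  # at capacity: leave the message on the queue for later
        raise

def release():
    """Decrement the counter at the end of the workflow."""
    table.update_item(
        Key={'pk': 'glue-workflow'},
        UpdateExpression='SET cnt = cnt - :one',
        ExpressionAttributeValues={':one': 1},
    )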

Answered by Mark B