From c8075d187bf48d1e1f51327738decebcbe28821c Mon Sep 17 00:00:00 2001 From: 4D54 <78043109+4D54@users.noreply.github.com> Date: Thu, 5 Feb 2026 19:46:42 +0100 Subject: [PATCH 1/3] Add Step Functions to ECS Python integration pattern - Demonstrates synchronous (polling) and callback integration patterns - Includes inline Python code in ECS task definitions - Complete SAM template with VPC, networking, and IAM roles - Comprehensive documentation and testing instructions --- sfn-ecs-python-sam/README.md | 349 ++++++++++++++++ sfn-ecs-python-sam/example-pattern.json | 64 +++ .../statemachine/callback-pattern.asl.json | 87 ++++ .../statemachine/sync-pattern.asl.json | 69 ++++ sfn-ecs-python-sam/template.yaml | 378 ++++++++++++++++++ 5 files changed, 947 insertions(+) create mode 100644 sfn-ecs-python-sam/README.md create mode 100644 sfn-ecs-python-sam/example-pattern.json create mode 100644 sfn-ecs-python-sam/statemachine/callback-pattern.asl.json create mode 100644 sfn-ecs-python-sam/statemachine/sync-pattern.asl.json create mode 100644 sfn-ecs-python-sam/template.yaml diff --git a/sfn-ecs-python-sam/README.md b/sfn-ecs-python-sam/README.md new file mode 100644 index 000000000..edc3b47ee --- /dev/null +++ b/sfn-ecs-python-sam/README.md @@ -0,0 +1,349 @@ +# AWS Step Functions to Amazon ECS with Python + +This pattern demonstrates how to invoke an Amazon ECS task from AWS Step Functions using Python, showcasing both **synchronous (polling)** and **callback** integration patterns. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns + +**Important:** This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. 
The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Architecture + +### Pattern 1: Synchronous (Polling) Integration + +``` +┌─────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Step │ │ ECS Task │ │ CloudWatch │ +│ Functions │─────▶│ (Python) │─────▶│ Logs │ +│ (Sync) │ │ │ │ │ +└─────────────┘ └──────────────────┘ └─────────────┘ + │ │ + │ │ + └───────────────────────┘ + Waits for completion +``` + +**How it works:** +1. Step Functions invokes the ECS task using `arn:aws:states:::ecs:runTask.sync` +2. Step Functions **polls** the task status automatically +3. The workflow **waits** until the ECS task completes +4. Once complete, Step Functions continues to the next state +5. If the task fails, Step Functions can catch the error and retry + +**Use cases:** +- Short to medium-duration tasks (< 1 year) +- When you need the task result before proceeding +- Simple workflows where waiting is acceptable + +### Pattern 2: Callback Integration + +``` +┌─────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Step │ │ ECS Task │ │ CloudWatch │ +│ Functions │─────▶│ (Python) │─────▶│ Logs │ +│ (Callback) │ │ │ │ │ +└─────────────┘ └──────────────────┘ └─────────────┘ + ▲ │ + │ │ + └───────────────────────┘ + Task sends callback +``` + +**How it works:** +1. Step Functions invokes the ECS task using `arn:aws:states:::ecs:runTask.waitForTaskToken` +2. Step Functions passes a **task token** to the ECS container via environment variable +3. Step Functions **pauses** and waits for a callback +4. The Python application in ECS processes the work +5. 
When done, the Python app calls `send_task_success()` or `send_task_failure()` with the task token +6. Step Functions receives the callback and continues + +**Use cases:** +- Long-running tasks +- Human approval workflows +- External system integrations +- When you need to decouple task execution from workflow progression + +## Deployment Instructions + +### Step 1: Clone the Repository + +```bash +git clone https://github.com/aws-samples/serverless-patterns +cd serverless-patterns/sfn-ecs-python-sam +``` + +### Step 2: Build and Deploy + +```bash +sam build +sam deploy --guided +``` + +During the prompts: +- **Stack Name**: `sfn-ecs-python-stack` +- **AWS Region**: Your preferred region (e.g., `us-east-1`) +- **Confirm changes before deploy**: Y +- **Allow SAM CLI IAM role creation**: Y +- **Save arguments to samconfig.toml**: Y + +### Step 3: Note the Outputs + +After deployment, note the following outputs: +- `SyncStateMachineArn` - ARN for the synchronous pattern +- `CallbackStateMachineArn` - ARN for the callback pattern +- `ECSClusterName` - Name of the ECS cluster +- `SyncTaskDefinitionArn` / `CallbackTaskDefinitionArn` - ARNs of the two task definitions + +## How to Test + +### Testing the Synchronous Pattern + +1. **Start the execution:** + +```bash +aws stepfunctions start-execution \ + --state-machine-arn <SyncStateMachineArn> \ + --input '{"message": "Hello from sync pattern", "processingTime": 10}' +``` + +2. **Monitor the execution:** + +```bash +aws stepfunctions describe-execution \ + --execution-arn <execution-arn> +``` + +3. **View the output:** + +The execution waits for the ECS task to complete. The container writes the following result to CloudWatch Logs (the execution output itself is the summary produced by the `Task Completed Successfully` state, not this JSON): + +```json +{ + "status": "success", + "message": "Processed: Hello from sync pattern", + "processingTime": 10, + "timestamp": "2024-02-05T10:30:00Z" +} +``` + +### Testing the Callback Pattern + +1. **Start the execution:** + +```bash +aws stepfunctions start-execution \ + --state-machine-arn <CallbackStateMachineArn> \ + --input '{"message": "Hello from callback pattern", "processingTime": 30}' +``` + +2.
**Monitor the execution:** + +```bash +aws stepfunctions describe-execution \ + --execution-arn <execution-arn> +``` + +The execution will show status as `RUNNING` while waiting for the callback. + +3. **View ECS task logs:** + +```bash +aws logs tail /ecs/sfn-ecs-python-stack --log-stream-name-prefix callback --follow +``` + +4. **The task will automatically send the callback** when processing completes. + +## Step-by-Step Explanation + +### Understanding the Synchronous Pattern + +**Step 1: State Machine Definition** + +The state machine uses the `.sync` integration pattern: + +```json +{ + "Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.sync", + "Parameters": { + "Cluster": "my-cluster", + "TaskDefinition": "my-task", + "LaunchType": "FARGATE", + "NetworkConfiguration": { + "AwsvpcConfiguration": { + "Subnets": ["subnet-xxx"], + "SecurityGroups": ["sg-xxx"], + "AssignPublicIp": "ENABLED" + } + }, + "Overrides": { + "ContainerOverrides": [{ + "Name": "python-container", + "Environment": [{ + "Name": "MESSAGE", + "Value.$": "$.message" + }] + }] + } + } +} +``` + +**Step 2: ECS Task Execution** + +The Python container starts and processes the input: + +```python +import os +import time +import json + +def main(): + message = os.environ.get('MESSAGE', 'No message') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + print(f"Processing: {message}") + time.sleep(processing_time) + + result = { + "status": "success", + "message": f"Processed: {message}", + "processingTime": processing_time + } + + print(json.dumps(result)) + +if __name__ == "__main__": + main() +``` + +**Step 3: Step Functions Polling** + +- Step Functions automatically polls ECS every few seconds +- Checks if the task is still running +- When the task completes (or fails), Step Functions captures the result +- The workflow continues to the next state + +### Understanding the Callback Pattern + +**Step 1: State Machine Definition** + +The state machine uses the `.waitForTaskToken` integration pattern: + +```json +{ +
"Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", + "Parameters": { + "Cluster": "my-cluster", + "TaskDefinition": "my-task", + "LaunchType": "FARGATE", + "Overrides": { + "ContainerOverrides": [{ + "Name": "python-container", + "Environment": [{ + "Name": "TASK_TOKEN", + "Value.$": "$$.Task.Token" + }, { + "Name": "MESSAGE", + "Value.$": "$.message" + }] + }] + } + } +} +``` + +**Key difference:** The `$$.Task.Token` is passed to the container. + +**Step 2: ECS Task with Callback** + +The Python container receives the task token and sends a callback: + +```python +import os +import boto3 +import json +import time + +def main(): + task_token = os.environ.get('TASK_TOKEN') + message = os.environ.get('MESSAGE', 'No message') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + sfn_client = boto3.client('stepfunctions') + + try: + print(f"Processing: {message}") + time.sleep(processing_time) + + result = { + "status": "success", + "message": f"Processed: {message}", + "processingTime": processing_time + } + + # Send success callback + sfn_client.send_task_success( + taskToken=task_token, + output=json.dumps(result) + ) + + except Exception as e: + # Send failure callback + sfn_client.send_task_failure( + taskToken=task_token, + error='ProcessingError', + cause=str(e) + ) + +if __name__ == "__main__": + main() +``` + +**Step 3: Callback Mechanism** + +- Step Functions pauses execution after starting the ECS task +- The task token acts as a unique identifier for this specific execution +- The Python app calls `send_task_success()` or `send_task_failure()` +- Step Functions receives the callback and resumes execution +- The output from the callback becomes the state output + +## Key Differences Between Patterns + +| Feature | Synchronous (.sync) | Callback (.waitForTaskToken) | +|---------|-------------------|------------------------------| +| **Max Duration** | 1 year | 1 year | +| **Polling** | Automatic by Step Functions | 
No polling needed | +| **Task Awareness** | Task doesn't know about Step Functions | Task must send callback | +| **Complexity** | Simple | Moderate (requires SDK calls) | +| **Use Case** | Standard batch jobs | Long-running, human approval, external systems | +| **Failure Handling** | Automatic | Manual (task must call send_task_failure) | +| **Cost** | Billed per workflow state transition; polling adds no extra transitions | Billed per workflow state transition | + +## Cleanup + +To delete the resources: + +```bash +sam delete +``` + +## Resources + +- [AWS Step Functions](https://aws.amazon.com/step-functions/) +- [Amazon ECS](https://aws.amazon.com/ecs/) +- [Step Functions ECS Integration](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) +- [Task Token Pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token) + +--- + +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/sfn-ecs-python-sam/example-pattern.json b/sfn-ecs-python-sam/example-pattern.json new file mode 100644 index 000000000..3f3fcad73 --- /dev/null +++ b/sfn-ecs-python-sam/example-pattern.json @@ -0,0 +1,64 @@ +{ + "title": "AWS Step Functions to Amazon ECS with Python", + "description": "Invoke ECS tasks from Step Functions using synchronous (polling) and callback integration patterns with Python containers", + "language": "Python", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates two ways to integrate AWS Step Functions with Amazon ECS tasks running Python code:", + "1. Synchronous Pattern (.sync): Step Functions automatically polls the ECS task status and waits for completion", + "2.
Callback Pattern (.waitForTaskToken): The ECS task receives a token and explicitly sends a callback when done", + "The pattern includes complete Python code running in ECS Fargate containers, VPC networking, and CloudWatch logging." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sfn-ecs-python-sam", + "templateURL": "serverless-patterns/sfn-ecs-python-sam", + "projectFolder": "sfn-ecs-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Run Amazon ECS or Fargate tasks with Step Functions", + "link": "https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html" + }, + { + "text": "Wait for a Callback with Task Token", + "link": "https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token" + }, + { + "text": "Amazon ECS Task Definitions", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." 
+ ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "image": "", + "bio": "Senior Delivery Consultant", + "linkedin": "" + } + ] +} \ No newline at end of file diff --git a/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json b/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json new file mode 100644 index 000000000..c33c3be11 --- /dev/null +++ b/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json @@ -0,0 +1,87 @@ +{ + "Comment": "Callback ECS Task Execution - Task sends callback when complete", + "StartAt": "Run ECS Task (Callback)", + "States": { + "Run ECS Task (Callback)": { + "Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", + "Parameters": { + "Cluster": "${ECSCluster}", + "TaskDefinition": "${TaskDefinition}", + "LaunchType": "FARGATE", + "NetworkConfiguration": { + "AwsvpcConfiguration": { + "Subnets": [ + "${Subnet1}", + "${Subnet2}" + ], + "SecurityGroups": [ + "${SecurityGroup}" + ], + "AssignPublicIp": "ENABLED" + } + }, + "Overrides": { + "ContainerOverrides": [ + { + "Name": "python-callback-container", + "Environment": [ + { + "Name": "TASK_TOKEN", + "Value.$": "$$.Task.Token" + }, + { + "Name": "MESSAGE", + "Value.$": "$.message" + }, + { + "Name": "PROCESSING_TIME", + "Value.$": "States.Format('{}', $.processingTime)" + } + ] + } + ] + } + }, + "TimeoutSeconds": 300, + "HeartbeatSeconds": 60, + "Next": "Callback Received Successfully", + "Catch": [ + { + "ErrorEquals": [ + "States.Timeout" + ], + "Next": "Task Timed Out", + "ResultPath": "$.error" + }, + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "Task Failed", + "ResultPath": "$.error" + } + ] + }, + "Callback Received Successfully": { + "Type": "Pass", + "Parameters": { + "status": "completed", + "pattern": "callback", + "message": "ECS task sent callback successfully", + "result.$": "$" + }, + "End": true + }, + "Task Timed Out": { + "Type": "Fail", + "Error": 
"TaskTimeout", + "Cause": "The ECS task did not send a callback within the timeout period" + }, + "Task Failed": { + "Type": "Fail", + "Error": "ECSTaskFailed", + "Cause": "The ECS task failed or sent a failure callback" + } + } +} \ No newline at end of file diff --git a/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json b/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json new file mode 100644 index 000000000..55a2961e7 --- /dev/null +++ b/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json @@ -0,0 +1,69 @@ +{ + "Comment": "Synchronous ECS Task Execution - Step Functions polls until task completes", + "StartAt": "Run ECS Task (Sync)", + "States": { + "Run ECS Task (Sync)": { + "Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.sync", + "Parameters": { + "Cluster": "${ECSCluster}", + "TaskDefinition": "${TaskDefinition}", + "LaunchType": "FARGATE", + "NetworkConfiguration": { + "AwsvpcConfiguration": { + "Subnets": [ + "${Subnet1}", + "${Subnet2}" + ], + "SecurityGroups": [ + "${SecurityGroup}" + ], + "AssignPublicIp": "ENABLED" + } + }, + "Overrides": { + "ContainerOverrides": [ + { + "Name": "python-sync-container", + "Environment": [ + { + "Name": "MESSAGE", + "Value.$": "$.message" + }, + { + "Name": "PROCESSING_TIME", + "Value.$": "States.Format('{}', $.processingTime)" + } + ] + } + ] + } + }, + "Next": "Task Completed Successfully", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "Task Failed", + "ResultPath": "$.error" + } + ] + }, + "Task Completed Successfully": { + "Type": "Pass", + "Parameters": { + "status": "completed", + "pattern": "synchronous", + "message": "ECS task completed successfully", + "input.$": "$" + }, + "End": true + }, + "Task Failed": { + "Type": "Fail", + "Error": "ECSTaskFailed", + "Cause": "The ECS task failed to complete successfully" + } + } +} \ No newline at end of file diff --git a/sfn-ecs-python-sam/template.yaml b/sfn-ecs-python-sam/template.yaml new file mode 100644 index 
000000000..4c6662767 --- /dev/null +++ b/sfn-ecs-python-sam/template.yaml @@ -0,0 +1,378 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Step Functions to ECS with Python - Demonstrates sync and callback patterns + +Parameters: + VpcCIDR: + Type: String + Default: 10.0.0.0/16 + Description: CIDR block for VPC + +Resources: + # VPC and Networking + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: !Ref VpcCIDR + EnableDnsHostnames: true + EnableDnsSupport: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-vpc + + PublicSubnet1: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.1.0/24 + AvailabilityZone: !Select [0, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-1 + + PublicSubnet2: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.2.0/24 + AvailabilityZone: !Select [1, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-2 + + InternetGateway: + Type: AWS::EC2::InternetGateway + Properties: + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-igw + + AttachGateway: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-rt + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: AttachGateway + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + SubnetRouteTableAssociation1: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet1 + RouteTableId: !Ref PublicRouteTable + + SubnetRouteTableAssociation2: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet2 + RouteTableId: !Ref PublicRouteTable + + # Security 
Group + ECSSecurityGroup: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security group for ECS tasks + VpcId: !Ref VPC + SecurityGroupEgress: + - IpProtocol: -1 + CidrIp: 0.0.0.0/0 + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-ecs-sg + + # ECS Cluster + ECSCluster: + Type: AWS::ECS::Cluster + Properties: + ClusterName: !Sub ${AWS::StackName}-cluster + ClusterSettings: + - Name: containerInsights + Value: enabled + + # CloudWatch Log Group + ECSLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub /ecs/${AWS::StackName} + RetentionInDays: 7 + + # ECS Task Execution Role + ECSTaskExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: ecs-tasks.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy + + # ECS Task Role (for callback pattern) + ECSTaskRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: ecs-tasks.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: StepFunctionsCallback + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - states:SendTaskSuccess + - states:SendTaskFailure + - states:SendTaskHeartbeat + Resource: '*' + + # ECS Task Definition for Sync Pattern + SyncTaskDefinition: + Type: AWS::ECS::TaskDefinition + Properties: + Family: !Sub ${AWS::StackName}-sync-task + NetworkMode: awsvpc + RequiresCompatibilities: + - FARGATE + Cpu: '256' + Memory: '512' + ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn + ContainerDefinitions: + - Name: python-sync-container + Image: public.ecr.aws/docker/library/python:3.11-slim + Essential: true + Command: + - /bin/sh + - -c + - | + pip install --quiet boto3 && python3 << 'EOF' + import os + import time + import json + from datetime import 
datetime + + def main(): + message = os.environ.get('MESSAGE', 'No message provided') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + print(f"[SYNC] Starting processing: {message}") + print(f"[SYNC] Will process for {processing_time} seconds") + + # Simulate processing + time.sleep(processing_time) + + result = { + "status": "success", + "message": f"Processed: {message}", + "processingTime": processing_time, + "timestamp": datetime.utcnow().isoformat() + "Z" + } + + print(f"[SYNC] Completed: {json.dumps(result)}") + return result + + if __name__ == "__main__": + main() + EOF + LogConfiguration: + LogDriver: awslogs + Options: + awslogs-group: !Ref ECSLogGroup + awslogs-region: !Ref AWS::Region + awslogs-stream-prefix: sync + + # ECS Task Definition for Callback Pattern + CallbackTaskDefinition: + Type: AWS::ECS::TaskDefinition + Properties: + Family: !Sub ${AWS::StackName}-callback-task + NetworkMode: awsvpc + RequiresCompatibilities: + - FARGATE + Cpu: '256' + Memory: '512' + ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn + TaskRoleArn: !GetAtt ECSTaskRole.Arn + ContainerDefinitions: + - Name: python-callback-container + Image: public.ecr.aws/docker/library/python:3.11-slim + Essential: true + Command: + - /bin/sh + - -c + - | + pip install --quiet boto3 && python3 << 'EOF' + import os + import boto3 + import json + import time + from datetime import datetime + + def main(): + task_token = os.environ.get('TASK_TOKEN') + message = os.environ.get('MESSAGE', 'No message provided') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + if not task_token: + print("[CALLBACK] ERROR: No task token provided!") + return + + sfn_client = boto3.client('stepfunctions') + + try: + print(f"[CALLBACK] Starting processing: {message}") + print(f"[CALLBACK] Task token: {task_token[:20]}...") + print(f"[CALLBACK] Will process for {processing_time} seconds") + + # Simulate processing + time.sleep(processing_time) + + result = { + "status": 
"success", + "message": f"Processed: {message}", + "processingTime": processing_time, + "timestamp": datetime.utcnow().isoformat() + "Z" + } + + print(f"[CALLBACK] Sending success callback") + sfn_client.send_task_success( + taskToken=task_token, + output=json.dumps(result) + ) + print(f"[CALLBACK] Success callback sent!") + + except Exception as e: + print(f"[CALLBACK] ERROR: {str(e)}") + try: + sfn_client.send_task_failure( + taskToken=task_token, + error='ProcessingError', + cause=str(e) + ) + print(f"[CALLBACK] Failure callback sent") + except Exception as callback_error: + print(f"[CALLBACK] Failed to send callback: {str(callback_error)}") + + if __name__ == "__main__": + main() + EOF + LogConfiguration: + LogDriver: awslogs + Options: + awslogs-group: !Ref ECSLogGroup + awslogs-region: !Ref AWS::Region + awslogs-stream-prefix: callback + + # Step Functions Role + StepFunctionsRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: states.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: ECSTaskExecution + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + - ecs:StopTask + - ecs:DescribeTasks + Resource: '*' + - Effect: Allow + Action: + - iam:PassRole + Resource: + - !GetAtt ECSTaskExecutionRole.Arn + - !GetAtt ECSTaskRole.Arn + - Effect: Allow + Action: + - events:PutTargets + - events:PutRule + - events:DescribeRule + Resource: '*' + + # Synchronous Pattern State Machine + SyncStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + Name: !Sub ${AWS::StackName}-sync-pattern + Type: STANDARD + Role: !GetAtt StepFunctionsRole.Arn + DefinitionUri: statemachine/sync-pattern.asl.json + DefinitionSubstitutions: + ECSCluster: !Ref ECSCluster + TaskDefinition: !Ref SyncTaskDefinition + Subnet1: !Ref PublicSubnet1 + Subnet2: !Ref PublicSubnet2 + SecurityGroup: !Ref ECSSecurityGroup + + # Callback 
Pattern State Machine + CallbackStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + Name: !Sub ${AWS::StackName}-callback-pattern + Type: STANDARD + Role: !GetAtt StepFunctionsRole.Arn + DefinitionUri: statemachine/callback-pattern.asl.json + DefinitionSubstitutions: + ECSCluster: !Ref ECSCluster + TaskDefinition: !Ref CallbackTaskDefinition + Subnet1: !Ref PublicSubnet1 + Subnet2: !Ref PublicSubnet2 + SecurityGroup: !Ref ECSSecurityGroup + +Outputs: + SyncStateMachineArn: + Description: ARN of the Synchronous Pattern State Machine + Value: !Ref SyncStateMachine + + CallbackStateMachineArn: + Description: ARN of the Callback Pattern State Machine + Value: !Ref CallbackStateMachine + + ECSClusterName: + Description: Name of the ECS Cluster + Value: !Ref ECSCluster + + SyncTaskDefinitionArn: + Description: ARN of the Sync Task Definition + Value: !Ref SyncTaskDefinition + + CallbackTaskDefinitionArn: + Description: ARN of the Callback Task Definition + Value: !Ref CallbackTaskDefinition + + LogGroupName: + Description: CloudWatch Log Group for ECS tasks + Value: !Ref ECSLogGroup \ No newline at end of file From ce2829ff3059c34e77efcf7c213c3596d538fea0 Mon Sep 17 00:00:00 2001 From: 4D54 <78043109+4D54@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:14:41 +0100 Subject: [PATCH 2/3] Add security note to README - Document permissive IAM and security group configurations - Explain why they exist (demo/learning purposes) - Provide production security recommendations - Recommend deployment in non-production environments --- sfn-ecs-python-sam/README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sfn-ecs-python-sam/README.md b/sfn-ecs-python-sam/README.md index edc3b47ee..01b84f6d9 100644 --- a/sfn-ecs-python-sam/README.md +++ b/sfn-ecs-python-sam/README.md @@ -6,6 +6,22 @@ Learn more about this pattern at Serverless Land Patterns: https://serverlesslan **Important:** This application uses various AWS services and there are costs 
associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. +## Security Note + +This pattern is designed for learning and demonstration purposes. The IAM roles and security group use permissive configurations to simplify deployment and focus on the integration patterns: + +- **Security Group**: Allows all outbound traffic (required for pulling Docker images and calling AWS APIs) +- **IAM Roles**: Use wildcard (`*`) resources for ECS task management and Step Functions callbacks + +**For production use**, you should: +- Restrict security group egress to specific AWS service endpoints using VPC endpoints +- Scope IAM policies to specific resources (task definitions, state machines) +- Implement least privilege access based on your security requirements +- Consider using AWS PrivateLink for service-to-service communication +- Enable VPC Flow Logs for network traffic monitoring + +Deploy this pattern in a non-production AWS account or isolated environment for testing. + ## Requirements * [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
From 5c2a13082fb4e226b69caf82d8996da6cdb246a3 Mon Sep 17 00:00:00 2001 From: 4D54 <78043109+4D54@users.noreply.github.com> Date: Thu, 12 Feb 2026 18:03:53 +0100 Subject: [PATCH 3/3] feat: Replace Step Functions pattern with Lambda Durable Functions pattern - Replace sfn-ecs-python-sam with lambda-ecs-durable-python-sam - Implement synchronous pattern with polling using context.wait() - Implement callback pattern with DynamoDB state tracking - Use Python 3.13 with container image deployment - Support durable execution for workflows up to 1 year - Reduce costs with no charges during context.wait() periods --- lambda-ecs-durable-python-sam/README.md | 320 +++++++++++++++ .../example-pattern.json | 68 ++++ lambda-ecs-durable-python-sam/src/Dockerfile | 14 + .../src/callback_handler.py | 171 ++++++++ .../src/requirements.txt | 2 + .../src/sync_handler.py | 153 ++++++++ .../template.yaml | 239 +++++++----- sfn-ecs-python-sam/README.md | 365 ------------------ sfn-ecs-python-sam/example-pattern.json | 64 --- .../statemachine/callback-pattern.asl.json | 87 ----- .../statemachine/sync-pattern.asl.json | 69 ---- 11 files changed, 879 insertions(+), 673 deletions(-) create mode 100644 lambda-ecs-durable-python-sam/README.md create mode 100644 lambda-ecs-durable-python-sam/example-pattern.json create mode 100644 lambda-ecs-durable-python-sam/src/Dockerfile create mode 100644 lambda-ecs-durable-python-sam/src/callback_handler.py create mode 100644 lambda-ecs-durable-python-sam/src/requirements.txt create mode 100644 lambda-ecs-durable-python-sam/src/sync_handler.py rename {sfn-ecs-python-sam => lambda-ecs-durable-python-sam}/template.yaml (59%) delete mode 100644 sfn-ecs-python-sam/README.md delete mode 100644 sfn-ecs-python-sam/example-pattern.json delete mode 100644 sfn-ecs-python-sam/statemachine/callback-pattern.asl.json delete mode 100644 sfn-ecs-python-sam/statemachine/sync-pattern.asl.json diff --git a/lambda-ecs-durable-python-sam/README.md
b/lambda-ecs-durable-python-sam/README.md new file mode 100644 index 000000000..f3eddce4b --- /dev/null +++ b/lambda-ecs-durable-python-sam/README.md @@ -0,0 +1,320 @@ +# AWS Lambda Durable Functions to Amazon ECS with Python + +This pattern demonstrates how to invoke an Amazon ECS task from AWS Lambda Durable Functions using Python, showcasing resilient multi-step workflows with automatic checkpointing and state management. + +Lambda Durable Functions enable you to build resilient applications that can execute for up to one year while maintaining reliable progress despite interruptions. This pattern shows two integration approaches: **synchronous (polling with durable waits)** and **callback (async with durable steps)**. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns + +**Important:** This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. + +## What are Lambda Durable Functions? + +Lambda Durable Functions enable you to build resilient multi-step applications that can execute for up to one year while maintaining reliable progress despite interruptions. Key features include: + +- **Automatic Checkpointing**: Each step is automatically checkpointed, so your function can resume from the last completed step after interruptions +- **Cost-Effective Waits**: During wait operations, your function suspends without incurring compute charges +- **Built-in Retries**: Steps have automatic retry logic with progress tracking +- **Deterministic Replay**: When resuming, completed steps use stored results instead of re-executing + +This pattern uses the [AWS Durable Execution SDK for Python](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html) to implement these capabilities. + +## Security Note + +This pattern is designed for learning and demonstration purposes. 
The IAM roles and security group use permissive configurations to simplify deployment and focus on the integration patterns: + +- **Security Group**: Allows all outbound traffic (required for pulling Docker images and calling AWS APIs) +- **IAM Roles**: Use wildcard (`*`) resources for ECS task management + +**For production use**, you should: +- Restrict security group egress to specific AWS service endpoints using VPC endpoints +- Scope IAM policies to specific resources (task definitions, DynamoDB tables) +- Implement least privilege access based on your security requirements +- Consider using AWS PrivateLink for service-to-service communication +- Enable VPC Flow Logs for network traffic monitoring +- Package the AWS SDK in your Lambda deployment package (13-14MB) instead of relying on the Lambda-provided runtime SDK +- Include the Durable Execution SDK in your deployment package for production (included in requirements.txt) + +Deploy this pattern in a non-production AWS account or isolated environment for testing. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Architecture + +### Pattern 1: Synchronous (Durable Polling) Integration + +``` +┌─────────────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Lambda Durable │ │ ECS Task │ │ CloudWatch │ +│ Function (Sync) │─────▶│ (Python) │─────▶│ Logs │ +│ │ │ │ │ │ +└─────────────────────┘ └──────────────────┘ └─────────────┘ + │ │ + │ Durable Wait (no charges) │ + └───────────────────────────────┘ + Polls with checkpointing +``` + +**How it works:** +1. Lambda durable function invokes the ECS task using `ecs:RunTask` (checkpointed step) +2. Function uses `context.wait()` to pause without compute charges +3. After each wait, function checks task status using `ecs:DescribeTasks` (checkpointed step) +4. If interrupted, function automatically resumes from last checkpoint +5. Once complete, Lambda returns the result +6. 
Can run for up to 1 year (vs 15 minutes for standard Lambda) + +**Key Durable Features:** +- `@durable_execution` decorator enables durable execution +- `@durable_step` decorator marks functions as checkpointed steps +- `context.wait()` suspends execution without charges +- Automatic replay and recovery from failures + +**Use cases:** +- Long-running tasks (hours to days) +- Tasks requiring reliable progress tracking +- Workflows that need automatic recovery +- Cost-sensitive polling operations + +**Advantages over standard Lambda:** +- No 15-minute timeout limitation +- Pay only for active execution time (not wait time) +- Automatic checkpointing and recovery +- Built-in retry logic + +### Pattern 2: Callback (Durable Async) Integration + +``` +┌─────────────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Lambda Durable │ │ ECS Task │ │ CloudWatch │ +│ Function (Callback)│─────▶│ (Python) │─────▶│ Logs │ +│ │ │ │ │ │ +└─────────────────────┘ └──────────────────┘ └─────────────┘ + │ │ │ + │ Checkpointed Steps │ │ + │ ▼ │ + │ ┌─────────────────┐ │ + └──────────────────────│ DynamoDB │◄─────────────┘ + │ Table │ + └─────────────────┘ +``` + +**How it works:** +1. Lambda durable function creates DynamoDB record (checkpointed step) +2. Lambda invokes the ECS task using `ecs:RunTask` (checkpointed step) +3. Lambda updates DynamoDB with task ARN (checkpointed step) +4. Lambda **returns immediately** (async pattern) +5. The Python application in ECS processes the work +6. When done, the ECS task updates DynamoDB with the result +7. 
If any step fails, automatic retry with checkpoint recovery + +**Key Durable Features:** +- Each step is automatically checkpointed +- If interrupted, function resumes from last completed step +- No re-execution of completed steps +- Reliable task initiation guaranteed + +**Use cases:** +- Fire-and-forget workflows +- Asynchronous processing +- When you don't need immediate results +- Decoupling task execution from API responses +- Workflows requiring guaranteed task initiation + +**Advantages:** +- Reliable task initiation with automatic recovery +- Minimal Lambda execution time +- Each step is independently retryable +- No risk of duplicate task creation (idempotent) + +## Deployment Instructions + +### Prerequisites + +* Python 3.13 or 3.14 runtime support for Lambda Durable Functions +* AWS SAM CLI version that supports DurableConfig and container images +* Docker installed (for building Lambda container images) + +### Step 1: Clone the Repository + +```bash +git clone https://github.com/aws-samples/serverless-patterns +cd serverless-patterns/lambda-ecs-durable-python-sam +``` + +### Step 2: Build and Deploy + +This pattern uses Lambda container images with Python 3.13 to support durable functions. 
The build process will: +- Build Docker images with the Durable Execution SDK +- Create ECR repositories automatically +- Push images to ECR +- Deploy Lambda functions using the container images + +```bash +sam build +sam deploy --guided +``` + +During the prompts: +- **Stack Name**: `lambda-ecs-durable-demo` (or your preferred name) +- **AWS Region**: Your preferred region (e.g., `us-east-1`) +- **Parameter VpcCIDR**: Press Enter to use default (10.0.0.0/16) +- **Confirm changes before deploy**: Y +- **Allow SAM CLI IAM role creation**: Y +- **Disable rollback**: N +- **SyncLambdaFunction has no authorization defined**: Y +- **CallbackLambdaFunction has no authorization defined**: Y +- **Create managed ECR repositories for all functions**: Y (required for container images) +- **Save arguments to samconfig.toml**: Y + +The deployment will take 5-10 minutes as it creates VPC, ECS cluster, Lambda functions, and other resources. + +### Step 3: Note the Outputs + +After deployment, note the following outputs: +- `SyncLambdaFunctionArn` - ARN for the synchronous pattern Lambda +- `CallbackLambdaFunctionArn` - ARN for the callback pattern Lambda +- `CallbackTableName` - DynamoDB table for callback tracking +- `ECSClusterName` - Name of the ECS cluster +- `LogGroupName` - CloudWatch log group for ECS tasks + +**Important**: When invoking durable functions, you must use a qualified ARN (append `:$LATEST` to the function name). + +## How to Test + +### Testing the Synchronous (Durable) Pattern + +1. **Invoke the durable function asynchronously:** + +Lambda Durable Functions with execution timeout > 15 minutes must be invoked asynchronously. 
Use the `--invocation-type Event` flag and a qualified ARN (with `:$LATEST`): + +```bash +aws lambda invoke \ + --function-name lambda-ecs-durable-demo-sync-function:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from durable sync pattern", "processingTime": 10}' \ + response.json +``` + +**Note**: The `\$LATEST` qualifier is required for durable functions. The backslash escapes the dollar sign in bash. + +2. **Monitor the Lambda execution logs:** + +```bash +aws logs tail /aws/lambda/lambda-ecs-durable-demo-sync-function --follow +``` + +You'll see: +- Task starting with checkpointed step +- Durable waits (no compute charges during waits) +- Status checks every 5 seconds (PROVISIONING → PENDING → RUNNING → STOPPED) +- Each check is a separate checkpointed operation +- Final result when task completes + +3. **View ECS task logs:** + +```bash +aws logs tail /ecs/lambda-ecs-durable-demo --follow +``` + +4. **View execution in Lambda console:** + +Navigate to the Lambda console → Your function → "Monitoring" tab → "Logs" to see the execution timeline and checkpoints. + +### Testing the Callback (Durable) Pattern + +1. **Invoke the durable function asynchronously:** + +```bash +aws lambda invoke \ + --function-name lambda-ecs-durable-demo-callback-function:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from durable callback pattern", "processingTime": 30}' \ + response.json +``` + +2. **Monitor the Lambda execution logs:** + +```bash +aws logs tail /aws/lambda/lambda-ecs-durable-demo-callback-function --follow +``` + +You'll see: +- DynamoDB record creation (checkpointed) +- ECS task initiation (checkpointed) +- Function returns immediately + +3. 
**Check the status in DynamoDB:** + +```bash +# Scan the table to see all executions +aws dynamodb scan --table-name lambda-ecs-durable-demo-callbacks + +# Or get a specific execution (replace with your execution ID from logs) +aws dynamodb get-item \ + --table-name lambda-ecs-durable-demo-callbacks \ + --key '{"executionId": {"S": "YOUR-EXECUTION-ID"}}' +``` + +4. **Monitor ECS task logs:** + +```bash +aws logs tail /ecs/lambda-ecs-durable-demo --follow +``` + +The ECS task will update DynamoDB when processing is complete. You'll see the result in the `result` field with status `COMPLETED`. + +## Key Differences Between Patterns + +| Feature | Synchronous (Durable Polling) | Callback (Durable Async) | +|---------|------------------------------|--------------------------| +| **Execution Duration** | Up to 1 year | Up to 1 year | +| **Checkpointing** | Automatic for each step | Automatic for each step | +| **Wait Charges** | No charges during waits | N/A (returns immediately) | +| **Polling** | Durable waits between checks | No polling needed | +| **Task Awareness** | Task doesn't know about Lambda | Task updates DynamoDB | +| **Complexity** | Moderate (durable steps + waits) | Moderate (durable steps + DynamoDB) | +| **Use Case** | Long-running tasks needing results | Fire-and-forget workflows | +| **Cost** | Pay only for active execution | Minimal (quick execution) | +| **Result Retrieval** | Returned by function | Query DynamoDB | +| **Reliability** | Automatic recovery from failures | Guaranteed task initiation | + +## Benefits of Lambda Durable Functions + +Compared to standard Lambda functions: + +✅ **Extended Duration**: Execute for up to 1 year (vs 15 minutes) +✅ **Cost Optimization**: No charges during wait operations +✅ **Automatic Recovery**: Built-in checkpointing and replay +✅ **Simplified Code**: No manual state management needed +✅ **Reliable Execution**: Guaranteed progress despite interruptions +✅ **Built-in Retries**: Automatic retry logic for 
steps + +## Cleanup + +To delete the resources: + +```bash +sam delete +``` + +## Resources + +- [AWS Lambda Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) +- [Durable Execution SDK](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html) +- [AWS Lambda](https://aws.amazon.com/lambda/) +- [Amazon ECS](https://aws.amazon.com/ecs/) +- [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) +- [ECS RunTask API](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RunTask.html) + +--- + +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-ecs-durable-python-sam/example-pattern.json b/lambda-ecs-durable-python-sam/example-pattern.json new file mode 100644 index 000000000..51f7d21da --- /dev/null +++ b/lambda-ecs-durable-python-sam/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "AWS Lambda Durable Functions to Amazon ECS with Python", + "description": "Invoke ECS tasks from Lambda Durable Functions with automatic checkpointing, state management, and resilient execution patterns", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates AWS Lambda Durable Functions invoking Amazon ECS tasks with resilient, long-running execution capabilities:", + "1. Durable Synchronous Pattern: Lambda uses checkpointed steps and durable waits to poll ECS task status. Can run for up to 1 year with automatic recovery from failures. No compute charges during wait periods.", + "2. Durable Callback Pattern: Lambda uses checkpointed steps to reliably initiate ECS tasks. Each step (create record, start task, update status) is automatically checkpointed for guaranteed execution.", + "The pattern uses the AWS Durable Execution SDK for Python, providing automatic state management, checkpoint-based recovery, and cost-effective long-running workflows. 
Includes inline Python code in ECS containers, VPC networking, and DynamoDB for callback tracking." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-ecs-durable-python-sam", + "templateURL": "serverless-patterns/lambda-ecs-durable-python-sam", + "projectFolder": "lambda-ecs-durable-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda Durable Functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Durable Execution SDK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html" + }, + { + "text": "Run Amazon ECS or Fargate tasks", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs_run_task.html" + }, + { + "text": "Amazon ECS Task Definitions", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." 
+ ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "image": "", + "bio": "Senior Delivery Consultant", + "linkedin": "" + } + ] +} diff --git a/lambda-ecs-durable-python-sam/src/Dockerfile b/lambda-ecs-durable-python-sam/src/Dockerfile new file mode 100644 index 000000000..730a1f347 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies including durable SDK +RUN pip install -r requirements.txt + +# Copy function code +COPY sync_handler.py ${LAMBDA_TASK_ROOT}/ +COPY callback_handler.py ${LAMBDA_TASK_ROOT}/ + +# Default handler (will be overridden by template) +CMD [ "sync_handler.lambda_handler" ] diff --git a/lambda-ecs-durable-python-sam/src/callback_handler.py b/lambda-ecs-durable-python-sam/src/callback_handler.py new file mode 100644 index 000000000..97a3294c7 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/callback_handler.py @@ -0,0 +1,171 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +ecs_client = boto3.client('ecs') +dynamodb = boto3.resource('dynamodb') + +@durable_step +def create_execution_record(step_context, callback_table, execution_id, message, processing_time): + """ + Durable step that creates initial execution record in DynamoDB. 
+ """ + step_context.logger.info(f"[CALLBACK] Creating execution record: {execution_id}") + + table = dynamodb.Table(callback_table) + table.put_item( + Item={ + 'executionId': execution_id, + 'status': 'RUNNING', + 'message': message, + 'processingTime': processing_time + } + ) + + return execution_id + +@durable_step +def start_ecs_task_async(step_context, cluster, task_definition, subnet1, subnet2, security_group, + message, processing_time, execution_id, callback_table): + """ + Durable step that starts an ECS task for async processing. + """ + step_context.logger.info(f"[CALLBACK] Starting ECS task with execution ID: {execution_id}") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-callback-container', + 'environment': [ + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)}, + {'name': 'EXECUTION_ID', 'value': execution_id}, + {'name': 'CALLBACK_TABLE', 'value': callback_table} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + step_context.logger.info(f"[CALLBACK] Task started: {task_arn}") + + return task_arn + +@durable_step +def update_task_arn(step_context, callback_table, execution_id, task_arn): + """ + Durable step that updates DynamoDB with task ARN. 
+ """ + step_context.logger.info(f"[CALLBACK] Updating task ARN in DynamoDB") + + table = dynamodb.Table(callback_table) + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET taskArn = :arn', + ExpressionAttributeValues={':arn': task_arn} + ) + + return True + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda Durable Function that invokes an ECS task asynchronously. + The ECS task will update DynamoDB when complete. + + This function uses durable execution to ensure reliable task initiation + with automatic checkpointing and recovery. + """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + callback_table = os.environ['CALLBACK_TABLE'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + # Generate unique execution ID from durable execution ARN + import uuid + execution_id = event.get('executionId', str(uuid.uuid4())) + + try: + # Step 1: Create execution record (checkpointed) + context.step(create_execution_record( + callback_table, execution_id, message, processing_time + )) + + # Step 2: Start ECS task (checkpointed) + task_arn = context.step(start_ecs_task_async( + cluster, task_definition, subnet1, subnet2, security_group, + message, processing_time, execution_id, callback_table + )) + + # Step 3: Update DynamoDB with task ARN (checkpointed) + context.step(update_task_arn(callback_table, execution_id, task_arn)) + + # Return immediately (async pattern) + return { + 'statusCode': 202, # Accepted + 'body': json.dumps({ + 'status': 'accepted', + 'message': 'ECS task started, will callback when complete', + 'executionId': execution_id, + 'taskArn': task_arn + }) + } + + except Exception as e: + 
context.logger.error(f"[CALLBACK] Error: {str(e)}") + + # Update DynamoDB with error + try: + table = dynamodb.Table(callback_table) + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET #status = :status, #error = :error', + ExpressionAttributeNames={ + '#status': 'status', + '#error': 'error' + }, + ExpressionAttributeValues={ + ':status': 'FAILED', + ':error': str(e) + } + ) + except: + pass + + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e), + 'executionId': execution_id + }) + } diff --git a/lambda-ecs-durable-python-sam/src/requirements.txt b/lambda-ecs-durable-python-sam/src/requirements.txt new file mode 100644 index 000000000..46ba39952 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +boto3 +aws-durable-execution-sdk-python diff --git a/lambda-ecs-durable-python-sam/src/sync_handler.py b/lambda-ecs-durable-python-sam/src/sync_handler.py new file mode 100644 index 000000000..8ba2739e6 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/sync_handler.py @@ -0,0 +1,153 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +ecs_client = boto3.client('ecs') + +@durable_step +def start_ecs_task(step_context, cluster, task_definition, subnet1, subnet2, security_group, message, processing_time): + """ + Durable step that starts an ECS task. + This step is checkpointed, so if interrupted, it won't re-execute. 
+ """ + step_context.logger.info(f"[SYNC] Starting ECS task with message: {message}") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-sync-container', + 'environment': [ + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + step_context.logger.info(f"[SYNC] Task started: {task_arn}") + + return task_arn + +@durable_step +def check_task_status(step_context, cluster, task_arn): + """ + Durable step that checks ECS task status. + This step is checkpointed and can be retried if it fails. + """ + step_context.logger.info(f"[SYNC] Checking task status: {task_arn}") + + describe_response = ecs_client.describe_tasks( + cluster=cluster, + tasks=[task_arn] + ) + + if not describe_response['tasks']: + raise Exception(f"Task not found: {task_arn}") + + task = describe_response['tasks'][0] + last_status = task['lastStatus'] + + step_context.logger.info(f"[SYNC] Task status: {last_status}") + + return { + 'status': last_status, + 'task': task + } + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda Durable Function that invokes an ECS task and waits for completion. + Uses the Durable Execution SDK for automatic checkpointing and replay. + + This function can run for up to 1 year, with automatic state management + and recovery from failures. 
+ """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + try: + # Step 1: Start ECS task (checkpointed) + task_arn = context.step(start_ecs_task( + cluster, task_definition, subnet1, subnet2, + security_group, message, processing_time + )) + + # Poll for task completion using durable waits + max_attempts = 60 # 5 minutes max (60 * 5 seconds) + poll_interval = 5 # Check every 5 seconds + + for attempt in range(max_attempts): + # Wait before checking status (no compute charges during wait) + context.wait(Duration.from_seconds(poll_interval)) + + # Step 2: Check task status (checkpointed) + status_result = context.step(check_task_status(cluster, task_arn)) + + if status_result['status'] == 'STOPPED': + # Task completed + task = status_result['task'] + stop_code = task.get('stopCode', 'Unknown') + + if stop_code == 'EssentialContainerExited': + exit_code = task['containers'][0].get('exitCode', 1) + + if exit_code == 0: + context.logger.info(f"[SYNC] Task completed successfully") + return { + 'statusCode': 200, + 'body': json.dumps({ + 'status': 'success', + 'message': f'Processed: {message}', + 'processingTime': processing_time, + 'taskArn': task_arn + }) + } + else: + raise Exception(f"Task failed with exit code: {exit_code}") + else: + raise Exception(f"Task stopped unexpectedly: {stop_code}") + + # Timeout + raise Exception(f"Task did not complete within {max_attempts * poll_interval} seconds") + + except Exception as e: + context.logger.error(f"[SYNC] Error: {str(e)}") + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e) + }) + } diff --git a/sfn-ecs-python-sam/template.yaml 
b/lambda-ecs-durable-python-sam/template.yaml similarity index 59% rename from sfn-ecs-python-sam/template.yaml rename to lambda-ecs-durable-python-sam/template.yaml index 4c6662767..b04140fa6 100644 --- a/sfn-ecs-python-sam/template.yaml +++ b/lambda-ecs-durable-python-sam/template.yaml @@ -1,7 +1,7 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > - Step Functions to ECS with Python - Demonstrates sync and callback patterns + Lambda Durable Functions to ECS with Python - Demonstrates durable execution patterns with ECS tasks Parameters: VpcCIDR: @@ -113,6 +113,21 @@ Resources: LogGroupName: !Sub /ecs/${AWS::StackName} RetentionInDays: 7 + # DynamoDB Table for Callback Pattern + CallbackTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub ${AWS::StackName}-callbacks + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: executionId + AttributeType: S + KeySchema: + - AttributeName: executionId + KeyType: HASH + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES + # ECS Task Execution Role ECSTaskExecutionRole: Type: AWS::IAM::Role @@ -139,16 +154,15 @@ Resources: Service: ecs-tasks.amazonaws.com Action: sts:AssumeRole Policies: - - PolicyName: StepFunctionsCallback + - PolicyName: DynamoDBCallback PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - - states:SendTaskSuccess - - states:SendTaskFailure - - states:SendTaskHeartbeat - Resource: '*' + - dynamodb:UpdateItem + - dynamodb:PutItem + Resource: !GetAtt CallbackTable.Arn # ECS Task Definition for Sync Pattern SyncTaskDefinition: @@ -233,19 +247,21 @@ Resources: from datetime import datetime def main(): - task_token = os.environ.get('TASK_TOKEN') + execution_id = os.environ.get('EXECUTION_ID') + callback_table = os.environ.get('CALLBACK_TABLE') message = os.environ.get('MESSAGE', 'No message provided') processing_time = int(os.environ.get('PROCESSING_TIME', '5')) - if not task_token: - print("[CALLBACK] 
ERROR: No task token provided!") + if not execution_id or not callback_table: + print("[CALLBACK] ERROR: Missing execution ID or callback table!") return - sfn_client = boto3.client('stepfunctions') + dynamodb = boto3.resource('dynamodb') + table = dynamodb.Table(callback_table) try: print(f"[CALLBACK] Starting processing: {message}") - print(f"[CALLBACK] Task token: {task_token[:20]}...") + print(f"[CALLBACK] Execution ID: {execution_id}") print(f"[CALLBACK] Will process for {processing_time} seconds") # Simulate processing @@ -258,22 +274,37 @@ Resources: "timestamp": datetime.utcnow().isoformat() + "Z" } - print(f"[CALLBACK] Sending success callback") - sfn_client.send_task_success( - taskToken=task_token, - output=json.dumps(result) + print(f"[CALLBACK] Sending callback to DynamoDB") + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET #status = :status, #result = :result', + ExpressionAttributeNames={ + '#status': 'status', + '#result': 'result' + }, + ExpressionAttributeValues={ + ':status': 'COMPLETED', + ':result': json.dumps(result) + } ) - print(f"[CALLBACK] Success callback sent!") + print(f"[CALLBACK] Callback sent successfully!") except Exception as e: print(f"[CALLBACK] ERROR: {str(e)}") try: - sfn_client.send_task_failure( - taskToken=task_token, - error='ProcessingError', - cause=str(e) + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET #status = :status, #error = :error', + ExpressionAttributeNames={ + '#status': 'status', + '#error': 'error' + }, + ExpressionAttributeValues={ + ':status': 'FAILED', + ':error': str(e) + } ) - print(f"[CALLBACK] Failure callback sent") + print(f"[CALLBACK] Error callback sent") except Exception as callback_error: print(f"[CALLBACK] Failed to send callback: {str(callback_error)}") @@ -287,79 +318,111 @@ Resources: awslogs-region: !Ref AWS::Region awslogs-stream-prefix: callback - # Step Functions Role - StepFunctionsRole: - Type: AWS::IAM::Role + # Lambda 
Function for Sync Pattern (Durable) + SyncLambdaFunction: + Type: AWS::Serverless::Function + Metadata: + Dockerfile: Dockerfile + DockerContext: ./src + DockerTag: python3.13-v1 Properties: - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: states.amazonaws.com - Action: sts:AssumeRole + FunctionName: !Sub ${AWS::StackName}-sync-function + PackageType: Image + ImageConfig: + Command: + - sync_handler.lambda_handler + Timeout: 900 # 15 minutes (max Lambda timeout) + MemorySize: 256 + DurableConfig: + ExecutionTimeout: 3600 # 1 hour for durable execution + Environment: + Variables: + ECS_CLUSTER: !Ref ECSCluster + TASK_DEFINITION: !Ref SyncTaskDefinition + SUBNET_1: !Ref PublicSubnet1 + SUBNET_2: !Ref PublicSubnet2 + SECURITY_GROUP: !Ref ECSSecurityGroup Policies: - - PolicyName: ECSTaskExecution - PolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Action: - - ecs:RunTask - - ecs:StopTask - - ecs:DescribeTasks - Resource: '*' - - Effect: Allow - Action: - - iam:PassRole - Resource: - - !GetAtt ECSTaskExecutionRole.Arn - - !GetAtt ECSTaskRole.Arn - - Effect: Allow - Action: - - events:PutTargets - - events:PutRule - - events:DescribeRule - Resource: '*' - - # Synchronous Pattern State Machine - SyncStateMachine: - Type: AWS::Serverless::StateMachine - Properties: - Name: !Sub ${AWS::StackName}-sync-pattern - Type: STANDARD - Role: !GetAtt StepFunctionsRole.Arn - DefinitionUri: statemachine/sync-pattern.asl.json - DefinitionSubstitutions: - ECSCluster: !Ref ECSCluster - TaskDefinition: !Ref SyncTaskDefinition - Subnet1: !Ref PublicSubnet1 - Subnet2: !Ref PublicSubnet2 - SecurityGroup: !Ref ECSSecurityGroup + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + - ecs:DescribeTasks + Resource: '*' + - Effect: Allow + Action: + - iam:PassRole + Resource: + - !GetAtt ECSTaskExecutionRole.Arn + - !GetAtt ECSTaskRole.Arn + - Effect: Allow + Action: + - 
lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: '*' - # Callback Pattern State Machine - CallbackStateMachine: - Type: AWS::Serverless::StateMachine + # Lambda Function for Callback Pattern (Durable) + CallbackLambdaFunction: + Type: AWS::Serverless::Function + Metadata: + Dockerfile: Dockerfile + DockerContext: ./src + DockerTag: python3.13-v1 Properties: - Name: !Sub ${AWS::StackName}-callback-pattern - Type: STANDARD - Role: !GetAtt StepFunctionsRole.Arn - DefinitionUri: statemachine/callback-pattern.asl.json - DefinitionSubstitutions: - ECSCluster: !Ref ECSCluster - TaskDefinition: !Ref CallbackTaskDefinition - Subnet1: !Ref PublicSubnet1 - Subnet2: !Ref PublicSubnet2 - SecurityGroup: !Ref ECSSecurityGroup + FunctionName: !Sub ${AWS::StackName}-callback-function + PackageType: Image + ImageConfig: + Command: + - callback_handler.lambda_handler + Timeout: 60 + MemorySize: 256 + DurableConfig: + ExecutionTimeout: 3600 # 1 hour for durable execution + Environment: + Variables: + ECS_CLUSTER: !Ref ECSCluster + TASK_DEFINITION: !Ref CallbackTaskDefinition + SUBNET_1: !Ref PublicSubnet1 + SUBNET_2: !Ref PublicSubnet2 + SECURITY_GROUP: !Ref ECSSecurityGroup + CALLBACK_TABLE: !Ref CallbackTable + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + Resource: '*' + - Effect: Allow + Action: + - iam:PassRole + Resource: + - !GetAtt ECSTaskExecutionRole.Arn + - !GetAtt ECSTaskRole.Arn + - Effect: Allow + Action: + - dynamodb:PutItem + - dynamodb:UpdateItem + Resource: !GetAtt CallbackTable.Arn + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: '*' Outputs: - SyncStateMachineArn: - Description: ARN of the Synchronous Pattern State Machine - Value: !Ref SyncStateMachine + SyncLambdaFunctionArn: + Description: ARN of the Synchronous Pattern Lambda Function + Value: !GetAtt SyncLambdaFunction.Arn + + CallbackLambdaFunctionArn: + 
Description: ARN of the Callback Pattern Lambda Function + Value: !GetAtt CallbackLambdaFunction.Arn - CallbackStateMachineArn: - Description: ARN of the Callback Pattern State Machine - Value: !Ref CallbackStateMachine + CallbackTableName: + Description: DynamoDB table for callback tracking + Value: !Ref CallbackTable ECSClusterName: Description: Name of the ECS Cluster @@ -375,4 +438,4 @@ Outputs: LogGroupName: Description: CloudWatch Log Group for ECS tasks - Value: !Ref ECSLogGroup \ No newline at end of file + Value: !Ref ECSLogGroup diff --git a/sfn-ecs-python-sam/README.md b/sfn-ecs-python-sam/README.md deleted file mode 100644 index 01b84f6d9..000000000 --- a/sfn-ecs-python-sam/README.md +++ /dev/null @@ -1,365 +0,0 @@ -# AWS Step Functions to Amazon ECS with Python - -This pattern demonstrates how to invoke an Amazon ECS task from AWS Step Functions using Python, showcasing both **synchronous (polling)** and **callback** integration patterns. - -Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns - -**Important:** This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. - -## Security Note - -This pattern is designed for learning and demonstration purposes. 
The IAM roles and security group use permissive configurations to simplify deployment and focus on the integration patterns: - -- **Security Group**: Allows all outbound traffic (required for pulling Docker images and calling AWS APIs) -- **IAM Roles**: Use wildcard (`*`) resources for ECS task management and Step Functions callbacks - -**For production use**, you should: -- Restrict security group egress to specific AWS service endpoints using VPC endpoints -- Scope IAM policies to specific resources (task definitions, state machines) -- Implement least privilege access based on your security requirements -- Consider using AWS PrivateLink for service-to-service communication -- Enable VPC Flow Logs for network traffic monitoring - -Deploy this pattern in a non-production AWS account or isolated environment for testing. - -## Requirements - -* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. -* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured -* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) -* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed - -## Architecture - -### Pattern 1: Synchronous (Polling) Integration - -``` -┌─────────────┐ ┌──────────────────┐ ┌─────────────┐ -│ Step │ │ ECS Task │ │ CloudWatch │ -│ Functions │─────▶│ (Python) │─────▶│ Logs │ -│ (Sync) │ │ │ │ │ -└─────────────┘ └──────────────────┘ └─────────────┘ - │ │ - │ │ - └───────────────────────┘ - Waits for completion -``` - -**How it works:** -1. Step Functions invokes the ECS task using `arn:aws:states:::ecs:runTask.sync` -2. Step Functions **polls** the task status automatically -3. 
The workflow **waits** until the ECS task completes -4. Once complete, Step Functions continues to the next state -5. If the task fails, Step Functions can catch the error and retry - -**Use cases:** -- Short to medium-duration tasks (< 1 year) -- When you need the task result before proceeding -- Simple workflows where waiting is acceptable - -### Pattern 2: Callback Integration - -``` -┌─────────────┐ ┌──────────────────┐ ┌─────────────┐ -│ Step │ │ ECS Task │ │ CloudWatch │ -│ Functions │─────▶│ (Python) │─────▶│ Logs │ -│ (Callback) │ │ │ │ │ -└─────────────┘ └──────────────────┘ └─────────────┘ - ▲ │ - │ │ - └───────────────────────┘ - Task sends callback -``` - -**How it works:** -1. Step Functions invokes the ECS task using `arn:aws:states:::ecs:runTask.waitForTaskToken` -2. Step Functions passes a **task token** to the ECS container via environment variable -3. Step Functions **pauses** and waits for a callback -4. The Python application in ECS processes the work -5. When done, the Python app calls `send_task_success()` or `send_task_failure()` with the task token -6. 
Step Functions receives the callback and continues - -**Use cases:** -- Long-running tasks -- Human approval workflows -- External system integrations -- When you need to decouple task execution from workflow progression - -## Deployment Instructions - -### Step 1: Clone the Repository - -```bash -git clone https://github.com/aws-samples/serverless-patterns -cd serverless-patterns/sfn-ecs-python-sam -``` - -### Step 2: Build and Deploy - -```bash -sam build -sam deploy --guided -``` - -During the prompts: -- **Stack Name**: `sfn-ecs-python-stack` -- **AWS Region**: Your preferred region (e.g., `us-east-1`) -- **Confirm changes before deploy**: Y -- **Allow SAM CLI IAM role creation**: Y -- **Save arguments to samconfig.toml**: Y - -### Step 3: Note the Outputs - -After deployment, note the following outputs: -- `SyncStateMachineArn` - ARN for the synchronous pattern -- `CallbackStateMachineArn` - ARN for the callback pattern -- `ECSClusterName` - Name of the ECS cluster -- `TaskDefinitionArn` - ARN of the task definition - -## How to Test - -### Testing the Synchronous Pattern - -1. **Start the execution:** - -```bash -aws stepfunctions start-execution \ - --state-machine-arn \ - --input '{"message": "Hello from sync pattern", "processingTime": 10}' -``` - -2. **Monitor the execution:** - -```bash -aws stepfunctions describe-execution \ - --execution-arn -``` - -3. **View the output:** - -The execution will wait for the ECS task to complete and return the result: - -```json -{ - "status": "success", - "message": "Processed: Hello from sync pattern", - "processingTime": 10, - "timestamp": "2024-02-05T10:30:00Z" -} -``` - -### Testing the Callback Pattern - -1. **Start the execution:** - -```bash -aws stepfunctions start-execution \ - --state-machine-arn \ - --input '{"message": "Hello from callback pattern", "processingTime": 30}' -``` - -2. 
**Monitor the execution:** - -```bash -aws stepfunctions describe-execution \ - --execution-arn -``` - -The execution will show status as `RUNNING` while waiting for the callback. - -3. **View ECS task logs:** - -```bash -aws logs tail /ecs/sfn-ecs-python-callback --follow -``` - -4. **The task will automatically send the callback** when processing completes. - -## Step-by-Step Explanation - -### Understanding the Synchronous Pattern - -**Step 1: State Machine Definition** - -The state machine uses the `.sync` integration pattern: - -```json -{ - "Type": "Task", - "Resource": "arn:aws:states:::ecs:runTask.sync", - "Parameters": { - "Cluster": "my-cluster", - "TaskDefinition": "my-task", - "LaunchType": "FARGATE", - "NetworkConfiguration": { - "AwsvpcConfiguration": { - "Subnets": ["subnet-xxx"], - "SecurityGroups": ["sg-xxx"], - "AssignPublicIp": "ENABLED" - } - }, - "Overrides": { - "ContainerOverrides": [{ - "Name": "python-container", - "Environment": [{ - "Name": "MESSAGE", - "Value.$": "$.message" - }] - }] - } - } -} -``` - -**Step 2: ECS Task Execution** - -The Python container starts and processes the input: - -```python -import os -import time -import json - -def main(): - message = os.environ.get('MESSAGE', 'No message') - processing_time = int(os.environ.get('PROCESSING_TIME', '5')) - - print(f"Processing: {message}") - time.sleep(processing_time) - - result = { - "status": "success", - "message": f"Processed: {message}", - "processingTime": processing_time - } - - print(json.dumps(result)) - -if __name__ == "__main__": - main() -``` - -**Step 3: Step Functions Polling** - -- Step Functions automatically polls ECS every few seconds -- Checks if the task is still running -- When the task completes (or fails), Step Functions captures the result -- The workflow continues to the next state - -### Understanding the Callback Pattern - -**Step 1: State Machine Definition** - -The state machine uses the `.waitForTaskToken` integration pattern: - -```json -{ - 
"Type": "Task", - "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", - "Parameters": { - "Cluster": "my-cluster", - "TaskDefinition": "my-task", - "LaunchType": "FARGATE", - "Overrides": { - "ContainerOverrides": [{ - "Name": "python-container", - "Environment": [{ - "Name": "TASK_TOKEN", - "Value.$": "$$.Task.Token" - }, { - "Name": "MESSAGE", - "Value.$": "$.message" - }] - }] - } - } -} -``` - -**Key difference:** The `$$.Task.Token` is passed to the container. - -**Step 2: ECS Task with Callback** - -The Python container receives the task token and sends a callback: - -```python -import os -import boto3 -import json -import time - -def main(): - task_token = os.environ.get('TASK_TOKEN') - message = os.environ.get('MESSAGE', 'No message') - processing_time = int(os.environ.get('PROCESSING_TIME', '5')) - - sfn_client = boto3.client('stepfunctions') - - try: - print(f"Processing: {message}") - time.sleep(processing_time) - - result = { - "status": "success", - "message": f"Processed: {message}", - "processingTime": processing_time - } - - # Send success callback - sfn_client.send_task_success( - taskToken=task_token, - output=json.dumps(result) - ) - - except Exception as e: - # Send failure callback - sfn_client.send_task_failure( - taskToken=task_token, - error='ProcessingError', - cause=str(e) - ) - -if __name__ == "__main__": - main() -``` - -**Step 3: Callback Mechanism** - -- Step Functions pauses execution after starting the ECS task -- The task token acts as a unique identifier for this specific execution -- The Python app calls `send_task_success()` or `send_task_failure()` -- Step Functions receives the callback and resumes execution -- The output from the callback becomes the state output - -## Key Differences Between Patterns - -| Feature | Synchronous (.sync) | Callback (.waitForTaskToken) | -|---------|-------------------|------------------------------| -| **Max Duration** | 1 year | 1 year | -| **Polling** | Automatic by Step Functions | 
No polling needed | -| **Task Awareness** | Task doesn't know about Step Functions | Task must send callback | -| **Complexity** | Simple | Moderate (requires SDK calls) | -| **Use Case** | Standard batch jobs | Long-running, human approval, external systems | -| **Failure Handling** | Automatic | Manual (task must call send_task_failure) | -| **Cost** | State transitions for polling | Fewer state transitions | - -## Cleanup - -To delete the resources: - -```bash -sam delete -``` - -## Resources - -- [AWS Step Functions](https://aws.amazon.com/step-functions/) -- [Amazon ECS](https://aws.amazon.com/ecs/) -- [Step Functions ECS Integration](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) -- [Task Token Pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token) - ---- - -Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. - -SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/sfn-ecs-python-sam/example-pattern.json b/sfn-ecs-python-sam/example-pattern.json deleted file mode 100644 index 3f3fcad73..000000000 --- a/sfn-ecs-python-sam/example-pattern.json +++ /dev/null @@ -1,64 +0,0 @@ -{ - "title": "AWS Step Functions to Amazon ECS with Python", - "description": "Invoke ECS tasks from Step Functions using synchronous (polling) and callback integration patterns with Python containers", - "language": "Python", - "level": "200", - "framework": "SAM", - "introBox": { - "headline": "How it works", - "text": [ - "This pattern demonstrates two ways to integrate AWS Step Functions with Amazon ECS tasks running Python code:", - "1. Synchronous Pattern (.sync): Step Functions automatically polls the ECS task status and waits for completion", - "2. 
Callback Pattern (.waitForTaskToken): The ECS task receives a token and explicitly sends a callback when done", - "The pattern includes complete Python code running in ECS Fargate containers, VPC networking, and CloudWatch logging." - ] - }, - "gitHub": { - "template": { - "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sfn-ecs-python-sam", - "templateURL": "serverless-patterns/sfn-ecs-python-sam", - "projectFolder": "sfn-ecs-python-sam", - "templateFile": "template.yaml" - } - }, - "resources": { - "bullets": [ - { - "text": "Run Amazon ECS or Fargate tasks with Step Functions", - "link": "https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html" - }, - { - "text": "Wait for a Callback with Task Token", - "link": "https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token" - }, - { - "text": "Amazon ECS Task Definitions", - "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html" - } - ] - }, - "deploy": { - "text": [ - "sam build", - "sam deploy --guided" - ] - }, - "testing": { - "text": [ - "See the GitHub repo for detailed testing instructions." 
- ] - }, - "cleanup": { - "text": [ - "Delete the stack: sam delete" - ] - }, - "authors": [ - { - "name": "Mian Tariq", - "image": "", - "bio": "Senior Delivery Consultant", - "linkedin": "" - } - ] -} \ No newline at end of file diff --git a/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json b/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json deleted file mode 100644 index c33c3be11..000000000 --- a/sfn-ecs-python-sam/statemachine/callback-pattern.asl.json +++ /dev/null @@ -1,87 +0,0 @@ -{ - "Comment": "Callback ECS Task Execution - Task sends callback when complete", - "StartAt": "Run ECS Task (Callback)", - "States": { - "Run ECS Task (Callback)": { - "Type": "Task", - "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", - "Parameters": { - "Cluster": "${ECSCluster}", - "TaskDefinition": "${TaskDefinition}", - "LaunchType": "FARGATE", - "NetworkConfiguration": { - "AwsvpcConfiguration": { - "Subnets": [ - "${Subnet1}", - "${Subnet2}" - ], - "SecurityGroups": [ - "${SecurityGroup}" - ], - "AssignPublicIp": "ENABLED" - } - }, - "Overrides": { - "ContainerOverrides": [ - { - "Name": "python-callback-container", - "Environment": [ - { - "Name": "TASK_TOKEN", - "Value.$": "$$.Task.Token" - }, - { - "Name": "MESSAGE", - "Value.$": "$.message" - }, - { - "Name": "PROCESSING_TIME", - "Value.$": "States.Format('{}', $.processingTime)" - } - ] - } - ] - } - }, - "TimeoutSeconds": 300, - "HeartbeatSeconds": 60, - "Next": "Callback Received Successfully", - "Catch": [ - { - "ErrorEquals": [ - "States.Timeout" - ], - "Next": "Task Timed Out", - "ResultPath": "$.error" - }, - { - "ErrorEquals": [ - "States.ALL" - ], - "Next": "Task Failed", - "ResultPath": "$.error" - } - ] - }, - "Callback Received Successfully": { - "Type": "Pass", - "Parameters": { - "status": "completed", - "pattern": "callback", - "message": "ECS task sent callback successfully", - "result.$": "$" - }, - "End": true - }, - "Task Timed Out": { - "Type": "Fail", - "Error": 
"TaskTimeout", - "Cause": "The ECS task did not send a callback within the timeout period" - }, - "Task Failed": { - "Type": "Fail", - "Error": "ECSTaskFailed", - "Cause": "The ECS task failed or sent a failure callback" - } - } -} \ No newline at end of file diff --git a/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json b/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json deleted file mode 100644 index 55a2961e7..000000000 --- a/sfn-ecs-python-sam/statemachine/sync-pattern.asl.json +++ /dev/null @@ -1,69 +0,0 @@ -{ - "Comment": "Synchronous ECS Task Execution - Step Functions polls until task completes", - "StartAt": "Run ECS Task (Sync)", - "States": { - "Run ECS Task (Sync)": { - "Type": "Task", - "Resource": "arn:aws:states:::ecs:runTask.sync", - "Parameters": { - "Cluster": "${ECSCluster}", - "TaskDefinition": "${TaskDefinition}", - "LaunchType": "FARGATE", - "NetworkConfiguration": { - "AwsvpcConfiguration": { - "Subnets": [ - "${Subnet1}", - "${Subnet2}" - ], - "SecurityGroups": [ - "${SecurityGroup}" - ], - "AssignPublicIp": "ENABLED" - } - }, - "Overrides": { - "ContainerOverrides": [ - { - "Name": "python-sync-container", - "Environment": [ - { - "Name": "MESSAGE", - "Value.$": "$.message" - }, - { - "Name": "PROCESSING_TIME", - "Value.$": "States.Format('{}', $.processingTime)" - } - ] - } - ] - } - }, - "Next": "Task Completed Successfully", - "Catch": [ - { - "ErrorEquals": [ - "States.ALL" - ], - "Next": "Task Failed", - "ResultPath": "$.error" - } - ] - }, - "Task Completed Successfully": { - "Type": "Pass", - "Parameters": { - "status": "completed", - "pattern": "synchronous", - "message": "ECS task completed successfully", - "input.$": "$" - }, - "End": true - }, - "Task Failed": { - "Type": "Fail", - "Error": "ECSTaskFailed", - "Cause": "The ECS task failed to complete successfully" - } - } -} \ No newline at end of file