diff --git a/lambda-ecs-durable-python-sam/README.md b/lambda-ecs-durable-python-sam/README.md new file mode 100644 index 000000000..f3eddce4b --- /dev/null +++ b/lambda-ecs-durable-python-sam/README.md @@ -0,0 +1,320 @@ +# AWS Lambda Durable Functions to Amazon ECS with Python + +This pattern demonstrates how to invoke an Amazon ECS task from AWS Lambda Durable Functions using Python, showcasing resilient multi-step workflows with automatic checkpointing and state management. + +Lambda Durable Functions enable you to build resilient applications that can execute for up to one year while maintaining reliable progress despite interruptions. This pattern shows two integration approaches: **synchronous (polling with durable waits)** and **callback (async with durable steps)**. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns + +**Important:** This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. + +## What are Lambda Durable Functions? + +Lambda Durable Functions enable you to build resilient multi-step applications that can execute for up to one year while maintaining reliable progress despite interruptions. Key features include: + +- **Automatic Checkpointing**: Each step is automatically checkpointed, so your function can resume from the last completed step after interruptions +- **Cost-Effective Waits**: During wait operations, your function suspends without incurring compute charges +- **Built-in Retries**: Steps have automatic retry logic with progress tracking +- **Deterministic Replay**: When resuming, completed steps use stored results instead of re-executing + +This pattern uses the [AWS Durable Execution SDK for Python](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html) to implement these capabilities. 
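The checkpoint-and-replay behavior described above can be illustrated with a toy model. The sketch below does not use the Durable Execution SDK: `ToyContext`, `workflow`, and the in-memory `store` dict are hypothetical names invented for illustration, and the real service persists checkpoints outside the function rather than in a Python dict.

```python
# Toy model of checkpoint-and-replay. This is NOT the Durable Execution SDK;
# every name here is a hypothetical stand-in used only for illustration.
from typing import Any, Callable


class ToyContext:
    """Stand-in for a durable context, backed by a plain dict."""

    def __init__(self, checkpoints: dict[str, Any]) -> None:
        self.checkpoints = checkpoints  # survives across "invocations"
        self.executed: list[str] = []   # steps that actually ran this time

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        # Replay: a completed step returns its stored result without running.
        if name in self.checkpoints:
            return self.checkpoints[name]
        result = fn()
        self.checkpoints[name] = result  # record the result before moving on
        self.executed.append(name)
        return result


def workflow(ctx: ToyContext, fail_after_start: bool) -> str:
    task_id = ctx.step("start_task", lambda: "task-123")
    if fail_after_start:
        raise RuntimeError("simulated interruption")
    return ctx.step("check_status", lambda: f"{task_id}:STOPPED")


store: dict[str, Any] = {}

# First invocation is interrupted after the first step is checkpointed.
first = ToyContext(store)
try:
    workflow(first, fail_after_start=True)
except RuntimeError:
    pass

# Second invocation replays: start_task is served from the store.
second = ToyContext(store)
result = workflow(second, fail_after_start=False)

print(first.executed)   # ['start_task']
print(second.executed)  # ['check_status']
print(result)           # task-123:STOPPED
```

The second run skips `start_task` because its result is already in the store, which is the behavior the Deterministic Replay bullet describes.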
+ +## Security Note + +This pattern is designed for learning and demonstration purposes. The IAM roles and security group use permissive configurations to simplify deployment and focus on the integration patterns: + +- **Security Group**: Allows all outbound traffic (required for pulling Docker images and calling AWS APIs) +- **IAM Roles**: Use wildcard (`*`) resources for ECS task management + +**For production use**, you should: +- Restrict security group egress to specific AWS service endpoints using VPC endpoints +- Scope IAM policies to specific resources (task definitions, DynamoDB tables) +- Implement least privilege access based on your security requirements +- Consider using AWS PrivateLink for service-to-service communication +- Enable VPC Flow Logs for network traffic monitoring +- Package the AWS SDK in your Lambda deployment package (13-14MB) instead of relying on the Lambda-provided runtime SDK +- Include the Durable Execution SDK in your deployment package for production (included in requirements.txt) + +Deploy this pattern in a non-production AWS account or isolated environment for testing. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Architecture + +### Pattern 1: Synchronous (Durable Polling) Integration + +``` +┌─────────────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Lambda Durable │ │ ECS Task │ │ CloudWatch │ +│ Function (Sync) │─────▶│ (Python) │─────▶│ Logs │ +│ │ │ │ │ │ +└─────────────────────┘ └──────────────────┘ └─────────────┘ + │ │ + │ Durable Wait (no charges) │ + └───────────────────────────────┘ + Polls with checkpointing +``` + +**How it works:** +1. Lambda durable function invokes the ECS task using `ecs:RunTask` (checkpointed step) +2. Function uses `context.wait()` to pause without compute charges +3. After each wait, function checks task status using `ecs:DescribeTasks` (checkpointed step) +4. If interrupted, function automatically resumes from last checkpoint +5. Once complete, Lambda returns the result +6. 
Can run for up to 1 year (vs 15 minutes for standard Lambda) + +**Key Durable Features:** +- `@durable_execution` decorator enables durable execution +- `@durable_step` decorator marks functions as checkpointed steps +- `context.wait()` suspends execution without charges +- Automatic replay and recovery from failures + +**Use cases:** +- Long-running tasks (hours to days) +- Tasks requiring reliable progress tracking +- Workflows that need automatic recovery +- Cost-sensitive polling operations + +**Advantages over standard Lambda:** +- No 15-minute timeout limitation +- Pay only for active execution time (not wait time) +- Automatic checkpointing and recovery +- Built-in retry logic + +### Pattern 2: Callback (Durable Async) Integration + +``` +┌─────────────────────┐ ┌──────────────────┐ ┌─────────────┐ +│ Lambda Durable │ │ ECS Task │ │ CloudWatch │ +│ Function (Callback)│─────▶│ (Python) │─────▶│ Logs │ +│ │ │ │ │ │ +└─────────────────────┘ └──────────────────┘ └─────────────┘ + │ │ │ + │ Checkpointed Steps │ │ + │ ▼ │ + │ ┌─────────────────┐ │ + └──────────────────────│ DynamoDB │◄─────────────┘ + │ Table │ + └─────────────────┘ +``` + +**How it works:** +1. Lambda durable function creates DynamoDB record (checkpointed step) +2. Lambda invokes the ECS task using `ecs:RunTask` (checkpointed step) +3. Lambda updates DynamoDB with task ARN (checkpointed step) +4. Lambda **returns immediately** (async pattern) +5. The Python application in ECS processes the work +6. When done, the ECS task updates DynamoDB with the result +7. 
If any step fails, it is retried automatically and the function resumes from the last checkpoint + +**Key Durable Features:** +- Each step is automatically checkpointed +- If interrupted, function resumes from last completed step +- No re-execution of completed steps +- Reliable task initiation guaranteed + +**Use cases:** +- Fire-and-forget workflows +- Asynchronous processing +- When you don't need immediate results +- Decoupling task execution from API responses +- Workflows requiring guaranteed task initiation + +**Advantages:** +- Reliable task initiation with automatic recovery +- Minimal Lambda execution time +- Each step is independently retryable +- No risk of duplicate task creation (idempotent) + +## Deployment Instructions + +### Prerequisites + +* Python 3.13 or 3.14 runtime support for Lambda Durable Functions +* AWS SAM CLI version that supports DurableConfig and container images +* Docker installed (for building Lambda container images) + +### Step 1: Clone the Repository + +```bash +git clone https://github.com/aws-samples/serverless-patterns +cd serverless-patterns/lambda-ecs-durable-python-sam +``` + +### Step 2: Build and Deploy + +This pattern uses Lambda container images with Python 3.13 to support durable functions. 
The build process will: +- Build Docker images with the Durable Execution SDK +- Create ECR repositories automatically +- Push images to ECR +- Deploy Lambda functions using the container images + +```bash +sam build +sam deploy --guided +``` + +During the prompts: +- **Stack Name**: `lambda-ecs-durable-demo` (or your preferred name) +- **AWS Region**: Your preferred region (e.g., `us-east-1`) +- **Parameter VpcCIDR**: Press Enter to use default (10.0.0.0/16) +- **Confirm changes before deploy**: Y +- **Allow SAM CLI IAM role creation**: Y +- **Disable rollback**: N +- **SyncLambdaFunction has no authorization defined**: Y +- **CallbackLambdaFunction has no authorization defined**: Y +- **Create managed ECR repositories for all functions**: Y (required for container images) +- **Save arguments to samconfig.toml**: Y + +The deployment will take 5-10 minutes as it creates VPC, ECS cluster, Lambda functions, and other resources. + +### Step 3: Note the Outputs + +After deployment, note the following outputs: +- `SyncLambdaFunctionArn` - ARN for the synchronous pattern Lambda +- `CallbackLambdaFunctionArn` - ARN for the callback pattern Lambda +- `CallbackTableName` - DynamoDB table for callback tracking +- `ECSClusterName` - Name of the ECS cluster +- `LogGroupName` - CloudWatch log group for ECS tasks + +**Important**: When invoking durable functions, you must use a qualified ARN (append `:$LATEST` to the function name). + +## How to Test + +### Testing the Synchronous (Durable) Pattern + +1. **Invoke the durable function asynchronously:** + +Lambda Durable Functions with execution timeout > 15 minutes must be invoked asynchronously. 
Use the `--invocation-type Event` flag and a qualified ARN (with `:$LATEST`): + +```bash +aws lambda invoke \ + --function-name lambda-ecs-durable-demo-sync-function:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from durable sync pattern", "processingTime": 10}' \ + response.json +``` + +**Note**: The `\$LATEST` qualifier is required for durable functions. The backslash escapes the dollar sign in bash. + +2. **Monitor the Lambda execution logs:** + +```bash +aws logs tail /aws/lambda/lambda-ecs-durable-demo-sync-function --follow +``` + +You'll see: +- Task starting with checkpointed step +- Durable waits (no compute charges during waits) +- Status checks every 5 seconds (PROVISIONING → PENDING → RUNNING → STOPPED) +- Each check is a separate checkpointed operation +- Final result when task completes + +3. **View ECS task logs:** + +```bash +aws logs tail /ecs/lambda-ecs-durable-demo --follow +``` + +4. **View execution in Lambda console:** + +Navigate to the Lambda console → Your function → "Monitoring" tab → "Logs" to see the execution timeline and checkpoints. + +### Testing the Callback (Durable) Pattern + +1. **Invoke the durable function asynchronously:** + +```bash +aws lambda invoke \ + --function-name lambda-ecs-durable-demo-callback-function:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from durable callback pattern", "processingTime": 30}' \ + response.json +``` + +2. **Monitor the Lambda execution logs:** + +```bash +aws logs tail /aws/lambda/lambda-ecs-durable-demo-callback-function --follow +``` + +You'll see: +- DynamoDB record creation (checkpointed) +- ECS task initiation (checkpointed) +- Function returns immediately + +3. 
**Check the status in DynamoDB:** + +```bash +# Scan the table to see all executions +aws dynamodb scan --table-name lambda-ecs-durable-demo-callbacks + +# Or get a specific execution (replace with your execution ID from logs) +aws dynamodb get-item \ + --table-name lambda-ecs-durable-demo-callbacks \ + --key '{"executionId": {"S": "YOUR-EXECUTION-ID"}}' +``` + +4. **Monitor ECS task logs:** + +```bash +aws logs tail /ecs/lambda-ecs-durable-demo --follow +``` + +The ECS task will update DynamoDB when processing is complete. You'll see the result in the `result` field with status `COMPLETED`. + +## Key Differences Between Patterns + +| Feature | Synchronous (Durable Polling) | Callback (Durable Async) | +|---------|------------------------------|--------------------------| +| **Execution Duration** | Up to 1 year | Up to 1 year | +| **Checkpointing** | Automatic for each step | Automatic for each step | +| **Wait Charges** | No charges during waits | N/A (returns immediately) | +| **Polling** | Durable waits between checks | No polling needed | +| **Task Awareness** | Task doesn't know about Lambda | Task updates DynamoDB | +| **Complexity** | Moderate (durable steps + waits) | Moderate (durable steps + DynamoDB) | +| **Use Case** | Long-running tasks needing results | Fire-and-forget workflows | +| **Cost** | Pay only for active execution | Minimal (quick execution) | +| **Result Retrieval** | Returned by function | Query DynamoDB | +| **Reliability** | Automatic recovery from failures | Guaranteed task initiation | + +## Benefits of Lambda Durable Functions + +Compared to standard Lambda functions: + +✅ **Extended Duration**: Execute for up to 1 year (vs 15 minutes) +✅ **Cost Optimization**: No charges during wait operations +✅ **Automatic Recovery**: Built-in checkpointing and replay +✅ **Simplified Code**: No manual state management needed +✅ **Reliable Execution**: Guaranteed progress despite interruptions +✅ **Built-in Retries**: Automatic retry logic for 
steps + +## Cleanup + +To delete the resources: + +```bash +sam delete +``` + +## Resources + +- [AWS Lambda Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) +- [Durable Execution SDK](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html) +- [AWS Lambda](https://aws.amazon.com/lambda/) +- [Amazon ECS](https://aws.amazon.com/ecs/) +- [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) +- [ECS RunTask API](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RunTask.html) + +--- + +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-ecs-durable-python-sam/example-pattern.json b/lambda-ecs-durable-python-sam/example-pattern.json new file mode 100644 index 000000000..51f7d21da --- /dev/null +++ b/lambda-ecs-durable-python-sam/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "AWS Lambda Durable Functions to Amazon ECS with Python", + "description": "Invoke ECS tasks from Lambda Durable Functions with automatic checkpointing, state management, and resilient execution patterns", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates AWS Lambda Durable Functions invoking Amazon ECS tasks with resilient, long-running execution capabilities:", + "1. Durable Synchronous Pattern: Lambda uses checkpointed steps and durable waits to poll ECS task status. Can run for up to 1 year with automatic recovery from failures. No compute charges during wait periods.", + "2. Durable Callback Pattern: Lambda uses checkpointed steps to reliably initiate ECS tasks. Each step (create record, start task, update status) is automatically checkpointed for guaranteed execution.", + "The pattern uses the AWS Durable Execution SDK for Python, providing automatic state management, checkpoint-based recovery, and cost-effective long-running workflows. 
It includes inline Python code in ECS containers, VPC networking, and DynamoDB for callback tracking." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-ecs-durable-python-sam", + "templateURL": "serverless-patterns/lambda-ecs-durable-python-sam", + "projectFolder": "lambda-ecs-durable-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda Durable Functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Durable Execution SDK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html" + }, + { + "text": "Run Amazon ECS or Fargate tasks", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs_run_task.html" + }, + { + "text": "Amazon ECS Task Definitions", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." 
+ ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "image": "", + "bio": "Senior Delivery Consultant", + "linkedin": "" + } + ] +} diff --git a/lambda-ecs-durable-python-sam/src/Dockerfile b/lambda-ecs-durable-python-sam/src/Dockerfile new file mode 100644 index 000000000..730a1f347 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies including durable SDK +RUN pip install -r requirements.txt + +# Copy function code +COPY sync_handler.py ${LAMBDA_TASK_ROOT}/ +COPY callback_handler.py ${LAMBDA_TASK_ROOT}/ + +# Default handler (will be overridden by template) +CMD [ "sync_handler.lambda_handler" ] diff --git a/lambda-ecs-durable-python-sam/src/callback_handler.py b/lambda-ecs-durable-python-sam/src/callback_handler.py new file mode 100644 index 000000000..97a3294c7 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/callback_handler.py @@ -0,0 +1,171 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +ecs_client = boto3.client('ecs') +dynamodb = boto3.resource('dynamodb') + +@durable_step +def create_execution_record(step_context, callback_table, execution_id, message, processing_time): + """ + Durable step that creates initial execution record in DynamoDB. 
+ """ + step_context.logger.info(f"[CALLBACK] Creating execution record: {execution_id}") + + table = dynamodb.Table(callback_table) + table.put_item( + Item={ + 'executionId': execution_id, + 'status': 'RUNNING', + 'message': message, + 'processingTime': processing_time + } + ) + + return execution_id + +@durable_step +def start_ecs_task_async(step_context, cluster, task_definition, subnet1, subnet2, security_group, + message, processing_time, execution_id, callback_table): + """ + Durable step that starts an ECS task for async processing. + """ + step_context.logger.info(f"[CALLBACK] Starting ECS task with execution ID: {execution_id}") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-callback-container', + 'environment': [ + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)}, + {'name': 'EXECUTION_ID', 'value': execution_id}, + {'name': 'CALLBACK_TABLE', 'value': callback_table} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + step_context.logger.info(f"[CALLBACK] Task started: {task_arn}") + + return task_arn + +@durable_step +def update_task_arn(step_context, callback_table, execution_id, task_arn): + """ + Durable step that updates DynamoDB with task ARN. 
+ """ + step_context.logger.info("[CALLBACK] Updating task ARN in DynamoDB") + + table = dynamodb.Table(callback_table) + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET taskArn = :arn', + ExpressionAttributeValues={':arn': task_arn} + ) + + return True + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda Durable Function that invokes an ECS task asynchronously. + The ECS task will update DynamoDB when complete. + + This function uses durable execution to ensure reliable task initiation + with automatic checkpointing and recovery. + """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + callback_table = os.environ['CALLBACK_TABLE'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + # Use the caller-supplied executionId, or fall back to a random UUID (generated outside a durable step, so it is not checkpointed and may differ on replay) + import uuid + execution_id = event.get('executionId', str(uuid.uuid4())) + + try: + # Step 1: Create execution record (checkpointed) + context.step(create_execution_record( + callback_table, execution_id, message, processing_time + )) + + # Step 2: Start ECS task (checkpointed) + task_arn = context.step(start_ecs_task_async( + cluster, task_definition, subnet1, subnet2, security_group, + message, processing_time, execution_id, callback_table + )) + + # Step 3: Update DynamoDB with task ARN (checkpointed) + context.step(update_task_arn(callback_table, execution_id, task_arn)) + + # Return immediately (async pattern) + return { + 'statusCode': 202, # Accepted + 'body': json.dumps({ + 'status': 'accepted', + 'message': 'ECS task started, will callback when complete', + 'executionId': execution_id, + 'taskArn': task_arn + }) + } + + except Exception as e: + 
context.logger.error(f"[CALLBACK] Error: {str(e)}") + + # Update DynamoDB with error (best effort; failures here are ignored) + try: + table = dynamodb.Table(callback_table) + table.update_item( + Key={'executionId': execution_id}, + UpdateExpression='SET #status = :status, #error = :error', + ExpressionAttributeNames={ + '#status': 'status', + '#error': 'error' + }, + ExpressionAttributeValues={ + ':status': 'FAILED', + ':error': str(e) + } + ) + except Exception: + pass + + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e), + 'executionId': execution_id + }) + } diff --git a/lambda-ecs-durable-python-sam/src/requirements.txt b/lambda-ecs-durable-python-sam/src/requirements.txt new file mode 100644 index 000000000..46ba39952 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +boto3 +aws-durable-execution-sdk-python diff --git a/lambda-ecs-durable-python-sam/src/sync_handler.py b/lambda-ecs-durable-python-sam/src/sync_handler.py new file mode 100644 index 000000000..8ba2739e6 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/sync_handler.py @@ -0,0 +1,153 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +ecs_client = boto3.client('ecs') + +@durable_step +def start_ecs_task(step_context, cluster, task_definition, subnet1, subnet2, security_group, message, processing_time): + """ + Durable step that starts an ECS task. + This step is checkpointed, so once it completes it is not re-executed on replay. 
+ """ + step_context.logger.info(f"[SYNC] Starting ECS task with message: {message}") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-sync-container', + 'environment': [ + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + step_context.logger.info(f"[SYNC] Task started: {task_arn}") + + return task_arn + +@durable_step +def check_task_status(step_context, cluster, task_arn): + """ + Durable step that checks ECS task status. + This step is checkpointed and can be retried if it fails. + """ + step_context.logger.info(f"[SYNC] Checking task status: {task_arn}") + + describe_response = ecs_client.describe_tasks( + cluster=cluster, + tasks=[task_arn] + ) + + if not describe_response['tasks']: + raise Exception(f"Task not found: {task_arn}") + + task = describe_response['tasks'][0] + last_status = task['lastStatus'] + + step_context.logger.info(f"[SYNC] Task status: {last_status}") + + return { + 'status': last_status, + 'task': task + } + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda Durable Function that invokes an ECS task and waits for completion. + Uses the Durable Execution SDK for automatic checkpointing and replay. + + This function can run for up to 1 year, with automatic state management + and recovery from failures. 
+ """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + try: + # Step 1: Start ECS task (checkpointed) + task_arn = context.step(start_ecs_task( + cluster, task_definition, subnet1, subnet2, + security_group, message, processing_time + )) + + # Poll for task completion using durable waits + max_attempts = 60 # 5 minutes max (60 * 5 seconds) + poll_interval = 5 # Check every 5 seconds + + for attempt in range(max_attempts): + # Wait before checking status (no compute charges during wait) + context.wait(Duration.from_seconds(poll_interval)) + + # Step 2: Check task status (checkpointed) + status_result = context.step(check_task_status(cluster, task_arn)) + + if status_result['status'] == 'STOPPED': + # Task completed + task = status_result['task'] + stop_code = task.get('stopCode', 'Unknown') + + if stop_code == 'EssentialContainerExited': + exit_code = task['containers'][0].get('exitCode', 1) + + if exit_code == 0: + context.logger.info(f"[SYNC] Task completed successfully") + return { + 'statusCode': 200, + 'body': json.dumps({ + 'status': 'success', + 'message': f'Processed: {message}', + 'processingTime': processing_time, + 'taskArn': task_arn + }) + } + else: + raise Exception(f"Task failed with exit code: {exit_code}") + else: + raise Exception(f"Task stopped unexpectedly: {stop_code}") + + # Timeout + raise Exception(f"Task did not complete within {max_attempts * poll_interval} seconds") + + except Exception as e: + context.logger.error(f"[SYNC] Error: {str(e)}") + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e) + }) + } diff --git a/lambda-ecs-durable-python-sam/template.yaml 
b/lambda-ecs-durable-python-sam/template.yaml new file mode 100644 index 000000000..b04140fa6 --- /dev/null +++ b/lambda-ecs-durable-python-sam/template.yaml @@ -0,0 +1,441 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Lambda Durable Functions to ECS with Python - Demonstrates durable execution patterns with ECS tasks + +Parameters: + VpcCIDR: + Type: String + Default: 10.0.0.0/16 + Description: CIDR block for VPC + +Resources: + # VPC and Networking + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: !Ref VpcCIDR + EnableDnsHostnames: true + EnableDnsSupport: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-vpc + + PublicSubnet1: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.1.0/24 + AvailabilityZone: !Select [0, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-1 + + PublicSubnet2: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.2.0/24 + AvailabilityZone: !Select [1, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-2 + + InternetGateway: + Type: AWS::EC2::InternetGateway + Properties: + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-igw + + AttachGateway: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-rt + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: AttachGateway + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + SubnetRouteTableAssociation1: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet1 + RouteTableId: !Ref PublicRouteTable + + SubnetRouteTableAssociation2: + Type: 
AWS::EC2::SubnetRouteTableAssociation
+    Properties:
+      SubnetId: !Ref PublicSubnet2
+      RouteTableId: !Ref PublicRouteTable
+
+  # Security Group
+  ECSSecurityGroup:
+    Type: AWS::EC2::SecurityGroup
+    Properties:
+      GroupDescription: Security group for ECS tasks
+      VpcId: !Ref VPC
+      SecurityGroupEgress:
+        - IpProtocol: -1
+          CidrIp: 0.0.0.0/0
+      Tags:
+        - Key: Name
+          Value: !Sub ${AWS::StackName}-ecs-sg
+
+  # ECS Cluster
+  ECSCluster:
+    Type: AWS::ECS::Cluster
+    Properties:
+      ClusterName: !Sub ${AWS::StackName}-cluster
+      ClusterSettings:
+        - Name: containerInsights
+          Value: enabled
+
+  # CloudWatch Log Group
+  ECSLogGroup:
+    Type: AWS::Logs::LogGroup
+    Properties:
+      LogGroupName: !Sub /ecs/${AWS::StackName}
+      RetentionInDays: 7
+
+  # DynamoDB Table for Callback Pattern
+  CallbackTable:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      TableName: !Sub ${AWS::StackName}-callbacks
+      BillingMode: PAY_PER_REQUEST
+      AttributeDefinitions:
+        - AttributeName: executionId
+          AttributeType: S
+      KeySchema:
+        - AttributeName: executionId
+          KeyType: HASH
+      StreamSpecification:
+        StreamViewType: NEW_AND_OLD_IMAGES
+
+  # ECS Task Execution Role
+  ECSTaskExecutionRole:
+    Type: AWS::IAM::Role
+    Properties:
+      AssumeRolePolicyDocument:
+        Version: '2012-10-17'
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: ecs-tasks.amazonaws.com
+            Action: sts:AssumeRole
+      ManagedPolicyArns:
+        - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
+
+  # ECS Task Role (for callback pattern)
+  ECSTaskRole:
+    Type: AWS::IAM::Role
+    Properties:
+      AssumeRolePolicyDocument:
+        Version: '2012-10-17'
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: ecs-tasks.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: DynamoDBCallback
+          PolicyDocument:
+            Version: '2012-10-17'
+            Statement:
+              - Effect: Allow
+                Action:
+                  - dynamodb:UpdateItem
+                  - dynamodb:PutItem
+                Resource: !GetAtt CallbackTable.Arn
+
+  # ECS Task Definition for Sync Pattern
+  SyncTaskDefinition:
+    Type: AWS::ECS::TaskDefinition
+    Properties:
+      Family: !Sub ${AWS::StackName}-sync-task
+      NetworkMode: awsvpc
+      RequiresCompatibilities:
+        - FARGATE
+      Cpu: '256'
+      Memory: '512'
+      ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn
+      ContainerDefinitions:
+        - Name: python-sync-container
+          Image: public.ecr.aws/docker/library/python:3.11-slim
+          Essential: true
+          Command:
+            - /bin/sh
+            - -c
+            - |
+              pip install --quiet boto3 && python3 << 'EOF'
+              import os
+              import time
+              import json
+              from datetime import datetime
+
+              def main():
+                  message = os.environ.get('MESSAGE', 'No message provided')
+                  processing_time = int(os.environ.get('PROCESSING_TIME', '5'))
+
+                  print(f"[SYNC] Starting processing: {message}")
+                  print(f"[SYNC] Will process for {processing_time} seconds")
+
+                  # Simulate processing
+                  time.sleep(processing_time)
+
+                  result = {
+                      "status": "success",
+                      "message": f"Processed: {message}",
+                      "processingTime": processing_time,
+                      "timestamp": datetime.utcnow().isoformat() + "Z"
+                  }
+
+                  print(f"[SYNC] Completed: {json.dumps(result)}")
+                  return result
+
+              if __name__ == "__main__":
+                  main()
+              EOF
+          LogConfiguration:
+            LogDriver: awslogs
+            Options:
+              awslogs-group: !Ref ECSLogGroup
+              awslogs-region: !Ref AWS::Region
+              awslogs-stream-prefix: sync
+
+  # ECS Task Definition for Callback Pattern
+  CallbackTaskDefinition:
+    Type: AWS::ECS::TaskDefinition
+    Properties:
+      Family: !Sub ${AWS::StackName}-callback-task
+      NetworkMode: awsvpc
+      RequiresCompatibilities:
+        - FARGATE
+      Cpu: '256'
+      Memory: '512'
+      ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn
+      TaskRoleArn: !GetAtt ECSTaskRole.Arn
+      ContainerDefinitions:
+        - Name: python-callback-container
+          Image: public.ecr.aws/docker/library/python:3.11-slim
+          Essential: true
+          Command:
+            - /bin/sh
+            - -c
+            - |
+              pip install --quiet boto3 && python3 << 'EOF'
+              import os
+              import boto3
+              import json
+              import time
+              from datetime import datetime
+
+              def main():
+                  execution_id = os.environ.get('EXECUTION_ID')
+                  callback_table = os.environ.get('CALLBACK_TABLE')
+                  message = os.environ.get('MESSAGE', 'No message provided')
+                  processing_time = int(os.environ.get('PROCESSING_TIME', '5'))
+
+                  if not execution_id or not callback_table:
+                      print("[CALLBACK] ERROR: Missing execution ID or callback table!")
+                      return
+
+                  dynamodb = boto3.resource('dynamodb')
+                  table = dynamodb.Table(callback_table)
+
+                  try:
+                      print(f"[CALLBACK] Starting processing: {message}")
+                      print(f"[CALLBACK] Execution ID: {execution_id}")
+                      print(f"[CALLBACK] Will process for {processing_time} seconds")
+
+                      # Simulate processing
+                      time.sleep(processing_time)
+
+                      result = {
+                          "status": "success",
+                          "message": f"Processed: {message}",
+                          "processingTime": processing_time,
+                          "timestamp": datetime.utcnow().isoformat() + "Z"
+                      }
+
+                      print(f"[CALLBACK] Sending callback to DynamoDB")
+                      table.update_item(
+                          Key={'executionId': execution_id},
+                          UpdateExpression='SET #status = :status, #result = :result',
+                          ExpressionAttributeNames={
+                              '#status': 'status',
+                              '#result': 'result'
+                          },
+                          ExpressionAttributeValues={
+                              ':status': 'COMPLETED',
+                              ':result': json.dumps(result)
+                          }
+                      )
+                      print(f"[CALLBACK] Callback sent successfully!")
+
+                  except Exception as e:
+                      print(f"[CALLBACK] ERROR: {str(e)}")
+                      try:
+                          table.update_item(
+                              Key={'executionId': execution_id},
+                              UpdateExpression='SET #status = :status, #error = :error',
+                              ExpressionAttributeNames={
+                                  '#status': 'status',
+                                  '#error': 'error'
+                              },
+                              ExpressionAttributeValues={
+                                  ':status': 'FAILED',
+                                  ':error': str(e)
+                              }
+                          )
+                          print(f"[CALLBACK] Error callback sent")
+                      except Exception as callback_error:
+                          print(f"[CALLBACK] Failed to send callback: {str(callback_error)}")
+
+              if __name__ == "__main__":
+                  main()
+              EOF
+          LogConfiguration:
+            LogDriver: awslogs
+            Options:
+              awslogs-group: !Ref ECSLogGroup
+              awslogs-region: !Ref AWS::Region
+              awslogs-stream-prefix: callback
+
+  # Lambda Function for Sync Pattern (Durable)
+  SyncLambdaFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      Dockerfile: Dockerfile
+      DockerContext: ./src
+      DockerTag: python3.13-v1
+    Properties:
+      FunctionName: !Sub ${AWS::StackName}-sync-function
+      PackageType: Image
+      ImageConfig:
+        Command:
+          - sync_handler.lambda_handler
+      Timeout: 900  # 15 minutes (max Lambda timeout)
+      MemorySize: 256
+      DurableConfig:
+        ExecutionTimeout: 3600  # 1 hour for durable execution
+      Environment:
+        Variables:
+          ECS_CLUSTER: !Ref ECSCluster
+          TASK_DEFINITION: !Ref SyncTaskDefinition
+          SUBNET_1: !Ref PublicSubnet1
+          SUBNET_2: !Ref PublicSubnet2
+          SECURITY_GROUP: !Ref ECSSecurityGroup
+      Policies:
+        - Version: '2012-10-17'
+          Statement:
+            - Effect: Allow
+              Action:
+                - ecs:RunTask
+                - ecs:DescribeTasks
+              Resource: '*'
+            - Effect: Allow
+              Action:
+                - iam:PassRole
+              Resource:
+                - !GetAtt ECSTaskExecutionRole.Arn
+                - !GetAtt ECSTaskRole.Arn
+            - Effect: Allow
+              Action:
+                - lambda:CheckpointDurableExecutions
+                - lambda:GetDurableExecutionState
+              Resource: '*'
+
+  # Lambda Function for Callback Pattern (Durable)
+  CallbackLambdaFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      Dockerfile: Dockerfile
+      DockerContext: ./src
+      DockerTag: python3.13-v1
+    Properties:
+      FunctionName: !Sub ${AWS::StackName}-callback-function
+      PackageType: Image
+      ImageConfig:
+        Command:
+          - callback_handler.lambda_handler
+      Timeout: 60
+      MemorySize: 256
+      DurableConfig:
+        ExecutionTimeout: 3600  # 1 hour for durable execution
+      Environment:
+        Variables:
+          ECS_CLUSTER: !Ref ECSCluster
+          TASK_DEFINITION: !Ref CallbackTaskDefinition
+          SUBNET_1: !Ref PublicSubnet1
+          SUBNET_2: !Ref PublicSubnet2
+          SECURITY_GROUP: !Ref ECSSecurityGroup
+          CALLBACK_TABLE: !Ref CallbackTable
+      Policies:
+        - Version: '2012-10-17'
+          Statement:
+            - Effect: Allow
+              Action:
+                - ecs:RunTask
+              Resource: '*'
+            - Effect: Allow
+              Action:
+                - iam:PassRole
+              Resource:
+                - !GetAtt ECSTaskExecutionRole.Arn
+                - !GetAtt ECSTaskRole.Arn
+            - Effect: Allow
+              Action:
+                - dynamodb:PutItem
+                - dynamodb:UpdateItem
+              Resource: !GetAtt CallbackTable.Arn
+            - Effect: Allow
+              Action:
+                - lambda:CheckpointDurableExecutions
+                - lambda:GetDurableExecutionState
+              Resource: '*'
+
+Outputs:
+  SyncLambdaFunctionArn:
+    Description: ARN of the Synchronous Pattern Lambda Function
+    Value: !GetAtt SyncLambdaFunction.Arn
+
+  CallbackLambdaFunctionArn:
+    Description: ARN of the Callback Pattern Lambda Function
+    Value: !GetAtt CallbackLambdaFunction.Arn
+
+  CallbackTableName:
+    Description: DynamoDB table for callback tracking
+    Value: !Ref CallbackTable
+
+  ECSClusterName:
+    Description: Name of the ECS Cluster
+    Value: !Ref ECSCluster
+
+  SyncTaskDefinitionArn:
+    Description: ARN of the Sync Task Definition
+    Value: !Ref SyncTaskDefinition
+
+  CallbackTaskDefinitionArn:
+    Description: ARN of the Callback Task Definition
+    Value: !Ref CallbackTaskDefinition
+
+  LogGroupName:
+    Description: CloudWatch Log Group for ECS tasks
+    Value: !Ref ECSLogGroup