| layout | title | nav_order | has_children | parent |
|---|---|---|---|---|
| default | OpenHands Tutorial - Chapter 7: Integration | 7 | false | OpenHands Tutorial |
Welcome to Chapter 7: Integration - Connecting Applications with External Services. This part of OpenHands Tutorial: Autonomous Software Engineering Workflows starts with a mental model of how integrations are structured, then moves to concrete implementation details and production tradeoffs.
This chapter shows how to use OpenHands to connect applications with APIs, databases, third-party services, and larger system architectures.
Each section pairs a deliberately basic starting implementation with a prompt that asks the agent to harden it, moving from simple API calls to distributed system orchestration.
## API Integration
````python
from openhands import OpenHands

# API integration agent
api_integrator = OpenHands()

# Complete REST API integration
rest_integration = api_integrator.run("""
Create comprehensive REST API integration for a weather application:

**External Weather API Integration:**
```python
# Current basic implementation
import requests

def get_weather(city):
    api_key = "your-api-key"
    url = f"http://api.weatherapi.com/v1/current.json?key={api_key}&q={city}"
    response = requests.get(url)

    if response.status_code == 200:
        data = response.json()
        return {
            'temperature': data['current']['temp_c'],
            'condition': data['current']['condition']['text'],
            'humidity': data['current']['humidity']
        }
    else:
        raise Exception(f"API call failed: {response.status_code}")
```

Enhance with:
- Request/Response Models - Pydantic models for type safety
- Error Handling - Comprehensive error handling and retry logic
- Rate Limiting - Respect API rate limits with backoff strategies
- Caching - Response caching to reduce API calls
- Authentication - API key, OAuth, JWT token management
- Monitoring - Request/response logging and metrics
- Fallback Mechanisms - Alternative data sources when API fails
- Configuration Management - Environment-based API configuration
- Testing - Mock API responses for testing
- Documentation - API usage documentation and examples

Include async support, circuit breaker patterns, and graceful degradation.
""")
````
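The retry, backoff, and caching items in the checklist above can be sketched with the standard library alone. This is a minimal illustration, not output produced by OpenHands: `TTLCache`, `fetch_with_retry`, and the injected `fetch` callable are hypothetical names, and in a real client `fetch` would wrap the `requests.get` call.

```python
import time


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl seconds."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self._store[key]  # drop the stale entry
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, self.clock() + self.ttl)


def fetch_with_retry(fetch, key, cache, retries=3, base_delay=0.5, sleep=time.sleep):
    """Return a cached value, or call fetch(key) with exponential backoff."""
    cached = cache.get(key)
    if cached is not None:
        return cached
    for attempt in range(retries):
        try:
            value = fetch(key)
        except ConnectionError:
            if attempt == retries - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * 2 ** attempt)  # delay doubles on every retry
        else:
            cache.set(key, value)
            return value
```

Passing `sleep` and `clock` as parameters keeps the sketch testable without real waiting; only `ConnectionError` is retried here, which is an assumption about which failures are transient.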
````python
graphql_integration = api_integrator.run("""
Implement GraphQL API integration with advanced features:

**GitHub GraphQL API Integration:**
```python
# Basic GraphQL query
def get_user_repos(username):
    query = '''
    query($username: String!) {
      user(login: $username) {
        repositories(first: 10) {
          nodes {
            name
            description
            stargazerCount
          }
        }
      }
    }'''

    # Basic implementation
    response = requests.post(
        'https://api.github.com/graphql',
        json={'query': query, 'variables': {'username': username}},
        headers={'Authorization': f'bearer {token}'}
    )
    return response.json()
```

Enhance with:
- Query Builders - Fluent API for constructing GraphQL queries
- Schema Introspection - Dynamic query building from GraphQL schema
- Subscription Support - Real-time data with GraphQL subscriptions
- Batch Operations - Multiple queries in single request
- Fragment Management - Reusable query fragments
- Type Safety - Generated types from GraphQL schema
- Caching Strategy - Intelligent caching based on query structure
- Error Handling - Detailed GraphQL error parsing
- Testing - Mock GraphQL server for testing
- Performance Optimization - Query optimization and batching
""")
````
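For the error-handling item above, note that a GraphQL server can report failures in an `errors` list in the response body even when the HTTP request itself succeeds, so checking the status code is not enough. The sketch below shows one way to surface those errors; `GraphQLError` and `unwrap_graphql` are hypothetical names introduced here, and the sketch takes the strict view of rejecting any response that carries errors, even a partial one.

```python
class GraphQLError(Exception):
    """Raised when a GraphQL response body carries an 'errors' list."""

    def __init__(self, errors):
        self.errors = errors
        messages = "; ".join(e.get("message", "unknown error") for e in errors)
        super().__init__(messages)


def unwrap_graphql(payload: dict) -> dict:
    """Return the 'data' part of a decoded response, or raise on errors."""
    errors = payload.get("errors")
    if errors:
        raise GraphQLError(errors)
    return payload.get("data") or {}
```

A caller would pass the decoded JSON body, for example `unwrap_graphql(response.json())`.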
````python
webhook_integration = api_integrator.run("""
Create webhook and real-time integration system:

**Stripe Payment Webhooks:**
```python
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/stripe', methods=['POST'])
def stripe_webhook():
    payload = request.get_json()

    # Basic webhook handling
    if payload['type'] == 'payment_intent.succeeded':
        payment_id = payload['data']['object']['id']
        # Process successful payment
        print(f"Payment {payment_id} succeeded")

    return jsonify({'status': 'ok'})
```

Enhance with:
- Webhook Verification - Signature verification for security
- Event Processing - Asynchronous event handling
- Retry Logic - Failed webhook retry with exponential backoff
- Idempotency - Prevent duplicate event processing
- Event Storage - Persistent event logging and auditing
- Monitoring - Webhook health and success rate monitoring
- Rate Limiting - Protect against webhook spam
- Testing - Webhook testing utilities and mock servers
- Documentation - Webhook event documentation
- Multi-Provider Support - Generic webhook handler for multiple providers
""")
````
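Signature verification and idempotency are the two checklist items most often missed in basic webhook handlers. The sketch below uses a generic HMAC-SHA256 scheme over the raw request body; it is not the exact header format of any specific provider (providers such as Stripe document their own schemes and ship verification helpers). `sign`, `verify`, and `IdempotentHandler` are hypothetical names, and the in-memory `seen` set stands in for a durable store.

```python
import hashlib
import hmac


def sign(secret: bytes, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify(secret: bytes, body: bytes, signature: str) -> bool:
    """Compare in constant time so timing does not leak how much matched."""
    return hmac.compare_digest(sign(secret, body).encode(), signature.encode())


class IdempotentHandler:
    """Runs the wrapped handler at most once per event id."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # assumption: a database or Redis set in production

    def handle(self, event: dict) -> bool:
        if event["id"] in self.seen:
            return False  # duplicate delivery, already processed
        self.handler(event)
        self.seen.add(event["id"])  # recorded only after the handler succeeds
        return True
```

Verification must run against the raw bytes of the request, before any JSON parsing, because re-serialized JSON may not match the bytes the sender signed.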
## Database Integration
### Multiple Database Systems
````python
# Database integration agent
db_integrator = OpenHands()

# Comprehensive database integration
multi_db_integration = db_integrator.run("""
Create multi-database integration layer supporting different database systems:

**Supported Databases:**
1. PostgreSQL - Advanced relational features
2. MongoDB - Document-based NoSQL
3. Redis - In-memory data structure store
4. Elasticsearch - Full-text search and analytics

**Core Features:**
```python
# Current basic implementation
class DatabaseManager:
    def __init__(self, db_type, connection_string):
        if db_type == 'postgres':
            self.connection = psycopg2.connect(connection_string)
        elif db_type == 'mongodb':
            self.client = MongoClient(connection_string)
        # ... other databases
```

Enhance with:
- Unified Interface - Common API across all database types
- Connection Pooling - Efficient connection management
- Query Builders - Type-safe query construction
- Migration System - Database schema migrations
- Transaction Management - ACID transaction support
- Caching Layer - Multi-level caching (Redis + application cache)
- ORM Integration - SQLAlchemy, MongoEngine, etc.
- Monitoring - Query performance and connection monitoring
- Backup/Restore - Automated backup and restore capabilities
- Security - Encrypted connections and access control

Include database-specific optimizations, indexing strategies, and performance tuning.
""")
````
````python
advanced_db_patterns = db_integrator.run("""
Implement advanced database integration patterns:

**CQRS (Command Query Responsibility Segregation):**
```python
class CQRSExample:
    def __init__(self):
        self.write_db = PostgreSQLConnection()  # For commands
        self.read_db = ElasticsearchConnection()  # For queries

    def create_user(self, user_data):
        # Command: Write to primary database
        user_id = self.write_db.insert('users', user_data)

        # Publish event for read model update
        self.publish_event('user_created', {'user_id': user_id, **user_data})

        return user_id

    def get_user_profile(self, user_id):
        # Query: Read from optimized read model
        return self.read_db.search('users', {'id': user_id})
```

**Event Sourcing:**
```python
class EventSourcingExample:
    def __init__(self):
        self.event_store = EventStore()
        self.projections = UserProjection()

    def update_user_email(self, user_id, new_email):
        # Store event instead of updating state directly
        event = {
            'type': 'user_email_updated',
            'user_id': user_id,
            'new_email': new_email,
            'timestamp': datetime.utcnow()
        }

        self.event_store.append(event)

        # Update read model
        self.projections.apply_event(event)

    def get_user(self, user_id):
        # Reconstruct state from events
        events = self.event_store.get_events(user_id)
        return self.projections.reconstruct_state(events)
```

**Database Sharding:**
```python
class ShardedDatabase:
    def __init__(self, shard_configs):
        self.shards = [DatabaseConnection(config) for config in shard_configs]
        self.shard_key = 'user_id'

    def get_shard(self, record):
        # Simple hash-based sharding
        shard_id = hash(record[self.shard_key]) % len(self.shards)
        return self.shards[shard_id]

    def insert(self, table, record):
        shard = self.get_shard(record)
        return shard.insert(table, record)

    def query(self, table, conditions):
        # Query all shards if no shard key in conditions
        if self.shard_key in conditions:
            shard = self.get_shard({self.shard_key: conditions[self.shard_key]})
            return shard.query(table, conditions)
        else:
            # Cross-shard query (more complex)
            results = []
            for shard in self.shards:
                results.extend(shard.query(table, conditions))
            return results
```

Include implementation details, trade-offs, and when to use each pattern.
""")
````
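One trade-off in the sharding example is worth making concrete. Python's built-in `hash()` is salted per process for strings by default, so `hash(key) % len(shards)` can send the same string key to different shards after a restart. A minimal sketch of a process-independent alternative follows; `stable_shard_index` is a hypothetical helper, and SHA-256 is just one reasonable choice of stable hash.

```python
import hashlib


def stable_shard_index(key, shard_count: int) -> int:
    """Map a key to a shard index that is identical across processes."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then reduce to a shard.
    return int.from_bytes(digest[:8], "big") % shard_count
```

Modulo placement still moves most keys when `shard_count` changes; consistent hashing is the usual answer when shards are added or removed often.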
## Third-Party Service Integration
### Payment Processing
````python
# Payment integration agent
payment_integrator = OpenHands()

# Complete payment processing integration
payment_integration = payment_integrator.run("""
Create comprehensive payment processing integration:

**Supported Payment Providers:**
1. Stripe - Credit card processing
2. PayPal - PayPal payments
3. Square - Point of sale integration
4. Adyen - Global payment processing

**Core Features:**
```python
class PaymentProcessor:
    def __init__(self, provider='stripe'):
        self.provider = provider
        # Basic setup

    def process_payment(self, amount, currency, payment_method):
        # Basic payment processing
        pass
```

Enhance with:
- Unified Interface - Common API across all payment providers
- Payment Methods - Credit cards, digital wallets, bank transfers
- Security - PCI compliance, tokenization, encryption
- Error Handling - Payment failures, declines, chargebacks
- Webhooks - Real-time payment status updates
- Refunds - Partial and full refund processing
- Subscriptions - Recurring payment management
- Fraud Detection - Risk assessment and fraud prevention
- Multi-Currency - International payment support
- Compliance - Regulatory compliance (SOX, GDPR, etc.)

Include testing with mock payments, error simulation, and comprehensive logging.
""")
````
````python
communication_integration = payment_integrator.run("""
Implement email and communication service integration:

**Email Providers:**
- SendGrid - Transactional email
- Mailgun - Email API service
- Amazon SES - AWS email service
- Postmark - Email delivery service

**Communication Features:**
```python
class EmailService:
    def send_email(self, to, subject, body):
        # Basic email sending
        pass

    def send_template_email(self, template_id, to, variables):
        # Template-based emails
        pass
```

Enhance with:
- Template Engine - Dynamic email templates with variables
- Bulk Email - Mass email campaigns with personalization
- Email Tracking - Open rates, click tracking, bounces
- SMS Integration - Twilio, AWS SNS for text messaging
- Push Notifications - Firebase, OneSignal for mobile push
- Chat Integration - Slack, Discord, Microsoft Teams
- Multi-Channel - Unified messaging across channels
- Queue Management - Asynchronous message processing
- Analytics - Message delivery and engagement analytics
- Compliance - CAN-SPAM, GDPR compliance for communications

Include A/B testing, personalization, and deliverability optimization.
""")
````
````python
cloud_integration = payment_integrator.run("""
Create cloud service integration layer:

**Cloud Providers:**
- AWS - EC2, S3, Lambda, DynamoDB
- Google Cloud - Compute Engine, Cloud Storage, Cloud Functions
- Azure - VMs, Blob Storage, Functions, Cosmos DB

**Integration Patterns:**
```python
class CloudManager:
    def __init__(self, provider='aws'):
        self.provider = provider

    def upload_file(self, file_path, bucket):
        # File upload to cloud storage
        pass

    def run_function(self, function_name, payload):
        # Serverless function execution
        pass
```

Enhance with:
- Storage Integration - File upload/download, CDN integration
- Compute Services - Serverless functions, container orchestration
- Database Services - Managed databases, caching layers
- AI/ML Services - Pre-trained models, custom model hosting
- Monitoring - Cloud-native monitoring and logging
- Security - IAM, encryption, compliance certifications
- Cost Optimization - Resource usage monitoring and optimization
- Multi-Cloud - Unified interface across cloud providers
- Disaster Recovery - Backup, failover, and recovery automation
- DevOps Integration - CI/CD, infrastructure as code

Include provider-specific optimizations, cost monitoring, and migration strategies.
""")
````
## Message Queue and Event-Driven Integration
### Message Queue Systems
````python
# Message queue integration agent
queue_integrator = OpenHands()

# Complete message queue integration
message_queue_integration = queue_integrator.run("""
Implement comprehensive message queue integration:

**Supported Message Queues:**
1. RabbitMQ - Advanced message queuing
2. Apache Kafka - Distributed event streaming
3. Redis Queue - Simple in-memory queuing
4. Amazon SQS - AWS managed queue service

**Core Features:**
```python
class MessageQueue:
    def __init__(self, queue_type='rabbitmq'):
        self.queue_type = queue_type

    def publish_message(self, queue_name, message):
        # Publish message to queue
        pass

    def consume_messages(self, queue_name, callback):
        # Consume messages with callback
        pass
```

Enhance with:
- Message Serialization - JSON, Avro, Protocol Buffers
- Reliability - Message acknowledgments, dead letter queues
- Scalability - Consumer group scaling, partition management
- Monitoring - Queue depth, processing rates, error rates
- Security - Message encryption, authentication, authorization
- Batching - Message batching for efficiency
- Retry Logic - Exponential backoff, circuit breakers
- Event Sourcing - Event-driven architecture patterns
- Stream Processing - Real-time data processing pipelines
- Integration Testing - Message flow testing and validation

Include producer/consumer patterns, message routing, and error handling strategies.
""")
````
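The reliability items above (acknowledgments, retries, dead letter queues) share one control flow regardless of broker. The in-memory sketch below shows that flow only; `RetryingQueue` is a hypothetical class, and a real broker such as RabbitMQ or SQS provides these semantics through its own configuration rather than application code like this.

```python
from collections import deque


class RetryingQueue:
    """In-memory queue that retries failed messages, then dead-letters them."""

    def __init__(self, max_attempts=3):
        self.max_attempts = max_attempts
        self.pending = deque()
        self.dead_letters = []

    def publish(self, message):
        self.pending.append((message, 1))  # (payload, attempt number)

    def consume(self, callback):
        """Drain the queue and return how many messages succeeded."""
        processed = 0
        while self.pending:
            message, attempt = self.pending.popleft()
            try:
                callback(message)
            except Exception:
                if attempt >= self.max_attempts:
                    self.dead_letters.append(message)  # give up, keep for inspection
                else:
                    self.pending.append((message, attempt + 1))  # requeue at the back
            else:
                processed += 1  # success acts as the acknowledgment
        return processed
```

Requeueing at the back lets healthy messages proceed while a failing one waits for its next attempt; a production setup would also add a delay between attempts.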
````python
event_driven_integration = queue_integrator.run("""
Create event-driven architecture integration:

**Event Processing Patterns:**
```python
class EventProcessor:
    def __init__(self):
        self.event_handlers = {}

    def register_handler(self, event_type, handler):
        self.event_handlers[event_type] = handler

    def process_event(self, event):
        handler = self.event_handlers.get(event['type'])
        if handler:
            handler(event)
```

Enhance with:
- Event Sourcing - Complete event history and replay
- CQRS Pattern - Command Query Responsibility Segregation
- Saga Pattern - Distributed transaction management
- Event Streaming - Real-time event processing with Kafka
- Event Stores - Persistent event storage and retrieval
- Event Versioning - Schema evolution and compatibility
- Event Routing - Content-based and header-based routing
- Monitoring - Event throughput, latency, and error tracking
- Testing - Event-driven test scenarios and mocking
- Documentation - Event schema documentation and API

Include event schema definitions, event storming facilitation, and integration patterns.
""")
````
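Of the patterns listed above, the saga is the easiest to misread, so a small sketch may help. A saga replaces one distributed transaction with a sequence of local steps, each paired with a compensating action that undoes it. `run_saga` below is a hypothetical, synchronous, in-process illustration; real sagas run across services and need durable state to survive crashes.

```python
def run_saga(steps):
    """Run (action, compensation) pairs; undo completed steps if one fails.

    Returns True when every action succeeds, False after a rollback.
    """
    completed = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            # Roll back in reverse order, newest completed step first.
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensate)
    return True
```

The failing step's own compensation is not run, because that step never completed.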
## Authentication and Authorization Integration
### OAuth and SSO Integration
````python
# Authentication integration agent
auth_integrator = OpenHands()

# Complete authentication system integration
auth_integration = auth_integrator.run("""
Create comprehensive authentication and authorization integration:

**Authentication Providers:**
1. Auth0 - Universal authentication platform
2. Firebase Auth - Google authentication service
3. AWS Cognito - AWS user identity management
4. Okta - Enterprise identity management

**Authorization Patterns:**
```python
class AuthManager:
    def __init__(self, provider='auth0'):
        self.provider = provider

    def authenticate_user(self, credentials):
        # User authentication
        pass

    def authorize_action(self, user, action, resource):
        # Action authorization
        pass
```

Enhance with:
- OAuth 2.0 Flows - Authorization code, implicit, client credentials
- JWT Token Management - Token generation, validation, refresh
- Role-Based Access Control - User roles and permissions
- Multi-Factor Authentication - SMS, TOTP, biometric factors
- Social Login - Google, Facebook, GitHub authentication
- Single Sign-On - Enterprise SSO integration
- Session Management - Secure session handling and timeouts
- Audit Logging - Authentication and authorization events
- Compliance - GDPR, SOX, HIPAA compliance features
- Testing - Authentication flow testing and mocking

Include security best practices, token security, and integration testing.
""")
````
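The `authorize_action` stub and the role-based access control item above can be sketched as a lookup from roles to permitted `(action, resource)` pairs. The role names and permissions in `ROLE_PERMISSIONS` are hypothetical examples, not something an identity provider defines for you.

```python
# Hypothetical role table: each role maps to the (action, resource) pairs it allows.
ROLE_PERMISSIONS = {
    "viewer": {("read", "report")},
    "editor": {("read", "report"), ("write", "report")},
}


def authorize_action(user_roles, action, resource) -> bool:
    """Allow the action if any of the user's roles grants it; deny by default."""
    return any(
        (action, resource) in ROLE_PERMISSIONS.get(role, set())
        for role in user_roles
    )
```

Unknown roles map to an empty permission set, so the check fails closed rather than raising.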
````python
api_gateway_integration = auth_integrator.run("""
Implement API gateway integration for service orchestration:

**API Gateway Features:**
```python
class APIGateway:
    def __init__(self):
        self.routes = {}
        self.middlewares = []

    def add_route(self, path, service, methods=None):
        # Avoid a shared mutable default argument; fall back to GET only
        self.routes[path] = {'service': service, 'methods': methods or ['GET']}

    def handle_request(self, request):
        # Route request to appropriate service
        pass
```

Enhance with:
- Request Routing - Path-based, header-based, and content-based routing
- Rate Limiting - Request throttling and quota management
- Load Balancing - Service instance distribution
- Authentication - Centralized auth for all services
- Caching - Response caching and cache invalidation
- Transformation - Request/response transformation and adaptation
- Monitoring - Request tracking, latency monitoring, error rates
- Security - API key validation, CORS handling, security headers
- Documentation - Auto-generated API documentation
- Testing - Gateway testing and service mocking

Include service discovery, circuit breakers, and resilience patterns.
""")
````
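Rate limiting at a gateway is commonly implemented as a token bucket: tokens refill at a steady rate up to a fixed capacity, and each request spends one. The sketch below is a single-process illustration; `TokenBucket` is a hypothetical class, and a multi-instance gateway would need shared state (for example in Redis) instead of instance attributes.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start full
        self.updated = clock()

    def allow(self, cost=1.0) -> bool:
        now = self.clock()
        elapsed = now - self.updated
        self.updated = now
        # Refill for the time elapsed, never exceeding capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A gateway would typically keep one bucket per API key or client IP and return HTTP 429 when `allow()` is false.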
## File and Media Integration
### Cloud Storage Integration
````python
# File integration agent
file_integrator = OpenHands()

# Complete file and media integration
file_integration = file_integrator.run("""
Create comprehensive file and media integration system:

**Storage Providers:**
1. Amazon S3 - Object storage service
2. Google Cloud Storage - Cloud storage platform
3. Azure Blob Storage - Microsoft's cloud storage
4. Cloudflare R2 - S3-compatible storage

**File Processing Features:**
```python
class FileManager:
    def __init__(self, provider='s3'):
        self.provider = provider

    def upload_file(self, file_path, destination):
        # File upload to cloud storage
        pass

    def download_file(self, source, local_path):
        # File download from cloud storage
        pass
```

Enhance with:
- File Upload/Download - Resumable uploads, multipart uploads
- Media Processing - Image resizing, video transcoding, format conversion
- CDN Integration - Content delivery network for fast global access
- Security - Signed URLs, access control, encryption
- Backup and Sync - Automated backup, cross-region replication
- Metadata Management - File metadata, tagging, search
- Versioning - File version control and rollback
- Compression - Automatic compression and optimization
- Virus Scanning - Malware detection and prevention
- Analytics - Usage statistics, access patterns, cost tracking

Include large file handling, streaming uploads/downloads, and integration testing.
""")
````
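The signed URL item above follows a simple idea: the server signs the path together with an expiry time, and later accepts the request only if the signature matches and the expiry has not passed. The sketch below is a generic HMAC version for illustration; `sign_url` and `verify_url` are hypothetical helpers, and cloud providers implement presigned URLs with their own formats and SDK calls.

```python
import hashlib
import hmac
import time
from urllib.parse import parse_qs, urlencode, urlparse


def _signature(path: str, expires: int, secret: bytes) -> str:
    return hmac.new(secret, f"{path}:{expires}".encode(), hashlib.sha256).hexdigest()


def sign_url(path: str, secret: bytes, ttl: int, now=None) -> str:
    """Append an expiry timestamp and an HMAC over path plus expiry."""
    expires = int((time.time() if now is None else now) + ttl)
    query = urlencode({"expires": expires, "sig": _signature(path, expires, secret)})
    return f"{path}?{query}"


def verify_url(url: str, secret: bytes, now=None) -> bool:
    """Accept only unexpired URLs whose signature matches path and expiry."""
    parsed = urlparse(url)
    params = parse_qs(parsed.query)
    try:
        expires = int(params["expires"][0])
        sig = params["sig"][0]
    except (KeyError, ValueError):
        return False  # missing or malformed parameters
    if (time.time() if now is None else now) > expires:
        return False  # link has expired
    expected = _signature(parsed.path, expires, secret)
    return hmac.compare_digest(expected.encode(), sig.encode())
```

Because the expiry is part of the signed message, a client cannot extend a link's lifetime by editing the `expires` parameter.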
````python
document_integration = file_integrator.run("""
Implement document processing and OCR integration:

**Document Processing Services:**
- Google Document AI - Document understanding and OCR
- AWS Textract - Document text extraction
- Azure Form Recognizer - Form and document analysis
- Adobe Document Services - PDF processing and manipulation

**Document Features:**
```python
class DocumentProcessor:
    def __init__(self, provider='google'):
        self.provider = provider

    def extract_text(self, document_path):
        # OCR text extraction
        pass

    def extract_form_data(self, form_path):
        # Form field extraction
        pass
```

Enhance with:
- OCR Processing - Text extraction from images and PDFs
- Form Recognition - Structured data extraction from forms
- Document Classification - Automatic document type detection
- Table Extraction - Table and spreadsheet data extraction
- Signature Detection - Handwritten signature recognition
- Language Detection - Multi-language document processing
- Quality Assessment - OCR confidence scores and validation
- Batch Processing - High-volume document processing
- Integration APIs - REST APIs for document processing
- Workflow Automation - Automated document processing pipelines

Include error handling, retry logic, and performance optimization.
""")
````
## Summary
In this chapter, we've covered OpenHands' comprehensive integration capabilities:
- **API Integration**: REST APIs, GraphQL, webhooks, real-time integration
- **Database Integration**: Multiple database systems, advanced patterns (CQRS, Event Sourcing)
- **Third-Party Services**: Payment processing, email/communication, cloud services
- **Message Queues**: Event-driven architecture, message processing patterns
- **Authentication**: OAuth, SSO, API gateways, authorization patterns
- **File/Media Integration**: Cloud storage, document processing, CDN integration
OpenHands can build complete, integrated systems that connect with external services, databases, and APIs while maintaining security, reliability, and performance.
## Key Takeaways
1. **Unified Interfaces**: Common APIs across different service providers
2. **Security First**: Secure authentication, encryption, and access control
3. **Reliability Patterns**: Circuit breakers, retries, fallback mechanisms
4. **Monitoring & Observability**: Comprehensive logging, metrics, and alerting
5. **Scalability**: Load balancing, caching, and performance optimization
6. **Testing Integration**: Mock services, integration testing, and validation
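
The reliability takeaway is easier to apply with a concrete shape in mind. A circuit breaker counts consecutive failures, stops calling the dependency once a threshold is reached, and allows a trial call after a cooldown. The sketch below is a minimal single-threaded illustration; `CircuitBreaker`, `CircuitOpenError`, and the default thresholds are hypothetical choices.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; call skipped")
            # Cooldown elapsed: half-open, let one trial call through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed trial
            raise
        self.failures = 0
        self.opened_at = None  # success closes the circuit
        return result
```

Callers catch `CircuitOpenError` to serve a fallback, such as a cached response, instead of waiting on a dependency that is known to be failing.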
Next, we'll explore **advanced projects** - building complete applications, microservices, and complex system architectures.
---
**Ready for the next chapter?** [Chapter 8: Advanced Projects](08-advanced-projects.md)
*Generated for [Awesome Code Docs](https://github.com/johnxie/awesome-code-docs)*
## What Problem Does This Solve?
Most teams struggle here not because they need to write more code, but because they have not drawn clear boundaries between integration code, processing logic, and event handling, so behavior becomes unpredictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about the integration layer as an operating subsystem with explicit contracts for inputs, state transitions, and outputs.
Use the "Enhance with" lists and the closing "Include ..." line of each prompt as your checklist when adapting these patterns to your own repository.
## How it Works Under the Hood
An integration built from the patterns in this chapter usually follows a repeatable control path:
1. **Context bootstrap**: initialize runtime config and prerequisites for the integration.
2. **Input normalization**: shape incoming data so downstream processing receives stable contracts.
3. **Core execution**: run the main logic branch and propagate intermediate state as events.
4. **Policy and safety checks**: enforce limits, auth scopes, and failure boundaries.
5. **Output composition**: return canonical result payloads for downstream consumers.
6. **Operational telemetry**: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
## Source Walkthrough
Use the following upstream sources to verify implementation details while reading this chapter:
- [OpenHands Repository](https://github.com/OpenHands/OpenHands)
Why it matters: the source code is the final authority on how the agent and its runtime behave.
- [OpenHands Docs](https://docs.openhands.dev/)
Why it matters: the official documentation for setup, configuration, and supported usage.
- [OpenHands Releases](https://github.com/OpenHands/OpenHands/releases)
Why it matters: release notes show what changed between versions, which helps when a pattern stops working after an upgrade.
Suggested trace strategy:
- search upstream code for `integration` and `processing` to map concrete implementation paths
- compare docs claims against actual runtime/config code before reusing patterns in production
## Chapter Connections
- [Tutorial Index](README.md)
- [Previous Chapter: Chapter 6: Refactoring - Code Structure Improvement and Modernization](06-refactoring.md)
- [Next Chapter: Chapter 8: Advanced Projects - Complete Applications and System Architectures](08-advanced-projects.md)
- [Main Catalog](../../README.md#-tutorial-catalog)
- [A-Z Tutorial Directory](../../discoverability/tutorial-directory.md)