Production-ready Python implementation with 100% feature completeness - The flagship reference implementation.
This FastAPI application demonstrates production-grade best practices for integrating with the DevStack Core infrastructure. It showcases secure credential management, circuit breakers, rate limiting, resilience patterns, response caching, observability, and comprehensive error handling with 188 unit tests.
- Complete infrastructure integration - All services: Vault, PostgreSQL, MySQL, MongoDB, Redis, RabbitMQ
- Advanced resilience patterns - Circuit breakers prevent cascading failures
- Rate limiting - IP-based rate limiting (100-1000 req/min by endpoint)
- Response caching - Automatic caching with TTL configuration
- 188 comprehensive unit tests - Extensive test coverage with pytest
- Async/await throughout - Modern Python async patterns with asyncio
- Structured logging - JSON-formatted logs for aggregation
- Real Prometheus metrics - HTTP requests, cache ops, circuit breakers
- Request tracing - Distributed request ID correlation
- Interactive API docs - Auto-generated Swagger UI and ReDoc
- Features Overview
- API Endpoints
- Architecture
- Security Features
- Middleware
- Monitoring & Observability
- Quick Start
- Development
- Testing
- Environment Variables
- Vault Integration: Secure credential fetching with HashiCorp Vault
- Database Connections: PostgreSQL, MySQL, MongoDB with connection pooling
- Caching: Redis cluster integration with response caching
- Messaging: RabbitMQ pub/sub patterns with queue management
- Health Monitoring: Comprehensive health checks for all services
- Redis Cluster: Full cluster management and slot distribution monitoring
- Circuit Breakers: Prevent cascading failures across all services
- Rate Limiting: IP-based rate limiting (100-1000 req/min depending on endpoint)
- Request Validation: Content-type and size validation
- Response Caching: Automatic response caching with TTL configuration
- Structured Logging: JSON-formatted logs for aggregation
- Prometheus Metrics: HTTP requests, cache operations, circuit breakers
- CORS Security: Environment-aware CORS configuration
- Request Tracing: Distributed request ID correlation
Comprehensive health monitoring for all infrastructure services.
| Method | Endpoint | Description | Rate Limit | Cache TTL |
|---|---|---|---|---|
| GET | /health/ | Simple health check (no dependencies) | 200/min | None |
| GET | /health/all | Aggregate health of all services | 200/min | 30s |
| GET | /health/vault | Vault connectivity and status | 200/min | None |
| GET | /health/postgres | PostgreSQL connection test | 200/min | None |
| GET | /health/mysql | MySQL connection test | 200/min | None |
| GET | /health/mongodb | MongoDB connection test | 200/min | None |
| GET | /health/redis | Redis cluster health | 200/min | None |
| GET | /health/rabbitmq | RabbitMQ connectivity | 200/min | None |
Health Check Response Format:
{
"status": "healthy|degraded",
"services": {
"vault": {
"status": "healthy|unhealthy",
"details": {
"initialized": true,
"sealed": false,
"standby": false
}
},
"postgres": {
"status": "healthy",
"version": "PostgreSQL 16.6"
},
"redis": {
"status": "healthy",
"cluster_enabled": true,
"cluster_state": "ok",
"total_nodes": 3
}
}
}
Secure credential management using HashiCorp Vault KV v2 secrets engine.
| Method | Endpoint | Description | Cache TTL | Response |
|---|---|---|---|---|
| GET | /examples/vault/secret/{service_name} | Fetch all credentials for a service | 5 min | All secret fields (passwords masked) |
| GET | /examples/vault/secret/{service_name}/{key} | Fetch specific credential field | 5 min | Single field value (masked if sensitive) |
Parameters:
- service_name: Service identifier (e.g., postgres, mysql, redis-1)
  - Pattern: ^[a-zA-Z0-9_-]+$
  - Length: 1-50 characters
- key: Specific credential key (e.g., user, password, database)
  - Pattern: ^[a-zA-Z0-9_-]+$
  - Length: 1-100 characters
Example Response:
GET /examples/vault/secret/postgres
{
"service": "postgres",
"data": {
"user": "dev_admin",
"password": "***",
"database": "dev_database",
"host": "postgres",
"port": "5432"
},
"note": "Passwords are masked in API responses for security"
}
Security:
- All passwords automatically masked in responses
- Credentials cached for 5 minutes
- Circuit breaker protection (5 failures = open circuit for 60s)
- All access logged with request ID
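The masking behaviour listed above can be illustrated with a minimal sketch. The helper name `mask_secret_fields` and the list of sensitive key fragments are hypothetical; the README does not say how the application decides which fields count as sensitive.

```python
from typing import Any, Dict, Mapping

# Hypothetical list of key-name fragments treated as sensitive.
# The application's real rule is not documented in this README.
SENSITIVE_FRAGMENTS = ("password", "secret", "token")


def mask_secret_fields(data: Mapping[str, Any], mask: str = "***") -> Dict[str, Any]:
    """Return a copy of `data` with sensitive-looking values replaced by `mask`."""
    masked: Dict[str, Any] = {}
    for key, value in data.items():
        is_sensitive = any(fragment in key.lower() for fragment in SENSITIVE_FRAGMENTS)
        masked[key] = mask if is_sensitive else value
    return masked
```

Applied to the example payload above, only the `password` field would be replaced; `user`, `database`, `host`, and `port` pass through unchanged.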
Demonstrates database connectivity with Vault-managed credentials.
| Method | Endpoint | Description | Database |
|---|---|---|---|
| GET | /examples/database/postgres/query | Execute test query on PostgreSQL | PostgreSQL |
| GET | /examples/database/mysql/query | Execute test query on MySQL | MySQL |
| GET | /examples/database/mongodb/query | List collections on MongoDB | MongoDB |
PostgreSQL Response:
{
"database": "PostgreSQL",
"query": "SELECT current_timestamp",
"result": "2025-10-27 12:34:56.789123+00"
}
MongoDB Response:
{
"database": "MongoDB",
"collections": ["users", "sessions", "logs"],
"count": 3
}
Features:
- Credentials fetched from Vault at runtime
- Connection pooling (asyncpg for PostgreSQL, aiomysql for MySQL, Motor for MongoDB)
- Circuit breaker protection per database
- Comprehensive error handling with specific exception types
Redis-based caching with TTL support and validation.
| Method | Endpoint | Description | Parameters |
|---|---|---|---|
| GET | /examples/cache/{key} | Get value from cache | key: Cache key (1-200 chars) |
| POST | /examples/cache/{key} | Set value with optional TTL | key: Cache key; value: Value to cache (query, max 10KB); ttl: Expiration in seconds (query, 1-86400s, optional) |
| DELETE | /examples/cache/{key} | Delete value from cache | key: Cache key |
Cache Key Validation:
- Pattern: ^[a-zA-Z0-9_:.-]+$ (alphanumeric, underscore, colon, hyphen, dot)
- Length: 1-200 characters
- Whitespace automatically stripped
Value Constraints:
- Maximum size: 10KB (10,000 characters)
- TTL range: 1 second to 24 hours (86,400 seconds)
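The key and value rules above could be enforced with a small validator like the following sketch. The function name `validate_cache_request` and the use of `ValueError` are assumptions for illustration; the application itself reports validation problems through its own exception hierarchy and Pydantic models.

```python
import re
from typing import Optional

CACHE_KEY_PATTERN = re.compile(r"[a-zA-Z0-9_:.-]+")
MAX_KEY_LENGTH = 200
MAX_VALUE_LENGTH = 10_000      # "10KB (10,000 characters)"
MIN_TTL, MAX_TTL = 1, 86_400   # 1 second to 24 hours


def validate_cache_request(key: str, value: str, ttl: Optional[int] = None) -> str:
    """Return the cleaned cache key, or raise ValueError if any rule is broken."""
    key = key.strip()  # surrounding whitespace is stripped before validation
    if not 1 <= len(key) <= MAX_KEY_LENGTH:
        raise ValueError("key must be 1-200 characters long")
    if not CACHE_KEY_PATTERN.fullmatch(key):
        raise ValueError("key may contain only letters, digits, '_', ':', '.', '-'")
    if len(value) > MAX_VALUE_LENGTH:
        raise ValueError("value exceeds 10,000 characters")
    if ttl is not None and not MIN_TTL <= ttl <= MAX_TTL:
        raise ValueError("ttl must be between 1 and 86400 seconds")
    return key
```

Using `fullmatch` rather than `match` with `$` avoids accepting a key that ends in a newline.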
GET Response:
{
"key": "user:123",
"value": "{\"id\":123,\"name\":\"John\"}",
"exists": true,
"ttl": 3599 // seconds remaining, or "no expiration"
}
POST Response:
{
"key": "session:abc123",
"value": "active",
"ttl": 3600,
"action": "set"
}
DELETE Response:
{
"key": "temp:data",
"deleted": true,
"action": "delete"
}
RabbitMQ message publishing and queue management.
| Method | Endpoint | Description | Parameters |
|---|---|---|---|
| POST | /examples/messaging/publish | Publish message to queue | queue_name: Queue name (query, 1-100 chars); message: JSON message body (max 1MB) |
| GET | /examples/messaging/queue/{queue_name}/info | Get queue information | queue_name: Queue name (path) |
Queue Name Validation:
- Pattern: ^[a-zA-Z0-9_.-]+$
- Length: 1-100 characters
Message Constraints:
- Format: JSON object
- Maximum size: 1MB
- Cannot be empty
Publish Response:
{
"queue": "notifications",
"message": {"type": "email", "to": "user@example.com"},
"action": "published"
}
Queue Info Response:
{
"queue": "notifications",
"exists": true,
"message_count": 42,
"consumer_count": 2
}
Features:
- Durable queues (survive broker restart)
- Credentials from Vault
- Circuit breaker protection
- Configurable vhost (default: dev_vhost)
Full Redis cluster introspection and management capabilities.
| Method | Endpoint | Description |
|---|---|---|
| GET | /redis/cluster/nodes | Get all cluster nodes with roles and slots |
| GET | /redis/cluster/slots | Get slot distribution across masters and replicas |
| GET | /redis/cluster/info | Get cluster state and statistics |
| GET | /redis/nodes/{node_name}/info | Get detailed info for specific node |
Cluster Nodes Response:
{
"status": "success",
"total_nodes": 6,
"nodes": [
{
"node_id": "abc123...",
"host": "172.20.0.20",
"port": 6379,
"role": "master",
"flags": ["master", "myself"],
"master_id": null,
"ping_sent": "0",
"pong_recv": "1698765432",
"config_epoch": 1,
"link_state": "connected",
"slots_count": 5461,
"slot_ranges": [
{"start": 0, "end": 5460}
]
}
]
}
Cluster Slots Response:
{
"status": "success",
"total_slots": 16384,
"max_slots": 16384,
"coverage_percentage": 100.0,
"slot_distribution": [
{
"start_slot": 0,
"end_slot": 5460,
"slots_count": 5461,
"master": {
"host": "172.20.0.20",
"port": 6379,
"node_id": "abc123..."
},
"replicas": [
{
"host": "172.20.0.21",
"port": 6379,
"node_id": "def456..."
}
]
}
]
}
Cluster Info Response:
{
"status": "success",
"cluster_info": {
"cluster_state": "ok",
"cluster_slots_assigned": 16384,
"cluster_slots_ok": 16384,
"cluster_slots_pfail": 0,
"cluster_slots_fail": 0,
"cluster_known_nodes": 6,
"cluster_size": 3,
"cluster_current_epoch": 6,
"cluster_my_epoch": 1,
"cluster_stats_messages_sent": 123456,
"cluster_stats_messages_received": 123450
}
}
Node Info Response:
{
"status": "success",
"node": "redis-1",
"info": {
"redis_version": "7.4.0",
"redis_mode": "cluster",
"os": "Linux 6.1.0-26-cloud-amd64 aarch64",
"arch_bits": 64,
"multiplexing_api": "epoll",
"uptime_in_seconds": 86400,
"uptime_in_days": 1,
"used_memory": "2048576",
"used_memory_human": "2.00M",
// ... many more fields
}
}
Supported Nodes:
- redis-1: Typically master for slots 0-5460
- redis-2: Typically master for slots 5461-10922
- redis-3: Typically master for slots 10923-16383
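To see how a key ends up on one of these nodes, the sketch below computes a Redis Cluster hash slot (CRC16/XMODEM of the key, modulo 16384, with hash-tag handling) and looks it up in the typical ranges listed above. The helper names `key_slot` and `typical_owner` are hypothetical, and the mapping only holds while the cluster keeps this default layout; after a failover or reshard, the `/redis/cluster/slots` endpoint is the source of truth.

```python
import binascii

TOTAL_SLOTS = 16384

# Typical slot ranges from the list above; real ownership can change.
TYPICAL_SLOT_OWNERS = {
    "redis-1": range(0, 5461),
    "redis-2": range(5461, 10923),
    "redis-3": range(10923, 16384),
}


def key_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key."""
    data = key.encode("utf-8")
    # Hash tags: if the key contains "{...}" with a non-empty body,
    # only the text between the first "{" and the next "}" is hashed.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC16/XMODEM.
    return binascii.crc_hqx(data, 0) % TOTAL_SLOTS


def typical_owner(slot: int) -> str:
    """Return the node that typically serves `slot` in the default layout."""
    for node, slots in TYPICAL_SLOT_OWNERS.items():
        if slot in slots:
            return node
    raise ValueError(f"slot {slot} is outside 0-{TOTAL_SLOTS - 1}")
```

Keys that share a hash tag, such as `{user1000}.following` and `{user1000}.followers`, land in the same slot, which is what makes multi-key operations on them possible in cluster mode.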
Prometheus-formatted metrics for monitoring and alerting.
Endpoint: GET /metrics
Rate Limit: 1000 requests/minute
Response Format: Prometheus text exposition format
Metrics Exposed:
HTTP Metrics:
- http_requests_total{method, endpoint, status} - Total requests by endpoint and status
- http_request_duration_seconds{method, endpoint} - Request latency histogram
- http_requests_in_progress{method, endpoint} - Current in-flight requests
- http_errors_total{error_type, status_code} - Error count by type
Cache Metrics:
- cache_hits_total{endpoint} - Cache hit count
- cache_misses_total{endpoint} - Cache miss count
- cache_invalidations_total{pattern} - Cache invalidation count
Circuit Breaker Metrics:
- circuit_breaker_opened_total{service} - Times circuit opened
- circuit_breaker_half_open_total{service} - Times circuit entered half-open state
- circuit_breaker_closed_total{service} - Times circuit recovered
- circuit_breaker_failures_total{service} - Total failures per service
Application Info:
- app_info{version, name} - Application metadata
reference-apps/fastapi/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application setup
│ ├── config.py # Settings management
│ ├── exceptions.py # Custom exception hierarchy
│ ├── middleware/
│ │ ├── __init__.py
│ │ ├── cache.py # Response caching middleware
│ │ ├── circuit_breaker.py # Circuit breaker pattern
│ │ └── exception_handlers.py # Centralized error handling
│ ├── models/
│ │ ├── __init__.py
│ │ ├── requests.py # Pydantic request models
│ │ └── responses.py # Pydantic response models
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── health.py # Health check endpoints
│ │ ├── vault_demo.py # Vault examples
│ │ ├── database_demo.py # Database examples
│ │ ├── cache_demo.py # Redis caching examples
│ │ ├── messaging_demo.py # RabbitMQ examples
│ │ └── redis_cluster.py # Redis cluster management
│ └── services/
│ ├── __init__.py
│ └── vault.py # Vault client implementation
├── tests/
│ ├── conftest.py # Pytest fixtures
│ ├── test_caching.py # Cache tests
│ ├── test_circuit_breaker.py # Circuit breaker tests
│ ├── test_cors.py # CORS tests
│ ├── test_database_demo.py # Database integration tests
│ ├── test_exception_handlers.py # Exception handler tests
│ ├── test_exceptions.py # Exception hierarchy tests
│ ├── test_health_routers.py # Health check tests
│ ├── test_rate_limiting.py # Rate limiting tests
│ ├── test_redis_cluster.py # Redis cluster tests
│ ├── test_request_validation.py # Input validation tests
│ ├── test_routers_unit.py # Router unit tests
│ └── test_vault_service.py # Vault service tests
├── Dockerfile
├── init.sh # Container initialization
├── pytest.ini # Pytest configuration
├── requirements.txt # Python dependencies
├── start.sh # Application startup script
└── README.md # This file
BaseAPIException
├── ServiceUnavailableError (503)
│ ├── VaultUnavailableError
│ ├── DatabaseConnectionError
│ ├── CacheConnectionError
│ ├── MessageQueueError
│ └── CircuitBreakerError
├── ConfigurationError (500)
├── ValidationError (422)
├── ResourceNotFoundError (404)
├── AuthenticationError (401)
├── RateLimitError (429)
└── TimeoutError (504)
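A hierarchy like the one above could be expressed as plain Python classes that carry their HTTP status code. This is a sketch of the idea, not the contents of `app/exceptions.py`: the constructor signature and the `to_dict` helper are assumptions, and only part of the tree is shown.

```python
from typing import Any, Dict, Optional


class BaseAPIException(Exception):
    """Root of the hierarchy; subclasses override `status_code`."""

    status_code = 500

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        self.details = details or {}

    def to_dict(self, request_id: Optional[str] = None) -> Dict[str, Any]:
        """Build a structured error body (hypothetical helper)."""
        return {
            "error": type(self).__name__,
            "message": self.message,
            "status_code": self.status_code,
            "details": self.details,
            "request_id": request_id,
        }


class ServiceUnavailableError(BaseAPIException):
    status_code = 503


class VaultUnavailableError(ServiceUnavailableError):
    pass  # inherits 503 from its parent


class CircuitBreakerError(ServiceUnavailableError):
    pass  # inherits 503 from its parent


class ResourceNotFoundError(BaseAPIException):
    status_code = 404


class RateLimitError(BaseAPIException):
    status_code = 429
```

Because the service-specific errors inherit from `ServiceUnavailableError`, a single handler for the parent class can cover every outage type while still reporting the concrete class name.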
- Vault Integration: All credentials fetched from HashiCorp Vault
- AppRole Authentication: Secure Vault authentication using AppRole method (recommended for production)
- Fallback Authentication: Automatic fallback to token-based authentication if AppRole is not configured
- No Hardcoded Secrets: No service credentials stored in environment variables or configuration files (the optional VAULT_TOKEN fallback is the only secret read from the environment)
- Password Masking: Sensitive fields masked in API responses
- Runtime Credentials: Credentials loaded at request time, never cached long-term
The reference API uses HashiCorp Vault AppRole authentication for secure credential management:
How it Works:
- AppRole credentials (role-id and secret-id) are mounted into the container from ~/.config/vault/approles/reference-api/
- On startup, the application reads these credentials from /vault-approles/reference-api/
- Exchanges role-id and secret-id for a Vault client token via /v1/auth/approle/login
- Uses the obtained token (hvs. prefix) for all subsequent Vault operations
- If AppRole authentication fails, falls back to VAULT_TOKEN environment variable
Configuration:
# app/config.py
VAULT_APPROLE_DIR: str = os.getenv("VAULT_APPROLE_DIR", "/vault-approles/reference-api")
Docker Compose Setup:
volumes:
  - ${HOME}/.config/vault/approles/reference-api:/vault-approles/reference-api:ro
environment:
  VAULT_ADDR: ${VAULT_ADDR:-http://vault:8200}
  # Note: No VAULT_TOKEN - forces AppRole authentication
Fallback Mechanism:
# app/services/vault.py
def __init__(self):
    if settings.VAULT_APPROLE_DIR and os.path.exists(settings.VAULT_APPROLE_DIR):
        try:
            self.vault_token = self._login_with_approle()
            logger.info("Successfully authenticated to Vault using AppRole")
        except Exception as e:
            logger.warning(f"AppRole authentication failed: {e}, falling back to token-based auth")
            self.vault_token = settings.VAULT_TOKEN
    else:
        logger.info("Using token-based authentication (AppRole directory not found)")
        self.vault_token = settings.VAULT_TOKEN
Benefits:
- More secure than root tokens (limited permissions, renewable, revocable)
- Follows HashiCorp's recommended authentication method for applications
- No root token exposure in environment variables
- Automatic fallback ensures compatibility with development setups
- Token rotation without container restart (when using wrapped secret-id)
- IP-based Limiting: Using slowapi (Flask-Limiter port)
- Configurable Rates:
  - General endpoints: 100 requests/minute
  - Metrics endpoint: 1000 requests/minute
  - Health checks: 200 requests/minute
- 429 Responses: Returns Retry-After header
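The application delegates rate limiting to slowapi. To illustrate the idea behind per-IP limits and the `Retry-After` value, here is a standalone fixed-window counter. The class name and the injectable `clock` parameter are assumptions made for this sketch, and slowapi's own strategy may differ.

```python
import math
import time
from typing import Callable, Dict, Tuple


class FixedWindowRateLimiter:
    """Allow at most `limit` requests per client IP in each time window."""

    def __init__(self, limit: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._clock = clock
        # client IP -> (window start time, requests counted in that window)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def check(self, client_ip: str) -> Tuple[bool, int]:
        """Return (allowed, retry_after_seconds) for one incoming request."""
        now = self._clock()
        start, count = self._windows.get(client_ip, (now, 0))
        if now - start >= self.window_seconds:
            start, count = now, 0  # the previous window has expired
        if count >= self.limit:
            # Seconds until the current window ends; suitable for Retry-After.
            retry_after = max(1, math.ceil(start + self.window_seconds - now))
            return False, retry_after
        self._windows[client_ip] = (start, count + 1)
        return True, 0
```

A handler would respond with status 429 and set `Retry-After` to the second tuple element whenever `check` returns `False`. One limiter instance per rate tier (100, 200, or 1000 per minute) keeps the tiers independent.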
- Content-Type Validation: POST/PUT/PATCH requests must have valid content type
- Request Size Limits: 10MB maximum request body
- Input Sanitization: Regex pattern validation on all inputs
- Allowed Content Types: application/json, application/x-www-form-urlencoded, multipart/form-data, text/plain
- Environment-Aware:
  - Development (DEBUG=true): Allow all origins (*)
  - Production (DEBUG=false): Explicit origin whitelist
- Allowed Origins (production):
  - http://localhost:3000 (React/Next.js)
  - http://localhost:8000 (self)
  - http://localhost:8080 (common dev)
- Credentials: Only with explicit origins (not with *)
- Preflight Caching: 600 seconds (10 minutes)
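The environment-aware rules above reduce to a small function that picks the CORS settings from the `DEBUG` flag. The function name `cors_settings` is hypothetical. The dictionary keys match the keyword arguments of Starlette's `CORSMiddleware`, though how the application actually wires them up is not shown in this README.

```python
from typing import Any, Dict

PRODUCTION_ORIGINS = [
    "http://localhost:3000",  # React/Next.js
    "http://localhost:8000",  # self
    "http://localhost:8080",  # common dev
]


def cors_settings(debug: bool) -> Dict[str, Any]:
    """Return CORS keyword arguments for the current environment."""
    if debug:
        # Wildcard origins must not be combined with credentials.
        return {"allow_origins": ["*"], "allow_credentials": False, "max_age": 600}
    return {
        "allow_origins": list(PRODUCTION_ORIGINS),
        "allow_credentials": True,
        "max_age": 600,  # preflight responses cached for 10 minutes
    }
```

With FastAPI this could be applied as `app.add_middleware(CORSMiddleware, **cors_settings(settings.DEBUG))`, assuming a `settings.DEBUG` flag like the one described in this README.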
- Service Protection: All external services (Vault, databases, Redis, RabbitMQ)
- Configuration:
  - Failure threshold: 5 consecutive failures
  - Reset timeout: 60 seconds
  - States: CLOSED → OPEN → HALF_OPEN → CLOSED
- Benefits: Prevents cascading failures, improves system stability
- Structured Responses: Consistent error format across all endpoints
- Request Correlation: Every request has unique request ID
- Debug Mode Control: Stack traces only in DEBUG mode
- Sensitive Data Protection: No credentials in error responses
Purpose: Request tracking, timing, and Prometheus metrics
Functionality:
- Generates unique request ID (UUID4)
- Tracks in-progress requests (gauge metric)
- Measures request duration (histogram)
- Counts total requests by status (counter)
- Structured JSON logging of all requests
- Adds X-Request-ID header to responses
Metrics:
- http_requests_total{method, endpoint, status}
- http_request_duration_seconds{method, endpoint}
- http_requests_in_progress{method, endpoint}
Purpose: Validate request size and content type
Validations:
- Request Size: Maximum 10MB (returns 413 if exceeded)
- Content-Type: Required for POST/PUT/PATCH with body (returns 400 if missing)
- Allowed Types: JSON, form-urlencoded, multipart, text/plain (returns 415 if invalid)
Exemptions:
- GET, HEAD, OPTIONS requests
- /metrics, /docs, /redoc, /openapi.json
- /health/* endpoints
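The validation order and exemptions above can be summarised as a pure function that maps a request's metadata to an error status. This is a sketch: the function name is hypothetical, 10MB is assumed to mean 10 × 1024 × 1024 bytes, and the real middleware may check things in a different order.

```python
from typing import Optional

MAX_BODY_BYTES = 10 * 1024 * 1024  # assumed interpretation of "10MB"
ALLOWED_MEDIA_TYPES = {
    "application/json",
    "application/x-www-form-urlencoded",
    "multipart/form-data",
    "text/plain",
}
EXEMPT_METHODS = {"GET", "HEAD", "OPTIONS"}
EXEMPT_PATHS = {"/metrics", "/docs", "/redoc", "/openapi.json"}


def validation_status(method: str, path: str,
                      content_type: Optional[str], content_length: int) -> Optional[int]:
    """Return an HTTP error status, or None when the request passes validation."""
    method = method.upper()
    if method in EXEMPT_METHODS:
        return None
    if path in EXEMPT_PATHS or path.startswith("/health/"):
        return None
    if content_length > MAX_BODY_BYTES:
        return 413  # payload too large
    if method in {"POST", "PUT", "PATCH"} and content_length > 0:
        if not content_type:
            return 400  # body present but no Content-Type header
        # Ignore parameters such as "; charset=utf-8" or multipart boundaries.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type not in ALLOWED_MEDIA_TYPES:
            return 415  # unsupported media type
    return None
```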
Purpose: Automatic response caching with Redis backend
Features:
- Cache Key Generation: Based on function, path params, and query params
- Long Key Hashing: MD5 hash for keys > 200 characters
- TTL Configuration: Configurable per endpoint (30s to 5min typical)
- Pattern Invalidation: Wildcard-based cache clearing
- Metrics: Tracks hits, misses, and invalidations
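The key-generation and long-key hashing rules above might look like the following sketch. The function name, the `cache` namespace prefix, and the exact key layout are assumptions; only the inputs (function, path params, query params) and the MD5 fallback above 200 characters come from the list.

```python
import hashlib
from typing import Mapping

MAX_KEY_LENGTH = 200


def build_cache_key(func_name: str, path_params: Mapping[str, object],
                    query_params: Mapping[str, object], namespace: str = "cache") -> str:
    """Build a deterministic cache key; hash it when it grows past 200 characters."""
    parts = [namespace, func_name]
    # Sort parameters so the same request always produces the same key.
    parts += [f"{name}={path_params[name]}" for name in sorted(path_params)]
    parts += [f"{name}={query_params[name]}" for name in sorted(query_params)]
    key = ":".join(parts)
    if len(key) > MAX_KEY_LENGTH:
        # MD5 is used only to shorten the key, not for security.
        digest = hashlib.md5(key.encode("utf-8")).hexdigest()
        key = f"{namespace}:{func_name}:{digest}"
    return key
```

Keeping the namespace and function name in front of the digest means wildcard invalidation by prefix still matches hashed keys.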
Cache Strategy:
- Vault endpoints: 5 minute TTL
- Health checks: 30 second TTL
- Other endpoints: No caching (unless explicitly configured)
Purpose: Prevent cascading failures across services
Protected Services:
- vault_breaker - Vault API calls
- postgres_breaker - PostgreSQL connections
- mysql_breaker - MySQL connections
- mongodb_breaker - MongoDB connections
- redis_breaker - Redis operations
- rabbitmq_breaker - RabbitMQ connections
Behavior:
- CLOSED: Normal operation, all requests pass through
- OPEN: After 5 failures, circuit opens, all requests fail immediately
- HALF_OPEN: After 60s timeout, allow limited requests to test recovery
- Recovery: On success in HALF_OPEN, circuit closes
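The state transitions above can be captured in a few dozen lines. The project lists pybreaker among its dependencies, so the class below is not its implementation; it is a synchronous, dependency-free sketch with hypothetical names, using the documented thresholds (5 failures, 60 seconds) as defaults.

```python
import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, fail_max: int = 5, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.state = "CLOSED"
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open; failing fast")
            self.state = "HALF_OPEN"  # timeout elapsed: allow a trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        # Any success resets the failure count and closes the circuit.
        self._failures = 0
        self.state = "CLOSED"
        return result

    def _record_failure(self) -> None:
        self._failures += 1
        # A failed trial call in HALF_OPEN reopens the circuit immediately.
        if self.state == "HALF_OPEN" or self._failures >= self.fail_max:
            self.state = "OPEN"
            self._opened_at = self._clock()
```

While the circuit is open, the protected function is never invoked, which is what keeps a struggling dependency from being hit with further traffic.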
Metrics:
- circuit_breaker_opened_total{service}
- circuit_breaker_half_open_total{service}
- circuit_breaker_closed_total{service}
- circuit_breaker_failures_total{service}
Purpose: Centralized error handling and formatting
Handlers:
- BaseAPIException - All custom exceptions
- ServiceUnavailableError - Service outages with retry suggestions
- RequestValidationError - Pydantic validation errors (422)
- HTTPException - FastAPI/Starlette HTTP exceptions
- Exception - Catch-all for unhandled errors (500)
Response Format:
{
"error": "VaultUnavailableError",
"message": "Vault service is currently unavailable",
"status_code": 503,
"details": {
"service": "vault",
"secret_path": "postgres"
},
"request_id": "abc-123-def-456"
}
Format: JSON (python-json-logger)
Log Fields:
- asctime: Timestamp
- name: Logger name
- levelname: Log level (INFO, ERROR, etc.)
- message: Log message
- request_id: Request correlation ID
- method: HTTP method
- path: Request path
- status_code: Response status
- duration_ms: Request duration in milliseconds
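The application produces these fields with python-json-logger. As a dependency-free illustration of the same idea, the sketch below uses a custom `logging.Formatter` that serialises the standard fields plus any request-scoped extras. The class name `JsonLogFormatter` is hypothetical.

```python
import json
import logging


class JsonLogFormatter(logging.Formatter):
    """Render log records as one JSON object per line."""

    # Request-scoped fields that callers pass through `extra=`.
    EXTRA_FIELDS = ("request_id", "method", "path", "status_code", "duration_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "asctime": self.formatTime(record),
            "name": record.name,
            "levelname": record.levelname,
            "message": record.getMessage(),
        }
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)
```

A logger configured with this formatter would be called as `logger.info("HTTP request completed", extra={"request_id": ..., "status_code": 200})`; fields missing from `extra` are simply omitted from the output.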
Example Log Entry:
{
"asctime": "2025-10-27 12:34:56,789",
"name": "__main__",
"levelname": "INFO",
"message": "HTTP request completed",
"request_id": "abc-123-def-456",
"method": "GET",
"path": "/health/all",
"status_code": 200,
"duration_ms": 45.23
}
Exposed at /metrics in Prometheus text format.
HTTP Metrics:
# HELP http_requests_total Total HTTP requests
# TYPE http_requests_total counter
http_requests_total{method="GET",endpoint="/health/all",status="200"} 42.0
# HELP http_request_duration_seconds HTTP request latency
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{method="GET",endpoint="/health/all",le="0.005"} 10.0
http_request_duration_seconds_bucket{method="GET",endpoint="/health/all",le="0.01"} 20.0
http_request_duration_seconds_bucket{method="GET",endpoint="/health/all",le="0.025"} 30.0
http_request_duration_seconds_sum{method="GET",endpoint="/health/all"} 1.234
http_request_duration_seconds_count{method="GET",endpoint="/health/all"} 42.0
# HELP http_requests_in_progress HTTP requests in progress
# TYPE http_requests_in_progress gauge
http_requests_in_progress{method="GET",endpoint="/health/all"} 2.0
Cache Metrics:
cache_hits_total{endpoint="/examples/vault/secret/postgres"} 15.0
cache_misses_total{endpoint="/examples/vault/secret/postgres"} 3.0
cache_invalidations_total{pattern="cache:*"} 1.0
Circuit Breaker Metrics:
circuit_breaker_opened_total{service="vault"} 0.0
circuit_breaker_half_open_total{service="vault"} 0.0
circuit_breaker_closed_total{service="vault"} 0.0
circuit_breaker_failures_total{service="vault"} 0.0
- Request ID: UUID4 generated per request
- Propagation: Stored in request.state.request_id
- Response Header: X-Request-ID added to all responses
- Error Correlation: Included in error responses and logs
- Log Correlation: All log entries tagged with request ID
The application is included in the main docker-compose.yml:
# From repository root
./devstack.sh start
# Application available at:
# - HTTP: http://localhost:8000
# - HTTPS: https://localhost:8443
# - Docs: http://localhost:8000/docs
# Root endpoint (API info)
curl http://localhost:8000/
# Check all services health
curl http://localhost:8000/health/all
# Get credentials from Vault (passwords masked)
curl http://localhost:8000/examples/vault/secret/postgres
# Test PostgreSQL connection
curl http://localhost:8000/examples/database/postgres/query
# Use Redis cache
curl -X POST "http://localhost:8000/examples/cache/mykey?value=hello&ttl=60"
curl http://localhost:8000/examples/cache/mykey
# View Prometheus metrics
curl http://localhost:8000/metrics
cd reference-apps/fastapi
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export VAULT_ADDR=http://localhost:8200
export VAULT_TOKEN=$(cat ~/.config/vault/vault-token.txt)
export DEBUG=true
# Run the application
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
Development Features:
- Hot reload on code changes
- DEBUG mode enables:
  - CORS * (allow all origins)
  - Stack traces in error responses
  - Detailed logging
# From FastAPI directory
cd reference-apps/fastapi
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=app --cov-report=html --cov-report=term
# Run specific test file
pytest tests/test_vault_service.py -v
# Run specific test
pytest tests/test_vault_service.py::TestVaultClientGetSecret::test_get_secret_success -v
Total: 254 tests (178 executed unit tests, 76 skipped integration tests)
Test Categories:
- Unit Tests (178): Mock external dependencies, fast execution
- Integration Tests (76): Require running infrastructure, skipped in CI
Coverage: 84.39% (exceeds 80% requirement)
Test Files:
- test_caching.py - Cache middleware and operations (23 tests)
- test_circuit_breaker.py - Circuit breaker behavior (10 tests)
- test_cors.py - CORS configuration (13 tests)
- test_database_demo.py - Database integration (9 tests, 6 skipped)
- test_exception_handlers.py - Exception handling (35 tests)
- test_exceptions.py - Exception hierarchy (20 tests)
- test_health_routers.py - Health checks (18 tests)
- test_rate_limiting.py - Rate limiting (8 tests)
- test_redis_cluster.py - Redis cluster operations (15 tests, 10 skipped)
- test_request_validation.py - Input validation (15 tests)
- test_routers_unit.py - Router unit tests (30+ tests)
- test_vault_service.py - Vault client (17 tests)
Test Markers:
- @pytest.mark.unit - Unit tests
- @pytest.mark.integration - Integration tests (require infrastructure)
- @pytest.mark.asyncio - Async tests
- @pytest.mark.cache - Cache-related tests
# Vault Configuration
VAULT_ADDR=http://vault:8200 # Vault API address
VAULT_APPROLE_DIR=/vault-approles/reference-api # AppRole credentials directory (preferred)
# OR
VAULT_TOKEN=<your-token> # Vault authentication token (fallback)
# Service Endpoints (Docker network names)
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
MYSQL_HOST=mysql
MYSQL_PORT=3306
MONGODB_HOST=mongodb
MONGODB_PORT=27017
REDIS_HOST=redis-1
REDIS_PORT=6379
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
# Redis Cluster Nodes (comma-separated)
REDIS_NODES=redis-1:6379,redis-2:6379,redis-3:6379
Note: The application prefers AppRole authentication over token-based authentication. If VAULT_APPROLE_DIR exists and contains valid credentials, it will be used. Otherwise, it falls back to VAULT_TOKEN.
# Application Settings
DEBUG=false # Enable debug mode (default: false)
APP_NAME="DevStack Core Reference API"
When running in Docker Compose, these are set automatically via docker-compose.yml:
environment:
  VAULT_ADDR: ${VAULT_ADDR:-http://vault:8200}
  VAULT_TOKEN: ${VAULT_TOKEN}
  POSTGRES_HOST: postgres
  POSTGRES_PORT: 5432
  MYSQL_HOST: mysql
  MYSQL_PORT: 3306
  MONGODB_HOST: mongodb
  MONGODB_PORT: 27017
  REDIS_HOST: redis-1
  REDIS_PORT: 6379
  RABBITMQ_HOST: rabbitmq
  RABBITMQ_PORT: 5672
from app.services.vault import vault_client
# Get all credentials for a service
creds = await vault_client.get_secret("postgres")
user = creds.get("user")
password = creds.get("password")
# Get specific key
password = await vault_client.get_secret("postgres", key="password")
Error Handling:
from app.exceptions import VaultUnavailableError, ResourceNotFoundError
try:
    creds = await vault_client.get_secret("postgres")
except ResourceNotFoundError:
    # Secret doesn't exist in Vault
    pass
except VaultUnavailableError as e:
    # Vault is down, sealed, or connection failed
    # Circuit breaker may be open
    pass
import asyncpg
from app.services.vault import vault_client
from app.middleware.circuit_breaker import postgres_breaker
@postgres_breaker
async def query_database():
    # Fetch credentials from Vault
    creds = await vault_client.get_secret("postgres")
    # Connect to PostgreSQL
    conn = await asyncpg.connect(
        host="postgres",
        port=5432,
        user=creds.get("user"),
        password=creds.get("password"),
        database=creds.get("database")
    )
    try:
        # Execute query
        return await conn.fetch("SELECT * FROM users")
    finally:
        # Always release the connection, even if the query fails
        await conn.close()
Circuit Breaker Protection:
- First 5 failures: Requests continue
- After 5 failures: Circuit opens, requests fail immediately with CircuitBreakerError
- After 60 seconds: Circuit enters HALF_OPEN, allows test request
- On success: Circuit closes, normal operation resumes
import redis.asyncio as redis
from app.services.vault import vault_client
async def get_redis_client() -> redis.Redis:
    """Create Redis client with Vault credentials"""
    creds = await vault_client.get_secret("redis-1")
    return redis.Redis(
        host="redis-1",
        port=6379,
        password=creds.get("password"),
        decode_responses=True,
        socket_timeout=5
    )
# Usage (inside an async function)
client = await get_redis_client()
await client.setex("session:123", 3600, "user_data")  # 1 hour TTL
value = await client.get("session:123")
await client.close()
import aio_pika
import json
from app.services.vault import vault_client
from app.middleware.circuit_breaker import rabbitmq_breaker
@rabbitmq_breaker
async def publish_message(queue_name: str, message: dict):
    """Publish message to RabbitMQ queue"""
    # Get credentials from Vault
    creds = await vault_client.get_secret("rabbitmq")
    # Build connection URL
    url = f"amqp://{creds.get('user')}:{creds.get('password')}@rabbitmq:5672/{creds.get('vhost', '')}"
    # Connect; the context manager closes the connection even if publishing fails
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        # Declare durable queue
        await channel.declare_queue(queue_name, durable=True)
        # Publish message
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps(message).encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
            ),
            routing_key=queue_name
        )
from fastapi_cache.decorator import cache
@router.get("/expensive-operation")
@cache(expire=300)  # Cache for 5 minutes
async def expensive_operation():
    # This operation result will be cached
    result = await perform_expensive_computation()
    return result
Cache Key Generation:
- Automatically includes: function name, path parameters, query parameters
- Keys > 200 chars are hashed (MD5)
- Supports custom namespace prefixes
- FastAPI 0.104+ - Modern async Python web framework
- Pydantic 2.x - Data validation and serialization
- Uvicorn - ASGI server
- asyncpg - PostgreSQL async driver
- aiomysql - MySQL async driver
- motor - MongoDB async driver
- redis[asyncio] - Redis async client
- aio-pika - RabbitMQ async client
- httpx - Async HTTP client for Vault
- fastapi-cache2 - Response caching with Redis
- slowapi - Rate limiting (Flask-Limiter port)
- pybreaker - Circuit breaker pattern
- prometheus-client - Prometheus metrics
- python-json-logger - Structured JSON logging
- starlette - ASGI toolkit (FastAPI dependency)
- pytest - Test framework
- pytest-asyncio - Async test support
- pytest-cov - Coverage reporting
- pytest-mock - Mocking utilities
This is a REFERENCE IMPLEMENTATION designed for:
- Learning infrastructure integration patterns
- Testing service connectivity
- Demonstrating best practices
- Local development environment
Not production-ready because:
- No authentication/authorization beyond Vault token
- Limited error recovery strategies
- No request queuing or backpressure handling
- Simplified health checks
- Debug mode exposes stack traces
- No horizontal scaling considerations
For production deployment, consider:
- Authentication: Add JWT/OAuth2 authentication
- Authorization: Implement role-based access control (RBAC)
- API Gateway: Use Kong, Traefik, or AWS API Gateway
- Service Mesh: Consider Istio or Linkerd for advanced traffic management
- Observability: Send logs to Loki/Elasticsearch, metrics to Prometheus
- Secrets Rotation: Implement automatic credential rotation
- Connection Pooling: Configure connection pool limits
- Graceful Shutdown: Handle SIGTERM for zero-downtime deployments
- Health Checks: Add readiness and liveness probes for Kubernetes
- TLS Everywhere: Enable TLS for all inter-service communication
- Main Infrastructure: ../../README.md
- Vault Security: ../../docs/VAULT_SECURITY.md
- Test Documentation: ../../tests/README.md
- Test Coverage Report: ../../tests/TEST_COVERAGE.md
- Docker Compose Config: ../../docker-compose.yml
This reference application is part of the DevStack Core project. See the main repository for license information.
This is a reference implementation. For contributions, improvements, or issues, please refer to the main DevStack Core repository and see our Contributing Guide for detailed instructions.