diff --git a/Dockerfile b/Dockerfile index 4ad1d0d..14e0c87 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,4 @@ USER appuser EXPOSE 8000 -CMD ["python", "-m", "src.main"] +CMD ["python", "src/main.py"] diff --git a/src/application/routes/agents.py b/src/application/routes/agents.py index f38e207..0841d3f 100644 --- a/src/application/routes/agents.py +++ b/src/application/routes/agents.py @@ -50,7 +50,9 @@ async def list_agents( use_case: Annotated[ListAgentConfigsUseCase, Depends(get_list_agent_configs_use_case)], ) -> list[AgentConfigMetadata]: """List all agent configuration metadata.""" - return await use_case.execute() + agents = await use_case.execute() + logger.info("Listed %d agent configs", len(agents)) + return agents @router.get("/{agent_name}", response_model=AgentConfig) @@ -60,6 +62,7 @@ async def get_agent( ) -> AgentConfig: """Retrieve a single agent configuration by name.""" _validate_agent_name(agent_name) + logger.info("Getting agent config: %s", agent_name) return await use_case.execute(name=agent_name) @@ -72,7 +75,10 @@ async def create_agent( """Create a new agent configuration from an uploaded YAML file.""" _validate_agent_name(agent_name) yaml_content = await _read_yaml_upload(file) - return await use_case.execute(name=agent_name, yaml_content=yaml_content) + logger.info("Creating agent config: %s", agent_name) + result = await use_case.execute(name=agent_name, yaml_content=yaml_content) + logger.info("Agent config created: %s", agent_name) + return result @router.put("/{agent_name}", response_model=AgentConfig) @@ -84,7 +90,10 @@ async def update_agent( """Update an existing agent configuration from an uploaded YAML file.""" _validate_agent_name(agent_name) yaml_content = await _read_yaml_upload(file) - return await use_case.execute(name=agent_name, yaml_content=yaml_content) + logger.info("Updating agent config: %s", agent_name) + result = await use_case.execute(name=agent_name, yaml_content=yaml_content) + logger.info("Agent config updated: %s", agent_name) + return result @router.delete("/{agent_name}", status_code=status.HTTP_204_NO_CONTENT) @@ -94,4 +103,6 @@ async def delete_agent( ) -> None: """Delete an agent configuration.""" _validate_agent_name(agent_name) + logger.info("Deleting agent config: %s", agent_name) await use_case.execute(name=agent_name) + logger.info("Agent config deleted: %s", agent_name) diff --git a/src/application/routes/prompt.py b/src/application/routes/prompt.py index a7ce97b..85df774 100644 --- a/src/application/routes/prompt.py +++ b/src/application/routes/prompt.py @@ -42,6 +42,7 @@ async def create_prompt( """Create a new prompt.""" use_case = CreatePromptUseCase(prompt_manager) try: + logger.info("Creating prompt: %s", request.identifier) content_dicts = [msg.model_dump() for msg in request.content] prompt = await use_case.execute( identifier=request.identifier, @@ -51,10 +52,11 @@ async def create_prompt( tags=request.tags, metadata=request.metadata, ) + logger.info("Prompt created: %s", request.identifier) return {"status": "success", "prompt": prompt} - except Exception as e: - logger.error(f"Error creating prompt '{request.identifier}': {e}") - raise _handle_http_error(e, request.identifier) + except Exception: + logger.exception("Error creating prompt '%s'", request.identifier) + raise @router.get("/get/{identifier}") @@ -73,9 +75,9 @@ async def get_prompt( tag=tag, ) return {"status": "success", "prompt": prompt} - except Exception as e: - logger.error(f"Error getting prompt '{identifier}': {e}") - raise _handle_http_error(e, identifier) + except Exception: + logger.exception("Error getting prompt '%s'", identifier) + raise @router.put("/update/{identifier}") @@ -96,6 +98,6 @@ async def update_prompt( metadata=request.metadata, ) return {"status": "success", "prompt": prompt} - except Exception as e: - logger.error(f"Error updating prompt '{identifier}': {e}") - raise _handle_http_error(e, identifier) + except Exception: + logger.exception("Error updating prompt '%s'", identifier) + raise diff --git a/src/application/routes/threads.py b/src/application/routes/threads.py index d8a811b..7cff719 100644 --- a/src/application/routes/threads.py +++ b/src/application/routes/threads.py @@ -39,7 +39,7 @@ async def list_threads( use_case: Annotated[ListThreadsUseCase, Depends(get_list_threads_use_case)], ) -> list[Thread]: threads = await use_case.execute() - logger.debug("Listed %d threads", len(threads)) + logger.info("Listed %d threads", len(threads)) return threads @@ -48,7 +48,7 @@ async def get_thread( thread_id: str, use_case: Annotated[GetThreadUseCase, Depends(get_get_thread_use_case)], ) -> Thread: - logger.debug("Getting thread=%s", thread_id) + logger.info("Getting thread=%s", thread_id) return await use_case.execute(thread_id) @@ -67,5 +67,5 @@ async def list_messages( use_case: Annotated[GetThreadUseCase, Depends(get_get_thread_use_case)], ) -> list: thread = await use_case.execute(thread_id) - logger.debug("[thread=%s] Listed %d messages", thread_id, len(thread.messages)) + logger.info("[thread=%s] Listed %d messages", thread_id, len(thread.messages)) return thread.messages diff --git a/src/application/routes/websocket.py b/src/application/routes/websocket.py index 7d8b11e..9076f4b 100644 --- a/src/application/routes/websocket.py +++ b/src/application/routes/websocket.py @@ -26,7 +26,7 @@ async def websocket_chat( try: payload = json.loads(data) except json.JSONDecodeError: - logger.error("[thread=%s] Invalid JSON received: %s", thread_id, data[:200]) + logger.exception("[thread=%s] Invalid JSON received: %s", thread_id, data[:200]) await websocket.send_text(json.dumps({"error": "Invalid JSON"})) continue message = payload.get("message", "") diff --git a/src/application/use_cases/get_agent_config.py b/src/application/use_cases/get_agent_config.py index 9130cdd..38d26d4 100644 --- a/src/application/use_cases/get_agent_config.py +++ b/src/application/use_cases/get_agent_config.py @@ -33,5 +33,5 @@ async def execute(self, name: str) -> AgentConfig: """ yaml_content = await self._config_store.get(name) config = self._config_loader.load_from_string(yaml_content) - logger.debug("Loaded agent config '%s' from store", name) + logger.info("Loaded agent config '%s' from store", name) return config diff --git a/src/application/use_cases/list_agent_configs.py b/src/application/use_cases/list_agent_configs.py index dbf1d36..148353a 100644 --- a/src/application/use_cases/list_agent_configs.py +++ b/src/application/use_cases/list_agent_configs.py @@ -19,5 +19,5 @@ async def execute(self) -> list[AgentConfigMetadata]: List of AgentConfigMetadata. """ result = await self._config_repository.list_all() - logger.debug("Listed %d agent configs from repository", len(result)) + logger.info("Listed %d agent configs from repository", len(result)) return result diff --git a/src/application/use_cases/thread_management.py b/src/application/use_cases/thread_management.py index 02bb461..04416ba 100644 --- a/src/application/use_cases/thread_management.py +++ b/src/application/use_cases/thread_management.py @@ -15,7 +15,6 @@ def __init__(self, threads: ThreadRepository, registry: AgentRegistry): async def execute(self, agent_name: str) -> Thread: if agent_name not in await self._registry.list_agents(): - logger.error("Agent not found: %s", agent_name) raise AgentNotFoundError(f"Agent not found: {agent_name}") thread = await self._threads.create(agent_name) logger.info("Thread created: id=%s agent=%s", thread.id, agent_name) diff --git a/src/config.py b/src/config.py index 74cfd7c..5071694 100644 --- a/src/config.py +++ b/src/config.py @@ -21,7 +21,7 @@ class Settings(BaseSettings): openai_api_key: str | None = None host: str = "0.0.0.0" port: int = 8000 - log_level: str = "INFO" + uvicorn_log_level: str = "info" allowed_origins: list[str] = ["http://localhost:8080"] tracing: TracingSettings = TracingSettings() diff --git a/src/infrastructure/deepagent/adapter.py b/src/infrastructure/deepagent/adapter.py index 4df63f7..b525889 100644 --- a/src/infrastructure/deepagent/adapter.py +++ b/src/infrastructure/deepagent/adapter.py @@ -183,7 +183,7 @@ def _build_response(self, result: dict, config: dict, thinking: str | None) -> M async def invoke(self, thread_id: str, message: str) -> Message: config = self._build_config(thread_id) logger.info("[thread=%s] Invoking agent", thread_id) - logger.debug("[thread=%s] Message: %s", thread_id, message[:200]) + logger.info("[thread=%s] Message: %s", thread_id, message[:200]) try: start = time.monotonic() result = await self._graph.ainvoke( diff --git a/src/infrastructure/deepagent/factory.py b/src/infrastructure/deepagent/factory.py index 3aa4813..09a3286 100644 --- a/src/infrastructure/deepagent/factory.py +++ b/src/infrastructure/deepagent/factory.py @@ -209,7 +209,7 @@ async def create_agent_from_config( mcp_tools = await mcp_tool_loader.load_tools(config.mcp_servers) logger.info("Loaded %d MCP tools for agent '%s'", len(mcp_tools), config.name) all_tools = (local_tools or []) + mcp_tools if (local_tools or mcp_tools) else None - logger.debug("Agent '%s' tools: %d total", config.name, len(all_tools) if all_tools else 0) + logger.info("Agent '%s' tools: %d total", config.name, len(all_tools) if all_tools else 0) if prompt_manager: system_prompt = await get_system_prompt_from_phoenix(config.name, prompt_manager) @@ -254,8 +254,8 @@ async def create_agent_from_config( logger.info("Agent '%s' has %d subagents", config.name, len(subagents)) try: graph = create_deep_agent(**kwargs) - except Exception as e: - logger.error(f"Error creating agent '{config.name}': {e}") + except Exception: + logger.exception("Error creating agent '%s'", config.name) raise logger.info("Agent '%s' created successfully", config.name) return graph, response_format_model diff --git a/src/infrastructure/minio_store/adapter.py b/src/infrastructure/minio_store/adapter.py index 3f997e5..00cce5d 100644 --- a/src/infrastructure/minio_store/adapter.py +++ b/src/infrastructure/minio_store/adapter.py @@ -84,7 +84,7 @@ async def list_all(self) -> list[str]: async def ensure_bucket(self) -> None: """Create the bucket if it does not already exist.""" if await self._client.bucket_exists(self._bucket): - logger.debug("MinIO bucket '%s' already exists", self._bucket) + logger.info("MinIO bucket '%s' already exists", self._bucket) return await self._client.make_bucket(self._bucket) logger.info("Created MinIO bucket '%s'", self._bucket) diff --git a/src/infrastructure/persistent_registry/adapter.py b/src/infrastructure/persistent_registry/adapter.py index b24c585..484dc9c 100644 --- a/src/infrastructure/persistent_registry/adapter.py +++ b/src/infrastructure/persistent_registry/adapter.py @@ -49,7 +49,7 @@ async def get_runner(self, agent_name: str) -> AgentRunner: AgentNotFoundError: If no config exists for this agent. """ if agent_name in self._runners: - logger.debug("Agent '%s' loaded from cache", agent_name) + logger.info("Agent '%s' loaded from cache", agent_name) return self._runners[agent_name] async with self._lock: diff --git a/src/infrastructure/prompt_management/phoenix_prompt_adapter.py b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py index e71ef6b..a1c5b0b 100644 --- a/src/infrastructure/prompt_management/phoenix_prompt_adapter.py +++ b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py @@ -73,8 +73,8 @@ def __init__( ), ) logger.info("PhoenixPromptManagerProvider initialized base_url=%s timeout=%ss", base_url, timeout) - except Exception as e: - logger.error("Failed to initialize Phoenix client: %s", e) + except Exception: + logger.exception("Failed to initialize Phoenix client") self._client = None @_phoenix_retry @@ -101,8 +101,8 @@ async def get_prompt( return self._to_domain_prompt(prompt_obj, identifier=identifier, description=prompt_obj._description, tags=[t["name"] for t in tags]) except (ValueError, PhoenixUnavailableError): raise - except Exception as e: - logger.error("Error getting prompt '%s': %s", identifier, e) + except Exception: + logger.exception("Error getting prompt '%s'", identifier) raise _wrap_phoenix_error("get_prompt", identifier, e) from e @cached(cache=TTLCache(maxsize=10, ttl=300)) @@ -130,8 +130,8 @@ async def get_prompt_content( return messages[0] if messages else {} except (ValueError, PhoenixUnavailableError): raise - except Exception as e: - logger.error("Error getting prompt content '%s': %s", identifier, e) + except Exception: + logger.exception("Error getting prompt content '%s'", identifier) raise _wrap_phoenix_error("get_prompt_content", identifier, e) from e @_phoenix_retry @@ -167,8 +167,8 @@ async def create_prompt( return prompt_obj except (ValueError, PhoenixUnavailableError): raise - except Exception as e: - logger.error("Error creating prompt '%s': %s", identifier, e) + except Exception: + logger.exception("Error creating prompt '%s'", identifier) raise _wrap_phoenix_error("create_prompt", identifier, e) from e @_phoenix_retry @@ -199,8 +199,8 @@ async def update_prompt( return updated except (ValueError, PhoenixUnavailableError): raise - except Exception as e: - logger.error("Error updating prompt '%s': %s", identifier, e) + except Exception: + logger.exception("Error updating prompt '%s'", identifier) raise _wrap_phoenix_error("update_prompt", identifier, e) from e async def add_tag(self, identifier: str, tag: str) -> None: @@ -212,8 +212,8 @@ async def add_tag(self, identifier: str, tag: str) -> None: name=tag, ) logger.info("Added tag '%s' to prompt '%s'", tag, identifier) - except Exception as e: - logger.error("Error adding tag '%s' to '%s': %s", tag, identifier, e) + except Exception: + logger.exception("Error adding tag '%s' to '%s'", tag, identifier) raise _wrap_phoenix_error("add_tag", identifier, e) from e def _to_domain_prompt( diff --git a/src/infrastructure/tracing/phoenix_adapter.py b/src/infrastructure/tracing/phoenix_adapter.py index cf2ccc7..1c96327 100644 --- a/src/infrastructure/tracing/phoenix_adapter.py +++ b/src/infrastructure/tracing/phoenix_adapter.py @@ -63,8 +63,8 @@ async def flush(self) -> None: if hasattr(self._tracer_provider, "force_flush"): self._tracer_provider.force_flush(timeout_millis=timeout_millis) logger.info("Flushed pending spans to Phoenix") - except Exception as e: - logger.error("Error flushing spans to Phoenix: %s", e) + except Exception: + logger.exception("Error flushing spans to Phoenix") async def shutdown(self) -> None: """Shutdown the tracer provider and flush remaining spans.""" @@ -76,5 +76,5 @@ async def shutdown(self) -> None: if hasattr(self._tracer_provider, "shutdown"): self._tracer_provider.shutdown() logger.info("Phoenix tracing provider shutdown complete") - except Exception as e: - logger.error("Error shutting down tracer provider: %s", e) + except Exception: + logger.exception("Error shutting down tracer provider") diff --git a/src/main.py b/src/main.py index 3b36cf9..77718f0 100644 --- a/src/main.py +++ b/src/main.py @@ -1,53 +1,28 @@ -import asyncio +"""Main entry point for the Composable Agents API.""" + import logging from contextlib import asynccontextmanager from pathlib import Path +import uvicorn +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + from src.config import Settings +settings = Settings() + logging.basicConfig( - level=logging.INFO, + level=settings.uvicorn_log_level.upper(), format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse - -from src.application.routes.agents import router as agents_router -from src.application.routes.chat import router as chat_router -from src.application.routes.health import router as health_router -from src.application.routes.prompt import router as prompt_router -from src.application.routes.threads import router as threads_router -from src.application.routes.websocket import router as websocket_router -from src.dependencies import ( - close_persistence, - init_persistence, - mcp_tool_loader, - tracing_provider, -) -from src.domain.exceptions import ( - AgentConfigAlreadyExistsError, - AgentError, - AgentNotFoundError, - ConfigError, - ConfigNotFoundError, - ConfigValidationError, - DomainError, - StorageError, - ThreadNotFoundError, -) - logger = logging.getLogger(__name__) -settings = Settings() def _run_alembic_upgrade() -> None: - """Run Alembic migrations to head synchronously. - - Designed to be called via asyncio.to_thread() during startup. - """ from alembic.config import Config from alembic import command @@ -60,12 +35,10 @@ def _run_alembic_upgrade() -> None: @asynccontextmanager async def lifespan(_app: FastAPI): - """Application lifespan: run migrations, init persistence on startup, cleanup on shutdown.""" + logging.getLogger().setLevel(settings.uvicorn_log_level.upper()) + logger.info("Application startup initiated") try: - logger.info("Running database migrations...") - await asyncio.to_thread(_run_alembic_upgrade) - logger.info("Database migrations completed") await init_persistence() logger.info("Persistence initialized") except Exception: @@ -107,6 +80,30 @@ async def lifespan(_app: FastAPI): allow_headers=["*"], ) +from src.application.routes.agents import router as agents_router +from src.application.routes.chat import router as chat_router +from src.application.routes.health import router as health_router +from src.application.routes.prompt import router as prompt_router +from src.application.routes.threads import router as threads_router +from src.application.routes.websocket import router as websocket_router +from src.dependencies import ( + close_persistence, + init_persistence, + mcp_tool_loader, + tracing_provider, +) +from src.domain.exceptions import ( + AgentConfigAlreadyExistsError, + AgentError, + AgentNotFoundError, + ConfigError, + ConfigNotFoundError, + ConfigValidationError, + DomainError, + StorageError, + ThreadNotFoundError, +) + app.include_router(health_router) app.include_router(threads_router) app.include_router(chat_router) @@ -168,7 +165,20 @@ async def domain_error_handler(_request: Request, exc: DomainError) -> JSONRespo logger.error("Domain error: %s", exc) return JSONResponse(status_code=500, content={"detail": str(exc)}) -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host=settings.host, port=settings.port, log_level=settings.log_level.lower()) \ No newline at end of file +def run_fastapi(): + logger.info("Running database migrations...") + _run_alembic_upgrade() + logger.info("Database migrations completed") + + uvicorn.run( + app, + host=settings.host, + port=settings.port, + log_level=settings.uvicorn_log_level, + access_log=True, + ) + + +if __name__ == "__main__": + run_fastapi() \ No newline at end of file