diff --git a/Dockerfile b/Dockerfile
index 04f52fa8..fbb1d789 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -67,7 +67,7 @@ RUN groupadd --gid 1000 eva && \
 RUN mkdir -p /app/output && chown eva:eva /app/output
 
 # Python runtime settings
-ENV PYTHONPATH="/app/src:$PYTHONPATH"
+ENV PYTHONPATH="/app/src"
 ENV PYTHONUNBUFFERED=1
 
 # Health check
diff --git a/scripts/create_elevenlabs_tools.py b/scripts/create_elevenlabs_tools.py
index 371406d9..6d98614c 100644
--- a/scripts/create_elevenlabs_tools.py
+++ b/scripts/create_elevenlabs_tools.py
@@ -86,7 +86,11 @@ def build_parameters(tool: dict) -> ObjectJsonSchemaPropertyInput:
     )
 
 
-def convert_tool(tool: dict) -> ToolRequestModel:
+def convert_tool(
+    tool: dict,
+    pre_tool_speech: str = "force",
+    execution_mode: str = "immediate",
+) -> ToolRequestModel:
     """Convert a YAML tool definition to an ElevenLabs ToolRequestModel."""
     client_config = ToolRequestModelToolConfig_Client(
         type="client",
@@ -94,6 +98,8 @@ def convert_tool(tool: dict) -> ToolRequestModel:
         description=f"{tool['name']}: {tool['description']}",
         expects_response=True,
         parameters=build_parameters(tool),
+        pre_tool_speech=pre_tool_speech,
+        execution_mode=execution_mode,
     )
     return ToolRequestModel(tool_config=client_config)
 
@@ -104,6 +110,18 @@ def main():
     parser.add_argument("--agent-id", default="", help="ElevenLabs agent ID")
     parser.add_argument("--domain", default="airline", help="Agent domain name (e.g. airline, itsm, medical_hr)")
     parser.add_argument("--config", default=None, help="Path to agent YAML config (overrides --domain)")
+    parser.add_argument(
+        "--pre-tool-speech",
+        default="force",
+        choices=["auto", "force", "off"],
+        help="Pre-tool speech mode (default: force)",
+    )
+    parser.add_argument(
+        "--execution-mode",
+        default="immediate",
+        choices=["immediate", "post_tool_speech", "async"],
+        help="Tool execution mode (default: immediate)",
+    )
     parser.add_argument("--dry-run", action="store_true", help="Print tool configs without creating them")
     args = parser.parse_args()
 
@@ -113,7 +131,7 @@ def main():
 
     if args.dry_run:
         for tool in tools:
-            request_model = convert_tool(tool)
+            request_model = convert_tool(tool, pre_tool_speech=args.pre_tool_speech, execution_mode=args.execution_mode)
             tool_cfg = request_model.tool_config
             print(f"\n--- {tool_cfg.name} ---")
             print(f"  description: {tool_cfg.description}")
diff --git a/src/eva/assistant/elevenlabs_server.py b/src/eva/assistant/elevenlabs_server.py
index 2b978b3f..f1222a69 100644
--- a/src/eva/assistant/elevenlabs_server.py
+++ b/src/eva/assistant/elevenlabs_server.py
@@ -195,7 +195,12 @@ async def websocket_root(websocket: WebSocket):
         logger.info(f"Elevenlabs server started on ws://localhost:{self.port}")
 
     async def _shutdown(self) -> None:
-        """Stop the server, save outputs."""
+        """Stop the server (framework-specific teardown).
+
+        Note: save_outputs() is called by the base class stop() after _shutdown()
+        returns — do NOT call it here to avoid writing outputs twice.
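+        (Call order: stop() -> _shutdown() -> save_outputs().)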
+ """ if not self._running: return self._running = False @@ -216,7 +220,6 @@ async def _shutdown(self) -> None: self._server = None self._server_task = None - await self.save_outputs() logger.info(f"ElevenLabs server stopped on port {self.port}") async def _handle_session(self, websocket: WebSocket) -> None: # noqa: C901 diff --git a/src/eva/cli.py b/src/eva/cli.py index 0a079a90..818573fb 100644 --- a/src/eva/cli.py +++ b/src/eva/cli.py @@ -5,11 +5,14 @@ """ import asyncio +import faulthandler import os import sys from pydantic import ValidationError +faulthandler.enable() # Print Python stack trace on segfaults (exit 139) + def _extract_domain_spec() -> tuple[str | None, bool]: """Return (raw_domain_spec, came_from_argv). diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 338d4097..8fe4c112 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -53,6 +53,51 @@ def _param_alias(params: dict[str, Any]) -> str: return params.get("alias") or params["model"] +_elevenlabs_agent_cache: dict[str, dict[str, str]] = {} + + +def _fetch_elevenlabs_agent_models(s2s_params: dict[str, Any]) -> dict[str, str]: + """Fetch STT, LLM, and TTS model names from the ElevenLabs agent API. + + Results are cached per agent ID so repeated calls (e.g. run_id generation) + don't hit the API multiple times. + """ + agent_id = s2s_params.get("assistant_agent_id", "") + if not agent_id: + logger.warning("No assistant_agent_id in s2s_params, cannot fetch ElevenLabs agent models") + return {"stt": "unknown", "llm": "unknown", "tts": "unknown"} + + if agent_id in _elevenlabs_agent_cache: + return _elevenlabs_agent_cache[agent_id] + + try: + from elevenlabs.client import ElevenLabs + + client = ElevenLabs(api_key=s2s_params.get("api_key")) + agent = client.conversational_ai.agents.get(agent_id=agent_id) + cc = agent.conversation_config + + stt = "unknown" + if cc.asr and cc.asr.provider: + stt = cc.asr.provider + + llm = "unknown" + if cc.agent and cc.agent.prompt and cc.agent.prompt.llm: + llm = cc.agent.prompt.llm + + tts = "unknown" + if cc.tts and cc.tts.model_id: + tts = cc.tts.model_id + + result = {"stt": stt, "llm": llm, "tts": tts} + _elevenlabs_agent_cache[agent_id] = result + logger.info(f"Fetched ElevenLabs agent models: {result}") + return result + except Exception as e: + logger.warning(f"Failed to fetch ElevenLabs agent models: {e}") + return {"stt": "unknown", "llm": "unknown", "tts": "unknown"} + + class PipelineConfig(BaseModel): """Configuration for a STT + LLM + TTS pipeline.""" @@ -195,12 +240,9 @@ class SpeechToSpeechConfig(BaseModel): def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" if self.s2s == "elevenlabs": - # hardcoded for now. 
             return {
                 "s2s": _param_alias(self.s2s_params),
-                "stt": "scribe_v2.2_realtime",
-                "llm": "gemini-3-flash-preview",
-                "tts": "v3-conversational",
+                **_fetch_elevenlabs_agent_models(self.s2s_params),
             }
         return {"s2s": _param_alias(self.s2s_params)}
diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py
index d9424bc9..95b6d7d8 100644
--- a/src/eva/orchestrator/runner.py
+++ b/src/eva/orchestrator/runner.py
@@ -221,13 +221,10 @@ async def _run_and_pipeline(
             async with semaphore:
                 result, audio_task = await self._run_conversation(record, output_id)
 
-            # Phase 2: Save result.json outside the semaphore
-            if isinstance(result, ConversationResult):
-                result_path = self.output_dir / "records" / output_id / "result.json"
-                result_path.parent.mkdir(parents=True, exist_ok=True)
-                await asyncio.to_thread(result_path.write_text, result.model_dump_json(indent=2))
+            # result.json is now written by the worker itself (inside the
+            # semaphore) so it survives task cancellation and process signals.
 
-            # Phase 3: If the conversation didn't complete, skip validation.
+            # Phase 2: If the conversation didn't complete, skip validation.
             # validate_one() handles the gate (conversation_valid_end); returning
             # vr=None signals "not_finished" to the classification loop below.
             if not (isinstance(result, ConversationResult) and result.completed):
@@ -257,10 +254,11 @@ async def _run_and_pipeline(
                 logger.warning(f"Skipping {output_id} as invalid: {exc}")
                 return output_id, exc, False, None
             except Exception as exc:
-                # Anything else (KeyError, TypeError, programming bugs, etc.) is
-                # surfaced loudly so it doesn't get silently swallowed.
+                # Log loudly but return instead of raising — asyncio.gather does
+                # not use return_exceptions, so a raise here kills result.json
+                # writes for every other record still in flight.
                 logger.error(f"Pipeline error for {output_id}: {exc}", exc_info=True)
-                raise
+                return output_id, exc, False, None
 
         pipeline_results = await asyncio.gather(
             *(_run_and_pipeline(output_id_to_record[oid], oid) for oid in pending_output_ids),
diff --git a/src/eva/orchestrator/worker.py b/src/eva/orchestrator/worker.py
index 64ba9301..01dd7aee 100644
--- a/src/eva/orchestrator/worker.py
+++ b/src/eva/orchestrator/worker.py
@@ -179,90 +179,130 @@ async def run(self) -> ConversationResult:
             )
         finally:
             await self._cleanup()
-            # Remove the log file handler after cleanup is complete
-            if self._log_file_handler:
-                remove_record_log_file(self._log_file_handler)
-                self._log_file_handler = None
-        # If the conversation errored, return a failed result immediately. DB hashes or latency stats cannot be computed if the run did not complete.
-        if error is not None:
-            now = datetime.now()
-            return ConversationResult(
+        try:
+            # If the conversation errored, return a failed result immediately.
+            # DB hashes or latency stats cannot be computed if the run did not complete.
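+            # (This early return reports conversation_ended_reason="error".)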
+            if error is not None:
+                now = datetime.now()
+                return ConversationResult(
+                    record_id=self.record.id,
+                    completed=False,
+                    error=error,
+                    error_details=error_details,
+                    started_at=started_at,
+                    ended_at=now,
+                    duration_seconds=(now - started_at).total_seconds(),
+                    output_dir=str(self.output_dir),
+                    conversation_ended_reason="error",
+                )
+
+            ended_at = datetime.now()
+
+            # Compute scenario database hashes (REQUIRED for deterministic metrics)
+            initial_db_path = self.output_dir / "initial_scenario_db.json"
+            final_db_path = self.output_dir / "final_scenario_db.json"
+
+            if not initial_db_path.exists():
+                raise FileNotFoundError(
+                    f"Initial scenario database not found at {initial_db_path}. "
+                    "This is required for deterministic task completion metrics."
+                )
+            if not final_db_path.exists():
+                raise FileNotFoundError(
+                    f"Final scenario database not found at {final_db_path}. "
+                    "This is required for deterministic task completion metrics."
+                )
+
+            with open(initial_db_path) as f:
+                initial_db = json.load(f)
+            with open(final_db_path) as f:
+                final_db = json.load(f)
+
+            initial_scenario_db_hash = get_dict_hash(initial_db)
+            final_scenario_db_hash = get_dict_hash(final_db)
+
+            logger.info(
+                f"Computed scenario DB hashes - Initial: {initial_scenario_db_hash[:8]}..., "
+                f"Final: {final_scenario_db_hash[:8]}..."
+            )
+
+            # Calculate latency statistics
+            llm_latency = self._calculate_llm_latency()
+            stt_latency = self._calculate_stt_latency()
+            tts_latency = self._calculate_tts_latency()
+            model_response_latency = self._calculate_model_response_latency()
+
+            result = ConversationResult(
                 record_id=self.record.id,
-                completed=False,
+                completed=error is None and conversation_ended_reason != "error",
                 error=error,
                 error_details=error_details,
+                llm_latency=llm_latency,
+                stt_latency=stt_latency,
+                tts_latency=tts_latency,
+                model_response_latency=model_response_latency,
                 started_at=started_at,
-                ended_at=now,
-                duration_seconds=(now - started_at).total_seconds(),
+                ended_at=ended_at,
+                duration_seconds=(ended_at - started_at).total_seconds(),
                 output_dir=str(self.output_dir),
-                conversation_ended_reason="error",
+                audio_assistant_path=str(self.output_dir / "audio_assistant.wav"),
+                audio_user_path=str(self.output_dir / "audio_user_clean.wav"),
+                audio_mixed_path=str(self.output_dir / "audio_mixed.wav"),
+                transcript_path=str(self.output_dir / "transcript.jsonl"),
+                audit_log_path=str(self.output_dir / "audit_log.json"),
+                conversation_log_path=str(self.output_dir / "logs.log"),
+                pipecat_logs_path=self._resolve_framework_logs_path(),
+                elevenlabs_logs_path=str(self.output_dir / "elevenlabs_events.jsonl"),
+                num_turns=self._conversation_stats.get("num_turns", 0),
+                num_tool_calls=self._conversation_stats.get("num_tool_calls", 0),
+                tools_called=self._conversation_stats.get("tools_called", []),
+                conversation_ended_reason=conversation_ended_reason,
+                initial_scenario_db_hash=initial_scenario_db_hash,
+                final_scenario_db_hash=final_scenario_db_hash,
             )
 
-        ended_at = datetime.now()
+            # Write result.json here (inside Phase 1 / semaphore) so it lands on
+            # disk alongside audit_log and scenario DBs. Previously this was done
+            # in the runner's Phase 2 *after* the semaphore release, where it was
+            # vulnerable to task cancellation from process signals or gather crashes.
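+            # (The worker still holds the runner's semaphore at this point.)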
+            result_path = self.output_dir / "result.json"
+            result_path.write_text(result.model_dump_json(indent=2))
 
-        # Compute scenario database hashes (REQUIRED for deterministic metrics)
-        initial_db_path = self.output_dir / "initial_scenario_db.json"
-        final_db_path = self.output_dir / "final_scenario_db.json"
+            return result
 
-        if not initial_db_path.exists():
-            raise FileNotFoundError(
-                f"Initial scenario database not found at {initial_db_path}. "
-                "This is required for deterministic task completion metrics."
+        except Exception as e:
+            # Post-conversation processing failed (hash computation, latency stats, etc.).
+            # Return a failed result rather than raising, so the caller still receives
+            # a ConversationResult for this record instead of a bare exception.
+            logger.error(
+                f"Post-conversation processing failed for {self.record.id}: {e}",
+                exc_info=True,
             )
-        if not final_db_path.exists():
-            raise FileNotFoundError(
-                f"Final scenario database not found at {final_db_path}. "
-                "This is required for deterministic task completion metrics."
+            now = datetime.now()
+            return ConversationResult(
+                record_id=self.record.id,
+                completed=False,
+                error=f"Post-conversation processing failed: {e}",
+                error_details=create_error_details(error=e, retry_count=0, retry_succeeded=False),
+                started_at=started_at,
+                ended_at=now,
+                duration_seconds=(now - started_at).total_seconds(),
+                output_dir=str(self.output_dir),
+                conversation_ended_reason=conversation_ended_reason,
+                num_turns=self._conversation_stats.get("num_turns", 0),
+                num_tool_calls=self._conversation_stats.get("num_tool_calls", 0),
+                tools_called=self._conversation_stats.get("tools_called", []),
             )
 
-        with open(initial_db_path) as f:
-            initial_db = json.load(f)
-        with open(final_db_path) as f:
-            final_db = json.load(f)
-
-        initial_scenario_db_hash = get_dict_hash(initial_db)
-        final_scenario_db_hash = get_dict_hash(final_db)
-
-        logger.info(
-            f"Computed scenario DB hashes - Initial: {initial_scenario_db_hash[:8]}..., "
-            f"Final: {final_scenario_db_hash[:8]}..."
-        )
-
-        # Calculate latency statistics
-        llm_latency = self._calculate_llm_latency()
-        stt_latency = self._calculate_stt_latency()
-        tts_latency = self._calculate_tts_latency()
-        model_response_latency = self._calculate_model_response_latency()
-
-        return ConversationResult(
-            record_id=self.record.id,
-            completed=error is None and conversation_ended_reason != "error",
-            error=error,
-            error_details=error_details,
-            llm_latency=llm_latency,
-            stt_latency=stt_latency,
-            tts_latency=tts_latency,
-            model_response_latency=model_response_latency,
-            started_at=started_at,
-            ended_at=ended_at,
-            duration_seconds=(ended_at - started_at).total_seconds(),
-            output_dir=str(self.output_dir),
-            audio_assistant_path=str(self.output_dir / "audio_assistant.wav"),
-            audio_user_path=str(self.output_dir / "audio_user_clean.wav"),
-            audio_mixed_path=str(self.output_dir / "audio_mixed.wav"),
-            transcript_path=str(self.output_dir / "transcript.jsonl"),
-            audit_log_path=str(self.output_dir / "audit_log.json"),
-            conversation_log_path=str(self.output_dir / "logs.log"),
-            pipecat_logs_path=self._resolve_framework_logs_path(),
-            elevenlabs_logs_path=str(self.output_dir / "elevenlabs_events.jsonl"),
-            num_turns=self._conversation_stats.get("num_turns", 0),
-            num_tool_calls=self._conversation_stats.get("num_tool_calls", 0),
-            tools_called=self._conversation_stats.get("tools_called", []),
-            conversation_ended_reason=conversation_ended_reason,
-            initial_scenario_db_hash=initial_scenario_db_hash,
-            final_scenario_db_hash=final_scenario_db_hash,
-        )
+        finally:
+            # Remove the log file handler LAST so post-conversation errors
+            # are captured in the record's logs.log (not just the console).
+            if self._log_file_handler:
+                remove_record_log_file(self._log_file_handler)
+                self._log_file_handler = None
 
     async def _start_assistant(self) -> None:
         """Start the assistant server using the configured framework."""