From 40f82bd5eb717d476cba55ce4d8cc0936dcb7576 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 16:51:02 -0400 Subject: [PATCH 01/10] elevenlabs not saving result.json --- src/eva/assistant/elevenlabs_server.py | 7 +- src/eva/orchestrator/worker.py | 173 +++++++++++++++---------- 2 files changed, 106 insertions(+), 74 deletions(-) diff --git a/src/eva/assistant/elevenlabs_server.py b/src/eva/assistant/elevenlabs_server.py index 2b978b3f..f1222a69 100644 --- a/src/eva/assistant/elevenlabs_server.py +++ b/src/eva/assistant/elevenlabs_server.py @@ -195,7 +195,11 @@ async def websocket_root(websocket: WebSocket): logger.info(f"Elevenlabs server started on ws://localhost:{self.port}") async def _shutdown(self) -> None: - """Stop the server, save outputs.""" + """Stop the server (framework-specific teardown). + + Note: save_outputs() is called by the base class stop() after _shutdown() + returns — do NOT call it here to avoid writing outputs twice. + """ if not self._running: return self._running = False @@ -216,7 +220,6 @@ async def _shutdown(self) -> None: self._server = None self._server_task = None - await self.save_outputs() logger.info(f"ElevenLabs server stopped on port {self.port}") async def _handle_session(self, websocket: WebSocket) -> None: # noqa: C901 diff --git a/src/eva/orchestrator/worker.py b/src/eva/orchestrator/worker.py index 64ba9301..aad9fc06 100644 --- a/src/eva/orchestrator/worker.py +++ b/src/eva/orchestrator/worker.py @@ -179,90 +179,119 @@ async def run(self) -> ConversationResult: ) finally: await self._cleanup() - # Remove the log file handler after cleanup is complete - if self._log_file_handler: - remove_record_log_file(self._log_file_handler) - self._log_file_handler = None - # If the conversation errored, return a failed result immediately. DB hashes or latency stats cannot be computed if the run did not complete. - if error is not None: - now = datetime.now() + try: + # If the conversation errored, return a failed result immediately. + # DB hashes or latency stats cannot be computed if the run did not complete. + if error is not None: + now = datetime.now() + return ConversationResult( + record_id=self.record.id, + completed=False, + error=error, + error_details=error_details, + started_at=started_at, + ended_at=now, + duration_seconds=(now - started_at).total_seconds(), + output_dir=str(self.output_dir), + conversation_ended_reason="error", + ) + + ended_at = datetime.now() + + # Compute scenario database hashes (REQUIRED for deterministic metrics) + initial_db_path = self.output_dir / "initial_scenario_db.json" + final_db_path = self.output_dir / "final_scenario_db.json" + + if not initial_db_path.exists(): + raise FileNotFoundError( + f"Initial scenario database not found at {initial_db_path}. " + "This is required for deterministic task completion metrics." + ) + if not final_db_path.exists(): + raise FileNotFoundError( + f"Final scenario database not found at {final_db_path}. " + "This is required for deterministic task completion metrics." + ) + + with open(initial_db_path) as f: + initial_db = json.load(f) + with open(final_db_path) as f: + final_db = json.load(f) + + initial_scenario_db_hash = get_dict_hash(initial_db) + final_scenario_db_hash = get_dict_hash(final_db) + + logger.info( + f"Computed scenario DB hashes - Initial: {initial_scenario_db_hash[:8]}..., " + f"Final: {final_scenario_db_hash[:8]}..." + ) + + # Calculate latency statistics + llm_latency = self._calculate_llm_latency() + stt_latency = self._calculate_stt_latency() + tts_latency = self._calculate_tts_latency() + model_response_latency = self._calculate_model_response_latency() + return ConversationResult( record_id=self.record.id, - completed=False, + completed=error is None and conversation_ended_reason != "error", error=error, error_details=error_details, + llm_latency=llm_latency, + stt_latency=stt_latency, + tts_latency=tts_latency, + model_response_latency=model_response_latency, started_at=started_at, - ended_at=now, - duration_seconds=(now - started_at).total_seconds(), + ended_at=ended_at, + duration_seconds=(ended_at - started_at).total_seconds(), output_dir=str(self.output_dir), - conversation_ended_reason="error", + audio_assistant_path=str(self.output_dir / "audio_assistant.wav"), + audio_user_path=str(self.output_dir / "audio_user_clean.wav"), + audio_mixed_path=str(self.output_dir / "audio_mixed.wav"), + transcript_path=str(self.output_dir / "transcript.jsonl"), + audit_log_path=str(self.output_dir / "audit_log.json"), + conversation_log_path=str(self.output_dir / "logs.log"), + pipecat_logs_path=self._resolve_framework_logs_path(), + elevenlabs_logs_path=str(self.output_dir / "elevenlabs_events.jsonl"), + num_turns=self._conversation_stats.get("num_turns", 0), + num_tool_calls=self._conversation_stats.get("num_tool_calls", 0), + tools_called=self._conversation_stats.get("tools_called", []), + conversation_ended_reason=conversation_ended_reason, + initial_scenario_db_hash=initial_scenario_db_hash, + final_scenario_db_hash=final_scenario_db_hash, ) - ended_at = datetime.now() - - # Compute scenario database hashes (REQUIRED for deterministic metrics) - initial_db_path = self.output_dir / "initial_scenario_db.json" - final_db_path = self.output_dir / "final_scenario_db.json" - - if not initial_db_path.exists(): - raise FileNotFoundError( - f"Initial scenario database not found at {initial_db_path}. " - "This is required for deterministic task completion metrics." + except Exception as e: + # Post-conversation processing failed (hash computation, latency stats, etc.) + # Return a failed result so result.json still gets written by the runner, + # rather than raising and leaving no result.json on disk. + logger.error( + f"Post-conversation processing failed for {self.record.id}: {e}", + exc_info=True, ) - if not final_db_path.exists(): - raise FileNotFoundError( - f"Final scenario database not found at {final_db_path}. " - "This is required for deterministic task completion metrics." + now = datetime.now() + return ConversationResult( + record_id=self.record.id, + completed=False, + error=f"Post-conversation processing failed: {e}", + error_details=create_error_details(error=e, retry_count=0, retry_succeeded=False), + started_at=started_at, + ended_at=now, + duration_seconds=(now - started_at).total_seconds(), + output_dir=str(self.output_dir), + conversation_ended_reason=conversation_ended_reason, + num_turns=self._conversation_stats.get("num_turns", 0), + num_tool_calls=self._conversation_stats.get("num_tool_calls", 0), + tools_called=self._conversation_stats.get("tools_called", []), ) - with open(initial_db_path) as f: - initial_db = json.load(f) - with open(final_db_path) as f: - final_db = json.load(f) - - initial_scenario_db_hash = get_dict_hash(initial_db) - final_scenario_db_hash = get_dict_hash(final_db) - - logger.info( - f"Computed scenario DB hashes - Initial: {initial_scenario_db_hash[:8]}..., " - f"Final: {final_scenario_db_hash[:8]}..." - ) - - # Calculate latency statistics - llm_latency = self._calculate_llm_latency() - stt_latency = self._calculate_stt_latency() - tts_latency = self._calculate_tts_latency() - model_response_latency = self._calculate_model_response_latency() - - return ConversationResult( - record_id=self.record.id, - completed=error is None and conversation_ended_reason != "error", - error=error, - error_details=error_details, - llm_latency=llm_latency, - stt_latency=stt_latency, - tts_latency=tts_latency, - model_response_latency=model_response_latency, - started_at=started_at, - ended_at=ended_at, - duration_seconds=(ended_at - started_at).total_seconds(), - output_dir=str(self.output_dir), - audio_assistant_path=str(self.output_dir / "audio_assistant.wav"), - audio_user_path=str(self.output_dir / "audio_user_clean.wav"), - audio_mixed_path=str(self.output_dir / "audio_mixed.wav"), - transcript_path=str(self.output_dir / "transcript.jsonl"), - audit_log_path=str(self.output_dir / "audit_log.json"), - conversation_log_path=str(self.output_dir / "logs.log"), - pipecat_logs_path=self._resolve_framework_logs_path(), - elevenlabs_logs_path=str(self.output_dir / "elevenlabs_events.jsonl"), - num_turns=self._conversation_stats.get("num_turns", 0), - num_tool_calls=self._conversation_stats.get("num_tool_calls", 0), - tools_called=self._conversation_stats.get("tools_called", []), - conversation_ended_reason=conversation_ended_reason, - initial_scenario_db_hash=initial_scenario_db_hash, - final_scenario_db_hash=final_scenario_db_hash, - ) + finally: + # Remove the log file handler LAST so post-conversation errors + # are captured in the record's logs.log (not just the console). + if self._log_file_handler: + remove_record_log_file(self._log_file_handler) + self._log_file_handler = None async def _start_assistant(self) -> None: """Start the assistant server using the configured framework.""" From cb272004ebe7778959cc6b6da2133391e9fa31f0 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 17:29:48 -0400 Subject: [PATCH 02/10] add error message --- src/eva/orchestrator/runner.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index d9424bc9..d48295f4 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -226,6 +226,11 @@ async def _run_and_pipeline( result_path = self.output_dir / "records" / output_id / "result.json" result_path.parent.mkdir(parents=True, exist_ok=True) await asyncio.to_thread(result_path.write_text, result.model_dump_json(indent=2)) + else: + logger.error( + f"result.json NOT saved for {output_id}: expected ConversationResult, " + f"got {type(result).__name__} (module: {type(result).__module__}): {result!r}" + ) # Phase 3: If the conversation didn't complete, skip validation. # validate_one() handles the gate (conversation_valid_end); returning From a12cdd0b11dcd693f409db804cf349dc16b2f007 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 18:19:11 -0400 Subject: [PATCH 03/10] return on exception instead of raising so the loop doesn't crash --- src/eva/orchestrator/runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index d48295f4..cbf114ab 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -262,10 +262,11 @@ async def _run_and_pipeline( logger.warning(f"Skipping {output_id} as invalid: {exc}") return output_id, exc, False, None except Exception as exc: - # Anything else (KeyError, TypeError, programming bugs, etc.) is - # surfaced loudly so it doesn't get silently swallowed. + # Log loudly but return instead of raising — asyncio.gather does + # not use return_exceptions, so a raise here kills result.json + # writes for every other record still in flight. logger.error(f"Pipeline error for {output_id}: {exc}", exc_info=True) - raise + return output_id, exc, False, None pipeline_results = await asyncio.gather( *(_run_and_pipeline(output_id_to_record[oid], oid) for oid in pending_output_ids), From 6625aeddfedccc2e80e8441f49a77be87e00a7de Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 18:20:33 -0400 Subject: [PATCH 04/10] return on exception instead of raising so the loop doesn't crash --- src/eva/orchestrator/runner.py | 16 ++++------------ src/eva/orchestrator/worker.py | 11 ++++++++++- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index cbf114ab..95b6d7d8 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -221,18 +221,10 @@ async def _run_and_pipeline( async with semaphore: result, audio_task = await self._run_conversation(record, output_id) - # Phase 2: Save result.json outside the semaphore - if isinstance(result, ConversationResult): - result_path = self.output_dir / "records" / output_id / "result.json" - result_path.parent.mkdir(parents=True, exist_ok=True) - await asyncio.to_thread(result_path.write_text, result.model_dump_json(indent=2)) - else: - logger.error( - f"result.json NOT saved for {output_id}: expected ConversationResult, " - f"got {type(result).__name__} (module: {type(result).__module__}): {result!r}" - ) - - # Phase 3: If the conversation didn't complete, skip validation. + # result.json is now written by the worker itself (inside the + # semaphore) so it survives task cancellation and process signals. + + # Phase 2: If the conversation didn't complete, skip validation. # validate_one() handles the gate (conversation_valid_end); returning # vr=None signals "not_finished" to the classification loop below. if not (isinstance(result, ConversationResult) and result.completed): diff --git a/src/eva/orchestrator/worker.py b/src/eva/orchestrator/worker.py index aad9fc06..01dd7aee 100644 --- a/src/eva/orchestrator/worker.py +++ b/src/eva/orchestrator/worker.py @@ -233,7 +233,7 @@ async def run(self) -> ConversationResult: tts_latency = self._calculate_tts_latency() model_response_latency = self._calculate_model_response_latency() - return ConversationResult( + result = ConversationResult( record_id=self.record.id, completed=error is None and conversation_ended_reason != "error", error=error, @@ -262,6 +262,15 @@ async def run(self) -> ConversationResult: final_scenario_db_hash=final_scenario_db_hash, ) + # Write result.json here (inside Phase 1 / semaphore) so it lands on + # disk alongside audit_log and scenario DBs. Previously this was done + # in the runner's Phase 2 *after* the semaphore release, where it was + # vulnerable to task cancellation from process signals or gather crashes. + result_path = self.output_dir / "result.json" + result_path.write_text(result.model_dump_json(indent=2)) + + return result + except Exception as e: # Post-conversation processing failed (hash computation, latency stats, etc.) # Return a failed result so result.json still gets written by the runner, From b18ef706e111c16bf545cf352bb8728a211b525c Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 20:13:59 -0400 Subject: [PATCH 05/10] try to log segfault --- Dockerfile | 2 +- src/eva/cli.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 0639ea42..ecbad86e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -66,7 +66,7 @@ RUN groupadd --gid 1000 eva && \ RUN mkdir -p /app/output && chown eva:eva /app/output # Python runtime settings -ENV PYTHONPATH="/app/src:$PYTHONPATH" +ENV PYTHONPATH="/app/src" ENV PYTHONUNBUFFERED=1 # Health check diff --git a/src/eva/cli.py b/src/eva/cli.py index d91265b2..2a2b4842 100644 --- a/src/eva/cli.py +++ b/src/eva/cli.py @@ -5,11 +5,14 @@ """ import asyncio +import faulthandler import os import sys from pydantic import ValidationError +faulthandler.enable() # Print Python stack trace on segfaults (exit 139) + def _extract_domain_spec() -> tuple[str | None, bool]: """Return (raw_domain_spec, came_from_argv). From 9c766802ff5cbbb839156680be2fb2fdecaec028 Mon Sep 17 00:00:00 2001 From: Katrina Date: Thu, 30 Apr 2026 11:57:27 -0400 Subject: [PATCH 06/10] add assets to dockerfile --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index ecbad86e..fbb1d789 100644 --- a/Dockerfile +++ b/Dockerfile @@ -57,6 +57,7 @@ COPY src/ ./src/ COPY scripts/ ./scripts/ COPY configs/ ./configs/ COPY data/ ./data/ +COPY assets/ ./assets/ # Create non-root user for runtime security RUN groupadd --gid 1000 eva && \ From 398dcd70f08508df8457c597e6995a17998ebdef Mon Sep 17 00:00:00 2001 From: Katrina Date: Fri, 8 May 2026 12:33:15 -0700 Subject: [PATCH 07/10] update tts name for elevenlabs --- src/eva/models/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 338d4097..e25699f2 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -200,7 +200,7 @@ def pipeline_parts(self) -> dict[str, str]: "s2s": _param_alias(self.s2s_params), "stt": "scribe_v2.2_realtime", "llm": "gemini-3-flash-preview", - "tts": "v3-conversational", + "tts": "eleven_flash_v2", } return {"s2s": _param_alias(self.s2s_params)} From 510c29031470b32a5eb3b0d3d6695ddea0be3efb Mon Sep 17 00:00:00 2001 From: Katrina Date: Fri, 8 May 2026 12:41:15 -0700 Subject: [PATCH 08/10] update llm name for elevenlabs --- src/eva/models/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index e25699f2..948c1a90 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -199,7 +199,7 @@ def pipeline_parts(self) -> dict[str, str]: return { "s2s": _param_alias(self.s2s_params), "stt": "scribe_v2.2_realtime", - "llm": "gemini-3-flash-preview", + "llm": "haiku-4-5", "tts": "eleven_flash_v2", } return {"s2s": _param_alias(self.s2s_params)} From 872be5774656a65137288d76ce6812adbe832aea Mon Sep 17 00:00:00 2001 From: Katrina Date: Fri, 8 May 2026 12:46:34 -0700 Subject: [PATCH 09/10] get model names from elevenlabs api --- src/eva/models/config.py | 50 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 948c1a90..8fe4c112 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -53,6 +53,51 @@ def _param_alias(params: dict[str, Any]) -> str: return params.get("alias") or params["model"] +_elevenlabs_agent_cache: dict[str, dict[str, str]] = {} + + +def _fetch_elevenlabs_agent_models(s2s_params: dict[str, Any]) -> dict[str, str]: + """Fetch STT, LLM, and TTS model names from the ElevenLabs agent API. + + Results are cached per agent ID so repeated calls (e.g. run_id generation) + don't hit the API multiple times. + """ + agent_id = s2s_params.get("assistant_agent_id", "") + if not agent_id: + logger.warning("No assistant_agent_id in s2s_params, cannot fetch ElevenLabs agent models") + return {"stt": "unknown", "llm": "unknown", "tts": "unknown"} + + if agent_id in _elevenlabs_agent_cache: + return _elevenlabs_agent_cache[agent_id] + + try: + from elevenlabs.client import ElevenLabs + + client = ElevenLabs(api_key=s2s_params.get("api_key")) + agent = client.conversational_ai.agents.get(agent_id=agent_id) + cc = agent.conversation_config + + stt = "unknown" + if cc.asr and cc.asr.provider: + stt = cc.asr.provider + + llm = "unknown" + if cc.agent and cc.agent.prompt and cc.agent.prompt.llm: + llm = cc.agent.prompt.llm + + tts = "unknown" + if cc.tts and cc.tts.model_id: + tts = cc.tts.model_id + + result = {"stt": stt, "llm": llm, "tts": tts} + _elevenlabs_agent_cache[agent_id] = result + logger.info(f"Fetched ElevenLabs agent models: {result}") + return result + except Exception as e: + logger.warning(f"Failed to fetch ElevenLabs agent models: {e}") + return {"stt": "unknown", "llm": "unknown", "tts": "unknown"} + + class PipelineConfig(BaseModel): """Configuration for a STT + LLM + TTS pipeline.""" @@ -195,12 +240,9 @@ class SpeechToSpeechConfig(BaseModel): def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" if self.s2s == "elevenlabs": - # hardcoded for now. Models are set on the agent UI return { "s2s": _param_alias(self.s2s_params), - "stt": "scribe_v2.2_realtime", - "llm": "haiku-4-5", - "tts": "eleven_flash_v2", + **_fetch_elevenlabs_agent_models(self.s2s_params), } return {"s2s": _param_alias(self.s2s_params)} From 40b981449e090b98c332982c967b4423282948a5 Mon Sep 17 00:00:00 2001 From: Katrina Date: Fri, 8 May 2026 12:21:26 -0700 Subject: [PATCH 10/10] update elevenlabs tool script to control pre-tool speech and execution mode --- scripts/create_elevenlabs_tools.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/scripts/create_elevenlabs_tools.py b/scripts/create_elevenlabs_tools.py index 371406d9..6d98614c 100644 --- a/scripts/create_elevenlabs_tools.py +++ b/scripts/create_elevenlabs_tools.py @@ -86,7 +86,11 @@ def build_parameters(tool: dict) -> ObjectJsonSchemaPropertyInput: ) -def convert_tool(tool: dict) -> ToolRequestModel: +def convert_tool( + tool: dict, + pre_tool_speech: str = "force", + execution_mode: str = "immediate", +) -> ToolRequestModel: """Convert a YAML tool definition to an ElevenLabs ToolRequestModel.""" client_config = ToolRequestModelToolConfig_Client( type="client", @@ -94,6 +98,8 @@ def convert_tool(tool: dict) -> ToolRequestModel: description=f"{tool['name']}: {tool['description']}", expects_response=True, parameters=build_parameters(tool), + pre_tool_speech=pre_tool_speech, + execution_mode=execution_mode, ) return ToolRequestModel(tool_config=client_config) @@ -104,6 +110,18 @@ def main(): parser.add_argument("--agent-id", default="", help="ElevenLabs agent ID") parser.add_argument("--domain", default="airline", help="Agent domain name (e.g. airline, itsm, medical_hr)") parser.add_argument("--config", default=None, help="Path to agent YAML config (overrides --domain)") + parser.add_argument( + "--pre-tool-speech", + default="force", + choices=["auto", "force", "off"], + help="Pre-tool speech mode (default: force)", + ) + parser.add_argument( + "--execution-mode", + default="immediate", + choices=["immediate", "post_tool_speech", "async"], + help="Tool execution mode (default: immediate)", + ) parser.add_argument("--dry-run", action="store_true", help="Print tool configs without creating them") args = parser.parse_args() @@ -113,7 +131,7 @@ def main(): if args.dry_run: for tool in tools: - request_model = convert_tool(tool) + request_model = convert_tool(tool, pre_tool_speech=args.pre_tool_speech, execution_mode=args.execution_mode) tool_cfg = request_model.tool_config print(f"\n--- {tool_cfg.name} ---") print(f" description: {tool_cfg.description}")