Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None:
# speeches that audio playout finished but not done because of tool calls
self._background_speeches: set[SpeechHandle] = set()

# placeholder used to hold a RunResult open while waiting for a realtime
# model to auto-generate a tool reply (auto_tool_reply_generation=True).
self._pending_auto_tool_reply_fut: asyncio.Future[None] | None = None

def _validate_turn_detection(
self, turn_detection: TurnDetectionMode | None
) -> TurnDetectionMode | None:
Expand Down Expand Up @@ -1585,6 +1589,13 @@ def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None:
speech_handle=handle,
name="AgentActivity.realtime_generation",
)

if (fut := self._pending_auto_tool_reply_fut) and not fut.done():
if (run_state := self._session._global_run_state) is not None and not run_state.done():
run_state._watch_handle(handle)
self._pending_auto_tool_reply_fut = None
fut.set_result(None)

self._schedule_speech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL)

def _interrupt_by_audio_activity(
Expand Down Expand Up @@ -3378,6 +3389,32 @@ def _create_assistant_message(
else:
await asyncio.sleep(0)

# if the realtime model auto-generates the tool reply, install a
# placeholder so the active RunResult waits for that reply
auto_reply_fut: asyncio.Future[None] | None = None
if (
self.llm.capabilities.auto_tool_reply_generation
and fnc_executed_ev._reply_required
and self._pending_auto_tool_reply_fut is None
and (run_state := self._session._global_run_state) is not None
and not run_state.done()
):
auto_reply_fut = asyncio.get_event_loop().create_future()
self._pending_auto_tool_reply_fut = auto_reply_fut
llm_label = self.llm._label

async def _wait_for_auto_tool_reply() -> None:
try:
await asyncio.wait_for(asyncio.shield(auto_reply_fut), 5.0)
except asyncio.TimeoutError:
logger.warning(
"timed out waiting for realtime auto tool reply from %s",
llm_label,
)

task = asyncio.create_task(_wait_for_auto_tool_reply())
run_state._watch_handle(task)

chat_ctx = self._rt_session.chat_ctx.copy()
chat_ctx.items.extend(new_fnc_outputs)
try:
Expand All @@ -3387,6 +3424,10 @@ def _create_assistant_message(
"failed to update chat context before generating the function calls results", # noqa: E501
extra={"error": str(e)},
)
if auto_reply_fut is not None and not auto_reply_fut.done():
if self._pending_auto_tool_reply_fut is auto_reply_fut:
self._pending_auto_tool_reply_fut = None
auto_reply_fut.set_result(None)

if (
fnc_executed_ev._reply_required
Expand Down
Loading