From cb7ed658a443248c9e0c9e53a93ec7d70d025d17 Mon Sep 17 00:00:00 2001 From: Lukasz Czarnecki <6866304+lukcz@users.noreply.github.com> Date: Mon, 6 Apr 2026 07:28:06 +0200 Subject: [PATCH 1/3] Add native Assist streaming for OpenClaw conversation --- custom_components/openclaw/conversation.py | 231 +++++++++-- tests/test_conversation_streaming.py | 422 +++++++++++++++++++++ 2 files changed, 611 insertions(+), 42 deletions(-) create mode 100644 tests/test_conversation_streaming.py diff --git a/custom_components/openclaw/conversation.py b/custom_components/openclaw/conversation.py index 633a7d1..fc05f2d 100644 --- a/custom_components/openclaw/conversation.py +++ b/custom_components/openclaw/conversation.py @@ -9,13 +9,13 @@ from datetime import datetime, timezone import logging import re -from typing import Any +from typing import Any, AsyncIterator from homeassistant.components import conversation from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant +from homeassistant.helpers import chat_session, intent from homeassistant.helpers.entity_platform import AddEntitiesCallback -from homeassistant.helpers import intent from .api import OpenClawApiClient, OpenClawApiError from .const import ( @@ -57,8 +57,7 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up the OpenClaw conversation agent.""" - agent = OpenClawConversationAgent(hass, entry) - conversation.async_set_agent(hass, entry, agent) + async_add_entities([OpenClawConversationAgent(hass, entry)]) async def async_unload_entry( @@ -66,21 +65,37 @@ async def async_unload_entry( entry: ConfigEntry, ) -> bool: """Unload the conversation agent.""" - conversation.async_unset_agent(hass, entry) return True -class OpenClawConversationAgent(conversation.AbstractConversationAgent): +class OpenClawConversationAgent( + conversation.ConversationEntity, + conversation.AbstractConversationAgent, +): """Conversation agent that routes messages through OpenClaw. Enables OpenClaw to appear as a selectable agent in the Assist pipeline, allowing use with Voice PE, satellites, and the built-in HA Assist dialog. """ + _attr_supports_streaming = True + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: """Initialize the conversation agent.""" self.hass = hass self.entry = entry + self._attr_unique_id = entry.entry_id + self._attr_name = entry.title or "OpenClaw" + + async def async_added_to_hass(self) -> None: + """Register the entity-backed agent when added to Home Assistant.""" + await super().async_added_to_hass() + conversation.async_set_agent(self.hass, self.entry, self) + + async def async_will_remove_from_hass(self) -> None: + """Unregister the entity-backed agent when removed from Home Assistant.""" + conversation.async_unset_agent(self.hass, self.entry) + await super().async_will_remove_from_hass() @property def attribution(self) -> dict[str, str]: @@ -132,7 +147,6 @@ async def async_process( ) ) resolved_agent_id = voice_agent_id or configured_agent_id - conversation_id = self._resolve_conversation_id(user_input, resolved_agent_id) active_model = self._normalize_optional_text(options.get("active_model")) include_context = options.get( CONF_INCLUDE_EXPOSED_CONTEXT, @@ -154,15 +168,20 @@ async def async_process( system_prompt = "\n\n".join( part for part in (exposed_context, extra_system_prompt) if part ) or None + backend_conversation_id = self._resolve_conversation_id( + user_input, + resolved_agent_id, + ) try: - full_response = await self._get_response( - client, - message, - conversation_id, - resolved_agent_id, - system_prompt, - active_model, + full_response, result = await self._async_process_with_chat_log( + user_input=user_input, + client=client, + backend_conversation_id=backend_conversation_id, + message=message, + agent_id=resolved_agent_id, + system_prompt=system_prompt, + model=active_model, ) except OpenClawApiError as err: _LOGGER.error("OpenClaw conversation error: %s", err) @@ -173,13 +192,14 @@ async def async_process( refreshed = await refresh_fn() if refreshed: try: - full_response = await self._get_response( - client, - message, - conversation_id, - resolved_agent_id, - system_prompt, - active_model, + full_response, result = await self._async_process_with_chat_log( + user_input=user_input, + client=client, + backend_conversation_id=backend_conversation_id, + message=message, + agent_id=resolved_agent_id, + system_prompt=system_prompt, + model=active_model, ) except OpenClawApiError as retry_err: return self._error_result( @@ -202,21 +222,15 @@ async def async_process( EVENT_MESSAGE_RECEIVED, { ATTR_MESSAGE: full_response, - ATTR_SESSION_ID: conversation_id, - ATTR_MODEL: coordinator.data.get(DATA_MODEL) if coordinator.data else None, + ATTR_SESSION_ID: backend_conversation_id, + ATTR_MODEL: ( + coordinator.data.get(DATA_MODEL) if coordinator.data else None + ), ATTR_TIMESTAMP: datetime.now(timezone.utc).isoformat(), }, ) coordinator.update_last_activity() - - intent_response = intent.IntentResponse(language=user_input.language) - intent_response.async_set_speech(full_response) - - return conversation.ConversationResult( - response=intent_response, - conversation_id=conversation_id, - continue_conversation=self._should_continue(full_response), - ) + return result def _resolve_conversation_id( self, @@ -269,30 +283,163 @@ async def _get_response( system_prompt: str | None = None, model: str | None = None, ) -> str: - """Get a response from OpenClaw, trying streaming first.""" - full_response = "" - async for chunk in client.async_stream_message( + """Get a non-streaming response from OpenClaw.""" + response = await client.async_send_message( message=message, session_id=conversation_id, model=model, system_prompt=system_prompt, agent_id=agent_id, extra_headers=_VOICE_REQUEST_HEADERS, + ) + return extract_text_recursive(response) or "" + + async def _async_process_with_chat_log( + self, + user_input: conversation.ConversationInput, + client: OpenClawApiClient, + backend_conversation_id: str, + message: str, + agent_id: str | None = None, + system_prompt: str | None = None, + model: str | None = None, + ) -> tuple[str, conversation.ConversationResult]: + """Write the assistant response into the Home Assistant chat log.""" + chat_log_conversation_id = user_input.conversation_id or backend_conversation_id + + with ( + chat_session.async_get_chat_session( + self.hass, + chat_log_conversation_id, + ) as session, + conversation.async_get_chat_log( + self.hass, + session, + user_input, + ) as chat_log, + ): + full_response = await self._async_populate_chat_log( + chat_log=chat_log, + client=client, + message=message, + conversation_id=backend_conversation_id, + agent_id=agent_id, + system_prompt=system_prompt, + model=model, + ) + result = conversation.async_get_result_from_chat_log(user_input, chat_log) + result.continue_conversation = ( + self._should_continue(full_response) or result.continue_conversation + ) + return full_response, result + + async def _async_populate_chat_log( + self, + chat_log: conversation.ChatLog, + client: OpenClawApiClient, + message: str, + conversation_id: str, + agent_id: str | None = None, + system_prompt: str | None = None, + model: str | None = None, + ) -> str: + """Populate the HA chat log from streaming or fallback responses.""" + try: + full_response = await self._async_stream_response_to_chat_log( + chat_log=chat_log, + client=client, + message=message, + conversation_id=conversation_id, + agent_id=agent_id, + system_prompt=system_prompt, + model=model, + ) + if full_response: + return full_response + except OpenClawApiError as err: + _LOGGER.warning( + "OpenClaw streaming failed, falling back to non-streaming response: %s", + err, + ) + + full_response = await self._get_response( + client=client, + message=message, + conversation_id=conversation_id, + agent_id=agent_id, + system_prompt=system_prompt, + model=model, + ) + self._add_final_response_to_chat_log(chat_log, full_response) + return full_response + + async def _async_stream_response_to_chat_log( + self, + chat_log: conversation.ChatLog, + client: OpenClawApiClient, + message: str, + conversation_id: str, + agent_id: str | None = None, + system_prompt: str | None = None, + model: str | None = None, + ) -> str: + """Stream OpenClaw deltas into the HA chat log.""" + full_response_parts: list[str] = [] + async for content in chat_log.async_add_delta_content_stream( + self._chat_log_agent_id, + self._async_openclaw_delta_stream( + client=client, + message=message, + conversation_id=conversation_id, + agent_id=agent_id, + system_prompt=system_prompt, + model=model, + ), ): - full_response += chunk + if isinstance(content, conversation.AssistantContent) and content.content: + full_response_parts.append(content.content) + return "".join(full_response_parts) - if full_response: - return full_response + async def _async_openclaw_delta_stream( + self, + client: OpenClawApiClient, + message: str, + conversation_id: str, + agent_id: str | None = None, + system_prompt: str | None = None, + model: str | None = None, + ) -> AsyncIterator[dict[str, Any]]: + """Map OpenClaw SSE chunks to the delta format expected by HA.""" + yield {"role": "assistant"} - response = await client.async_send_message( + async for chunk in client.async_stream_message( message=message, session_id=conversation_id, model=model, system_prompt=system_prompt, agent_id=agent_id, extra_headers=_VOICE_REQUEST_HEADERS, + ): + if chunk: + yield {"content": chunk} + + @property + def _chat_log_agent_id(self) -> str: + """Return the assistant identifier used for HA chat log messages.""" + return self.entity_id or self.entry.entry_id + + def _add_final_response_to_chat_log( + self, + chat_log: conversation.ChatLog, + full_response: str, + ) -> None: + """Append a non-streaming final assistant message to the chat log.""" + chat_log.async_add_assistant_content_without_tools( + conversation.AssistantContent( + agent_id=self._chat_log_agent_id, + content=full_response or None, + ) ) - return extract_text_recursive(response) or "" @staticmethod def _should_continue(response: str) -> bool: @@ -312,8 +459,8 @@ def _should_continue(response: str) -> bool: text = response.strip() # Check if the response ends with a question mark - # (allow trailing punctuation like quotes, parens, or emoji) - if re.search(r"\?\s*[\"'""ยป)\]]*\s*$", text): + # (allow trailing punctuation like quotes or parens) + if re.search(r"\?\s*['\")\]]*\s*$", text): return True # Common follow-up patterns (EN + DE) diff --git a/tests/test_conversation_streaming.py b/tests/test_conversation_streaming.py new file mode 100644 index 0000000..4f62ce8 --- /dev/null +++ b/tests/test_conversation_streaming.py @@ -0,0 +1,422 @@ +from __future__ import annotations + +import asyncio +from contextlib import contextmanager +from dataclasses import dataclass +import importlib.util +from pathlib import Path +import sys +import types +from typing import Any + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[1] +MODULE_NAME = "custom_components.openclaw.conversation" +MODULE_PATH = REPO_ROOT / "custom_components" / "openclaw" / "conversation.py" + + +class FakeBus: + def __init__(self) -> None: + self.events: list[tuple[str, dict[str, Any]]] = [] + + def async_fire(self, event_type: str, event_data: dict[str, Any]) -> None: + self.events.append((event_type, event_data)) + + +class FakeHass: + def __init__(self) -> None: + self.data: dict[str, Any] = {} + self.bus = FakeBus() + self._last_chat_log: FakeChatLog | None = None + + +@dataclass +class FakeConfigEntry: + entry_id: str + title: str + data: dict[str, Any] + options: dict[str, Any] + + +@dataclass +class FakeContext: + user_id: str | None = None + + +@dataclass +class FakeConversationInput: + text: str + context: FakeContext + conversation_id: str | None + device_id: str | None + satellite_id: str | None + language: str + agent_id: str + extra_system_prompt: str | None = None + + +@dataclass +class FakeConversationResult: + response: Any + conversation_id: str | None = None + continue_conversation: bool = False + + +@dataclass +class FakeAssistantContent: + agent_id: str + content: str | None = None + + +class FakeIntentResponse: + def __init__(self, language: str) -> None: + self.language = language + self.speech: str = "" + self.error: tuple[str, str] | None = None + + def async_set_speech(self, speech: str) -> None: + self.speech = speech + + def async_set_error(self, code: str, message: str) -> None: + self.error = (code, message) + + +class FakeConversationEntity: + _attr_supports_streaming = False + + def __init__(self, *args: Any, **kwargs: Any) -> None: + self.entity_id: str | None = None + + @property + def supports_streaming(self) -> bool: + return self._attr_supports_streaming + + async def async_added_to_hass(self) -> None: + return None + + async def async_will_remove_from_hass(self) -> None: + return None + + +class FakeAbstractConversationAgent: + pass + + +class FakeChatLog: + def __init__(self, conversation_id: str) -> None: + self.conversation_id = conversation_id + self.deltas: list[dict[str, Any]] = [] + self.added: list[FakeAssistantContent] = [] + self.continue_conversation = False + + async def async_add_delta_content_stream( + self, + agent_id: str, + stream, + ): + full_response = "" + async for delta in stream: + self.deltas.append(delta) + if content := delta.get("content"): + full_response += content + + if full_response: + assistant_content = FakeAssistantContent( + agent_id=agent_id, + content=full_response, + ) + self.added.append(assistant_content) + yield assistant_content + + def async_add_assistant_content_without_tools( + self, + content: FakeAssistantContent, + ) -> None: + self.added.append(content) + + +class FakeCoordinator: + def __init__(self, data: dict[str, Any] | None = None) -> None: + self.data = data or {} + self.updated = False + + def update_last_activity(self) -> None: + self.updated = True + + +class FakeClient: + def __init__( + self, + *, + stream_chunks: list[str] | None = None, + response: dict[str, Any] | None = None, + stream_error: Exception | None = None, + ) -> None: + self.stream_chunks = stream_chunks or [] + self.response = response or {"text": ""} + self.stream_error = stream_error + self.stream_calls: list[dict[str, Any]] = [] + self.send_calls: list[dict[str, Any]] = [] + + async def async_stream_message(self, **kwargs: Any): + self.stream_calls.append(kwargs) + if self.stream_error is not None: + raise self.stream_error + + for chunk in self.stream_chunks: + yield chunk + + async def async_send_message(self, **kwargs: Any) -> dict[str, Any]: + self.send_calls.append(kwargs) + if isinstance(self.response, Exception): + raise self.response + return self.response + + +def _extract_text_recursive(value: Any) -> str | None: + if isinstance(value, str): + return value + if isinstance(value, dict): + for key in ("output_text", "text", "content", "message", "response", "answer"): + nested = value.get(key) + if nested: + return _extract_text_recursive(nested) + return None + + +def _install_stub_modules() -> None: + for name in list(sys.modules): + if name == "homeassistant" or name.startswith("homeassistant."): + sys.modules.pop(name) + if name == "custom_components" or name.startswith("custom_components.openclaw"): + sys.modules.pop(name) + + homeassistant = types.ModuleType("homeassistant") + components = types.ModuleType("homeassistant.components") + helpers = types.ModuleType("homeassistant.helpers") + + conversation_module = types.ModuleType("homeassistant.components.conversation") + conversation_module.MATCH_ALL = "*" + conversation_module.ConversationEntity = FakeConversationEntity + conversation_module.AbstractConversationAgent = FakeAbstractConversationAgent + conversation_module.ConversationInput = FakeConversationInput + conversation_module.ConversationResult = FakeConversationResult + conversation_module.ChatLog = FakeChatLog + conversation_module.AssistantContent = FakeAssistantContent + conversation_module.async_set_agent = lambda hass, entry, agent: None + conversation_module.async_unset_agent = lambda hass, entry: None + + @contextmanager + def async_get_chat_log(hass: FakeHass, session, user_input): + chat_log = FakeChatLog(session.conversation_id) + hass._last_chat_log = chat_log + yield chat_log + + def async_get_result_from_chat_log(user_input: FakeConversationInput, chat_log: FakeChatLog): + response = FakeIntentResponse(user_input.language) + last_content = chat_log.added[-1].content if chat_log.added else "" + response.async_set_speech(last_content or "") + return FakeConversationResult( + response=response, + conversation_id=chat_log.conversation_id, + continue_conversation=chat_log.continue_conversation, + ) + + conversation_module.async_get_chat_log = async_get_chat_log + conversation_module.async_get_result_from_chat_log = async_get_result_from_chat_log + + chat_session_module = types.ModuleType("homeassistant.helpers.chat_session") + + @contextmanager + def async_get_chat_session(hass: FakeHass, conversation_id: str | None): + yield types.SimpleNamespace( + conversation_id=conversation_id or "generated-conversation-id" + ) + + chat_session_module.async_get_chat_session = async_get_chat_session + + intent_module = types.ModuleType("homeassistant.helpers.intent") + intent_module.IntentResponse = FakeIntentResponse + intent_module.IntentResponseErrorCode = types.SimpleNamespace(UNKNOWN="unknown") + + entity_platform_module = types.ModuleType("homeassistant.helpers.entity_platform") + entity_platform_module.AddEntitiesCallback = object + + config_entries_module = types.ModuleType("homeassistant.config_entries") + config_entries_module.ConfigEntry = FakeConfigEntry + + core_module = types.ModuleType("homeassistant.core") + core_module.HomeAssistant = FakeHass + + custom_components = types.ModuleType("custom_components") + custom_components.__path__ = [str(REPO_ROOT / "custom_components")] + + openclaw_package = types.ModuleType("custom_components.openclaw") + openclaw_package.__path__ = [str(REPO_ROOT / "custom_components" / "openclaw")] + + api_module = types.ModuleType("custom_components.openclaw.api") + + class OpenClawApiError(Exception): + pass + + class OpenClawApiClient: + pass + + api_module.OpenClawApiError = OpenClawApiError + api_module.OpenClawApiClient = OpenClawApiClient + + const_module = types.ModuleType("custom_components.openclaw.const") + const_values = { + "ATTR_MESSAGE": "message", + "ATTR_MODEL": "model", + "ATTR_SESSION_ID": "session_id", + "ATTR_TIMESTAMP": "timestamp", + "CONF_ASSIST_SESSION_ID": "assist_session_id", + "CONF_AGENT_ID": "agent_id", + "CONF_CONTEXT_MAX_CHARS": "context_max_chars", + "CONF_CONTEXT_STRATEGY": "context_strategy", + "CONF_INCLUDE_EXPOSED_CONTEXT": "include_exposed_context", + "CONF_VOICE_AGENT_ID": "voice_agent_id", + "DEFAULT_ASSIST_SESSION_ID": "", + "DEFAULT_AGENT_ID": "main", + "DEFAULT_CONTEXT_MAX_CHARS": 13000, + "DEFAULT_CONTEXT_STRATEGY": "truncate", + "DEFAULT_INCLUDE_EXPOSED_CONTEXT": True, + "DATA_MODEL": "model", + "DOMAIN": "openclaw", + "EVENT_MESSAGE_RECEIVED": "openclaw_message_received", + } + for key, value in const_values.items(): + setattr(const_module, key, value) + + coordinator_module = types.ModuleType("custom_components.openclaw.coordinator") + coordinator_module.OpenClawCoordinator = FakeCoordinator + + exposure_module = types.ModuleType("custom_components.openclaw.exposure") + exposure_module.apply_context_policy = lambda raw, max_chars, strategy: raw + exposure_module.build_exposed_entities_context = lambda hass, assistant: None + + helpers_module = types.ModuleType("custom_components.openclaw.helpers") + helpers_module.extract_text_recursive = _extract_text_recursive + + sys.modules["homeassistant"] = homeassistant + sys.modules["homeassistant.components"] = components + sys.modules["homeassistant.components.conversation"] = conversation_module + sys.modules["homeassistant.helpers"] = helpers + sys.modules["homeassistant.helpers.chat_session"] = chat_session_module + sys.modules["homeassistant.helpers.intent"] = intent_module + sys.modules["homeassistant.helpers.entity_platform"] = entity_platform_module + sys.modules["homeassistant.config_entries"] = config_entries_module + sys.modules["homeassistant.core"] = core_module + sys.modules["custom_components"] = custom_components + sys.modules["custom_components.openclaw"] = openclaw_package + sys.modules["custom_components.openclaw.api"] = api_module + sys.modules["custom_components.openclaw.const"] = const_module + sys.modules["custom_components.openclaw.coordinator"] = coordinator_module + sys.modules["custom_components.openclaw.exposure"] = exposure_module + sys.modules["custom_components.openclaw.helpers"] = helpers_module + + +def _load_conversation_module(): + _install_stub_modules() + spec = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + sys.modules[MODULE_NAME] = module + spec.loader.exec_module(module) + return module + + +@pytest.fixture() +def conversation_module(): + return _load_conversation_module() + + +def _make_agent(conversation_module, *, client: FakeClient, options: dict[str, Any] | None = None): + hass = FakeHass() + entry = FakeConfigEntry( + entry_id="entry-1", + title="OpenClaw", + data={"agent_id": "main"}, + options=options or {}, + ) + coordinator = FakeCoordinator({"model": "model-x"}) + hass.data[conversation_module.DOMAIN] = { + entry.entry_id: { + "client": client, + "coordinator": coordinator, + } + } + agent = conversation_module.OpenClawConversationAgent(hass, entry) + agent.entity_id = "conversation.openclaw" + return agent, hass, coordinator + + +def _make_user_input(*, text: str, conversation_id: str | None = "conv-1"): + return FakeConversationInput( + text=text, + context=FakeContext(user_id="user-123"), + conversation_id=conversation_id, + device_id=None, + satellite_id=None, + language="en", + agent_id="openclaw", + ) + + +def test_async_process_streams_into_chat_log(conversation_module) -> None: + client = FakeClient(stream_chunks=["Ala ", "ma ", "kota"]) + agent, hass, coordinator = _make_agent(conversation_module, client=client) + + result = asyncio.run(agent.async_process(_make_user_input(text="Hello there"))) + + assert result.response.speech == "Ala ma kota" + assert result.conversation_id == "conv-1" + assert coordinator.updated is True + assert client.send_calls == [] + assert client.stream_calls[0]["session_id"] == "conv-1:main" + assert hass._last_chat_log is not None + assert hass._last_chat_log.deltas == [ + {"role": "assistant"}, + {"content": "Ala "}, + {"content": "ma "}, + {"content": "kota"}, + ] + assert hass.bus.events[0][1]["session_id"] == "conv-1:main" + + +def test_async_process_falls_back_when_stream_is_empty(conversation_module) -> None: + client = FakeClient(stream_chunks=[], response={"text": "Fallback reply"}) + agent, hass, _ = _make_agent(conversation_module, client=client) + + result = asyncio.run(agent.async_process(_make_user_input(text="Hello there"))) + + assert result.response.speech == "Fallback reply" + assert len(client.send_calls) == 1 + assert hass._last_chat_log is not None + assert hass._last_chat_log.added[-1].content == "Fallback reply" + + +def test_agent_advertises_streaming_support(conversation_module) -> None: + agent, _, _ = _make_agent(conversation_module, client=FakeClient()) + + assert agent.supports_streaming is True + assert agent._attr_supports_streaming is True + + +def test_conversation_id_resolution_keeps_agent_namespace(conversation_module) -> None: + agent, _, _ = _make_agent(conversation_module, client=FakeClient()) + user_input = _make_user_input(text="Hello there", conversation_id="session-42") + + assert agent._resolve_conversation_id(user_input, "assistant-a") == "session-42:assistant-a" + + +def test_question_replies_keep_continue_conversation(conversation_module) -> None: + client = FakeClient(stream_chunks=["Need anything else?"]) + agent, _, _ = _make_agent(conversation_module, client=client) + + result = asyncio.run(agent.async_process(_make_user_input(text="Hello there"))) + + assert result.continue_conversation is True From 3163192a6be2d99ee08cf40dcb40e65e4f0d4e96 Mon Sep 17 00:00:00 2001 From: Lukasz Czarnecki <6866304+lukcz@users.noreply.github.com> Date: Mon, 6 Apr 2026 08:04:53 +0200 Subject: [PATCH 2/3] Stream thinking deltas to Assist chat log --- custom_components/openclaw/api.py | 77 +++++++++++++++++++--- custom_components/openclaw/conversation.py | 13 +++- tests/test_conversation_streaming.py | 47 +++++++++++-- 3 files changed, 122 insertions(+), 15 deletions(-) diff --git a/custom_components/openclaw/api.py b/custom_components/openclaw/api.py index bd277d8..85a9c7a 100644 --- a/custom_components/openclaw/api.py +++ b/custom_components/openclaw/api.py @@ -5,7 +5,7 @@ import asyncio import json import logging -from typing import Any, AsyncIterator +from typing import Any, AsyncIterator, TypedDict import aiohttp @@ -14,6 +14,7 @@ API_MODELS, API_TOOLS_INVOKE, ) +from .helpers import extract_text_recursive _LOGGER = logging.getLogger(__name__) @@ -36,6 +37,13 @@ class OpenClawAuthError(OpenClawApiError): """Authentication with OpenClaw gateway failed.""" +class OpenClawStreamDelta(TypedDict, total=False): + """Structured delta emitted by the OpenClaw streaming API.""" + + content: str + thinking_content: str + + class OpenClawApiClient: """HTTP client for the OpenClaw gateway API. @@ -273,7 +281,7 @@ async def async_stream_message( ) -> AsyncIterator[str]: """Send a chat message and stream the response via SSE. - Yields delta content strings as they arrive from the gateway. + Yields only assistant text deltas for backwards compatibility. Args: message: The user message text. @@ -285,6 +293,27 @@ async def async_stream_message( Yields: Content delta strings from the streaming response. """ + async for delta in self.async_stream_message_deltas( + message=message, + session_id=session_id, + model=model, + system_prompt=system_prompt, + agent_id=agent_id, + extra_headers=extra_headers, + ): + if content := delta.get("content"): + yield content + + async def async_stream_message_deltas( + self, + message: str, + session_id: str | None = None, + model: str | None = None, + system_prompt: str | None = None, + agent_id: str | None = None, + extra_headers: dict[str, str] | None = None, + ) -> AsyncIterator[OpenClawStreamDelta]: + """Send a chat message and stream structured deltas via SSE.""" messages: list[dict[str, str]] = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) @@ -334,12 +363,8 @@ async def async_stream_message( try: chunk = json.loads(data_str) - choices = chunk.get("choices", []) - if choices: - delta = choices[0].get("delta", {}) - content = delta.get("content") - if content: - yield content + if delta := self._extract_stream_delta(chunk): + yield delta except json.JSONDecodeError: _LOGGER.debug("Skipping non-JSON SSE line: %s", data_str[:100]) @@ -348,6 +373,42 @@ async def async_stream_message( f"Cannot connect to OpenClaw gateway: {err}" ) from err + @staticmethod + def _extract_stream_delta(chunk: dict[str, Any]) -> OpenClawStreamDelta | None: + """Extract assistant text and reasoning deltas from a stream chunk.""" + choices = chunk.get("choices", []) + if not choices: + return None + + delta = choices[0].get("delta", {}) + if not isinstance(delta, dict): + return None + + stream_delta: OpenClawStreamDelta = {} + + if content := delta.get("content"): + stream_delta["content"] = content + + if thinking_content := OpenClawApiClient._extract_thinking_content(delta): + stream_delta["thinking_content"] = thinking_content + + return stream_delta or None + + @staticmethod + def _extract_thinking_content(delta: dict[str, Any]) -> str | None: + """Extract reasoning text from provider-specific delta shapes.""" + for key in ( + "thinking_content", + "reasoning_content", + "thinking", + "reasoning", + ): + extracted = extract_text_recursive(delta.get(key)) + if extracted: + return extracted + + return None + async def async_check_connection(self) -> bool: """Check if the gateway is reachable, API is enabled, and auth works. diff --git a/custom_components/openclaw/conversation.py b/custom_components/openclaw/conversation.py index fc05f2d..83dffe9 100644 --- a/custom_components/openclaw/conversation.py +++ b/custom_components/openclaw/conversation.py @@ -412,7 +412,7 @@ async def _async_openclaw_delta_stream( """Map OpenClaw SSE chunks to the delta format expected by HA.""" yield {"role": "assistant"} - async for chunk in client.async_stream_message( + async for chunk in client.async_stream_message_deltas( message=message, session_id=conversation_id, model=model, @@ -420,8 +420,15 @@ async def _async_openclaw_delta_stream( agent_id=agent_id, extra_headers=_VOICE_REQUEST_HEADERS, ): - if chunk: - yield {"content": chunk} + delta: dict[str, Any] = {} + + if thinking_content := chunk.get("thinking_content"): + delta["thinking_content"] = thinking_content + if content := chunk.get("content"): + delta["content"] = content + + if delta: + yield delta @property def _chat_log_agent_id(self) -> str: diff --git a/tests/test_conversation_streaming.py b/tests/test_conversation_streaming.py index 4f62ce8..af241a2 100644 --- a/tests/test_conversation_streaming.py +++ b/tests/test_conversation_streaming.py @@ -68,6 +68,7 @@ class FakeConversationResult: class FakeAssistantContent: agent_id: str content: str | None = None + thinking_content: str | None = None class FakeIntentResponse: @@ -117,15 +118,19 @@ async def async_add_delta_content_stream( stream, ): full_response = "" + full_thinking = "" async for delta in stream: self.deltas.append(delta) if content := delta.get("content"): full_response += content + if thinking_content := delta.get("thinking_content"): + full_thinking += thinking_content - if full_response: + if full_response or full_thinking: assistant_content = FakeAssistantContent( agent_id=agent_id, content=full_response, + thinking_content=full_thinking, ) self.added.append(assistant_content) yield assistant_content @@ -151,22 +156,31 @@ def __init__( self, *, stream_chunks: list[str] | None = None, + stream_deltas: list[dict[str, str]] | None = None, response: dict[str, Any] | None = None, stream_error: Exception | None = None, ) -> None: self.stream_chunks = stream_chunks or [] + self.stream_deltas = stream_deltas or [ + {"content": chunk} for chunk in self.stream_chunks + ] self.response = response or {"text": ""} self.stream_error = stream_error self.stream_calls: list[dict[str, Any]] = [] self.send_calls: list[dict[str, Any]] = [] - async def async_stream_message(self, **kwargs: Any): + async def async_stream_message_deltas(self, **kwargs: Any): self.stream_calls.append(kwargs) if self.stream_error is not None: raise self.stream_error - for chunk in self.stream_chunks: - yield chunk + for delta in self.stream_deltas: + yield delta + + async def async_stream_message(self, **kwargs: Any): + async for delta in self.async_stream_message_deltas(**kwargs): + if content := delta.get("content"): + yield content async def async_send_message(self, **kwargs: Any) -> dict[str, Any]: self.send_calls.append(kwargs) @@ -399,6 +413,31 @@ def test_async_process_falls_back_when_stream_is_empty(conversation_module) -> N assert hass._last_chat_log.added[-1].content == "Fallback reply" +def test_async_process_streams_thinking_content(conversation_module) -> None: + client = FakeClient( + stream_deltas=[ + {"thinking_content": "Analyzing the request. "}, + {"thinking_content": "Checking context. "}, + {"content": "Final answer"}, + ] + ) + agent, hass, _ = _make_agent(conversation_module, client=client) + + result = asyncio.run(agent.async_process(_make_user_input(text="Hello there"))) + + assert result.response.speech == "Final answer" + assert hass._last_chat_log is not None + assert hass._last_chat_log.deltas == [ + {"role": "assistant"}, + {"thinking_content": "Analyzing the request. "}, + {"thinking_content": "Checking context. "}, + {"content": "Final answer"}, + ] + assert hass._last_chat_log.added[-1].thinking_content == ( + "Analyzing the request. Checking context. " + ) + + def test_agent_advertises_streaming_support(conversation_module) -> None: agent, _, _ = _make_agent(conversation_module, client=FakeClient()) From 914b73e63110593e36a826d0e598d22aa0dedc50 Mon Sep 17 00:00:00 2001 From: Lukasz Czarnecki <6866304+lukcz@users.noreply.github.com> Date: Mon, 6 Apr 2026 08:42:43 +0200 Subject: [PATCH 3/3] Add debug logging for Assist streaming deltas --- custom_components/openclaw/api.py | 6 ++++++ custom_components/openclaw/conversation.py | 23 ++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/custom_components/openclaw/api.py b/custom_components/openclaw/api.py index 85a9c7a..b5f40d1 100644 --- a/custom_components/openclaw/api.py +++ b/custom_components/openclaw/api.py @@ -364,6 +364,12 @@ async def async_stream_message_deltas( try: chunk = json.loads(data_str) if delta := self._extract_stream_delta(chunk): + _LOGGER.debug( + "OpenClaw streaming delta received " + "(content_len=%s, thinking_len=%s)", + len(delta.get("content", "")), + len(delta.get("thinking_content", "")), + ) yield delta except json.JSONDecodeError: _LOGGER.debug("Skipping non-JSON SSE line: %s", data_str[:100]) diff --git a/custom_components/openclaw/conversation.py b/custom_components/openclaw/conversation.py index 83dffe9..2adc590 100644 --- a/custom_components/openclaw/conversation.py +++ b/custom_components/openclaw/conversation.py @@ -385,6 +385,8 @@ async def _async_stream_response_to_chat_log( ) -> str: """Stream OpenClaw deltas into the HA chat log.""" full_response_parts: list[str] = [] + content_chunk_count = 0 + thinking_chunk_count = 0 async for content in chat_log.async_add_delta_content_stream( self._chat_log_agent_id, self._async_openclaw_delta_stream( @@ -396,8 +398,19 @@ async def _async_stream_response_to_chat_log( model=model, ), ): - if isinstance(content, conversation.AssistantContent) and content.content: - full_response_parts.append(content.content) + if isinstance(content, conversation.AssistantContent): + if content.content: + full_response_parts.append(content.content) + content_chunk_count += 1 + if content.thinking_content: + thinking_chunk_count += 1 + + _LOGGER.debug( + "OpenClaw chat log stream completed " + "(assistant_messages=%s, thinking_messages=%s)", + content_chunk_count, + thinking_chunk_count, + ) return "".join(full_response_parts) async def _async_openclaw_delta_stream( @@ -428,6 +441,12 @@ async def _async_openclaw_delta_stream( delta["content"] = content if delta: + _LOGGER.debug( + "Forwarding OpenClaw delta to HA chat log " + "(content_len=%s, thinking_len=%s)", + len(delta.get("content", "")), + len(delta.get("thinking_content", "")), + ) yield delta @property