Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 75 additions & 8 deletions custom_components/openclaw/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
import json
import logging
from typing import Any, AsyncIterator
from typing import Any, AsyncIterator, TypedDict

import aiohttp

Expand All @@ -14,6 +14,7 @@
API_MODELS,
API_TOOLS_INVOKE,
)
from .helpers import extract_text_recursive

_LOGGER = logging.getLogger(__name__)

Expand All @@ -36,6 +37,13 @@ class OpenClawAuthError(OpenClawApiError):
"""Authentication with OpenClaw gateway failed."""


class OpenClawStreamDelta(TypedDict, total=False):
"""Structured delta emitted by the OpenClaw streaming API."""

content: str
thinking_content: str


class OpenClawApiClient:
"""HTTP client for the OpenClaw gateway API.

Expand Down Expand Up @@ -273,7 +281,7 @@ async def async_stream_message(
) -> AsyncIterator[str]:
"""Send a chat message and stream the response via SSE.

Yields delta content strings as they arrive from the gateway.
Yields only assistant text deltas for backwards compatibility.

Args:
message: The user message text.
Expand All @@ -285,6 +293,27 @@ async def async_stream_message(
Yields:
Content delta strings from the streaming response.
"""
async for delta in self.async_stream_message_deltas(
message=message,
session_id=session_id,
model=model,
system_prompt=system_prompt,
agent_id=agent_id,
extra_headers=extra_headers,
):
if content := delta.get("content"):
yield content

async def async_stream_message_deltas(
self,
message: str,
session_id: str | None = None,
model: str | None = None,
system_prompt: str | None = None,
agent_id: str | None = None,
extra_headers: dict[str, str] | None = None,
) -> AsyncIterator[OpenClawStreamDelta]:
"""Send a chat message and stream structured deltas via SSE."""
messages: list[dict[str, str]] = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
Expand Down Expand Up @@ -334,12 +363,14 @@ async def async_stream_message(

try:
chunk = json.loads(data_str)
choices = chunk.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
content = delta.get("content")
if content:
yield content
if delta := self._extract_stream_delta(chunk):
_LOGGER.debug(
"OpenClaw streaming delta received "
"(content_len=%s, thinking_len=%s)",
len(delta.get("content", "")),
len(delta.get("thinking_content", "")),
)
yield delta
except json.JSONDecodeError:
_LOGGER.debug("Skipping non-JSON SSE line: %s", data_str[:100])

Expand All @@ -348,6 +379,42 @@ async def async_stream_message(
f"Cannot connect to OpenClaw gateway: {err}"
) from err

@staticmethod
def _extract_stream_delta(chunk: dict[str, Any]) -> OpenClawStreamDelta | None:
"""Extract assistant text and reasoning deltas from a stream chunk."""
choices = chunk.get("choices", [])
if not choices:
return None

delta = choices[0].get("delta", {})
if not isinstance(delta, dict):
return None

stream_delta: OpenClawStreamDelta = {}

if content := delta.get("content"):
stream_delta["content"] = content

if thinking_content := OpenClawApiClient._extract_thinking_content(delta):
stream_delta["thinking_content"] = thinking_content

return stream_delta or None

@staticmethod
def _extract_thinking_content(delta: dict[str, Any]) -> str | None:
"""Extract reasoning text from provider-specific delta shapes."""
for key in (
"thinking_content",
"reasoning_content",
"thinking",
"reasoning",
):
extracted = extract_text_recursive(delta.get(key))
if extracted:
return extracted

return None

async def async_check_connection(self) -> bool:
"""Check if the gateway is reachable, API is enabled, and auth works.

Expand Down
Loading