Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,37 @@ The `IAudioInference` class supports the following parameters:
- `duration`: Duration of the generated audio in seconds
- `includeCost`: Whether to include cost information in the response

### Text inference streaming

To stream text inference (e.g. LLM chat) over HTTP SSE, set `deliveryMethod="stream"`. The SDK yields content chunks (strings) and a final `IText` with usage and cost:

```python
import asyncio
from runware import Runware, ITextInference, ITextInferenceMessage

async def main() -> None:
runware = Runware(api_key=RUNWARE_API_KEY)
await runware.connect()

request = ITextInference(
model="runware:qwen3-thinking@1",
messages=[ITextInferenceMessage(role="user", content="Explain photosynthesis in one sentence.")],
deliveryMethod="stream",
includeCost=True,
)

stream = await runware.textInference(request)
async for chunk in stream:
if isinstance(chunk, str):
print(chunk, end="", flush=True)
else:
print(chunk)

asyncio.run(main())
```

Streaming uses the same concurrency limit as other requests (`RUNWARE_MAX_CONCURRENT_REQUESTS`). To allow longer streams, set `RUNWARE_TEXT_STREAM_TIMEOUT` (milliseconds; default 600000).

### Model Upload

To upload a model using the Runware API, you can use the `uploadModel` method of the `Runware` class. Here are examples:
Expand Down Expand Up @@ -1068,6 +1099,9 @@ RUNWARE_AUDIO_INFERENCE_TIMEOUT=300000 # Audio generation (default: 5 min)
RUNWARE_AUDIO_POLLING_DELAY=1000 # Delay between status checks (default: 1 sec)
RUNWARE_MAX_POLLS_AUDIO_GENERATION=240 # Max polling attempts for audio inference (default: 240, ~4 min total)

# Text Operations (milliseconds)
RUNWARE_TEXT_STREAM_TIMEOUT=600000 # Text inference streaming (SSE) read timeout (default: 10 min)

# Other Operations (milliseconds)
RUNWARE_PROMPT_ENHANCE_TIMEOUT=60000 # Prompt enhancement (default: 1 min)
RUNWARE_WEBHOOK_TIMEOUT=30000 # Webhook acknowledgment (default: 30 sec)
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aiofiles>=23.2.1
httpx>=0.27.0
python-dotenv>=1.0.1
websockets>=12.0
websockets>=12.0
145 changes: 133 additions & 12 deletions runware/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import asyncio
import inspect
import json
import logging
import os
import re
from asyncio import gather
from dataclasses import asdict, is_dataclass, fields
from enum import Enum
from random import uniform
from typing import List, Optional, Union, Callable, Any, Dict, Tuple
from typing import List, Optional, Union, Callable, Any, Dict, Tuple, AsyncIterator

import httpx
from websockets.protocol import State

from .logging_config import configure_logging
Expand Down Expand Up @@ -62,11 +64,13 @@
IUploadMediaRequest,
ITextInference,
IText,
ITextInputs,
)
from .types import IImage, IError, SdkType, ListenerType
from .utils import (
BASE_RUNWARE_URLS,
getUUID,
get_http_url_from_ws_url,
fileToBase64,
createImageFromResponse,
createEnhancedPromptsFromResponse,
Expand All @@ -84,6 +88,7 @@
createAsyncTaskResponse,
VIDEO_INITIAL_TIMEOUT,
TEXT_INITIAL_TIMEOUT,
TEXT_STREAM_READ_TIMEOUT,
VIDEO_POLLING_DELAY,
WEBHOOK_TIMEOUT,
IMAGE_INFERENCE_TIMEOUT,
Expand Down Expand Up @@ -2092,7 +2097,20 @@ async def _inference3d(self, request3d: I3dInference) -> Union[List[I3d], IAsync
await self.ensureConnection()
return await self._request3d(request3d)

async def textInference(self, requestText: ITextInference) -> Union[List[IText], IAsyncTaskResponse]:
async def textInference(
self, requestText: ITextInference
) -> Union[List[IText], IAsyncTaskResponse, AsyncIterator[Union[str, IText]]]:
delivery_method_enum = (
requestText.deliveryMethod
if isinstance(requestText.deliveryMethod, EDeliveryMethod)
else EDeliveryMethod(requestText.deliveryMethod)
)
if delivery_method_enum == EDeliveryMethod.STREAM:
async def stream_with_semaphore() -> AsyncIterator[Union[str, IText]]:
async with self._request_semaphore:
async for chunk in self._requestTextStream(requestText):
yield chunk
return stream_with_semaphore()
async with self._request_semaphore:
return await self._retry_async_with_reconnect(
self._requestText,
Expand Down Expand Up @@ -2281,26 +2299,129 @@ def _buildTextRequest(self, requestText: ITextInference) -> Dict[str, Any]:
"deliveryMethod": requestText.deliveryMethod,
"messages": [asdict(m) for m in requestText.messages],
}
if requestText.maxTokens is not None:
request_object["maxTokens"] = requestText.maxTokens
if requestText.temperature is not None:
request_object["temperature"] = requestText.temperature
if requestText.topP is not None:
request_object["topP"] = requestText.topP
if requestText.topK is not None:
request_object["topK"] = requestText.topK
if requestText.seed is not None:
request_object["seed"] = requestText.seed
if requestText.stopSequences is not None:
request_object["stopSequences"] = requestText.stopSequences
if requestText.includeCost is not None:
request_object["includeCost"] = requestText.includeCost
if requestText.includeUsage is not None:
request_object["includeUsage"] = requestText.includeUsage
if requestText.numberResults is not None:
request_object["numberResults"] = requestText.numberResults
self._addOptionalField(request_object, requestText.settings)
self._addOptionalField(request_object, requestText.inputs)
self._addProviderSettings(request_object, requestText)
return request_object

async def _message_from_http_status_error(self, exc: httpx.HTTPStatusError) -> str:
    """
    Build a short, user-facing message from an HTTP error response.

    Preference order: top-level "message", then "error.message", then a plain
    string "error" field. Falls back to a canned message for 401 (so it matches
    the WebSocket invalid-API-key error) and a generic one otherwise.
    """
    response = exc.response
    # Streaming responses must be fully read before .json() is usable;
    # best-effort only — a failed read just means we fall back below.
    try:
        await response.aread()
    except Exception:
        pass
    code = response.status_code

    def _clean(value: Any) -> Optional[str]:
        # Return a stripped non-empty string, or None when unusable.
        if isinstance(value, str) and value.strip():
            return value.strip()
        return None

    try:
        payload = response.json()
    except Exception:
        payload = None

    if isinstance(payload, dict):
        top_level = _clean(payload.get("message"))
        if top_level:
            return top_level
        error_field = payload.get("error")
        if isinstance(error_field, dict):
            nested = _clean(error_field.get("message"))
            if nested:
                return nested
        direct = _clean(error_field)
        if direct:
            return direct

    if code == 401:
        return "Invalid API key. Get one at https://my.runware.ai/signup"
    return f"HTTP {code} error for {response.request.url}"

async def _requestTextStream(
    self, requestText: ITextInference
) -> AsyncIterator[Union[str, IText]]:
    """
    Stream a text inference task over HTTP SSE.

    Yields content chunks (str) as they arrive and, once the server reports a
    finishReason, a final IText built from the terminal event (falling back to
    the accumulated chunk text when the server omits the full text).

    Raises:
        ConnectionError: on HTTP 401 (invalid API key), after notifying the
            reconnection manager via on_auth_failure().
        RunwareAPIError: on any other HTTP error, an in-stream error payload,
            or an unexpected failure while reading the stream.
    """
    requestText.taskUUID = requestText.taskUUID or getUUID()
    request_object = self._buildTextRequest(requestText)
    body = [request_object]
    http_url = get_http_url_from_ws_url(self._url or "")
    headers = {
        "Accept": "text/event-stream",
        "Authorization": f"Bearer {self._apiKey}",
        "Content-Type": "application/json",
    }
    accumulated_text = ""
    try:
        async with httpx.AsyncClient(timeout=TEXT_STREAM_READ_TIMEOUT / 1000) as client:
            async with client.stream(
                "POST",
                http_url,
                json=body,
                headers=headers,
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line:
                        continue
                    # Strip the SSE "data:" field prefix only when the line
                    # actually starts with it. A blind replace("data:", "", 1)
                    # would corrupt bare-JSON events whose content merely
                    # contains the substring "data:".
                    if line.startswith("data:"):
                        payload = line[len("data:"):].strip()
                    else:
                        payload = line.strip()
                    if payload == "[DONE]":
                        return
                    try:
                        line_obj = json.loads(payload)
                    except json.JSONDecodeError:
                        # Non-JSON SSE fields (comments, event:, id:) are ignored.
                        continue
                    if not isinstance(line_obj, dict):
                        # Defensive: skip events that parse to a non-object.
                        continue
                    data = line_obj.get("data") or line_obj
                    if not isinstance(data, dict):
                        continue
                    if data.get("error") is not None:
                        raise RunwareAPIError(data["error"])

                    delta = data.get("delta") or {}
                    finishReason = data.get("finishReason")

                    content_chunk = delta.get("text")
                    if content_chunk:
                        accumulated_text += content_chunk
                        yield content_chunk
                    if finishReason is not None:
                        # Terminal event: emit the full IText; usage/cost
                        # fields ride along via **data.
                        yield instantiateDataclass(
                            IText,
                            {
                                **data,
                                "taskType": data.get("taskType"),
                                "text": data.get("text") or accumulated_text,
                                "finishReason": finishReason,
                            },
                        )
                        return
    except httpx.HTTPStatusError as e:
        msg = await self._message_from_http_status_error(e)
        if e.response.status_code == 401:
            # Mirror the WebSocket auth-failure path so the client stops
            # retrying with a bad key.
            self._invalidAPIkey = msg
            self._reconnection_manager.on_auth_failure()
            raise ConnectionError(msg) from e
        raise RunwareAPIError({"message": msg, "statusCode": e.response.status_code}) from e
    except RunwareAPIError:
        # In-stream API errors propagate unchanged.
        raise
    except Exception as e:
        raise RunwareAPIError({"message": str(e)}) from e

async def _requestText(self, requestText: ITextInference) -> Union[List[IText], IAsyncTaskResponse]:
await self.ensureConnection()
requestText.taskUUID = requestText.taskUUID or getUUID()


if requestText.inputs:
inputs = requestText.inputs
if isinstance(inputs, dict):
inputs = ITextInputs(**inputs)
requestText.inputs = inputs

if inputs.images:
inputs.images = await process_image(inputs.images)

request_object = self._buildTextRequest(requestText)

if requestText.webhookURL:
Expand Down
Loading