A small client for Rockets using JSON-RPC as communication contract over a WebSocket.
You can install this package from PyPI:
pip install rocketsRockets provides to types of clients to support asychronous and synchronous usage.
The AsyncClient exposes all of its functionality as async functions, hence an asyncio
event loop is needed to complete pending
execution via await or run_until_complete().
For simplicity, a synchronous Client is provided which automagically executes in a synchronous,
blocking fashion.
Create a client and connect:
from rockets import Client
# client does not connect during __init__;
# either explicit or automatically on any notify/request/send
client = Client('myhost:8080')
client.connect()
print(client.connected())Close the connection with the socket cleanly:
from rockets import Client
client = Client('myhost:8080')
client.connect()
client.disconnect()
print(client.connected())Listen to server notifications:
from rockets import Client
client = Client('myhost:8080')
client.notifications.subscribe(lambda msg: print("Got message:", msg.data))NOTE: The notification object is of type Notification.
Listen to any server message:
from rockets import Client
client = Client('myhost:8080')
client.ws_observable.subscribe(lambda msg: print("Got message:", msg))Send notifications to the server:
from rockets import Client
client = Client('myhost:8080')
client.notify('mymethod', {'ping': True})Make a synchronous, blocking request:
from rockets import Client
client = Client('myhost:8080')
response = client.request('mymethod', {'ping': True})
print(response)Handle a request error:
from rockets import Client, RequestError
client = Client('myhost:8080')
try:
client.request('mymethod')
except RequestError as err:
print(err.code, err.message)NOTE: Any error that may occur will be a RequestError.
Make an asynchronous request, using the AsyncClient and asyncio:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod', {'ping': True})
asyncio.get_event_loop().run_until_complete(request_task)
print(request_task.result())Alternatively, you can use add_done_callback() from the returned RequestTask which is called
once the request has finished:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod', {'ping': True})
request_task.add_done_callback(lambda task: print(task.result()))
asyncio.get_event_loop().run_until_complete(request_task)If the RequestTask is not needed, i.e. no cancel() or add_progress_callback() is desired, use
the request() coroutine:
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
coro = client.request('mymethod', {'ping': True})
result = asyncio.get_event_loop().run_until_complete(coro)
print(result)If you are already in an async function or in a Jupyter notebook cell, you may use await to
execute an asynchronous request:
# Inside a notebook cell here
import asyncio
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
result = await client.request('mymethod', {'ping': True})
print(result)Cancel a request:
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod')
request_task.cancel()Get progress updates for a request:
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request_task = client.async_request('mymethod')
request_task.add_progress_callback(lambda progress: print(progress))NOTE: The progress object is of type RequestProgress.
Make a batch request:
from rockets import Client, Request, Notification
client = Client('myhost:8080')
request = Request('myrequest')
notification = Notification('mynotify')
responses = client.batch([request, notification])
for response in responses:
print(response)Cancel a batch request:
from rockets import AsyncClient
client = AsyncClient('myhost:8080')
request = Request('myrequest')
notification = Notification('mynotify')
request_task = client.async_batch([request, notification])
request_task.cancel()