S2 Elixir SDK

Elixir client for the S2 durable stream API: a protobuf data plane (append, read, check tail) over Mint HTTP/2, plus a JSON control plane (basins, streams, access tokens, metrics) over Req.

Installation

Add s2_client to your mix.exs dependencies:

def deps do
  [
    {:s2_client, "~> 1.0"}
  ]
end

Example: Chat App

A chat app where each room gets its own stream. Messages are Ecto embedded schemas, appended in durable order, and listeners tail the stream in real time.

alias MyApp.Chat
alias MyApp.Chat.Message

# Create rooms (each becomes its own S2 stream)
Chat.create_room("general")
Chat.create_room("random")

# Append typed messages
Chat.append("general", Message.new(user: "alice", text: "hey everyone!"))
Chat.append("general", Message.new(user: "bob", text: "hi alice!"))
Chat.append("random", Message.new(user: "alice", text: "anyone here?"))

# Listen to a room — tails the stream, calling your function for each message
{:ok, listener} = Chat.listen("general", fn %Message{} = msg ->
  IO.puts("[#{msg.ts}] #{msg.user}: #{msg.text}")
end)
# [2026-02-25T14:30:00Z] alice: hey everyone!
# [2026-02-25T14:30:01Z] bob: hi alice!
# ... stays open, prints new messages as they arrive

# Stop listening when done
Chat.stop_listener(listener)

# Listen for only new messages (skip history)
Chat.listen("general", fn msg -> IO.inspect(msg) end, from: :tail)

Here's the implementation. S2.Store manages connections, serialization, and session lifecycle — like Ecto.Repo for streams. It handles chunking, framing, and deduplication automatically (see Patterns), so you just work with your own types.

# config/config.exs
config :my_app, MyApp.S2,
  base_url: "https://aws.s2.dev",
  token: System.get_env("S2_TOKEN"),
  max_retries: :infinity, # reconnection attempts before giving up (default: :infinity)
  base_delay: 500,        # base delay in ms for exponential backoff (default: 500)
  max_queue_size: 1000    # pending appends per stream before {:error, :overloaded} (default: 1000)

# lib/my_app/s2.ex
defmodule MyApp.S2 do
  use S2.Store,
    otp_app: :my_app,
    basin: "my-app"
end

# lib/my_app/chat/message.ex
defmodule MyApp.Chat.Message do
  use Ecto.Schema
  import Ecto.Changeset

  @derive Jason.Encoder
  @primary_key false
  embedded_schema do
    field :user, :string
    field :text, :string
    field :ts, :utc_datetime
  end

  def new(attrs) do
    %__MODULE__{ts: DateTime.utc_now()}
    |> changeset(Map.new(attrs))
    |> apply_action!(:new)
  end

  def changeset(message \\ %__MODULE__{}, attrs) do
    message
    |> cast(attrs, [:user, :text, :ts])
    |> validate_required([:user, :text])
  end

  def serializer do
    %{
      serialize: &Jason.encode!/1,
      deserialize: fn json ->
        attrs = Jason.decode!(json)
        %__MODULE__{} |> cast(attrs, [:user, :text, :ts]) |> apply_action!(:load)
      end
    }
  end
end

# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.S2
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

# lib/my_app/chat.ex
defmodule MyApp.Chat do
  use MyApp.S2, serializer: MyApp.Chat.Message.serializer()

  def create_room(room), do: create_stream("chat/#{room}")
end

Control Plane

All control plane functions take an opts keyword list with server: client (and basin: "name" where required).

First, create a client:

config = S2.Config.new(base_url: "https://aws.s2.dev", token: "my-token")
client = S2.Client.new(config)

Basins

{:ok, basins}  = S2.Basins.list_basins(server: client)
{:ok, basin}   = S2.Basins.create_basin(%S2.CreateBasinRequest{basin: "my-basin"}, server: client)
{:ok, config}  = S2.Basins.get_basin_config("my-basin", server: client)
:ok            = S2.Basins.delete_basin("my-basin", server: client)

Streams

{:ok, streams} = S2.Streams.list_streams(server: client, basin: "my-basin")
{:ok, stream}  = S2.Streams.create_stream(%S2.CreateStreamRequest{stream: "my-stream"}, server: client, basin: "my-basin")
{:ok, config}  = S2.Streams.get_stream_config("my-stream", server: client, basin: "my-basin")
:ok            = S2.Streams.delete_stream("my-stream", server: client, basin: "my-basin")

Access Tokens

Issue scoped, expiring tokens for clients. Useful for giving a browser read-only access to a specific stream (e.g. for real-time updates over SSE or a WebSocket bridge).

# Issue a read-only token scoped to a single basin
{:ok, resp} = S2.AccessTokens.issue_access_token(
  %S2.AccessTokenInfo{
    expires_at: DateTime.add(DateTime.utc_now(), 3600, :second),
    scope: %S2.AccessTokenScope{
      basins: %{"my-basin" => %{}},
      op_groups: %S2.PermittedOperationGroups{
        stream: %S2.ReadWritePermissions{read: true, write: false}
      }
    }
  },
  server: client
)

# resp.access_token is the bearer token string — send it to the client
# The client can then connect to the S2 data plane directly to read streams

{:ok, tokens} = S2.AccessTokens.list_access_tokens(server: client)
:ok           = S2.AccessTokens.revoke_access_token("token-id", server: client)

You can also scope tokens to specific operations via the ops field (e.g. ["read", "check-tail"]) or to specific streams via the streams field.
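The field shapes below are an assumption mirroring the basin-scoped example above; verify them against the S2 API reference.

```elixir
# ASSUMED shapes for `streams` and `ops` scoping (modeled on the
# basin-scoped example above); check the S2 API docs for exact types.
{:ok, resp} =
  S2.AccessTokens.issue_access_token(
    %S2.AccessTokenInfo{
      expires_at: DateTime.add(DateTime.utc_now(), 900, :second),
      scope: %S2.AccessTokenScope{
        # limit the token to a single stream...
        streams: %{"chat/general" => %{}},
        # ...and to just these operations
        ops: ["read", "check-tail"]
      }
    },
    server: client
  )
```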

Metrics

{:ok, metrics} = S2.Metrics.account_metrics(server: client)
{:ok, metrics} = S2.Metrics.basin_metrics("my-basin", server: client)
{:ok, metrics} = S2.Metrics.stream_metrics("my-basin", "my-stream", server: client)

Data Plane

Data plane operations use S2S-framed protobuf over a persistent Mint HTTP/2 connection. All calls return an updated conn for connection reuse.

Single Request

{:ok, conn} = S2.S2S.Connection.open("https://aws.s2.dev")

# Append records
input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "hello"}]}
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)

# Read records
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)

# Check tail position
{:ok, position, conn} = S2.S2S.CheckTail.call(conn, "my-basin", "my-stream")

Streaming Append

{:ok, session} = S2.S2S.AppendSession.open(conn, "my-basin", "my-stream")

input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "event-1"}]}
{:ok, ack, session} = S2.S2S.AppendSession.append(session, input)

{:ok, session} = S2.S2S.AppendSession.close(session)

Streaming Read

{:ok, session} = S2.S2S.ReadSession.open(conn, "my-basin", "my-stream", seq_num: 0)

{:ok, batch, session} = S2.S2S.ReadSession.next_batch(session)
# batch.records contains the records

{:ok, session} = S2.S2S.ReadSession.close(session)

Read options: :seq_num, :count, :wait, :until, :clamp, :tail_offset.
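These options are listed without further documentation here; the sketch below only shows how they would be passed, with assumed meanings noted inline.

```elixir
# Option semantics are ASSUMED from their names; consult the S2 docs.
{:ok, batch, conn} =
  S2.S2S.Read.call(conn, "my-basin", "my-stream",
    seq_num: 100,  # start position (as in the example above)
    count: 50,     # assumed: maximum number of records to return
    wait: 5_000    # assumed: milliseconds to wait for new records
  )
```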

Process Affinity

Streaming sessions are not safe to share across processes. The underlying Mint connection delivers TCP messages to the owning process's mailbox. Create and use sessions within the same process.
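A sketch of the safe pattern: the process that reads from the session is the same process that opened the connection.

```elixir
# Open the connection and session inside the Task, so Mint's TCP messages
# arrive in that Task's mailbox. Uses the ReadSession API shown above.
{:ok, _pid} =
  Task.start(fn ->
    {:ok, conn} = S2.S2S.Connection.open("https://aws.s2.dev")
    {:ok, session} = S2.S2S.ReadSession.open(conn, "my-basin", "my-stream", seq_num: 0)

    {:ok, batch, session} = S2.S2S.ReadSession.next_batch(session)
    Enum.each(batch.records, &IO.inspect/1)

    {:ok, _session} = S2.S2S.ReadSession.close(session)
  end)
```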

Patterns

S2.Store automatically handles all of this for you. You only need this section if you're using the data plane directly.

Under the hood, every append and listen call runs through a pipeline that handles chunking, framing, deduplication, and serialization — mirroring the TypeScript SDK patterns.

Step  Write side                              Read side
1     Serialize term to binary                Filter duplicate records
2     Chunk binary into sub-1 MiB pieces      Reassemble chunks into complete message
3     Frame chunks with reassembly headers    Deserialize binary back to term
4     Stamp with writer ID + dedupe sequence

If you're working with the data plane directly, you can use the patterns modules yourself:

alias S2.Patterns.Serialization

serializer = %{serialize: &Jason.encode!/1, deserialize: &Jason.decode!/1}

# Writing — serialize, chunk, frame, and stamp for dedup
writer = Serialization.writer()
{input, writer} = Serialization.prepare(writer, %{"event" => "signup"}, serializer)
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)

# Reading — dedup, reassemble, and deserialize
reader = Serialization.reader()
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)
{messages, reader} = Serialization.decode(reader, batch.records, serializer)
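For intuition about the chunking step (step 2 on the write side), splitting a binary into size-bounded pieces takes a few lines of binary pattern matching. This is a standalone illustration, not the SDK's internal framing format:

```elixir
defmodule ChunkSketch do
  # Split a binary into pieces of at most `max` bytes (default: just
  # under 1 MiB, matching the record size limit mentioned above).
  def chunk(bin, max \\ 1_048_575)
  def chunk(<<>>, _max), do: []
  def chunk(bin, max) when byte_size(bin) <= max, do: [bin]

  def chunk(bin, max) do
    <<head::binary-size(max), rest::binary>> = bin
    [head | chunk(rest, max)]
  end
end

ChunkSketch.chunk("hello world", 4)
# ["hell", "o wo", "rld"]
```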

Architecture

How S2.Store works

When you call MyApp.S2.append("chat/general", message), here's what happens:

MyApp.S2 (Supervisor)
├── Registry          — maps stream names to worker pids
├── DynamicSupervisor — owns stream workers
│   ├── StreamWorker("chat/general")  — own connection + open AppendSession
│   ├── StreamWorker("chat/random")   — own connection + open AppendSession
│   └── ...started lazily on first append
├── ControlPlane      — shared JSON client for create/delete stream
└── listener Tasks    — each spawned with own connection + ReadSession
  • One process per stream. Each stream gets its own StreamWorker GenServer with a dedicated Mint HTTP/2 connection and a persistent AppendSession. Appends to different streams run in parallel.
  • Workers start lazily. The first append("chat/general", ...) starts a worker for that stream. Subsequent appends reuse the open session — no handshake overhead.
  • Listeners are independent. Each listen call spawns a Task with its own connection and ReadSession, tailing the stream and calling your callback as messages arrive. This is required because Mint delivers TCP data to the owning process's mailbox.
  • Control plane is shared. create_stream and delete_stream go through a single ControlPlane GenServer using the JSON/Req client. These are infrequent operations that don't need per-stream isolation.
  • Automatic reconnection with backoff. If a TCP connection drops, both appends and listeners reconnect transparently with exponential backoff (configurable via max_retries and base_delay). Append workers detect the failure on the next send, open a new connection and session, and retry — the caller just gets back {:ok, ack}. Listeners detect the drop when the read times out, then reconnect from the last successfully read sequence number so no messages are lost. The dedupe writer preserves its ID and sequence across reconnects, so if a message was written but the ack was lost, readers filter the duplicate automatically.
  • Backpressure. Each stream worker monitors its mailbox depth. If pending appends exceed max_queue_size, the worker returns {:error, :overloaded} immediately instead of buffering without bound.
  • Telemetry. All operations emit :telemetry events under the [:s2, :store, ...] prefix — append start/stop/exception, reconnect attempts, and listener connections. Attach your own handlers for metrics, logging, or alerting.
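The reconnect delay schedule can be pictured as plain exponential backoff from base_delay. The SDK's exact formula (cap, jitter) isn't specified here; this sketch assumes doubling with a 30-second cap:

```elixir
defmodule BackoffSketch do
  # Delay in ms before reconnect attempt `n` (0-based): base_delay * 2^n,
  # capped. The 30s cap is an assumption, not the SDK's documented value.
  def delay(attempt, base_delay \\ 500, cap \\ 30_000) do
    min(base_delay * Integer.pow(2, attempt), cap)
  end
end

BackoffSketch.delay(0)   # 500
BackoffSketch.delay(3)   # 4_000
BackoffSketch.delay(10)  # 30_000 (capped)
```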

Protocol layers

Layer                                             Transport      Encoding             Library
S2.Store                                          Managed        Managed
Control plane (basins, streams, tokens, metrics)  HTTP/1.1 or 2  JSON                 Req
Data plane (append, read, check tail)             HTTP/2         S2S-framed Protobuf  Mint

S2.Store is the recommended way to use the SDK. The control and data plane modules below it are available if you need lower-level access.

Guarantees

From S2 (the server)

  • Durability. Appended records are persisted and survive restarts.
  • Ordering. Records within a stream are totally ordered by sequence number.
  • At-least-once delivery. Readers see every record at least once.

From this SDK

  • Automatic reconnection. If a TCP connection drops, appends reconnect and retry transparently with exponential backoff. Listeners reconnect from the last read sequence number — no messages are lost.
  • No duplicates. Each writer stamps records with a unique ID and monotonic sequence. If a message was written but the ack was lost, the retry produces a duplicate on the wire, but readers filter it out automatically.
  • Large message support. Messages over 1 MiB are chunked on write and reassembled on read. You don't need to think about record size limits.
  • Stream isolation. Each stream gets its own process and connection. A slow or failed stream doesn't block others.
  • Backpressure. Stream workers reject appends with {:error, :overloaded} when the mailbox exceeds max_queue_size, preventing unbounded memory growth.
  • Supervised workers. Stream workers are managed by a DynamicSupervisor. If a worker crashes, the supervisor restarts it and the next append picks up where it left off.
  • Validate on write, not on read. Serializers cast fields on read without validation, so schema changes don't break deserialization of old messages.
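For example, a caller of the store from the chat example can match on the backpressure error (a sketch; the retry policy is up to the application):

```elixir
# Assumes the MyApp.S2 store defined in the chat example above.
case MyApp.S2.append("chat/general", message) do
  {:ok, _ack} ->
    :ok

  {:error, :overloaded} ->
    # More than max_queue_size appends are pending for this stream.
    # Shed load, or back off and let the caller retry.
    {:error, :try_later}
end
```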

Testing

98.8% test coverage (with Toxiproxy network fault tests). The remaining uncovered lines are exhaustive pattern match arms that can't be triggered — see test/test_helper.exs for details.

Coverage threshold is set to 95% — CI fails if coverage drops below that.

License

MIT
