Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions frameworks/zeemo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ Bare-metal Zig HTTP/1.1 server for the [HttpArena](https://www.http-arena.com/)
Supported HttpArena profiles:

- `baseline` — `GET/POST /baseline11?a=…&b=…`, returns the integer sum
- `pipelined` — `GET /pipeline`, returns `ok` (16 requests batched per connection)
- `limited-conn` — same `/baseline11` endpoint as baseline, connection closes after 10 requests
- `json` — `GET /json/{count}?m=…`, renders the first `count` items of `/data/dataset.json` with `total = price * quantity * m`

The io_uring loop drains all pipelined requests from one `recv()` and emits the responses as a single batched `send()`. JSON responses use a fixed-length header prefix (Content-Length padded to 5 digits with leading zeros) so every response starts at offset 0 of the per-connection write buffer — pipelined batches concatenate without a `memmove`.

## Build

```sh
Expand Down
2 changes: 2 additions & 0 deletions frameworks/zeemo/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"enabled": true,
"tests": [
"baseline",
"pipelined",
"limited-conn",
"json"
],
"maintainers": []
Expand Down
48 changes: 32 additions & 16 deletions frameworks/zeemo/src/handlers.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,25 @@ pub const Response = struct {
pub fn handle(req: http.Request, ds: *const dataset.Dataset, out: []u8) Response {
if (matchPath(req.path, "/baseline11"))
return baseline11(req, out);
if (matchPath(req.path, "/pipeline"))
return pipelineHandler(req, out);
if (matchJsonPath(req.path)) |count| {
const m = parseMultiplier(req.query);
return jsonHandler(count, m, ds, out);
}
return notFound(out, req.close);
}

fn pipelineHandler(req: http.Request, out: []u8) Response {
const close_hdr: []const u8 = if (req.close) "Connection: close\r\n" else "";
const n = std.fmt.bufPrint(
out,
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\n{s}\r\nok",
.{close_hdr},
) catch unreachable;
return .{ .bytes = n, .close = req.close };
}

fn matchPath(path: []const u8, p: []const u8) bool {
return std.mem.eql(u8, path, p);
}
Expand Down Expand Up @@ -87,13 +99,21 @@ fn parseIntLoose(s: []const u8) i64 {
}

fn jsonHandler(count: u8, m: u64, ds: *const dataset.Dataset, out: []u8) Response {
// Reserve a fixed prefix for headers so we render the body in place and
// then patch in Content-Length. Headers are written last after we know
// the body byte count.
const headers_reserve: usize = 128;
const body_start = headers_reserve;
var pos = body_start;

// Fixed-length header prefix: Content-Length is reserved at a known
// offset and padded to 5 digits with leading zeros after we know the
// body size. Leading zeros are RFC-compliant and let every response
// start at out[0], which lets drainAndSend batch them contiguously
// without a memmove.
const HDR_PREFIX = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: ";
const CL_DIGITS = 5;
const HDR_TAIL = "\r\n\r\n";
const HEADERS_LEN = HDR_PREFIX.len + CL_DIGITS + HDR_TAIL.len;
const CL_OFFSET = HDR_PREFIX.len;

@memcpy(out[0..HDR_PREFIX.len], HDR_PREFIX);
@memcpy(out[CL_OFFSET + CL_DIGITS ..][0..HDR_TAIL.len], HDR_TAIL);

var pos: usize = HEADERS_LEN;
pos = appendStr(out, pos, "{\"items\":[");
var i: usize = 0;
while (i < count) : (i += 1) {
Expand All @@ -114,16 +134,12 @@ fn jsonHandler(count: u8, m: u64, ds: *const dataset.Dataset, out: []u8) Respons
out[pos] = '}';
pos += 1;

const body_len = pos - body_start;
const body_len = pos - HEADERS_LEN;
var cl_buf: [CL_DIGITS]u8 = undefined;
_ = std.fmt.bufPrint(&cl_buf, "{d:0>5}", .{body_len}) catch unreachable;
@memcpy(out[CL_OFFSET..][0..CL_DIGITS], &cl_buf);

// Now write headers backwards from body_start.
var hdr_buf: [128]u8 = undefined;
const hdr = std.fmt.bufPrint(&hdr_buf,
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\n\r\n",
.{body_len}) catch unreachable;
const start = body_start - hdr.len;
@memcpy(out[start..body_start], hdr);
return .{ .bytes = out[start..pos], .close = false };
return .{ .bytes = out[0..pos], .close = false };
}

fn notFound(out: []u8, close: bool) Response {
Expand Down
20 changes: 20 additions & 0 deletions frameworks/zeemo/src/http.zig
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,26 @@ test "POST baseline11 chunked" {
try std.testing.expectEqualStrings("20", r.ready.body);
}

test "two pipelined requests in one recv" {
var p: Parser = .{};
const data = "GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n";
@memcpy(p.recv_slot()[0..data.len], data);
const r1 = p.feed(@intCast(data.len));
try std.testing.expect(r1 == .ready);
try std.testing.expectEqualStrings("/a", r1.ready.path);

// After dispatching the first, caller resets at parser.consumed() and
// re-feeds zero bytes — the second request must already be ready.
p.reset(p.consumed());
try std.testing.expect(p.len > 0);
const r2 = p.feed(0);
try std.testing.expect(r2 == .ready);
try std.testing.expectEqualStrings("/b", r2.ready.path);

p.reset(p.consumed());
try std.testing.expectEqual(@as(u32, 0), p.len);
}

test "split request line" {
var p: Parser = .{};
const part1 = "GET /baseli";
Expand Down
115 changes: 81 additions & 34 deletions frameworks/zeemo/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ const handlers = @import("handlers.zig");
const dataset = @import("dataset.zig");

const PORT: u16 = 8080;
const MAX_CONN = 1024;
/// Per-worker connection cap. The bench distributes 4096 connections across
/// N workers via SO_REUSEPORT (4-tuple hash), so per-worker mean is ≤ 64
/// with σ ≈ 8. 128 gives a healthy margin and roughly 4× less BSS than the
/// previous 1024 — the memory bonus in HttpArena's composite uses
/// `sqrt(rps)/memMB`, so even when rps stays flat the lower RSS bumps the score.
const MAX_CONN = 128;
const RING_ENTRIES = 4096;
const LISTEN_BACKLOG: u32 = 1024;
const WRITE_BUF_SIZE = 16 * 1024;
/// When `write_buf` accumulates more than this in a single drain pass we
/// flush before dispatching another request, leaving headroom for one
/// max-sized JSON response (~10.5 KiB).
const WRITE_FLUSH_AT: u32 = WRITE_BUF_SIZE - 12 * 1024;

const Op = enum(u8) {
accept = 1,
Expand Down Expand Up @@ -238,28 +247,7 @@ fn handleRecv(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
return;
}
const n: u32 = @intCast(cqe.res);
switch (slot.parser.feed(n)) {
.need_more => {
const buf = slot.parser.recv_slot();
if (buf.len == 0) {
// Buffer full but parser still wants more — close.
_ = try ring.close(ud(.close, slot_idx), slot.fd);
return;
}
_ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
},
.protocol_error => {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
},
.ready => |req| {
const resp = handlers.handle(req, &ds, &slot.write_buf);
slot.write_len = @intCast(resp.bytes.len);
slot.write_off = 0;
slot.close_after_send = resp.close;
_ = try ring.send(ud(.send, slot_idx), slot.fd, resp.bytes, linux.MSG.NOSIGNAL);
},
}
try drainAndSend(ring, slot_idx, @intCast(cqe.res));
}

fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
Expand All @@ -272,11 +260,7 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
const n: u32 = @intCast(cqe.res);
slot.write_off += n;
if (slot.write_off < slot.write_len) {
// Partial send — submit a fresh send for the unsent tail. We track
// slot.write_len as the absolute byte count of the rendered response
// starting at write_buf[write_send_start]; for the MVP handlers
// always render starting at offset 0 within slot.write_buf so the
// remainder lives at [write_off..write_len].
// Partial send — submit a fresh send for the unsent tail.
const remaining = slot.write_buf[slot.write_off..slot.write_len];
_ = try ring.send(ud(.send, slot_idx), slot.fd, remaining, linux.MSG.NOSIGNAL);
return;
Expand All @@ -285,10 +269,73 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
return;
}
// Keep-alive: shift pipelined leftover to front of recv buffer and arm
// recv for the next request.
const next_offset = slot.parser.consumed();
slot.parser.reset(next_offset);
const buf = slot.parser.recv_slot();
_ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
// Keep-alive: drain any further pipelined requests already buffered.
try drainAndSend(ring, slot_idx, 0);
}

/// Drain as many complete requests as fit in `write_buf`, then submit one
/// send for the whole batch. If no requests are ready, arm a recv instead.
/// `initial_feed_n` is the byte count just delivered by recv (0 when called
/// from handleSend completion).
///
/// HTTP/1.1 pipelining sends N requests back-to-back without waiting for
/// responses, so the first recv often delivers more than one full request.
/// Without this drain loop the previous main.zig would dispatch only the
/// first, submit recv, and block — by then the client is already waiting
/// for responses 2..N and we'd deadlock until the recv timeout. Batching
/// also amortizes one send syscall across the whole burst.
fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
const slot = &slots[slot_idx];
var write_pos: u32 = 0;
var feed_n = initial_feed_n;

while (true) {
const result = slot.parser.feed(feed_n);
feed_n = 0;
switch (result) {
.protocol_error => {
if (write_pos > 0) {
try submitSend(ring, slot_idx, write_pos, true);
} else {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
}
return;
},
.need_more => {
if (write_pos > 0) {
try submitSend(ring, slot_idx, write_pos, false);
return;
}
const buf = slot.parser.recv_slot();
if (buf.len == 0) {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
return;
}
_ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
return;
},
.ready => |req| {
const resp = handlers.handle(req, &ds, slot.write_buf[write_pos..]);
write_pos += @intCast(resp.bytes.len);
slot.parser.reset(slot.parser.consumed());
if (resp.close) {
try submitSend(ring, slot_idx, write_pos, true);
return;
}
if (write_pos > WRITE_FLUSH_AT) {
try submitSend(ring, slot_idx, write_pos, false);
return;
}
// Loop: feed(0) to see if the next pipelined request is here.
},
}
}
}

fn submitSend(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
const slot = &slots[slot_idx];
slot.write_len = len;
slot.write_off = 0;
slot.close_after_send = close_after;
_ = try ring.send(ud(.send, slot_idx), slot.fd, slot.write_buf[0..len], linux.MSG.NOSIGNAL);
}
16 changes: 8 additions & 8 deletions site/data/baseline-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4422489,
"avg_latency": "925us",
"p99_latency": "1.22ms",
"cpu": "6410.6%",
"memory": "182MiB",
"rps": 4428334,
"avg_latency": "924us",
"p99_latency": "1.24ms",
"cpu": "6404.3%",
"memory": "184MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "278.29MB/s",
"input_bw": "341.63MB/s",
"bandwidth": "278.60MB/s",
"input_bw": "342.08MB/s",
"reconnects": 0,
"status_2xx": 22112448,
"status_2xx": 22141672,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
12 changes: 6 additions & 6 deletions site/data/baseline-512.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4096391,
"rps": 4107322,
"avg_latency": "124us",
"p99_latency": "229us",
"cpu": "6259.7%",
"p99_latency": "244us",
"cpu": "6336.8%",
"memory": "75MiB",
"connections": 512,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "257.77MB/s",
"input_bw": "316.44MB/s",
"bandwidth": "258.45MB/s",
"input_bw": "317.28MB/s",
"reconnects": 0,
"status_2xx": 20481958,
"status_2xx": 20536614,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
18 changes: 9 additions & 9 deletions site/data/json-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1054,19 +1054,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 2389288,
"avg_latency": "686us",
"p99_latency": "2.94ms",
"cpu": "5854.7%",
"memory": "289MiB",
"rps": 2352168,
"avg_latency": "681us",
"p99_latency": "2.70ms",
"cpu": "6385.8%",
"memory": "258MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "8.00GB/s",
"input_bw": "113.93MB/s",
"reconnects": 477021,
"status_2xx": 11946443,
"bandwidth": "7.87GB/s",
"input_bw": "112.16MB/s",
"reconnects": 473053,
"status_2xx": 11760843,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
20 changes: 20 additions & 0 deletions site/data/limited-conn-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1322,5 +1322,25 @@
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
},
{
"framework": "zeemo",
"language": "Zig",
"rps": 2606729,
"avg_latency": "1.42ms",
"p99_latency": "1.98ms",
"cpu": "5592.4%",
"memory": "254MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "164.01MB/s",
"input_bw": "201.36MB/s",
"reconnects": 1302981,
"status_2xx": 13033649,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
}
]
Loading