Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 124 additions & 34 deletions frameworks/zeemo/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,23 @@ const PORT: u16 = 8080;
const MAX_CONN = 128;
const RING_ENTRIES = 4096;
const LISTEN_BACKLOG: u32 = 1024;
const WRITE_BUF_SIZE = 16 * 1024;
/// When `write_buf` accumulates more than this in a single drain pass we
/// flush before dispatching another request, leaving headroom for one
/// max-sized JSON response (~10.5 KiB).
const WRITE_FLUSH_AT: u32 = WRITE_BUF_SIZE - 12 * 1024;

/// Per-connection inline write buffer. Sized for the small-response
/// profiles (baseline, pipelined, limited-conn) — typical response is
/// ~80 B, so 4 KiB fits a 50-deep pipeline batch with headroom. JSON
/// responses don't fit and use the big-buf pool below.
const WRITE_INLINE = 4 * 1024;
/// When accumulated inline bytes exceed this in a single drain pass we
/// flush before dispatching another request, leaving room for one more
/// pipelined response (~80 B nominally, capped at ~200 B for safety).
const WRITE_INLINE_FLUSH_AT: u32 = WRITE_INLINE - 256;

/// Worker-local big-buffer pool used exclusively for JSON responses.
/// 16 KiB per slot is enough for /json/50?m=N (~10.5 KiB body plus 74 B
/// header). The pool lives in BSS, so its pages are not resident until
/// JSON traffic first touches them. A slot is handed back to the pool
/// for reuse when its connection closes.
const BIG_BUF_SIZE = 16 * 1024;
const BIG_POOL_SIZE = MAX_CONN;

const Op = enum(u8) {
accept = 1,
Expand All @@ -44,15 +56,45 @@ const Slot = struct {
fd: linux.fd_t = -1,
in_use: bool = false,
parser: http.Parser = .{},
write_buf: [WRITE_BUF_SIZE]u8 = undefined,
write_len: u32 = 0,
write_off: u32 = 0,
/// Inline buffer used for small responses (baseline / pipelined /
/// limited-conn). Pipelined batches concatenate here.
write_inline: [WRITE_INLINE]u8 = undefined,
/// Index into `big_pool` if this connection has been promoted to a
/// large buffer (after seeing a /json/ request). Kept across requests
/// on the same connection; released to the pool on close.
big_idx: ?u32 = null,
/// Pointer + length of whatever buffer the in-flight send is reading
/// from. Either &write_inline[0] or &big_pool[big_idx][0].
send_ptr: [*]const u8 = undefined,
send_len: u32 = 0,
send_off: u32 = 0,
close_after_send: bool = false,
};

var slots: [MAX_CONN]Slot = undefined;
var ds: dataset.Dataset = undefined;

/// Per-worker pool of 16 KiB JSON response buffers. Static BSS — pages
/// stay zero-fill until JSON traffic touches them, so the baseline
/// profile pays zero RSS for this pool. The contents are left undefined
/// on purpose: a slot is handed to the request handler to fill before
/// any of it is sent.
var big_pool: [BIG_POOL_SIZE][BIG_BUF_SIZE]u8 = undefined;
/// Occupancy flag for each `big_pool` slot; `true` means the slot is
/// currently owned by a connection. `bigAcquire` tests these flags, so
/// they must hold a defined value from program start — reading an
/// `undefined` bool is illegal behaviour. An all-false array is all-zero
/// and therefore still BSS-eligible.
var big_used: [BIG_POOL_SIZE]bool = [_]bool{false} ** BIG_POOL_SIZE;

/// Claim the lowest-numbered free slot of the big-buffer pool.
///
/// Walks `big_used` from index 0, marks the first entry that is not in
/// use as taken and returns its index. Returns `null` when every slot is
/// already in use.
fn bigAcquire() ?u32 {
    for (&big_used, 0..) |*taken, slot_no| {
        if (taken.*) continue;
        taken.* = true;
        // The pool has BIG_POOL_SIZE entries, so the index fits in u32.
        const claimed: u32 = @intCast(slot_no);
        return claimed;
    }
    return null;
}

/// Hand big-pool slot `idx` back so a later `bigAcquire` can reuse it.
///
/// Only clears the occupancy flag; it does not check that the slot was
/// actually held, and the buffer contents are left as they are.
fn bigRelease(idx: u32) void {
    const taken = &big_used[idx];
    taken.* = false;
}

fn allocSlot() ?u32 {
var i: u32 = 0;
while (i < MAX_CONN) : (i += 1) {
Expand All @@ -66,6 +108,8 @@ fn allocSlot() ?u32 {
}

/// Mark connection slot `idx` as free.
///
/// If the connection had been promoted to a big-pool buffer, that buffer
/// is released back to the pool and the slot's `big_idx` is cleared.
/// The slot's fd is then reset to -1 and `in_use` to false.
fn freeSlot(idx: u32) void {
    const slot = &slots[idx];
    if (slot.big_idx) |held| {
        bigRelease(held);
        slot.big_idx = null;
    }
    slot.fd = -1;
    slot.in_use = false;
}
Expand Down Expand Up @@ -258,11 +302,12 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
return;
}
const n: u32 = @intCast(cqe.res);
slot.write_off += n;
if (slot.write_off < slot.write_len) {
// Partial send — submit a fresh send for the unsent tail.
const remaining = slot.write_buf[slot.write_off..slot.write_len];
_ = try ring.send(ud(.send, slot_idx), slot.fd, remaining, linux.MSG.NOSIGNAL);
slot.send_off += n;
if (slot.send_off < slot.send_len) {
// Partial send — replay from wherever the active buffer is
// (inline or big), tracked by send_ptr.
const tail = slot.send_ptr[slot.send_off..slot.send_len];
_ = try ring.send(ud(.send, slot_idx), slot.fd, tail, linux.MSG.NOSIGNAL);
return;
}
if (slot.close_after_send) {
Expand All @@ -273,37 +318,39 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
try drainAndSend(ring, slot_idx, 0);
}

/// Drain as many complete requests as fit in `write_buf`, then submit one
/// send for the whole batch. If no requests are ready, arm a recv instead.
/// Drain as many complete requests as fit in the inline write buffer,
/// then submit one send for the whole batch. JSON requests bypass the
/// inline buffer and use the per-connection big_buf instead (pool-backed).
/// If no requests are ready, arm a recv instead.
///
/// `initial_feed_n` is the byte count just delivered by recv (0 when called
/// from handleSend completion).
///
/// HTTP/1.1 pipelining sends N requests back-to-back without waiting for
/// responses, so the first recv often delivers more than one full request.
/// Without this drain loop the previous main.zig would dispatch only the
/// first, submit recv, and block — by then the client is already waiting
/// for responses 2..N and we'd deadlock until the recv timeout. Batching
/// also amortizes one send syscall across the whole burst.
/// We dispatch all complete requests buffered before submitting one batched
/// send — saves N-1 syscalls and avoids a deadlock that the older
/// "recv → dispatch one → send → recv" code path hit on pipelined input.
fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
const slot = &slots[slot_idx];
var write_pos: u32 = 0;
var inline_pos: u32 = 0;
var feed_n = initial_feed_n;

while (true) {
const result = slot.parser.feed(feed_n);
feed_n = 0;
switch (result) {
.protocol_error => {
if (write_pos > 0) {
try submitSend(ring, slot_idx, write_pos, true);
if (inline_pos > 0) {
submitInline(ring, slot_idx, inline_pos, true) catch {};
} else {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
}
return;
},
.need_more => {
if (write_pos > 0) {
try submitSend(ring, slot_idx, write_pos, false);
if (inline_pos > 0) {
try submitInline(ring, slot_idx, inline_pos, false);
return;
}
const buf = slot.parser.recv_slot();
Expand All @@ -315,27 +362,70 @@ fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
return;
},
.ready => |req| {
const resp = handlers.handle(req, &ds, slot.write_buf[write_pos..]);
write_pos += @intCast(resp.bytes.len);
const needs_big = std.mem.startsWith(u8, req.path, "/json/");
if (needs_big) {
// JSON responses don't fit in the 4 KiB inline buffer
// and can't be batched with other responses. Flush any
// queued inline bytes first, then dispatch JSON into
// big_buf and submit it as its own send.
if (inline_pos > 0) {
// Defer JSON to the next drain pass after the
// inline batch flushes — leave the parser pointing
// at this request by NOT resetting yet. handleSend
// completion will call drainAndSend(0) and feed(0)
// will re-yield this same request.
try submitInline(ring, slot_idx, inline_pos, false);
return;
}
if (slot.big_idx == null) {
slot.big_idx = bigAcquire() orelse {
// Pool exhausted — refuse this connection.
_ = try ring.close(ud(.close, slot_idx), slot.fd);
return;
};
}
const big = bigSlice(slot.big_idx.?);
const resp = handlers.handle(req, &ds, big);
slot.parser.reset(slot.parser.consumed());
try submitBig(ring, slot_idx, @intCast(resp.bytes.len), resp.close);
return;
}
const resp = handlers.handle(req, &ds, slot.write_inline[inline_pos..]);
inline_pos += @intCast(resp.bytes.len);
slot.parser.reset(slot.parser.consumed());
if (resp.close) {
try submitSend(ring, slot_idx, write_pos, true);
try submitInline(ring, slot_idx, inline_pos, true);
return;
}
if (write_pos > WRITE_FLUSH_AT) {
try submitSend(ring, slot_idx, write_pos, false);
if (inline_pos > WRITE_INLINE_FLUSH_AT) {
try submitInline(ring, slot_idx, inline_pos, false);
return;
}
// Loop: feed(0) to see if the next pipelined request is here.
// Loop: feed(0) for the next pipelined request.
},
}
}
}

fn submitSend(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
/// View big-pool slot `idx` as a mutable byte slice spanning the whole
/// BIG_BUF_SIZE-byte buffer. Does not check whether the slot is in use.
fn bigSlice(idx: u32) []u8 {
    const whole: []u8 = big_pool[idx][0..];
    return whole;
}

/// Queue a send of the first `len` bytes of the slot's inline write
/// buffer.
///
/// Records the inline buffer as the active send source (`send_ptr`,
/// `send_len`, with `send_off` reset to 0) so the send-completion path
/// can resubmit the unsent tail after a partial send. Also stores
/// `close_after` in `close_after_send`, which the completion handler
/// consults once the send has fully drained. The send is submitted with
/// MSG_NOSIGNAL; any error from `ring.send` propagates to the caller.
fn submitInline(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
    const slot = &slots[slot_idx];
    const payload: []const u8 = slot.write_inline[0..len];

    slot.close_after_send = close_after;
    slot.send_off = 0;
    slot.send_len = len;
    slot.send_ptr = payload.ptr;

    _ = try ring.send(ud(.send, slot_idx), slot.fd, payload, linux.MSG.NOSIGNAL);
}

fn submitBig(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
const slot = &slots[slot_idx];
slot.write_len = len;
slot.write_off = 0;
const buf = bigSlice(slot.big_idx.?);
slot.send_ptr = buf.ptr;
slot.send_len = len;
slot.send_off = 0;
slot.close_after_send = close_after;
_ = try ring.send(ud(.send, slot_idx), slot.fd, slot.write_buf[0..len], linux.MSG.NOSIGNAL);
_ = try ring.send(ud(.send, slot_idx), slot.fd, buf[0..len], linux.MSG.NOSIGNAL);
}
16 changes: 8 additions & 8 deletions site/data/baseline-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4428334,
"avg_latency": "924us",
"p99_latency": "1.24ms",
"cpu": "6404.3%",
"memory": "184MiB",
"rps": 4435413,
"avg_latency": "923us",
"p99_latency": "1.22ms",
"cpu": "6406.3%",
"memory": "130MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "278.60MB/s",
"input_bw": "342.08MB/s",
"bandwidth": "279.07MB/s",
"input_bw": "342.63MB/s",
"reconnects": 0,
"status_2xx": 22141672,
"status_2xx": 22177068,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
16 changes: 8 additions & 8 deletions site/data/baseline-512.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4107322,
"avg_latency": "124us",
"p99_latency": "244us",
"cpu": "6336.8%",
"memory": "75MiB",
"rps": 4120722,
"avg_latency": "123us",
"p99_latency": "238us",
"cpu": "6267.1%",
"memory": "69MiB",
"connections": 512,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "258.45MB/s",
"input_bw": "317.28MB/s",
"bandwidth": "259.30MB/s",
"input_bw": "318.32MB/s",
"reconnects": 0,
"status_2xx": 20536614,
"status_2xx": 20603611,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
18 changes: 9 additions & 9 deletions site/data/json-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1054,19 +1054,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 2352168,
"avg_latency": "681us",
"p99_latency": "2.70ms",
"cpu": "6385.8%",
"memory": "258MiB",
"rps": 2374963,
"avg_latency": "704us",
"p99_latency": "2.74ms",
"cpu": "6403.3%",
"memory": "257MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "7.87GB/s",
"input_bw": "112.16MB/s",
"reconnects": 473053,
"status_2xx": 11760843,
"bandwidth": "7.95GB/s",
"input_bw": "113.25MB/s",
"reconnects": 477786,
"status_2xx": 11874819,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
16 changes: 8 additions & 8 deletions site/data/limited-conn-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 2606729,
"rps": 2617676,
"avg_latency": "1.42ms",
"p99_latency": "1.98ms",
"cpu": "5592.4%",
"memory": "254MiB",
"p99_latency": "1.96ms",
"cpu": "5554.3%",
"memory": "178MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "164.01MB/s",
"input_bw": "201.36MB/s",
"reconnects": 1302981,
"status_2xx": 13033649,
"bandwidth": "164.69MB/s",
"input_bw": "202.21MB/s",
"reconnects": 1308298,
"status_2xx": 13088383,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
16 changes: 8 additions & 8 deletions site/data/limited-conn-512.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 2628568,
"rps": 2635070,
"avg_latency": "177us",
"p99_latency": "403us",
"cpu": "5497.0%",
"memory": "114MiB",
"p99_latency": "400us",
"cpu": "5467.8%",
"memory": "88MiB",
"connections": 512,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "165.41MB/s",
"input_bw": "203.05MB/s",
"reconnects": 1314288,
"status_2xx": 13142843,
"bandwidth": "165.79MB/s",
"input_bw": "203.55MB/s",
"reconnects": 1317523,
"status_2xx": 13175351,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
Loading