Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions frameworks/zeemo/src/http.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ pub const Parser = struct {
/// has advanced. Set to headers_end when chunked mode is detected.
chunk_pos: u32 = 0,

pub const BUF_SIZE = 4096;
pub const BODY_MAX = 4096;
/// Header-accumulation buffer. Sized to fit the largest pipelined
/// burst the bench profiles emit: 16 requests × ~80 B headers = ~1.3 KB
/// for the `pipelined` profile, comfortably below 2 KiB. Validation's
/// fragmentation tests stay tiny too.
pub const BUF_SIZE = 2048;
/// Body buffer. Validation sends ≤ 4-byte bodies; gcannon's baseline
/// POSTs are short integers. 512 B is well above realistic load while
/// staying ~8× leaner than the old 4 KiB.
pub const BODY_MAX = 512;

const ChunkState = enum { size, size_cr, data, data_cr, data_lf, trailer_cr, trailer_lf, done };

Expand Down
165 changes: 85 additions & 80 deletions frameworks/zeemo/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ const handlers = @import("handlers.zig");
const dataset = @import("dataset.zig");

const PORT: u16 = 8080;
/// Per-worker connection cap. The bench distributes 4096 connections across
/// N workers via SO_REUSEPORT (4-tuple hash), so per-worker mean is ≤ 64
/// with σ ≈ 8. 128 gives a healthy margin and roughly 4× less BSS than the
/// previous 1024 — the memory bonus in HttpArena's composite uses
/// `sqrt(rps)/memMB`, so even when rps stays flat the lower RSS bumps the score.
const MAX_CONN = 128;
/// Exclusive upper bound on accepted fds (fds >= MAX_FD are refused).
/// Sizes an O(1) fd-indexed lookup table; Linux assigns fresh fds
/// starting from the lowest free slot, so per-worker fds stay well below
/// this even under churn. 4096 covers HttpArena's entire connection
/// budget per worker with comfortable headroom.
const MAX_FD: usize = 4096;
/// Max concurrent JSON-promoted slots per worker (bounds the big_pool).
const MAX_JSON_CONN = 128;
const RING_ENTRIES = 4096;
const LISTEN_BACKLOG: u32 = 1024;

Expand All @@ -32,7 +33,7 @@ const WRITE_INLINE_FLUSH_AT: u32 = WRITE_INLINE - 256;
/// header). Pool slots are BSS — never touched, never resident — until
/// JSON traffic actually arrives, then released back on connection close.
const BIG_BUF_SIZE = 16 * 1024;
const BIG_POOL_SIZE = MAX_CONN;
const BIG_POOL_SIZE = MAX_JSON_CONN;

const Op = enum(u8) {
accept = 1,
Expand All @@ -41,27 +42,28 @@ const Op = enum(u8) {
close = 4,
};

/// user_data = (op << 56) | slot_idx
inline fn ud(op: Op, slot: u32) u64 {
return (@as(u64, @intFromEnum(op)) << 56) | @as(u64, slot);
/// user_data = (op << 32) | fd. The op tag says which kind of completion
/// this is; the fd identifies the connection and is the lookup key into
/// `slot_table`. Linux gives us back the lowest free fd at each accept,
/// so live entries cluster in the low range of the table.
inline fn ud(op: Op, fd: linux.fd_t) u64 {
return (@as(u64, @intFromEnum(op)) << 32) | @as(u64, @as(u32, @bitCast(fd)));
}
inline fn udOp(u: u64) Op {
return @enumFromInt(@as(u8, @intCast(u >> 56)));
return @enumFromInt(@as(u8, @intCast(u >> 32)));
}
inline fn udSlot(u: u64) u32 {
return @intCast(u & 0x00FFFFFFFFFFFFFF);
inline fn udFd(u: u64) linux.fd_t {
return @as(linux.fd_t, @bitCast(@as(u32, @truncate(u))));
}

const Slot = struct {
fd: linux.fd_t = -1,
in_use: bool = false,
fd: linux.fd_t,
parser: http.Parser = .{},
/// Inline buffer used for small responses (baseline / pipelined /
/// Inline buffer for small responses (baseline / pipelined /
/// limited-conn). Pipelined batches concatenate here.
write_inline: [WRITE_INLINE]u8 = undefined,
/// Index into `big_pool` if this connection has been promoted to a
/// large buffer (after seeing a /json/ request). Kept across requests
/// on the same connection; released to the pool on close.
/// large buffer (first /json/ request on the conn). Kept across
/// requests on the same connection; released to the pool on close.
big_idx: ?u32 = null,
/// Pointer + length of whatever buffer the in-flight send is reading
/// from. Either &write_inline[0] or &big_pool[big_idx][0].
Expand All @@ -71,7 +73,17 @@ const Slot = struct {
close_after_send: bool = false,
};

var slots: [MAX_CONN]Slot = undefined;
/// fd-indexed pointer table — null for free fds, *Slot once accepted.
/// 4096 × 8 B = 32 KiB BSS, almost entirely zero-fill (only the pages
/// holding active fds are ever touched, ~1 page resident in practice).
var slot_table: [MAX_FD]?*Slot = .{null} ** MAX_FD;

/// Per-worker page-backed allocator. `page_allocator` returns pages via
/// mmap and releases them via munmap on free, so freed Slots actually
/// give memory back to the kernel — important for the limited-conn churn
/// pattern where many connections cycle through.
const slot_allocator = std.heap.page_allocator;

var ds: dataset.Dataset = undefined;

/// Per-worker pool of 16 KiB JSON response buffers. Static BSS — pages
Expand All @@ -95,23 +107,29 @@ fn bigRelease(idx: u32) void {
big_used[idx] = false;
}

fn allocSlot() ?u32 {
var i: u32 = 0;
while (i < MAX_CONN) : (i += 1) {
if (!slots[i].in_use) {
slots[i] = .{};
slots[i].in_use = true;
return i;
}
}
return null;
/// Look up the slot currently registered for `fd` in `slot_table`.
///
/// Returns null when `fd` is negative, when it is at or beyond `MAX_FD`,
/// or when no slot is registered for it (the table entry is null).
fn getSlot(fd: linux.fd_t) ?*Slot {
    // `fd` may come from `udFd`, which bit-casts raw user_data bits, so a
    // negative value is representable. A plain @intCast to usize would be
    // illegal behavior for it; the checked cast turns that into a miss.
    const idx = std.math.cast(usize, fd) orelse return null;
    if (idx >= MAX_FD) return null;
    return slot_table[idx];
}

fn freeSlot(idx: u32) void {
if (slots[idx].big_idx) |b| bigRelease(b);
slots[idx].big_idx = null;
slots[idx].in_use = false;
slots[idx].fd = -1;
/// Register a freshly initialised `Slot` for `fd` and return it.
///
/// Returns null when `fd` is negative or at/beyond `MAX_FD`, or when
/// `slot_allocator` cannot provide memory for a new `Slot`. On success
/// the slot is reset to its defaults with `.fd = fd` and stored in
/// `slot_table[fd]`.
fn allocSlotFor(fd: linux.fd_t) ?*Slot {
    // Checked cast: @intCast on a negative fd would be illegal behavior.
    const idx = std.math.cast(usize, fd) orelse return null;
    if (idx >= MAX_FD) return null;

    // If the entry is still occupied, reuse that allocation rather than
    // overwriting the pointer: overwriting would leak the old Slot and
    // any big_pool buffer it still holds.
    // NOTE(review): presumably this only happens if an fd number is reused
    // before the previous owner's close completion is processed — confirm
    // whether that ordering can occur. A late close completion for the old
    // owner would still free this slot via freeSlotFor, which this function
    // cannot prevent.
    const slot = if (slot_table[idx]) |stale| reuse: {
        if (stale.big_idx) |b| bigRelease(b);
        break :reuse stale;
    } else (slot_allocator.create(Slot) catch return null);

    slot.* = .{ .fd = fd };
    slot_table[idx] = slot;
    return slot;
}

/// Release the slot registered for `fd`, if any.
///
/// Returns the slot's big_pool buffer (when it holds one) via
/// `bigRelease`, clears the `slot_table` entry, and hands the Slot memory
/// back to `slot_allocator`. A negative fd, an fd at or beyond `MAX_FD`,
/// or an fd with no registered slot is a no-op.
fn freeSlotFor(fd: linux.fd_t) void {
    // `fd` comes from `udFd`, which bit-casts raw user_data bits; guard the
    // conversion instead of relying on @intCast, which is illegal behavior
    // for negative values.
    const idx = std.math.cast(usize, fd) orelse return;
    if (idx >= MAX_FD) return;

    const slot = slot_table[idx] orelse return;
    // Unpublish the entry before the memory goes away so the table never
    // holds a pointer to a destroyed Slot.
    slot_table[idx] = null;
    if (slot.big_idx) |b| bigRelease(b);
    slot_allocator.destroy(slot);
}

pub fn main() !void {
Expand Down Expand Up @@ -260,7 +278,7 @@ fn handleCqe(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe) !v
.accept => try handleAccept(ring, listen_fd, cqe),
.recv => try handleRecv(ring, cqe),
.send => try handleSend(ring, cqe),
.close => freeSlot(udSlot(cqe.user_data)),
.close => freeSlotFor(udFd(cqe.user_data)),
}
}

Expand All @@ -271,51 +289,48 @@ fn handleAccept(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe)
return;
}
const fd: linux.fd_t = @intCast(cqe.res);
const slot_idx = allocSlot() orelse {
const slot = allocSlotFor(fd) orelse {
_ = linux.close(fd);
if (!more) _ = try ring.accept_multishot(ud(.accept, 0), listen_fd, null, null, 0);
return;
};
slots[slot_idx].fd = fd;
const buf = slots[slot_idx].parser.recv_slot();
_ = try ring.recv(ud(.recv, slot_idx), fd, .{ .buffer = buf }, 0);
const buf = slot.parser.recv_slot();
_ = try ring.recv(ud(.recv, fd), fd, .{ .buffer = buf }, 0);

// If multishot fell off, re-arm.
if (!more) _ = try ring.accept_multishot(ud(.accept, 0), listen_fd, null, null, 0);
}

fn handleRecv(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
const slot_idx = udSlot(cqe.user_data);
const slot = &slots[slot_idx];
const fd = udFd(cqe.user_data);
const slot = getSlot(fd) orelse return; // already freed — race with close CQE
if (cqe.res <= 0) {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, fd), fd);
return;
}
try drainAndSend(ring, slot_idx, @intCast(cqe.res));
try drainAndSend(ring, slot, @intCast(cqe.res));
}

fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
const slot_idx = udSlot(cqe.user_data);
const slot = &slots[slot_idx];
const fd = udFd(cqe.user_data);
const slot = getSlot(fd) orelse return;
if (cqe.res <= 0) {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, fd), fd);
return;
}
const n: u32 = @intCast(cqe.res);
slot.send_off += n;
if (slot.send_off < slot.send_len) {
// Partial send — replay from wherever the active buffer is
// (inline or big), tracked by send_ptr.
// Partial send — replay from wherever the active buffer is.
const tail = slot.send_ptr[slot.send_off..slot.send_len];
_ = try ring.send(ud(.send, slot_idx), slot.fd, tail, linux.MSG.NOSIGNAL);
_ = try ring.send(ud(.send, fd), fd, tail, linux.MSG.NOSIGNAL);
return;
}
if (slot.close_after_send) {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, fd), fd);
return;
}
// Keep-alive: drain any further pipelined requests already buffered.
try drainAndSend(ring, slot_idx, 0);
try drainAndSend(ring, slot, 0);
}

/// Drain as many complete requests as fit in the inline write buffer,
Expand All @@ -331,8 +346,7 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
/// We dispatch all complete requests buffered before submitting one batched
/// send — saves N-1 syscalls and avoids a deadlock that the older
/// "recv → dispatch one → send → recv" code path hit on pipelined input.
fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
const slot = &slots[slot_idx];
fn drainAndSend(ring: *IoUring, slot: *Slot, initial_feed_n: u32) !void {
var inline_pos: u32 = 0;
var feed_n = initial_feed_n;

Expand All @@ -342,63 +356,56 @@ fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
switch (result) {
.protocol_error => {
if (inline_pos > 0) {
submitInline(ring, slot_idx, inline_pos, true) catch {};
submitInline(ring, slot, inline_pos, true) catch {};
} else {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, slot.fd), slot.fd);
}
return;
},
.need_more => {
if (inline_pos > 0) {
try submitInline(ring, slot_idx, inline_pos, false);
try submitInline(ring, slot, inline_pos, false);
return;
}
const buf = slot.parser.recv_slot();
if (buf.len == 0) {
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, slot.fd), slot.fd);
return;
}
_ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
_ = try ring.recv(ud(.recv, slot.fd), slot.fd, .{ .buffer = buf }, 0);
return;
},
.ready => |req| {
const needs_big = std.mem.startsWith(u8, req.path, "/json/");
if (needs_big) {
// JSON responses don't fit in the 4 KiB inline buffer
// and can't be batched with other responses. Flush any
// queued inline bytes first, then dispatch JSON into
// big_buf and submit it as its own send.
if (inline_pos > 0) {
// Defer JSON to the next drain pass after the
// inline batch flushes — leave the parser pointing
// at this request by NOT resetting yet. handleSend
// completion will call drainAndSend(0) and feed(0)
// will re-yield this same request.
try submitInline(ring, slot_idx, inline_pos, false);
// Flush queued non-JSON bytes first; the JSON
// request is re-yielded on the next drain pass
// because we haven't called parser.reset yet.
try submitInline(ring, slot, inline_pos, false);
return;
}
if (slot.big_idx == null) {
slot.big_idx = bigAcquire() orelse {
// Pool exhausted — refuse this connection.
_ = try ring.close(ud(.close, slot_idx), slot.fd);
_ = try ring.close(ud(.close, slot.fd), slot.fd);
return;
};
}
const big = bigSlice(slot.big_idx.?);
const resp = handlers.handle(req, &ds, big);
slot.parser.reset(slot.parser.consumed());
try submitBig(ring, slot_idx, @intCast(resp.bytes.len), resp.close);
try submitBig(ring, slot, @intCast(resp.bytes.len), resp.close);
return;
}
const resp = handlers.handle(req, &ds, slot.write_inline[inline_pos..]);
inline_pos += @intCast(resp.bytes.len);
slot.parser.reset(slot.parser.consumed());
if (resp.close) {
try submitInline(ring, slot_idx, inline_pos, true);
try submitInline(ring, slot, inline_pos, true);
return;
}
if (inline_pos > WRITE_INLINE_FLUSH_AT) {
try submitInline(ring, slot_idx, inline_pos, false);
try submitInline(ring, slot, inline_pos, false);
return;
}
// Loop: feed(0) for the next pipelined request.
Expand All @@ -411,21 +418,19 @@ fn bigSlice(idx: u32) []u8 {
return &big_pool[idx];
}

fn submitInline(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
const slot = &slots[slot_idx];
fn submitInline(ring: *IoUring, slot: *Slot, len: u32, close_after: bool) !void {
slot.send_ptr = &slot.write_inline;
slot.send_len = len;
slot.send_off = 0;
slot.close_after_send = close_after;
_ = try ring.send(ud(.send, slot_idx), slot.fd, slot.write_inline[0..len], linux.MSG.NOSIGNAL);
_ = try ring.send(ud(.send, slot.fd), slot.fd, slot.write_inline[0..len], linux.MSG.NOSIGNAL);
}

fn submitBig(ring: *IoUring, slot_idx: u32, len: u32, close_after: bool) !void {
const slot = &slots[slot_idx];
fn submitBig(ring: *IoUring, slot: *Slot, len: u32, close_after: bool) !void {
const buf = bigSlice(slot.big_idx.?);
slot.send_ptr = buf.ptr;
slot.send_len = len;
slot.send_off = 0;
slot.close_after_send = close_after;
_ = try ring.send(ud(.send, slot_idx), slot.fd, buf[0..len], linux.MSG.NOSIGNAL);
_ = try ring.send(ud(.send, slot.fd), slot.fd, buf[0..len], linux.MSG.NOSIGNAL);
}
16 changes: 8 additions & 8 deletions site/data/baseline-4096.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4435413,
"avg_latency": "923us",
"p99_latency": "1.22ms",
"cpu": "6406.3%",
"memory": "130MiB",
"rps": 4429132,
"avg_latency": "924us",
"p99_latency": "1.27ms",
"cpu": "6407.2%",
"memory": "110MiB",
"connections": 4096,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "279.07MB/s",
"input_bw": "342.63MB/s",
"bandwidth": "278.66MB/s",
"input_bw": "342.14MB/s",
"reconnects": 0,
"status_2xx": 22177068,
"status_2xx": 22145662,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
12 changes: 6 additions & 6 deletions site/data/baseline-512.json
Original file line number Diff line number Diff line change
Expand Up @@ -1326,19 +1326,19 @@
{
"framework": "zeemo",
"language": "Zig",
"rps": 4120722,
"rps": 4113749,
"avg_latency": "123us",
"p99_latency": "238us",
"cpu": "6267.1%",
"p99_latency": "218us",
"cpu": "6278.0%",
"memory": "69MiB",
"connections": 512,
"threads": 64,
"duration": "5s",
"pipeline": 1,
"bandwidth": "259.30MB/s",
"input_bw": "318.32MB/s",
"bandwidth": "258.86MB/s",
"input_bw": "317.78MB/s",
"reconnects": 0,
"status_2xx": 20603611,
"status_2xx": 20568745,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0
Expand Down
Loading