diff --git a/frameworks/zeemo/Dockerfile b/frameworks/zeemo/Dockerfile new file mode 100644 index 00000000..f9ab5b90 --- /dev/null +++ b/frameworks/zeemo/Dockerfile @@ -0,0 +1,44 @@ +# syntax=docker/dockerfile:1.7 +# +# zeemo — Zig io_uring HTTP/1.1 server. Built statically against musl so the +# runtime image is a single binary on scratch. + +FROM ubuntu:24.04 AS build +ARG ZIG_VERSION=0.16.0 +ARG TARGETARCH +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates curl xz-utils \ + && rm -rf /var/lib/apt/lists/* + +# Pull the official Zig toolchain. Pick the arch that matches the build +# platform; Buildx sets TARGETARCH to amd64/arm64. +RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_ARCH=x86_64 ;; \ + arm64) ZIG_ARCH=aarch64 ;; \ + *) echo "unsupported arch: ${TARGETARCH}" >&2; exit 1 ;; \ + esac; \ + curl -fsSL "https://ziglang.org/download/${ZIG_VERSION}/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}.tar.xz" \ + | tar -xJ -C /opt; \ + mv "/opt/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}" /opt/zig +ENV PATH="/opt/zig:${PATH}" + +WORKDIR /src +COPY build.zig build.zig.zon ./ +COPY src ./src +# Match Zig target to the container platform Docker is building. +# HttpArena's bench runner builds for amd64. For local OrbStack on Apple +# Silicon we want arm64 native (Rosetta has BSS-size bugs that abort our +# binary when emulating amd64). 
+RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_TARGET=x86_64-linux-musl ;; \ + arm64) ZIG_TARGET=aarch64-linux-musl ;; \ + esac; \ + zig build -Dtarget="${ZIG_TARGET}" --release=fast + +FROM scratch +COPY --from=build /src/zig-out/bin/zeemo /zeemo +EXPOSE 8080 +ENTRYPOINT ["/zeemo"] diff --git a/frameworks/zeemo/README.md b/frameworks/zeemo/README.md new file mode 100644 index 00000000..b4192edd --- /dev/null +++ b/frameworks/zeemo/README.md @@ -0,0 +1,46 @@ +# zeemo + +Bare-metal Zig HTTP/1.1 server for the [HttpArena](https://www.http-arena.com/) benchmark. + +- **Engine:** Linux `io_uring` (multishot accept, direct syscalls — no liburing wrapper) +- **Concurrency:** one process per allowed CPU via `SO_REUSEPORT`, each pinned with `sched_setaffinity`, shared-nothing +- **Parser:** hand-written incremental HTTP/1.1 (TCP fragmentation, `Content-Length`, `Transfer-Encoding: chunked`, keep-alive, pipelining) +- **Runtime:** ~370 KB static musl binary on `scratch` + +Supported HttpArena profiles: + +- `baseline` — `GET/POST /baseline11?a=…&b=…`, returns the integer sum +- `json` — `GET /json/{count}?m=…`, renders the first `count` items of `/data/dataset.json` with `total = price * quantity * m` + +## Build + +```sh +zig build --release=fast # native +zig build -Dtarget=x86_64-linux-musl --release=fast +``` + +## Run + +```sh +docker build -t zeemo . +docker run --rm -p 8080:8080 \ + --ulimit memlock=-1:-1 \ + -v /path/to/dataset.json:/data/dataset.json:ro \ + zeemo +``` + +On OrbStack the default seccomp profile blocks `io_uring_setup`; add +`--security-opt seccomp=unconfined` locally. The HttpArena bench machine +(Ubuntu 24.04) allows io_uring by default. + +## Tests + +```sh +zig test src/http.zig +zig test src/dataset.zig +zig test src/handlers.zig +``` + +`scripts/local-validate.sh` runs the HttpArena validation suite (17 checks +covering baseline, anti-cheat, TCP fragmentation, and JSON) against a +local container. 
diff --git a/frameworks/zeemo/build.zig b/frameworks/zeemo/build.zig new file mode 100644 index 00000000..45b00dcc --- /dev/null +++ b/frameworks/zeemo/build.zig @@ -0,0 +1,22 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseFast }); + + const exe = b.addExecutable(.{ + .name = "zeemo", + .root_module = b.createModule(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + .strip = true, + }), + }); + b.installArtifact(exe); + + const run_step = b.step("run", "Run the server"); + const run_cmd = b.addRunArtifact(exe); + if (b.args) |args| run_cmd.addArgs(args); + run_step.dependOn(&run_cmd.step); +} diff --git a/frameworks/zeemo/build.zig.zon b/frameworks/zeemo/build.zig.zon new file mode 100644 index 00000000..3d21ee3b --- /dev/null +++ b/frameworks/zeemo/build.zig.zon @@ -0,0 +1,12 @@ +.{ + .name = .zeemo, + .version = "0.1.0", + .fingerprint = 0x3afa68aae14cae32, + .minimum_zig_version = "0.16.0", + .paths = .{ + "build.zig", + "build.zig.zon", + "src", + }, + .dependencies = .{}, +} diff --git a/frameworks/zeemo/meta.json b/frameworks/zeemo/meta.json new file mode 100644 index 00000000..e82c29c2 --- /dev/null +++ b/frameworks/zeemo/meta.json @@ -0,0 +1,14 @@ +{ + "display_name": "zeemo", + "language": "Zig", + "type": "engine", + "engine": "io_uring", + "description": "Bare-metal Zig HTTP/1.1 server on io_uring. 
One reactor process per logical CPU via SO_REUSEPORT, hand-written incremental parser, registered provided-buffer rings.", + "repo": "https://github.com/skylightis666/zeemo", + "enabled": true, + "tests": [ + "baseline", + "json" + ], + "maintainers": [] +} diff --git a/frameworks/zeemo/src/dataset.zig b/frameworks/zeemo/src/dataset.zig new file mode 100644 index 00000000..00af727c --- /dev/null +++ b/frameworks/zeemo/src/dataset.zig @@ -0,0 +1,189 @@ +const std = @import("std"); + +pub const ItemCount = 50; +pub const PrefixMax = 512; + +pub const Item = struct { + /// Pre-rendered JSON object for this item, WITHOUT the closing `}`. + /// Caller appends `,"total":}` per request. + prefix: []const u8, + /// price * quantity, pre-multiplied so per-request work is one ×m + /// followed by an integer-to-decimal print. + pq: u64, +}; + +pub const Dataset = struct { + items: []Item, + arena: std.heap.ArenaAllocator, + + pub fn deinit(self: *Dataset) void { + self.arena.deinit(); + } +}; + +pub fn load(gpa: std.mem.Allocator, path: []const u8) !Dataset { + var arena = std.heap.ArenaAllocator.init(gpa); + errdefer arena.deinit(); + const aa = arena.allocator(); + + const raw = try readFileAlloc(aa, path, 4 * 1024 * 1024); + + var parsed = try std.json.parseFromSlice(std.json.Value, aa, raw, .{}); + defer parsed.deinit(); + + const arr = switch (parsed.value) { + .array => |a| a, + else => return error.BadDataset, + }; + if (arr.items.len != ItemCount) return error.BadDataset; + + const items = try aa.alloc(Item, ItemCount); + for (arr.items, 0..) 
|elem, i| {
        const obj = switch (elem) {
            .object => |o| o,
            else => return error.BadDataset,
        };
        const price = jsonInt(obj.get("price") orelse return error.BadDataset);
        const quantity = jsonInt(obj.get("quantity") orelse return error.BadDataset);

        // A negative value, or a product that does not fit in u64, would be
        // illegal behavior with `@intCast` / `*` in ReleaseFast. Treat either
        // as a malformed dataset instead.
        const price_u = std.math.cast(u64, price) orelse return error.BadDataset;
        const quantity_u = std.math.cast(u64, quantity) orelse return error.BadDataset;
        const pq = std.math.mul(u64, price_u, quantity_u) catch return error.BadDataset;

        var buf: std.ArrayList(u8) = .empty;
        try renderItemPrefix(&buf, aa, obj);
        items[i] = .{
            .prefix = try buf.toOwnedSlice(aa),
            .pq = pq,
        };
    }

    return .{ .items = items, .arena = arena };
}

/// Reads the file at `path` into memory allocated from `aa` and returns at
/// most `max` bytes; a longer file is truncated at `max`. The caller owns the
/// returned slice. Fails with `error.NameTooLong` if `path` does not fit in a
/// PATH_MAX-sized buffer, or with whatever open/read/allocation error occurs.
fn readFileAlloc(aa: std.mem.Allocator, path: []const u8, max: usize) ![]u8 {
    // openatZ needs a NUL-terminated path, so copy it into a stack buffer.
    var path_z: [std.posix.PATH_MAX]u8 = undefined;
    if (path.len >= path_z.len) return error.NameTooLong;
    @memcpy(path_z[0..path.len], path);
    path_z[path.len] = 0;
    const fd = try std.posix.openatZ(std.posix.AT.FDCWD, @ptrCast(&path_z), .{ .ACCMODE = .RDONLY }, 0);
    defer _ = std.posix.system.close(fd);

    var buf: std.ArrayList(u8) = .empty;
    errdefer buf.deinit(aa);
    try buf.ensureTotalCapacity(aa, 64 * 1024);
    while (buf.items.len < max) {
        try buf.ensureUnusedCapacity(aa, 32 * 1024);
        const spare = buf.unusedCapacitySlice();
        // Never read past `max`, even when the spare capacity is larger.
        const want = @min(spare.len, max - buf.items.len);
        const n = try std.posix.read(fd, spare[0..want]);
        if (n == 0) break; // EOF
        buf.items.len += n;
    }
    return buf.toOwnedSlice(aa);
}

/// Converts a JSON number to `i64`. Integers pass through unchanged. Floats
/// are truncated toward zero and clamped to the `i64` range (NaN maps to 0),
/// because `@intFromFloat` is illegal behavior for values that do not fit.
/// Any non-numeric value yields 0.
fn jsonInt(v: std.json.Value) i64 {
    return switch (v) {
        .integer => |n| n,
        .float => |f| blk: {
            const two_pow_63: f64 = 9223372036854775808.0;
            if (std.math.isNan(f)) break :blk @as(i64, 0);
            if (f >= two_pow_63) break :blk @as(i64, std.math.maxInt(i64));
            if (f < -two_pow_63) break :blk @as(i64, std.math.minInt(i64));
            break :blk @as(i64, @intFromFloat(f));
        },
        else => 0,
    };
}

/// Renders the JSON object for `obj` *without* the closing `}`. No trailing
/// comma is written: the suffix the caller appends, `,"total":N}`, brings its
/// own leading comma. `obj` must have at least one member, otherwise that
/// suffix would produce `{,"total":N}`; `load` ensures this by requiring the
/// `price` and `quantity` keys.
fn renderItemPrefix(buf: *std.ArrayList(u8), aa: std.mem.Allocator, obj: std.json.ObjectMap) !void {
    try buf.append(aa, '{');
    var first = true;
    var it = obj.iterator();
    while (it.next()) |kv| {
        if (!first) try buf.append(aa, ',');
        first = false;
        try writeString(buf, aa, kv.key_ptr.*);
        try buf.append(aa, ':');
        try writeValue(buf, aa, kv.value_ptr.*);
    }
    // Intentionally no closing `}` — caller appends `,"total":N}`.
}

/// Appends the compact JSON serialization of `v` to `buf`, recursing into
/// arrays and objects. Allocation goes through `aa`.
fn writeValue(buf: *std.ArrayList(u8), aa: std.mem.Allocator, v: std.json.Value) !void {
    switch (v) {
        .null => try buf.appendSlice(aa, "null"),
        .bool => |b| try buf.appendSlice(aa, if (b) "true" else "false"),
        .integer => |n| try writeInt(buf, aa, n),
        .float => |f| {
            // `{d}` prints floats in plain decimal notation, so a large or
            // tiny f64 needs several hundred characters. Size the scratch
            // buffer for that and propagate NoSpaceLeft rather than assuming
            // it cannot happen.
            var tmp: [512]u8 = undefined;
            const s = try std.fmt.bufPrint(&tmp, "{d}", .{f});
            try buf.appendSlice(aa, s);
        },
        // Already textual JSON number syntax; copy it through verbatim.
        .number_string => |ns| try buf.appendSlice(aa, ns),
        .string => |s| try writeString(buf, aa, s),
        .array => |arr| {
            try buf.append(aa, '[');
            for (arr.items, 0..) |e, i| {
                if (i > 0) try buf.append(aa, ',');
                try writeValue(buf, aa, e);
            }
            try buf.append(aa, ']');
        },
        .object => |o| {
            try buf.append(aa, '{');
            var first = true;
            var it = o.iterator();
            while (it.next()) |kv| {
                if (!first) try buf.append(aa, ',');
                first = false;
                try writeString(buf, aa, kv.key_ptr.*);
                try buf.append(aa, ':');
                try writeValue(buf, aa, kv.value_ptr.*);
            }
            try buf.append(aa, '}');
        },
    }
}

/// Appends the decimal representation of `n` to `buf`.
fn writeInt(buf: *std.ArrayList(u8), aa: std.mem.Allocator, n: i64) !void {
    // An i64 needs at most 20 characters including the sign, so 24 suffices.
    var tmp: [24]u8 = undefined;
    const s = std.fmt.bufPrint(&tmp, "{d}", .{n}) catch unreachable;
    try buf.appendSlice(aa, s);
}

/// Appends `s` as a quoted JSON string. `"` and `\` are backslash-escaped,
/// control bytes 0x00-0x1f become `\u00XX`, and every other byte (including
/// multi-byte UTF-8 sequences) is copied unchanged.
fn writeString(buf: *std.ArrayList(u8), aa: std.mem.Allocator, s: []const u8) !void {
    try buf.append(aa, '"');
    for (s) |c| {
        switch (c) {
            '"' => try buf.appendSlice(aa, "\\\""),
            '\\' => try buf.appendSlice(aa, "\\\\"),
            0x00...0x1f => {
                // `\u` plus four zero-padded hex digits is exactly 6 bytes.
                var esc: [6]u8 = undefined;
                _ = std.fmt.bufPrint(&esc, "\\u{x:0>4}", .{c}) catch unreachable;
                try buf.appendSlice(aa, esc[0..6]);
            },
            else => try buf.append(aa, c),
        }
    }
    try buf.append(aa, '"');
}

test "load dataset and assemble valid JSON" {
    var ds = try load(std.testing.allocator, "../HttpArena/data/dataset.json");
    defer ds.deinit();
    try std.testing.expectEqual(@as(usize, ItemCount), ds.items.len);
    // First item: Alpha Widget, price=328, quantity=15.
    try std.testing.expectEqual(@as(u64, 328 * 15), ds.items[0].pq);
    try std.testing.expect(std.mem.startsWith(u8, ds.items[0].prefix, "{"));

    // The prefix + ",\"total\":N}" must parse as valid JSON and yield total
    // = price * quantity * m.
+ const aa = std.testing.allocator; + const m: u64 = 3; + const total = ds.items[0].pq * m; + const tail = try std.fmt.allocPrint(aa, ",\"total\":{d}}}", .{total}); + defer aa.free(tail); + const full = try std.mem.concat(aa, u8, &.{ ds.items[0].prefix, tail }); + defer aa.free(full); + + var parsed = try std.json.parseFromSlice(std.json.Value, aa, full, .{}); + defer parsed.deinit(); + const obj = parsed.value.object; + try std.testing.expectEqual(@as(i64, @intCast(total)), obj.get("total").?.integer); + try std.testing.expectEqual(@as(i64, 328), obj.get("price").?.integer); + try std.testing.expectEqualStrings("Alpha Widget", obj.get("name").?.string); +} diff --git a/frameworks/zeemo/src/handlers.zig b/frameworks/zeemo/src/handlers.zig new file mode 100644 index 00000000..4dabe95e --- /dev/null +++ b/frameworks/zeemo/src/handlers.zig @@ -0,0 +1,182 @@ +const std = @import("std"); +const http = @import("http.zig"); +const dataset = @import("dataset.zig"); + +pub const Response = struct { + /// Full HTTP/1.1 response bytes, ready to send. + bytes: []const u8, + close: bool, +}; + +/// Top-level dispatcher. `out` is the per-connection write buffer. +pub fn handle(req: http.Request, ds: *const dataset.Dataset, out: []u8) Response { + if (matchPath(req.path, "/baseline11")) + return baseline11(req, out); + if (matchJsonPath(req.path)) |count| { + const m = parseMultiplier(req.query); + return jsonHandler(count, m, ds, out); + } + return notFound(out, req.close); +} + +fn matchPath(path: []const u8, p: []const u8) bool { + return std.mem.eql(u8, path, p); +} + +/// Matches `/json/{count}` where count ∈ [1, 50]. Returns the parsed count. 
fn matchJsonPath(path: []const u8) ?u8 {
    const prefix = "/json/";
    if (!std.mem.startsWith(u8, path, prefix)) return null;
    const tail = path[prefix.len..];
    if (tail.len == 0) return null;
    const n = std.fmt.parseInt(u8, tail, 10) catch return null;
    if (n < 1 or n > dataset.ItemCount) return null;
    return n;
}

/// Returns the value of the first `m=NUMBER` pair in `query`, or 1 when the
/// pair is missing or its value does not parse as a `u64`.
fn parseMultiplier(query: []const u8) u64 {
    var it = std.mem.tokenizeScalar(u8, query, '&');
    while (it.next()) |pair| {
        if (std.mem.startsWith(u8, pair, "m=")) {
            return std.fmt.parseInt(u64, pair[2..], 10) catch 1;
        }
    }
    return 1;
}

/// Handles `/baseline11`: responds with the sum of all integer query values,
/// plus the integer in the request body for a POST with a non-empty body.
/// The response is rendered into `out`, which must be large enough for the
/// short fixed-size reply.
fn baseline11(req: http.Request, out: []u8) Response {
    var sum: i64 = sumQuery(req.query);
    if (req.method == .POST and req.body.len > 0) {
        // Client-controlled operands: saturate instead of overflowing, which
        // would be illegal behavior in ReleaseFast.
        sum +|= parseIntLoose(req.body);
    }

    // Render the body first so Content-Length can be written with the
    // headers in a single pass.
    var body_buf: [32]u8 = undefined;
    const body = std.fmt.bufPrint(&body_buf, "{d}", .{sum}) catch unreachable;

    const close_hdr: []const u8 = if (req.close) "Connection: close\r\n" else "";
    const n = std.fmt.bufPrint(
        out,
        "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {d}\r\n{s}\r\n{s}",
        .{ body.len, close_hdr, body },
    ) catch unreachable;
    return .{ .bytes = n, .close = req.close };
}

/// Sums the values of every `key=value` pair in `q` that parses as an `i64`.
/// Pairs without `=` or with a non-numeric value contribute 0. The sum
/// saturates at the `i64` limits rather than overflowing.
fn sumQuery(q: []const u8) i64 {
    var sum: i64 = 0;
    var it = std.mem.tokenizeScalar(u8, q, '&');
    while (it.next()) |pair| {
        if (std.mem.indexOfScalar(u8, pair, '=')) |eq| {
            const v = pair[eq + 1 ..];
            sum +|= std.fmt.parseInt(i64, v, 10) catch 0;
        }
    }
    return sum;
}

/// Parses a leading integer from `s`: skips leading space, tab, CR and LF,
/// accepts an optional `-`, then consumes decimal digits up to the first
/// non-digit. Returns 0 when there are no digits. The magnitude saturates at
/// `maxInt(i64)` for overlong digit runs, so the final negation is safe.
fn parseIntLoose(s: []const u8) i64 {
    var i: usize = 0;
    while (i < s.len and (s[i] == ' ' or s[i] == '\t' or s[i] == '\r' or s[i] == '\n')) i += 1;
    var neg = false;
    if (i < s.len and s[i] == '-') {
        neg = true;
        i += 1;
    }
    var n: i64 = 0;
    while (i < s.len and s[i] >= '0' and s[i] <= '9') : (i += 1) {
        n = (n *| 10) +| @as(i64, s[i] - '0');
    }
    return if (neg) -n else n;
}

/// Handles `/json/{count}`: renders
/// `{"items":[<prefix>,"total":pq*m}, ...],"count":N}` for the first `count`
/// dataset items into `out`, then writes the headers directly in front of
/// the body once its length is known.
///
/// If `out` cannot hold the response, a 500 reply is returned instead of
/// writing past the buffer. `count` is clamped to the number of items
/// actually loaded, and `pq * m` saturates at `maxInt(u64)`.
///
/// NOTE(review): this handler receives no `close` flag, so the response
/// never carries `Connection: close` and always reports `.close = false`.
fn jsonHandler(count: u8, m: u64, ds: *const dataset.Dataset, out: []u8) Response {
    // Space kept free in front of the body for the headers written last.
    const headers_reserve: usize = 128;
    const body_start = headers_reserve;

    const open = "{\"items\":[";
    const total_key = ",\"total\":";
    const count_key = "],\"count\":";
    const max_digits = 20; // decimal digits in maxInt(u64)
    const tail_len = count_key.len + max_digits + 1;

    if (out.len < body_start + open.len + tail_len) return internalError(out);
    // Item data may grow up to `limit`; the tail is guaranteed to fit after it.
    const limit = out.len - tail_len;

    const n_items: usize = @min(@as(usize, count), ds.items.len);

    var pos = appendStr(out, body_start, open);
    for (ds.items[0..n_items], 0..) |item, i| {
        // Worst case for this item: comma, prefix, key, 20 digits, brace.
        const need = 1 + item.prefix.len + total_key.len + max_digits + 1;
        if (need > limit - pos) return internalError(out);

        if (i > 0) {
            out[pos] = ',';
            pos += 1;
        }
        pos = appendStr(out, pos, item.prefix);
        pos = appendStr(out, pos, total_key);
        // `m` comes from the query string; saturate instead of overflowing.
        pos = appendInt(out, pos, item.pq *| m);
        out[pos] = '}';
        pos += 1;
    }
    pos = appendStr(out, pos, count_key);
    pos = appendInt(out, pos, n_items);
    out[pos] = '}';
    pos += 1;

    const body_len = pos - body_start;

    // The header block is at most 89 bytes, so it fits in the 128-byte
    // reserve; copy it so that it ends exactly where the body begins.
    var hdr_buf: [128]u8 = undefined;
    const hdr = std.fmt.bufPrint(
        &hdr_buf,
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\n\r\n",
        .{body_len},
    ) catch unreachable;
    const start = body_start - hdr.len;
    @memcpy(out[start..body_start], hdr);
    return .{ .bytes = out[start..pos], .close = false };
}

/// Renders a 500 response into `out` and asks the caller to close the
/// connection. If even this short reply does not fit, returns an empty
/// response with `close` set.
fn internalError(out: []u8) Response {
    const bytes = std.fmt.bufPrint(
        out,
        "HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\nContent-Length: 21\r\nConnection: close\r\n\r\nInternal Server Error",
        .{},
    ) catch out[0..0];
    return .{ .bytes = bytes, .close = true };
}

/// Renders a 404 response into `out`, adding `Connection: close` when
/// `close` is set.
fn notFound(out: []u8, close: bool) Response {
    const close_hdr: []const u8 = if (close) "Connection: close\r\n" else "";
    const n = std.fmt.bufPrint(
        out,
        "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n{s}\r\nNot Found",
        .{close_hdr},
    ) catch unreachable;
    return .{ .bytes = n, .close = close };
}

/// Copies `s` into `out` at `pos` and returns the position just past it.
/// The caller must have verified that `out` has room.
fn appendStr(out: []u8, pos: usize, s: []const u8) usize {
    @memcpy(out[pos..][0..s.len], s);
    return pos + s.len;
}

/// Writes `n` in decimal into `out` at `pos` and returns the position just
/// past it. The caller must have verified that `out` has room (at most 20
/// bytes are written).
fn appendInt(out: []u8, pos: usize, n: u64) usize {
    var tmp: [24]u8 = undefined;
    const s = std.fmt.bufPrint(&tmp, "{d}", .{n}) catch unreachable;
    @memcpy(out[pos..][0..s.len], s);
    return pos + s.len;
}

test "baseline GET" {
    var out: [256]u8 = undefined;
    const req: http.Request = .{
.method = .GET, .path = "/baseline11", .query = "a=13&b=42", .body = "", .close = false }; + var ds: dataset.Dataset = .{ .items = &.{}, .arena = std.heap.ArenaAllocator.init(std.testing.allocator) }; + defer ds.deinit(); + const r = handle(req, &ds, &out); + try std.testing.expect(std.mem.endsWith(u8, r.bytes, "\r\n\r\n55")); +} + +test "baseline POST with body" { + var out: [256]u8 = undefined; + const req: http.Request = .{ .method = .POST, .path = "/baseline11", .query = "a=13&b=42", .body = "20", .close = false }; + var ds: dataset.Dataset = .{ .items = &.{}, .arena = std.heap.ArenaAllocator.init(std.testing.allocator) }; + defer ds.deinit(); + const r = handle(req, &ds, &out); + try std.testing.expect(std.mem.endsWith(u8, r.bytes, "\r\n\r\n75")); +} + +test "json handler shape" { + var out: [16384]u8 = undefined; + var ds = try dataset.load(std.testing.allocator, "../HttpArena/data/dataset.json"); + defer ds.deinit(); + const req: http.Request = .{ .method = .GET, .path = "/json/5", .query = "m=3", .body = "", .close = false }; + const r = handle(req, &ds, &out); + const body_start = std.mem.indexOf(u8, r.bytes, "\r\n\r\n").? 
+ 4; + const body = r.bytes[body_start..]; + var parsed = try std.json.parseFromSlice(std.json.Value, std.testing.allocator, body, .{}); + defer parsed.deinit(); + try std.testing.expectEqual(@as(i64, 5), parsed.value.object.get("count").?.integer); + const items = parsed.value.object.get("items").?.array; + try std.testing.expectEqual(@as(usize, 5), items.items.len); + // Item 0: Alpha Widget, price=328, quantity=15, m=3 → total=14760 + try std.testing.expectEqual(@as(i64, 14760), items.items[0].object.get("total").?.integer); +} diff --git a/frameworks/zeemo/src/http.zig b/frameworks/zeemo/src/http.zig new file mode 100644 index 00000000..16437e27 --- /dev/null +++ b/frameworks/zeemo/src/http.zig @@ -0,0 +1,373 @@ +const std = @import("std"); + +pub const Method = enum { GET, POST, OTHER }; + +pub const Request = struct { + method: Method = .OTHER, + /// Path bytes including any leading '/', without query. + path: []const u8 = &.{}, + /// Query bytes after '?', without leading '?'. Empty if no query. + query: []const u8 = &.{}, + /// Body bytes (collected by parser). + body: []const u8 = &.{}, + close: bool = false, +}; + +/// Incremental HTTP/1.1 request parser. Accumulates bytes into an internal +/// buffer across multiple recv() calls and emits a complete Request when +/// headers + body are fully parsed. +/// +/// Validation requires resumability across TCP fragmentation (split request +/// line, split between headers, body split across recvs) and support for +/// both Content-Length and Transfer-Encoding: chunked bodies. After a +/// request is dispatched call `reset()` and any leftover bytes (pipelining) +/// remain at the start of the buffer. +pub const Parser = struct { + buf: [BUF_SIZE]u8 = undefined, + len: u32 = 0, + /// Position up to which we've already searched for the header + /// terminator — avoids rescanning on every recv. 
+ headers_scan: u32 = 0, + headers_end: u32 = 0, // index after the \r\n\r\n terminator (0 = not yet) + + method: Method = .OTHER, + path_start: u16 = 0, + path_end: u16 = 0, + query_start: u16 = 0, + query_end: u16 = 0, + + content_length: u32 = 0, + is_chunked: bool = false, + close: bool = false, + + /// Body accumulator (separate from header buffer to keep header offsets + /// valid after pipelining shifts). Sized for the validation workload + /// (POST bodies ≤ 3 digits, JSON GET has no body). + body: [BODY_MAX]u8 = undefined, + body_len: u32 = 0, + + /// Chunked-decoding state machine, used only while is_chunked. + chunk_state: ChunkState = .size, + chunk_remaining: u32 = 0, + /// Absolute byte position in self.buf up to which the chunked decoder + /// has advanced. Set to headers_end when chunked mode is detected. + chunk_pos: u32 = 0, + + pub const BUF_SIZE = 4096; + pub const BODY_MAX = 4096; + + const ChunkState = enum { size, size_cr, data, data_cr, data_lf, trailer_cr, trailer_lf, done }; + + pub const FeedResult = union(enum) { + need_more, + ready: Request, + /// Protocol error or buffer exhausted — caller must close the connection. + protocol_error, + }; + + pub fn reset(self: *Parser, leftover_start: u32) void { + // Move any bytes belonging to the next pipelined request to the + // start of the buffer. After a request completes the headers occupy + // [0..headers_end] and the consumed body bytes follow. Anything + // beyond leftover_start is the next request. 
+ const leftover = self.len - leftover_start; + if (leftover > 0) { + std.mem.copyForwards(u8, self.buf[0..leftover], self.buf[leftover_start..self.len]); + } + self.len = leftover; + self.headers_scan = 0; + self.headers_end = 0; + self.method = .OTHER; + self.path_start = 0; + self.path_end = 0; + self.query_start = 0; + self.query_end = 0; + self.content_length = 0; + self.is_chunked = false; + self.close = false; + self.body_len = 0; + self.chunk_state = .size; + self.chunk_remaining = 0; + self.chunk_pos = 0; + } + + /// Returns a writable slice the caller passes to recv(). After recv + /// returns, call `feed(n)` with the byte count. + pub fn recv_slot(self: *Parser) []u8 { + return self.buf[self.len..]; + } + + pub fn feed(self: *Parser, n: u32) FeedResult { + self.len += n; + if (self.len > BUF_SIZE) return .protocol_error; + + if (self.headers_end == 0) { + // Scan only the newly arrived bytes (plus 3 bytes of overlap to + // catch a \r\n\r\n that straddles the previous tail). + const start: u32 = if (self.headers_scan >= 3) self.headers_scan - 3 else 0; + if (std.mem.indexOf(u8, self.buf[start..self.len], "\r\n\r\n")) |rel| { + self.headers_end = start + @as(u32, @intCast(rel)) + 4; + if (!parseRequestLineAndHeaders(self)) return .protocol_error; + } else { + self.headers_scan = self.len; + return .need_more; + } + } + + // Body collection. + if (self.is_chunked) { + if (!self.advanceChunked()) return .protocol_error; + if (self.chunk_state != .done) return .need_more; + } else if (self.content_length > 0) { + const have = self.len - self.headers_end; + if (have < self.content_length) return .need_more; + // Copy body bytes into the body buffer. 
+ if (self.content_length > BODY_MAX) return .protocol_error; + @memcpy(self.body[0..self.content_length], self.buf[self.headers_end..][0..self.content_length]); + self.body_len = self.content_length; + } + + const path = self.buf[self.path_start..self.path_end]; + const query = self.buf[self.query_start..self.query_end]; + + return .{ .ready = .{ + .method = self.method, + .path = path, + .query = query, + .body = self.body[0..self.body_len], + .close = self.close, + } }; + } + + /// Returns the buffer offset at which any next pipelined request begins. + pub fn consumed(self: *const Parser) u32 { + const body_bytes: u32 = if (self.is_chunked) + self.chunk_pos - self.headers_end + else + self.content_length; + return self.headers_end + body_bytes; + } + + fn advanceChunked(self: *Parser) bool { + if (self.chunk_pos == 0) self.chunk_pos = self.headers_end; + while (self.chunk_pos < self.len and self.chunk_state != .done) { + const c = self.buf[self.chunk_pos]; + switch (self.chunk_state) { + .size => { + if (c == '\r') { + self.chunk_state = .size_cr; + } else if (c == ';') { + // chunk-ext: skip until CR + self.chunk_pos += 1; + while (self.chunk_pos < self.len and self.buf[self.chunk_pos] != '\r') + self.chunk_pos += 1; + continue; + } else { + const d: u32 = switch (c) { + '0'...'9' => @as(u32, c - '0'), + 'a'...'f' => @as(u32, c - 'a' + 10), + 'A'...'F' => @as(u32, c - 'A' + 10), + else => return false, + }; + self.chunk_remaining = self.chunk_remaining * 16 + d; + } + }, + .size_cr => { + if (c != '\n') return false; + self.chunk_state = if (self.chunk_remaining == 0) .trailer_cr else .data; + }, + .data => { + const take = @min(self.chunk_remaining, self.len - self.chunk_pos); + if (self.body_len + take > BODY_MAX) return false; + @memcpy(self.body[self.body_len..][0..take], self.buf[self.chunk_pos..][0..take]); + self.body_len += take; + self.chunk_remaining -= take; + self.chunk_pos += take; + if (self.chunk_remaining == 0) self.chunk_state = .data_cr; + 
continue; + }, + .data_cr => { + if (c != '\r') return false; + self.chunk_state = .data_lf; + }, + .data_lf => { + if (c != '\n') return false; + self.chunk_state = .size; + }, + .trailer_cr => { + if (c != '\r') return false; + self.chunk_state = .trailer_lf; + }, + .trailer_lf => { + if (c != '\n') return false; + self.chunk_state = .done; + }, + .done => unreachable, + } + self.chunk_pos += 1; + } + return true; + } + + fn parseRequestLineAndHeaders(self: *Parser) bool { + // Request line: METHOD SP PATH[?QUERY] SP HTTP/1.1 CR LF + var i: u32 = 0; + const end = self.headers_end - 2; // exclude trailing \r\n\r\n (only need first 2 chars) + + // Method + const method_start = i; + while (i < end and self.buf[i] != ' ') : (i += 1) {} + if (i == end) return false; + const method = self.buf[method_start..i]; + if (std.mem.eql(u8, method, "GET")) { + self.method = .GET; + } else if (std.mem.eql(u8, method, "POST")) { + self.method = .POST; + } else { + self.method = .OTHER; + } + i += 1; // skip space + + // Path (until SP or '?') + self.path_start = @intCast(i); + while (i < end and self.buf[i] != ' ' and self.buf[i] != '?') : (i += 1) {} + self.path_end = @intCast(i); + if (i == end) return false; + if (self.buf[i] == '?') { + i += 1; + self.query_start = @intCast(i); + while (i < end and self.buf[i] != ' ') : (i += 1) {} + self.query_end = @intCast(i); + if (i == end) return false; + } else { + self.query_start = self.path_end; + self.query_end = self.path_end; + } + i += 1; // skip space + + // Skip HTTP-version + while (i < end and self.buf[i] != '\r') : (i += 1) {} + if (i >= end) return false; + if (self.buf[i] != '\r' or self.buf[i + 1] != '\n') return false; + i += 2; + + // Headers + while (i + 1 < end) { + // End of headers (the empty line) is excluded — headers_end - 2 + // pointed at the trailing CR of "\r\n\r\n", so we never see it. 
// Each header line: name ":" OWS value CR LF
            const name_start = i;
            while (i < end and self.buf[i] != ':') : (i += 1) {}
            if (i >= end) return false;
            const name = self.buf[name_start..i];
            i += 1; // skip ':'
            while (i < end and (self.buf[i] == ' ' or self.buf[i] == '\t')) : (i += 1) {}
            const value_start = i;
            while (i < end and self.buf[i] != '\r') : (i += 1) {}
            if (i >= end) return false;
            // Trim trailing OWS.
            var value_end = i;
            while (value_end > value_start and (self.buf[value_end - 1] == ' ' or self.buf[value_end - 1] == '\t'))
                value_end -= 1;
            const value = self.buf[value_start..value_end];

            if (eqlIgnoreAsciiCase(name, "content-length")) {
                self.content_length = std.fmt.parseInt(u32, value, 10) catch return false;
            } else if (eqlIgnoreAsciiCase(name, "transfer-encoding")) {
                if (containsTokenIgnoreCase(value, "chunked")) self.is_chunked = true;
            } else if (eqlIgnoreAsciiCase(name, "connection")) {
                if (containsTokenIgnoreCase(value, "close")) self.close = true;
            }

            if (self.buf[i] != '\r' or self.buf[i + 1] != '\n') return false;
            i += 2;
        }
        return true;
    }
};

/// Returns true when `a` and `b` have the same length and are equal after
/// folding ASCII letters to one case. Non-ASCII bytes are compared as-is.
fn eqlIgnoreAsciiCase(a: []const u8, b: []const u8) bool {
    return std.ascii.eqlIgnoreCase(a, b);
}

/// Returns true when `haystack`, read as a comma-separated list, has an
/// element equal to `needle` (ASCII case-insensitive) once spaces and tabs
/// around the element are stripped.
///
/// Whole elements are compared rather than substrings, so a value such as
/// "not-chunked" does not count as "chunked" and "enclosed" does not count
/// as "close".
fn containsTokenIgnoreCase(haystack: []const u8, needle: []const u8) bool {
    var elements = std.mem.splitScalar(u8, haystack, ',');
    while (elements.next()) |element| {
        const token = std.mem.trim(u8, element, " \t");
        if (std.ascii.eqlIgnoreCase(token, needle)) return true;
    }
    return false;
}

test "containsTokenIgnoreCase matches whole list elements only" {
    try std.testing.expect(containsTokenIgnoreCase("close", "close"));
    try std.testing.expect(containsTokenIgnoreCase("keep-alive, Close", "close"));
    try std.testing.expect(containsTokenIgnoreCase("gzip ,\tCHUNKED", "chunked"));
    try std.testing.expect(!containsTokenIgnoreCase("not-chunked", "chunked"));
    try std.testing.expect(!containsTokenIgnoreCase("enclosed", "close"));
    try std.testing.expect(!containsTokenIgnoreCase("", "close"));
}

test "GET baseline11 simple" {
    var p: Parser = .{};
    const data = "GET /baseline11?a=13&b=42 HTTP/1.1\r\nHost: x\r\n\r\n";
    @memcpy(p.recv_slot()[0..data.len], data);
    const r = p.feed(@intCast(data.len));
    try std.testing.expect(r == .ready);
    const req = r.ready;
    try std.testing.expectEqual(Method.GET, req.method);
    try std.testing.expectEqualStrings("/baseline11", req.path);
    try std.testing.expectEqualStrings("a=13&b=42", req.query);
    try std.testing.expectEqual(@as(usize, 0), req.body.len);
    try std.testing.expectEqual(false, req.close);
}

test "POST baseline11 content-length" {
    var p: Parser = .{};
    const data = "POST /baseline11?a=13&b=42 HTTP/1.1\r\nContent-Length: 2\r\n\r\n20";
    @memcpy(p.recv_slot()[0..data.len], data);
    const r = p.feed(@intCast(data.len));
    try std.testing.expect(r == .ready);
    try std.testing.expectEqualStrings("20", r.ready.body);
}

test "POST baseline11 fragmented" {
    var p: Parser = .{};
    const part1 = "POST /baseline11?a=13&b=42 HTTP/1.1\r\nContent-Length: 2\r\n\r\n";
    const part2 = "2";
    const part3 = "0";

    @memcpy(p.recv_slot()[0..part1.len], part1);
    try std.testing.expect(p.feed(@intCast(part1.len)) == .need_more);

    @memcpy(p.recv_slot()[0..part2.len], part2);
    try std.testing.expect(p.feed(@intCast(part2.len)) == .need_more);

    @memcpy(p.recv_slot()[0..part3.len], part3);
    const r = p.feed(@intCast(part3.len));
    try std.testing.expect(r == .ready);
    try std.testing.expectEqualStrings("20", r.ready.body);
}

test "POST baseline11 chunked" {
    var p: Parser = .{};
    const data = "POST /baseline11?a=13&b=42 HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n2\r\n20\r\n0\r\n\r\n";
    @memcpy(p.recv_slot()[0..data.len], data);
    const r = p.feed(@intCast(data.len));
    try std.testing.expect(r == .ready);
    try std.testing.expectEqualStrings("20", r.ready.body);
}

test "split request line" {
    var p: Parser = .{};
    const part1 = "GET /baseli";
    const part2 = "ne11?a=13&b=42 HTTP/1.1\r\n";
    const part3 = "Host: localhost\r\nConnection: close\r\n\r\n";

    @memcpy(p.recv_slot()[0..part1.len], part1);
    try std.testing.expect(p.feed(@intCast(part1.len)) == .need_more);

    @memcpy(p.recv_slot()[0..part2.len], part2);
    try std.testing.expect(p.feed(@intCast(part2.len)) == .need_more);

    @memcpy(p.recv_slot()[0..part3.len], part3);
    const r = p.feed(@intCast(part3.len));
    try std.testing.expect(r == .ready);
    try std.testing.expectEqualStrings("/baseline11", r.ready.path);
    try std.testing.expect(r.ready.close);
}

// diff --git a/frameworks/zeemo/src/main.zig b/frameworks/zeemo/src/main.zig
// new file mode 100644
// index 00000000..f4467118
// --- /dev/null
// +++ b/frameworks/zeemo/src/main.zig
// @@ -0,0 +1,294 @@
const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;
const builtin = @import("builtin");

const http = @import("http.zig");
const handlers = @import("handlers.zig");
const dataset = @import("dataset.zig");

const PORT: u16 = 8080;
const MAX_CONN = 1024;
const RING_ENTRIES = 4096;
const LISTEN_BACKLOG: u32 = 1024;
const WRITE_BUF_SIZE = 16 * 1024;

/// Kind of io_uring operation a completion belongs to; stored in the top
/// byte of the SQE user_data.
const Op = enum(u8) {
    accept = 1,
    recv = 2,
    send = 3,
    close = 4,
};

/// user_data = (op << 56) | slot_idx
inline fn ud(op: Op, slot: u32) u64 {
    return (@as(u64, @intFromEnum(op)) << 56) | @as(u64, slot);
}

/// Extracts the operation tag from a user_data value built by `ud`.
inline fn udOp(u: u64) Op {
    return @enumFromInt(@as(u8, @intCast(u >> 56)));
}

/// Extracts the slot index from a user_data value built by `ud`.
inline fn udSlot(u: u64) u32 {
    return @intCast(u & 0x00FFFFFFFFFFFFFF);
}

/// Per-connection state: the socket, its request parser, and the response
/// currently being written.
const Slot = struct {
    fd: linux.fd_t = -1,
    in_use: bool = false,
    parser: http.Parser = .{},
    write_buf: [WRITE_BUF_SIZE]u8 = undefined,
    /// Length in bytes of the response being sent.
    write_len: u32 = 0,
    /// Number of response bytes already accepted by the kernel.
    write_off: u32 = 0,
    close_after_send: bool = false,
};

var slots: [MAX_CONN]Slot = undefined;
var ds: dataset.Dataset = undefined;

/// Marks every slot as free. `slots` is declared `undefined`, so `in_use`
/// has to be written before `allocSlot` may read it. Only the two
/// bookkeeping fields are written here; the parser and write buffer of a
/// slot are initialised by `allocSlot` when the slot is handed out.
fn initSlots() void {
    for (&slots) |*slot| {
        slot.in_use = false;
        slot.fd = -1;
    }
}

/// Claims the first free slot, resets it to its default state and returns
/// its index, or returns null when all MAX_CONN slots are in use.
fn allocSlot() ?u32 {
    for (&slots, 0..) |*slot, idx| {
        if (slot.in_use) continue;
        slot.* = .{};
        slot.in_use = true;
        return @intCast(idx);
    }
    return null;
}

/// Returns slot `idx` to the free pool. Does not close the socket.
fn freeSlot(idx: u32) void {
    slots[idx].in_use = false;
    slots[idx].fd = -1;
}

pub fn main() !void {
    if (builtin.os.tag != .linux) @panic("zeemo only runs on Linux (io_uring)");

    // Load dataset once in the parent. After fork, every worker inherits
    // the prefix bytes via copy-on-write — they're read-only at runtime so
    // pages stay shared, which keeps memory flat across N workers.
    ds = try dataset.load(std.heap.smp_allocator, "/data/dataset.json");
    std.log.info("loaded {d} dataset items", .{ds.items.len});

    // Ignore SIGPIPE so a peer closing mid-send doesn't kill us; the send()
    // CQE will surface as -EPIPE instead.
    var sa: linux.Sigaction = .{
        .handler = .{ .handler = linux.SIG.IGN },
        .mask = std.mem.zeroes(linux.sigset_t),
        .flags = 0,
    };
    _ = linux.sigaction(linux.SIG.PIPE, &sa, null);

    // Discover the CPU mask the cgroup actually allows us to use — HttpArena
    // pins the container with `--cpuset-cpus`, so sched_getaffinity gives
    // us the right set (not all online cores). One worker per allowed CPU.
    var cpu_set: linux.cpu_set_t = undefined;
    if (linux.sched_getaffinity(0, @sizeOf(linux.cpu_set_t), &cpu_set) != 0) {
        return error.SchedGetAffinityFailed;
    }
    var cpu_list: [256]u32 = undefined;
    const n_workers = collectCpus(&cpu_set, &cpu_list);
    if (n_workers == 0) return error.NoAllowedCpus;
    std.log.info("spawning {d} worker(s) across cpus", .{n_workers});

    // Fork N-1 children; parent itself becomes worker[0]. Each worker
    // creates its own SO_REUSEPORT listener and runs an independent
    // io_uring loop — fully shared-nothing.
    var i: u32 = 1;
    while (i < n_workers) : (i += 1) {
        const r = linux.fork();
        switch (linux.errno(r)) {
            .SUCCESS => {
                if (r == 0) {
                    pinToCpu(cpu_list[i]);
                    workerMain(i) catch |err| {
                        std.log.err("worker {d}: {t}", .{ i, err });
                        std.process.exit(1);
                    };
                    std.process.exit(0);
                }
                // Parent continues forking.
            },
            else => return error.ForkFailed,
        }
    }
    pinToCpu(cpu_list[0]);
    try workerMain(0);
}

/// Writes the index of every bit set in `set` into `list`, in ascending
/// order, and returns how many were written. Stops early once `list` is
/// full, so at most `list.len` CPUs are reported.
fn collectCpus(set: *const linux.cpu_set_t, list: []u32) u32 {
    var n: u32 = 0;
    for (set, 0..) |word, word_idx| {
        var w = word;
        // `w &= w - 1` clears the lowest set bit, so each iteration visits
        // exactly one set bit of the word.
        while (w != 0) : (w &= w - 1) {
            const cpu: u32 = @intCast(word_idx * @bitSizeOf(usize) + @ctz(w));
            if (n >= list.len) return n;
            list[n] = cpu;
            n += 1;
        }
    }
    return n;
}

/// Best-effort: restricts the calling process to `cpu`. A failure is logged
/// and otherwise ignored, so the worker keeps running with whatever
/// affinity it already had.
fn pinToCpu(cpu: u32) void {
    var set: linux.cpu_set_t = std.mem.zeroes(linux.cpu_set_t);
    const word_idx = cpu / @bitSizeOf(usize);
    if (word_idx >= set.len) {
        std.log.warn("cpu {d} does not fit in cpu_set_t; affinity left unchanged", .{cpu});
        return;
    }
    // Log2Int(usize) is the shift-amount type for usize on any target.
    const bit_idx: std.math.Log2Int(usize) = @intCast(cpu % @bitSizeOf(usize));
    set[word_idx] |= @as(usize, 1) << bit_idx;
    linux.sched_setaffinity(0, &set) catch |err| {
        std.log.warn("pinning to cpu {d} failed: {t}", .{ cpu, err });
    };
}

/// Runs one worker: opens a listener on PORT, sets up an io_uring, arms a
/// multishot accept and then dispatches completions forever. Only returns
/// on a setup, submit or completion-copy error.
fn workerMain(worker_id: u32) !void {
    // The slot table is declared `undefined`; make every entry a known free
    // slot before the first accept can look at it.
    initSlots();

    const listen_fd = try makeListener(PORT);
    defer _ = linux.close(listen_fd);
    std.log.info("worker {d} listening on :{d}", .{ worker_id, PORT });

    var ring = try IoUring.init(RING_ENTRIES, 0);
    defer ring.deinit();

    _ = try ring.accept_multishot(ud(.accept, 0), listen_fd, null, null, 0);

    var cqes: [256]linux.io_uring_cqe = undefined;
    while (true) {
        _ = try ring.submit_and_wait(1);
        const n = try ring.copy_cqes(&cqes, 0);
        var i: u32 = 0;
        while (i < n) : (i += 1) {
            // A failure while handling one completion is logged and must
            // not take the whole worker down.
            handleCqe(&ring, listen_fd, &cqes[i]) catch |err| {
                std.log.warn("cqe handler: {t}", .{err});
            };
        }
    }
}

/// Creates a TCP/IPv4 socket with SO_REUSEADDR, SO_REUSEPORT and TCP_NODELAY
/// set, binds it to INADDR_ANY:`port` and starts listening. The socket is
/// closed again if any step after creation fails.
fn makeListener(port: u16) !linux.fd_t {
    const fd = try syscall(linux.socket(linux.AF.INET, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, 0));
    errdefer _ = linux.close(@intCast(fd));

    const one: c_int = 1;
    const one_bytes = std.mem.asBytes(&one);
    try std.posix.setsockopt(@intCast(fd), linux.SOL.SOCKET, linux.SO.REUSEADDR, one_bytes);
    try std.posix.setsockopt(@intCast(fd), linux.SOL.SOCKET, linux.SO.REUSEPORT, one_bytes);
    try std.posix.setsockopt(@intCast(fd), linux.IPPROTO.TCP, linux.TCP.NODELAY, one_bytes);

    var addr: linux.sockaddr.in = .{
        .family = linux.AF.INET,
        .port = std.mem.nativeToBig(u16, port),
        .addr = 0, // INADDR_ANY
        .zero = [_]u8{0} ** 8,
    };
    try syscallVoid(linux.bind(@intCast(fd), @ptrCast(&addr), @sizeOf(@TypeOf(addr))));
    try syscallVoid(linux.listen(@intCast(fd), LISTEN_BACKLOG));
    return @intCast(fd);
}

/// Converts a raw Linux syscall return value into either the value itself
/// or a Zig error. Errnos without a dedicated mapping below become
/// `error.UnexpectedSyscallError`.
fn syscall(r: usize) !usize {
    return switch (linux.errno(r)) {
        .SUCCESS => r,
        else => |e| switch (e) {
            .ACCES => error.AccessDenied,
            .ADDRINUSE => error.AddressInUse,
            .ADDRNOTAVAIL => error.AddressNotAvailable,
            .INVAL => error.InvalidArgument,
            .MFILE => error.ProcessFdQuotaExceeded,
            .NFILE => error.SystemFdQuotaExceeded,
            .NOBUFS => error.SystemResources,
            else => error.UnexpectedSyscallError,
        },
    };
}

/// Same as `syscall`, for calls whose success value is not needed.
fn syscallVoid(r: usize) !void {
    _ = try syscall(r);
}

/// Routes one completion to its handler based on the op tag in user_data.
fn handleCqe(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe) !void {
    switch (udOp(cqe.user_data)) {
        .accept => try handleAccept(ring, listen_fd, cqe),
        .recv => try handleRecv(ring, cqe),
        .send => try handleSend(ring, cqe),
        .close => freeSlot(udSlot(cqe.user_data)),
    }
}

/// Queues a new multishot accept on `listen_fd`. A failure to queue it is
/// logged; nothing is returned because callers have no further fallback.
fn rearmAccept(ring: *IoUring, listen_fd: linux.fd_t) void {
    if (ring.accept_multishot(ud(.accept, 0), listen_fd, null, null, 0)) |_| {} else |err| {
        std.log.err("re-arming multishot accept failed: {t}", .{err});
    }
}

/// Closes the connection held in `slot_idx`. Normally the close is queued on
/// the ring and the slot is freed when its completion arrives. If it cannot
/// be queued, the socket is closed synchronously and the slot freed here,
/// so neither is leaked.
///
/// Call only from the completion handler of the slot's single outstanding
/// operation, i.e. when nothing else is in flight for that slot.
fn closeSlot(ring: *IoUring, slot_idx: u32) void {
    const fd = slots[slot_idx].fd;
    if (ring.close(ud(.close, slot_idx), fd)) |_| {} else |_| {
        _ = linux.close(fd);
        freeSlot(slot_idx);
    }
}

/// Handles a completion of the multishot accept: claims a slot for the new
/// connection and queues its first recv. When no slot is free the
/// connection is closed immediately. Returns an error only if that first
/// recv cannot be queued, in which case the slot and socket are released.
fn handleAccept(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe) !void {
    const more = (cqe.flags & linux.IORING_CQE_F_MORE) != 0;
    // Without IORING_CQE_F_MORE the multishot accept has ended, so it is
    // re-armed on every way out of this function, including error returns.
    defer if (!more) rearmAccept(ring, listen_fd);

    if (cqe.res < 0) return;

    const fd: linux.fd_t = @intCast(cqe.res);
    const slot_idx = allocSlot() orelse {
        _ = linux.close(fd);
        return;
    };
    // If the first recv cannot be queued nothing would ever complete for
    // this connection, so give the slot and the socket back right away.
    errdefer {
        freeSlot(slot_idx);
        _ = linux.close(fd);
    }
    slots[slot_idx].fd = fd;
    const buf = slots[slot_idx].parser.recv_slot();
    _ = try ring.recv(ud(.recv, slot_idx), fd, .{ .buffer = buf }, 0);
}

/// Handles a recv completion: feeds the received bytes to the slot's parser
/// and then either queues another recv, queues the response send, or closes
/// the connection. Returns an error only if the follow-up recv/send cannot
/// be queued; the connection is closed in that case.
fn handleRecv(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
    const slot_idx = udSlot(cqe.user_data);
    const slot = &slots[slot_idx];
    // A connection with no operation in flight would never be seen again.
    errdefer closeSlot(ring, slot_idx);

    // res == 0 is end of stream, res < 0 is a receive error.
    if (cqe.res <= 0) {
        closeSlot(ring, slot_idx);
        return;
    }
    const n: u32 = @intCast(cqe.res);
    switch (slot.parser.feed(n)) {
        .need_more => {
            const buf = slot.parser.recv_slot();
            if (buf.len == 0) {
                // Buffer full but parser still wants more — close.
                closeSlot(ring, slot_idx);
                return;
            }
            _ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
        },
        .protocol_error => closeSlot(ring, slot_idx),
        .ready => |req| {
            const resp = handlers.handle(req, &ds, &slot.write_buf);
            slot.write_len = @intCast(resp.bytes.len);
            slot.write_off = 0;
            slot.close_after_send = resp.close;
            _ = try ring.send(ud(.send, slot_idx), slot.fd, resp.bytes, linux.MSG.NOSIGNAL);
        },
    }
}

/// Handles a send completion: resends the unsent tail after a partial send,
/// closes the connection if the response asked for it, and otherwise
/// prepares the parser for the next request and queues a recv. Returns an
/// error only if the follow-up send/recv cannot be queued; the connection
/// is closed in that case.
fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
    const slot_idx = udSlot(cqe.user_data);
    const slot = &slots[slot_idx];
    // A connection with no operation in flight would never be seen again.
    errdefer closeSlot(ring, slot_idx);

    if (cqe.res <= 0) {
        closeSlot(ring, slot_idx);
        return;
    }
    const n: u32 = @intCast(cqe.res);
    slot.write_off += n;
    if (slot.write_off < slot.write_len) {
        // Partial send — submit a fresh send for the unsent tail.
        // slot.write_len is the total length of the rendered response and
        // slot.write_off the number of bytes sent so far, so the remainder
        // is write_buf[write_off..write_len]. This relies on the handlers
        // rendering the response at offset 0 of slot.write_buf.
        const remaining = slot.write_buf[slot.write_off..slot.write_len];
        _ = try ring.send(ud(.send, slot_idx), slot.fd, remaining, linux.MSG.NOSIGNAL);
        return;
    }
    if (slot.close_after_send) {
        closeSlot(ring, slot_idx);
        return;
    }
    // Keep-alive: shift pipelined leftover to front of recv buffer and arm
    // recv for the next request.
    // NOTE(review): leftover bytes are only parsed once this recv completes;
    // if they already hold a complete pipelined request, that request waits
    // for more data from the peer — confirm against Parser.reset/feed.
    const next_offset = slot.parser.consumed();
    slot.parser.reset(next_offset);
    const buf = slot.parser.recv_slot();
    _ = try ring.recv(ud(.recv, slot_idx), slot.fd, .{ .buffer = buf }, 0);
}