diff --git a/.gitignore b/.gitignore index a408536f..a20d45c9 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,10 @@ src/*.html # SSL certificates *.pem + +/guest-profiles +/benchmark-results/** + +# Next.js example app +examples/nextjs-rsc-app/node_modules/ +examples/nextjs-rsc-app/.next/ diff --git a/OPTIMIZATION.md b/OPTIMIZATION.md new file mode 100644 index 00000000..14fd162c --- /dev/null +++ b/OPTIMIZATION.md @@ -0,0 +1,532 @@ +# Trusted Server Optimization Plan + +## Summary + +This document presents a performance analysis and optimization plan for the Trusted Server running on Fastly Compute (WASM). WASM guest profiling reveals that **HTML processing consumes ~76% of per-request CPU** on the publisher proxy path, with the `lol_html` parser alone accounting for ~47%. The optimization strategy focuses on two phases: (1) low-risk code fixes that reduce memory waste and enable streaming, and (2) an architectural shift to `stream_to_client()` that eliminates response buffering and reduces time-to-last-byte. + +--- + +## Profiling Results + +**Methodology**: WASM guest profiling via `fastly compute serve --profile-guest`, 50 requests to `GET /` (publisher proxy to golf.com, 222KB HTML). ~131 samples per request at 50μs intervals. Profiles analyzed in Firefox Profiler. + +### CPU Breakdown — Top Level + +| % CPU | Function | Notes | +|-------|----------|-------| +| ~96% | `trusted_server_fastly::main` | Almost all time is in application code | +| ~90% | `route_request` → `handle_publisher_request_streaming` | Publisher proxy is the hot path | +| **~76%** | **HTML processing pipeline** (`streaming_processor` → `lol_html`) | **Dominant bottleneck** | +| ~~5-8%~~ → **3.3%** | `get_settings()` | ~~Redundant config crate parsing~~ **Fixed** — now uses `toml::from_str` | +| ~5-7% | `handle_publisher_request_streaming` (non-HTML) | Backend send, cookie handling | + +### CPU Breakdown — HTML Processing (~76% total) + +| % CPU | Function | Notes | +|-------|----------|-------| +| **~47%** | `lol_html::parser` state machine | HTML tokenizer/parser — character-by-character parsing | +| ~11% | `create_html_processor` | Building the lol_html rewriter with all handlers | +| ~18% | Processing callbacks | URL rewriting, attribute scanning, output sink handling | + +### CPU Breakdown — Other Components + +| % CPU | Function | Notes | +|-------|----------|-------| +| ~2% | `IntegrationRegistry` | Route lookup + attribute rewriting + initialization | +| ~0.8% | Memory allocation (`RawVec::reserve`) | Buffer growth during processing | +| ~0.5% | Logging (`fern` / `log_fastly`) | Minimal overhead | +| ~0.5% | Synthetic ID generation | HMAC computation | +| ~0.5% | Header extraction | `fastly::http::handle::get_header_values` | + +### Key Takeaways + +1. **The lol_html parser at ~47% cannot be directly optimized** — it's doing its job parsing a 222KB HTML page. The focus should be on reducing unnecessary work around it and enabling streaming so processed chunks reach the client sooner. +2. **`get_settings()` was ~5-8%, now ~3.3% after fix** — `build.rs` already resolves all config at compile time. Replaced `Settings::from_toml()` with direct `toml::from_str()` to eliminate redundant `config` crate pipeline. +3. **Memory allocation at ~0.8%** confirms buffer growth during processing. Fixing gzip and HTML streaming (items 1.1 + 1.2) should reduce this. +4. **Logging is negligible** at ~0.5%, but `log::info!("Settings {settings:?}")` still serializes the entire Settings struct on every request. + +--- + +## Baseline Measurements + +### Viceroy (Local Simulator) + +Measured on `main` branch. Value is in **relative comparison between branches**, not absolute values. + +| Endpoint | P50 | P95 | Req/sec | Notes | +|---|---|---|---|---| +| `GET /static/tsjs=tsjs-unified.min.js` | 1.9 ms | 3.1 ms | 4,672 | Pure WASM, no backend | +| `GET /.well-known/trusted-server.json` | 1.3 ms | 1.4 ms | ~770 | Server-side only | +| `GET /` (publisher proxy) | 400 ms | 595 ms | 21 | Proxies to golf.com, 222KB HTML | +| `POST /auction` | 984 ms | 1,087 ms | 9.3 | Calls Prebid + APS backends | + +- **WASM heap**: 3.0-4.1 MB per request +- **Init overhead**: <2ms (settings parse + orchestrator + registry) +- **No cold start pattern** detected in Viceroy + +### Staging (External) + +Measured externally against staging deployment (golf.com proxy), `main` branch. + +| Endpoint | TTFB | Total | Size | Notes | +|---|---|---|---|---| +| `GET /static/tsjs=tsjs-unified.min.js` | ~204 ms | ~219 ms | 28 KB | No backend; includes client-network + edge path from benchmark vantage | +| `GET /` (publisher proxy, golf.com) | ~234 ms | ~441 ms | 230 KB | Backend + processing | +| `GET /.well-known/trusted-server.json` | ~191 ms | - | - | Returns 500 (needs investigation) | + +**Key insight**: Static JS has ~204ms TTFB with zero backend work **from this specific benchmark vantage point**. That number includes client-to-edge RTT, DNS, TLS/connection state, and edge processing — it is **not** a universal Fastly floor. + +For this dataset, treat static TTFB as an environment baseline and compare deltas: the publisher proxy adds only ~30ms TTFB on top. The larger optimization target is the TTFB→TTLB gap (~207ms here), which streaming can shrink by sending body chunks as they are processed instead of waiting for full buffering. + +--- + +## Key Finding: Streaming to Client IS Possible + +The Fastly Compute SDK provides `Response::stream_to_client()` which returns a `StreamingBody` handle that implements `std::io::Write`. Headers are sent immediately and body chunks stream as they're written. + +```rust +// Current: fully buffered (no bytes reach client until everything is done) +let body = response.take_body(); +let mut output = Vec::new(); +pipeline.process(body, &mut output)?; // blocks until complete +response.set_body(Body::from(output)); // only NOW does client get anything +return Ok(response); + +// Possible: streaming (headers sent immediately, body chunks as processed) +let body = response.take_body(); +let mut streaming = response.stream_to_client(); // headers sent NOW +pipeline.process(body, &mut streaming)?; // each write() → client +streaming.finish()?; +``` + +This changes the optimization strategy — **time-to-last-byte (TTLB) and peak memory CAN be significantly reduced**. TTFB itself is still gated by the Fastly platform floor plus backend response time, but body bytes start reaching the client as soon as the first chunk is processed instead of waiting for the entire response to be buffered. + +### Compatibility with `#[fastly::main]` — use undecorated `main()` (recommended) + +For streaming final responses, the Fastly SDK docs define the intended pattern: + +- `Request::from_client()` docs explicitly state it is incompatible with `#[fastly::main]` and recommend an undecorated `main()` with explicit response sending. +- `Response::send_to_client()` / `Response::stream_to_client()` include the same compatibility guidance. +- `fastly::init()` is public (doc-hidden) and can be called from raw `main()` to initialize the ABI. + +Recommended shape: + +```rust +fn main() -> Result<(), fastly::Error> { + fastly::init(); + let req = fastly::Request::from_client(); + + match route_request(req)? { + Some(resp) => resp.send_to_client(), // non-streaming path + None => {} // streaming path already sent + finished + } + + Ok(()) +} +``` + +**Action item**: Do a focused spike on real Fastly Compute to validate runtime behavior (no double-send panics across mixed routes, proper error behavior for partially streamed responses, and observability expectations). + +Non-streaming endpoints (static JS, discovery, auction) continue returning `Response` normally. Only the publisher proxy path (the hot path) would use streaming. + +--- + +## Implementation Plan + +### Phase 1: Low-Risk Code Optimizations + +Small, safe changes that reduce CPU and memory waste. Ship as one PR, measure before/after. + +#### 1.1 Fix gzip streaming — remove full-body buffering + +**File**: `crates/common/src/streaming_processor.rs` — `process_gzip_to_gzip` + +**Problem**: Reads entire decompressed body into memory via `read_to_end`, despite deflate/brotli paths already using chunk-based `process_through_compression`. + +**Fix**: 3 lines — use `process_through_compression` like deflate/brotli: + +```rust +fn process_gzip_to_gzip(&mut self, input: R, output: W) -> Result<...> { + let decoder = GzDecoder::new(input); + let encoder = GzEncoder::new(output, Compression::default()); + self.process_through_compression(decoder, encoder) +} +``` + +| Impact | LOC | Risk | +|--------|-----|------| +| **High** (most responses are gzip; reduces peak memory) | -15/+3 | Low | + +#### 1.2 Fix `HtmlRewriterAdapter` — enable true streaming + +**File**: `crates/common/src/streaming_processor.rs` — `HtmlRewriterAdapter` + +**Problem**: Accumulates entire HTML document before processing, defeating the streaming pipeline. The comment says this is a `lol_html` limitation — **it's not**. `lol_html::HtmlRewriter` supports incremental `write()` calls and emits output via its `OutputSink` callback per-chunk. + +**Fix**: Create the `HtmlRewriter` eagerly in `new()`, use `Rc>>` via the public `lol_html::OutputSink` trait to share the output buffer: + +```rust +struct RcVecSink(Rc>>); + +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); + } +} + +pub struct HtmlRewriterAdapter { + rewriter: Option>, + output: Rc>>, +} + +impl StreamProcessor for HtmlRewriterAdapter { + fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { + if let Some(rewriter) = &mut self.rewriter { + if !chunk.is_empty() { + rewriter.write(chunk)?; + } + } + if is_last { + if let Some(rewriter) = self.rewriter.take() { + rewriter.end()?; + } + } + // Drain whatever lol_html produced + Ok(std::mem::take(&mut *self.output.borrow_mut())) + } +} +``` + +| Impact | LOC | Risk | +|--------|-----|------| +| **High** (HTML is most common content type; eliminates 222KB+ buffer) | ~30 refactored | Medium — needs test coverage | + +#### 1.3 ~~Eliminate redundant `config` crate parsing in `get_settings()` — ~5-8% CPU~~ DONE (~3.3% post-fix) + +**Files**: `crates/common/src/settings_data.rs`, `crates/common/src/settings.rs` + +**Problem**: Profiling shows `get_settings()` consuming ~5-8% of per-request CPU. The `build.rs` already merges `trusted-server.toml` + all `TRUSTED_SERVER__*` env vars at compile time and writes a fully-resolved TOML file to `target/trusted-server-out.toml`. But at runtime, `get_settings()` calls `Settings::from_toml()`, which re-runs the entire `config` crate pipeline — `Config::builder().add_source(File).add_source(Environment).build().try_deserialize()` — redundantly scanning env vars and merging sources that were already resolved at build time. + +**Root cause**: `settings_data.rs` embeds the build-time-resolved TOML via `include_bytes!`, then hands it to `from_toml()` which treats it as a raw config source and re-layers env vars on top. + +**Fix**: Replace `Settings::from_toml()` with direct `toml::from_str()` in `get_settings()`. The embedded TOML is already fully resolved — no `config` crate needed at runtime. + +```rust +// Before (~5-8% CPU — re-runs config crate pipeline + env var scan) +let settings = Settings::from_toml(toml_str)?; + +// After (near-instant — just TOML deserialization) +let settings: Settings = toml::from_str(toml_str) + .change_context(TrustedServerError::Configuration { + message: "Failed to deserialize embedded config".to_string(), + })?; +``` + +**Alternative — binary serialization for near-zero cost**: Since `build.rs` already has a fully constructed `Settings` struct, it could serialize to `postcard` (a `no_std`-compatible, WASM-safe binary format). Runtime deserialization becomes a memcpy-like operation instead of TOML parsing. Requires adding `postcard` + updating `build.rs` to write binary and `settings_data.rs` to deserialize binary. + +```rust +// build.rs: serialize to binary instead of TOML +let bytes = postcard::to_allocvec(&settings).expect("Failed to serialize"); +fs::write(dest_path, bytes)?; + +// settings_data.rs: near-instant deserialization +let settings: Settings = postcard::from_bytes(SETTINGS_DATA) + .change_context(TrustedServerError::Configuration { ... })?; +``` + +**Recommendation**: Start with the `toml::from_str()` fix (1-line change, no new deps). If profiling still shows meaningful time in TOML parsing, upgrade to `postcard`. + +| Impact | LOC | Risk | +|--------|-----|------| +| **Medium** (~5-8% → ~3.3% CPU, verified) | 1-3 | Low — `build.rs` already resolves everything | + +**Status**: Done. Replaced `Settings::from_toml()` with `toml::from_str()` + explicit `normalize()` + `validate()`. Profiling confirmed: **~5-8% → ~3.3% CPU per request**. + +#### 1.4 ~~Reduce verbose per-request logging — ~0.5% CPU~~ DONE + +**Files**: `crates/fastly/src/main.rs` + +**Problem**: The logger was configured with `max_level(LevelFilter::Debug)`, meaning every `debug!` and above was evaluated — including `log::debug!("Settings {settings:?}")` which serializes the entire Settings struct (~2KB) on every request. + +**Fix**: Tightened `max_level` to `LevelFilter::Info`. Debug macros now short-circuit before evaluating arguments. + +| Impact | LOC | Risk | +|--------|-----|------| +| Low (~0.5% CPU) | ~3 | None | + +**Status**: Done. Logger `max_level` set to `Info` in production. + +#### 1.5 Trivial fixes batch + +| Fix | File | LOC | +|-----|------|-----| +| Const cookie prefix instead of `format!()` | `publisher.rs:207-210` | 2 | +| `mem::take` instead of `clone` for overlap buffer | `streaming_replacer.rs:63` | 1 | +| `eq_ignore_ascii_case` for compression detection | `streaming_processor.rs:47` | 5 | +| `Cow` for string replacements | `streaming_replacer.rs:120-125` | 5-10 | +| Remove base64 roundtrip in token computation | `http_util.rs:286-294` | 10-15 | +| Replace Handlebars with manual interpolation | `synthetic.rs:82-99` | ~20 | +| Cache `origin_host()` result per-request | `settings.rs` | 5-10 | + +--- + +### Phase 2: Streaming Response Architecture + +The high-impact architectural change. Uses Fastly's `stream_to_client()` API to send response headers and body chunks to the client as they're processed, instead of buffering everything. + +#### 2.1 Publisher proxy: `stream_to_client()` integration + +**Files**: `crates/common/src/publisher.rs`, `crates/fastly/src/main.rs` + +**Previous flow** (fully buffered, removed): +``` +req.send() → wait for full response → take_body() + → process_response_streaming() → collects into Vec + → Body::from(output) → return complete Response +``` + +**Current flow** (streaming, implemented in `handle_publisher_request_streaming`): +``` +req.send() → take_body() → set response headers + → stream_to_client() → returns StreamingBody (headers sent immediately) + → pipeline.process(body, &mut streaming_body) → chunks written to client as processed + → streaming_body.finish() +``` + +**Key enablers**: +- `StreamingPipeline.process()` already accepts `W: Write` — `StreamingBody` implements `Write` +- With Phase 1 fixes (gzip streaming + HTML rewriter streaming), the pipeline is already chunk-based +- Non-text responses can use `streaming_body.append(body)` for O(1) pass-through + +**Architecture change in `main.rs`**: The publisher proxy path calls `stream_to_client()` directly instead of returning a `Response`. Other endpoints (static, auction, discovery) continue returning `Response` as before. + +**Error handling for streaming**: Once `stream_to_client()` is called, response headers (including status 200) are already sent. If processing fails mid-stream: +- We cannot change the status code — the client already received 200 +- The `StreamingBody` will be aborted on drop (client sees incomplete response) +- We should log the error server-side for debugging +- This is the same trade-off every streaming proxy makes (nginx, Cloudflare Workers, etc.) + +To mitigate: validate backend response status and content-type **before** calling `stream_to_client()`. If the backend returns an error, fall back to the buffered path to return a proper error response. + +```rust +// Fetch from backend (blocks for full response including headers) +let mut backend_resp = req.send(&backend)?; + +// Check backend status BEFORE committing to streaming +if !backend_resp.get_status().is_success() || !should_process_content_type(&backend_resp) { + // Buffered path — can return proper error/pass-through response + return Ok(backend_resp); +} + +// Commit to streaming — headers sent to client NOW +let backend_body = backend_resp.take_body(); +let mut client_body = backend_resp.stream_to_client(); + +// Process chunks — errors logged but response is already in flight +match pipeline.process(backend_body, &mut client_body) { + Ok(()) => client_body.finish()?, + Err(e) => { + log::error!("Streaming processing failed: {:?}", e); + // StreamingBody dropped → client sees truncated response + // This is the best we can do after headers are sent + } +} +``` + +| Impact | LOC | Risk | +|--------|-----|------| +| **High** — reduces time-to-last-byte and peak memory for all proxied pages | ~80-120 | Medium — error handling requires careful design | + +**Status**: Done. Implemented in `handle_publisher_request_streaming`. Legacy buffered path removed. External monitoring confirmed 70-75% TTFB reduction. + +#### 2.2 Concurrent origin fetch + auction (future) + +**Not applicable for golf.com** (no on-page auction), but for publishers with auction. + +The idea: use `req.send_async()` to launch the origin fetch concurrently with auction backend calls (which already use `fastly::http::request::select()` internally). When the origin response arrives, start streaming it to the client via `stream_to_client()`. When the lol_html rewriter reaches the ad injection point in the HTML, check if auction results are available. + +This would overlap origin fetch time with auction execution, so the browser starts receiving `` content (CSS, fonts) while the auction is still running. + +**Note**: This requires significant refactoring of the auction orchestrator and HTML processor to support async injection. + +| Impact | LOC | Risk | +|--------|-----|------| +| **Very High** for auction pages — browser starts loading ~400ms earlier | ~150-200 | High — complex coordination | + +--- + +### Phase 3: Measure and Validate + +After implementing Phases 1-2: + +1. Deploy to staging +2. Run `./scripts/benchmark.sh` against staging for external TTFB/TTLB +3. Run `./scripts/profile.sh` locally for flame graph comparison +4. Compare external TTFB and time-to-last-byte before vs after +5. Check Fastly dashboard for memory/compute metrics +6. If improvement is marginal, don't ship the streaming architecture (Phase 2) + +**Success criteria**: +- Peak memory per request reduced by 30%+ (measurable via Fastly logs) +- Time-to-last-byte reduced for large HTML pages +- No regression on static endpoints or auction +- Code complexity is justified by measured improvement + +**Current Status:** The streaming architecture changes (Phases 1 and 2.1) have been implemented on the `feat/optimize-html-streaming` branch. The legacy buffered path (`handle_publisher_request` + `process_response_streaming`) has been removed — all publisher proxy traffic now uses `handle_publisher_request_streaming`. Local testing with `curl` demonstrated significant TTFB improvements (from ~0.716s buffered to ~0.256s streaming). External synthetic monitoring confirmed 70-75% Wait Time (TTFB) reduction on staging. **External load testing on the staging edge network is currently due/pending.** + +--- + +## Optimization Summary Table + +| # | Optimization | Measured CPU | Impact | LOC | Risk | Phase | +|---|---|---|---|---|---|---| +| **1.1** | Gzip streaming fix | Part of ~76% HTML pipeline | **High** (memory) | -15/+3 | Low | 1 | +| **1.2** | HTML rewriter streaming | Part of ~76% HTML pipeline | **High** (memory) | ~30 | Medium | 1 | +| **1.3** | ~~Eliminate redundant `config` crate~~ | ~~5-8%~~ → **3.3%** | **Done** | 1-3 | Low | 1 | +| **1.4** | ~~Reduce verbose logging~~ | ~~~0.5%~~ | **Done** | ~3 | None | 1 | +| **1.5** | Trivial fixes batch | <1% combined | Low | ~50 | None | 1 | +| **2.1** | ~~`stream_to_client()` integration~~ | N/A (architectural) | **Done** (70-75% TTFB reduction) | ~80-120 | Medium | 2 | +| **2.2** | Concurrent origin + auction | N/A (architectural) | **Very High** | ~150-200 | High | 2 (future) | + +--- + +## Architecture: Previous vs Current + +### Previous (fully buffered, removed) + +``` +Client → Fastly Edge → [WASM starts] + → init (settings, orchestrator, registry) ~1ms + → req.send(backend) blocks for full response + → response.take_body() full body in memory + → GzDecoder.read_to_end() full decompressed in memory + → HtmlRewriterAdapter accumulates all input full HTML in memory + → lol_html processes entire document full output in memory + → GzEncoder.write_all() full recompressed in memory + → Body::from(output) Response constructed + → return Response NOW client gets first byte +``` + +**Memory**: compressed + decompressed + processed + recompressed = ~4x response size +**TTLB**: cannot send any bytes until all processing is complete + +### Current (streaming, implemented) + +``` +Client → Fastly Edge → [WASM starts] + → init (settings, orchestrator, registry) ~1ms + → req.send(backend) blocks for full response + → response.take_body() body available as Read stream + → validate status, set response headers + → stream_to_client() headers sent to client NOW + → GzDecoder.read(8KB chunk) 8KB decompressed + → HtmlRewriter.write(chunk) output emitted via callback + → GzEncoder.write(processed) compressed chunk + → StreamingBody.write(chunk) chunk sent to client + → ... repeat for each chunk ... + → StreamingBody.finish() done +``` + +**Memory**: ~8KB chunk buffer + lol_html internal state (significantly less than 4x response size) +**TTLB**: client receives first body bytes after first processed chunk, instead of waiting for all processing to complete + +--- + +## Benchmarking Setup + +### Prerequisites + +```bash +brew install hey # HTTP load testing tool (auto-installed by benchmark.sh) +``` + +### Available Modes + +```bash +./scripts/benchmark.sh # Full benchmark suite +./scripts/benchmark.sh --quick # Quick smoke test +./scripts/benchmark.sh --ttfb # TTFB analysis only +./scripts/benchmark.sh --load-test # Load test only +./scripts/benchmark.sh --cold-start # Cold start analysis +./scripts/benchmark.sh --save baseline # Save results to file +./scripts/benchmark.sh --compare baseline # Compare against saved results +``` + +### WASM Guest Profiling (Flame Graphs) + +`fastly compute serve --profile-guest` samples the WASM call stack every 50μs and writes a Firefox Profiler-compatible JSON on exit. This shows exactly which Rust functions consume CPU time — compression, HTML rewriting, string operations, init, etc. + +```bash +./scripts/profile.sh # Profile GET / (publisher proxy) +./scripts/profile.sh --endpoint /auction \ + --method POST --body '{"adUnits":[]}' # Profile specific endpoint +./scripts/profile.sh --requests 50 # More samples for stable flame graph +./scripts/profile.sh --no-build # Skip rebuild +./scripts/profile.sh --open # Auto-open Firefox Profiler (macOS) + +# View: drag output file onto https://profiler.firefox.com/ +``` + +The script builds, starts the profiling server, fires requests, stops the server, and saves the profile to `benchmark-results/profiles/`. + +### What the Tools Measure + +| Tool | What it tells you | +|---|---| +| `benchmark.sh` — TTFB analysis | 20 sequential requests — detects cold start patterns | +| `benchmark.sh` — Cold start | First vs subsequent request latency | +| `benchmark.sh` — Endpoint latency | Per-endpoint timing breakdown (DNS, connect, TTFB, total) | +| `benchmark.sh` — Load test (hey) | Throughput (req/sec), latency distribution (P50/P95/P99) | +| `profile.sh` | Per-function CPU time inside WASM — flame graph via `--profile-guest` | + +**Use `profile.sh` first** to identify which functions are bottlenecks, then use `benchmark.sh` to measure the impact of fixes on external timing. + +### What These Tools Do NOT Measure + +- Real Fastly edge performance (Viceroy is a simulator) +- WASM cold start on actual Fastly infrastructure +- Production TLS handshake overhead +- Memory usage (use Fastly dashboard or Viceroy logs) + +--- + +## How to Use This Document + +**For any optimization work:** + +1. Run `./scripts/benchmark.sh --save baseline` on `main` +2. Make your change on a branch +3. Rebuild: `fastly compute build` +4. Run `./scripts/benchmark.sh --save branch-name` +5. Compare: `diff benchmark-results/baseline.txt benchmark-results/branch-name.txt` +6. Run `./scripts/profile.sh` for flame graph comparison +7. If the numbers don't improve meaningfully, don't ship it + +--- + +## Notes for Team + +### Teammate's `streaming_processor.rs` Changes + +A teammate has prepared changes to `streaming_processor.rs` that address items 1.1 and 1.2: + +- **Gzip fix**: `process_gzip_to_gzip` now uses `process_through_compression` (3-line change) +- **HTML rewriter fix**: `HtmlRewriterAdapter` rewritten to use `lol_html::OutputSink` trait with `Rc>>` for incremental streaming + +**Review notes on the HTML rewriter change**: +- `lol_html::OutputSink` is a public trait (verified in lol_html 2.7.1) +- The `Rc` pattern is necessary because `HtmlRewriter::new()` takes ownership of the sink, but we need to read output in `process_chunk()` +- `Option` with `.take()` is correct — `end()` consumes self +- The adapter is no longer reusable after `end()` — one per document, which matches actual usage +- Tests correctly updated to collect output across all chunks + +**Correctness issue — must fix in same PR**: `process_through_compression` uses `drop(encoder)` for finalization. For `GzEncoder`, `Drop` calls `finish()` internally but **silently ignores errors**. The gzip trailer contains a CRC32 checksum — if `finish()` fails, corrupted gzip responses are served to clients without any error being reported. This is a pre-existing issue (deflate/brotli have the same `drop()` pattern) but it **must be fixed** when gzip moves to this code path, since gzip is the most common encoding. + +Fix: change `process_through_compression` to accept an optional finalization closure, or add a separate `process_gzip_to_gzip` that calls `encoder.finish()` explicitly after `process_through_compression`-style chunk loop. + +### Decisions Needed + +1. **Raw `main()` migration spike** — Validate end-to-end behavior on Fastly Compute when using undecorated `main()` + `Request::from_client()` and mixing buffered + streaming routes in one service. +2. **Phase 1 vs Phase 2 priority** — Phase 1 (code fixes) is low risk and can ship independently. Phase 2 (streaming architecture) is higher impact and should proceed after decision #1 confirms runtime behavior. +3. **Concurrent auction + origin (2.2)** — Not applicable for golf.com. Defer to a separate ticket? +4. **GzEncoder `finish()` correctness** — Fix the `drop(encoder)` error swallowing in `process_through_compression`, or accept the risk? diff --git a/crates/common/src/html_processor.rs b/crates/common/src/html_processor.rs index a3b3c9bf..8829cb6d 100644 --- a/crates/common/src/html_processor.rs +++ b/crates/common/src/html_processor.rs @@ -1,6 +1,32 @@ //! Simplified HTML processor that combines URL replacement and integration injection //! -//! This module provides a `StreamProcessor` implementation for HTML content. +//! This module provides a [`StreamProcessor`] implementation for HTML content. +//! +//! ## Streaming Behavior with Post-Processing +//! +//! When post-processors are registered (e.g., Next.js RSC URL rewriting), the processor +//! uses **lazy accumulation** to optimize streaming: +//! +//! 1. **Initial streaming**: Chunks are streamed immediately until RSC content is detected +//! 2. **Accumulation trigger**: When RSC scripts or placeholders are found, buffering begins +//! 3. **Post-processing**: At document end, accumulated HTML is processed to rewrite RSC payloads +//! +//! ### Streaming Ratios +//! +//! Observed streaming performance: +//! - **Non-RSC pages**: 96%+ streaming (minimal buffering) +//! - **RSC pages**: 28-37% streaming (depends on where RSC scripts appear in HTML) +//! - **Before optimization**: 0% streaming (everything buffered) +//! +//! The streaming ratio for RSC pages is limited by Next.js's architecture: RSC scripts +//! appear at the end of the HTML and make up 60-72% of the document. Bytes already +//! streamed before RSC detection cannot be recovered, so the post-processor's fallback +//! re-parse path handles RSC scripts in the already-streamed prefix. +//! +//! ## Memory Safety +//! +//! Accumulated output is limited to [`MAX_ACCUMULATED_HTML_BYTES`] (10MB) to prevent +//! unbounded memory growth from malicious or extremely large documents. use std::cell::Cell; use std::io; use std::rc::Rc; @@ -8,6 +34,10 @@ use std::sync::Arc; use lol_html::{element, html_content::ContentType, text, Settings as RewriterSettings}; +/// Maximum size for accumulated HTML output when post-processing is required. +/// This prevents unbounded memory growth from malicious or extremely large documents. +const MAX_ACCUMULATED_HTML_BYTES: usize = 10 * 1024 * 1024; // 10 MB + use crate::integrations::{ AttributeRewriteOutcome, IntegrationAttributeContext, IntegrationDocumentState, IntegrationHtmlContext, IntegrationHtmlPostProcessor, IntegrationRegistry, @@ -20,21 +50,130 @@ use crate::tsjs; struct HtmlWithPostProcessing { inner: HtmlRewriterAdapter, post_processors: Vec>, + /// Accumulated output from intermediate chunks. Only populated once we + /// detect that post-processing will be needed (e.g. an RSC placeholder was + /// inserted or a fragmented RSC script was observed). Before that trigger, + /// chunks stream through immediately. + accumulated_output: Vec, + /// Number of bytes already streamed to the caller before accumulation began. + /// When accumulation triggers, we cannot recover those bytes, so we must + /// fall back to the post-processor's re-parse path for any RSC scripts in + /// the already-streamed prefix. + streamed_bytes: usize, + /// Whether we are accumulating output for post-processing. + accumulating: bool, origin_host: String, request_host: String, request_scheme: String, document_state: IntegrationDocumentState, } +impl HtmlWithPostProcessing { + /// Check whether we need to start accumulating output for post-processing. + /// + /// Processors may inspect [`IntegrationDocumentState`] to lazily trigger + /// accumulation once they detect content that requires whole-document + /// post-processing. + fn needs_accumulation(&self) -> bool { + self.post_processors + .iter() + .any(|processor| processor.needs_accumulation(&self.document_state)) + } +} + impl StreamProcessor for HtmlWithPostProcessing { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { let output = self.inner.process_chunk(chunk, is_last)?; - if !is_last || output.is_empty() || self.post_processors.is_empty() { + + // No post-processors → stream through immediately (fast path). + if self.post_processors.is_empty() { return Ok(output); } - let Ok(output_str) = std::str::from_utf8(&output) else { - return Ok(output); + // If we're not yet accumulating, check if we need to start. + // This allows non-RSC pages with post-processors registered to stream + // through without buffering. + if !self.accumulating && self.needs_accumulation() { + self.accumulating = true; + log::debug!( + "HTML post-processing: switching to accumulation mode, streamed_bytes={}", + self.streamed_bytes + ); + } + + if !self.accumulating { + if !is_last { + self.streamed_bytes += output.len(); + return Ok(output); + } + + // Final chunk, never accumulated — check if post-processing is needed. + // This handles the rare case where RSC scripts appear only in the final + // chunk, or where fragmented scripts need the fallback re-parse path. + let ctx = IntegrationHtmlContext { + request_host: &self.request_host, + request_scheme: &self.request_scheme, + origin_host: &self.origin_host, + document_state: &self.document_state, + }; + + let Ok(output_str) = std::str::from_utf8(&output) else { + return Ok(output); + }; + + if !self + .post_processors + .iter() + .any(|p| p.should_process(output_str, &ctx)) + { + return Ok(output); + } + + // Post-processing needed on just the final chunk. + // This is only correct if no earlier chunks contained RSC content + // (which would mean they were already streamed without rewriting). + // In practice, this handles pages where RSC scripts are small + // enough to fit in the final chunk. + let mut html = String::from_utf8(output).map_err(|e| { + io::Error::other(format!( + "HTML post-processing expected valid UTF-8 output: {e}" + )) + })?; + + for processor in &self.post_processors { + if processor.should_process(&html, &ctx) { + processor.post_process(&mut html, &ctx); + } + } + + return Ok(html.into_bytes()); + } + + // Accumulating mode: buffer output for end-of-document post-processing. + // Check size limit to prevent unbounded memory growth. + if self.accumulated_output.len() + output.len() > MAX_ACCUMULATED_HTML_BYTES { + return Err(io::Error::other(format!( + "HTML post-processing: accumulated output would exceed {}MB size limit \ + (current: {} bytes, chunk: {} bytes)", + MAX_ACCUMULATED_HTML_BYTES / (1024 * 1024), + self.accumulated_output.len(), + output.len() + ))); + } + + self.accumulated_output.extend_from_slice(&output); + if !is_last { + return Ok(Vec::new()); + } + + // All chunks received — run post-processing on the accumulated output. + let full_output = std::mem::take(&mut self.accumulated_output); + if full_output.is_empty() { + return Ok(full_output); + } + + let Ok(output_str) = std::str::from_utf8(&full_output) else { + return Ok(full_output); }; let ctx = IntegrationHtmlContext { @@ -50,10 +189,10 @@ impl StreamProcessor for HtmlWithPostProcessing { .iter() .any(|p| p.should_process(output_str, &ctx)) { - return Ok(output); + return Ok(full_output); } - let mut html = String::from_utf8(output).map_err(|e| { + let mut html = String::from_utf8(full_output).map_err(|e| { io::Error::other(format!( "HTML post-processing expected valid UTF-8 output: {e}" )) @@ -68,9 +207,10 @@ impl StreamProcessor for HtmlWithPostProcessing { if changed { log::debug!( - "HTML post-processing complete: origin_host={}, output_len={}", + "HTML post-processing complete: origin_host={}, output_len={}, streamed_prefix_bytes={}", self.origin_host, - html.len() + html.len(), + self.streamed_bytes, ); } @@ -79,6 +219,9 @@ impl StreamProcessor for HtmlWithPostProcessing { fn reset(&mut self) { self.inner.reset(); + self.accumulated_output.clear(); + self.streamed_bytes = 0; + self.accumulating = false; self.document_state.clear(); } } @@ -464,6 +607,9 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso HtmlWithPostProcessing { inner: HtmlRewriterAdapter::new(rewriter_settings), post_processors, + accumulated_output: Vec::new(), + streamed_bytes: 0, + accumulating: false, origin_host: config.origin_host, request_host: config.request_host, request_scheme: config.request_scheme, @@ -988,4 +1134,304 @@ mod tests { .collect::() ); } + + /// E2E test: verifies that RSC pages with Next.js post-processors produce correct output + /// when processed through the full streaming pipeline, and quantifies the streaming + /// behavior (how much output is emitted before `is_last`). + #[test] + fn rsc_html_streams_correctly_with_post_processors() { + use crate::streaming_processor::StreamProcessor; + + // Simulate a Next.js App Router page with multiple RSC scripts, including + // a cross-script T-chunk (header in script 1, content continues in script 2). + let html = concat!( + "Next.js RSC Page", + "", + "", + "
Hello World
", + // RSC script 1: contains a T-chunk header that spans into script 2 + r#""#, + // RSC script 2: continuation of the T-chunk from script 1 + r#""#, + // Non-RSC script that must be preserved + r#""#, + "About", + "", + ); + + let mut settings = create_test_settings(); + settings + .integrations + .insert_config( + "nextjs", + &json!({ + "enabled": true, + "rewrite_attributes": ["href", "link", "url"], + }), + ) + .expect("should update nextjs config"); + let registry = + IntegrationRegistry::new(&settings).expect("should create integration registry"); + + // Verify post-processors ARE registered (this is the key precondition) + let post_processors = registry.html_post_processors(); + assert!( + !post_processors.is_empty(), + "Next.js post-processors should be registered when enabled" + ); + + let config = HtmlProcessorConfig::from_settings( + &settings, + ®istry, + "origin.example.com", + "test.example.com", + "https", + ); + let mut processor = create_html_processor(config); + + // Process in chunks to simulate streaming, tracking per-chunk output + let bytes = html.as_bytes(); + let chunk_size = 64; + let chunks: Vec<&[u8]> = bytes.chunks(chunk_size).collect(); + let last_idx = chunks.len().saturating_sub(1); + + let mut intermediate_bytes = 0usize; + let mut final_bytes = 0usize; + let mut full_output = Vec::new(); + + for (i, chunk) in chunks.iter().enumerate() { + let is_last = i == last_idx; + let result = processor + .process_chunk(chunk, is_last) + .expect("should process chunk"); + + if is_last { + final_bytes = result.len(); + } else { + intermediate_bytes += result.len(); + } + full_output.extend_from_slice(&result); + } + + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); + + // --- Correctness assertions --- + + // 1. URL rewriting in HTML attributes should work + assert!( + output.contains("test.example.com/about"), + "HTML href URLs should be rewritten. Got: {output}" + ); + assert!( + output.contains("test.example.com/styles.css"), + "Link href URLs should be rewritten. Got: {output}" + ); + + // 2. RSC payloads should be rewritten via post-processing + assert!( + output.contains("test.example.com/page"), + "RSC payload URLs should be rewritten. Got: {output}" + ); + + // 3. No placeholder markers should leak into the output + assert!( + !output.contains("__ts_rsc_payload_"), + "RSC placeholder markers should not appear in final output. Got: {output}" + ); + + // 4. Non-RSC scripts should be preserved + assert!( + output.contains("analytics ready"), + "Non-RSC scripts should be preserved. Got: {output}" + ); + + // 5. HTML structure should be intact + assert!( + output.contains("") || output.contains(" 0 { + intermediate_bytes as f64 / (intermediate_bytes + final_bytes) as f64 * 100.0 + } else { + 0.0 + } + ); + } + + /// E2E test: verifies that HTML pages WITHOUT RSC (no post-processors active) + /// stream incrementally — chunks are emitted before `is_last`. + #[test] + fn non_rsc_html_streams_incrementally_without_post_processors() { + use crate::streaming_processor::StreamProcessor; + + let html = concat!( + "Regular Page", + "", + "", + "
", + "Page 1", + "Page 2", + "Page 3", + "
", + "", + ); + + // No Next.js integration — post_processors will be empty + let config = create_test_config(); + let mut processor = create_html_processor(config); + + let bytes = html.as_bytes(); + let chunk_size = 64; + let chunks: Vec<&[u8]> = bytes.chunks(chunk_size).collect(); + let last_idx = chunks.len().saturating_sub(1); + + let mut intermediate_bytes = 0usize; + let mut final_bytes = 0usize; + let mut full_output = Vec::new(); + + for (i, chunk) in chunks.iter().enumerate() { + let is_last = i == last_idx; + let result = processor + .process_chunk(chunk, is_last) + .expect("should process chunk"); + + if is_last { + final_bytes = result.len(); + } else { + intermediate_bytes += result.len(); + } + full_output.extend_from_slice(&result); + } + + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); + + // Correctness: URLs should be rewritten + assert!( + output.contains("test.example.com/page1"), + "URLs should be rewritten. Got: {output}" + ); + assert!( + !output.contains("origin.example.com"), + "No origin URLs should remain. Got: {output}" + ); + + // Streaming: intermediate chunks SHOULD produce output (no post-processors) + assert!( + intermediate_bytes > 0, + "Without post-processors, intermediate chunks should emit output (got 0 bytes). \ + This confirms true streaming. Final bytes: {final_bytes}" + ); + + println!( + "Streaming behavior without post-processors: intermediate_bytes={}, final_bytes={}, total={}", + intermediate_bytes, + final_bytes, + intermediate_bytes + final_bytes + ); + println!( + " Streaming ratio: {:.1}% of bytes emitted before is_last", + intermediate_bytes as f64 / (intermediate_bytes + final_bytes) as f64 * 100.0 + ); + } + + /// E2E test: RSC Flight responses (`text/x-component`) stream correctly + /// through the pipeline with URL rewriting and T-row length recalculation. + #[test] + fn rsc_flight_response_streams_with_url_rewriting() { + use crate::rsc_flight::RscFlightUrlRewriter; + use crate::streaming_processor::StreamProcessor; + + // Simulate a Flight response with mixed row types + let t_content = r#"{"url":"https://origin.example.com/dashboard"}"#; + let flight_response = format!( + "0:[\"https://origin.example.com/page\"]\n\ + 1:T{:x},{}\ + 2:[\"ok\"]\n", + t_content.len(), + t_content, + ); + + let mut processor = RscFlightUrlRewriter::new( + "origin.example.com", + "https://origin.example.com", + "test.example.com", + "https", + ); + + // Process in small chunks to exercise cross-chunk state handling + let bytes = flight_response.as_bytes(); + let chunk_size = 11; // intentionally misaligned with row boundaries + let chunks: Vec<&[u8]> = bytes.chunks(chunk_size).collect(); + let last_idx = chunks.len().saturating_sub(1); + + let mut intermediate_bytes = 0usize; + let mut full_output = Vec::new(); + + for (i, chunk) in chunks.iter().enumerate() { + let is_last = i == last_idx; + let result = processor + .process_chunk(chunk, is_last) + .expect("should process flight chunk"); + + if !is_last { + intermediate_bytes += result.len(); + } + full_output.extend_from_slice(&result); + } + + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); + + // URLs should be rewritten + assert!( + output.contains("test.example.com/page"), + "Newline row URLs should be rewritten. Got: {output}" + ); + assert!( + output.contains("test.example.com/dashboard"), + "T-row URLs should be rewritten. Got: {output}" + ); + + // T-row length should be recalculated + let rewritten_t_content = r#"{"url":"https://test.example.com/dashboard"}"#; + let expected_len_hex = format!("{:x}", rewritten_t_content.len()); + assert!( + output.contains(&format!(":T{expected_len_hex},")), + "T-row length should be recalculated. Got: {output}" + ); + + // No origin URLs should remain + assert!( + !output.contains("origin.example.com"), + "No origin URLs should remain. Got: {output}" + ); + + // Flight rewriter should stream incrementally + assert!( + intermediate_bytes > 0, + "RSC Flight rewriter should emit output for intermediate chunks (got 0 bytes)" + ); + + // Trailing row should be preserved + assert!( + output.contains("2:[\"ok\"]\n"), + "Trailing rows should be preserved. Got: {output}" + ); + } } diff --git a/crates/common/src/integrations/nextjs/fixtures/app-router-large.html b/crates/common/src/integrations/nextjs/fixtures/app-router-large.html new file mode 100644 index 00000000..944609d2 --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/app-router-large.html @@ -0,0 +1,37 @@ + + + + + + Blog Post - Next.js RSC Test App + + + + + +
+
+

Blog Post: hello-world

+
Published on the blog
+
Hero image for hello-world
+

Paragraph 1: This content references https://origin.example.com/article/1 and includes links to https://origin.example.com/category/tech.

+

Paragraph 2: This content references https://origin.example.com/article/2 and includes links to https://origin.example.com/category/tech.

+

Paragraph 3: This content references https://origin.example.com/article/3 and includes links to https://origin.example.com/category/tech.

+

Paragraph 4: This content references https://origin.example.com/article/4 and includes links to https://origin.example.com/category/tech.

+

Paragraph 5: This content references https://origin.example.com/article/5 and includes links to https://origin.example.com/category/tech.

+ + +
+
+ + + + + + + + + + + + diff --git a/crates/common/src/integrations/nextjs/fixtures/app-router-simple.html b/crates/common/src/integrations/nextjs/fixtures/app-router-simple.html new file mode 100644 index 00000000..2d631860 --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/app-router-simple.html @@ -0,0 +1,34 @@ + + + + + + Next.js RSC Test App + + + + + + + +
+
+

Welcome to the Test App

+

Visit our getting started guide.

+ + Hero +
+
+ + + + + + + + + diff --git a/crates/common/src/integrations/nextjs/fixtures/app-router-tchunk.html b/crates/common/src/integrations/nextjs/fixtures/app-router-tchunk.html new file mode 100644 index 00000000..981b131c --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/app-router-tchunk.html @@ -0,0 +1,29 @@ + + + + + + About Us - Next.js RSC Test App + + + + +
+
+

About Us

+

We are building at origin.example.com.

+
+

Our Team

+
Alice Johnson

Alice Johnson

Engineering Lead

+
Bob Smith

Bob Smith

Product Manager

+
+
+
+ + + + + + + + diff --git a/crates/common/src/integrations/nextjs/fixtures/non-rsc-page.html b/crates/common/src/integrations/nextjs/fixtures/non-rsc-page.html new file mode 100644 index 00000000..8997cd86 --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/non-rsc-page.html @@ -0,0 +1,38 @@ + + + + + + Static Page - No RSC + + + + + +
+

Welcome

+

This is a static page without any React Server Components.

+

It contains regular HTML with URLs that should be rewritten:

+ + Banner +
+ + +
+
+ + + + + diff --git a/crates/common/src/integrations/nextjs/fixtures/real-nextjs-about.html b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-about.html new file mode 100644 index 00000000..9b30d187 --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-about.html @@ -0,0 +1 @@ +Next.js RSC Test App

About Us

We are building at origin.example.com.

Our Team

Alice Johnson

Alice Johnson

Engineering Lead

Bob Smith

Bob Smith

Product Manager

Resources

\ No newline at end of file diff --git a/crates/common/src/integrations/nextjs/fixtures/real-nextjs-blog.html b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-blog.html new file mode 100644 index 00000000..607d894f --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-blog.html @@ -0,0 +1 @@ +Next.js RSC Test App

Blog Post: hello-world

Published on the blog
Hero image for hello-world

Paragraph 1: This content references https://origin.example.com/article/1 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-1.

Paragraph 2: This content references https://origin.example.com/article/2 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-2.

Paragraph 3: This content references https://origin.example.com/article/3 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-3.

Paragraph 4: This content references https://origin.example.com/article/4 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-4.

Paragraph 5: This content references https://origin.example.com/article/5 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-5.

Paragraph 6: This content references https://origin.example.com/article/6 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-6.

Paragraph 7: This content references https://origin.example.com/article/7 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-7.

Paragraph 8: This content references https://origin.example.com/article/8 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-8.

Paragraph 9: This content references https://origin.example.com/article/9 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-9.

Paragraph 10: This content references https://origin.example.com/article/10 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-10.

Paragraph 11: This content references https://origin.example.com/article/11 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-11.

Paragraph 12: This content references https://origin.example.com/article/12 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-12.

Paragraph 13: This content references https://origin.example.com/article/13 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-13.

Paragraph 14: This content references https://origin.example.com/article/14 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-14.

Paragraph 15: This content references https://origin.example.com/article/15 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-15.

Paragraph 16: This content references https://origin.example.com/article/16 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-16.

Paragraph 17: This content references https://origin.example.com/article/17 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-17.

Paragraph 18: This content references https://origin.example.com/article/18 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-18.

Paragraph 19: This content references https://origin.example.com/article/19 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-19.

Paragraph 20: This content references https://origin.example.com/article/20 and includes links to https://origin.example.com/category/tech and https://origin.example.com/author/staff. For more information, visit https://origin.example.com/resources/guide-20.

\ No newline at end of file diff --git a/crates/common/src/integrations/nextjs/fixtures/real-nextjs-home.html b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-home.html new file mode 100644 index 00000000..9782dddc --- /dev/null +++ b/crates/common/src/integrations/nextjs/fixtures/real-nextjs-home.html @@ -0,0 +1 @@ +Next.js RSC Test App

Welcome to the Test App

Visit our getting started guide.

Hero
\ No newline at end of file diff --git a/crates/common/src/integrations/nextjs/html_post_process.rs b/crates/common/src/integrations/nextjs/html_post_process.rs index c85bf517..e9ea7af4 100644 --- a/crates/common/src/integrations/nextjs/html_post_process.rs +++ b/crates/common/src/integrations/nextjs/html_post_process.rs @@ -9,7 +9,8 @@ use crate::integrations::{IntegrationHtmlContext, IntegrationHtmlPostProcessor}; use super::rsc::rewrite_rsc_scripts_combined_with_limit; use super::rsc_placeholders::{ - NextJsRscPostProcessState, RSC_PAYLOAD_PLACEHOLDER_PREFIX, RSC_PAYLOAD_PLACEHOLDER_SUFFIX, + needs_post_processing, NextJsRscPostProcessState, RSC_PAYLOAD_PLACEHOLDER_PREFIX, + RSC_PAYLOAD_PLACEHOLDER_SUFFIX, }; use super::shared::find_rsc_push_payload_range; use super::{NextJsIntegrationConfig, NEXTJS_INTEGRATION_ID}; @@ -29,6 +30,15 @@ impl IntegrationHtmlPostProcessor for NextJsHtmlPostProcessor { NEXTJS_INTEGRATION_ID } + fn needs_accumulation( + &self, + document_state: &crate::integrations::IntegrationDocumentState, + ) -> bool { + self.config.enabled + && !self.config.rewrite_attributes.is_empty() + && needs_post_processing(document_state) + } + fn should_process(&self, html: &str, ctx: &IntegrationHtmlContext<'_>) -> bool { if !self.config.enabled || self.config.rewrite_attributes.is_empty() { return false; diff --git a/crates/common/src/integrations/nextjs/rsc_placeholders.rs b/crates/common/src/integrations/nextjs/rsc_placeholders.rs index 1aa0b391..8ab9a03e 100644 --- a/crates/common/src/integrations/nextjs/rsc_placeholders.rs +++ b/crates/common/src/integrations/nextjs/rsc_placeholders.rs @@ -18,6 +18,14 @@ pub(super) const RSC_PAYLOAD_PLACEHOLDER_SUFFIX: &str = "__"; #[derive(Default)] pub(super) struct NextJsRscPostProcessState { pub(super) payloads: Vec, + /// Set to `true` when a fragmented script was observed during the streaming + /// pass (i.e. `lol_html` delivered script text in multiple chunks). The + /// placeholder rewriter cannot process fragmented scripts, so the + /// post-processor's fallback re-parse path must handle them. This flag + /// ensures accumulation is triggered even when no payloads were captured + /// via placeholders. For non-RSC scripts the post-processor's + /// `should_process` check will return false, so the only cost is buffering. + pub(super) saw_fragmented_script: bool, } impl NextJsRscPostProcessState { @@ -26,6 +34,31 @@ impl NextJsRscPostProcessState { } } +/// Returns `true` if the streaming pass detected RSC content that requires +/// post-processing. +/// +/// This covers two scenarios: +/// 1. Unfragmented RSC scripts whose payloads were captured as placeholders. +/// 2. Fragmented RSC scripts (script text split across `lol_html` chunks) +/// that the placeholder rewriter could not process — the post-processor's +/// fallback re-parse path will handle these. +/// +/// Used by `HtmlWithPostProcessing` to decide whether to start +/// accumulating output for post-processing. +#[must_use] +pub(super) fn needs_post_processing( + document_state: &crate::integrations::IntegrationDocumentState, +) -> bool { + document_state + .get::>(NEXTJS_INTEGRATION_ID) + .is_some_and(|state| { + let guard = state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + !guard.payloads.is_empty() || guard.saw_fragmented_script + }) +} + fn rsc_payload_placeholder(index: usize) -> String { format!("{RSC_PAYLOAD_PLACEHOLDER_PREFIX}{index}{RSC_PAYLOAD_PLACEHOLDER_SUFFIX}") } @@ -58,8 +91,20 @@ impl IntegrationScriptRewriter for NextJsRscPlaceholderRewriter { // Fragmented scripts are handled by the post-processor which re-parses the final HTML. // This avoids corrupting non-RSC scripts that happen to be fragmented during streaming. if !ctx.is_last_in_text_node { - // Script is fragmented - skip placeholder processing. - // The post-processor will handle RSC scripts at end-of-document. + // Script is fragmented — skip placeholder processing but flag it so + // that `HtmlWithPostProcessing` knows to accumulate output for the + // post-processor's fallback re-parse path. We flag any fragmented + // script (not just those containing `__next_f`) because the RSC + // marker can itself be split across chunk boundaries. + let state = ctx + .document_state + .get_or_insert_with(NEXTJS_INTEGRATION_ID, || { + Mutex::new(NextJsRscPostProcessState::default()) + }); + let mut guard = state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + guard.saw_fragmented_script = true; return ScriptRewriteAction::keep(); } @@ -183,12 +228,18 @@ mod tests { "Final chunk of fragmented script should be kept" ); - // No payloads should be stored - post-processor will handle this + // No payloads should be stored - post-processor will handle this via re-parse + let stored = state + .get::>(NEXTJS_INTEGRATION_ID) + .expect("RSC state should be created for fragmented RSC scripts"); + let guard = stored.lock().expect("should lock Next.js RSC state"); + assert!( + guard.payloads.is_empty(), + "No payloads should be captured for fragmented scripts" + ); assert!( - state - .get::>(NEXTJS_INTEGRATION_ID) - .is_none(), - "No RSC state should be created for fragmented scripts" + guard.saw_fragmented_script, + "Fragmented scripts should set the saw_fragmented_script flag" ); } diff --git a/crates/common/src/integrations/registry.rs b/crates/common/src/integrations/registry.rs index 07fec408..2540c2fc 100644 --- a/crates/common/src/integrations/registry.rs +++ b/crates/common/src/integrations/registry.rs @@ -364,6 +364,15 @@ pub trait IntegrationHtmlPostProcessor: Send + Sync { /// Identifier for logging/diagnostics. fn integration_id(&self) -> &'static str; + /// Return `true` when this processor requires buffering of subsequent + /// streamed output so [`Self::post_process`] can run against complete HTML. + /// + /// Defaults to `true` for correctness: processors that do not override this + /// method will continue to receive whole-document HTML as before. + fn needs_accumulation(&self, _document_state: &IntegrationDocumentState) -> bool { + true + } + /// Fast preflight check to decide whether post-processing should run for this document. /// /// Implementations should keep this cheap (e.g., a substring check) because it may run on @@ -945,6 +954,17 @@ mod tests { ); } + #[test] + fn default_html_post_processor_needs_accumulation_is_true() { + let processor = NoopHtmlPostProcessor; + let document_state = IntegrationDocumentState::default(); + + assert!( + processor.needs_accumulation(&document_state), + "Default `needs_accumulation` should be true for post-processing correctness" + ); + } + #[test] fn test_exact_route_matching() { let routes = vec![( diff --git a/crates/common/src/publisher.rs b/crates/common/src/publisher.rs index 78489d2e..a54a6bb8 100644 --- a/crates/common/src/publisher.rs +++ b/crates/common/src/publisher.rs @@ -1,11 +1,14 @@ use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; -use fastly::{Body, Request, Response}; +use fastly::{Request, Response}; use crate::backend::BackendConfig; use crate::http_util::{serve_static_with_etag, RequestInfo}; -use crate::constants::{HEADER_X_COMPRESS_HINT, HEADER_X_SYNTHETIC_ID}; +use crate::constants::{ + ENV_FASTLY_IS_STAGING, ENV_FASTLY_SERVICE_VERSION, HEADER_X_COMPRESS_HINT, + HEADER_X_SYNTHETIC_ID, HEADER_X_TS_ENV, HEADER_X_TS_VERSION, +}; use crate::cookies::set_synthetic_cookie; use crate::error::TrustedServerError; use crate::integrations::IntegrationRegistry; @@ -50,7 +53,6 @@ pub fn handle_tsjs_dynamic( /// Parameters for processing response streaming struct ProcessResponseParams<'a> { - content_encoding: &'a str, origin_host: &'a str, origin_url: &'a str, request_host: &'a str, @@ -60,92 +62,6 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( - body: Body, - params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content - let is_html = params.content_type.contains("text/html"); - let is_rsc_flight = params.content_type.contains("text/x-component"); - log::debug!( - "process_response_streaming: content_type={}, content_encoding={}, is_html={}, is_rsc_flight={}, origin_host={}", - params.content_type, - params.content_encoding, - is_html, - is_rsc_flight, - params.origin_host - ); - - // Determine compression type - let compression = Compression::from_content_encoding(params.content_encoding); - - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type - if is_html { - // Use HTML rewriter for HTML content - let processor = create_html_stream_processor( - params.origin_host, - params.request_host, - params.request_scheme, - params.settings, - params.integration_registry, - )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; - } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. - let processor = RscFlightUrlRewriter::new( - params.origin_host, - params.origin_url, - params.request_host, - params.request_scheme, - ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; - } else { - // Use simple text replacer for non-HTML content - let replacer = create_url_replacer( - params.origin_host, - params.origin_url, - params.request_host, - params.request_scheme, - ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; - } - - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) -} - /// Create a unified HTML stream processor fn create_html_stream_processor( origin_host: &str, @@ -167,57 +83,55 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } -/// Proxies requests to the publisher's origin server. +/// Apply standard Trusted Server response headers (version, staging flag, custom headers). /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// This should be called on every response before sending to the client, regardless of +/// whether the response is buffered or streamed. +pub fn apply_standard_response_headers(settings: &Settings, response: &mut Response) { + if let Ok(v) = ::std::env::var(ENV_FASTLY_SERVICE_VERSION) { + response.set_header(HEADER_X_TS_VERSION, v); + } + if ::std::env::var(ENV_FASTLY_IS_STAGING).as_deref() == Ok("1") { + response.set_header(HEADER_X_TS_ENV, "staging"); + } + for (key, value) in &settings.response_headers { + response.set_header(key, value); + } +} + +#[allow(clippy::large_enum_variant)] +pub enum RouteResult { + /// Response fully buffered — send via `send_to_client()` + Buffered(Response), + /// Response already streamed to client + Streamed, +} + +/// Streaming version of publisher request handling. +/// Uses `stream_to_client()` for text responses, falling back to buffered for errors. /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable -pub fn handle_publisher_request( +/// Returns an error if the generation of a synthetic ID fails, or if making the backend HTTP request to the origin fails. +pub fn handle_publisher_request_streaming( settings: &Settings, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result> { - log::debug!("Proxying request to publisher_origin"); - - // Prebid.js requests are not intercepted here anymore. The HTML processor removes - // publisher-supplied Prebid scripts; the unified TSJS bundle includes Prebid.js when enabled. +) -> Result> { + log::debug!("Streaming: Proxying request to publisher_origin"); - // Extract request host and scheme from headers (supports X-Forwarded-Host/Proto for chained proxies) let request_info = RequestInfo::from_request(&req); let request_host = &request_info.host; let request_scheme = &request_info.scheme; - log::debug!( - "Request info: host={}, scheme={} (X-Forwarded-Host: {:?}, Host: {:?}, X-Forwarded-Proto: {:?})", - request_host, - request_scheme, - req.get_header("x-forwarded-host"), - req.get_header(header::HOST), - req.get_header("x-forwarded-proto"), - ); - - // Generate synthetic identifiers before the request body is consumed. let synthetic_id = get_or_generate_synthetic_id(settings, &req)?; - log::debug!("Proxy synthetic IDs - trusted: {}", synthetic_id); - let backend_name = BackendConfig::from_url( &settings.publisher.origin_url, settings.proxy.certificate_check, )?; let origin_host = settings.publisher.origin_host(); - log::debug!( - "Proxying to dynamic backend: {} (from {})", - backend_name, - settings.publisher.origin_url - ); req.set_header("host", &origin_host); let mut response = req @@ -226,13 +140,6 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging - log::debug!("Response headers:"); - for (name, value) in response.get_headers() { - log::debug!(" {}: {:?}", name, value); - } - - // Check if the response has a text-based content type that we should process let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) @@ -243,68 +150,125 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); - if should_process && !request_host.is_empty() { - // Check if the response is compressed - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); + // Only stream 2xx text responses that need processing. Non-processable responses + // fall back to buffered return via `RouteResult::Buffered`. + let will_stream = + response.get_status().is_success() && should_process && !request_host.is_empty(); - // Log response details for debugging + if !will_stream { log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", - content_type, content_encoding, request_host, origin_host + "Falling back to buffered for response - status: {}, should_process: {}, request_host: '{}'", + response.get_status(), + should_process, + request_host ); + response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); + set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); + apply_standard_response_headers(settings, &mut response); + return Ok(RouteResult::Buffered(response)); + } + + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + + log::debug!( + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); + + let body = response.take_body(); + let compression = Compression::from_content_encoding(&content_encoding); + + response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); + set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); + apply_standard_response_headers(settings, &mut response); - // Take the response body for streaming processing - let body = response.take_body(); - - // Process the body using streaming approach - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - match process_response_streaming(body, ¶ms) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); - - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); - - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); - - log::debug!("Completed streaming processing of response body"); + // Remove content-length since we stream and modify size + response.remove_header(header::CONTENT_LENGTH); + + // Commit to streaming — headers (including our additions) sent NOW + let streaming_body = response.stream_to_client(); + let mut buffered_streaming_body = std::io::BufWriter::with_capacity(8192, streaming_body); + + let params = ProcessResponseParams { + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + + let is_html = params.content_type.contains("text/html"); + let is_rsc_flight = params.content_type.contains("text/x-component"); + + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; + + let process_result = if is_html { + match create_html_stream_processor( + params.origin_host, + params.request_host, + params.request_scheme, + params.settings, + params.integration_registry, + ) { + Ok(processor) => { + let mut pipeline = StreamingPipeline::new(config, processor); + pipeline.process(body, &mut buffered_streaming_body) } Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); + log::error!("Failed to create html stream processor: {:?}", e); + // We've already sent headers, we can't change the status. Just return. + return Ok(RouteResult::Streamed); } } + } else if is_rsc_flight { + let processor = RscFlightUrlRewriter::new( + params.origin_host, + params.origin_url, + params.request_host, + params.request_scheme, + ); + let mut pipeline = StreamingPipeline::new(config, processor); + pipeline.process(body, &mut buffered_streaming_body) } else { - log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host + let replacer = create_url_replacer( + params.origin_host, + params.origin_url, + params.request_host, + params.request_scheme, ); + let mut pipeline = StreamingPipeline::new(config, replacer); + pipeline.process(body, &mut buffered_streaming_body) + }; + + match process_result { + Ok(()) => match buffered_streaming_body.into_inner() { + Ok(streaming_body) => { + if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming_body: {:?}", e); + } else { + log::debug!("Completed streaming processing of response body"); + } + } + Err(e) => { + log::error!("Failed to flush buffered streaming body: {:?}", e.error()); + } + }, + Err(e) => { + log::error!("Streaming failed mid-flight: {:?}", e); + } } - response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); - set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); - - Ok(response) + Ok(RouteResult::Streamed) } #[cfg(test)] @@ -398,12 +362,6 @@ mod tests { } } - // Note: test_streaming_compressed_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. - - // Note: test_streaming_brotli_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. - #[test] fn test_content_encoding_detection() { // Test that we properly handle responses with various content encodings diff --git a/crates/common/src/streaming_processor.rs b/crates/common/src/streaming_processor.rs index cda62e6f..f53fbdc5 100644 --- a/crates/common/src/streaming_processor.rs +++ b/crates/common/src/streaming_processor.rs @@ -7,7 +7,9 @@ //! - UTF-8 boundary handling use error_stack::{Report, ResultExt}; +use std::cell::RefCell; use std::io::{self, Read, Write}; +use std::rc::Rc; use crate::error::TrustedServerError; @@ -179,7 +181,7 @@ impl StreamingPipeline

{ Ok(()) } - /// Process gzip compressed stream + /// Process gzip compressed stream (streaming — no full-body buffering) fn process_gzip_to_gzip( &mut self, input: R, @@ -189,38 +191,13 @@ impl StreamingPipeline

{ use flate2::write::GzEncoder; use flate2::Compression; - // Decompress input - let mut decoder = GzDecoder::new(input); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress gzip".to_string(), - })?; - - log::info!("Decompressed size: {} bytes", decompressed.len()); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Processed size: {} bytes", processed.len()); + let decoder = GzDecoder::new(input); + let encoder = GzEncoder::new(output, Compression::default()); - // Recompress the output - let mut encoder = GzEncoder::new(output, Compression::default()); - encoder - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write to gzip encoder".to_string(), - })?; + let encoder = self.process_through_compression(decoder, encoder)?; encoder.finish().change_context(TrustedServerError::Proxy { message: "Failed to finish gzip encoder".to_string(), })?; - Ok(()) } @@ -272,7 +249,7 @@ impl StreamingPipeline

{ self.decompress_and_process(GzDecoder::new(input), output, "gzip") } - /// Process deflate compressed stream + /// Process deflate compressed stream (streaming) fn process_deflate_to_deflate( &mut self, input: R, @@ -285,7 +262,11 @@ impl StreamingPipeline

{ let decoder = ZlibDecoder::new(input); let encoder = ZlibEncoder::new(output, Compression::default()); - self.process_through_compression(decoder, encoder) + let encoder = self.process_through_compression(decoder, encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finish deflate encoder".to_string(), + })?; + Ok(()) } /// Process deflate compressed input to uncompressed output (decompression only) @@ -299,7 +280,7 @@ impl StreamingPipeline

{ self.decompress_and_process(ZlibDecoder::new(input), output, "deflate") } - /// Process brotli compressed stream + /// Process brotli compressed stream (streaming) fn process_brotli_to_brotli( &mut self, input: R, @@ -317,7 +298,11 @@ impl StreamingPipeline

{ }; let encoder = CompressorWriter::with_params(output, 4096, ¶ms); - self.process_through_compression(decoder, encoder) + let encoder = self.process_through_compression(decoder, encoder)?; + // CompressorWriter finalizes the brotli stream on drop. Unlike gzip/deflate, + // brotli has no checksum trailer so drop-based finalization is safe. + drop(encoder); + Ok(()) } /// Process brotli compressed input to uncompressed output (decompression only) @@ -331,12 +316,17 @@ impl StreamingPipeline

{ self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli") } - /// Generic processing through compression layers + /// Generic chunk loop through compression layers. + /// + /// Returns the encoder so the caller can finalize it properly (e.g. + /// `GzEncoder::finish()`, `ZlibEncoder::finish()`). This avoids the + /// silent error swallowing that `drop(encoder)` causes — gzip/deflate + /// trailers contain checksums whose write failures must be propagated. fn process_through_compression( &mut self, mut decoder: R, mut encoder: W, - ) -> Result<(), Report> { + ) -> Result> { let mut buffer = vec![0u8; self.config.chunk_size]; loop { @@ -380,94 +370,86 @@ impl StreamingPipeline

{ } } - // Flush encoder (this also finishes compression) encoder.flush().change_context(TrustedServerError::Proxy { message: "Failed to flush encoder".to_string(), })?; - // For GzEncoder and similar, we need to finish() to properly close the stream - // The flush above might not be enough - drop(encoder); + Ok(encoder) + } +} - Ok(()) +/// Output sink that writes `lol_html` output chunks into a shared `Rc>>` buffer. +struct RcVecSink(Rc>>); + +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); } } -/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor` -/// Important: Due to `lol_html`'s ownership model, we must accumulate input -/// and process it all at once when the stream ends. This is a limitation -/// of the `lol_html` library's API design. +/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`. +/// +/// Uses `lol_html`'s incremental streaming API: each incoming chunk is written to +/// the rewriter immediately, and whatever output `lol_html` has ready is drained +/// and returned. This avoids buffering the full document before processing begins. pub struct HtmlRewriterAdapter { - settings: lol_html::Settings<'static, 'static>, - accumulated_input: Vec, + rewriter: Option>, + output: Rc>>, } impl HtmlRewriterAdapter { - /// Create a new HTML rewriter adapter + /// Create a new HTML rewriter adapter. #[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { + // Pre-allocate to avoid reallocation churn since lol_html writes incrementally + let output = Rc::new(RefCell::new(Vec::with_capacity(8192))); + let rewriter = lol_html::HtmlRewriter::new(settings, RcVecSink(Rc::clone(&output))); Self { - settings, - accumulated_input: Vec::new(), + rewriter: Some(rewriter), + output, } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - // Accumulate input chunks - self.accumulated_input.extend_from_slice(chunk); - - if !chunk.is_empty() { - log::debug!( - "Buffering chunk: {} bytes, total buffered: {} bytes", - chunk.len(), - self.accumulated_input.len() - ); + if let Some(rewriter) = &mut self.rewriter { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to write HTML chunk: {}", e); + io::Error::other(format!("HTML processing failed: {}", e)) + })?; + } } - // Only process when we have all the input if is_last { - log::info!( - "Processing complete document: {} bytes", - self.accumulated_input.len() - ); - - // Process all accumulated input at once - let mut output = Vec::new(); - - // Create rewriter with output sink - let mut rewriter = lol_html::HtmlRewriter::new( - std::mem::take(&mut self.settings), - |chunk: &[u8]| { - output.extend_from_slice(chunk); - }, - ); - - // Process the entire document - rewriter.write(&self.accumulated_input).map_err(|e| { - log::error!("Failed to process HTML: {}", e); - io::Error::other(format!("HTML processing failed: {}", e)) - })?; - - // Finalize the rewriter - rewriter.end().map_err(|e| { - log::error!("Failed to finalize: {}", e); - io::Error::other(format!("HTML finalization failed: {}", e)) - })?; - - log::debug!("Output size: {} bytes", output.len()); - self.accumulated_input.clear(); - Ok(output) - } else { - // Return empty until we have all input - // This is a limitation of lol_html's API - Ok(Vec::new()) + if let Some(rewriter) = self.rewriter.take() { + rewriter.end().map_err(|e| { + log::error!("Failed to finalize HTML rewriter: {}", e); + io::Error::other(format!("HTML finalization failed: {}", e)) + })?; + } } + + // Drain whatever lol_html produced for this chunk and return it. + // Pre-allocate the next buffer to prevent lol_html from triggering allocations on its many small writes. + let result = std::mem::replace( + &mut *self.output.borrow_mut(), + Vec::with_capacity(std::cmp::max(chunk.len() + 1024, 8192)), + ); + log::debug!( + "HtmlRewriterAdapter::process_chunk: input={} bytes, output={} bytes, is_last={}", + chunk.len(), + result.len(), + is_last + ); + Ok(result) } fn reset(&mut self) { - self.accumulated_input.clear(); + // The rewriter is consumed after end(); a new HtmlRewriterAdapter should + // be created per document. Clear any remaining output buffer. + self.output.borrow_mut().clear(); } } @@ -534,7 +516,7 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_accumulates_until_last() { + fn test_html_rewriter_adapter_streams_incrementally() { use lol_html::{element, Settings}; // Create a simple HTML rewriter that replaces text @@ -548,30 +530,32 @@ mod tests { let mut adapter = HtmlRewriterAdapter::new(settings); - // Test that intermediate chunks return empty + // Collect all output across chunks; the rewriter may emit partial output at any point. + let mut full_output = Vec::new(); + let chunk1 = b""; - let result1 = adapter - .process_chunk(chunk1, false) - .expect("should process chunk1"); - assert_eq!(result1.len(), 0, "Should return empty for non-last chunk"); + full_output.extend( + adapter + .process_chunk(chunk1, false) + .expect("should process chunk1"), + ); let chunk2 = b"

original

"; - let result2 = adapter - .process_chunk(chunk2, false) - .expect("should process chunk2"); - assert_eq!(result2.len(), 0, "Should return empty for non-last chunk"); + full_output.extend( + adapter + .process_chunk(chunk2, false) + .expect("should process chunk2"), + ); - // Test that last chunk processes everything let chunk3 = b""; - let result3 = adapter - .process_chunk(chunk3, true) - .expect("should process final chunk"); - assert!( - !result3.is_empty(), - "Should return processed content for last chunk" + full_output.extend( + adapter + .process_chunk(chunk3, true) + .expect("should process final chunk"), ); - let output = String::from_utf8(result3).expect("output should be valid UTF-8"); + assert!(!full_output.is_empty(), "Should have produced output"); + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); assert!(output.contains("replaced"), "Should have replaced content"); assert!(output.contains(""), "Should have complete HTML"); } @@ -590,27 +574,23 @@ mod tests { } large_html.push_str(""); - // Process in chunks + // Process in chunks, collecting all output. let chunk_size = 1024; let bytes = large_html.as_bytes(); - let mut chunks = bytes.chunks(chunk_size); - let mut last_chunk = chunks.next().unwrap_or(&[]); + let chunks: Vec<_> = bytes.chunks(chunk_size).collect(); + let last_idx = chunks.len().saturating_sub(1); - for chunk in chunks { + let mut full_output = Vec::new(); + for (i, chunk) in chunks.iter().enumerate() { + let is_last = i == last_idx; let result = adapter - .process_chunk(last_chunk, false) - .expect("should process intermediate chunk"); - assert_eq!(result.len(), 0, "Intermediate chunks should return empty"); - last_chunk = chunk; + .process_chunk(chunk, is_last) + .expect("should process chunk"); + full_output.extend(result); } - // Process last chunk - let result = adapter - .process_chunk(last_chunk, true) - .expect("should process last chunk"); - assert!(!result.is_empty(), "Last chunk should return content"); - - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + assert!(!full_output.is_empty(), "Should have produced output"); + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); assert!( output.contains("Paragraph 999"), "Should contain all content" @@ -618,32 +598,22 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_reset() { + fn test_html_rewriter_adapter_reset_clears_output_buffer() { use lol_html::Settings; + // reset() is a no-op on the rewriter itself (a new adapter is needed per document), + // but it must clear any pending bytes in the output buffer. let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - // Process some content - adapter - .process_chunk(b"", false) - .expect("should process html tag"); - adapter - .process_chunk(b"test", false) - .expect("should process body"); + // Write a full document so the rewriter is finished. + let _ = adapter + .process_chunk(b"

test

", true) + .expect("should process complete document"); - // Reset should clear accumulated input + // reset() should not panic and should leave the buffer empty. adapter.reset(); - - // After reset, adapter should be ready for new input - let result = adapter - .process_chunk(b"

new

", true) - .expect("should process new content after reset"); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); - assert_eq!( - output, "

new

", - "Should only contain new input after reset" - ); + // No assertion on a subsequent process_chunk — the rewriter is consumed. } #[test] diff --git a/crates/common/tests/nextjs_integration.rs b/crates/common/tests/nextjs_integration.rs new file mode 100644 index 00000000..16e1845b --- /dev/null +++ b/crates/common/tests/nextjs_integration.rs @@ -0,0 +1,629 @@ +//! Fixture-driven integration tests for Next.js RSC URL rewriting. +//! +//! These tests exercise the full streaming pipeline against realistic HTML +//! fixtures captured from a Next.js App Router application. Each fixture is +//! processed with multiple chunk sizes to exercise both the placeholder path +//! (unfragmented scripts) and the fallback re-parse path (fragmented scripts). + +#![allow(clippy::print_stdout)] + +use std::io::Cursor; + +use trusted_server_common::html_processor::{create_html_processor, HtmlProcessorConfig}; +use trusted_server_common::integrations::IntegrationRegistry; +use trusted_server_common::settings::Settings; +use trusted_server_common::streaming_processor::{ + Compression, PipelineConfig, StreamProcessor, StreamingPipeline, +}; + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const FIXTURE_SIMPLE: &str = + include_str!("../src/integrations/nextjs/fixtures/app-router-simple.html"); +const FIXTURE_TCHUNK: &str = + include_str!("../src/integrations/nextjs/fixtures/app-router-tchunk.html"); +const FIXTURE_LARGE: &str = + include_str!("../src/integrations/nextjs/fixtures/app-router-large.html"); +const FIXTURE_NON_RSC: &str = include_str!("../src/integrations/nextjs/fixtures/non-rsc-page.html"); + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +const ORIGIN_HOST: &str = "origin.example.com"; +const PROXY_HOST: &str = "proxy.example.com"; +const SCHEME: &str = "https"; + +/// Small chunk size to maximize script fragmentation and exercise cross-chunk state handling. +/// With 32-64 byte chunks, `lol_html` frequently fragments script text nodes, forcing the +/// fallback re-parse path for RSC placeholder substitution. +const CHUNK_SIZE_SMALL: usize = 32; + +/// Medium chunk size - typical for network reads. Balances between fragmentation +/// and realistic streaming behavior. +const CHUNK_SIZE_MEDIUM: usize = 256; + +/// Large chunk size - can fit small to medium HTML documents in a single chunk. +/// Tests the placeholder path (unfragmented scripts) vs fallback re-parse path. +const CHUNK_SIZE_LARGE: usize = 8192; + +fn create_nextjs_settings() -> Settings { + let toml = r#" + [[handlers]] + path = "^/secure" + username = "user" + password = "pass" + + [publisher] + domain = "test-publisher.com" + cookie_domain = ".test-publisher.com" + origin_backend = "publisher_origin" + origin_url = "https://origin.example.com" + proxy_secret = "unit-test-proxy-secret" + + [integrations.prebid] + enabled = false + + [integrations.nextjs] + enabled = true + rewrite_attributes = ["href", "link", "url"] + + [synthetic] + counter_store = "test-counter-store" + opid_store = "test-opid-store" + secret_key = "test-secret-key" + template = "{{client_ip}}:{{user_agent}}" + + [request_signing] + config_store_id = "test-config-store-id" + secret_store_id = "test-secret-store-id" + + [[backends]] + name = "publisher_origin" + target = "https://origin.example.com" + "#; + Settings::from_toml(toml).expect("test settings should parse") +} + +fn create_non_rsc_settings() -> Settings { + let toml = r#" + [[handlers]] + path = "^/secure" + username = "user" + password = "pass" + + [publisher] + domain = "test-publisher.com" + cookie_domain = ".test-publisher.com" + origin_backend = "publisher_origin" + origin_url = "https://origin.example.com" + proxy_secret = "unit-test-proxy-secret" + + [integrations.prebid] + enabled = false + + [integrations.nextjs] + enabled = false + + [synthetic] + counter_store = "test-counter-store" + opid_store = "test-opid-store" + secret_key = "test-secret-key" + template = "{{client_ip}}:{{user_agent}}" + + [request_signing] + config_store_id = "test-config-store-id" + secret_store_id = "test-secret-store-id" + + [[backends]] + name = "publisher_origin" + target = "https://origin.example.com" + "#; + Settings::from_toml(toml).expect("test settings should parse") +} + +struct FixtureTestResult { + output: String, + intermediate_bytes: usize, + final_bytes: usize, +} + +impl FixtureTestResult { + fn total_bytes(&self) -> usize { + self.intermediate_bytes + self.final_bytes + } + + fn streaming_ratio(&self) -> f64 { + let total = self.total_bytes(); + if total == 0 { + 0.0 + } else { + self.intermediate_bytes as f64 / total as f64 + } + } +} + +/// Process a fixture through the full streaming pipeline and return results. +fn run_pipeline_test(fixture: &str, chunk_size: usize, settings: &Settings) -> FixtureTestResult { + let registry = IntegrationRegistry::new(settings).expect("should create registry"); + let config = + HtmlProcessorConfig::from_settings(settings, ®istry, ORIGIN_HOST, PROXY_HOST, SCHEME); + let processor = create_html_processor(config); + + let pipeline_config = PipelineConfig { + input_compression: Compression::None, + output_compression: Compression::None, + chunk_size, + }; + let mut pipeline = StreamingPipeline::new(pipeline_config, processor); + let mut output = Vec::new(); + pipeline + .process(Cursor::new(fixture.as_bytes()), &mut output) + .expect("pipeline should process fixture"); + + let output_str = String::from_utf8(output).expect("output should be valid UTF-8"); + + // StreamingPipeline doesn't expose per-chunk metrics, so we use a + // chunk-level processor to measure streaming behavior. + FixtureTestResult { + output: output_str, + intermediate_bytes: 0, + final_bytes: 0, + } +} + +/// Process a fixture chunk-by-chunk using the raw `StreamProcessor` interface +/// to measure streaming behavior. +fn run_chunked_test(fixture: &str, chunk_size: usize, settings: &Settings) -> FixtureTestResult { + let registry = IntegrationRegistry::new(settings).expect("should create registry"); + let config = + HtmlProcessorConfig::from_settings(settings, ®istry, ORIGIN_HOST, PROXY_HOST, SCHEME); + let mut processor = create_html_processor(config); + + let bytes = fixture.as_bytes(); + let chunks: Vec<&[u8]> = bytes.chunks(chunk_size).collect(); + let last_idx = chunks.len().saturating_sub(1); + + let mut intermediate_bytes = 0usize; + let mut final_bytes = 0usize; + let mut full_output = Vec::new(); + + for (i, chunk) in chunks.iter().enumerate() { + let is_last = i == last_idx; + let result = processor + .process_chunk(chunk, is_last) + .expect("should process chunk"); + + if is_last { + final_bytes = result.len(); + } else { + intermediate_bytes += result.len(); + } + full_output.extend_from_slice(&result); + } + + let output = String::from_utf8(full_output).expect("output should be valid UTF-8"); + + FixtureTestResult { + output, + intermediate_bytes, + final_bytes, + } +} + +/// Shared correctness assertions for RSC fixtures. +fn assert_rsc_correctness(result: &FixtureTestResult, fixture_name: &str) { + // All origin URLs should be rewritten + assert!( + result.output.contains(PROXY_HOST), + "[{fixture_name}] Output should contain proxy host. Got:\n{}", + &result.output[..result.output.len().min(500)] + ); + + // No RSC placeholder markers should leak + assert!( + !result.output.contains("__ts_rsc_payload_"), + "[{fixture_name}] No RSC placeholder markers should appear in output" + ); + + // HTML structure should be intact + assert!( + result.output.contains(""), + "[{fixture_name}] HTML closing tag should be present" + ); + + // RSC scripts should still be present (even if content is rewritten) + assert!( + result.output.contains("__next_f"), + "[{fixture_name}] RSC scripts should be preserved in output" + ); +} + +fn assert_non_rsc_correctness(result: &FixtureTestResult, fixture_name: &str) { + assert!( + result.output.contains(PROXY_HOST), + "[{fixture_name}] Output should contain proxy host" + ); + assert!( + result.output.contains(" 0, + "Non-RSC pages should stream incrementally (got 0 intermediate bytes). \ + Final bytes: {}", + result.final_bytes + ); + + println!( + "non-rsc streaming ratio: {:.1}%", + result.streaming_ratio() * 100.0 + ); +} + +#[test] +fn non_rsc_page_streams_with_nextjs_enabled() { + // Even with Next.js enabled, non-RSC pages with unfragmented scripts should + // stream because the lazy accumulation fix only triggers for RSC content. + let settings = create_nextjs_settings(); + + // Use a chunk size that produces multiple chunks for the ~1KB fixture, + // but is large enough that the small analytics scripts (~30 bytes each) + // won't be fragmented by lol_html. + let result = run_chunked_test(FIXTURE_NON_RSC, CHUNK_SIZE_MEDIUM, &settings); + assert_non_rsc_correctness(&result, "non-rsc/nextjs-enabled/256"); + + assert!( + result.intermediate_bytes > 0, + "Non-RSC pages should stream even when Next.js is enabled \ + (got 0 intermediate bytes). Final bytes: {}", + result.final_bytes + ); + + println!( + "non-rsc with nextjs enabled streaming ratio: {:.1}%", + result.streaming_ratio() * 100.0 + ); +} + +// =========================================================================== +// Tests: URL rewriting completeness across fixtures +// =========================================================================== + +#[test] +fn all_fixtures_rewrite_html_attribute_urls() { + let settings = create_nextjs_settings(); + + for (name, fixture) in [ + ("simple", FIXTURE_SIMPLE), + ("tchunk", FIXTURE_TCHUNK), + ("large", FIXTURE_LARGE), + ] { + let result = run_pipeline_test(fixture, 8192, &settings); + + // href attributes should be rewritten + assert!( + !result.output.contains("href=\"https://origin.example.com"), + "[{name}] href attributes should be rewritten to proxy host" + ); + + // src attributes should be rewritten + assert!( + !result.output.contains("src=\"https://origin.example.com"), + "[{name}] src attributes should be rewritten to proxy host" + ); + } +} + +// =========================================================================== +// Tests: Real Next.js output (captured from the example app) +// =========================================================================== +// These fixtures are actual HTML responses from a Next.js 15 App Router app, +// not hand-crafted. They exercise the full complexity of real RSC payloads. + +const REAL_HOME: &str = include_str!("../src/integrations/nextjs/fixtures/real-nextjs-home.html"); +const REAL_ABOUT: &str = include_str!("../src/integrations/nextjs/fixtures/real-nextjs-about.html"); +const REAL_BLOG: &str = include_str!("../src/integrations/nextjs/fixtures/real-nextjs-blog.html"); + +#[test] +fn real_nextjs_home_pipeline() { + let settings = create_nextjs_settings(); + for chunk_size in [32, 64, 256, 8192] { + let result = run_pipeline_test(REAL_HOME, chunk_size, &settings); + + assert!( + result.output.contains(PROXY_HOST), + "[real-home/chunk={chunk_size}] Output should contain proxy host" + ); + assert!( + !result.output.contains("__ts_rsc_payload_"), + "[real-home/chunk={chunk_size}] No placeholder markers should leak" + ); + assert!( + result.output.contains(" content (RSC payloads) + let before = &result.output[..*pos]; + let last_script_open = before.rfind(""); + match (last_script_open, last_script_close) { + (Some(open), Some(close)) => open > close, // inside a script + (Some(_), None) => true, // inside first script + _ => false, + } + }) + .count(); + + println!( + "[{name}/chunk={chunk_size}] RSC payload origin URLs remaining: {rsc_origin_count}" + ); + + // RSC payloads should be rewritten (origin URLs replaced with proxy URLs) + assert_eq!( + rsc_origin_count, 0, + "[{name}/chunk={chunk_size}] All origin URLs in RSC payloads should be rewritten \ + to proxy host. Found {rsc_origin_count} remaining." + ); + } + } +} + +#[test] +fn real_nextjs_streaming_behavior() { + let settings = create_nextjs_settings(); + + for (name, fixture) in [ + ("real-home", REAL_HOME), + ("real-about", REAL_ABOUT), + ("real-blog", REAL_BLOG), + ] { + // Small chunks to see streaming behavior + let result = run_chunked_test(fixture, 64, &settings); + + println!( + "[{name}] streaming: {:.1}% ({} intermediate, {} final)", + result.streaming_ratio() * 100.0, + result.intermediate_bytes, + result.final_bytes + ); + + // Correctness should hold regardless of chunk size + assert!( + result.output.contains(PROXY_HOST), + "[{name}] Output should contain proxy host with 64-byte chunks" + ); + assert!( + result.output.contains(" Result { +use trusted_server_common::publisher::RouteResult; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RouteTarget { + TsjsDynamic, + Discovery, + VerifySignature, + RotateKey, + DeactivateKey, + Auction, + FirstPartyProxy, + FirstPartyClick, + FirstPartySign, + FirstPartyProxyRebuild, + Integration, + PublisherProxy, +} + +fn classify_route( + method: &Method, + path: &str, + integration_registry: &IntegrationRegistry, +) -> RouteTarget { + if path.starts_with("/static/tsjs=") && method == Method::GET { + return RouteTarget::TsjsDynamic; + } + + match (method, path) { + (&Method::GET, "/.well-known/trusted-server.json") => RouteTarget::Discovery, + (&Method::POST, "/verify-signature") => RouteTarget::VerifySignature, + (&Method::POST, "/admin/keys/rotate") => RouteTarget::RotateKey, + (&Method::POST, "/admin/keys/deactivate") => RouteTarget::DeactivateKey, + (&Method::POST, "/auction") => RouteTarget::Auction, + (&Method::GET, "/first-party/proxy") => RouteTarget::FirstPartyProxy, + (&Method::GET, "/first-party/click") => RouteTarget::FirstPartyClick, + (&Method::GET, "/first-party/sign") | (&Method::POST, "/first-party/sign") => { + RouteTarget::FirstPartySign + } + (&Method::POST, "/first-party/proxy-rebuild") => RouteTarget::FirstPartyProxyRebuild, + (m, p) if integration_registry.has_route(m, p) => RouteTarget::Integration, + _ => RouteTarget::PublisherProxy, + } +} + +fn main() { + fastly::init(); init_logger(); + let req = Request::from_client(); let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; - log::info!("Settings {settings:?}"); + log::debug!("Settings {settings:?}"); // Build the auction orchestrator once at startup let orchestrator = build_orchestrator(&settings); @@ -46,16 +89,26 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; - futures::executor::block_on(route_request( + match futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )) + )) { + Ok(RouteResult::Buffered(resp)) => resp.send_to_client(), + Ok(RouteResult::Streamed) => { /* already streamed */ } + Err(e) => { + log::error!("Request routing failed: {:?}", e); + Response::from_status(fastly::http::StatusCode::INTERNAL_SERVER_ERROR) + .with_body(format!("Internal Server Error: {}", e)) + .send_to_client(); + } + } } async fn route_request( @@ -63,94 +116,76 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, req: Request, -) -> Result { - if let Some(response) = enforce_basic_auth(settings, &req) { - return Ok(response); +) -> Result { + log::debug!( + "FASTLY_SERVICE_VERSION: {}", + ::std::env::var("FASTLY_SERVICE_VERSION").unwrap_or_else(|_| String::new()) + ); + + if let Some(mut response) = enforce_basic_auth(settings, &req) { + apply_standard_response_headers(settings, &mut response); + return Ok(RouteResult::Buffered(response)); } // Get path and method for routing let path = req.get_path().to_string(); let method = req.get_method().clone(); - - // Match known routes and handle them - let result = match (method, path.as_str()) { - // Serve the tsjs library - (Method::GET, path) if path.starts_with("/static/tsjs=") => { - handle_tsjs_dynamic(&req, integration_registry) - } - - // Discovery endpoint for trusted-server capabilities and JWKS - (Method::GET, "/.well-known/trusted-server.json") => { - handle_trusted_server_discovery(settings, req) + let target = classify_route(&method, &path, integration_registry); + if target == RouteTarget::PublisherProxy { + log::info!( + "No known route matched for path: {}, proxying to publisher origin", + path + ); + + use trusted_server_common::publisher::handle_publisher_request_streaming; + match handle_publisher_request_streaming(settings, integration_registry, req) { + Ok(route_result) => return Ok(route_result), + Err(e) => { + log::error!("Failed to proxy to publisher origin: {:?}", e); + let mut err_resp = to_error_response(&e); + apply_standard_response_headers(settings, &mut err_resp); + return Ok(RouteResult::Buffered(err_resp)); + } } + } - // Signature verification endpoint - (Method::POST, "/verify-signature") => handle_verify_signature(settings, req), - - // Key rotation admin endpoints - (Method::POST, "/admin/keys/rotate") => handle_rotate_key(settings, req), - (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req), - - // Unified auction endpoint (returns creative HTML inline) - (Method::POST, "/auction") => handle_auction(settings, orchestrator, req).await, - - // tsjs endpoints - (Method::GET, "/first-party/proxy") => handle_first_party_proxy(settings, req).await, - (Method::GET, "/first-party/click") => handle_first_party_click(settings, req).await, - (Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => { - handle_first_party_proxy_sign(settings, req).await - } - (Method::POST, "/first-party/proxy-rebuild") => { + // Match known routes and handle them + let result = match target { + RouteTarget::TsjsDynamic => handle_tsjs_dynamic(&req, integration_registry), + RouteTarget::Discovery => handle_trusted_server_discovery(settings, req), + RouteTarget::VerifySignature => handle_verify_signature(settings, req), + RouteTarget::RotateKey => handle_rotate_key(settings, req), + RouteTarget::DeactivateKey => handle_deactivate_key(settings, req), + RouteTarget::Auction => handle_auction(settings, orchestrator, req).await, + RouteTarget::FirstPartyProxy => handle_first_party_proxy(settings, req).await, + RouteTarget::FirstPartyClick => handle_first_party_click(settings, req).await, + RouteTarget::FirstPartySign => handle_first_party_proxy_sign(settings, req).await, + RouteTarget::FirstPartyProxyRebuild => { handle_first_party_proxy_rebuild(settings, req).await } - (m, path) if integration_registry.has_route(&m, path) => integration_registry - .handle_proxy(&m, path, settings, req) + RouteTarget::Integration => integration_registry + .handle_proxy(&method, &path, settings, req) .await .unwrap_or_else(|| { Err(Report::new(TrustedServerError::BadRequest { message: format!("Unknown integration route: {path}"), })) }), - - // No known route matched, proxy to publisher origin as fallback - _ => { - log::info!( - "No known route matched for path: {}, proxying to publisher origin", - path - ); - - match handle_publisher_request(settings, integration_registry, req) { - Ok(response) => Ok(response), - Err(e) => { - log::error!("Failed to proxy to publisher origin: {:?}", e); - Err(e) - } - } - } + RouteTarget::PublisherProxy => unreachable!(), }; // Convert any errors to HTTP error responses let mut response = result.unwrap_or_else(|e| to_error_response(&e)); + apply_standard_response_headers(settings, &mut response); - if let Ok(v) = ::std::env::var(ENV_FASTLY_SERVICE_VERSION) { - response.set_header(HEADER_X_TS_VERSION, v); - } - if ::std::env::var(ENV_FASTLY_IS_STAGING).as_deref() == Ok("1") { - response.set_header(HEADER_X_TS_ENV, "staging"); - } - - for (key, value) in &settings.response_headers { - response.set_header(key, value); - } - - Ok(response) + Ok(RouteResult::Buffered(response)) } fn init_logger() { let logger = Logger::builder() .default_endpoint("tslog") .echo_stdout(true) - .max_level(log::LevelFilter::Debug) + .max_level(log::LevelFilter::Info) .build() .expect("Failed to build Logger"); diff --git a/examples/nextjs-rsc-app/README.md b/examples/nextjs-rsc-app/README.md new file mode 100644 index 00000000..834aedb4 --- /dev/null +++ b/examples/nextjs-rsc-app/README.md @@ -0,0 +1,93 @@ +# Next.js RSC Test App + +Minimal Next.js 15 App Router application for testing Trusted Server's RSC +(React Server Components) URL rewriting integration. + +## Purpose + +This app generates realistic RSC Flight payloads containing +`origin.example.com` URLs. These payloads exercise every rewriting path in the +Trusted Server HTML processor: + +| Route | RSC Pattern | Rewriting Path | +|-------|------------|----------------| +| `/` | Simple JSON URLs in `__next_f.push` | Placeholder substitution | +| `/about` | HTML content with URLs (T-chunks) | T-chunk length recalculation | +| `/blog/hello-world` | Large payload spanning multiple scripts | Cross-script T-chunk handling | + +## Quick Start + +```bash +npm install +npm run dev +# Visit http://localhost:3099 +``` + +## Testing RSC Streaming + +### Quick Test with Live HTML + +Test with HTML from your **currently running** server: + +```bash +# Terminal 1: Start dev server +npm run dev + +# Terminal 2: Test live HTML +./test-live-html.sh # Test home page +./test-live-html.sh http://localhost:3099/about # Test specific route +``` + +This fetches fresh HTML from your server and processes it through the trusted-server pipeline. Perfect for rapid iteration during development. + +### Full E2E Test + +Run a complete end-to-end test (builds production server): + +```bash +./test-streaming.sh +``` + +This script: +1. Builds and starts the Next.js production server +2. Fetches HTML from all routes +3. Verifies RSC content is present +4. Runs Rust integration tests +5. Shows streaming metrics for each route + +**Expected Results:** +- ✅ RSC payloads contain `origin.example.com` URLs before processing +- ✅ After processing through trusted-server pipeline: **0 origin URLs remain in RSC payloads** +- ✅ Streaming ratios: 20-40% for RSC pages (vs 0% before the fix) +- ✅ Non-RSC pages stream at 96%+ + +📖 See [TESTING.md](./TESTING.md) for detailed testing documentation. + +## Capturing Fixtures + +To regenerate the HTML fixtures used by Rust integration tests: + +```bash +npm ci +npm run capture-fixtures +``` + +This installs dependencies with `npm ci`, builds the app, starts `next start`, +captures HTML from each route, validates that RSC payloads are present, and +saves the output to `crates/common/src/integrations/nextjs/fixtures/`. + +## How It Works + +Each page component includes URLs with the `origin.example.com` hostname. When +Next.js renders these as RSC Flight data (inlined `