diff --git a/BENCHMARKS.md b/BENCHMARKS.md index e5c5d8b..84770f8 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -12,12 +12,12 @@ Performance benchmarks measuring L0 (Python) overhead on high-throughput streami | Scenario | Tokens/s | Avg Duration | TTFT | Overhead | | ------------------------ | --------- | ------------ | ------- | -------- | -| Baseline (raw streaming) | 1,518,271 | 1.32 ms | 0.02 ms | - | -| L0 Core (no features) | 551,696 | 3.63 ms | 0.08 ms | 175% | -| L0 + JSON Guardrail | 469,922 | 4.26 ms | 0.07 ms | 223% | -| L0 + All Guardrails | 367,328 | 5.44 ms | 0.08 ms | 313% | -| L0 + Drift Detection | 119,758 | 16.70 ms | 0.08 ms | 1166% | -| L0 Full Stack | 108,257 | 18.48 ms | 0.07 ms | 1301% | +| Baseline (raw streaming) | 1,406,390 | 1.42 ms | 0.02 ms | - | +| L0 Core (no features) | 596,086 | 3.36 ms | 0.10 ms | 136% | +| L0 + JSON Guardrail | 557,550 | 3.59 ms | 0.09 ms | 152% | +| L0 + All Guardrails | 547,991 | 3.65 ms | 0.09 ms | 157% | +| L0 + Drift Detection | 114,935 | 17.41 ms | 0.10 ms | 1124% | +| L0 Full Stack | 114,895 | 17.43 ms | 0.10 ms | 1126% | **Legend:** - **Tokens/s** = Throughput (higher is better) @@ -60,12 +60,12 @@ result = await l0.run( ## Nvidia Blackwell Ready -Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **108K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming. +Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **114K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming. | GPU Generation | Expected Tokens/s | L0 Headroom | | ---------------- | ----------------- | ----------- | -| Current (H100) | ~100-200 | 540-1080x | -| Blackwell (B200) | ~1000+ | 108x | +| Current (H100) | ~100-200 | 574-1149x | +| Blackwell (B200) | ~1000+ | 114x | ## Python Version Note diff --git a/README.md b/README.md index bc1539b..30d36fa 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,21 @@ All L0 types have corresponding Pydantic models: `StateModel`, `RetryModel`, `Ti - **Pure asyncio** - No compatibility layers, native Python async - **Own retry logic** - No tenacity, full control over behavior +## Performance + +Benchmarks on Apple M1 Max, Python 3.13, zero-delay mock streams (2000 tokens): + +| Scenario | Tokens/s | Avg Duration | TTFT | +| ------------------------ | ----------- | ------------ | ----------- | +| Baseline (raw streaming) | 1,406,390 | 1.42 ms | 0.02 ms | +| L0 Core (no features) | 596,086 | 3.36 ms | 0.10 ms | +| L0 + JSON Guardrail | 557,550 | 3.59 ms | 0.09 ms | +| L0 + All Guardrails | 547,991 | 3.65 ms | 0.09 ms | +| L0 + Drift Detection | 114,935 | 17.41 ms | 0.10 ms | +| **L0 Full Stack** | **114,895** | **17.43 ms** | **0.10 ms** | + +Full stack = JSON + Markdown + zero-output guardrails + drift detection + checkpointing. See [BENCHMARKS.md](./BENCHMARKS.md) for details. + ## Documentation | Guide | Description | diff --git a/src/l0/drift.py b/src/l0/drift.py index 6a8bb64..fcb06e4 100644 --- a/src/l0/drift.py +++ b/src/l0/drift.py @@ -14,6 +14,7 @@ import math import re +from collections import deque from dataclasses import dataclass, field from typing import Any, Literal @@ -133,9 +134,9 @@ class DriftConfig: class _DriftHistory: """Internal history tracking for drift detection.""" - entropy: list[float] = field(default_factory=list) - tokens: list[str] = field(default_factory=list) - last_content: str = "" + entropy: deque[float] = field(default_factory=lambda: deque(maxlen=50)) + tokens: deque[str] = field(default_factory=lambda: deque(maxlen=50)) + last_window: str = "" # Store only the window, not full content class DriftDetector: @@ -163,7 +164,10 @@ def __init__(self, config: DriftConfig | None = None) -> None: config: Detection configuration (uses defaults if not provided) """ self.config = config or DriftConfig() - self._history = _DriftHistory() + self._history = _DriftHistory( + entropy=deque(maxlen=self.config.entropy_window), + tokens=deque(maxlen=self.config.entropy_window), + ) def _get_window(self, content: str) -> str: """Get sliding window of content for analysis. @@ -191,13 +195,11 @@ def check(self, content: str, delta: str | None = None) -> DriftResult: # Use sliding window for content analysis (O(window_size) instead of O(content_length)) window = self._get_window(content) - last_window = self._get_window(self._history.last_content) + last_window = self._history.last_window - # Update history + # Update history (deque handles maxlen automatically) if delta: self._history.tokens.append(delta) - if len(self._history.tokens) > self.config.entropy_window: - self._history.tokens.pop(0) # Check for meta commentary (on window only) if self.config.detect_meta_commentary: @@ -224,8 +226,6 @@ def check(self, content: str, delta: str | None = None) -> DriftResult: if self.config.detect_entropy_spike and delta: entropy = self._calculate_entropy(delta) self._history.entropy.append(entropy) - if len(self._history.entropy) > self.config.entropy_window: - self._history.entropy.pop(0) if self._detect_entropy_spike(): types.append("entropy_spike") @@ -250,8 +250,8 @@ def check(self, content: str, delta: str | None = None) -> DriftResult: confidence = max(confidence, 0.5) details.append("Excessive hedging detected") - # Update last content - self._history.last_content = content + # Update last window (store only the window, not full content) + self._history.last_window = window return DriftResult( detected=len(types) > 0, @@ -396,14 +396,17 @@ def _detect_excessive_hedging(self, content: str) -> bool: def reset(self) -> None: """Reset detector state.""" - self._history = _DriftHistory() + self._history = _DriftHistory( + entropy=deque(maxlen=self.config.entropy_window), + tokens=deque(maxlen=self.config.entropy_window), + ) def get_history(self) -> dict[str, Any]: """Get detection history.""" return { - "entropy": self._history.entropy.copy(), - "tokens": self._history.tokens.copy(), - "last_content": self._history.last_content, + "entropy": list(self._history.entropy), + "tokens": list(self._history.tokens), + "last_content": self._history.last_window, } diff --git a/src/l0/guardrails.py b/src/l0/guardrails.py index b0a2ad1..4f7d1ef 100644 --- a/src/l0/guardrails.py +++ b/src/l0/guardrails.py @@ -1560,19 +1560,23 @@ def json_rule() -> GuardrailRule: # Incremental state for O(delta) streaming checks incremental_state = IncrementalJsonState() last_content_length = 0 + is_json_content: bool | None = None # Cache: None=unknown, True/False=determined def check(state: State) -> list[GuardrailViolation]: - nonlocal incremental_state, last_content_length + nonlocal incremental_state, last_content_length, is_json_content content = state.content if not content.strip(): # Reset state when content is empty (new stream starting) incremental_state = IncrementalJsonState() last_content_length = 0 + is_json_content = None return [] - # Only check if it looks like JSON - if not looks_like_json(content): + # Only check if it looks like JSON (cache after first determination) + if is_json_content is None: + is_json_content = looks_like_json(content) + if not is_json_content: # Reset state when content doesn't look like JSON incremental_state = IncrementalJsonState() last_content_length = 0 @@ -1583,6 +1587,7 @@ def check(state: State) -> list[GuardrailViolation]: if len(content) < last_content_length: incremental_state = IncrementalJsonState() last_content_length = 0 + is_json_content = None violations = [] @@ -1745,6 +1750,10 @@ def markdown_rule() -> GuardrailRule: """ def check(state: State) -> list[GuardrailViolation]: + # During streaming, markdown is always incomplete — skip expensive analysis + if not state.completed: + return [] + content = state.content if not content.strip(): return [] @@ -1752,11 +1761,7 @@ def check(state: State) -> list[GuardrailViolation]: analysis = analyze_markdown_structure(content) violations = [] - # During streaming, only warn about unclosed fences - if not state.completed: - # This is expected during streaming, don't report - pass - else: + if True: # On completion, report issues for issue in analysis.issues: severity: Severity = "warning" @@ -1881,17 +1886,40 @@ def pattern_rule( for cat_patterns in categories.values(): patterns.extend(cat_patterns) + # Pre-compile all patterns into a single combined regex for O(1) pass + combined = re.compile( + "|".join(f"(?:{p})" for p in patterns) if patterns else r"(?!x)x", + re.IGNORECASE | re.MULTILINE, + ) + last_scanned_length = 0 + def check(state: State) -> list[GuardrailViolation]: + nonlocal last_scanned_length + content = state.content + + # Reset tracking if content was replaced (e.g. new stream) + if len(content) < last_scanned_length: + last_scanned_length = 0 + + # On completion, do a full scan to catch anything delta scanning might miss + # (e.g. ^-anchored patterns that span chunk boundaries). + # During streaming, only scan new content + overlap for performance. + if state.completed: + scan_start = 0 + else: + scan_start = max(0, last_scanned_length - 50) # overlap for boundary matches + scan_region = content[scan_start:] + last_scanned_length = len(content) + violations = [] - matches = find_bad_patterns(state.content, patterns) - for pattern, match in matches: + for match in combined.finditer(scan_region): violations.append( GuardrailViolation( rule="pattern", message=f"Matched unwanted pattern: {match.group()}", severity="warning", - position=match.start(), - context={"pattern": pattern, "matched": match.group()}, + position=scan_start + match.start(), + context={"matched": match.group()}, ) ) return violations diff --git a/src/l0/runtime.py b/src/l0/runtime.py index d3e2b03..cf02263 100644 --- a/src/l0/runtime.py +++ b/src/l0/runtime.py @@ -482,6 +482,7 @@ async def run_stream() -> AsyncIterator[Event]: # Checkpoint invalid - start fresh logger.debug("Checkpoint validation failed, starting fresh") state.content = "" + state._content_buffer.clear() state.token_count = 0 pending_checkpoint = None @@ -718,59 +719,69 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: state.token_count, ) - # Fire on_event callback for token events - _fire_callback(cb.on_event, event) - - # Fire on_token callback - _fire_callback(cb.on_token, token_text) + # Fire per-token callbacks (skip function call overhead when None) + if cb.on_event is not None: + _fire_callback(cb.on_event, event) + if cb.on_token is not None: + _fire_callback(cb.on_token, token_text) # Check guardrails periodically if ( state.token_count % guardrail_interval == 0 and guardrails ): - phase_start_time = time.perf_counter() - event_bus.emit( - ObservabilityEventType.GUARDRAIL_PHASE_START, - phase="post", - ruleCount=len(guardrails), - ) - all_violations = [] - for idx, rule in enumerate(guardrails): - callback_id = _next_callback_id() - rule_start_time = time.perf_counter() + _has_obs = event_bus._handler is not None + + if _has_obs: + phase_start_time = time.perf_counter() event_bus.emit( - ObservabilityEventType.GUARDRAIL_RULE_START, - index=idx, - ruleId=rule.name, - callbackId=callback_id, + ObservabilityEventType.GUARDRAIL_PHASE_START, + phase="post", + ruleCount=len(guardrails), ) + + all_violations = [] + for idx, rule in enumerate(guardrails): + if _has_obs: + callback_id = _next_callback_id() + rule_start_time = time.perf_counter() + event_bus.emit( + ObservabilityEventType.GUARDRAIL_RULE_START, + index=idx, + ruleId=rule.name, + callbackId=callback_id, + ) + rule_violations = rule.check(state) - passed = len(rule_violations) == 0 - rule_duration_ms = int( - (time.perf_counter() - rule_start_time) * 1000 - ) - # Emit result for each rule - event_bus.emit( - ObservabilityEventType.GUARDRAIL_RULE_RESULT, - index=idx, - ruleId=rule.name, - passed=passed, - violation=rule_violations[0].__dict__ - if rule_violations - else None, - ) + + if _has_obs: + passed = len(rule_violations) == 0 + rule_duration_ms = int( + (time.perf_counter() - rule_start_time) * 1000 + ) + event_bus.emit( + ObservabilityEventType.GUARDRAIL_RULE_RESULT, + index=idx, + ruleId=rule.name, + passed=passed, + violation=rule_violations[0].__dict__ + if rule_violations + else None, + ) + if rule_violations: all_violations.extend(rule_violations) - event_bus.emit( - ObservabilityEventType.GUARDRAIL_RULE_END, - index=idx, - ruleId=rule.name, - passed=passed, - callbackId=callback_id, - durationMs=rule_duration_ms, - ) + + if _has_obs: + event_bus.emit( + ObservabilityEventType.GUARDRAIL_RULE_END, + index=idx, + ruleId=rule.name, + passed=passed, + callbackId=callback_id, + durationMs=rule_duration_ms, + ) if all_violations: state.violations.extend(all_violations) @@ -778,22 +789,24 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: for v in all_violations: _fire_callback(cb.on_violation, v) - phase_duration_ms = int( - (time.perf_counter() - phase_start_time) * 1000 - ) - event_bus.emit( - ObservabilityEventType.GUARDRAIL_PHASE_END, - phase="post", - passed=len(all_violations) == 0, - violations=[v.__dict__ for v in all_violations], - durationMs=phase_duration_ms, - ) + if _has_obs: + phase_duration_ms = int( + (time.perf_counter() - phase_start_time) * 1000 + ) + event_bus.emit( + ObservabilityEventType.GUARDRAIL_PHASE_END, + phase="post", + passed=len(all_violations) == 0, + violations=[v.__dict__ for v in all_violations], + durationMs=phase_duration_ms, + ) # Check drift periodically if ( drift_detector is not None and state.token_count % drift_interval == 0 ): + drift_result = drift_detector.check( state.content, token_text ) @@ -1189,6 +1202,7 @@ async def emit_buffered_tool_calls() -> AsyncIterator[Event]: else: # Reset state for fresh retry (no continuation) state.content = "" + state._content_buffer.clear() state.token_count = 0 state.checkpoint = "" state.completed = False diff --git a/src/l0/state.py b/src/l0/state.py index eb9562c..dc5c34e 100644 --- a/src/l0/state.py +++ b/src/l0/state.py @@ -18,12 +18,17 @@ def update_checkpoint(state: State) -> None: def append_token(state: State, token: str) -> None: - """Append token to content and update timing.""" + """Append token to content buffer and update timing. + + Tokens are buffered in _content_buffer and joined lazily via the + _ContentDescriptor when state.content is read. This gives O(n) total + cost instead of O(n^2) from repeated string concatenation. + """ now = time.time() if state.first_token_at is None: state.first_token_at = now state.last_token_at = now - state.content += token + state._content_buffer.append(token) state.token_count += 1 diff --git a/src/l0/types.py b/src/l0/types.py index 7475037..94bad46 100644 --- a/src/l0/types.py +++ b/src/l0/types.py @@ -260,11 +260,34 @@ class BackoffStrategy(str, Enum): # ───────────────────────────────────────────────────────────────────────────── +class _ContentDescriptor: + """Descriptor that auto-flushes _content_buffer on read for O(n) total concat.""" + + def __set_name__(self, owner: type, name: str) -> None: + self._attr = "_content_value" + + def __get__(self, obj: Any, objtype: type | None = None) -> Any: + if obj is None: + return self + buf = obj.__dict__.get("_content_buffer") + if buf: + obj.__dict__[self._attr] = obj.__dict__.get(self._attr, "") + "".join(buf) + buf.clear() + return obj.__dict__.get(self._attr, "") + + def __set__(self, obj: Any, value: str) -> None: + obj.__dict__[self._attr] = value + buf = obj.__dict__.get("_content_buffer") + if buf: + buf.clear() + + @dataclass class State: """Runtime state tracking.""" - content: str = "" + content: str = "" # type: ignore[assignment] # managed by _ContentDescriptor + _content_buffer: list[str] = field(default_factory=list, init=False, repr=False, compare=False) checkpoint: str = "" # Last known good slice for continuation token_count: int = 0 model_retry_count: int = 0 @@ -290,6 +313,11 @@ class State: overlap_removed: str | None = None # The overlapping text that was removed +# Install the descriptor after dataclass creation so it intercepts attribute access +State.content = _ContentDescriptor() # type: ignore[assignment] +State.content.__set_name__(State, "content") + + # ───────────────────────────────────────────────────────────────────────────── # Retry + Timeout # Retry delays are in seconds (Pythonic). Timeout uses milliseconds (TS parity).