Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions BENCHMARKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ Performance benchmarks measuring L0 (Python) overhead on high-throughput streami

| Scenario | Tokens/s | Avg Duration | TTFT | Overhead |
| ------------------------ | --------- | ------------ | ------- | -------- |
| Baseline (raw streaming) | 1,518,271 | 1.32 ms | 0.02 ms | - |
| L0 Core (no features) | 551,696 | 3.63 ms | 0.08 ms | 175% |
| L0 + JSON Guardrail | 469,922 | 4.26 ms | 0.07 ms | 223% |
| L0 + All Guardrails | 367,328 | 5.44 ms | 0.08 ms | 313% |
| L0 + Drift Detection | 119,758 | 16.70 ms | 0.08 ms | 1166% |
| L0 Full Stack | 108,257 | 18.48 ms | 0.07 ms | 1301% |
| Baseline (raw streaming) | 1,406,390 | 1.42 ms | 0.02 ms | - |
| L0 Core (no features) | 596,086 | 3.36 ms | 0.10 ms | 136% |
| L0 + JSON Guardrail | 557,550 | 3.59 ms | 0.09 ms | 152% |
| L0 + All Guardrails | 547,991 | 3.65 ms | 0.09 ms | 157% |
| L0 + Drift Detection | 114,935 | 17.41 ms | 0.10 ms | 1124% |
| L0 Full Stack | 114,895 | 17.43 ms | 0.10 ms | 1126% |

**Legend:**
- **Tokens/s** = Throughput (higher is better)
Expand Down Expand Up @@ -60,12 +60,12 @@ result = await l0.run(

## Nvidia Blackwell Ready

Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **108K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.
Even with full guardrails, drift detection, and checkpointing enabled, L0 sustains **114K+ tokens/s** - well above current LLM inference speeds and ready for Nvidia Blackwell's 1000+ tokens/s streaming.

| GPU Generation | Expected Tokens/s | L0 Headroom |
| ---------------- | ----------------- | ----------- |
| Current (H100) | ~100-200 | 540-1080x |
| Blackwell (B200) | ~1000+ | 108x |
| Current (H100) | ~100-200 | 574-1149x |
| Blackwell (B200) | ~1000+ | 114x |

## Python Version Note

Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,21 @@ All L0 types have corresponding Pydantic models: `StateModel`, `RetryModel`, `Ti
- **Pure asyncio** - No compatibility layers, native Python async
- **Own retry logic** - No tenacity, full control over behavior

## Performance

Benchmarks on Apple M1 Max, Python 3.13, zero-delay mock streams (2000 tokens):

| Scenario | Tokens/s | Avg Duration | TTFT |
| ------------------------ | ----------- | ------------ | ----------- |
| Baseline (raw streaming) | 1,406,390 | 1.42 ms | 0.02 ms |
| L0 Core (no features) | 596,086 | 3.36 ms | 0.10 ms |
| L0 + JSON Guardrail | 557,550 | 3.59 ms | 0.09 ms |
| L0 + All Guardrails | 547,991 | 3.65 ms | 0.09 ms |
| L0 + Drift Detection | 114,935 | 17.41 ms | 0.10 ms |
| **L0 Full Stack** | **114,895** | **17.43 ms** | **0.10 ms** |

Full stack = JSON + Markdown + zero-output guardrails + drift detection + checkpointing. See [BENCHMARKS.md](./BENCHMARKS.md) for details.

## Documentation

| Guide | Description |
Expand Down
35 changes: 19 additions & 16 deletions src/l0/drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import math
import re
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Literal

Expand Down Expand Up @@ -133,9 +134,9 @@ class DriftConfig:
class _DriftHistory:
"""Internal history tracking for drift detection."""

entropy: list[float] = field(default_factory=list)
tokens: list[str] = field(default_factory=list)
last_content: str = ""
entropy: deque[float] = field(default_factory=lambda: deque(maxlen=50))
tokens: deque[str] = field(default_factory=lambda: deque(maxlen=50))
last_window: str = "" # Store only the window, not full content


class DriftDetector:
Expand Down Expand Up @@ -163,7 +164,10 @@ def __init__(self, config: DriftConfig | None = None) -> None:
config: Detection configuration (uses defaults if not provided)
"""
self.config = config or DriftConfig()
self._history = _DriftHistory()
self._history = _DriftHistory(
entropy=deque(maxlen=self.config.entropy_window),
tokens=deque(maxlen=self.config.entropy_window),
)

def _get_window(self, content: str) -> str:
"""Get sliding window of content for analysis.
Expand Down Expand Up @@ -191,13 +195,11 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:

# Use sliding window for content analysis (O(window_size) instead of O(content_length))
window = self._get_window(content)
last_window = self._get_window(self._history.last_content)
last_window = self._history.last_window

# Update history
# Update history (deque handles maxlen automatically)
if delta:
self._history.tokens.append(delta)
if len(self._history.tokens) > self.config.entropy_window:
self._history.tokens.pop(0)

# Check for meta commentary (on window only)
if self.config.detect_meta_commentary:
Expand All @@ -224,8 +226,6 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
if self.config.detect_entropy_spike and delta:
entropy = self._calculate_entropy(delta)
self._history.entropy.append(entropy)
if len(self._history.entropy) > self.config.entropy_window:
self._history.entropy.pop(0)

if self._detect_entropy_spike():
types.append("entropy_spike")
Expand All @@ -250,8 +250,8 @@ def check(self, content: str, delta: str | None = None) -> DriftResult:
confidence = max(confidence, 0.5)
details.append("Excessive hedging detected")

# Update last content
self._history.last_content = content
# Update last window (store only the window, not full content)
self._history.last_window = window

return DriftResult(
detected=len(types) > 0,
Expand Down Expand Up @@ -396,14 +396,17 @@ def _detect_excessive_hedging(self, content: str) -> bool:

def reset(self) -> None:
"""Reset detector state."""
self._history = _DriftHistory()
self._history = _DriftHistory(
entropy=deque(maxlen=self.config.entropy_window),
tokens=deque(maxlen=self.config.entropy_window),
)

def get_history(self) -> dict[str, Any]:
"""Get detection history."""
return {
"entropy": self._history.entropy.copy(),
"tokens": self._history.tokens.copy(),
"last_content": self._history.last_content,
"entropy": list(self._history.entropy),
"tokens": list(self._history.tokens),
"last_content": self._history.last_window,
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: get_history()["last_content"] now returns only the sliding window, which silently truncates long outputs behind the existing API name.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/l0/drift.py, line 409:

<comment>`get_history()["last_content"]` now returns only the sliding window, which silently truncates long outputs behind the existing API name.</comment>

<file context>
@@ -396,14 +396,17 @@ def _detect_excessive_hedging(self, content: str) -> bool:
-            "last_content": self._history.last_content,
+            "entropy": list(self._history.entropy),
+            "tokens": list(self._history.tokens),
+            "last_content": self._history.last_window,
         }
 
</file context>
Fix with Cubic

}


Expand Down
52 changes: 40 additions & 12 deletions src/l0/guardrails.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,19 +1560,23 @@ def json_rule() -> GuardrailRule:
# Incremental state for O(delta) streaming checks
incremental_state = IncrementalJsonState()
last_content_length = 0
is_json_content: bool | None = None # Cache: None=unknown, True/False=determined
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.

def check(state: State) -> list[GuardrailViolation]:
nonlocal incremental_state, last_content_length
nonlocal incremental_state, last_content_length, is_json_content

content = state.content
if not content.strip():
# Reset state when content is empty (new stream starting)
incremental_state = IncrementalJsonState()
last_content_length = 0
is_json_content = None
return []

# Only check if it looks like JSON
if not looks_like_json(content):
# Only check if it looks like JSON (cache after first determination)
if is_json_content is None:
is_json_content = looks_like_json(content)
if not is_json_content:
# Reset state when content doesn't look like JSON
incremental_state = IncrementalJsonState()
last_content_length = 0
Expand All @@ -1583,6 +1587,7 @@ def check(state: State) -> list[GuardrailViolation]:
if len(content) < last_content_length:
incremental_state = IncrementalJsonState()
last_content_length = 0
is_json_content = None

violations = []

Expand Down Expand Up @@ -1745,18 +1750,18 @@ def markdown_rule() -> GuardrailRule:
"""

def check(state: State) -> list[GuardrailViolation]:
# During streaming, markdown is always incomplete — skip expensive analysis
if not state.completed:
return []

content = state.content
if not content.strip():
return []

analysis = analyze_markdown_structure(content)
violations = []

# During streaming, only warn about unclosed fences
if not state.completed:
# This is expected during streaming, don't report
pass
else:
if True:
# On completion, report issues
for issue in analysis.issues:
severity: Severity = "warning"
Expand Down Expand Up @@ -1881,17 +1886,40 @@ def pattern_rule(
for cat_patterns in categories.values():
patterns.extend(cat_patterns)

# Pre-compile all patterns into one combined regex so each check scans the text in a single pass
combined = re.compile(
"|".join(f"(?:{p})" for p in patterns) if patterns else r"(?!x)x",
re.IGNORECASE | re.MULTILINE,
)
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
last_scanned_length = 0

def check(state: State) -> list[GuardrailViolation]:
nonlocal last_scanned_length
content = state.content

# Reset tracking if content was replaced (e.g. new stream)
if len(content) < last_scanned_length:
last_scanned_length = 0

# On completion, do a full scan to catch anything delta scanning might miss
# (e.g. ^-anchored patterns that span chunk boundaries).
# During streaming, only scan new content + overlap for performance.
if state.completed:
scan_start = 0
else:
scan_start = max(0, last_scanned_length - 50) # overlap for boundary matches
scan_region = content[scan_start:]
last_scanned_length = len(content)

violations = []
matches = find_bad_patterns(state.content, patterns)
for pattern, match in matches:
for match in combined.finditer(scan_region):
violations.append(
GuardrailViolation(
rule="pattern",
message=f"Matched unwanted pattern: {match.group()}",
severity="warning",
position=match.start(),
context={"pattern": pattern, "matched": match.group()},
position=scan_start + match.start(),
context={"matched": match.group()},
)
)
return violations
Expand Down
Loading