Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 102 additions & 19 deletions src/ralphify/_console_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@

_LIVE_REFRESH_RATE = 4 # Hz — how often the spinner redraws

# Scroll-line buffer limits for the Live renderable. Lines beyond
# _MAX_SCROLL_LINES are dropped from memory; only the most recent
# _MAX_VISIBLE_SCROLL are rendered inside the Live region.
_MAX_SCROLL_LINES = 50
_MAX_VISIBLE_SCROLL = 15

# The key that toggles live peek of agent output. Used here for status
# messages and imported by cli.py for the keypress handler.
PEEK_TOGGLE_KEY = "p"
Expand Down Expand Up @@ -184,33 +190,58 @@ def __init__(self) -> None:
self._input_tokens: int = 0
self._output_tokens: int = 0
self._cache_read_tokens: int = 0
# Scroll lines rendered inside the Live region (transient).
self._scroll_lines: list[Text] = []
self._peek_message: Text | None = None

# ── Scroll buffer management ─────────────────────────────────────

def add_scroll_line(self, markup: str) -> None:
"""Append a Rich-markup scroll line to the transient buffer."""
self._scroll_lines.append(Text.from_markup(markup))
if len(self._scroll_lines) > _MAX_SCROLL_LINES:
self._scroll_lines.pop(0)
self._peek_message = None # content is flowing — clear status

def clear_scroll(self) -> None:
"""Drop all buffered scroll lines."""
self._scroll_lines.clear()

def set_peek_message(self, markup: str) -> None:
"""Set a transient status message shown inside the Live region."""
self._peek_message = Text.from_markup(markup)

# ── Stream-json processing ───────────────────────────────────────

def apply(self, raw: dict[str, Any]) -> str | None:
"""Update panel state from a parsed stream-json dict.

Returns a string to print above the Live region (scroll log), or
``None`` when no scroll output is needed.
Returns the scroll-line markup string (or ``None``). The line is
also appended to the internal scroll buffer so it renders inside
the Live region.
"""
event_type = raw.get("type")

if event_type == "system" and raw.get("subtype") == "init":
self._model = raw.get("model", "")
return None

if event_type == "assistant":
return self._apply_assistant(raw)

if event_type == "user":
return self._apply_user(raw)
scroll_line: str | None = None

if event_type == "rate_limit_event":
if event_type == "assistant":
scroll_line = self._apply_assistant(raw)
elif event_type == "user":
scroll_line = self._apply_user(raw)
elif event_type == "rate_limit_event":
info = raw.get("rate_limit_info", {})
status = info.get("status", "")
resets = info.get("resetsAt", "")
return f"[dim]⏱ rate limit: {escape_markup(str(status))}, resets {escape_markup(str(resets))}[/]"
scroll_line = f"[dim]⏱ rate limit: {escape_markup(str(status))}, resets {escape_markup(str(resets))}[/]"

# Unknown type — silent drop
return None
if scroll_line is not None:
self.add_scroll_line(scroll_line)

return scroll_line

def _apply_assistant(self, raw: dict[str, Any]) -> str | None:
msg = raw.get("message", {})
Expand Down Expand Up @@ -304,6 +335,17 @@ def _format_categories(self) -> str:
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
# Transient scroll lines (tool calls, text previews, etc.)
visible = self._scroll_lines[-_MAX_VISIBLE_SCROLL:]
for line in visible:
yield line
yield Text("\n")

# Peek status message (shown when no scroll lines present)
if not self._scroll_lines and self._peek_message:
yield self._peek_message
yield Text("\n")

elapsed = time.monotonic() - self._start
# Line 1: spinner + elapsed + tokens
tokens = self._format_tokens()
Expand Down Expand Up @@ -357,6 +399,7 @@ def __init__(self, console: Console) -> None:
self._structured_agent: bool = False
self._peek_broken: bool = False
self._iteration_panel: _IterationPanel | None = None
self._iteration_spinner: _IterationSpinner | None = None
# Single lock that serialises every ``_console.print`` call and
# protects ``_peek_enabled`` mutations so that reader-thread /
# keypress-thread writes cannot interleave with main-thread event
Expand Down Expand Up @@ -388,8 +431,9 @@ def toggle_peek(self) -> bool:
"""Flip live-output rendering on or off.

Safe to call from a non-main thread (e.g. the keypress listener).
Returns the new peek state. A short status banner is printed so
the user gets visible feedback that the toggle took effect.
Returns the new peek state. When a Live display is active the
status message is shown inside it (transient); otherwise it is
printed as a normal console line.
"""
with self._console_lock:
self._peek_enabled = not self._peek_enabled
Expand All @@ -402,7 +446,15 @@ def toggle_peek(self) -> bool:
)
else:
msg = _PEEK_OFF_MSG
self._console.print(msg)

renderable = self._iteration_panel or self._iteration_spinner
if renderable is not None and self._live is not None:
if not enabled:
renderable.clear_scroll()
renderable.set_peek_message(msg)
self._live.update(renderable)
else:
self._console.print(msg)
return enabled

def _on_agent_output_line(self, data: AgentOutputLineData) -> None:
Expand All @@ -413,7 +465,11 @@ def _on_agent_output_line(self, data: AgentOutputLineData) -> None:
if self._structured_agent:
return
line = escape_markup(data["line"])
self._console.print(f"[dim]{line}[/]")
spinner = self._iteration_spinner
if spinner is not None:
spinner.add_scroll_line(f"[dim]{line}[/]")
if self._live is not None:
self._live.update(spinner)

def _on_agent_activity(self, data: AgentActivityData) -> None:
"""Handle structured agent activity events (Claude stream-json).
Expand All @@ -433,10 +489,9 @@ def _on_agent_activity(self, data: AgentActivityData) -> None:
panel = self._iteration_panel
if panel is None:
return
scroll_line = panel.apply(data["raw"])
if scroll_line is not None:
self._console.print(scroll_line)
panel.apply(data["raw"])
# Update the Live renderable so it reflects new counters
# (scroll lines are now stored inside the panel).
if self._live is not None:
self._live.update(panel)
except Exception:
Expand Down Expand Up @@ -476,10 +531,13 @@ def _start_live_unlocked(self) -> None:
if self._structured_agent:
panel = _IterationPanel()
self._iteration_panel = panel
self._iteration_spinner = None
renderable = panel
else:
renderable = _IterationSpinner()
spinner = _IterationSpinner()
self._iteration_panel = None
self._iteration_spinner = spinner
renderable = spinner
self._live = Live(
renderable,
console=self._console,
Expand All @@ -494,6 +552,7 @@ def _stop_live_unlocked(self) -> None:
self._live.stop()
self._live = None
self._iteration_panel = None
self._iteration_spinner = None

def _stop_live(self) -> None:
with self._console_lock:
Expand Down Expand Up @@ -585,10 +644,34 @@ class _IterationSpinner:
def __init__(self) -> None:
self._spinner = Spinner("dots")
self._start = time.monotonic()
self._scroll_lines: list[Text] = []
self._peek_message: Text | None = None

def add_scroll_line(self, markup: str) -> None:
"""Append a Rich-markup scroll line to the transient buffer."""
self._scroll_lines.append(Text.from_markup(markup))
if len(self._scroll_lines) > _MAX_SCROLL_LINES:
self._scroll_lines.pop(0)
self._peek_message = None

def clear_scroll(self) -> None:
"""Drop all buffered scroll lines."""
self._scroll_lines.clear()

def set_peek_message(self, markup: str) -> None:
"""Set a transient status message shown inside the Live region."""
self._peek_message = Text.from_markup(markup)

def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
visible = self._scroll_lines[-_MAX_VISIBLE_SCROLL:]
for line in visible:
yield line
yield Text("\n")
if not self._scroll_lines and self._peek_message:
yield self._peek_message
yield Text("\n")
elapsed = time.monotonic() - self._start
text = Text(f" {format_duration(elapsed)}", style="dim")
yield self._spinner
Expand Down
Loading
Loading