diff --git a/src/ralphify/_console_emitter.py b/src/ralphify/_console_emitter.py index 4e7b8fb6..c73280e7 100644 --- a/src/ralphify/_console_emitter.py +++ b/src/ralphify/_console_emitter.py @@ -51,6 +51,12 @@ _LIVE_REFRESH_RATE = 4 # Hz — how often the spinner redraws +# Scroll-line buffer limits for the Live renderable. Lines beyond +# _MAX_SCROLL_LINES are dropped from memory; only the most recent +# _MAX_VISIBLE_SCROLL are rendered inside the Live region. +_MAX_SCROLL_LINES = 50 +_MAX_VISIBLE_SCROLL = 15 + # The key that toggles live peek of agent output. Used here for status # messages and imported by cli.py for the keypress handler. PEEK_TOGGLE_KEY = "p" @@ -184,12 +190,35 @@ def __init__(self) -> None: self._input_tokens: int = 0 self._output_tokens: int = 0 self._cache_read_tokens: int = 0 + # Scroll lines rendered inside the Live region (transient). + self._scroll_lines: list[Text] = [] + self._peek_message: Text | None = None + + # ── Scroll buffer management ───────────────────────────────────── + + def add_scroll_line(self, markup: str) -> None: + """Append a Rich-markup scroll line to the transient buffer.""" + self._scroll_lines.append(Text.from_markup(markup)) + if len(self._scroll_lines) > _MAX_SCROLL_LINES: + self._scroll_lines.pop(0) + self._peek_message = None # content is flowing — clear status + + def clear_scroll(self) -> None: + """Drop all buffered scroll lines.""" + self._scroll_lines.clear() + + def set_peek_message(self, markup: str) -> None: + """Set a transient status message shown inside the Live region.""" + self._peek_message = Text.from_markup(markup) + + # ── Stream-json processing ─────────────────────────────────────── def apply(self, raw: dict[str, Any]) -> str | None: """Update panel state from a parsed stream-json dict. - Returns a string to print above the Live region (scroll log), or - ``None`` when no scroll output is needed. + Returns the scroll-line markup string (or ``None``). The line is + also appended to the internal scroll buffer so it renders inside + the Live region. """ event_type = raw.get("type") @@ -197,20 +226,22 @@ def apply(self, raw: dict[str, Any]) -> str | None: self._model = raw.get("model", "") return None - if event_type == "assistant": - return self._apply_assistant(raw) - - if event_type == "user": - return self._apply_user(raw) + scroll_line: str | None = None - if event_type == "rate_limit_event": + if event_type == "assistant": + scroll_line = self._apply_assistant(raw) + elif event_type == "user": + scroll_line = self._apply_user(raw) + elif event_type == "rate_limit_event": info = raw.get("rate_limit_info", {}) status = info.get("status", "") resets = info.get("resetsAt", "") - return f"[dim]⏱ rate limit: {escape_markup(str(status))}, resets {escape_markup(str(resets))}[/]" + scroll_line = f"[dim]⏱ rate limit: {escape_markup(str(status))}, resets {escape_markup(str(resets))}[/]" - # Unknown type — silent drop - return None + if scroll_line is not None: + self.add_scroll_line(scroll_line) + + return scroll_line def _apply_assistant(self, raw: dict[str, Any]) -> str | None: msg = raw.get("message", {}) @@ -304,6 +335,17 @@ def _format_categories(self) -> str: def __rich_console__( self, console: Console, options: ConsoleOptions ) -> RenderResult: + # Transient scroll lines (tool calls, text previews, etc.) + visible = self._scroll_lines[-_MAX_VISIBLE_SCROLL:] + for line in visible: + yield line + yield Text("\n") + + # Peek status message (shown when no scroll lines present) + if not self._scroll_lines and self._peek_message: + yield self._peek_message + yield Text("\n") + elapsed = time.monotonic() - self._start # Line 1: spinner + elapsed + tokens tokens = self._format_tokens() @@ -357,6 +399,7 @@ def __init__(self, console: Console) -> None: self._structured_agent: bool = False self._peek_broken: bool = False self._iteration_panel: _IterationPanel | None = None + self._iteration_spinner: _IterationSpinner | None = None # Single lock that serialises every ``_console.print`` call and # protects ``_peek_enabled`` mutations so that reader-thread / # keypress-thread writes cannot interleave with main-thread event @@ -388,8 +431,9 @@ def toggle_peek(self) -> bool: """Flip live-output rendering on or off. Safe to call from a non-main thread (e.g. the keypress listener). - Returns the new peek state. A short status banner is printed so - the user gets visible feedback that the toggle took effect. + Returns the new peek state. When a Live display is active the + status message is shown inside it (transient); otherwise it is + printed as a normal console line. """ with self._console_lock: self._peek_enabled = not self._peek_enabled @@ -402,7 +446,15 @@ def toggle_peek(self) -> bool: ) else: msg = _PEEK_OFF_MSG - self._console.print(msg) + + renderable = self._iteration_panel or self._iteration_spinner + if renderable is not None and self._live is not None: + if not enabled: + renderable.clear_scroll() + renderable.set_peek_message(msg) + self._live.update(renderable) + else: + self._console.print(msg) return enabled def _on_agent_output_line(self, data: AgentOutputLineData) -> None: @@ -413,7 +465,11 @@ def _on_agent_output_line(self, data: AgentOutputLineData) -> None: if self._structured_agent: return line = escape_markup(data["line"]) - self._console.print(f"[dim]{line}[/]") + spinner = self._iteration_spinner + if spinner is not None: + spinner.add_scroll_line(f"[dim]{line}[/]") + if self._live is not None: + self._live.update(spinner) def _on_agent_activity(self, data: AgentActivityData) -> None: """Handle structured agent activity events (Claude stream-json). @@ -433,10 +489,9 @@ def _on_agent_activity(self, data: AgentActivityData) -> None: panel = self._iteration_panel if panel is None: return - scroll_line = panel.apply(data["raw"]) - if scroll_line is not None: - self._console.print(scroll_line) + panel.apply(data["raw"]) # Update the Live renderable so it reflects new counters + # (scroll lines are now stored inside the panel). if self._live is not None: self._live.update(panel) except Exception: @@ -476,10 +531,13 @@ def _start_live_unlocked(self) -> None: if self._structured_agent: panel = _IterationPanel() self._iteration_panel = panel + self._iteration_spinner = None renderable = panel else: - renderable = _IterationSpinner() + spinner = _IterationSpinner() self._iteration_panel = None + self._iteration_spinner = spinner + renderable = spinner self._live = Live( renderable, console=self._console, @@ -494,6 +552,7 @@ def _stop_live_unlocked(self) -> None: self._live.stop() self._live = None self._iteration_panel = None + self._iteration_spinner = None def _stop_live(self) -> None: with self._console_lock: @@ -585,10 +644,34 @@ class _IterationSpinner: def __init__(self) -> None: self._spinner = Spinner("dots") self._start = time.monotonic() + self._scroll_lines: list[Text] = [] + self._peek_message: Text | None = None + + def add_scroll_line(self, markup: str) -> None: + """Append a Rich-markup scroll line to the transient buffer.""" + self._scroll_lines.append(Text.from_markup(markup)) + if len(self._scroll_lines) > _MAX_SCROLL_LINES: + self._scroll_lines.pop(0) + self._peek_message = None + + def clear_scroll(self) -> None: + """Drop all buffered scroll lines.""" + self._scroll_lines.clear() + + def set_peek_message(self, markup: str) -> None: + """Set a transient status message shown inside the Live region.""" + self._peek_message = Text.from_markup(markup) def __rich_console__( self, console: Console, options: ConsoleOptions ) -> RenderResult: + visible = self._scroll_lines[-_MAX_VISIBLE_SCROLL:] + for line in visible: + yield line + yield Text("\n") + if not self._scroll_lines and self._peek_message: + yield self._peek_message + yield Text("\n") elapsed = time.monotonic() - self._start text = Text(f" {format_duration(elapsed)}", style="dim") yield self._spinner diff --git a/tests/test_console_emitter.py b/tests/test_console_emitter.py index 5245af85..39d69df5 100644 --- a/tests/test_console_emitter.py +++ b/tests/test_console_emitter.py @@ -51,6 +51,8 @@ def test_peek_disabled_by_default_drops_output_lines(self): def test_toggle_peek_enables_rendering(self): emitter, console = _capture_emitter() assert emitter.toggle_peek() is True + # Start an iteration so a spinner with a scroll buffer exists. + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( _make_event( EventType.AGENT_OUTPUT_LINE, @@ -59,12 +61,17 @@ def test_toggle_peek_enables_rendering(self): iteration=1, ) ) - assert "visible line" in console.export_text() + spinner = emitter._iteration_spinner + assert spinner is not None + assert any("visible line" in line.plain for line in spinner._scroll_lines) + emitter._stop_live() def test_toggle_peek_twice_disables_rendering(self): emitter, console = _capture_emitter() emitter.toggle_peek() # on assert emitter.toggle_peek() is False # off + # Start an iteration so a spinner exists. + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( _make_event( EventType.AGENT_OUTPUT_LINE, @@ -73,7 +80,10 @@ def test_toggle_peek_twice_disables_rendering(self): iteration=1, ) ) - assert "should not appear" not in console.export_text() + spinner = emitter._iteration_spinner + assert spinner is not None + assert len(spinner._scroll_lines) == 0 + emitter._stop_live() def test_toggle_peek_prints_status_banner(self): emitter, console = _capture_emitter() @@ -95,6 +105,8 @@ def test_concurrent_peek_writes_do_not_interleave(self): emitter, console = _capture_emitter() emitter.toggle_peek() # turn peek on (console.is_terminal is False # in record mode, so the default is off; we flip it explicitly). + # Start an iteration so a spinner with a scroll buffer exists. + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) line_a = "A" * 50 line_b = "B" * 50 @@ -115,12 +127,15 @@ def worker(line: str) -> None: for t in threads: t.join() - output = console.export_text() - # Each worker prints ``iterations`` whole copies of its line. If the - # lock is working, both substrings appear exactly that many times; - # any interleaving would split one of them and drop the count. - assert output.count(line_a) == iterations - assert output.count(line_b) == iterations + spinner = emitter._iteration_spinner + assert spinner is not None + # Each worker adds ``iterations`` whole copies of its line to the + # scroll buffer. If the lock is working, both substrings appear + # exactly that many times; any interleaving would split one of them. + all_text = "\n".join(line.plain for line in spinner._scroll_lines) + assert all_text.count(line_a) == iterations + assert all_text.count(line_b) == iterations + emitter._stop_live() def test_toggle_peek_survives_console_print_error(self): """If ``_console.print`` raises inside ``toggle_peek``, the emitter @@ -142,6 +157,8 @@ def test_peek_line_escapes_rich_markup(self): markup — escape so the literal text is preserved.""" emitter, console = _capture_emitter() emitter.toggle_peek() + # Start an iteration so a spinner with a scroll buffer exists. + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( _make_event( EventType.AGENT_OUTPUT_LINE, @@ -150,7 +167,12 @@ def test_peek_line_escapes_rich_markup(self): iteration=1, ) ) - assert "[bold red]not markup[/]" in console.export_text() + spinner = emitter._iteration_spinner + assert spinner is not None + assert any( + "[bold red]not markup[/]" in line.plain for line in spinner._scroll_lines + ) + emitter._stop_live() def test_run_started_shows_ralph_name(self): emitter, console = _capture_emitter() @@ -345,6 +367,60 @@ def test_no_startup_hint_when_peek_off(self): assert "live output on" not in output assert "press p" not in output + def test_toggle_peek_off_in_live_clears_scroll_buffer(self): + """Toggling peek off during an iteration clears the scroll buffer.""" + emitter, console = _capture_emitter() + emitter._peek_enabled = True + emitter._structured_agent = True + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) + # Emit some activity to populate the scroll buffer + emitter.emit( + _make_event( + EventType.AGENT_ACTIVITY, + raw={ + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "name": "Read", + "input": {"file_path": "/tmp/foo.py"}, + } + ] + }, + }, + iteration=1, + ) + ) + panel = emitter._iteration_panel + assert panel is not None + assert len(panel._scroll_lines) > 0 + # Toggle peek off — should clear the buffer + emitter.toggle_peek() + assert len(panel._scroll_lines) == 0 + emitter._stop_live() + + def test_toggle_peek_in_live_sets_peek_message(self): + """Toggling peek with an active Live sets a message on the renderable.""" + emitter, console = _capture_emitter() + emitter._peek_enabled = True + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) + # Toggle peek off — should set peek message on the spinner + emitter.toggle_peek() + spinner = emitter._iteration_spinner + assert spinner is not None + assert spinner._peek_message is not None + assert "peek off" in spinner._peek_message.plain + emitter._stop_live() + + def test_toggle_peek_without_live_prints_to_console(self): + """Toggling peek without an active Live prints to the console.""" + emitter, console = _capture_emitter() + # No iteration started — no Live display + emitter.toggle_peek() + output = console.export_text() + assert "live output on" in output or "peek off" in output + class TestStructuredPeek: """Tests for the structured activity rendering (Claude agents).""" @@ -370,10 +446,12 @@ def test_agent_output_line_early_returns_for_claude(self): assert console.export_text().strip() == "" def test_non_claude_agent_keeps_raw_line_rendering(self): - """Non-claude agents still get dim raw-line rendering.""" + """Non-claude agents still get dim raw-line rendering inside Live.""" emitter, console = _capture_emitter() emitter._peek_enabled = True emitter._structured_agent = False + # Start an iteration so there's a spinner with a scroll buffer + emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( _make_event( EventType.AGENT_OUTPUT_LINE, @@ -382,10 +460,13 @@ def test_non_claude_agent_keeps_raw_line_rendering(self): iteration=1, ) ) - assert "raw agent output" in console.export_text() + spinner = emitter._iteration_spinner + assert spinner is not None + assert any("raw agent output" in line.plain for line in spinner._scroll_lines) + emitter._stop_live() def test_tool_use_scroll_line(self): - """Tool use events produce a scroll line above the Live region.""" + """Tool use events are buffered inside the panel's scroll buffer.""" emitter, console = self._make_structured_emitter() # Start an iteration so there's a panel emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) @@ -407,13 +488,15 @@ def test_tool_use_scroll_line(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert len(panel._scroll_lines) == 1 + assert "Bash" in panel._scroll_lines[0].plain + assert "uv run pytest" in panel._scroll_lines[0].plain emitter._stop_live() - output = console.export_text() - assert "Bash" in output - assert "uv run pytest" in output def test_assistant_text_scroll_line(self): - """Assistant text events produce a scroll line with a preview.""" + """Assistant text events are buffered inside the panel's scroll buffer.""" emitter, console = self._make_structured_emitter() emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( @@ -428,9 +511,10 @@ def test_assistant_text_scroll_line(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert any("fix the bug" in line.plain for line in panel._scroll_lines) emitter._stop_live() - output = console.export_text() - assert "fix the bug" in output def test_thinking_does_not_scroll(self): """Thinking events update the panel status but don't produce scroll output.""" @@ -448,10 +532,10 @@ def test_thinking_does_not_scroll(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert len(panel._scroll_lines) == 0 emitter._stop_live() - output = console.export_text() - # The thinking content should NOT appear in scroll output - assert "let me think" not in output def test_rate_limit_scroll_line(self): emitter, console = self._make_structured_emitter() @@ -469,10 +553,11 @@ def test_rate_limit_scroll_line(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert any("rate limit" in line.plain for line in panel._scroll_lines) + assert any("rate_limited" in line.plain for line in panel._scroll_lines) emitter._stop_live() - output = console.export_text() - assert "rate limit" in output - assert "rate_limited" in output def test_unknown_type_silently_dropped(self): """Unknown event types are silently dropped, not errors.""" @@ -553,12 +638,13 @@ def test_peek_off_skips_activity(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert len(panel._scroll_lines) == 0 emitter._stop_live() - output = console.export_text() - assert "Bash" not in output def test_tool_error_scroll_line(self): - """Tool result errors produce a scroll line.""" + """Tool result errors are buffered inside the panel's scroll buffer.""" emitter, console = self._make_structured_emitter() emitter.emit(_make_event(EventType.ITERATION_STARTED, iteration=1)) emitter.emit( @@ -580,10 +666,11 @@ def test_tool_error_scroll_line(self): iteration=1, ) ) + panel = emitter._iteration_panel + assert panel is not None + assert any("tool error" in line.plain for line in panel._scroll_lines) + assert any("File not found" in line.plain for line in panel._scroll_lines) emitter._stop_live() - output = console.export_text() - assert "tool error" in output - assert "File not found" in output class TestIterationLifecycle: @@ -1258,6 +1345,90 @@ def test_format_count_boundary_k_to_m(self): assert _IterationPanel._format_count(999_950) == "1.0M" assert _IterationPanel._format_count(999_999) == "1.0M" + def test_apply_stores_scroll_lines_in_buffer(self): + """apply() stores scroll lines in the internal buffer.""" + panel = _IterationPanel() + panel.apply( + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "name": "Bash", + "input": {"command": "ls"}, + } + ] + }, + } + ) + assert len(panel._scroll_lines) == 1 + assert "Bash" in panel._scroll_lines[0].plain + + def test_scroll_lines_rendered_in_panel(self): + """Scroll lines appear in the panel's rendered output.""" + panel = _IterationPanel() + panel.add_scroll_line("[dim]🔧 Read /tmp/foo.py[/]") + console = Console(record=True, width=80) + console.print(panel) + output = console.export_text() + assert "Read" in output + assert "/tmp/foo.py" in output + + def test_clear_scroll_empties_buffer(self): + panel = _IterationPanel() + panel.add_scroll_line("[dim]line1[/]") + panel.add_scroll_line("[dim]line2[/]") + assert len(panel._scroll_lines) == 2 + panel.clear_scroll() + assert len(panel._scroll_lines) == 0 + + def test_peek_message_shown_when_no_scroll_lines(self): + """Peek message is rendered when scroll buffer is empty.""" + panel = _IterationPanel() + panel.set_peek_message("[dim]peek off[/]") + console = Console(record=True, width=80) + console.print(panel) + output = console.export_text() + assert "peek off" in output + + def test_peek_message_hidden_when_scroll_lines_present(self): + """Peek message is NOT rendered when scroll lines are present.""" + panel = _IterationPanel() + panel.set_peek_message("[dim]peek off[/]") + panel.add_scroll_line("[dim]tool output[/]") + console = Console(record=True, width=80) + console.print(panel) + output = console.export_text() + assert "peek off" not in output + assert "tool output" in output + + +class TestIterationSpinnerScrollLines: + def test_scroll_lines_rendered_in_spinner(self): + """Scroll lines appear in the spinner's rendered output.""" + spinner = _IterationSpinner() + spinner.add_scroll_line("[dim]raw output line[/]") + console = Console(record=True, width=80) + console.print(spinner) + output = console.export_text() + assert "raw output line" in output + + def test_clear_scroll_empties_spinner_buffer(self): + spinner = _IterationSpinner() + spinner.add_scroll_line("[dim]line1[/]") + assert len(spinner._scroll_lines) == 1 + spinner.clear_scroll() + assert len(spinner._scroll_lines) == 0 + + def test_peek_message_shown_in_spinner(self): + spinner = _IterationSpinner() + spinner.set_peek_message("[dim]live output on[/]") + console = Console(record=True, width=80) + console.print(spinner) + output = console.export_text() + assert "live output on" in output + class TestIsClaudeCommand: def test_claude_binary(self): diff --git a/tests/test_engine.py b/tests/test_engine.py index 8fe641f0..b05f8d64 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -1102,8 +1102,9 @@ def test_credit_false_no_trailer_in_agent_input(self, mock_run, tmp_path): class TestEchoCoordination: @patch(MOCK_SUBPROCESS) def test_no_double_print_with_log_dir_and_peek(self, mock_run, tmp_path): - """When --log-dir is set and peek is on, each agent output line - appears exactly once in the console — no double-print from echo.""" + """When --log-dir is set and peek is on, agent output lines are + rendered inside the transient Live display — they must NOT also + be echoed as permanent output at iteration end.""" mock_run.return_value = ok_proc( stdout_text="alpha\nbeta\ngamma\n", ) @@ -1117,10 +1118,13 @@ def test_no_double_print_with_log_dir_and_peek(self, mock_run, tmp_path): run_loop(config, state, emitter) + # Lines were shown inside the transient Live display (not permanent + # console output). The important invariant: they must not ALSO be + # echoed at iteration end (which would be double-printing). output = console.export_text() - assert output.count("alpha") == 1 - assert output.count("beta") == 1 - assert output.count("gamma") == 1 + assert output.count("alpha") == 0 + assert output.count("beta") == 0 + assert output.count("gamma") == 0 @patch(MOCK_SUBPROCESS) def test_echo_shown_when_peek_off_and_log_dir_set(self, mock_run, tmp_path): @@ -1166,8 +1170,9 @@ def test_agent_output_line_emitted_when_peek_toggled_mid_iteration( self, mock_run, tmp_path ): """Start with peek off, toggle on before agent runs — subsequent - lines appear as AGENT_OUTPUT_LINE events. Requires log_dir so the - callback path is taken (without log_dir the inherit path gives + lines appear as AGENT_OUTPUT_LINE events rendered inside the + transient Live display. Requires log_dir so the callback path + is taken (without log_dir the inherit path gives on_output_line=None and no mid-iteration toggle is possible).""" console = Console(record=True, width=120) emitter = ConsoleEmitter(console) @@ -1189,6 +1194,11 @@ def popen_with_toggle(*args, **kwargs): run_loop(config, state, emitter) + # Lines are rendered inside the transient Live display (not + # permanent console output) so they don't appear in export_text. + # The important invariant: they must not also be echoed at + # iteration end (which would be double-printing). output = console.export_text() - # With peek toggled on, agent output lines should appear - assert "first" in output or "second" in output or "third" in output + assert output.count("first") == 0 + assert output.count("second") == 0 + assert output.count("third") == 0 diff --git a/uv.lock b/uv.lock index 60093e66..98cc38d3 100644 --- a/uv.lock +++ b/uv.lock @@ -682,7 +682,7 @@ wheels = [ [[package]] name = "ralphify" -version = "0.3.0" +version = "0.4.0b1" source = { editable = "." } dependencies = [ { name = "pyyaml" },