From 7556108d2995558cf7d295ec9b76b1d8782158c9 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 11 May 2026 18:45:36 +0200 Subject: [PATCH] refactor: replace chan Event threading with EventSink interface --- pkg/app/app.go | 8 +- pkg/app/app_test.go | 4 +- pkg/cli/runner_test.go | 20 ++-- pkg/runtime/agent_delegation.go | 22 ++-- pkg/runtime/cache.go | 4 +- pkg/runtime/commands_test.go | 6 +- pkg/runtime/event_sink.go | 75 ++++++++++++++ pkg/runtime/fallback.go | 8 +- pkg/runtime/hooks.go | 24 ++--- pkg/runtime/loop.go | 116 ++++++++++------------ pkg/runtime/loop_steps.go | 12 +-- pkg/runtime/loop_steps_test.go | 22 ++-- pkg/runtime/model_picker.go | 8 +- pkg/runtime/pre_tool_use_approval_test.go | 4 +- pkg/runtime/remote_runtime.go | 18 ++-- pkg/runtime/runtime.go | 25 +++-- pkg/runtime/runtime_test.go | 68 ++++++------- pkg/runtime/session_compaction.go | 12 +-- pkg/runtime/session_compaction_test.go | 8 +- pkg/runtime/skill_runner.go | 2 +- pkg/runtime/streaming.go | 12 +-- pkg/runtime/tool_dispatch.go | 36 +++---- 22 files changed, 290 insertions(+), 224 deletions(-) create mode 100644 pkg/runtime/event_sink.go diff --git a/pkg/app/app.go b/pkg/app/app.go index 5147a7526..1454f3023 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -125,7 +125,7 @@ func New(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts .. startupEvents := make(chan runtime.Event, 10) go func() { defer close(startupEvents) - rt.EmitStartupInfo(ctx, sess, startupEvents) + rt.EmitStartupInfo(ctx, sess, runtime.NewChannelSink(startupEvents)) }() for event := range startupEvents { select { @@ -303,7 +303,7 @@ func (a *App) ResolveCommand(ctx context.Context, userInput string) string { // EmitStartupInfo emits initial agent, team, and toolset information to the provided channel func (a *App) EmitStartupInfo(ctx context.Context, events chan runtime.Event) { - a.runtime.EmitStartupInfo(ctx, a.session, events) + a.runtime.EmitStartupInfo(ctx, a.session, runtime.NewChannelSink(events)) } // Run one agent loop @@ -657,7 +657,7 @@ func (a *App) reEmitStartupInfo(ctx context.Context) { startupEvents := make(chan runtime.Event, 10) go func() { defer close(startupEvents) - a.runtime.EmitStartupInfo(ctx, a.session, startupEvents) + a.runtime.EmitStartupInfo(ctx, a.session, runtime.NewChannelSink(startupEvents)) }() for event := range startupEvents { select { @@ -885,7 +885,7 @@ func (a *App) CompactSession(ctx context.Context, additionalPrompt string) { events := make(chan runtime.Event, 100) go func() { defer close(events) - a.runtime.Summarize(ctx, sess, additionalPrompt, events) + a.runtime.Summarize(ctx, sess, additionalPrompt, runtime.NewChannelSink(events)) }() for event := range events { if ctx.Err() != nil { diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index efc72186e..7f2304c5f 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -40,7 +40,7 @@ func (m *mockRuntime) CurrentAgentTools(ctx context.Context) ([]tools.Tool, erro func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil } func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil } -func (m *mockRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events chan runtime.Event) { +func (m *mockRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events runtime.EventSink) { } func (m *mockRuntime) ResetStartupInfo() {} func (m *mockRuntime) RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event { @@ -57,7 +57,7 @@ func (m *mockRuntime) ResumeElicitation(ctx context.Context, action tools.Elicit return nil } func (m *mockRuntime) SessionStore() session.Store { return m.store } -func (m *mockRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events chan runtime.Event) { +func (m *mockRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events runtime.EventSink) { } func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil } func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { diff --git a/pkg/cli/runner_test.go b/pkg/cli/runner_test.go index 3ab00de55..4c14f7b63 100644 --- a/pkg/cli/runner_test.go +++ b/pkg/cli/runner_test.go @@ -31,12 +31,12 @@ func (m *mockRuntime) CurrentAgentName() string { return "test" } func (m *mockRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo { return runtime.CurrentAgentInfo{Name: "test"} } -func (m *mockRuntime) SetCurrentAgent(string) error { return nil } -func (m *mockRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { return nil, nil } -func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil } -func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil } -func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, chan runtime.Event) {} -func (m *mockRuntime) ResetStartupInfo() {} +func (m *mockRuntime) SetCurrentAgent(string) error { return nil } +func (m *mockRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { return nil, nil } +func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil } +func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil } +func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, runtime.EventSink) {} +func (m *mockRuntime) ResetStartupInfo() {} func (m *mockRuntime) Run(context.Context, *session.Session) ([]session.Message, error) { return nil, nil } @@ -48,10 +48,10 @@ func (m *mockRuntime) ResumeElicitation(_ context.Context, action tools.Elicitat m.elicitationLastAction = action return nil } -func (m *mockRuntime) SessionStore() session.Store { return nil } -func (m *mockRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {} -func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil } -func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { return nil } +func (m *mockRuntime) SessionStore() session.Store { return nil } +func (m *mockRuntime) Summarize(context.Context, *session.Session, string, runtime.EventSink) {} +func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil } +func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { return nil } func (m *mockRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo { return nil } diff --git a/pkg/runtime/agent_delegation.go b/pkg/runtime/agent_delegation.go index 27672137a..1c24a393a 100644 --- a/pkg/runtime/agent_delegation.go +++ b/pkg/runtime/agent_delegation.go @@ -214,16 +214,16 @@ func mergeExcludedTools(parent, child []string) []string { // // Use as `defer r.swapCurrentAgent(ctx, sessionID, from, to, evts)()` so the // swap takes effect immediately and the restore runs at function exit. -func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, from, to *agent.Agent, evts chan<- Event) func() { - evts <- AgentSwitching(true, from.Name(), to.Name()) +func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, from, to *agent.Agent, evts EventSink) func() { + evts.Emit(AgentSwitching(true, from.Name(), to.Name())) r.executeOnAgentSwitchHooks(ctx, from, sessionID, from.Name(), to.Name(), agentSwitchKindTransferTask) r.setCurrentAgent(to.Name()) - evts <- AgentInfo(to.Name(), getAgentModelID(to), to.Description(), to.WelcomeMessage()) + evts.Emit(AgentInfo(to.Name(), getAgentModelID(to), to.Description(), to.WelcomeMessage())) return func() { r.setCurrentAgent(from.Name()) - evts <- AgentSwitching(false, to.Name(), from.Name()) + evts.Emit(AgentSwitching(false, to.Name(), from.Name())) r.executeOnAgentSwitchHooks(ctx, from, sessionID, to.Name(), from.Name(), agentSwitchKindTransferTaskReturn) - evts <- AgentInfo(from.Name(), getAgentModelID(from), from.Description(), from.WelcomeMessage()) + evts.Emit(AgentInfo(from.Name(), getAgentModelID(from), from.Description(), from.WelcomeMessage())) } } @@ -245,7 +245,7 @@ func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, f // swapping the current agent (if requested), resolving the child agent, // building the sub-session, driving RunStream, and recording the // sub-session on the parent. -func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Session, evts chan<- Event, req delegationRequest) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Session, evts EventSink, req delegationRequest) (*tools.ToolCallResult, error) { span := trace.SpanFromContext(ctx) callerAgent, err := r.team.Agent(r.CurrentAgentName()) @@ -276,12 +276,12 @@ func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Sessio childEvents := r.RunStream(ctx, s) for event := range childEvents { - evts <- event + evts.Emit(event) if errEvent, ok := event.(*ErrorEvent); ok { // Drain remaining events (including StreamStoppedEvent) so the // TUI's streamDepth counter stays balanced. for remaining := range childEvents { - evts <- remaining + evts.Emit(remaining) } err := fmt.Errorf("%s", errEvent.Error) span.RecordError(err) @@ -292,7 +292,7 @@ func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Sessio parent.ToolsApproved = s.ToolsApproved parent.AddSubSession(s) - evts <- SubSessionCompleted(parent.ID, s, callerAgent.Name()) + evts.Emit(SubSessionCompleted(parent.ID, s, callerAgent.Name())) span.SetStatus(codes.Ok, "sub-session completed") return tools.ResultSuccess(s.GetLastAssistantMessageContent()), nil @@ -391,7 +391,7 @@ func (r *LocalRuntime) RunAgent(ctx context.Context, params agenttool.RunParams) }, params.OnContent) } -func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts EventSink) (*tools.ToolCallResult, error) { var params struct { Agent string `json:"agent"` Task string `json:"task"` @@ -428,7 +428,7 @@ func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Ses }) } -func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, _ chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, _ EventSink) (*tools.ToolCallResult, error) { var params handoff.Args if err := json.Unmarshal([]byte(toolCall.Function.Arguments), ¶ms); err != nil { return nil, fmt.Errorf("invalid arguments: %w", err) diff --git a/pkg/runtime/cache.go b/pkg/runtime/cache.go index 4423b3955..03bc62df7 100644 --- a/pkg/runtime/cache.go +++ b/pkg/runtime/cache.go @@ -53,7 +53,7 @@ func (r *LocalRuntime) tryReplayCachedResponse( ctx context.Context, sess *session.Session, a *agent.Agent, - events chan Event, + events EventSink, ) bool { c := a.Cache() if c == nil { @@ -76,7 +76,7 @@ func (r *LocalRuntime) tryReplayCachedResponse( slog.DebugContext(ctx, "Response cache hit; replaying cached answer", "agent", a.Name(), "session_id", sess.ID) modelID := a.Model(ctx).ID() - events <- AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage()) + events.Emit(AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage())) addAgentMessage(sess, a, &chat.Message{ Role: chat.MessageRoleAssistant, Content: cached, diff --git a/pkg/runtime/commands_test.go b/pkg/runtime/commands_test.go index 8a91dcd1c..2a42409dd 100644 --- a/pkg/runtime/commands_test.go +++ b/pkg/runtime/commands_test.go @@ -37,8 +37,8 @@ func (m *mockRuntime) CurrentAgentInfo(context.Context) CurrentAgentInfo { func (m *mockRuntime) SetCurrentAgent(string) error { return nil } -func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, chan Event) {} -func (m *mockRuntime) ResetStartupInfo() {} +func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, EventSink) {} +func (m *mockRuntime) ResetStartupInfo() {} func (m *mockRuntime) RunStream(context.Context, *session.Session) <-chan Event { return nil } @@ -51,7 +51,7 @@ func (m *mockRuntime) ResumeElicitation(context.Context, tools.ElicitationAction return nil } func (m *mockRuntime) SessionStore() session.Store { return nil } -func (m *mockRuntime) Summarize(context.Context, *session.Session, string, chan Event) { +func (m *mockRuntime) Summarize(context.Context, *session.Session, string, EventSink) { } func (m *mockRuntime) PermissionsInfo() *PermissionsInfo { return nil } func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { diff --git a/pkg/runtime/event_sink.go b/pkg/runtime/event_sink.go new file mode 100644 index 000000000..b2b4594e3 --- /dev/null +++ b/pkg/runtime/event_sink.go @@ -0,0 +1,75 @@ +package runtime + +// EventSink is the write side of the runtime's event stream. Methods +// that produce events accept an EventSink instead of a raw chan Event, +// decoupling event producers from the channel implementation. +// +// Implementations must be safe for concurrent use: multiple goroutines +// may call Emit simultaneously (e.g. tool-call handlers running in +// parallel). +type EventSink interface { + // Emit delivers an event to the sink. The default implementation + // preserves back-pressure: if the consumer is slow, Emit blocks + // until the event can be delivered. Implementations must still be + // panic-safe so a closed channel does not crash the caller. + Emit(event Event) +} + +// channelSink adapts a chan Event into an [EventSink]. Sends are +// blocking so back-pressure is preserved between producers and the +// consumer. Send-on-closed-channel panics are recovered and the event +// is dropped, since a closed channel signals that the consumer has +// gone away. +type channelSink struct { + ch chan Event +} + +// NewChannelSink returns an [EventSink] that writes to ch. This is the +// standard bridge between the runtime's EventSink-based internals and +// external callers that create raw event channels (e.g. app.go, +// tests). +func NewChannelSink(ch chan Event) EventSink { + return &channelSink{ch: ch} +} + +func (s *channelSink) Emit(e Event) { + defer func() { recover() }() //nolint:errcheck // swallow send-on-closed-channel panic + s.ch <- e +} + +// nonBlockingChannelSink wraps an event channel with non-blocking +// semantics: if the buffer is full, the event is dropped instead of +// blocking the producer. Use this only from long-lived goroutines +// that may outlive the event channel (e.g. the RAG file watcher); +// regular runtime code should use the blocking [channelSink] so +// back-pressure is preserved. +type nonBlockingChannelSink struct { + ch chan Event +} + +// nonBlocking returns a non-blocking sink for sink. If sink wraps a +// channel directly, the result writes to that channel with a +// select-default; otherwise, sink is returned unchanged because +// non-channel sinks (notably [EventSinkFunc] used in tests) do not +// have an underlying buffer that can fill up. +func nonBlocking(sink EventSink) EventSink { + if cs, ok := sink.(*channelSink); ok { + return nonBlockingChannelSink{ch: cs.ch} + } + return sink +} + +func (s nonBlockingChannelSink) Emit(e Event) { + defer func() { recover() }() //nolint:errcheck // swallow send-on-closed-channel panic + select { + case s.ch <- e: + default: + } +} + +// EventSinkFunc adapts a plain function into an [EventSink], following +// the same adapter pattern as http.HandlerFunc. This is convenient for +// tests and one-off callbacks that don't need a full struct. +type EventSinkFunc func(Event) + +func (f EventSinkFunc) Emit(e Event) { f(e) } diff --git a/pkg/runtime/fallback.go b/pkg/runtime/fallback.go index ae82b83c8..dab915ceb 100644 --- a/pkg/runtime/fallback.go +++ b/pkg/runtime/fallback.go @@ -230,7 +230,7 @@ func (e *fallbackExecutor) execute( agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, - events chan Event, + events EventSink, ) (streamResult, provider.Provider, error) { fallbackModels := a.FallbackModels() fallbackRetries := getEffectiveRetries(a) @@ -272,14 +272,14 @@ func (e *fallbackExecutor) execute( if lastErr != nil { reason = lastErr.Error() } - events <- ModelFallback( + events.Emit(ModelFallback( a.Name(), prevModelID, modelEntry.provider.ID(), reason, attempt+1, maxAttempts, - ) + )) } slog.DebugContext(ctx, "Creating chat completion stream", @@ -308,7 +308,7 @@ func (e *fallbackExecutor) execute( // of the selected sub-model's YAML-configured name. if rp, ok := modelEntry.provider.(interface{ LastSelectedModelID() string }); ok { if selected := rp.LastSelectedModelID(); selected != "" { - events <- AgentInfo(a.Name(), selected, a.Description(), a.WelcomeMessage()) + events.Emit(AgentInfo(a.Name(), selected, a.Description(), a.WelcomeMessage())) } } diff --git a/pkg/runtime/hooks.go b/pkg/runtime/hooks.go index 671a752bc..6ca312f77 100644 --- a/pkg/runtime/hooks.go +++ b/pkg/runtime/hooks.go @@ -97,7 +97,7 @@ func (r *LocalRuntime) dispatchHook( a *agent.Agent, event hooks.EventType, input *hooks.Input, - events chan Event, + events EventSink, ) *hooks.Result { exec := r.hooksExec(a) if exec == nil || !exec.Has(event) { @@ -106,11 +106,11 @@ func (r *LocalRuntime) dispatchHook( started := time.Now() if events != nil { - events <- HookStarted(event, input.SessionID, a.Name()) + events.Emit(HookStarted(event, input.SessionID, a.Name())) } result, err := exec.Dispatch(ctx, event, input) if events != nil { - events <- HookFinished(event, input.SessionID, result, err, time.Since(started), a.Name()) + events.Emit(HookFinished(event, input.SessionID, result, err, time.Since(started), a.Name())) } if err != nil { slog.WarnContext(ctx, "Hook execution failed", "event", event, "agent", a.Name(), "error", err) @@ -118,7 +118,7 @@ func (r *LocalRuntime) dispatchHook( } if events != nil && result.SystemMessage != "" { - events <- Warning(result.SystemMessage, a.Name()) + events.Emit(Warning(result.SystemMessage, a.Name())) } return result } @@ -138,7 +138,7 @@ func (r *LocalRuntime) dispatchHook( // included in every model call after a compaction, without any extra // dispatch — there's nothing to "re-inject" because nothing was // persisted in the first place. -func (r *LocalRuntime) executeSessionStartHooks(ctx context.Context, sess *session.Session, a *agent.Agent, events chan Event) []chat.Message { +func (r *LocalRuntime) executeSessionStartHooks(ctx context.Context, sess *session.Session, a *agent.Agent, events EventSink) []chat.Message { return contextMessages(r.dispatchHook(ctx, a, hooks.EventSessionStart, &hooks.Input{ SessionID: sess.ID, Source: "startup", @@ -151,7 +151,7 @@ func (r *LocalRuntime) executeSessionStartHooks(ctx context.Context, sess *sessi // every iteration so its content is recomputed each turn — the right // semantics for fast-changing context like the current date or the // contents of a prompt file the user might be editing mid-session. -func (r *LocalRuntime) executeTurnStartHooks(ctx context.Context, sess *session.Session, a *agent.Agent, events chan Event) []chat.Message { +func (r *LocalRuntime) executeTurnStartHooks(ctx context.Context, sess *session.Session, a *agent.Agent, events EventSink) []chat.Message { return contextMessages(r.dispatchHook(ctx, a, hooks.EventTurnStart, &hooks.Input{ SessionID: sess.ID, }, events)) @@ -191,7 +191,7 @@ const ( // turn_start. Observational; the result is discarded. Reason is one // of the turnEndReason* constants above and is reported via // [hooks.Input.Reason] so handlers can branch on the exit path. -func (r *LocalRuntime) executeTurnEndHooks(ctx context.Context, sess *session.Session, a *agent.Agent, reason string, events chan Event) { +func (r *LocalRuntime) executeTurnEndHooks(ctx context.Context, sess *session.Session, a *agent.Agent, reason string, events EventSink) { r.dispatchHook(ctx, a, hooks.EventTurnEnd, &hooks.Input{ SessionID: sess.ID, AgentName: a.Name(), @@ -226,7 +226,7 @@ func (r *LocalRuntime) executeSessionEndHooks(ctx context.Context, sess *session // surfaced as a Warning by [dispatchHook]. AgentName + LastUserMessage // are populated so builtins like cache_response can key on the user's // question and resolve the agent through the runtime closure. -func (r *LocalRuntime) executeStopHooks(ctx context.Context, sess *session.Session, a *agent.Agent, responseContent string, events chan Event) { +func (r *LocalRuntime) executeStopHooks(ctx context.Context, sess *session.Session, a *agent.Agent, responseContent string, events EventSink) { r.dispatchHook(ctx, a, hooks.EventStop, &hooks.Input{ SessionID: sess.ID, AgentName: a.Name(), @@ -482,7 +482,7 @@ func (r *LocalRuntime) executeBeforeCompactionHooks( a *agent.Agent, reason string, contextLimit int64, - events chan Event, + events EventSink, ) *hooks.Result { return r.dispatchHook(ctx, a, hooks.EventBeforeCompaction, &hooks.Input{ SessionID: sess.ID, @@ -509,7 +509,7 @@ func (r *LocalRuntime) executeAfterCompactionHooks( contextLimit int64, preInputTokens, preOutputTokens int64, summary string, - events chan Event, + events EventSink, ) { r.dispatchHook(ctx, a, hooks.EventAfterCompaction, &hooks.Input{ SessionID: sess.ID, @@ -527,7 +527,7 @@ func (r *LocalRuntime) executeAfterCompactionHooks( // (decision="block" / continue=false / exit 2) stops the run loop; // AdditionalContext is returned as a transient system message that // callers splice into the conversation for that turn only. -func (r *LocalRuntime) executeUserPromptSubmitHooks(ctx context.Context, sess *session.Session, a *agent.Agent, prompt string, events chan Event) (stop bool, message string, contextMsgs []chat.Message) { +func (r *LocalRuntime) executeUserPromptSubmitHooks(ctx context.Context, sess *session.Session, a *agent.Agent, prompt string, events EventSink) (stop bool, message string, contextMsgs []chat.Message) { result := r.dispatchHook(ctx, a, hooks.EventUserPromptSubmit, &hooks.Input{ SessionID: sess.ID, Prompt: prompt, @@ -546,7 +546,7 @@ func (r *LocalRuntime) executeUserPromptSubmitHooks(ctx context.Context, sess *s // is reported in [hooks.Input.Source]. A terminating verdict skips // compaction entirely; AdditionalContext is appended to the // compaction prompt so handlers can steer the summary. -func (r *LocalRuntime) executePreCompactHooks(ctx context.Context, sess *session.Session, a *agent.Agent, source string, events chan Event) (skip bool, message, additionalPrompt string) { +func (r *LocalRuntime) executePreCompactHooks(ctx context.Context, sess *session.Session, a *agent.Agent, source string, events EventSink) (skip bool, message, additionalPrompt string) { result := r.dispatchHook(ctx, a, hooks.EventPreCompact, &hooks.Input{ SessionID: sess.ID, Source: source, diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index cf0dd4d41..443d647af 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -40,16 +40,16 @@ func (r *LocalRuntime) registerDefaultTools() { r.toolMap[skills.ToolNameRunSkill] = r.handleRunSkill r.bgAgents.RegisterHandlers(func(name string, fn func(context.Context, *session.Session, tools.ToolCall) (*tools.ToolCallResult, error)) { - r.toolMap[name] = func(ctx context.Context, sess *session.Session, tc tools.ToolCall, _ chan Event) (*tools.ToolCallResult, error) { + r.toolMap[name] = func(ctx context.Context, sess *session.Session, tc tools.ToolCall, _ EventSink) (*tools.ToolCallResult, error) { return fn(ctx, sess, tc) } }) } // appendSteerAndEmit adds a steer message to the session and emits the corresponding event. -func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessage, events chan<- Event) { +func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessage, events EventSink) { sess.AddMessage(session.UserMessage(sm.Content, sm.MultiContent...)) - events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1) + events.Emit(UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1)) } // drainAndEmitSteered drains all messages from the steer queue and injects @@ -72,7 +72,7 @@ func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessag // // Returns (true, messageCountBefore) if any messages were drained and emitted; // (false, 0) otherwise. -func (r *LocalRuntime) drainAndEmitSteered(ctx context.Context, sess *session.Session, events chan<- Event) (bool, int) { +func (r *LocalRuntime) drainAndEmitSteered(ctx context.Context, sess *session.Session, events EventSink) (bool, int) { steered := r.steerQueue.Drain(ctx) if len(steered) == 0 { return false, 0 @@ -119,7 +119,7 @@ func (r *LocalRuntime) emitHookDrivenShutdown( a *agent.Agent, sess *session.Session, message string, - events chan Event, + events EventSink, ) { if message == "" { // aggregate() always populates Result.Message on a deny @@ -127,7 +127,7 @@ func (r *LocalRuntime) emitHookDrivenShutdown( // block without a reason. message = "Agent terminated by a hook." } - events <- ErrorWithCode(ErrorCodeHookBlocked, message) + events.Emit(ErrorWithCode(ErrorCodeHookBlocked, message)) r.notifyError(ctx, a, sess.ID, message) } @@ -181,6 +181,8 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // goroutine so it has a real name in stack traces and is easier to navigate // in editors. func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, events chan Event) { + sink := &channelSink{ch: events} + // Seed the cagent session ID at the run-loop boundary so any // gateway-bound HTTP call originating from this loop can correlate // back to the originating session. Plumbing happens in @@ -209,23 +211,23 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // stable. ls := &loopState{ maxIterations: sess.MaxIterations, - sessionStartMsgs: r.executeSessionStartHooks(ctx, sess, a, events), + sessionStartMsgs: r.executeSessionStartHooks(ctx, sess, a, sink), } // Emit team information - events <- TeamInfo(r.agentDetailsFromTeam(), a.Name()) + sink.Emit(TeamInfo(r.agentDetailsFromTeam(), a.Name())) - r.emitAgentWarnings(a, chanSend(events)) - r.configureToolsetHandlers(a, events) + r.emitAgentWarnings(a, sink) + r.configureToolsetHandlers(a, sink) - agentTools, err := r.getTools(ctx, a, sessionSpan, events, true) + agentTools, err := r.getTools(ctx, a, sessionSpan, sink, true) if err != nil { - events <- ErrorWithCode(ErrorCodeToolFailed, fmt.Sprintf("failed to get tools: %v", err)) + sink.Emit(ErrorWithCode(ErrorCodeToolFailed, fmt.Sprintf("failed to get tools: %v", err))) return } agentTools = filterExcludedTools(agentTools, sess.ExcludedTools) - events <- ToolsetInfo(len(agentTools), false, a.Name()) + sink.Emit(ToolsetInfo(len(agentTools), false, a.Name())) messages := sess.GetMessages(a) @@ -236,23 +238,23 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // signal here too: "a real user prompt is at the tail of the session". if sess.SendUserMessage && len(messages) > 0 { lastMsg := messages[len(messages)-1] - events <- UserMessage(lastMsg.Content, sess.ID, lastMsg.MultiContent, len(sess.Messages)-1) + sink.Emit(UserMessage(lastMsg.Content, sess.ID, lastMsg.MultiContent, len(sess.Messages)-1)) // user_prompt_submit fires once per real user message, after // session_start and before the first model call. if lastMsg.Role == chat.MessageRoleUser { - stop, msg, ctxMsgs := r.executeUserPromptSubmitHooks(ctx, sess, a, lastMsg.Content, events) + stop, msg, ctxMsgs := r.executeUserPromptSubmitHooks(ctx, sess, a, lastMsg.Content, sink) if stop { slog.WarnContext(ctx, "user_prompt_submit hook signalled run termination", "agent", a.Name(), "session_id", sess.ID, "reason", msg) - r.emitHookDrivenShutdown(ctx, a, sess, msg, events) + r.emitHookDrivenShutdown(ctx, a, sess, msg, sink) return } ls.userPromptMsgs = ctxMsgs } } - events <- StreamStarted(sess.ID, a.Name()) + sink.Emit(StreamStarted(sess.ID, a.Name())) // streamReason records the exit reason from the final turn so // finalizeEventChannel can surface it in the StreamStoppedEvent. @@ -266,7 +268,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // skip the model entirely. The matching storage half is // implemented as the cache_response stop-hook builtin (see // runtime/cache.go and getHooksExecutor). - if r.tryReplayCachedResponse(ctx, sess, a, events) { + if r.tryReplayCachedResponse(ctx, sess, a, sink) { return } @@ -300,12 +302,12 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, ls.prevAgentName = a.Name() } - r.emitAgentWarnings(a, chanSend(events)) - r.configureToolsetHandlers(a, events) + r.emitAgentWarnings(a, sink) + r.configureToolsetHandlers(a, sink) - agentTools, err := r.getTools(ctx, a, sessionSpan, events, true) + agentTools, err := r.getTools(ctx, a, sessionSpan, sink, true) if err != nil { - events <- ErrorWithCode(ErrorCodeToolFailed, fmt.Sprintf("failed to get tools: %v", err)) + sink.Emit(ErrorWithCode(ErrorCodeToolFailed, fmt.Sprintf("failed to get tools: %v", err))) return } agentTools = filterExcludedTools(agentTools, sess.ExcludedTools) @@ -313,10 +315,10 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // Emit updated tool count. After a ToolListChanged MCP notification // the cache is invalidated, so getTools above re-fetches from the // server and may return a different count. - events <- ToolsetInfo(len(agentTools), false, a.Name()) + sink.Emit(ToolsetInfo(len(agentTools), false, a.Name())) // Check iteration limit - newMax, decision := r.enforceMaxIterations(ctx, sess, a, ls.iteration, ls.maxIterations, events) + newMax, decision := r.enforceMaxIterations(ctx, sess, a, ls.iteration, ls.maxIterations, sink) if decision == iterationStop { return } @@ -352,7 +354,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // Notify sidebar of the model for this turn. For rule-based // routing, the actual routed model is emitted from within the // stream once the first chunk arrives. - events <- AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage()) + sink.Emit(AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage())) slog.DebugContext(ctx, "Using agent", "agent", a.Name(), "model", modelID) slog.DebugContext(ctx, "Getting model definition", "model_id", modelID) @@ -366,14 +368,14 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, contextLimit = int64(m.Limit.Context) if r.sessionCompaction && compaction.ShouldCompact(sess.InputTokens, sess.OutputTokens, 0, contextLimit) { - r.compactWithReason(ctx, sess, "", compactionReasonThreshold, events) + r.compactWithReason(ctx, sess, "", compactionReasonThreshold, sink) } } // Drain steer messages queued while idle or before the first model call // (covers idle-window and first-turn-miss races). - if drained, messageCountBeforeSteer := r.drainAndEmitSteered(ctx, sess, events); drained { - r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, events) + if drained, messageCountBeforeSteer := r.drainAndEmitSteered(ctx, sess, sink); drained { + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, sink) } // Everything from turn_start onwards is wrapped in a closure so a @@ -385,7 +387,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // AFTER the closure body has assigned both, so callers see the same // reason the runtime took. ctrl drives the outer for-loop's // continue-or-exit decision. - ctrl := r.runTurn(ctx, sess, a, m, model, modelID, contextLimit, sessionSpan, agentTools, ls, events) + ctrl := r.runTurn(ctx, sess, a, m, model, modelID, contextLimit, sessionSpan, agentTools, ls, sink) streamReason = ls.exitReason switch ctrl { case turnContinue: @@ -455,7 +457,7 @@ func (r *LocalRuntime) runTurn( sessionSpan trace.Span, agentTools []tools.Tool, ls *loopState, - events chan Event, + events EventSink, ) turnControl { streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes( attribute.String("agent", a.Name()), @@ -563,7 +565,7 @@ func (r *LocalRuntime) runTurn( if usedModel != nil && usedModel.ID() != model.ID() { slog.InfoContext(ctx, "Used fallback model", "agent", a.Name(), "primary", model.ID(), "used", usedModel.ID()) - events <- AgentInfo(a.Name(), usedModel.ID(), a.Description(), a.WelcomeMessage()) + events.Emit(AgentInfo(a.Name(), usedModel.ID(), a.Description(), a.WelcomeMessage())) } streamSpan.SetAttributes( attribute.Int("tool.calls", len(res.Calls)), @@ -577,7 +579,7 @@ func (r *LocalRuntime) runTurn( usage := SessionUsage(sess, contextLimit) usage.LastMessage = msgUsage - events <- NewTokenUsageEvent(sess.ID, a.Name(), usage) + events.Emit(NewTokenUsageEvent(sess.ID, a.Name(), usage)) // Record the message count before tool calls so we can // measure how much content was added by tool results. @@ -613,7 +615,7 @@ func (r *LocalRuntime) runTurn( "Agent terminated: detected %d consecutive identical calls to %s. "+ "This indicates a degenerate loop where the model is not making progress.", consecutive, toolName) - events <- ErrorWithCode(ErrorCodeLoopDetected, errMsg) + events.Emit(ErrorWithCode(ErrorCodeLoopDetected, errMsg)) r.notifyError(ctx, a, sess.ID, errMsg) ls.loopDetector.Reset() endReason = turnEndReasonLoopDetected @@ -663,7 +665,7 @@ func (r *LocalRuntime) runTurn( if followUp, ok := r.followUpQueue.Dequeue(ctx); ok { userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) sess.AddMessage(userMsg) - events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) + events.Emit(UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1)) r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) endReason = turnEndReasonContinue return turnContinue // re-enter the loop for a new turn @@ -701,7 +703,7 @@ func (r *LocalRuntime) recordAssistantMessage( agentTools []tools.Tool, modelID string, m *modelsdev.Model, - events chan Event, + events EventSink, ) *MessageUsage { if strings.TrimSpace(res.Content) == "" && len(res.Calls) == 0 { slog.Debug("Skipping empty assistant message (no content and no tool calls)", "agent", a.Name()) @@ -775,7 +777,7 @@ func (r *LocalRuntime) compactIfNeeded( m *modelsdev.Model, contextLimit int64, messageCountBefore int, - events chan Event, + events EventSink, ) { if m == nil || !r.sessionCompaction || contextLimit <= 0 { return @@ -805,10 +807,10 @@ func (r *LocalRuntime) compactIfNeeded( // getTools executes tool retrieval with automatic OAuth handling. // emitLifecycleEvents controls whether MCPInitStarted/Finished are emitted; // pass false when calling from reprobe to avoid spurious TUI spinner flicker. -func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event, emitLifecycleEvents bool) ([]tools.Tool, error) { +func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events EventSink, emitLifecycleEvents bool) ([]tools.Tool, error) { if emitLifecycleEvents && len(a.ToolSets()) > 0 { - events <- MCPInitStarted(a.Name()) - defer func() { events <- MCPInitFinished(a.Name()) }() + events.Emit(MCPInitStarted(a.Name())) + defer func() { events.Emit(MCPInitFinished(a.Name())) }() } agentTools, err := a.Tools(ctx) @@ -825,17 +827,22 @@ func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan } // configureToolsetHandlers sets up elicitation and OAuth handlers for all toolsets of an agent. -func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events chan Event) { +func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events EventSink) { for _, toolset := range a.ToolSets() { tools.ConfigureHandlers(toolset, r.elicitationHandler, - func() { events <- Authorization(tools.ElicitationActionAccept, a.Name()) }, + func() { events.Emit(Authorization(tools.ElicitationActionAccept, a.Name())) }, r.managedOAuth, ) // Wire RAG event forwarding so the TUI shows indexing progress. + // Use a non-blocking sink because the RAG file watcher is a + // long-lived goroutine that may outlive the per-message events + // channel; a blocking send after the channel is closed would + // crash, and a blocking send when the consumer has gone away + // would deadlock. if ragTool, ok := tools.As[*builtinrag.Tool](toolset); ok { - ragTool.SetEventCallback(ragEventForwarder(ragTool.Name(), r, chanSend(events))) + ragTool.SetEventCallback(ragEventForwarder(ragTool.Name(), r, nonBlocking(events).Emit)) } } } @@ -846,13 +853,13 @@ func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events chan Even // not emitted — "X is now available" reads as a spurious warning right // after the user completes an OAuth dance, and adds no signal for other // recoveries either. -func (r *LocalRuntime) emitAgentWarnings(a *agent.Agent, send func(Event)) { +func (r *LocalRuntime) emitAgentWarnings(a *agent.Agent, events EventSink) { warnings := a.DrainWarnings() if len(warnings) == 0 { return } slog.Warn("Tool setup partially failed; continuing", "agent", a.Name(), "warnings", warnings) - send(Warning(formatToolWarning(a, warnings), a.Name())) + events.Emit(Warning(formatToolWarning(a, warnings), a.Name())) } func formatToolWarning(a *agent.Agent, warnings []string) string { @@ -883,21 +890,6 @@ func filterExcludedTools(agentTools []tools.Tool, excluded []string) []tools.Too return filtered } -// chanSend wraps a channel as a func(Event) for use with emitAgentWarnings -// and RAG event forwarding. The send is non-blocking: if the channel is full -// or closed, the event is silently dropped. This prevents a panic when a -// long-lived goroutine (e.g. RAG file watcher) tries to forward an event -// after the per-message events channel has been closed. -func chanSend(ch chan Event) func(Event) { - return func(e Event) { - defer func() { recover() }() //nolint:errcheck // swallow send-on-closed-channel panic - select { - case ch <- e: - default: - } - } -} - // reprobe re-runs ensureToolSetsAreStarted after a batch of tool calls. // If new tools became available (by name-set diff), it emits a ToolsetInfo // event to update the TUI immediately. The new tools will be picked up by @@ -911,7 +903,7 @@ func (r *LocalRuntime) reprobe( a *agent.Agent, currentTools []tools.Tool, sessionSpan trace.Span, - events chan Event, + events EventSink, ) { updated, err := r.getTools(ctx, a, sessionSpan, events, false) if err != nil { @@ -921,7 +913,7 @@ func (r *LocalRuntime) reprobe( updated = filterExcludedTools(updated, sess.ExcludedTools) // Emit any pending warnings that getTools just generated. - r.emitAgentWarnings(a, chanSend(events)) + r.emitAgentWarnings(a, events) // Compute added tools by comparing name-sets (not just counts), so we // correctly handle a toolset that replaced one tool with another. @@ -944,5 +936,5 @@ func (r *LocalRuntime) reprobe( "agent", a.Name(), "added", added) // Emit updated tool count to the TUI immediately. - chanSend(events)(ToolsetInfo(len(updated), false, a.Name())) + events.Emit(ToolsetInfo(len(updated), false, a.Name())) } diff --git a/pkg/runtime/loop_steps.go b/pkg/runtime/loop_steps.go index b102710c4..f14148512 100644 --- a/pkg/runtime/loop_steps.go +++ b/pkg/runtime/loop_steps.go @@ -51,7 +51,7 @@ func (r *LocalRuntime) enforceMaxIterations( a *agent.Agent, iteration int, runtimeMaxIterations int, - events chan Event, + events EventSink, ) (newMax int, decision iterationDecision) { if runtimeMaxIterations <= 0 || iteration < runtimeMaxIterations { return runtimeMaxIterations, iterationContinue @@ -63,7 +63,7 @@ func (r *LocalRuntime) enforceMaxIterations( "max", runtimeMaxIterations, ) - events <- MaxIterationsReached(runtimeMaxIterations) + events.Emit(MaxIterationsReached(runtimeMaxIterations)) maxIterMsg := fmt.Sprintf("Maximum iterations reached (%d)", runtimeMaxIterations) r.notifyMaxIterations(ctx, a, sess.ID, maxIterMsg) @@ -148,7 +148,7 @@ func (r *LocalRuntime) handleStreamError( contextLimit int64, overflowCompactions *int, streamSpan trace.Span, - events chan Event, + events EventSink, ) streamErrorOutcome { // Treat context cancellation as a graceful stop. if errors.Is(err, context.Canceled) { @@ -171,10 +171,10 @@ func (r *LocalRuntime) handleStreamError( "context_limit", contextLimit, "attempt", *overflowCompactions, ) - events <- Warning( + events.Emit(Warning( "The conversation has exceeded the model's context window. Automatically compacting the conversation history...", a.Name(), - ) + )) r.compactWithReason(ctx, sess, "", compactionReasonOverflow, events) return streamErrorRetry } @@ -184,7 +184,7 @@ func (r *LocalRuntime) handleStreamError( slog.ErrorContext(ctx, "All models failed", "agent", a.Name(), "error", err) r.telemetry.RecordError(ctx, err.Error()) errMsg := modelerrors.FormatError(err) - events <- ErrorWithCode(classifyErrorCode(err), errMsg) + events.Emit(ErrorWithCode(classifyErrorCode(err), errMsg)) r.notifyError(ctx, a, sess.ID, errMsg) return streamErrorFatal } diff --git a/pkg/runtime/loop_steps_test.go b/pkg/runtime/loop_steps_test.go index 35809af14..216a381ea 100644 --- a/pkg/runtime/loop_steps_test.go +++ b/pkg/runtime/loop_steps_test.go @@ -50,7 +50,7 @@ func TestEnforceMaxIterations_BelowLimit_Continues(t *testing.T) { sess := session.New() events := make(chan Event, 8) - newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 3, 10, events) + newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 3, 10, NewChannelSink(events)) assert.Equal(t, iterationContinue, decision) assert.Equal(t, 10, newMax, "limit must be unchanged when below the cap") @@ -65,7 +65,7 @@ func TestEnforceMaxIterations_DisabledLimit_Continues(t *testing.T) { events := make(chan Event, 8) // runtimeMaxIterations <= 0 disables the cap entirely. - newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 1_000_000, 0, events) + newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 1_000_000, 0, NewChannelSink(events)) assert.Equal(t, iterationContinue, decision) assert.Equal(t, 0, newMax) @@ -80,7 +80,7 @@ func TestEnforceMaxIterations_NonInteractive_AutoStops(t *testing.T) { sess.NonInteractive = true events := make(chan Event, 8) - _, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, events) + _, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, NewChannelSink(events)) assert.Equal(t, iterationStop, decision, "non-interactive must auto-stop when at the cap") @@ -113,7 +113,7 @@ func TestEnforceMaxIterations_Interactive_ApproveExtends(t *testing.T) { // returns immediately instead of blocking on user input. go func() { rt.resumeChan <- ResumeApprove() }() - newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, events) + newMax, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, NewChannelSink(events)) assert.Equal(t, iterationContinue, decision) assert.Equal(t, 20, newMax, "approve must extend by 10 iterations beyond the current iteration") @@ -128,7 +128,7 @@ func TestEnforceMaxIterations_Interactive_RejectStops(t *testing.T) { go func() { rt.resumeChan <- ResumeReject("no thanks") }() - _, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, events) + _, decision := rt.enforceMaxIterations(t.Context(), sess, a, 10, 10, NewChannelSink(events)) assert.Equal(t, iterationStop, decision) @@ -153,7 +153,7 @@ func TestEnforceMaxIterations_ContextCancelled_Stops(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) cancel() - _, decision := rt.enforceMaxIterations(ctx, sess, a, 10, 10, events) + _, decision := rt.enforceMaxIterations(ctx, sess, a, 10, 10, NewChannelSink(events)) assert.Equal(t, iterationStop, decision) } @@ -169,7 +169,7 @@ func TestHandleStreamError_ContextCanceled_Fatal(t *testing.T) { _, sp := span(t.Context(), "x") overflowCount := 0 - outcome := rt.handleStreamError(t.Context(), sess, a, context.Canceled, 1000, &overflowCount, sp, events) + outcome := rt.handleStreamError(t.Context(), sess, a, context.Canceled, 1000, &overflowCount, sp, NewChannelSink(events)) assert.Equal(t, streamErrorFatal, outcome) assert.Empty(t, drainEvents(events), "context cancel should not emit any events") @@ -202,7 +202,7 @@ func TestHandleStreamError_OverflowWithCompactionEnabled_Retries(t *testing.T) { overflow := modelerrors.NewContextOverflowError(errors.New("too long")) overflowCount := 0 - outcome := rt.handleStreamError(t.Context(), sess, root, overflow, 1000, &overflowCount, sp, events) + outcome := rt.handleStreamError(t.Context(), sess, root, overflow, 1000, &overflowCount, sp, NewChannelSink(events)) assert.Equal(t, streamErrorRetry, outcome) assert.Equal(t, 1, overflowCount, "overflow counter should bump on retry") @@ -230,7 +230,7 @@ func TestHandleStreamError_OverflowExhausted_Fatal(t *testing.T) { // Counter is already at the cap, so we must NOT retry again. overflowCount := rt.maxOverflowCompactions - outcome := rt.handleStreamError(t.Context(), sess, a, overflow, 1000, &overflowCount, sp, events) + outcome := rt.handleStreamError(t.Context(), sess, a, overflow, 1000, &overflowCount, sp, NewChannelSink(events)) assert.Equal(t, streamErrorFatal, outcome) assert.Equal(t, rt.maxOverflowCompactions, overflowCount, "exhausted path must not bump counter further") @@ -264,7 +264,7 @@ func TestHandleStreamError_OverflowWithCompactionDisabled_Fatal(t *testing.T) { overflow := modelerrors.NewContextOverflowError(errors.New("too long")) overflowCount := 0 - outcome := rt.handleStreamError(t.Context(), sess, root, overflow, 1000, &overflowCount, sp, events) + outcome := rt.handleStreamError(t.Context(), sess, root, overflow, 1000, &overflowCount, sp, NewChannelSink(events)) assert.Equal(t, streamErrorFatal, outcome, "overflow must be fatal when session compaction is disabled") assert.Equal(t, 0, overflowCount) @@ -279,7 +279,7 @@ func TestHandleStreamError_GenericError_FatalAndEmitsError(t *testing.T) { _, sp := noop.NewTracerProvider().Tracer("t").Start(t.Context(), "x") overflowCount := 0 - outcome := rt.handleStreamError(t.Context(), sess, a, errors.New("boom"), 1000, &overflowCount, sp, events) + outcome := rt.handleStreamError(t.Context(), sess, a, errors.New("boom"), 1000, &overflowCount, sp, NewChannelSink(events)) assert.Equal(t, streamErrorFatal, outcome) diff --git a/pkg/runtime/model_picker.go b/pkg/runtime/model_picker.go index 0cd6df355..f788a11a7 100644 --- a/pkg/runtime/model_picker.go +++ b/pkg/runtime/model_picker.go @@ -30,7 +30,7 @@ func (r *LocalRuntime) findModelPickerTool() *modelpicker.Tool { } // handleChangeModel handles the change_model tool call by switching the current agent's model. -func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session, toolCall tools.ToolCall, events EventSink) (*tools.ToolCallResult, error) { var params modelpicker.ChangeModelArgs if err := json.Unmarshal([]byte(toolCall.Function.Arguments), ¶ms); err != nil { return nil, fmt.Errorf("invalid arguments: %w", err) @@ -57,21 +57,21 @@ func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session } // handleRevertModel handles the revert_model tool call by reverting the current agent to its default model. -func (r *LocalRuntime) handleRevertModel(ctx context.Context, _ *session.Session, _ tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) handleRevertModel(ctx context.Context, _ *session.Session, _ tools.ToolCall, events EventSink) (*tools.ToolCallResult, error) { return r.setModelAndEmitInfo(ctx, "", events) } // setModelAndEmitInfo sets the model for the current agent and emits an updated // AgentInfo event so the UI reflects the change. An empty modelRef reverts to // the agent's default model. -func (r *LocalRuntime) setModelAndEmitInfo(ctx context.Context, modelRef string, events chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) setModelAndEmitInfo(ctx context.Context, modelRef string, events EventSink) (*tools.ToolCallResult, error) { currentName := r.CurrentAgentName() if err := r.SetAgentModel(ctx, currentName, modelRef); err != nil { return tools.ResultError(fmt.Sprintf("failed to set model: %v", err)), nil } if a, err := r.team.Agent(currentName); err == nil { - events <- AgentInfo(a.Name(), r.getEffectiveModelID(a), a.Description(), a.WelcomeMessage()) + events.Emit(AgentInfo(a.Name(), r.getEffectiveModelID(a), a.Description(), a.WelcomeMessage())) } else { slog.WarnContext(ctx, "Failed to retrieve agent after model change; UI may not reflect the update", "agent", currentName, "error", err) } diff --git a/pkg/runtime/pre_tool_use_approval_test.go b/pkg/runtime/pre_tool_use_approval_test.go index 22465df95..2beaff6f6 100644 --- a/pkg/runtime/pre_tool_use_approval_test.go +++ b/pkg/runtime/pre_tool_use_approval_test.go @@ -139,7 +139,7 @@ func runJudgedToolCall(t *testing.T, rt *LocalRuntime, sess *session.Session, ag Function: tools.FunctionCall{Name: "the_tool", Arguments: "{}"}, }} events := make(chan Event, 16) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) return collectClosedEvents(events) } @@ -199,7 +199,7 @@ func TestPreToolUseHook_AskEscalatesToUser(t *testing.T) { events := make(chan Event, 16) done := make(chan struct{}) go func() { - rt.processToolCalls(ctx, sess, calls, agentTools, events) + rt.processToolCalls(ctx, sess, calls, agentTools, NewChannelSink(events)) close(done) }() diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 079029565..dcbe1dea8 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -161,15 +161,15 @@ func (r *RemoteRuntime) RestartToolset(ctx context.Context, toolsetName string) } // EmitStartupInfo emits initial agent, team, and toolset information -func (r *RemoteRuntime) EmitStartupInfo(ctx context.Context, _ *session.Session, events chan Event) { +func (r *RemoteRuntime) EmitStartupInfo(ctx context.Context, _ *session.Session, events EventSink) { agentName, cfg := r.resolvedAgent(ctx) - events <- AgentInfo(agentName, cfg.Model, cfg.Description, cfg.WelcomeMessage) - events <- TeamInfo(r.agentDetailsFromConfig(ctx), agentName) + events.Emit(AgentInfo(agentName, cfg.Model, cfg.Description, cfg.WelcomeMessage)) + events.Emit(TeamInfo(r.agentDetailsFromConfig(ctx), agentName)) // Emit a loading indicator while we fetch the real tool count from the server. if len(cfg.Toolsets) > 0 { - events <- ToolsetInfo(0, true, agentName) + events.Emit(ToolsetInfo(0, true, agentName)) } toolCount, err := r.client.GetAgentToolCount(ctx, r.agentFilename, agentName) @@ -178,7 +178,7 @@ func (r *RemoteRuntime) EmitStartupInfo(ctx context.Context, _ *session.Session, return } - events <- ToolsetInfo(toolCount, false, agentName) + events.Emit(ToolsetInfo(toolCount, false, agentName)) } func (r *RemoteRuntime) agentDetailsFromConfig(ctx context.Context) []AgentDetails { @@ -321,17 +321,17 @@ func (r *RemoteRuntime) Resume(ctx context.Context, req ResumeRequest) { } // Summarize generates a summary for the session by compacting it server-side. -func (r *RemoteRuntime) Summarize(ctx context.Context, sess *session.Session, _ string, events chan Event) { +func (r *RemoteRuntime) Summarize(ctx context.Context, sess *session.Session, _ string, sink EventSink) { if r.sessionID == "" { - events <- SessionSummary(sess.ID, "No active session to summarize", r.currentAgent, 0) + sink.Emit(SessionSummary(sess.ID, "No active session to summarize", r.currentAgent, 0)) return } if err := r.client.CompactSession(ctx, r.sessionID); err != nil { slog.WarnContext(ctx, "Failed to compact session", "error", err) - events <- SessionSummary(sess.ID, fmt.Sprintf("Compaction failed: %v", err), r.currentAgent, 0) + sink.Emit(SessionSummary(sess.ID, fmt.Sprintf("Compaction failed: %v", err), r.currentAgent, 0)) return } - events <- SessionSummary(sess.ID, "Session compacted successfully", r.currentAgent, 0) + sink.Emit(SessionSummary(sess.ID, "Session compacted successfully", r.currentAgent, 0)) } func (r *RemoteRuntime) convertSessionMessages(sess *session.Session) []api.Message { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 67246bd2b..d60c91762 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,7 +33,7 @@ import ( ) // ToolHandlerFunc is a function type for handling tool calls -type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) +type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events EventSink) (*tools.ToolCallResult, error) // Runtime defines the contract for runtime execution type Runtime interface { @@ -59,7 +59,7 @@ type Runtime interface { // EmitStartupInfo emits initial agent, team, and toolset information for immediate display. // When sess is non-nil and contains token data, a TokenUsageEvent is also emitted // so the UI can display context usage percentage on session restore. - EmitStartupInfo(ctx context.Context, sess *session.Session, events chan Event) + EmitStartupInfo(ctx context.Context, sess *session.Session, events EventSink) // ResetStartupInfo resets the startup info emission flag, allowing re-emission ResetStartupInfo() // RunStream starts the agent's interaction loop and returns a channel of events @@ -76,7 +76,7 @@ type Runtime interface { SessionStore() session.Store // Summarize generates a summary for the session - Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events chan Event) + Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events EventSink) // PermissionsInfo returns the team-level permission patterns (allow/ask/deny). // Returns nil if no permissions are configured. @@ -997,7 +997,7 @@ func (r *LocalRuntime) emitToolsChanged() { // EmitStartupInfo emits initial agent, team, and toolset information for immediate sidebar display. // When sess is non-nil and contains token data, a TokenUsageEvent is also emitted so that the // sidebar can display context usage percentage on session restore. -func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events chan Event) { +func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events EventSink) { // Prevent duplicate emissions if r.startupInfoEmitted { return @@ -1008,12 +1008,11 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Sessio // Helper to send events with context check send := func(event Event) bool { - select { - case events <- event: - return true - case <-ctx.Done(): + if ctx.Err() != nil { return false } + events.Emit(event) + return true } // Emit agent and team information immediately for fast sidebar display @@ -1086,7 +1085,7 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Sessio // persistent notice with the actual server-side explanation — otherwise // the user only sees the toolset disappear from the sidebar with no clue // as to why. - r.emitAgentWarnings(a, func(e Event) { send(e) }) + r.emitAgentWarnings(a, events) } // emitToolsProgressively loads tools from each toolset and emits progress updates. @@ -1261,7 +1260,7 @@ func (r *LocalRuntime) startSpan(ctx context.Context, name string, opts ...trace // and "manual" to PreCompact hooks. // Internal callers (proactive threshold, overflow recovery) use // [LocalRuntime.compactWithReason] directly to forward a more specific reason. -func (r *LocalRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events chan Event) { +func (r *LocalRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events EventSink) { r.compactWithReason(ctx, sess, additionalPrompt, compactionReasonManual, events) } @@ -1280,7 +1279,7 @@ func (r *LocalRuntime) Summarize(ctx context.Context, sess *session.Session, add // compaction or contribute additional steering text. BeforeCompaction // hooks then fire inside [LocalRuntime.doCompact] with [Input.CompactionReason] // set to the canonical reason; they may veto or supply a custom summary. -func (r *LocalRuntime) compactWithReason(ctx context.Context, sess *session.Session, additionalPrompt, reason string, events chan Event) { +func (r *LocalRuntime) compactWithReason(ctx context.Context, sess *session.Session, additionalPrompt, reason string, events EventSink) { // Stamp the session ID on ctx so the compaction LLM call carries // `X-Cagent-Session-Id` to the gateway. Manual compaction // (via `Summarize` from the App) bypasses `runStreamLoop`'s seed; @@ -1295,7 +1294,7 @@ func (r *LocalRuntime) compactWithReason(ctx context.Context, sess *session.Sess slog.WarnContext(ctx, "pre_compact hook signalled skip", "agent", a.Name(), "session_id", sess.ID, "source", source, "reason", msg) if msg != "" { - events <- Warning(msg, a.Name()) + events.Emit(Warning(msg, a.Name())) } return } @@ -1311,7 +1310,7 @@ func (r *LocalRuntime) compactWithReason(ctx context.Context, sess *session.Sess if m, err := r.modelsStore.GetModel(ctx, modelID); err == nil && m != nil { contextLimit = int64(m.Limit.Context) } - events <- NewTokenUsageEvent(sess.ID, a.Name(), SessionUsage(sess, contextLimit)) + events.Emit(NewTokenUsageEvent(sess.ID, a.Name(), SessionUsage(sess, contextLimit))) } // preCompactSourceFor maps the canonical compaction reason diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 33c8b43c2..03d0f5649 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -845,11 +845,11 @@ func TestGetTools_WarningHandling(t *testing.T) { sessionSpan := trace.SpanFromContext(t.Context()) // First call - tools1, err := rt.getTools(t.Context(), root, sessionSpan, events, true) + tools1, err := rt.getTools(t.Context(), root, sessionSpan, NewChannelSink(events), true) require.NoError(t, err) require.Len(t, tools1, tt.wantToolCount) - rt.emitAgentWarnings(root, chanSend(events)) + rt.emitAgentWarnings(root, NewChannelSink(events)) evs := collectEvents(events) require.Equal(t, tt.wantWarning, hasWarningEvent(evs), "warning event mismatch on first call") }) @@ -890,7 +890,7 @@ func TestProcessToolCalls_UnknownTool_ReturnsErrorResponse(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, nil, events) + rt.processToolCalls(t.Context(), sess, calls, nil, NewChannelSink(events)) close(events) for range events { } @@ -997,7 +997,7 @@ func TestEmitStartupInfo_DoesNotBlockOnInteractiveOAuth(t *testing.T) { done := make(chan struct{}) go func() { - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(done) }() @@ -1055,7 +1055,7 @@ func TestEmitStartupInfo_SurfacesToolsetStartFailureAsWarning(t *testing.T) { require.NoError(t, err) events := make(chan Event, 32) - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(events) var warning *WarningEvent @@ -1094,7 +1094,7 @@ func TestEmitStartupInfo_AuthRequiredIsSilent(t *testing.T) { require.NoError(t, err) events := make(chan Event, 32) - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(events) for e := range events { @@ -1134,7 +1134,7 @@ func TestEmitStartupInfo_DeferredAuthDoesNotConsumeFailureGate(t *testing.T) { require.NoError(t, err) events := make(chan Event, 32) - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(events) for range events { } @@ -1176,11 +1176,11 @@ func TestEmitAgentWarnings_OnlyEmitsFailures(t *testing.T) { root.AddToolWarning("toolset_a start failed: connection refused") var emitted []*WarningEvent - rt.emitAgentWarnings(root, func(e Event) { + rt.emitAgentWarnings(root, EventSinkFunc(func(e Event) { if w, ok := e.(*WarningEvent); ok { emitted = append(emitted, w) } - }) + })) require.Len(t, emitted, 1, "expected exactly one event for one failure (recoveries are silent)") w := emitted[0] @@ -1202,7 +1202,7 @@ func TestEmitAgentWarnings_NoEventsWhenQueueEmpty(t *testing.T) { require.NoError(t, err) var emitted int - rt.emitAgentWarnings(root, func(Event) { emitted++ }) + rt.emitAgentWarnings(root, EventSinkFunc(func(Event) { emitted++ })) assert.Zero(t, emitted, "empty warnings queue must produce zero events") } @@ -1227,7 +1227,7 @@ func TestEmitStartupInfo(t *testing.T) { events := make(chan Event, 10) // Call EmitStartupInfo - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(events) // Collect events @@ -1250,7 +1250,7 @@ func TestEmitStartupInfo(t *testing.T) { // Test that calling EmitStartupInfo again doesn't emit duplicate events events2 := make(chan Event, 10) - rt.EmitStartupInfo(t.Context(), nil, events2) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events2)) close(events2) var collectedEvents2 []Event @@ -1283,7 +1283,7 @@ func TestEmitStartupInfo_WithSessionTokenData(t *testing.T) { sess.OutputTokens = 1000 events := make(chan Event, 20) - rt.EmitStartupInfo(t.Context(), sess, events) + rt.EmitStartupInfo(t.Context(), sess, NewChannelSink(events)) close(events) // Collect events and find the TokenUsageEvent @@ -1352,7 +1352,7 @@ func TestEmitStartupInfo_CostIncludesSubSessions(t *testing.T) { sess.Messages = append(sess.Messages, session.Item{SubSession: subSess}) events := make(chan Event, 20) - rt.EmitStartupInfo(t.Context(), sess, events) + rt.EmitStartupInfo(t.Context(), sess, NewChannelSink(events)) close(events) var tokenEvent *TokenUsageEvent @@ -1402,7 +1402,7 @@ func TestEmitStartupInfo_LastMessageFinishReason(t *testing.T) { }) events := make(chan Event, 20) - rt.EmitStartupInfo(t.Context(), sess, events) + rt.EmitStartupInfo(t.Context(), sess, NewChannelSink(events)) close(events) var tokenEvent *TokenUsageEvent @@ -1435,7 +1435,7 @@ func TestEmitStartupInfo_NilSessionNoTokenEvent(t *testing.T) { require.NoError(t, err) events := make(chan Event, 20) - rt.EmitStartupInfo(t.Context(), nil, events) + rt.EmitStartupInfo(t.Context(), nil, NewChannelSink(events)) close(events) for event := range events { @@ -1479,7 +1479,7 @@ func TestPermissions_DenyBlocksToolExecution(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // The tool should be denied, look for a ToolCallResponseEvent with error @@ -1535,7 +1535,7 @@ func TestPermissions_AllowAutoApprovesTool(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // The tool should have been executed due to allow pattern @@ -1576,7 +1576,7 @@ func TestPermissions_DenyTakesPriorityOverAllow(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // The tool should be denied despite wildcard allow @@ -1624,7 +1624,7 @@ func TestSessionPermissions_DenyBlocksToolExecution(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) var toolResponse *ToolCallResponseEvent @@ -1677,7 +1677,7 @@ func TestSessionPermissions_AllowAutoApprovesTool(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) require.True(t, executed, "expected tool to be auto-approved by session permissions") @@ -1723,7 +1723,7 @@ func TestSessionPermissions_TakePriorityOverTeamPermissions(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // Session deny should take priority over team allow @@ -1773,7 +1773,7 @@ func TestToolRejectionWithReason(t *testing.T) { // Run in goroutine since it will block waiting for confirmation go func() { - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) }() @@ -1829,7 +1829,7 @@ func TestToolRejectionWithoutReason(t *testing.T) { // Run in goroutine since it will block waiting for confirmation go func() { - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) }() @@ -1879,7 +1879,7 @@ func TestTransferTaskRejectsNonSubAgent(t *testing.T) { }, } - result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, evts) + result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, NewChannelSink(evts)) require.NoError(t, err) require.NotNil(t, result) assert.True(t, result.IsError, "transfer to non-sub-agent should return an error result") @@ -1917,7 +1917,7 @@ func TestTransferTaskAllowsSubAgent(t *testing.T) { }, } - result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, evts) + result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, NewChannelSink(evts)) require.NoError(t, err) require.NotNil(t, result) assert.False(t, result.IsError, "transfer to valid sub-agent should succeed") @@ -1962,7 +1962,7 @@ func TestYoloMode_OverridesPermissionsDeny(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // With --yolo, the tool should execute despite deny permission @@ -2008,7 +2008,7 @@ func TestYoloMode_OverridesForceAsk(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // With --yolo, the tool should execute without asking @@ -2053,7 +2053,7 @@ func TestYoloMode_OverridesSessionDeny(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) close(events) // With --yolo, the tool should execute despite session deny @@ -2352,7 +2352,7 @@ func TestProcessToolCalls_UsesPinnedAgent(t *testing.T) { }} events := make(chan Event, 32) - rt.processToolCalls(t.Context(), sess, calls, []tools.Tool{workerTool}, events) + rt.processToolCalls(t.Context(), sess, calls, []tools.Tool{workerTool}, NewChannelSink(events)) close(events) assert.True(t, executed, "worker_tool handler should have been called") @@ -3202,7 +3202,7 @@ func TestDrainAndEmitSteered_MultipleMessages(t *testing.T) { sess := session.New() events := make(chan Event, 16) - drained, _ := rt.drainAndEmitSteered(t.Context(), sess, events) + drained, _ := rt.drainAndEmitSteered(t.Context(), sess, NewChannelSink(events)) close(events) assert.True(t, drained, "should report messages were drained") @@ -3266,7 +3266,7 @@ func TestDrainAndEmitSteered_MultiContent(t *testing.T) { sess := session.New() events := make(chan Event, 16) - drained, _ := rt.drainAndEmitSteered(t.Context(), sess, events) + drained, _ := rt.drainAndEmitSteered(t.Context(), sess, NewChannelSink(events)) close(events) assert.True(t, drained) @@ -3336,7 +3336,7 @@ func TestPostToolHookReceivesToolResult(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) require.NotNil(t, got) assert.Equal(t, hooks.EventPostToolUse, got.HookEventName) @@ -3388,7 +3388,7 @@ func TestPostToolHookEmitsLifecycleEvents(t *testing.T) { }} events := make(chan Event, 10) - rt.processToolCalls(t.Context(), sess, calls, agentTools, events) + rt.processToolCalls(t.Context(), sess, calls, agentTools, NewChannelSink(events)) var started *HookStartedEvent var finished *HookFinishedEvent diff --git a/pkg/runtime/session_compaction.go b/pkg/runtime/session_compaction.go index deb7791bb..6bc557c00 100644 --- a/pkg/runtime/session_compaction.go +++ b/pkg/runtime/session_compaction.go @@ -52,7 +52,7 @@ const ( // into every model call (see [LocalRuntime.executeSessionStartHooks]), so // env / cwd / OS info is automatically present after a compaction without // any extra dispatch. -func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a *agent.Agent, additionalPrompt, reason string, events chan Event) { +func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a *agent.Agent, additionalPrompt, reason string, events EventSink) { contextLimit := r.compactionContextLimit(ctx, a) // before_compaction: hooks can veto or supply a custom summary. @@ -66,9 +66,9 @@ func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a * } slog.DebugContext(ctx, "Generating summary for session", "session_id", sess.ID, "reason", reason) - events <- SessionCompaction(sess.ID, "started", a.Name()) + events.Emit(SessionCompaction(sess.ID, "started", a.Name())) defer func() { - events <- SessionCompaction(sess.ID, "completed", a.Name()) + events.Emit(SessionCompaction(sess.ID, "completed", a.Name())) }() // Choose the strategy: a hook-supplied summary if before_compaction @@ -78,7 +78,7 @@ func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a * if contextLimit <= 0 { slog.ErrorContext(ctx, "Failed to generate session summary", "error", "model definition unavailable") - events <- Error("Failed to get model definition") + events.Emit(Error("Failed to get model definition")) return } @@ -92,7 +92,7 @@ func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a * }) if err != nil { slog.ErrorContext(ctx, "Failed to generate session summary", "error", err) - events <- Error(err.Error()) + events.Emit(Error(err.Error())) return } if result == nil { @@ -119,7 +119,7 @@ func (r *LocalRuntime) doCompact(ctx context.Context, sess *session.Session, a * _ = r.sessionStore.UpdateSession(ctx, sess) slog.DebugContext(ctx, "Generated session summary", "session_id", sess.ID, "summary_length", len(result.Summary)) - events <- SessionSummary(sess.ID, result.Summary, a.Name(), result.FirstKeptEntry) + events.Emit(SessionSummary(sess.ID, result.Summary, a.Name(), result.FirstKeptEntry)) // after_compaction: observational. Fired only when a summary was // actually applied to the session. The hook receives the diff --git a/pkg/runtime/session_compaction_test.go b/pkg/runtime/session_compaction_test.go index a1c067b40..3df7b42d9 100644 --- a/pkg/runtime/session_compaction_test.go +++ b/pkg/runtime/session_compaction_test.go @@ -128,7 +128,7 @@ func TestDoCompactBeforeHookDeniesSkipsCompaction(t *testing.T) { originalLen := len(sess.Messages) events := make(chan Event, 32) - rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, events) + rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, NewChannelSink(events)) close(events) var sawCompactionEvent, sawSummaryEvent bool @@ -184,7 +184,7 @@ func TestDoCompactBeforeHookSuppliesSummary(t *testing.T) { })) events := make(chan Event, 32) - rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, events) + rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, NewChannelSink(events)) close(events) var summaryEvent *SessionSummaryEvent @@ -264,7 +264,7 @@ func TestDoCompactAfterHookFires(t *testing.T) { sess.OutputTokens = 567 events := make(chan Event, 32) - rt.compactWithReason(t.Context(), sess, "", compactionReasonThreshold, events) + rt.compactWithReason(t.Context(), sess, "", compactionReasonThreshold, NewChannelSink(events)) close(events) for range events { } @@ -301,7 +301,7 @@ func TestDoCompactNoHooksMatchesPriorBehavior(t *testing.T) { })) events := make(chan Event, 32) - rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, events) + rt.compactWithReason(t.Context(), sess, "", compactionReasonManual, NewChannelSink(events)) close(events) var startCount, doneCount int diff --git a/pkg/runtime/skill_runner.go b/pkg/runtime/skill_runner.go index 0cfd7f02d..5f5fbd2fa 100644 --- a/pkg/runtime/skill_runner.go +++ b/pkg/runtime/skill_runner.go @@ -28,7 +28,7 @@ import ( // // This implements the `context: fork` behaviour from the SKILL.md frontmatter, // following the same convention as Claude Code. -func (r *LocalRuntime) handleRunSkill(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts chan Event) (*tools.ToolCallResult, error) { +func (r *LocalRuntime) handleRunSkill(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts EventSink) (*tools.ToolCallResult, error) { var args skills.RunSkillArgs if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil { return nil, fmt.Errorf("invalid arguments: %w", err) diff --git a/pkg/runtime/streaming.go b/pkg/runtime/streaming.go index 53447e437..d46a28561 100644 --- a/pkg/runtime/streaming.go +++ b/pkg/runtime/streaming.go @@ -39,7 +39,7 @@ type streamResult struct { // It is intentionally a free function rather than a method on *LocalRuntime // so the dependency direction is explicit (the loop calls into the chunker, // never the reverse). -func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events chan<- Event) (streamResult, error) { +func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events EventSink) (streamResult, error) { defer stream.Close() var fullContent strings.Builder @@ -81,7 +81,7 @@ func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent fullContent.WriteString(textBefore) for _, tc := range toolCalls { toolDef := toolDefMap[tc.Function.Name] - events <- PartialToolCall(tc, toolDef, a.Name()) + events.Emit(PartialToolCall(tc, toolDef, a.Name())) } } @@ -203,7 +203,7 @@ func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent if !emittedPartial[delta.ID] { toolDef = toolDefMap[tc.Function.Name] } - events <- PartialToolCall(partial, toolDef, a.Name()) + events.Emit(PartialToolCall(partial, toolDef, a.Name())) emittedPartial[delta.ID] = true } } @@ -212,7 +212,7 @@ func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent } if choice.Delta.ReasoningContent != "" { - events <- AgentChoiceReasoning(a.Name(), sess.ID, choice.Delta.ReasoningContent) + events.Emit(AgentChoiceReasoning(a.Name(), sess.ID, choice.Delta.ReasoningContent)) fullReasoningContent.WriteString(choice.Delta.ReasoningContent) } @@ -225,11 +225,11 @@ func handleStream(ctx context.Context, stream chat.MessageStream, a *agent.Agent if !xmlToolCallGate { tagIdx := strings.Index(choice.Delta.Content, "") if tagIdx < 0 { - events <- AgentChoice(a.Name(), sess.ID, choice.Delta.Content) + events.Emit(AgentChoice(a.Name(), sess.ID, choice.Delta.Content)) } else { xmlToolCallGate = true if tagIdx > 0 { - events <- AgentChoice(a.Name(), sess.ID, choice.Delta.Content[:tagIdx]) + events.Emit(AgentChoice(a.Name(), sess.ID, choice.Delta.Content[:tagIdx])) } } } diff --git a/pkg/runtime/tool_dispatch.go b/pkg/runtime/tool_dispatch.go index afebc6386..6d3b7a44e 100644 --- a/pkg/runtime/tool_dispatch.go +++ b/pkg/runtime/tool_dispatch.go @@ -29,7 +29,7 @@ import ( // (false, "") in every other path — including user cancellation, which // halts the *batch* but keeps the loop alive so the synthesised tool // error responses can be sent back to the model on the next turn. -func (r *LocalRuntime) processToolCalls(ctx context.Context, sess *session.Session, calls []tools.ToolCall, agentTools []tools.Tool, events chan Event) (stopRun bool, stopMessage string) { +func (r *LocalRuntime) processToolCalls(ctx context.Context, sess *session.Session, calls []tools.ToolCall, agentTools []tools.Tool, events EventSink) (stopRun bool, stopMessage string) { // Bind runtime-managed handlers (transfer_task, handoff, change_model, ...) // to the current events channel: r.toolMap entries take chan Event, // toolexec.ToolHandler doesn't. @@ -48,7 +48,7 @@ func (r *LocalRuntime) processToolCalls(ctx context.Context, sess *session.Sessi Permissions: r.permissionCheckers, Handlers: handlers, } - return d.Process(ctx, sess, calls, agentTools, &chanEmitter{events: events}) + return d.Process(ctx, sess, calls, agentTools, &sinkEmitter{events: events}) } // permissionCheckers returns the ordered list of permission checkers to @@ -74,32 +74,32 @@ func (r *LocalRuntime) permissionCheckers(sess *session.Session) []toolexec.Name return checkers } -// chanEmitter adapts a chan Event into a [toolexec.Emitter]. It's the +// sinkEmitter adapts an [EventSink] into a [toolexec.Emitter]. It's the // only place where the dispatcher's typed event surface meets the // runtime's event channel; new dispatcher events grow this type in // lockstep with the [toolexec.Emitter] interface. -type chanEmitter struct { - events chan Event +type sinkEmitter struct { + events EventSink } -func (e *chanEmitter) EmitToolCall(toolCall tools.ToolCall, tool tools.Tool, agentName string) { - e.events <- ToolCall(toolCall, tool, agentName) +func (e *sinkEmitter) EmitToolCall(toolCall tools.ToolCall, tool tools.Tool, agentName string) { + e.events.Emit(ToolCall(toolCall, tool, agentName)) } -func (e *chanEmitter) EmitToolCallResponse(toolCallID string, tool tools.Tool, result *tools.ToolCallResult, output, agentName string) { - e.events <- ToolCallResponse(toolCallID, tool, result, output, agentName) +func (e *sinkEmitter) EmitToolCallResponse(toolCallID string, tool tools.Tool, result *tools.ToolCallResult, output, agentName string) { + e.events.Emit(ToolCallResponse(toolCallID, tool, result, output, agentName)) } -func (e *chanEmitter) EmitToolCallConfirmation(toolCall tools.ToolCall, tool tools.Tool, agentName string) { - e.events <- ToolCallConfirmation(toolCall, tool, agentName) +func (e *sinkEmitter) EmitToolCallConfirmation(toolCall tools.ToolCall, tool tools.Tool, agentName string) { + e.events.Emit(ToolCallConfirmation(toolCall, tool, agentName)) } -func (e *chanEmitter) EmitHookBlocked(toolCall tools.ToolCall, tool tools.Tool, message, agentName string) { - e.events <- HookBlocked(toolCall, tool, message, agentName) +func (e *sinkEmitter) EmitHookBlocked(toolCall tools.ToolCall, tool tools.Tool, message, agentName string) { + e.events.Emit(HookBlocked(toolCall, tool, message, agentName)) } -func (e *chanEmitter) EmitMessageAdded(sessionID string, msg *session.Message, agentName string) { - e.events <- MessageAdded(sessionID, msg, agentName) +func (e *sinkEmitter) EmitMessageAdded(sessionID string, msg *session.Message, agentName string) { + e.events.Emit(MessageAdded(sessionID, msg, agentName)) } // hookDispatcher adapts the runtime's per-agent [hooks.Executor] machinery @@ -108,7 +108,7 @@ func (e *chanEmitter) EmitMessageAdded(sessionID string, msg *session.Message, a // as a Warning event during dispatch. type hookDispatcher struct { r *LocalRuntime - events chan Event + events EventSink } func (h *hookDispatcher) Dispatch(ctx context.Context, a *agent.Agent, event hooks.EventType, in *hooks.Input) *hooks.Result { @@ -146,8 +146,8 @@ func denySourceFor(checkerSource string) string { // resulting MessageAdded event. Used by the loop for assistant messages // and max-iteration stop messages. The dispatcher emits its own variant // directly via the [toolexec.Emitter] interface. -func addAgentMessage(sess *session.Session, a *agent.Agent, msg *chat.Message, events chan Event) { +func addAgentMessage(sess *session.Session, a *agent.Agent, msg *chat.Message, events EventSink) { agentMsg := session.NewAgentMessage(a.Name(), msg) sess.AddMessage(agentMsg) - events <- MessageAdded(sess.ID, agentMsg, a.Name()) + events.Emit(MessageAdded(sess.ID, agentMsg, a.Name())) }