Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts ..
startupEvents := make(chan runtime.Event, 10)
go func() {
defer close(startupEvents)
rt.EmitStartupInfo(ctx, sess, startupEvents)
rt.EmitStartupInfo(ctx, sess, runtime.NewChannelSink(startupEvents))
}()
for event := range startupEvents {
select {
Expand Down Expand Up @@ -303,7 +303,7 @@ func (a *App) ResolveCommand(ctx context.Context, userInput string) string {

// EmitStartupInfo emits initial agent, team, and toolset information to the provided channel
func (a *App) EmitStartupInfo(ctx context.Context, events chan runtime.Event) {
a.runtime.EmitStartupInfo(ctx, a.session, events)
a.runtime.EmitStartupInfo(ctx, a.session, runtime.NewChannelSink(events))
}

// Run one agent loop
Expand Down Expand Up @@ -657,7 +657,7 @@ func (a *App) reEmitStartupInfo(ctx context.Context) {
startupEvents := make(chan runtime.Event, 10)
go func() {
defer close(startupEvents)
a.runtime.EmitStartupInfo(ctx, a.session, startupEvents)
a.runtime.EmitStartupInfo(ctx, a.session, runtime.NewChannelSink(startupEvents))
}()
for event := range startupEvents {
select {
Expand Down Expand Up @@ -885,7 +885,7 @@ func (a *App) CompactSession(ctx context.Context, additionalPrompt string) {
events := make(chan runtime.Event, 100)
go func() {
defer close(events)
a.runtime.Summarize(ctx, sess, additionalPrompt, events)
a.runtime.Summarize(ctx, sess, additionalPrompt, runtime.NewChannelSink(events))
}()
for event := range events {
if ctx.Err() != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m *mockRuntime) CurrentAgentTools(ctx context.Context) ([]tools.Tool, erro
func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil }
func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil }

func (m *mockRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events chan runtime.Event) {
func (m *mockRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session, events runtime.EventSink) {
}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event {
Expand All @@ -57,7 +57,7 @@ func (m *mockRuntime) ResumeElicitation(ctx context.Context, action tools.Elicit
return nil
}
func (m *mockRuntime) SessionStore() session.Store { return m.store }
func (m *mockRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events chan runtime.Event) {
func (m *mockRuntime) Summarize(ctx context.Context, sess *session.Session, additionalPrompt string, events runtime.EventSink) {
}
func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }
func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset {
Expand Down
20 changes: 10 additions & 10 deletions pkg/cli/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (m *mockRuntime) CurrentAgentName() string { return "test" }
func (m *mockRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo {
return runtime.CurrentAgentInfo{Name: "test"}
}
func (m *mockRuntime) SetCurrentAgent(string) error { return nil }
func (m *mockRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { return nil, nil }
func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil }
func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil }
func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, chan runtime.Event) {}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) SetCurrentAgent(string) error { return nil }
func (m *mockRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { return nil, nil }
func (m *mockRuntime) CurrentAgentToolsetStatuses() []tools.ToolsetStatus { return nil }
func (m *mockRuntime) RestartToolset(context.Context, string) error { return nil }
func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, runtime.EventSink) {}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) Run(context.Context, *session.Session) ([]session.Message, error) {
return nil, nil
}
Expand All @@ -48,10 +48,10 @@ func (m *mockRuntime) ResumeElicitation(_ context.Context, action tools.Elicitat
m.elicitationLastAction = action
return nil
}
func (m *mockRuntime) SessionStore() session.Store { return nil }
func (m *mockRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {}
func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }
func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { return nil }
func (m *mockRuntime) SessionStore() session.Store { return nil }
func (m *mockRuntime) Summarize(context.Context, *session.Session, string, runtime.EventSink) {}
func (m *mockRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }
func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset { return nil }
func (m *mockRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo {
return nil
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/runtime/agent_delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,16 @@ func mergeExcludedTools(parent, child []string) []string {
//
// Use as `defer r.swapCurrentAgent(ctx, sessionID, from, to, evts)()` so the
// swap takes effect immediately and the restore runs at function exit.
func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, from, to *agent.Agent, evts chan<- Event) func() {
evts <- AgentSwitching(true, from.Name(), to.Name())
func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, from, to *agent.Agent, evts EventSink) func() {
evts.Emit(AgentSwitching(true, from.Name(), to.Name()))
r.executeOnAgentSwitchHooks(ctx, from, sessionID, from.Name(), to.Name(), agentSwitchKindTransferTask)
r.setCurrentAgent(to.Name())
evts <- AgentInfo(to.Name(), getAgentModelID(to), to.Description(), to.WelcomeMessage())
evts.Emit(AgentInfo(to.Name(), getAgentModelID(to), to.Description(), to.WelcomeMessage()))
return func() {
r.setCurrentAgent(from.Name())
evts <- AgentSwitching(false, to.Name(), from.Name())
evts.Emit(AgentSwitching(false, to.Name(), from.Name()))
r.executeOnAgentSwitchHooks(ctx, from, sessionID, to.Name(), from.Name(), agentSwitchKindTransferTaskReturn)
evts <- AgentInfo(from.Name(), getAgentModelID(from), from.Description(), from.WelcomeMessage())
evts.Emit(AgentInfo(from.Name(), getAgentModelID(from), from.Description(), from.WelcomeMessage()))
}
}

Expand All @@ -245,7 +245,7 @@ func (r *LocalRuntime) swapCurrentAgent(ctx context.Context, sessionID string, f
// swapping the current agent (if requested), resolving the child agent,
// building the sub-session, driving RunStream, and recording the
// sub-session on the parent.
func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Session, evts chan<- Event, req delegationRequest) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Session, evts EventSink, req delegationRequest) (*tools.ToolCallResult, error) {
span := trace.SpanFromContext(ctx)

callerAgent, err := r.team.Agent(r.CurrentAgentName())
Expand Down Expand Up @@ -276,12 +276,12 @@ func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Sessio

childEvents := r.RunStream(ctx, s)
for event := range childEvents {
evts <- event
evts.Emit(event)
if errEvent, ok := event.(*ErrorEvent); ok {
// Drain remaining events (including StreamStoppedEvent) so the
// TUI's streamDepth counter stays balanced.
for remaining := range childEvents {
evts <- remaining
evts.Emit(remaining)
}
err := fmt.Errorf("%s", errEvent.Error)
span.RecordError(err)
Expand All @@ -292,7 +292,7 @@ func (r *LocalRuntime) runForwarding(ctx context.Context, parent *session.Sessio

parent.ToolsApproved = s.ToolsApproved
parent.AddSubSession(s)
evts <- SubSessionCompleted(parent.ID, s, callerAgent.Name())
evts.Emit(SubSessionCompleted(parent.ID, s, callerAgent.Name()))

span.SetStatus(codes.Ok, "sub-session completed")
return tools.ResultSuccess(s.GetLastAssistantMessageContent()), nil
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *LocalRuntime) RunAgent(ctx context.Context, params agenttool.RunParams)
}, params.OnContent)
}

func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts chan Event) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts EventSink) (*tools.ToolCallResult, error) {
var params struct {
Agent string `json:"agent"`
Task string `json:"task"`
Expand Down Expand Up @@ -428,7 +428,7 @@ func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Ses
})
}

func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, _ chan Event) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, _ EventSink) (*tools.ToolCallResult, error) {
var params handoff.Args
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &params); err != nil {
return nil, fmt.Errorf("invalid arguments: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (r *LocalRuntime) tryReplayCachedResponse(
ctx context.Context,
sess *session.Session,
a *agent.Agent,
events chan Event,
events EventSink,
) bool {
c := a.Cache()
if c == nil {
Expand All @@ -76,7 +76,7 @@ func (r *LocalRuntime) tryReplayCachedResponse(
slog.DebugContext(ctx, "Response cache hit; replaying cached answer",
"agent", a.Name(), "session_id", sess.ID)
modelID := a.Model(ctx).ID()
events <- AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage())
events.Emit(AgentInfo(a.Name(), modelID, a.Description(), a.WelcomeMessage()))
addAgentMessage(sess, a, &chat.Message{
Role: chat.MessageRoleAssistant,
Content: cached,
Expand Down
6 changes: 3 additions & 3 deletions pkg/runtime/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (m *mockRuntime) CurrentAgentInfo(context.Context) CurrentAgentInfo {
func (m *mockRuntime) SetCurrentAgent(string) error {
return nil
}
func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, chan Event) {}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) EmitStartupInfo(context.Context, *session.Session, EventSink) {}
func (m *mockRuntime) ResetStartupInfo() {}
func (m *mockRuntime) RunStream(context.Context, *session.Session) <-chan Event {
return nil
}
Expand All @@ -51,7 +51,7 @@ func (m *mockRuntime) ResumeElicitation(context.Context, tools.ElicitationAction
return nil
}
func (m *mockRuntime) SessionStore() session.Store { return nil }
func (m *mockRuntime) Summarize(context.Context, *session.Session, string, chan Event) {
func (m *mockRuntime) Summarize(context.Context, *session.Session, string, EventSink) {
}
func (m *mockRuntime) PermissionsInfo() *PermissionsInfo { return nil }
func (m *mockRuntime) CurrentAgentSkillsToolset() *skillstool.Toolset {
Expand Down
75 changes: 75 additions & 0 deletions pkg/runtime/event_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package runtime

// EventSink is the write side of the runtime's event stream. Methods
// that produce events accept an EventSink instead of a raw chan Event,
// decoupling event producers from the channel implementation.
//
// Implementations must be safe for concurrent use: multiple goroutines
// may call Emit simultaneously (e.g. tool-call handlers running in
// parallel).
type EventSink interface {
// Emit delivers an event to the sink. The default implementation
// preserves back-pressure: if the consumer is slow, Emit blocks
// until the event can be delivered. Implementations must still be
// panic-safe so a closed channel does not crash the caller.
Emit(event Event)
}

// channelSink adapts a chan Event into an [EventSink]. Sends are
// blocking so back-pressure is preserved between producers and the
// consumer. Send-on-closed-channel panics are recovered and the event
// is dropped, since a closed channel signals that the consumer has
// gone away.
type channelSink struct {
ch chan Event
}

// NewChannelSink returns an [EventSink] that writes to ch. This is the
// standard bridge between the runtime's EventSink-based internals and
// external callers that create raw event channels (e.g. app.go,
// tests).
func NewChannelSink(ch chan Event) EventSink {
return &channelSink{ch: ch}
}

func (s *channelSink) Emit(e Event) {
defer func() { recover() }() //nolint:errcheck // swallow send-on-closed-channel panic
s.ch <- e
}

// nonBlockingChannelSink wraps an event channel with non-blocking
// semantics: if the buffer is full, the event is dropped instead of
// blocking the producer. Use this only from long-lived goroutines
// that may outlive the event channel (e.g. the RAG file watcher);
// regular runtime code should use the blocking [channelSink] so
// back-pressure is preserved.
type nonBlockingChannelSink struct {
ch chan Event
}

// nonBlocking returns a non-blocking sink for sink. If sink wraps a
// channel directly, the result writes to that channel with a
// select-default; otherwise, sink is returned unchanged because
// non-channel sinks (notably [EventSinkFunc] used in tests) do not
// have an underlying buffer that can fill up.
func nonBlocking(sink EventSink) EventSink {
if cs, ok := sink.(*channelSink); ok {
return nonBlockingChannelSink{ch: cs.ch}
}
return sink
}

func (s nonBlockingChannelSink) Emit(e Event) {
defer func() { recover() }() //nolint:errcheck // swallow send-on-closed-channel panic
select {
case s.ch <- e:
default:
}
}

// EventSinkFunc adapts a plain function into an [EventSink], following
// the same adapter pattern as http.HandlerFunc. This is convenient for
// tests and one-off callbacks that don't need a full struct.
type EventSinkFunc func(Event)

func (f EventSinkFunc) Emit(e Event) { f(e) }
8 changes: 4 additions & 4 deletions pkg/runtime/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (e *fallbackExecutor) execute(
agentTools []tools.Tool,
sess *session.Session,
m *modelsdev.Model,
events chan Event,
events EventSink,
) (streamResult, provider.Provider, error) {
fallbackModels := a.FallbackModels()
fallbackRetries := getEffectiveRetries(a)
Expand Down Expand Up @@ -272,14 +272,14 @@ func (e *fallbackExecutor) execute(
if lastErr != nil {
reason = lastErr.Error()
}
events <- ModelFallback(
events.Emit(ModelFallback(
a.Name(),
prevModelID,
modelEntry.provider.ID(),
reason,
attempt+1,
maxAttempts,
)
))
}

slog.DebugContext(ctx, "Creating chat completion stream",
Expand Down Expand Up @@ -308,7 +308,7 @@ func (e *fallbackExecutor) execute(
// of the selected sub-model's YAML-configured name.
if rp, ok := modelEntry.provider.(interface{ LastSelectedModelID() string }); ok {
if selected := rp.LastSelectedModelID(); selected != "" {
events <- AgentInfo(a.Name(), selected, a.Description(), a.WelcomeMessage())
events.Emit(AgentInfo(a.Name(), selected, a.Description(), a.WelcomeMessage()))
}
}

Expand Down
Loading
Loading