Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/runtime/agent_delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (r *LocalRuntime) RunAgent(ctx context.Context, params agenttool.RunParams)
}, params.OnContent)
}

func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, evts EventSink) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Session, toolCall tools.ToolCall) (*tools.ToolCallResult, error) {
var params struct {
Agent string `json:"agent"`
Task string `json:"task"`
Expand All @@ -415,7 +415,7 @@ func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Ses
))
defer span.End()

return r.runForwarding(ctx, sess, evts, delegationRequest{
return r.runForwarding(ctx, sess, r.toolSink, delegationRequest{
SubSessionConfig: SubSessionConfig{
Task: params.Task,
ExpectedOutput: params.ExpectedOutput,
Expand All @@ -428,7 +428,7 @@ func (r *LocalRuntime) handleTaskTransfer(ctx context.Context, sess *session.Ses
})
}

func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, _ EventSink) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) handleHandoff(ctx context.Context, sess *session.Session, toolCall tools.ToolCall) (*tools.ToolCallResult, error) {
var params handoff.Args
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &params); err != nil {
return nil, fmt.Errorf("invalid arguments: %w", err)
Expand Down
198 changes: 100 additions & 98 deletions pkg/runtime/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/docker/docker-agent/pkg/agent"
"github.com/docker/docker-agent/pkg/chat"
"github.com/docker/docker-agent/pkg/compaction"
"github.com/docker/docker-agent/pkg/httpclient"
"github.com/docker/docker-agent/pkg/model/provider"
"github.com/docker/docker-agent/pkg/modelsdev"
Expand All @@ -38,11 +37,8 @@ func (r *LocalRuntime) registerDefaultTools() {
r.toolMap[modelpicker.ToolNameChangeModel] = r.handleChangeModel
r.toolMap[modelpicker.ToolNameRevertModel] = r.handleRevertModel
r.toolMap[skills.ToolNameRunSkill] = r.handleRunSkill

r.bgAgents.RegisterHandlers(func(name string, fn func(context.Context, *session.Session, tools.ToolCall) (*tools.ToolCallResult, error)) {
r.toolMap[name] = func(ctx context.Context, sess *session.Session, tc tools.ToolCall, _ EventSink) (*tools.ToolCallResult, error) {
return fn(ctx, sess, tc)
}
r.toolMap[name] = fn
})
}

Expand Down Expand Up @@ -366,16 +362,13 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
var contextLimit int64
if m != nil {
contextLimit = int64(m.Limit.Context)

if r.sessionCompaction && compaction.ShouldCompact(sess.InputTokens, sess.OutputTokens, 0, contextLimit) {
r.compactWithReason(ctx, sess, "", compactionReasonThreshold, sink)
}
r.compactor.CompactIfOverThreshold(ctx, sess, m, sink)
}

// Drain steer messages queued while idle or before the first model call
// (covers idle-window and first-turn-miss races).
if drained, messageCountBeforeSteer := r.drainAndEmitSteered(ctx, sess, sink); drained {
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, sink)
r.compactor.CompactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, sink)
}

// Everything from turn_start onwards is wrapped in a closure so a
Expand Down Expand Up @@ -509,60 +502,23 @@ func (r *LocalRuntime) runTurn(
messages := sess.GetMessages(a, slices.Concat(ls.sessionStartMsgs, ls.userPromptMsgs, turnStartMsgs)...)
slog.DebugContext(ctx, "Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))

// before_llm_call hooks fire just before the model is invoked.
// A terminating verdict (e.g. from the max_iterations builtin)
// stops the run loop here, before any tokens are spent. Hooks
// may also rewrite the outgoing messages by returning
// HookSpecificOutput.UpdatedMessages — the redact_secrets
// builtin uses this to scrub secrets from chat content before
// the LLM ever sees it. The rewrite happens BEFORE the
// runtime's Go-only message transforms so a hook that drops a
// message (e.g. a custom "strip system reminders") doesn't get
// silently overridden by a transform later in the chain.
stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, ls.iteration, messages)
if stop {
slog.WarnContext(ctx, "before_llm_call hook signalled run termination",
"agent", a.Name(), "session_id", sess.ID, "reason", msg)
r.emitHookDrivenShutdown(ctx, a, sess, msg, events)
res, usedModel, callCtrl := r.callModel(streamCtx, ctx, sess, a, m, model, modelID, messages, agentTools, ls, streamSpan, events)
switch callCtrl {
case modelCallHookBlocked:
endStreamSpan()
endReason = turnEndReasonHookBlocked
return turnExit
}
if rewritten != nil {
messages = rewritten
}

// Apply registered before_llm_call message transforms (e.g.
// strip_unsupported_modalities for text-only models, plus any
// embedder-supplied redactor / scrubber registered via
// WithMessageTransform). Runs after the gate so a transform
// failure cannot waste the gate's allow verdict. modelID is
// passed explicitly so transforms see the actual model the
// loop chose (per-tool override + alloy-mode selection),
// not whatever a fresh agent.Model() call would re-randomize.
messages = r.applyBeforeLLMCallTransforms(ctx, sess, a, modelID, messages)

// Try primary model with fallback chain if configured
res, usedModel, err := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events)
if err != nil {
outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, &ls.overflowCompactions, streamSpan, events)
case modelCallErrorRetry:
endStreamSpan()
endReason = turnEndReasonError
return turnContinue
case modelCallErrorFatal:
endStreamSpan()
endReason = turnEndReasonError
if outcome == streamErrorRetry {
return turnContinue
}
return turnExit
case modelCallOK:
}

// A successful model call resets the overflow compaction counter.
ls.overflowCompactions = 0

// after_llm_call hooks fire on success only; failed calls
// fire on_error above. The assistant text content is passed
// via stop_response, matching the stop event's payload, so
// handlers can reuse the same parsing.
r.executeAfterLLMCallHooks(ctx, sess, a, res.Content)

if usedModel != nil && usedModel.ID() != model.ID() {
slog.InfoContext(ctx, "Used fallback model", "agent", a.Name(), "primary", model.ID(), "used", usedModel.ID())
events.Emit(AgentInfo(a.Name(), usedModel.ID(), a.Description(), a.WelcomeMessage()))
Expand Down Expand Up @@ -640,7 +596,7 @@ func (r *LocalRuntime) runTurn(

// Drain steer messages that arrived during tool calls.
if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
r.compactor.CompactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
endReason = turnEndReasonSteered
return turnContinue
}
Expand All @@ -651,7 +607,7 @@ func (r *LocalRuntime) runTurn(

// Re-check steer queue: closes the race between the mid-loop drain and this stop.
if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
r.compactor.CompactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
endReason = turnEndReasonSteered
return turnContinue
}
Expand All @@ -666,7 +622,7 @@ func (r *LocalRuntime) runTurn(
userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...)
sess.AddMessage(userMsg)
events.Emit(UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1))
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
r.compactor.CompactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
endReason = turnEndReasonContinue
return turnContinue // re-enter the loop for a new turn
}
Expand All @@ -675,11 +631,95 @@ func (r *LocalRuntime) runTurn(
return turnExit
}

r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
r.compactor.CompactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
endReason = turnEndReasonContinue
return turnContinue
}

// modelCallResult describes the outcome of [LocalRuntime.callModel] so
// the caller can branch without inspecting error values.
//
// Exactly one value is returned per call; only modelCallOK carries a
// usable streamResult / provider pair.
type modelCallResult int

const (
	modelCallOK          modelCallResult = iota // LLM call succeeded; response is valid.
	modelCallHookBlocked                        // A before_llm_call hook signalled termination.
	modelCallErrorRetry                         // Stream error; caller should retry the turn.
	modelCallErrorFatal                         // Stream error; caller should exit the loop.
)

// callModel runs the full LLM invocation pipeline for a single turn:
//
//  1. before_llm_call hooks — gate / rewrite
//  2. before_llm_call message transforms
//  3. fallback.execute (streaming model call with retry chain)
//  4. error handling (overflow compaction, telemetry)
//  5. after_llm_call hooks (success path only)
//
// Extracting this from [runTurn] keeps the pre/post LLM logic in one
// place and independently testable. The caller retains ownership of
// span lifecycle, turn_end dispatch, and tool processing.
func (r *LocalRuntime) callModel(
	streamCtx context.Context,
	turnCtx context.Context,
	sess *session.Session,
	a *agent.Agent,
	m *modelsdev.Model,
	model provider.Provider,
	modelID string,
	messages []chat.Message,
	agentTools []tools.Tool,
	ls *loopState,
	streamSpan trace.Span,
	events EventSink,
) (streamResult, provider.Provider, modelCallResult) {
	// Gate: a terminating verdict from a before_llm_call hook stops the
	// run here, before any tokens are spent. Hooks may instead rewrite
	// the outgoing message list.
	terminate, reason, updated := r.executeBeforeLLMCallHooks(turnCtx, sess, a, modelID, ls.iteration, messages)
	switch {
	case terminate:
		slog.WarnContext(turnCtx, "before_llm_call hook signalled run termination",
			"agent", a.Name(), "session_id", sess.ID, "reason", reason)
		r.emitHookDrivenShutdown(turnCtx, a, sess, reason, events)
		return streamResult{}, nil, modelCallHookBlocked
	case updated != nil:
		messages = updated
	}

	// Registered before_llm_call message transforms (e.g.
	// strip_unsupported_modalities for text-only models) run after the
	// hook gate so a transform failure cannot waste the gate's allow
	// verdict.
	messages = r.applyBeforeLLMCallTransforms(turnCtx, sess, a, modelID, messages)

	// Primary model first, then the configured fallback chain.
	res, active, callErr := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events)
	if callErr != nil {
		if r.handleStreamError(turnCtx, sess, a, callErr, modelContextLimit(m), &ls.overflowCompactions, streamSpan, events) == streamErrorRetry {
			return streamResult{}, nil, modelCallErrorRetry
		}
		return streamResult{}, nil, modelCallErrorFatal
	}

	// Success resets the overflow compaction counter; after_llm_call
	// hooks fire on the success path only (failures fire on_error in
	// handleStreamError above).
	ls.overflowCompactions = 0
	r.executeAfterLLMCallHooks(turnCtx, sess, a, res.Content)

	return res, active, modelCallOK
}

// modelContextLimit reports the model's context window size in tokens,
// or 0 when the model metadata is unavailable.
func modelContextLimit(m *modelsdev.Model) int64 {
	if m != nil {
		return int64(m.Limit.Context)
	}
	return 0
}

// Run executes the agent loop synchronously and returns the final session
// messages. This is a convenience wrapper around RunStream for non-streaming
// callers.
Expand Down Expand Up @@ -766,44 +806,6 @@ func (r *LocalRuntime) recordAssistantMessage(
return msgUsage
}

// compactIfNeeded estimates the token impact of tool results added since
// messageCountBefore and triggers proactive compaction when the estimated
// total exceeds 90% of the context window. This prevents sending an
// oversized request on the next iteration.
func (r *LocalRuntime) compactIfNeeded(
	ctx context.Context,
	sess *session.Session,
	a *agent.Agent,
	m *modelsdev.Model,
	contextLimit int64,
	messageCountBefore int,
	events EventSink,
) {
	// Nothing to do when model metadata is missing, session compaction is
	// disabled, or the context window size is unknown.
	if m == nil || !r.sessionCompaction || contextLimit <= 0 {
		return
	}

	// Estimate tokens contributed by messages appended since the caller's
	// snapshot (typically tool results from the current turn).
	newMessages := sess.GetAllMessages()[messageCountBefore:]
	var addedTokens int64
	for _, msg := range newMessages {
		addedTokens += compaction.EstimateMessageTokens(&msg.Message)
	}

	if !compaction.ShouldCompact(sess.InputTokens, sess.OutputTokens, addedTokens, contextLimit) {
		return
	}

	// NOTE: slog messages are plain strings, not printf format strings,
	// so a literal percent sign is written "%" — the previous "90%%"
	// rendered verbatim as "90%%" in log output.
	slog.InfoContext(ctx, "Proactive compaction: tool results pushed estimated context past 90% threshold",
		"agent", a.Name(),
		"input_tokens", sess.InputTokens,
		"output_tokens", sess.OutputTokens,
		"added_estimated_tokens", addedTokens,
		"estimated_total", sess.InputTokens+sess.OutputTokens+addedTokens,
		"context_limit", contextLimit,
	)
	r.compactWithReason(ctx, sess, "", compactionReasonThreshold, events)
}

// getTools executes tool retrieval with automatic OAuth handling.
// emitLifecycleEvents controls whether MCPInitStarted/Finished are emitted;
// pass false when calling from reprobe to avoid spurious TUI spinner flicker.
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/loop_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *LocalRuntime) handleStreamError(
"The conversation has exceeded the model's context window. Automatically compacting the conversation history...",
a.Name(),
))
r.compactWithReason(ctx, sess, "", compactionReasonOverflow, events)
r.compactor.Compact(ctx, sess, "", compactionReasonOverflow, events)
return streamErrorRetry
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/runtime/model_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *LocalRuntime) findModelPickerTool() *modelpicker.Tool {
}

// handleChangeModel handles the change_model tool call by switching the current agent's model.
func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session, toolCall tools.ToolCall, events EventSink) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session, toolCall tools.ToolCall) (*tools.ToolCallResult, error) {
var params modelpicker.ChangeModelArgs
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &params); err != nil {
return nil, fmt.Errorf("invalid arguments: %w", err)
Expand All @@ -53,25 +53,25 @@ func (r *LocalRuntime) handleChangeModel(ctx context.Context, _ *session.Session
)), nil
}

return r.setModelAndEmitInfo(ctx, params.Model, events)
return r.setModelAndEmitInfo(ctx, params.Model)
}

// handleRevertModel handles the revert_model tool call by reverting the current agent to its default model.
func (r *LocalRuntime) handleRevertModel(ctx context.Context, _ *session.Session, _ tools.ToolCall, events EventSink) (*tools.ToolCallResult, error) {
return r.setModelAndEmitInfo(ctx, "", events)
func (r *LocalRuntime) handleRevertModel(ctx context.Context, _ *session.Session, _ tools.ToolCall) (*tools.ToolCallResult, error) {
return r.setModelAndEmitInfo(ctx, "")
}

// setModelAndEmitInfo sets the model for the current agent and emits an updated
// AgentInfo event so the UI reflects the change. An empty modelRef reverts to
// the agent's default model.
func (r *LocalRuntime) setModelAndEmitInfo(ctx context.Context, modelRef string, events EventSink) (*tools.ToolCallResult, error) {
func (r *LocalRuntime) setModelAndEmitInfo(ctx context.Context, modelRef string) (*tools.ToolCallResult, error) {
currentName := r.CurrentAgentName()
if err := r.SetAgentModel(ctx, currentName, modelRef); err != nil {
return tools.ResultError(fmt.Sprintf("failed to set model: %v", err)), nil
}

if a, err := r.team.Agent(currentName); err == nil {
events.Emit(AgentInfo(a.Name(), r.getEffectiveModelID(a), a.Description(), a.WelcomeMessage()))
if a, err := r.team.Agent(currentName); err == nil && r.toolSink != nil {
r.toolSink.Emit(AgentInfo(a.Name(), r.getEffectiveModelID(a), a.Description(), a.WelcomeMessage()))
} else {
slog.WarnContext(ctx, "Failed to retrieve agent after model change; UI may not reflect the update", "agent", currentName, "error", err)
}
Expand Down
Loading
Loading