Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 48 additions & 54 deletions pkg/runtime/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
// extras and threaded into every model call below — never persisted,
// to keep the visible transcript clean and the user message tail
// stable.
sessionStartMsgs := r.executeSessionStartHooks(ctx, sess, a, events)
ls := &loopState{
maxIterations: sess.MaxIterations,
sessionStartMsgs: r.executeSessionStartHooks(ctx, sess, a, events),
}

// Emit team information
events <- TeamInfo(r.agentDetailsFromTeam(), a.Name())
Expand All @@ -222,7 +225,6 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
// no human authored. SendUserMessage is the same flag the runtime
// uses to gate the UserMessageEvent, which is exactly the right
// signal here too: "a real user prompt is at the tail of the session".
var userPromptMsgs []chat.Message
if sess.SendUserMessage && len(messages) > 0 {
lastMsg := messages[len(messages)-1]
events <- UserMessage(lastMsg.Content, sess.ID, lastMsg.MultiContent, len(sess.Messages)-1)
Expand All @@ -237,7 +239,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
r.emitHookDrivenShutdown(ctx, a, sess, msg, events)
return
}
userPromptMsgs = ctxMsgs
ls.userPromptMsgs = ctxMsgs
}
}

Expand All @@ -253,12 +255,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
return
}

iteration := 0
// Use a runtime copy of maxIterations so we don't modify the session's persistent config
runtimeMaxIterations := sess.MaxIterations

// Initialize consecutive duplicate tool call detector
//
// Initialize consecutive duplicate tool call detector.
// Polling tools (view_background_agent, view_background_job) are
// expected to be called repeatedly with identical arguments while a
// background task is in progress. Exempt them so they never trigger
Expand All @@ -267,24 +264,11 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
if loopThreshold == 0 {
loopThreshold = 5 // default: always active
}
loopDetector := toolexec.NewLoopDetector(loopThreshold,
ls.loopDetector = toolexec.NewLoopDetector(loopThreshold,
bgagent.ToolNameViewBackgroundAgent,
shell.ToolNameViewBackgroundJob,
)

// overflowCompactions counts how many consecutive context-overflow
// auto-compactions have been attempted without a successful model
// call in between. The cap (r.maxOverflowCompactions) prevents an
// infinite loop when compaction cannot reduce the context below the
// model's limit; see defaultMaxOverflowCompactions for the default
// and WithMaxOverflowCompactions for the test seam.
var overflowCompactions int

// toolModelOverride holds the per-toolset model from the most recent
// tool calls. It applies for one LLM turn, then resets.
var toolModelOverride string
var prevAgentName string

for {
// Pause the loop here if /pause has been toggled on. Any in-flight
// LLM request and its tool calls have already completed.
Expand All @@ -296,9 +280,9 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,

// Clear per-tool model override on agent switch so it doesn't
// leak from one agent's toolset into another agent's turn.
if a.Name() != prevAgentName {
toolModelOverride = ""
prevAgentName = a.Name()
if a.Name() != ls.prevAgentName {
ls.toolModelOverride = ""
ls.prevAgentName = a.Name()
}

r.emitAgentWarnings(a, chanSend(events))
Expand All @@ -317,13 +301,13 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
events <- ToolsetInfo(len(agentTools), false, a.Name())

// Check iteration limit
newMax, decision := r.enforceMaxIterations(ctx, sess, a, iteration, runtimeMaxIterations, events)
newMax, decision := r.enforceMaxIterations(ctx, sess, a, ls.iteration, ls.maxIterations, events)
if decision == iterationStop {
return
}
runtimeMaxIterations = newMax
ls.maxIterations = newMax

iteration++
ls.iteration++

// Exit immediately if the stream context has been cancelled (e.g., Ctrl+C)
if err := ctx.Err(); err != nil {
Expand All @@ -336,16 +320,16 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,

// Per-tool model routing: use a cheaper model for this turn
// if the previous tool calls specified one, then reset.
if toolModelOverride != "" {
if overrideModel, err := r.resolveModelRef(ctx, toolModelOverride); err != nil {
if ls.toolModelOverride != "" {
if overrideModel, err := r.resolveModelRef(ctx, ls.toolModelOverride); err != nil {
slog.WarnContext(ctx, "Failed to resolve per-tool model override; using agent default",
"model_override", toolModelOverride, "error", err)
"model_override", ls.toolModelOverride, "error", err)
} else {
slog.InfoContext(ctx, "Using per-tool model override for this turn",
"agent", a.Name(), "override", overrideModel.ID(), "primary", model.ID())
model = overrideModel
}
toolModelOverride = ""
ls.toolModelOverride = ""
}

modelID := model.ID()
Expand Down Expand Up @@ -386,9 +370,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session,
// AFTER the closure body has assigned both, so callers see the same
// reason the runtime took. ctrl drives the outer for-loop's
// continue-or-exit decision.
ctrl := r.runTurn(ctx, sess, a, m, model, modelID, iteration, contextLimit, sessionSpan,
slices.Concat(sessionStartMsgs, userPromptMsgs),
agentTools, loopDetector, &overflowCompactions, &toolModelOverride, events)
ctrl := r.runTurn(ctx, sess, a, m, model, modelID, contextLimit, sessionSpan, agentTools, ls, events)
switch ctrl {
case turnContinue:
continue
Expand All @@ -414,6 +396,23 @@ const (
turnExit
)

// loopState bundles the mutable per-RunStream state that persists across
// iterations. Previously these were individual local variables in
// runStreamLoop and pointer parameters of runTurn; grouping them in a
// struct keeps the function signatures small and makes it trivial to add
// new per-stream tracking (cost ceiling, token budget, turn timing)
// without touching any signature.
//
// A single *loopState is created at the top of runStreamLoop and handed
// to runTurn on every pass, so writes made inside runTurn are visible to
// the next pass of the outer loop.
type loopState struct {
// iteration is the number of loop passes started so far. runStreamLoop
// increments it right after the max-iterations check, and runTurn
// forwards it to the before_llm_call hooks.
iteration int
// maxIterations is the runtime copy of the iteration limit. It is
// seeded from sess.MaxIterations and then replaced by whatever
// enforceMaxIterations returns, so the session's own value is not
// written to by the loop.
maxIterations int
// overflowCompactions counts consecutive context-overflow
// auto-compactions attempted without a successful model call in
// between. runTurn passes its address to handleStreamError and resets
// it to zero after a successful model call.
overflowCompactions int
// toolModelOverride holds the per-toolset model reference resolved
// from the most recent tool calls. It applies to one LLM turn and is
// then cleared; it is also cleared when the active agent changes.
toolModelOverride string
// prevAgentName is the agent name observed on the previous loop pass,
// used to detect an agent switch.
prevAgentName string
// loopDetector is the consecutive duplicate tool call detector
// consulted after each model response (Record, Consecutive, Reset).
loopDetector *toolexec.LoopDetector
// sessionStartMsgs are the messages returned by the session-start
// hooks. They are passed to sess.GetMessages as extras on every turn.
sessionStartMsgs []chat.Message
// userPromptMsgs are the extra context messages produced while
// handling the user prompt at the tail of the session; it stays nil
// when that step does not run. They are passed to sess.GetMessages
// alongside sessionStartMsgs on every turn.
userPromptMsgs []chat.Message
}

// runTurn performs one iteration of the run-stream loop, from
// turn_start onwards. Wrapping the body in its own function exists for
// one reason: a deferred call can fire turn_end on every exit path — a
Expand All @@ -424,26 +423,21 @@ const (
// it before falling out; the deferred call reads it AFTER the body has
// assigned the final value.
//
// The outer loop owns persistent per-stream state (iteration counter,
// session-start extras, agent-switch tracking); per-turn state that
// needs to survive into the next iteration (overflowCompactions,
// toolModelOverride) is passed by pointer so this function can mutate
// it the same way the inline body did.
// The outer loop owns persistent per-stream state via ls ([loopState]);
// per-turn state that needs to survive into the next iteration
// (overflowCompactions, toolModelOverride) is mutated through the
// shared loopState pointer.
func (r *LocalRuntime) runTurn(
ctx context.Context,
sess *session.Session,
a *agent.Agent,
m *modelsdev.Model,
model provider.Provider,
modelID string,
iteration int,
contextLimit int64,
sessionSpan trace.Span,
priorExtras []chat.Message,
agentTools []tools.Tool,
loopDetector *toolexec.LoopDetector,
overflowCompactions *int,
toolModelOverride *string,
ls *loopState,
events chan Event,
) (ctrl turnControl) {
streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes(
Expand Down Expand Up @@ -492,7 +486,7 @@ func (r *LocalRuntime) runTurn(
// files) refresh every turn while session-level context (cwd, OS,
// arch) stays stable — all without bloating the stored history.
turnStartMsgs := r.executeTurnStartHooks(ctx, sess, a, events)
messages := sess.GetMessages(a, slices.Concat(priorExtras, turnStartMsgs)...)
messages := sess.GetMessages(a, slices.Concat(ls.sessionStartMsgs, ls.userPromptMsgs, turnStartMsgs)...)
slog.DebugContext(ctx, "Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))

// before_llm_call hooks fire just before the model is invoked.
Expand All @@ -505,7 +499,7 @@ func (r *LocalRuntime) runTurn(
// runtime's Go-only message transforms so a hook that drops a
// message (e.g. a custom "strip system reminders") doesn't get
// silently overridden by a transform later in the chain.
stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, iteration, messages)
stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, ls.iteration, messages)
if stop {
slog.WarnContext(ctx, "before_llm_call hook signalled run termination",
"agent", a.Name(), "session_id", sess.ID, "reason", msg)
Expand All @@ -531,7 +525,7 @@ func (r *LocalRuntime) runTurn(
// Try primary model with fallback chain if configured
res, usedModel, err := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events)
if err != nil {
outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, overflowCompactions, streamSpan, events)
outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, &ls.overflowCompactions, streamSpan, events)
endStreamSpan()
endReason = turnEndReasonError
if outcome == streamErrorRetry {
Expand All @@ -541,7 +535,7 @@ func (r *LocalRuntime) runTurn(
}

// A successful model call resets the overflow compaction counter.
*overflowCompactions = 0
ls.overflowCompactions = 0

// after_llm_call hooks fire on success only; failed calls
// fire on_error above. The assistant text content is passed
Expand Down Expand Up @@ -588,12 +582,12 @@ func (r *LocalRuntime) runTurn(
}

// Check for degenerate tool call loops
if loopDetector.Record(res.Calls) {
if ls.loopDetector.Record(res.Calls) {
toolName := "unknown"
if len(res.Calls) > 0 {
toolName = res.Calls[0].Function.Name
}
consecutive := loopDetector.Consecutive()
consecutive := ls.loopDetector.Consecutive()
slog.WarnContext(ctx, "Repetitive tool call loop detected",
"agent", a.Name(), "tool", toolName,
"consecutive", consecutive, "session_id", sess.ID)
Expand All @@ -603,7 +597,7 @@ func (r *LocalRuntime) runTurn(
consecutive, toolName)
events <- Error(errMsg)
r.notifyError(ctx, a, sess.ID, errMsg)
loopDetector.Reset()
ls.loopDetector.Reset()
endReason = turnEndReasonLoopDetected
return turnExit
}
Expand All @@ -622,7 +616,7 @@ func (r *LocalRuntime) runTurn(
}

// Record per-toolset model override for the next LLM turn.
*toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools)
ls.toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools)

// Drain steer messages that arrived during tool calls.
if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {
Expand Down
Loading