diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 4952f3f91..d8cb17af9 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -198,7 +198,10 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // extras and threaded into every model call below — never persisted, // to keep the visible transcript clean and the user message tail // stable. - sessionStartMsgs := r.executeSessionStartHooks(ctx, sess, a, events) + ls := &loopState{ + maxIterations: sess.MaxIterations, + sessionStartMsgs: r.executeSessionStartHooks(ctx, sess, a, events), + } // Emit team information events <- TeamInfo(r.agentDetailsFromTeam(), a.Name()) @@ -222,7 +225,6 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // no human authored. SendUserMessage is the same flag the runtime // uses to gate the UserMessageEvent, which is exactly the right // signal here too: "a real user prompt is at the tail of the session". - var userPromptMsgs []chat.Message if sess.SendUserMessage && len(messages) > 0 { lastMsg := messages[len(messages)-1] events <- UserMessage(lastMsg.Content, sess.ID, lastMsg.MultiContent, len(sess.Messages)-1) @@ -237,7 +239,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, r.emitHookDrivenShutdown(ctx, a, sess, msg, events) return } - userPromptMsgs = ctxMsgs + ls.userPromptMsgs = ctxMsgs } } @@ -253,12 +255,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, return } - iteration := 0 - // Use a runtime copy of maxIterations so we don't modify the session's persistent config - runtimeMaxIterations := sess.MaxIterations - - // Initialize consecutive duplicate tool call detector - // + // Initialize consecutive duplicate tool call detector. // Polling tools (view_background_agent, view_background_job) are // expected to be called repeatedly with identical arguments while a // background task is in progress. Exempt them so they never trigger @@ -267,24 +264,11 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, if loopThreshold == 0 { loopThreshold = 5 // default: always active } - loopDetector := toolexec.NewLoopDetector(loopThreshold, + ls.loopDetector = toolexec.NewLoopDetector(loopThreshold, bgagent.ToolNameViewBackgroundAgent, shell.ToolNameViewBackgroundJob, ) - // overflowCompactions counts how many consecutive context-overflow - // auto-compactions have been attempted without a successful model - // call in between. The cap (r.maxOverflowCompactions) prevents an - // infinite loop when compaction cannot reduce the context below the - // model's limit; see defaultMaxOverflowCompactions for the default - // and WithMaxOverflowCompactions for the test seam. - var overflowCompactions int - - // toolModelOverride holds the per-toolset model from the most recent - // tool calls. It applies for one LLM turn, then resets. - var toolModelOverride string - var prevAgentName string - for { // Pause the loop here if /pause has been toggled on. Any in-flight // LLM request and its tool calls have already completed. @@ -296,9 +280,9 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // Clear per-tool model override on agent switch so it doesn't // leak from one agent's toolset into another agent's turn. - if a.Name() != prevAgentName { - toolModelOverride = "" - prevAgentName = a.Name() + if a.Name() != ls.prevAgentName { + ls.toolModelOverride = "" + ls.prevAgentName = a.Name() } r.emitAgentWarnings(a, chanSend(events)) @@ -317,13 +301,13 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, events <- ToolsetInfo(len(agentTools), false, a.Name()) // Check iteration limit - newMax, decision := r.enforceMaxIterations(ctx, sess, a, iteration, runtimeMaxIterations, events) + newMax, decision := r.enforceMaxIterations(ctx, sess, a, ls.iteration, ls.maxIterations, events) if decision == iterationStop { return } - runtimeMaxIterations = newMax + ls.maxIterations = newMax - iteration++ + ls.iteration++ // Exit immediately if the stream context has been cancelled (e.g., Ctrl+C) if err := ctx.Err(); err != nil { @@ -336,16 +320,16 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // Per-tool model routing: use a cheaper model for this turn // if the previous tool calls specified one, then reset. - if toolModelOverride != "" { - if overrideModel, err := r.resolveModelRef(ctx, toolModelOverride); err != nil { + if ls.toolModelOverride != "" { + if overrideModel, err := r.resolveModelRef(ctx, ls.toolModelOverride); err != nil { slog.WarnContext(ctx, "Failed to resolve per-tool model override; using agent default", - "model_override", toolModelOverride, "error", err) + "model_override", ls.toolModelOverride, "error", err) } else { slog.InfoContext(ctx, "Using per-tool model override for this turn", "agent", a.Name(), "override", overrideModel.ID(), "primary", model.ID()) model = overrideModel } - toolModelOverride = "" + ls.toolModelOverride = "" } modelID := model.ID() @@ -386,9 +370,7 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // AFTER the closure body has assigned both, so callers see the same // reason the runtime took. ctrl drives the outer for-loop's // continue-or-exit decision. - ctrl := r.runTurn(ctx, sess, a, m, model, modelID, iteration, contextLimit, sessionSpan, - slices.Concat(sessionStartMsgs, userPromptMsgs), - agentTools, loopDetector, &overflowCompactions, &toolModelOverride, events) + ctrl := r.runTurn(ctx, sess, a, m, model, modelID, contextLimit, sessionSpan, agentTools, ls, events) switch ctrl { case turnContinue: continue @@ -414,6 +396,23 @@ const ( turnExit ) +// loopState bundles the mutable per-RunStream state that persists across +// iterations. Previously these were individual local variables in +// runStreamLoop and pointer parameters of runTurn; grouping them in a +// struct keeps the function signatures small and makes it trivial to add +// new per-stream tracking (cost ceiling, token budget, turn timing) +// without touching any signature. +type loopState struct { + iteration int + maxIterations int + overflowCompactions int + toolModelOverride string + prevAgentName string + loopDetector *toolexec.LoopDetector + sessionStartMsgs []chat.Message + userPromptMsgs []chat.Message +} + // runTurn performs one iteration of the run-stream loop, from // turn_start onwards. Wrapping the body in its own function exists for // one reason: a deferred call can fire turn_end on every exit path — a @@ -424,11 +423,10 @@ const ( // it before falling out; the deferred call reads it AFTER the body has // assigned the final value. // -// The outer loop owns persistent per-stream state (iteration counter, -// session-start extras, agent-switch tracking); per-turn state that -// needs to survive into the next iteration (overflowCompactions, -// toolModelOverride) is passed by pointer so this function can mutate -// it the same way the inline body did. +// The outer loop owns persistent per-stream state via ls ([loopState]); +// per-turn state that needs to survive into the next iteration +// (overflowCompactions, toolModelOverride) is mutated through the +// shared loopState pointer. func (r *LocalRuntime) runTurn( ctx context.Context, sess *session.Session, @@ -436,14 +434,10 @@ func (r *LocalRuntime) runTurn( m *modelsdev.Model, model provider.Provider, modelID string, - iteration int, contextLimit int64, sessionSpan trace.Span, - priorExtras []chat.Message, agentTools []tools.Tool, - loopDetector *toolexec.LoopDetector, - overflowCompactions *int, - toolModelOverride *string, + ls *loopState, events chan Event, ) (ctrl turnControl) { streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes( @@ -492,7 +486,7 @@ func (r *LocalRuntime) runTurn( // files) refresh every turn while session-level context (cwd, OS, // arch) stays stable — all without bloating the stored history. turnStartMsgs := r.executeTurnStartHooks(ctx, sess, a, events) - messages := sess.GetMessages(a, slices.Concat(priorExtras, turnStartMsgs)...) + messages := sess.GetMessages(a, slices.Concat(ls.sessionStartMsgs, ls.userPromptMsgs, turnStartMsgs)...) slog.DebugContext(ctx, "Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages)) // before_llm_call hooks fire just before the model is invoked. @@ -505,7 +499,7 @@ func (r *LocalRuntime) runTurn( // runtime's Go-only message transforms so a hook that drops a // message (e.g. a custom "strip system reminders") doesn't get // silently overridden by a transform later in the chain. - stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, iteration, messages) + stop, msg, rewritten := r.executeBeforeLLMCallHooks(ctx, sess, a, modelID, ls.iteration, messages) if stop { slog.WarnContext(ctx, "before_llm_call hook signalled run termination", "agent", a.Name(), "session_id", sess.ID, "reason", msg) @@ -531,7 +525,7 @@ func (r *LocalRuntime) runTurn( // Try primary model with fallback chain if configured res, usedModel, err := r.fallback.execute(streamCtx, a, model, messages, agentTools, sess, m, events) if err != nil { - outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, overflowCompactions, streamSpan, events) + outcome := r.handleStreamError(ctx, sess, a, err, contextLimit, &ls.overflowCompactions, streamSpan, events) endStreamSpan() endReason = turnEndReasonError if outcome == streamErrorRetry { @@ -541,7 +535,7 @@ func (r *LocalRuntime) runTurn( } // A successful model call resets the overflow compaction counter. - *overflowCompactions = 0 + ls.overflowCompactions = 0 // after_llm_call hooks fire on success only; failed calls // fire on_error above. The assistant text content is passed @@ -588,12 +582,12 @@ func (r *LocalRuntime) runTurn( } // Check for degenerate tool call loops - if loopDetector.Record(res.Calls) { + if ls.loopDetector.Record(res.Calls) { toolName := "unknown" if len(res.Calls) > 0 { toolName = res.Calls[0].Function.Name } - consecutive := loopDetector.Consecutive() + consecutive := ls.loopDetector.Consecutive() slog.WarnContext(ctx, "Repetitive tool call loop detected", "agent", a.Name(), "tool", toolName, "consecutive", consecutive, "session_id", sess.ID) @@ -603,7 +597,7 @@ func (r *LocalRuntime) runTurn( consecutive, toolName) events <- Error(errMsg) r.notifyError(ctx, a, sess.ID, errMsg) - loopDetector.Reset() + ls.loopDetector.Reset() endReason = turnEndReasonLoopDetected return turnExit } @@ -622,7 +616,7 @@ func (r *LocalRuntime) runTurn( } // Record per-toolset model override for the next LLM turn. - *toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools) + ls.toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools) // Drain steer messages that arrived during tool calls. if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {