diff --git a/cmd/sluice/main.go b/cmd/sluice/main.go index 7968c6e..88c6548 100644 --- a/cmd/sluice/main.go +++ b/cmd/sluice/main.go @@ -513,8 +513,15 @@ func main() { if failoverBroker != nil { // Plain text: TelegramChannel.Notify sends with no parse // mode, so markdown backticks would render literally. + // Exhausted: no distinct member to fail over to (every + // member cooling) — report it as pool exhaustion, NOT a + // self-referential "X -> X" transition. msg := fmt.Sprintf("pool %s failed over %s -> %s (%s)", ev.Pool, ev.From, ev.To, ev.Reason) + if ev.Exhausted { + msg = fmt.Sprintf("pool %s exhausted: all members cooling down (%s); no healthy account to fail over to", + ev.Pool, ev.Reason) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for _, ch := range failoverBroker.Channels() { diff --git a/cmd/sluice/pool.go b/cmd/sluice/pool.go index 4b1a817..ba277a4 100644 --- a/cmd/sluice/pool.go +++ b/cmd/sluice/pool.go @@ -235,7 +235,7 @@ func handlePoolRotate(args []string) error { return fmt.Errorf("pool %q rotate: resolved active member %q is not in the pool snapshot (membership changed under the rotate); re-check with \"sluice pool list %s\"", name, active, name) } until := time.Now().Add(vault.AuthFailCooldown) - wrote, err := db.SetCredentialHealthIfPoolMemberEpoch(active, name, rotateEpoch, "cooldown", until, "manual rotate") + wrote, err := db.SetCredentialHealthIfPoolMemberEpoch(active, name, rotateEpoch, "cooldown", until, vault.ManualRotateReason) if err != nil { return err } diff --git a/internal/proxy/addon.go b/internal/proxy/addon.go index 690cc46..b55154a 100644 --- a/internal/proxy/addon.go +++ b/internal/proxy/addon.go @@ -14,6 +14,7 @@ import ( "strings" "sync" "sync/atomic" + "time" mitmproxy "github.com/lqqyt2423/go-mitmproxy/proxy" "github.com/nemirovsky/sluice/internal/audit" @@ -120,6 +121,17 @@ type SluiceAddon struct { // on the hot StreamResponseModifier path. 
dlpStreamWarned sync.Map + // poolNoticeMu/poolNoticeAt deduplicate pool failover / exhaustion + // operator notices (audit row + Telegram) within poolNoticeDedupWindow. + // A failing pooled destination produces a burst of identical signals + // (agent retries + pipelined requests racing the synchronous + // MarkCooldown); without this an exhausted pool spammed one notice per + // retry. Mutex-guarded (not sync.Map) so a concurrent burst cannot have + // two goroutines both miss the dedup and both emit. See + // shouldEmitPoolNotice in pool_failover.go. + poolNoticeMu sync.Mutex + poolNoticeAt map[string]time.Time + // refreshGroup deduplicates concurrent OAuth token refresh responses // for the same credential. Keyed by credential name so only one // vault update occurs when multiple requests trigger simultaneous diff --git a/internal/proxy/pool_failover.go b/internal/proxy/pool_failover.go index 52e85c7..e9931f4 100644 --- a/internal/proxy/pool_failover.go +++ b/internal/proxy/pool_failover.go @@ -140,6 +140,13 @@ type FailoverEvent struct { Reason string // short tag: 429 | 403 | 401 | invalid_grant | invalid_token Class failoverClass Until time.Time // member cooldown expiry just applied + // Exhausted is true when there was NO distinct member to fail over to + // (every member is cooling and the soonest-recovering one is the member + // that just failed). The cooldown is still applied to From for + // durability, but this is a pool-exhaustion signal, not a real + // transition: the operator notice and audit action say so, and it is + // deduplicated so an agent's retry storm produces one line, not N. + Exhausted bool // Epoch is the From member's membership epoch in the resolver // generation that produced this failover. 
The durable guarded write // commits only if (From, Pool, Epoch) is still a live membership row, @@ -447,16 +454,49 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) { to = next } - log.Printf("[POOL-FAILOVER] pool %q: %s -> %s (%s); member %q cooling down until %s", - pool, from, to, tag, from, until.Format(time.RFC3339)) + // to == from means ResolveActive degraded back to the member that just + // failed: every member is cooling and the soonest-recovering one IS + // `from`. There is NO distinct member to fail over to. Emitting a + // "<from> -> <from>" cred_failover here (and one Telegram notice per + // request) was both meaningless and a notification storm — the agent + // retries N times, each retry re-fails on the still-exhausted member + // and re-entered this path, producing N identical "failed over A -> A" + // notices. Classify it honestly as pool exhaustion instead. + exhausted := to == from - // Audit: emit a cred_failover action with the documented Reason shape - // "<pool>:<from>-><to>:<tag>". Safe to call with a nil auditLog. The - // blake3 hash chain is appended synchronously by FileLogger.Log; the + // Deduplicate identical signals within a short window. Concurrent + // in-flight requests (pipelined agents) and retries that race the + // synchronous MarkCooldown above would otherwise each emit one audit + // row + one operator notice. One per (pool,from,to,tag) per window is + // all the operator needs; the cooldown itself was already applied + // unconditionally above, so suppressing the notice loses nothing. NOTE(review): the early return that follows also skips the onFailover callback, which (per the comment on that call) runs the durable store write; presumably the write made for the first event in the window suffices — confirm. 
+ if !a.shouldEmitPoolNotice(pool, from, to, tag) { + return + } + + if exhausted { + log.Printf("[POOL-FAILOVER] pool %q exhausted: all members cooling (%s); no failover target, serving least-bad %q", + pool, tag, from) + } else { + log.Printf("[POOL-FAILOVER] pool %q: %s -> %s (%s); member %q cooling down until %s", + pool, from, to, tag, from, until.Format(time.RFC3339)) + } + + // Audit: a real failover emits cred_failover with the documented Reason + // shape "<pool>:<from>-><to>:<tag>"; pool exhaustion emits the distinct + // pool_exhausted action so operators can alert on it separately and are + // not misled by a self-referential transition. Safe with a nil auditLog. + // The blake3 hash chain is appended synchronously by FileLogger.Log; the // write is local and fast (mirrors logDLPAudit on the same path), so it // does not warrant detaching like the store/Telegram side effects. if a.auditLog != nil { host, port := connectTargetForFlow(a, f) + action := "cred_failover" + reason := fmt.Sprintf("%s:%s->%s:%s", pool, from, to, tag) + if exhausted { + action = "pool_exhausted" + reason = fmt.Sprintf("%s:exhausted:%s", pool, tag) + } evt := audit.Event{ Destination: host, Port: port, @@ -465,8 +505,8 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) { // scoped pooled binding the audit must record the real protocol. Protocol: proto, Verdict: "failover", - Action: "cred_failover", - Reason: fmt.Sprintf("%s:%s->%s:%s", pool, from, to, tag), + Action: action, + Reason: reason, Credential: from, } if err := a.auditLog.Log(evt); err != nil { @@ -477,15 +517,46 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) { // (3) Durability + Telegram via the callback. The callback is // responsible for being non-blocking (it runs the store write and the // Telegram send in its own goroutine); we still guard with a nil check. + // The durable cooldown is persisted even when exhausted (the member did + // fail); only the operator-facing wording differs. 
if a.onFailover != nil { a.onFailover(FailoverEvent{ - Pool: pool, - From: from, - To: to, - Reason: tag, - Class: class, - Until: until, - Epoch: idEpoch, + Pool: pool, + From: from, + To: to, + Reason: tag, + Class: class, + Until: until, + Exhausted: exhausted, + Epoch: idEpoch, }) } } + +// poolNoticeDedupWindow bounds how often an identical pool failover / +// exhaustion signal (same pool, from, to, tag) produces an audit row + +// operator notice. The synchronous in-memory MarkCooldown already switched +// the active member before this fires, so a burst of agent retries within +// the window is genuinely the same event, not new information. +const poolNoticeDedupWindow = 30 * time.Second + +// shouldEmitPoolNotice returns true at most once per poolNoticeDedupWindow +// for a given (pool,from,to,tag). It is mutex-guarded (not a sync.Map +// LoadOrStore) so a concurrent burst cannot have two goroutines both miss +// and both emit. The map is keyed by a NUL-joined tuple; key cardinality is +// bounded by pool x member x member x tag, so it does not grow unbounded in +// practice. 
+func (a *SluiceAddon) shouldEmitPoolNotice(pool, from, to, tag string) bool { + key := pool + "\x00" + from + "\x00" + to + "\x00" + tag + now := time.Now() + a.poolNoticeMu.Lock() + defer a.poolNoticeMu.Unlock() + if a.poolNoticeAt == nil { + a.poolNoticeAt = make(map[string]time.Time) + } + if last, ok := a.poolNoticeAt[key]; ok && now.Sub(last) < poolNoticeDedupWindow { + return false + } + a.poolNoticeAt[key] = now + return true +} diff --git a/internal/proxy/pool_failover_test.go b/internal/proxy/pool_failover_test.go index deb5400..b039a61 100644 --- a/internal/proxy/pool_failover_test.go +++ b/internal/proxy/pool_failover_test.go @@ -919,3 +919,163 @@ func TestServerStorePoolStaleGenerationCooldownNotLost(t *testing.T) { t.Fatalf("ResolveActive = %q, want y (z cooled via stale-gen mark)", got) } } + +// auditActionCount reads a closed FileLogger's file and counts lines whose +// audit Action equals want. +func auditActionCount(t *testing.T, logPath, want string) int { + t.Helper() + data, err := os.ReadFile(logPath) + if err != nil { + t.Fatalf("read audit log: %v", err) + } + n := 0 + for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") { + if line == "" { + continue + } + var evt audit.Event + if uerr := json.Unmarshal([]byte(line), &evt); uerr != nil { + t.Fatalf("unmarshal audit line %q: %v", line, uerr) + } + if evt.Action == want { + n++ + } + } + return n +} + +// TestFailoverPoolExhaustedNoSelfFailoverSpam is the self-failover regression. +// When every member is cooling and the soonest-recovering one IS the member +// that just failed, ResolveActive degrades back to it: there is NO distinct +// failover target. The old code emitted a meaningless "memA -> memA" +// cred_failover AND one Telegram notice + audit row per agent retry (the +// production symptom was six identical "failed over X -> X (429)" notices). +// +// Fail-before: action == cred_failover, From==To, and a second Response emits +// a second row (no dedup). 
Pass-after: action == pool_exhausted, Exhausted +// is true, From==To==memA, and the dedup window collapses the retry storm to +// exactly one audit row + one onFailover. +func TestFailoverPoolExhaustedNoSelfFailoverSpam(t *testing.T) { + dir := t.TempDir() + logPath := filepath.Join(dir, "audit.log") + logger, err := audit.NewFileLogger(logPath) + if err != nil { + t.Fatalf("NewFileLogger: %v", err) + } + t.Cleanup(func() { _ = logger.Close() }) + + addon, _, prPtr := setupPoolAddon(t, "memA", "memB") + addon.auditLog = logger + client := setupAddonConn(addon, "auth.example.com:443") + + // memB is already genuinely failure-cooled for LONGER than a 429 TTL, so + // after memA's own 429 cooldown the soonest-recovering member is memA + // itself -> ResolveActive degrades to memA -> no distinct target. + prPtr.Load().MarkCooldown("memB", time.Now().Add(10*time.Minute), "401") + + var calls int32 + var last FailoverEvent + done := make(chan struct{}, 4) + addon.SetOnFailover(func(ev FailoverEvent) { + atomic.AddInt32(&calls, 1) + last = ev + done <- struct{}{} + }) + + // Two back-to-back identical 429s (the agent's retry storm). + for i := 0; i < 2; i++ { + f := newPoolRespFlow(client, 429, []byte(`{"error":"rate_limited"}`)) + addon.flowInjected.Tag(f.Id, "memA") + addon.Response(f) + } + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("onFailover callback not invoked") + } + // Dedup: the second identical signal within the window is suppressed. 
+ if got := atomic.LoadInt32(&calls); got != 1 { + t.Fatalf("onFailover invoked %d times, want exactly 1 (dedup window must collapse the retry storm)", got) + } + if !last.Exhausted { + t.Fatalf("FailoverEvent.Exhausted = false, want true (no distinct failover target)") + } + if last.From != "memA" || last.To != "memA" { + t.Fatalf("FailoverEvent from=%q to=%q, want memA/memA (degraded to self)", last.From, last.To) + } + + if err := logger.Close(); err != nil { + t.Fatalf("logger close: %v", err) + } + if n := auditActionCount(t, logPath, "pool_exhausted"); n != 1 { + t.Fatalf("pool_exhausted audit rows = %d, want exactly 1 (no per-retry spam)", n) + } + if n := auditActionCount(t, logPath, "cred_failover"); n != 0 { + t.Fatalf("cred_failover audit rows = %d, want 0 (a self-failover is NOT a real failover)", n) + } +} + +// TestFailoverToManualRotateParkedPeer is the pool-stranding regression that +// broke the live agent: `sluice pool rotate` parks the previously-active +// member (reason ManualRotateReason). That member is healthy, just operator +// deprioritized. When the rotated-to member then 429s, EVERY member is +// cooling — the old soonest-by-time degrade picked the just-failed member +// (60s 429 TTL < 300s rotate park) and self-looped, hard-failing the agent. +// +// Fail-before: To == memA (self-loop), Exhausted true. Pass-after: the +// operator-parked-but-healthy memB is preferred -> a REAL failover memA -> +// memB, Exhausted false, cred_failover audit. 
+func TestFailoverToManualRotateParkedPeer(t *testing.T) { + dir := t.TempDir() + logPath := filepath.Join(dir, "audit.log") + logger, err := audit.NewFileLogger(logPath) + if err != nil { + t.Fatalf("NewFileLogger: %v", err) + } + t.Cleanup(func() { _ = logger.Close() }) + + addon, _, prPtr := setupPoolAddon(t, "memA", "memB") + addon.auditLog = logger + client := setupAddonConn(addon, "auth.example.com:443") + + // Operator rotated onto memA: memB is parked (healthy, just + // deprioritized) for the long manual-rotate TTL. memA is active. + prPtr.Load().MarkCooldown("memB", time.Now().Add(vault.AuthFailCooldown), vault.ManualRotateReason) + if got, _ := prPtr.Load().ResolveActive("codex_pool"); got != "memA" { + t.Fatalf("pre-failover active = %q, want memA (memB operator-parked)", got) + } + + var got FailoverEvent + done := make(chan struct{}, 1) + addon.SetOnFailover(func(ev FailoverEvent) { + got = ev + done <- struct{}{} + }) + + f := newPoolRespFlow(client, 429, []byte(`{"error":"rate_limited"}`)) + addon.flowInjected.Tag(f.Id, "memA") + addon.Response(f) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("onFailover callback not invoked") + } + if got.Exhausted { + t.Fatalf("FailoverEvent.Exhausted = true, want false (memB is a valid parked-but-healthy target)") + } + if got.From != "memA" || got.To != "memB" { + t.Fatalf("FailoverEvent from=%q to=%q, want memA -> memB (failover to operator-parked-but-healthy peer)", got.From, got.To) + } + + if err := logger.Close(); err != nil { + t.Fatalf("logger close: %v", err) + } + if n := auditActionCount(t, logPath, "cred_failover"); n != 1 { + t.Fatalf("cred_failover audit rows = %d, want exactly 1 (real failover to memB)", n) + } + if n := auditActionCount(t, logPath, "pool_exhausted"); n != 0 { + t.Fatalf("pool_exhausted audit rows = %d, want 0 (a healthy parked peer exists)", n) + } +} diff --git a/internal/vault/pool.go b/internal/vault/pool.go index bdeefa8..2ecb084 100644 --- 
a/internal/vault/pool.go +++ b/internal/vault/pool.go @@ -18,6 +18,18 @@ const ( AuthFailCooldown = 300 * time.Second ) +// ManualRotateReason is the cooldown reason stamped by `sluice pool rotate` +// when it parks the previously-active member. A member parked for this +// reason is operationally deprioritized BY AN OPERATOR, not unhealthy: it +// must still be skipped for normal position-order active selection (so the +// rotated-to member wins), but it REMAINS a valid failover / degrade target. +// A manual park must never strand the pool with no servable member when the +// rotated-to member subsequently fails — otherwise a rotate onto an +// exhausted account self-loops instead of falling back to the parked-but- +// healthy peer. The literal is shared with cmd/sluice's rotate writer so the +// two stay in sync. +const ManualRotateReason = "manual rotate" + // memberHealth is the in-memory health view for one credential. Status is // derived: a credential with a zero cooldownUntil is healthy. type memberHealth struct { @@ -301,8 +313,8 @@ func (pr *PoolResolver) ResolveActive(name string) (member string, ok bool) { defer pr.health.mu.RUnlock() now := time.Now() - var soonest string - var soonestUntil time.Time + var soonest, soonestParked string + var soonestUntil, soonestParkedUntil time.Time for _, m := range members { h, tracked := pr.health.health[m] if !tracked || h.cooldownUntil.IsZero() || !h.cooldownUntil.After(now) { @@ -312,6 +324,24 @@ func (pr *PoolResolver) ResolveActive(name string) (member string, ok bool) { soonest = m soonestUntil = h.cooldownUntil } + // A "manual rotate" park is an operator deprioritization, not a + // health failure: the member is still servable. 
When EVERY member + // is cooling, such a member is a strictly better degrade target + // than a genuinely failed (rate-limited / auth-failed) one — this + // is what lets a `pool rotate` onto an exhausted member still fail + // over to the parked-but-healthy peer instead of self-looping on + // the exhausted one. + if h.reason == ManualRotateReason { + if soonestParked == "" || h.cooldownUntil.Before(soonestParkedUntil) { + soonestParked = m + soonestParkedUntil = h.cooldownUntil + } + } + } + if soonestParked != "" { + log.Printf("[POOL] all %d members of pool %q are cooling; degrading to operator-parked-but-healthy %q", + len(members), name, soonestParked) + return soonestParked, true } log.Printf("[POOL] all %d members of pool %q are in cooldown; degrading to %q (recovers %s)", len(members), name, soonest, soonestUntil.Format(time.RFC3339)) diff --git a/internal/vault/pool_test.go b/internal/vault/pool_test.go index e6ed6e4..fd61a2f 100644 --- a/internal/vault/pool_test.go +++ b/internal/vault/pool_test.go @@ -157,6 +157,47 @@ func TestResolveActiveAllDownDegradesToSoonest(t *testing.T) { } } +// TestResolveActiveAllDownPrefersManualRotateOverFailure is the +// pool-stranding regression. `sluice pool rotate` parks the previously +// active member with reason ManualRotateReason — that member is healthy, +// just operator-deprioritized. If the rotated-to member then fails (rate +// limit / auth), EVERY member is cooling. The old degrade picked the member +// with the soonest cooldownUntil, which is the genuinely-FAILED one (a 429 +// cooldown is 60s; a manual rotate park is 300s), so a rotate onto an +// exhausted account self-looped on the exhausted account and the agent hard +// errored. Fail-before: soonest-by-time -> "a" (the failed member). +// Pass-after: a manual-rotate-parked-but-healthy member is preferred -> "b". 
+func TestResolveActiveAllDownPrefersManualRotateOverFailure(t *testing.T) { + now := time.Now() + health := []store.CredentialHealth{ + // Genuinely failed, recovers SOON (would win the old soonest rule). + {Credential: "a", Status: "cooldown", CooldownUntil: now.Add(30 * time.Second), LastFailureReason: "429"}, + // Operator-parked by `pool rotate`, recovers LATER, but healthy. + {Credential: "b", Status: "cooldown", CooldownUntil: now.Add(300 * time.Second), LastFailureReason: ManualRotateReason}, + } + pr := NewPoolResolver([]store.Pool{mkPool("pool", "a", "b")}, health) + got, ok := pr.ResolveActive("pool") + if !ok || got != "b" { + t.Errorf("ResolveActive (all down) = %q,%v; want b,true (operator-parked-but-healthy preferred over genuinely-failed soonest)", got, ok) + } +} + +// TestResolveActiveAllDownNoManualRotateStillSoonest guards that the +// preference is ONLY for manual-rotate parks: when every member is cooling +// for a genuine failure, behavior is unchanged (soonest recovery wins). +func TestResolveActiveAllDownNoManualRotateStillSoonest(t *testing.T) { + now := time.Now() + health := []store.CredentialHealth{ + {Credential: "a", Status: "cooldown", CooldownUntil: now.Add(300 * time.Second), LastFailureReason: "401"}, + {Credential: "b", Status: "cooldown", CooldownUntil: now.Add(30 * time.Second), LastFailureReason: "429"}, + } + pr := NewPoolResolver([]store.Pool{mkPool("pool", "a", "b")}, health) + got, ok := pr.ResolveActive("pool") + if !ok || got != "b" { + t.Errorf("ResolveActive (all down, no manual rotate) = %q,%v; want b,true (soonest unchanged)", got, ok) + } +} + func TestResolveActiveEmptyPool(t *testing.T) { pr := NewPoolResolver([]store.Pool{mkPool("empty")}, nil) if _, ok := pr.ResolveActive("empty"); ok {