diff --git a/sei-tendermint/internal/blocksync/pool.go b/sei-tendermint/internal/blocksync/pool.go index 8b8b2dc4de..467f5c9567 100644 --- a/sei-tendermint/internal/blocksync/pool.go +++ b/sei-tendermint/internal/blocksync/pool.go @@ -135,7 +135,10 @@ func NewBlockPool( func (pool *BlockPool) OnStart(ctx context.Context) error { pool.lastAdvance = time.Now() pool.lastHundredBlockTimeStamp = pool.lastAdvance - go pool.makeRequestersRoutine(ctx) + pool.Spawn("makeRequestersRoutine", func(ctx context.Context) error { + pool.makeRequestersRoutine(ctx) + return nil + }) return nil } diff --git a/sei-tendermint/internal/blocksync/reactor.go b/sei-tendermint/internal/blocksync/reactor.go index 7847ac5c1c..742ce31592 100644 --- a/sei-tendermint/internal/blocksync/reactor.go +++ b/sei-tendermint/internal/blocksync/reactor.go @@ -15,6 +15,7 @@ import ( sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/store" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/service" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" pb "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/blocksync" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) @@ -70,7 +71,7 @@ func GetChannelDescriptor() p2p.ChannelDescriptor[*pb.Message] { type consensusReactor interface { // For when we switch from block sync reactor to the consensus // machine. - SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool) + SwitchToConsensus(state sm.State, skipWAL bool) } type peerError struct { @@ -82,6 +83,8 @@ func (e peerError) Error() string { return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) } +type blocksyncResult struct{ stateSynced bool } + // Reactor handles long-term catchup syncing. 
type Reactor struct { service.BaseService @@ -98,6 +101,16 @@ type Reactor struct { blockSync *atomicBool previousMaxPeerHeight int64 + // blocksyncReady fires when blocksync should start processing blocks — + // either at OnStart (if blockSync was initially set) or via + // SwitchToBlockSync. Pre-spawned requestRoutine and poolRoutine wait on + // it before doing any work. + blocksyncReady utils.AtomicSend[utils.Option[blocksyncResult]] + // consensusReady fires once the blocksync->consensus handoff has + // happened. The pre-spawned autoRestartIfBehind monitor gates on this + // signal. + consensusReady utils.AtomicSend[bool] + router *p2p.Router channel *p2p.Channel[*pb.Message] @@ -148,8 +161,9 @@ func NewReactor( blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold, blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64 restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds, + blocksyncReady: utils.NewAtomicSend(utils.None[blocksyncResult]()), + consensusReady: utils.NewAtomicSend(false), } - r.BaseService = *service.NewBaseService("BlockSync", r) return r, nil } @@ -184,23 +198,61 @@ func (r *Reactor) OnStart(ctx context.Context) error { r.requestsCh = requestsCh r.errorsCh = errorsCh + // Pre-spawn all long-running routines so their lifetime is bound to the + // BaseService WaitGroup. Conditional routines gate on AtomicSend[bool] + // signals so SwitchToBlockSync (and the in-poolRoutine consensus handoff + // for autoRestartIfBehind) can wake them later without spawning fresh + // goroutines from outside OnStart. 
+ r.Spawn("requestRoutine", func(ctx context.Context) error { + _, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool { + return o.IsPresent() + }) + if err != nil { + return err + } + r.requestRoutine(ctx) + return nil + }) + r.Spawn("poolRoutine", func(ctx context.Context) error { + result, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool { + return o.IsPresent() + }) + if err != nil { + return err + } + res, _ := result.Get() + r.poolRoutine(ctx, res.stateSynced) + return nil + }) + r.SpawnCritical("processBlockSyncCh", func(ctx context.Context) error { + r.processBlockSyncCh(ctx) + return nil + }) + r.SpawnCritical("processPeerUpdates", func(ctx context.Context) error { + r.processPeerUpdates(ctx) + return nil + }) + r.SpawnCritical("autoRestartIfBehind", func(ctx context.Context) error { + if _, err := r.consensusReady.Wait(ctx, func(ready bool) bool { return ready }); err != nil { + logger.Error("Failed to wait for consensus ready to spawn autoRestartIfBehind", "err", err) + return nil + } + r.autoRestartIfBehind(ctx) + return nil + }) + if r.blockSync.IsSet() { if err := r.pool.Start(ctx); err != nil { return err } - go r.requestRoutine(ctx) - - go r.poolRoutine(ctx, false) + r.blocksyncReady.Store(utils.Some(blocksyncResult{false})) } - - go r.processBlockSyncCh(ctx) - go r.processPeerUpdates(ctx) - return nil } -// OnStop stops the reactor by signaling to all spawned goroutines to exit and -// blocking until they all exit. +// OnStop stops the BlockPool. The reactor's own long-running goroutines were +// registered with the BaseService WaitGroup via Spawn in OnStart, so the +// BaseService blocks Stop() on their exit before this method returns. 
func (r *Reactor) OnStop() { if r.blockSync.IsSet() { r.pool.Stop() } @@ -378,9 +430,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { } r.syncStartTime = time.Now() - - go r.requestRoutine(ctx) - go r.poolRoutine(ctx, true) + r.blocksyncReady.Store(utils.Some(blocksyncResult{true})) if err := r.PublishStatus(types.EventDataBlockSyncStatus{ Complete: false, @@ -473,10 +523,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { if r.consReactor != nil { logger.Info("switching to consensus reactor", "height", height, "blocks_synced", blocksSynced, "state_synced", stateSynced, "max_peer_height", r.pool.MaxPeerHeight()) - r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced) - - // Auto restart should only be checked after switching to consensus mode - go r.autoRestartIfBehind(ctx) + // SwitchToConsensus no longer takes blocksync's ctx: it is a handoff + // to a peer reactor whose lifecycle is not tied to blocksync. + r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) + // Wake the pre-spawned auto-restart monitor. + r.consensusReady.Store(true) } return diff --git a/sei-tendermint/internal/blocksync/reactor_test.go b/sei-tendermint/internal/blocksync/reactor_test.go index 56936a0f5d..2fd2aee03e 100644 --- a/sei-tendermint/internal/blocksync/reactor_test.go +++ b/sei-tendermint/internal/blocksync/reactor_test.go @@ -2,6 +2,8 @@ package blocksync import ( "context" + "runtime" + "strings" "testing" "time" @@ -242,6 +244,85 @@ func TestReactor_AbruptDisconnect(t *testing.T) { rts.network.Remove(t, rts.nodes[0]) } +// TestReactor_OnStopWaitsForGoroutines is a regression test for the +// "panic: leveldb/table: reader released" shutdown panic seen on v6.4.4 +// sentry nodes. 
Before the fix, blocksync's long-running goroutines +// (Reactor.requestRoutine, Reactor.poolRoutine, Reactor.processBlockSyncCh, +// Reactor.processPeerUpdates, Reactor.autoRestartIfBehind, and +// BlockPool.makeRequestersRoutine) were started with raw `go fn(ctx)` using +// the outer ctx, instead of `Spawn(...)` which would register them with the +// BaseService WaitGroup and bind them to BaseService.inner.ctx. As a result, +// Reactor.Stop() / BlockPool.Stop() — which cancels only the inner ctx — +// did not signal these goroutines to exit, let alone wait for them. The +// node's OnStop then proceeded to n.blockStore.Close() while poolRoutine +// was still mid-SaveBlock -> Base() -> bs.db.Iterator, causing goleveldb to +// panic when the table reader was released underneath the live iterator. +// +// This test asserts the fix: after `reactor.Stop()` returns, the +// blocksync-package goroutines have exited. The outer ctx is still live at +// this point in the test, so the unfixed code keeps them running and the +// assertion fails deterministically. On failure the live goroutine stacks +// are dumped to make the leak obvious. 
+func TestReactor_OnStopWaitsForGoroutines(t *testing.T) { + ctx := t.Context() + + cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_stop_test") + require.NoError(t, err) + + valSet, privVals := factory.ValidatorSet(ctx, 1, 30) + genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams()) + + rts := setup(ctx, t, genDoc, privVals[0], []int64{0}) + + reactor := rts.reactors[rts.nodes[0]] + require.True(t, reactor.IsRunning()) + + dumpBlocksyncGoroutines := func() (string, int) { + buf := make([]byte, 1<<20) + n := runtime.Stack(buf, true) + var out strings.Builder + count := 0 + for _, g := range strings.Split(string(buf[:n]), "\n\n") { + if !strings.Contains(g, "/internal/blocksync.") { + continue + } + // The test functions themselves live in the blocksync package, so + // runtime.Stack reports them as matches. Only count background + // routines spawned by Reactor.OnStart and BlockPool.OnStart, + // which are created by libs/service.Spawn, not testing.tRunner. + if strings.Contains(g, "testing.tRunner") { + continue + } + out.WriteString(g) + out.WriteString("\n\n") + count++ + } + return out.String(), count + } + + // OnStart Spawns 5 reactor routines and BlockPool.OnStart Spawns 1. + require.Eventually(t, func() bool { + _, c := dumpBlocksyncGoroutines() + return c >= 6 + }, 5*time.Second, 10*time.Millisecond, "blocksync goroutines did not start") + + reactor.Stop() + require.False(t, reactor.IsRunning()) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if _, c := dumpBlocksyncGoroutines(); c == 0 { + return + } + time.Sleep(time.Millisecond) + } + dump, c := dumpBlocksyncGoroutines() + t.Fatalf("%d blocksync goroutine(s) still alive after Reactor.Stop() returned. "+ + "This means at least one routine was not registered with the "+ + "BaseService WaitGroup via Spawn(), so Stop did not wait for it. 
"+ + "Live stacks:\n\n%s", c, dump) +} + func TestReactor_SyncTime(t *testing.T) { ctx := t.Context() diff --git a/sei-tendermint/internal/consensus/byzantine_test.go b/sei-tendermint/internal/consensus/byzantine_test.go index ce74659a8c..3084212535 100644 --- a/sei-tendermint/internal/consensus/byzantine_test.go +++ b/sei-tendermint/internal/consensus/byzantine_test.go @@ -239,7 +239,7 @@ package consensus // // for _, reactor := range rts.reactors { // reactor.StopWaitSync() -// reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false) +// reactor.SwitchToConsensus(reactor.state.GetState(), false) // } // // // Evidence should be submitted and committed at the third height but diff --git a/sei-tendermint/internal/consensus/invalid_test.go b/sei-tendermint/internal/consensus/invalid_test.go index bd5106e72b..3a8d285133 100644 --- a/sei-tendermint/internal/consensus/invalid_test.go +++ b/sei-tendermint/internal/consensus/invalid_test.go @@ -46,7 +46,7 @@ func TestGossipVotesForHeightPoisonedProposalPOL(t *testing.T) { reactor := rts.reactors[nodeIDs[0]] peerID := nodeIDs[1] state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) require.Eventually(t, func() bool { _, ok := reactor.GetPeerState(peerID) @@ -176,7 +176,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) } // this val sends a random precommit at each height diff --git a/sei-tendermint/internal/consensus/reactor.go b/sei-tendermint/internal/consensus/reactor.go index 7f499c75c1..095203cfa8 100644 --- a/sei-tendermint/internal/consensus/reactor.go +++ b/sei-tendermint/internal/consensus/reactor.go @@ -204,7 +204,7 @@ func (r *Reactor) WaitSync() bool { // SwitchToConsensus switches from block-sync mode to consensus mode. 
It resets // the state, turns off block-sync, and starts the consensus state-machine. -func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool) { +func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { logger.Info("switching to consensus") d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight} diff --git a/sei-tendermint/internal/consensus/reactor_test.go b/sei-tendermint/internal/consensus/reactor_test.go index b42e8d7f64..1d56c57132 100644 --- a/sei-tendermint/internal/consensus/reactor_test.go +++ b/sei-tendermint/internal/consensus/reactor_test.go @@ -196,7 +196,7 @@ func TestReactorBasic(t *testing.T) { for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) } t.Logf("wait till everyone makes the first new block") @@ -312,7 +312,7 @@ func TestReactorWithEvidence(t *testing.T) { for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) } var wg sync.WaitGroup @@ -360,7 +360,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) } // send a tx @@ -404,7 +404,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(state, false) } var wg sync.WaitGroup @@ -475,7 +475,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { rts := setup(ctx, t, nPeers, unwrapTestStates(states), 1024) // buffer must be large enough to not deadlock for _, reactor := range rts.reactors { state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + 
reactor.SwitchToConsensus(state, false) } blocksSubs := []eventbus.Subscription{}