5 changes: 4 additions & 1 deletion sei-tendermint/internal/blocksync/pool.go
@@ -135,7 +135,10 @@ func NewBlockPool(
func (pool *BlockPool) OnStart(ctx context.Context) error {
pool.lastAdvance = time.Now()
pool.lastHundredBlockTimeStamp = pool.lastAdvance
go pool.makeRequestersRoutine(ctx)
pool.Spawn("makeRequestersRoutine", func(ctx context.Context) error {
pool.makeRequestersRoutine(ctx)
return nil
})

return nil
}
87 changes: 69 additions & 18 deletions sei-tendermint/internal/blocksync/reactor.go
@@ -15,6 +15,7 @@ import (
sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/store"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/service"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
pb "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/blocksync"
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
)
@@ -70,7 +71,7 @@ func GetChannelDescriptor() p2p.ChannelDescriptor[*pb.Message] {
type consensusReactor interface {
// For when we switch from block sync reactor to the consensus
// machine.
SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool)
SwitchToConsensus(state sm.State, skipWAL bool)
}

type peerError struct {
@@ -82,6 +83,8 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}

type blocksyncResult struct{ stateSynced bool }

// Reactor handles long-term catchup syncing.
type Reactor struct {
service.BaseService
@@ -98,6 +101,16 @@ type Reactor struct {
blockSync *atomicBool
previousMaxPeerHeight int64

// blocksyncReady fires when blocksync should start processing blocks —
// either at OnStart (if blockSync was initially set) or via
// SwitchToBlockSync. Pre-spawned requestRoutine and poolRoutine wait on
// it before doing any work.
blocksyncReady utils.AtomicSend[utils.Option[blocksyncResult]]
// consensusReady fires once the blocksync->consensus handoff has
// happened. The pre-spawned autoRestartIfBehind monitor gates on this
// signal.
consensusReady utils.AtomicSend[bool]

router *p2p.Router
channel *p2p.Channel[*pb.Message]

@@ -148,8 +161,9 @@ func NewReactor(
blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold,
blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64
restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds,
blocksyncReady: utils.NewAtomicSend(utils.None[blocksyncResult]()),
consensusReady: utils.NewAtomicSend(false),
}

r.BaseService = *service.NewBaseService("BlockSync", r)
return r, nil
}
@@ -184,23 +198,61 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.requestsCh = requestsCh
r.errorsCh = errorsCh

// Pre-spawn all long-running routines so their lifetime is bound to the
// BaseService WaitGroup. Conditional routines gate on AtomicSend
// signals so SwitchToBlockSync (and the in-poolRoutine consensus handoff
// for autoRestartIfBehind) can wake them later without spawning fresh
// goroutines from outside OnStart.
r.Spawn("requestRoutine", func(ctx context.Context) error {
_, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool {
return o.IsPresent()
})
if err != nil {
return err
}
r.requestRoutine(ctx)
return nil
})
r.Spawn("poolRoutine", func(ctx context.Context) error {
result, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool {
return o.IsPresent()
})
if err != nil {
return err
}
res, _ := result.Get()
Review comment (Contributor): OrPanic()

r.poolRoutine(ctx, res.stateSynced)
return nil
})
r.SpawnCritical("processBlockSyncCh", func(ctx context.Context) error {
r.processBlockSyncCh(ctx)
return nil
})
r.SpawnCritical("processPeerUpdates", func(ctx context.Context) error {
r.processPeerUpdates(ctx)
return nil
})
r.SpawnCritical("autoRestartIfBehind", func(ctx context.Context) error {
if _, err := r.consensusReady.Wait(ctx, func(ready bool) bool { return ready }); err != nil {
logger.Error("Failed to wait for consensus ready to spawn autoRestartIfBehind", "err", err)
Review comment (Contributor): ditto, can just return err
return nil
}
r.autoRestartIfBehind(ctx)
return nil
})

if r.blockSync.IsSet() {
if err := r.pool.Start(ctx); err != nil {
return err
}
go r.requestRoutine(ctx)

go r.poolRoutine(ctx, false)
r.blocksyncReady.Store(utils.Some(blocksyncResult{true}))
Review comment (Cursor Bugbot, commit 8925f4f): Wrong stateSynced value in OnStart blocksync path (High Severity). In OnStart, blocksyncReady is stored with blocksyncResult{true} (i.e., stateSynced = true), but the original code passed false to poolRoutine on this path (go r.poolRoutine(ctx, false)). Only the SwitchToBlockSync path (called after state sync) correctly uses true. This causes poolRoutine to always pass stateSynced=true to SwitchToConsensus, which incorrectly sets skipWAL=true and disables WAL catchup even when the node started normally without state sync and synced zero blocks.

Review comment (Contributor): @masih ptal

}

go r.processBlockSyncCh(ctx)
go r.processPeerUpdates(ctx)

return nil
}
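A hedged sketch of the fix the Bugbot finding above points at: the OnStart path should publish stateSynced=false (matching the original go r.poolRoutine(ctx, false)), with true reserved for the post-state-sync SwitchToBlockSync path. This is the reviewer-suggested direction, not code from this commit:

// In OnStart, when blockSync was set from the beginning (no state sync ran):
r.blocksyncReady.Store(utils.Some(blocksyncResult{stateSynced: false}))

// In SwitchToBlockSync, which runs only after state sync completed:
r.blocksyncReady.Store(utils.Some(blocksyncResult{stateSynced: true}))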

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
// OnStop stops the BlockPool. The reactor's own long-running goroutines were
// registered with the BaseService WaitGroup via Spawn in OnStart, so the
// BaseService blocks Stop() on their exit before this method returns.
func (r *Reactor) OnStop() {
if r.blockSync.IsSet() {
r.pool.Stop()
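The contract the new OnStop comment relies on, modeled minimally: Spawn registers each goroutine with a WaitGroup and hands it the service-scoped context; Stop cancels that context and then blocks on the WaitGroup. This is an assumption about how libs/service behaves, reconstructed from the surrounding diff rather than taken from its source:

package servicesketch

import (
	"context"
	"sync"
)

// Toy model of the BaseService Spawn/Stop contract.
type baseService struct {
	ctx    context.Context // service-scoped, not the caller's ctx
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newBaseService() *baseService {
	ctx, cancel := context.WithCancel(context.Background())
	return &baseService{ctx: ctx, cancel: cancel}
}

func (s *baseService) Spawn(name string, fn func(context.Context) error) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		_ = fn(s.ctx) // exits when Stop cancels s.ctx
	}()
}

func (s *baseService) Stop() {
	s.cancel()  // signal every spawned routine to wind down
	s.wg.Wait() // and block until they actually have
}

A raw go fn(outerCtx) never enters this WaitGroup, which is exactly the leak the regression test below pins down.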
@@ -378,9 +430,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
}

r.syncStartTime = time.Now()

go r.requestRoutine(ctx)
go r.poolRoutine(ctx, true)
r.blocksyncReady.Store(utils.Some(blocksyncResult{true}))

if err := r.PublishStatus(types.EventDataBlockSyncStatus{
Complete: false,
@@ -473,10 +523,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {

if r.consReactor != nil {
logger.Info("switching to consensus reactor", "height", height, "blocks_synced", blocksSynced, "state_synced", stateSynced, "max_peer_height", r.pool.MaxPeerHeight())
r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced)

// Auto restart should only be checked after switching to consensus mode
go r.autoRestartIfBehind(ctx)
// Use the node-scoped context: SwitchToConsensus is a handoff
// to a peer reactor whose lifecycle is not tied to blocksync.
r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
// Wake the pre-spawned auto-restart monitor.
r.consensusReady.Store(true)
}

return
81 changes: 81 additions & 0 deletions sei-tendermint/internal/blocksync/reactor_test.go
@@ -2,6 +2,8 @@ package blocksync

import (
"context"
"runtime"
"strings"
"testing"
"time"

@@ -242,6 +244,85 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
rts.network.Remove(t, rts.nodes[0])
}

// TestReactor_OnStopWaitsForGoroutines is a regression test for the
// "panic: leveldb/table: reader released" shutdown panic seen on v6.4.4
// sentry nodes. Before the fix, blocksync's long-running goroutines
// (Reactor.requestRoutine, Reactor.poolRoutine, Reactor.processBlockSyncCh,
// Reactor.processPeerUpdates, Reactor.autoRestartIfBehind, and
// BlockPool.makeRequestersRoutine) were started with raw `go fn(ctx)` using
// the outer ctx, instead of `Spawn(...)` which would register them with the
// BaseService WaitGroup and bind them to BaseService.inner.ctx. As a result,
// Reactor.Stop() / BlockPool.Stop() — which cancels only the inner ctx —
// did not signal these goroutines to exit, let alone wait for them. The
// node's OnStop then proceeded to n.blockStore.Close() while poolRoutine
// was still mid-SaveBlock -> Base() -> bs.db.Iterator, causing goleveldb to
// panic when the table reader was released underneath the live iterator.
//
// This test asserts the fix: after `reactor.Stop()` returns, the
// blocksync-package goroutines have exited. The outer ctx is still live at
// this point in the test, so the unfixed code keeps them running and the
// assertion fails deterministically. On failure the live goroutine stacks
// are dumped to make the leak obvious.
func TestReactor_OnStopWaitsForGoroutines(t *testing.T) {
ctx := t.Context()

cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_stop_test")
require.NoError(t, err)

valSet, privVals := factory.ValidatorSet(ctx, 1, 30)
genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())

rts := setup(ctx, t, genDoc, privVals[0], []int64{0})

reactor := rts.reactors[rts.nodes[0]]
require.True(t, reactor.IsRunning())

dumpBlocksyncGoroutines := func() (string, int) {
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, true)
var out strings.Builder
count := 0
for _, g := range strings.Split(string(buf[:n]), "\n\n") {
if !strings.Contains(g, "/internal/blocksync.") {
continue
}
// The test functions themselves live in the blocksync package, so
// runtime.Stack reports them as matches. Only count background
// routines spawned by Reactor.OnStart and BlockPool.OnStart,
// which are created by libs/service.Spawn, not testing.tRunner.
if strings.Contains(g, "testing.tRunner") {
continue
}
out.WriteString(g)
out.WriteString("\n\n")
count++
}
return out.String(), count
}

// OnStart Spawns 5 reactor routines and BlockPool.OnStart Spawns 1.
require.Eventually(t, func() bool {
_, c := dumpBlocksyncGoroutines()
return c >= 6
}, 5*time.Second, 10*time.Millisecond, "blocksync goroutines did not start")

reactor.Stop()
require.False(t, reactor.IsRunning())

deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if _, c := dumpBlocksyncGoroutines(); c == 0 {
return
}
time.Sleep(time.Millisecond)
}
dump, c := dumpBlocksyncGoroutines()
t.Fatalf("%d blocksync goroutine(s) still alive after Reactor.Stop() returned. "+
"This means at least one routine was not registered with the "+
"BaseService WaitGroup via Spawn(), so Stop did not wait for it. "+
"Live stacks:\n\n%s", c, dump)
}
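In miniature, the failure mode the test's doc comment describes: a goroutine parked on an outer context is invisible to an inner cancel-plus-wait shutdown, while a WaitGroup-tracked goroutine bound to the inner context is reliably outlived. A small standalone demonstration, assuming nothing beyond the standard library:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	outer := context.Background()
	inner, cancel := context.WithCancel(outer)

	// Pre-fix shape: raw `go` on the outer ctx. Cancelling inner neither
	// signals nor waits for this goroutine; it leaks past "Stop".
	go func() {
		<-outer.Done() // never fires
	}()

	// Post-fix shape: bound to the inner ctx and a WaitGroup, so
	// "Stop" (cancel + Wait) provably outlasts it.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-inner.Done()
	}()

	cancel()
	wg.Wait()
	fmt.Println("stopped; tracked goroutine has exited")
	time.Sleep(10 * time.Millisecond) // leaked goroutine is still parked on outer.Done()
}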

func TestReactor_SyncTime(t *testing.T) {
ctx := t.Context()

2 changes: 1 addition & 1 deletion sei-tendermint/internal/consensus/byzantine_test.go
@@ -239,7 +239,7 @@ package consensus
//
// for _, reactor := range rts.reactors {
// reactor.StopWaitSync()
// reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
// reactor.SwitchToConsensus(reactor.state.GetState(), false)
// }
//
// // Evidence should be submitted and committed at the third height but
4 changes: 2 additions & 2 deletions sei-tendermint/internal/consensus/invalid_test.go
@@ -46,7 +46,7 @@ func TestGossipVotesForHeightPoisonedProposalPOL(t *testing.T) {
reactor := rts.reactors[nodeIDs[0]]
peerID := nodeIDs[1]
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)

require.Eventually(t, func() bool {
_, ok := reactor.GetPeerState(peerID)
@@ -176,7 +176,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

// this val sends a random precommit at each height
2 changes: 1 addition & 1 deletion sei-tendermint/internal/consensus/reactor.go
@@ -204,7 +204,7 @@ func (r *Reactor) WaitSync() bool {

// SwitchToConsensus switches from block-sync mode to consensus mode. It resets
// the state, turns off block-sync, and starts the consensus state-machine.
func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool) {
func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
logger.Info("switching to consensus")

d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}
10 changes: 5 additions & 5 deletions sei-tendermint/internal/consensus/reactor_test.go
@@ -196,7 +196,7 @@ func TestReactorBasic(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

t.Logf("wait till everyone makes the first new block")
@@ -312,7 +312,7 @@ func TestReactorWithEvidence(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

var wg sync.WaitGroup
@@ -360,7 +360,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

// send a tx
@@ -404,7 +404,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

var wg sync.WaitGroup
@@ -475,7 +475,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
rts := setup(ctx, t, nPeers, unwrapTestStates(states), 1024) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

blocksSubs := []eventbus.Subscription{}