-
Notifications
You must be signed in to change notification settings - Fork 878
Wait for blocksync goroutines on Stop to fix leveldb shutdown panic #3415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a548430
9c4d814
667d611
a8cd864
2f25615
8925f4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ import ( | |
| sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" | ||
| "github.com/sei-protocol/sei-chain/sei-tendermint/internal/store" | ||
| "github.com/sei-protocol/sei-chain/sei-tendermint/libs/service" | ||
| "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" | ||
| pb "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/blocksync" | ||
| "github.com/sei-protocol/sei-chain/sei-tendermint/types" | ||
| ) | ||
|
|
@@ -70,7 +71,7 @@ func GetChannelDescriptor() p2p.ChannelDescriptor[*pb.Message] { | |
| type consensusReactor interface { | ||
| // For when we switch from block sync reactor to the consensus | ||
| // machine. | ||
| SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool) | ||
| SwitchToConsensus(state sm.State, skipWAL bool) | ||
| } | ||
|
|
||
| type peerError struct { | ||
|
|
@@ -82,6 +83,8 @@ func (e peerError) Error() string { | |
| return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) | ||
| } | ||
|
|
||
| type blocksyncResult struct{ stateSynced bool } | ||
|
|
||
| // Reactor handles long-term catchup syncing. | ||
| type Reactor struct { | ||
| service.BaseService | ||
|
|
@@ -98,6 +101,16 @@ type Reactor struct { | |
| blockSync *atomicBool | ||
| previousMaxPeerHeight int64 | ||
|
|
||
| // blocksyncReady fires when blocksync should start processing blocks — | ||
| // either at OnStart (if blockSync was initially set) or via | ||
| // SwitchToBlockSync. Pre-spawned requestRoutine and poolRoutine wait on | ||
| // it before doing any work. | ||
| blocksyncReady utils.AtomicSend[utils.Option[blocksyncResult]] | ||
| // consensusReady fires once the blocksync->consensus handoff has | ||
| // happened. The pre-spawned autoRestartIfBehind monitor gates on this | ||
| // signal. | ||
| consensusReady utils.AtomicSend[bool] | ||
|
|
||
| router *p2p.Router | ||
| channel *p2p.Channel[*pb.Message] | ||
|
|
||
|
|
@@ -148,8 +161,9 @@ func NewReactor( | |
| blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold, | ||
| blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64 | ||
| restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds, | ||
| blocksyncReady: utils.NewAtomicSend(utils.None[blocksyncResult]()), | ||
| consensusReady: utils.NewAtomicSend(false), | ||
| } | ||
|
|
||
| r.BaseService = *service.NewBaseService("BlockSync", r) | ||
| return r, nil | ||
| } | ||
|
|
@@ -184,23 +198,61 @@ func (r *Reactor) OnStart(ctx context.Context) error { | |
| r.requestsCh = requestsCh | ||
| r.errorsCh = errorsCh | ||
|
|
||
| // Pre-spawn all long-running routines so their lifetime is bound to the | ||
| // BaseService WaitGroup. Conditional routines gate on AtomicSend[bool] | ||
| // signals so SwitchToBlockSync (and the in-poolRoutine consensus handoff | ||
| // for autoRestartIfBehind) can wake them later without spawning fresh | ||
| // goroutines from outside OnStart. | ||
| r.Spawn("requestRoutine", func(ctx context.Context) error { | ||
| _, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool { | ||
| return o.IsPresent() | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| r.requestRoutine(ctx) | ||
| return nil | ||
| }) | ||
| r.Spawn("poolRoutine", func(ctx context.Context) error { | ||
| result, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool { | ||
| return o.IsPresent() | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| res, _ := result.Get() | ||
| r.poolRoutine(ctx, res.stateSynced) | ||
| return nil | ||
| }) | ||
| r.SpawnCritical("processBlockSyncCh", func(ctx context.Context) error { | ||
| r.processBlockSyncCh(ctx) | ||
| return nil | ||
| }) | ||
| r.SpawnCritical("processPeerUpdates", func(ctx context.Context) error { | ||
| r.processPeerUpdates(ctx) | ||
| return nil | ||
| }) | ||
| r.SpawnCritical("autoRestartIfBehind", func(ctx context.Context) error { | ||
| if _, err := r.consensusReady.Wait(ctx, func(ready bool) bool { return ready }); err != nil { | ||
| logger.Error("Failed to wait for consensus ready to spawn autoRestartIfBehind", "err", err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto, can just return err |
||
| return nil | ||
| } | ||
| r.autoRestartIfBehind(ctx) | ||
| return nil | ||
| }) | ||
|
|
||
| if r.blockSync.IsSet() { | ||
| if err := r.pool.Start(ctx); err != nil { | ||
| return err | ||
| } | ||
| go r.requestRoutine(ctx) | ||
|
|
||
| go r.poolRoutine(ctx, false) | ||
| r.blocksyncReady.Store(utils.Some(blocksyncResult{true})) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Wrong
|
||
| } | ||
|
|
||
| go r.processBlockSyncCh(ctx) | ||
| go r.processPeerUpdates(ctx) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // OnStop stops the reactor by signaling to all spawned goroutines to exit and | ||
| // blocking until they all exit. | ||
| // OnStop stops the BlockPool. The reactor's own long-running goroutines were | ||
| // registered with the BaseService WaitGroup via Spawn in OnStart, so the | ||
| // BaseService blocks Stop() on their exit before this method returns. | ||
| func (r *Reactor) OnStop() { | ||
| if r.blockSync.IsSet() { | ||
| r.pool.Stop() | ||
|
|
@@ -378,9 +430,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { | |
| } | ||
|
|
||
| r.syncStartTime = time.Now() | ||
|
|
||
| go r.requestRoutine(ctx) | ||
| go r.poolRoutine(ctx, true) | ||
| r.blocksyncReady.Store(utils.Some(blocksyncResult{true})) | ||
|
|
||
| if err := r.PublishStatus(types.EventDataBlockSyncStatus{ | ||
| Complete: false, | ||
|
|
@@ -473,10 +523,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { | |
|
|
||
| if r.consReactor != nil { | ||
| logger.Info("switching to consensus reactor", "height", height, "blocks_synced", blocksSynced, "state_synced", stateSynced, "max_peer_height", r.pool.MaxPeerHeight()) | ||
| r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced) | ||
|
|
||
| // Auto restart should only be checked after switching to consensus mode | ||
| go r.autoRestartIfBehind(ctx) | ||
|
pompon0 marked this conversation as resolved.
|
||
| // Use the node-scoped context: SwitchToConsensus is a handoff | ||
| // to a peer reactor whose lifecycle is not tied to blocksync. | ||
| r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) | ||
| // Wake the pre-spawned auto-restart monitor. | ||
| r.consensusReady.Store(true) | ||
| } | ||
|
|
||
| return | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
OrPanic()