From 17fcc4894558cb320f0b684f7d754b1e73727701 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 9 Feb 2026 15:01:30 +0100 Subject: [PATCH 1/5] feat(sequencer): catchup from base --- block/internal/syncing/syncer.go | 44 +- pkg/sequencers/single/sequencer.go | 143 ++++- pkg/sequencers/single/sequencer_test.go | 683 ++++++++++++++++++++++++ 3 files changed, 862 insertions(+), 8 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 6df6b2c22e..4923fe4992 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -755,9 +755,49 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // Update DA height if needed // This height is only updated when a height is processed from DA as P2P - // events do not contain DA height information + // events do not contain DA height information. + // + // When a sequencer restarts after extended downtime, it produces "catch-up" + // blocks containing forced inclusion transactions from missed DA epochs and + // submits them to DA at the current (much higher) DA height. This creates a + // gap between the state's DAHeight (tracking forced inclusion epoch progress) + // and event.DaHeight (the DA submission height). + // + // If we jump state.DAHeight directly to event.DaHeight, subsequent calls to + // VerifyForcedInclusionTxs would check the wrong epoch (the submission epoch + // instead of the next forced-inclusion epoch), causing valid catch-up blocks + // to be incorrectly flagged as malicious. + // + // To handle this, when the gap exceeds one DA epoch, we advance DAHeight by + // exactly one epoch per block. This lets the forced inclusion verifier check + // the correct epoch for each catch-up block. Once the sequencer finishes + // catching up and the gap closes, DAHeight converges to event.DaHeight. if event.DaHeight > newState.DAHeight { - newState.DAHeight = event.DaHeight + epochSize := s.genesis.DAEpochForcedInclusion + gap := event.DaHeight - newState.DAHeight + + if epochSize > 0 && gap > epochSize { + // Large gap detected — likely catch-up blocks from a restarted sequencer. + // Advance DAHeight by one epoch to keep forced inclusion verification + // aligned with the epoch the sequencer is replaying. + _, epochEnd, _ := types.CalculateEpochBoundaries( + newState.DAHeight, s.genesis.DAStartHeight, epochSize, + ) + nextEpochStart := epochEnd + 1 + if nextEpochStart > event.DaHeight { + // Shouldn't happen, but clamp to event.DaHeight as a safety net. + nextEpochStart = event.DaHeight + } + s.logger.Debug(). + Uint64("current_da_height", newState.DAHeight). + Uint64("event_da_height", event.DaHeight). + Uint64("advancing_to", nextEpochStart). + Uint64("gap", gap). + Msg("large DA height gap detected (sequencer catch-up), advancing DA height by one epoch") + newState.DAHeight = nextEpochStart + } else { + newState.DAHeight = event.DaHeight + } } batch, err := s.store.NewBatch(ctx) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 228bde2791..f11a238837 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -51,6 +51,16 @@ type Sequencer struct { // Cached forced inclusion transactions from the current epoch cachedForcedInclusionTxs [][]byte + + // Catch-up state: when the sequencer restarts after being down for more than + // one DA epoch, it must replay missed epochs (producing blocks with only forced + // inclusion transactions, no mempool) before resuming normal sequencing. + // This ensures the sequencer produces the same blocks that nodes running in + // base sequencing mode would have produced during the downtime. + catchingUp bool + // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. + // Used as the block timestamp during catch-up to match based sequencing behavior. + currentDAEndTime time.Time } // NewSequencer creates a new Single Sequencer @@ -168,6 +178,13 @@ func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Submit // GetNextBatch implements sequencing.Sequencer. // It gets the next batch of transactions and fetch for forced included transactions. +// +// During catch-up mode (after sequencer downtime spanning one or more DA epochs), +// only forced inclusion transactions are returned — no mempool transactions. This +// ensures the sequencer produces blocks identical to what nodes running in base +// sequencing mode would have produced during the downtime. Once the sequencer has +// processed all missed DA epochs and reaches the DA head, it exits catch-up mode +// and resumes normal operation with both forced inclusion and mempool transactions. func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId @@ -208,10 +225,22 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB forcedTxs = c.cachedForcedInclusionTxs[c.checkpoint.TxIndex:] } - // Get mempool transactions from queue - mempoolBatch, err := c.queue.Next(ctx) - if err != nil { - return nil, err + // Get mempool transactions from queue, but ONLY if we're not catching up. + // During catch-up, the sequencer must produce blocks identical to what base + // sequencing would produce (forced inclusion txs only, no mempool). + var mempoolBatch *coresequencer.Batch + if !c.catchingUp { + var err error + mempoolBatch, err = c.queue.Next(ctx) + if err != nil { + return nil, err + } + } else { + mempoolBatch = &coresequencer.Batch{} + c.logger.Debug(). + Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Int("forced_txs", len(forcedTxs)). + Msg("catch-up mode: skipping mempool transactions") } // Build combined tx list for filtering @@ -318,6 +347,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Bool("catching_up", c.catchingUp). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -326,11 +356,19 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validForcedTxs...) batchTxs = append(batchTxs, validMempoolTxs...) + // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. + // This ensures blocks produced during catch-up have timestamps consistent with + // what base sequencing nodes would have produced. + timestamp := time.Now() + if c.catchingUp && !c.currentDAEndTime.IsZero() { + timestamp = c.currentDAEndTime + } + return &coresequencer.GetNextBatchResponse{ Batch: &coresequencer.Batch{ Transactions: batchTxs, }, - Timestamp: time.Now(), + Timestamp: timestamp, BatchData: req.LastBatchData, }, nil } @@ -374,13 +412,27 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint +// IsCatchingUp returns whether the sequencer is in catch-up mode. +// During catch-up, the sequencer replays missed DA epochs producing blocks +// with only forced inclusion transactions (no mempool), matching the blocks +// that base sequencing nodes would have produced during sequencer downtime. +func (c *Sequencer) IsCatchingUp() bool { + return c.catchingUp +} + +// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. +// It also updates the catch-up state based on the DA epoch timestamp: +// - If the fetched epoch's timestamp is significantly in the past (more than +// one epoch's wall-clock duration), the sequencer enters catch-up mode. +// - If the DA height is from the future (not yet produced), the sequencer +// exits catch-up mode as it has reached the DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). + Bool("catching_up", c.catchingUp). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -389,16 +441,36 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Msg("DA height from future, waiting for DA to produce block") + + // We've reached the DA head — exit catch-up mode + if c.catchingUp { + c.logger.Info(). + Uint64("da_height", currentDAHeight). + Msg("catch-up complete: reached DA head, resuming normal sequencing") + c.catchingUp = false + } + return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} + c.catchingUp = false return 0, nil } return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } + // Store the DA epoch end time for timestamp usage during catch-up + if !forcedTxsEvent.Timestamp.IsZero() { + c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() + } + + // Determine catch-up state based on epoch timestamp. + // If the epoch we just fetched ended more than one epoch's wall-clock duration ago, + // we are behind the DA head and must catch up by replaying missed epochs. + c.updateCatchUpState(forcedTxsEvent) + // Validate and filter transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 @@ -420,6 +492,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). + Bool("catching_up", c.catchingUp). Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -427,3 +500,61 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint return forcedTxsEvent.EndDaHeight, nil } + +// updateCatchUpState determines whether the sequencer is catching up to the DA head. +// +// The sequencer is considered to be catching up when the DA epoch it just fetched +// has a timestamp that is significantly in the past — specifically, more than one +// full epoch's wall-clock duration ago. This means other nodes likely switched to +// base sequencing during the sequencer's downtime, and the sequencer must replay +// those missed epochs before resuming normal block production. +// +// When the epoch timestamp is recent (within one epoch duration), the sequencer +// has reached the DA head and can resume normal operation. +func (c *Sequencer) updateCatchUpState(event *block.ForcedInclusionEvent) { + if event == nil || event.Timestamp.IsZero() { + // No timestamp available (e.g., empty epoch) — don't change catch-up state. + // If we were already catching up, we remain in that state until we see a + // recent timestamp or hit HeightFromFuture. + return + } + + if c.genesis.DAEpochForcedInclusion == 0 { + // No epoch-based forced inclusion configured — catch-up is irrelevant. + c.catchingUp = false + return + } + + // Calculate how long one DA epoch takes in wall-clock time. + epochWallDuration := time.Duration(c.genesis.DAEpochForcedInclusion) * c.cfg.DA.BlockTime.Duration + + // Use a minimum threshold to avoid false positives from minor delays. + catchUpThreshold := epochWallDuration + if catchUpThreshold < 30*time.Second { + catchUpThreshold = 30 * time.Second + } + + timeSinceEpoch := time.Since(event.Timestamp) + wasCatchingUp := c.catchingUp + + if timeSinceEpoch > catchUpThreshold { + c.catchingUp = true + if !wasCatchingUp { + c.logger.Warn(). + Dur("time_since_epoch", timeSinceEpoch). + Dur("threshold", catchUpThreshold). + Uint64("epoch_start", event.StartDaHeight). + Uint64("epoch_end", event.EndDaHeight). + Msg("entering catch-up mode: DA epoch is behind head, replaying missed epochs with forced inclusion txs only") + } + } else { + c.catchingUp = false + if wasCatchingUp { + c.logger.Info(). + Dur("time_since_epoch", timeSinceEpoch). + Uint64("epoch_start", event.StartDaHeight). + Uint64("epoch_end", event.EndDaHeight). + Msg("exiting catch-up mode: reached DA head, resuming normal sequencing") + } + } +} diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index 04d7f88721..d5780542df 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -1224,6 +1224,689 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // preserves any transactions that weren't even processed yet due to maxBytes limits. // // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. +func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA epoch at height 100 with a timestamp far in the past (simulating sequencer downtime) + oldTimestamp := time.Now().Add(-10 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-tx-1")}, + }).Once() + + // Next DA epoch at height 101 also in the past (still catching up) + // Use .Maybe() since this test only calls GetNextBatch once (processing epoch 100), + // so epoch 101 may not be retrieved. + oldTimestamp2 := time.Now().Add(-9 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-tx-2")}, + }).Maybe() + + // DA epoch at height 102 is from the future (DA head reached) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool transaction + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx-1")}}, + }) + require.NoError(t, err) + + assert.False(t, seq.IsCatchingUp(), "should not be catching up initially") + + // First GetNextBatch — epoch 100 is far in the past, should enter catch-up + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp.Batch) + + assert.True(t, seq.IsCatchingUp(), "should be catching up after fetching old epoch") + + // During catch-up, batch should contain only forced inclusion tx, no mempool tx + assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") + assert.Equal(t, []byte("forced-tx-1"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: old timestamp (catching up) + oldTimestamp := time.Now().Add(-5 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, + }).Once() + + // Epoch at height 101: also old (still catching up) + oldTimestamp2 := time.Now().Add(-4 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-3")}, + }).Once() + + // Epoch at height 102: from the future (head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit several mempool transactions + for i := 0; i < 5; i++ { + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + } + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch (epoch 100): only forced txs + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + + for _, tx := range resp1.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 2, len(resp1.Batch.Transactions), "should have 2 forced txs from epoch 100") + + // Second batch (epoch 101): only forced txs + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + + for _, tx := range resp2.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 1, len(resp2.Batch.Transactions), "should have 1 forced tx from epoch 101") +} + +func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: timestamp 5 minutes ago + epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Next epoch from future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.True(t, seq.IsCatchingUp(), "should be in catch-up mode") + + // During catch-up, the timestamp should be the DA epoch end time, not time.Now() + assert.Equal(t, epochTimestamp, resp.Timestamp, + "catch-up batch timestamp should match DA epoch timestamp") +} + +func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old (catch-up) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-old")}, + }).Once() + + // Epoch 101: recent timestamp (current epoch at DA head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-current")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch: catch-up (old epoch 100) + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during old epoch") + assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") + assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) + + // Second batch: recent epoch 101 — should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching recent epoch") + + // Should include both forced tx and mempool tx now + hasForcedTx := false + hasMempoolTx := false + for _, tx := range resp2.Batch.Transactions { + if bytes.Equal(tx, []byte("forced-current")) { + hasForcedTx = true + } + if bytes.Equal(tx, []byte("mempool-tx")) { + hasMempoolTx = true + } + } + assert.True(t, hasForcedTx, "should contain forced tx from current epoch") + assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") +} + +func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old, triggers catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Epoch 101: from the future — DA head reached + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call: fetches epoch 100 (old), enters catch-up + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 1, len(resp1.Batch.Transactions)) + + // Second call: epoch 101 is from the future, should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should exit catch-up when DA returns HeightFromFuture") + // No forced txs available, batch is empty + assert.Equal(t, 0, len(resp2.Batch.Transactions)) +} + +func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: RECENT timestamp (sequencer was NOT down for long) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Next epoch from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when epoch is recent") + + // Should have both forced and mempool txs (normal operation) + assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") +} + +func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { + // Simulates a sequencer that missed 3 DA epochs and must replay them all + // before resuming normal operation. + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // 3 old epochs (100, 101, 102) — all with timestamps far in the past + for h := uint64(100); h <= 102; h++ { + ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past + txData := []byte("forced-from-epoch-" + string(rune('0'+h-100))) + mockDA.MockClient.On("Retrieve", mock.Anything, h, forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: ts}, + Data: [][]byte{txData}, + }).Once() + } + + // Epoch 103: recent (DA head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-current")}, + }).Once() + + // Epoch 104: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(104), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool txs + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-1"), []byte("mempool-2")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process the 3 old epochs — all should be catch-up (no mempool) + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during epoch %d", 100+i) + assert.Equal(t, 1, len(resp.Batch.Transactions), + "epoch %d: should have exactly 1 forced tx", 100+i) + + for _, tx := range resp.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-1"), tx, "no mempool during catch-up epoch %d", 100+i) + assert.NotEqual(t, []byte("mempool-2"), tx, "no mempool during catch-up epoch %d", 100+i) + } + } + + // DA height should have advanced through the 3 old epochs + assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") + + // Next batch: epoch 103 is recent — should exit catch-up and include mempool + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at recent epoch") + + hasForcedCurrent := false + hasMempoolTx := false + for _, tx := range resp4.Batch.Transactions { + if bytes.Equal(tx, []byte("forced-current")) { + hasForcedCurrent = true + } + if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) { + hasMempoolTx = true + } + } + assert.True(t, hasForcedCurrent, "should include forced tx from current epoch") + assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") +} + +func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { + // When forced inclusion is not configured, catch-up should never activate. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + // No forced inclusion namespace configured + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should never catch up when forced inclusion not configured") + assert.Equal(t, 1, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("mempool-tx"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { + // Verify that the checkpoint (DA epoch tracking) advances correctly during catch-up. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("tx-a"), []byte("tx-b")}, + }).Once() + + // Epoch 101: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-4 * time.Minute)}, + Data: [][]byte{[]byte("tx-c")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Initial checkpoint + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process epoch 100 + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 2, len(resp1.Batch.Transactions)) + + // Checkpoint should advance to epoch 101 + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(101), seq.GetDAHeight()) + + // Process epoch 101 + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + + // Checkpoint should advance to epoch 102 + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(102), seq.GetDAHeight()) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) From c9639846aad3769b5e8982c9d5dbee77eb5d2a8e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 9 Feb 2026 17:53:26 +0100 Subject: [PATCH 2/5] fetch DA height --- apps/evm/server/force_inclusion_test.go | 4 + block/internal/da/client.go | 17 +++ block/internal/da/interface.go | 3 + block/internal/da/tracing.go | 14 +++ block/internal/da/tracing_test.go | 9 +- pkg/sequencers/single/sequencer.go | 122 ++++++++++++--------- pkg/sequencers/single/sequencer_test.go | 138 ++++++++++++------------ test/mocks/da.go | 59 +++++++++- test/testda/dummy.go | 5 + 9 files changed, 248 insertions(+), 123 deletions(-) diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a1ad3059ef..21e06bc5cb 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -73,6 +73,10 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return 0, nil +} + func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { testHeight := uint64(100) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e1..41617ed49e 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -299,6 +299,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } +// GetLatestDAHeight returns the latest height available on the DA layer by +// querying the network head. +func (c *client) GetLatestDAHeight(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.NetworkHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get DA network head: %w", err) + } + if header == nil { + return 0, fmt.Errorf("DA network head returned nil header") + } + + return header.Height, nil +} + // RetrieveForcedInclusion retrieves blobs from the forced inclusion namespace at the specified height. func (c *client) RetrieveForcedInclusion(ctx context.Context, height uint64) datypes.ResultRetrieve { if !c.hasForcedNamespace { diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7e..1e9f6cedee 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,9 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // GetLatestDAHeight returns the latest height available on the DA layer.. + GetLatestDAHeight(ctx context.Context) (uint64, error) + // Namespace accessors. GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 45fae2e863..4d946a8b74 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -123,6 +123,20 @@ func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs [] return res, nil } +func (t *tracedClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetLatestDAHeight") + defer span.End() + + height, err := t.inner.GetLatestDAHeight(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return 0, err + } + span.SetAttributes(attribute.Int64("da.latest_height", int64(height))) + return height, nil +} + func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } func (t *tracedClient) GetForcedInclusionNamespace() []byte { diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ea01c9e425..de32532a31 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -54,10 +54,11 @@ func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs } return nil, nil } -func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } -func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } -func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } -func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index f11a238837..4874e77fc1 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -21,6 +21,7 @@ import ( "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) // ErrInvalidId is returned when the chain id is invalid @@ -57,6 +58,11 @@ type Sequencer struct { // inclusion transactions, no mempool) before resuming normal sequencing. // This ensures the sequencer produces the same blocks that nodes running in // base sequencing mode would have produced during the downtime. + // + // catchingUp is true when the sequencer is replaying missed DA epochs. + // It is set when we detect (via GetLatestDAHeight) that the DA layer is more + // than one epoch ahead of our checkpoint, and cleared when we hit + // ErrHeightFromFuture (meaning we've reached the DA head). catchingUp bool // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. // Used as the block timestamp during catch-up to match based sequencing behavior. @@ -421,14 +427,20 @@ func (c *Sequencer) IsCatchingUp() bool { } // fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. -// It also updates the catch-up state based on the DA epoch timestamp: -// - If the fetched epoch's timestamp is significantly in the past (more than -// one epoch's wall-clock duration), the sequencer enters catch-up mode. +// It also updates the catch-up state based on DA heights: +// - Before the first fetch, it queries GetLatestDAHeight to determine if the +// sequencer has missed more than one DA epoch. If so, catch-up mode is +// entered and only forced-inclusion blocks (no mempool) are produced. // - If the DA height is from the future (not yet produced), the sequencer // exits catch-up mode as it has reached the DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight + // Determine catch-up state before the (potentially expensive) epoch fetch. + // This is done once per sequencer lifecycle — subsequent catch-up exits are + // handled by ErrHeightFromFuture below. + c.updateCatchUpState(ctx) + c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). @@ -466,11 +478,6 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() } - // Determine catch-up state based on epoch timestamp. - // If the epoch we just fetched ended more than one epoch's wall-clock duration ago, - // we are behind the DA head and must catch up by replaying missed epochs. - c.updateCatchUpState(forcedTxsEvent) - // Validate and filter transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 @@ -501,60 +508,75 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint return forcedTxsEvent.EndDaHeight, nil } -// updateCatchUpState determines whether the sequencer is catching up to the DA head. +// updateCatchUpState determines whether the sequencer needs to catch up to the +// DA head by comparing the sequencer's checkpoint DA height against the latest +// DA height. // -// The sequencer is considered to be catching up when the DA epoch it just fetched -// has a timestamp that is significantly in the past — specifically, more than one -// full epoch's wall-clock duration ago. This means other nodes likely switched to -// base sequencing during the sequencer's downtime, and the sequencer must replay -// those missed epochs before resuming normal block production. +// The detection is purely height-based: we query GetLatestDAHeight once (on the +// first epoch fetch) and calculate how many epochs the sequencer has missed. If +// the gap exceeds one epoch, the sequencer enters catch-up mode and replays +// missed epochs with forced-inclusion transactions only (no mempool). It remains +// in catch-up until fetchNextDAEpoch hits ErrHeightFromFuture, meaning we've +// reached the DA head. // -// When the epoch timestamp is recent (within one epoch duration), the sequencer -// has reached the DA head and can resume normal operation. -func (c *Sequencer) updateCatchUpState(event *block.ForcedInclusionEvent) { - if event == nil || event.Timestamp.IsZero() { - // No timestamp available (e.g., empty epoch) — don't change catch-up state. - // If we were already catching up, we remain in that state until we see a - // recent timestamp or hit HeightFromFuture. +// This check is performed only once per sequencer lifecycle. If the downtime was +// short enough that the sequencer is still within the current or next epoch, no +// catch-up is needed and the (lightweight) GetLatestDAHeight call is the only +// overhead. +func (c *Sequencer) updateCatchUpState(ctx context.Context) { + // Already catching up — nothing to do. We'll exit via ErrHeightFromFuture. + if c.catchingUp { return } - if c.genesis.DAEpochForcedInclusion == 0 { + epochSize := c.genesis.DAEpochForcedInclusion + if epochSize == 0 { // No epoch-based forced inclusion configured — catch-up is irrelevant. - c.catchingUp = false return } - // Calculate how long one DA epoch takes in wall-clock time. - epochWallDuration := time.Duration(c.genesis.DAEpochForcedInclusion) * c.cfg.DA.BlockTime.Duration + currentDAHeight := c.checkpoint.DAHeight + daStartHeight := c.genesis.DAStartHeight + + latestDAHeight, err := c.daClient.GetLatestDAHeight(ctx) + if err != nil { + c.logger.Warn().Err(err). + Msg("failed to get latest DA height for catch-up detection, skipping check") + return + } - // Use a minimum threshold to avoid false positives from minor delays. - catchUpThreshold := epochWallDuration - if catchUpThreshold < 30*time.Second { - catchUpThreshold = 30 * time.Second + if latestDAHeight <= currentDAHeight { + // DA hasn't moved beyond our position — nothing to catch up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Msg("sequencer is at or ahead of DA head, no catch-up needed") + return } - timeSinceEpoch := time.Since(event.Timestamp) - wasCatchingUp := c.catchingUp + // Calculate epoch numbers for current position and DA head. + currentEpoch := types.CalculateEpochNumber(currentDAHeight, daStartHeight, epochSize) + latestEpoch := types.CalculateEpochNumber(latestDAHeight, daStartHeight, epochSize) + missedEpochs := latestEpoch - currentEpoch - if timeSinceEpoch > catchUpThreshold { - c.catchingUp = true - if !wasCatchingUp { - c.logger.Warn(). - Dur("time_since_epoch", timeSinceEpoch). - Dur("threshold", catchUpThreshold). - Uint64("epoch_start", event.StartDaHeight). - Uint64("epoch_end", event.EndDaHeight). - Msg("entering catch-up mode: DA epoch is behind head, replaying missed epochs with forced inclusion txs only") - } - } else { - c.catchingUp = false - if wasCatchingUp { - c.logger.Info(). - Dur("time_since_epoch", timeSinceEpoch). - Uint64("epoch_start", event.StartDaHeight). - Uint64("epoch_end", event.EndDaHeight). - Msg("exiting catch-up mode: reached DA head, resuming normal sequencing") - } + if missedEpochs <= 1 { + // Within the current or next epoch — normal operation, no catch-up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Msg("sequencer within one epoch of DA head, no catch-up needed") + return } + + // The DA layer is more than one epoch ahead. Enter catch-up mode. + c.catchingUp = true + c.logger.Warn(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Uint64("missed_epochs", missedEpochs). + Msg("entering catch-up mode: DA layer is multiple epochs ahead, replaying missed epochs with forced inclusion txs only") } diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index d5780542df..e4dcfb1250 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -381,6 +381,9 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs that are 50 and 60 bytes forcedTx1 := make([]byte, 50) forcedTx2 := make([]byte, 60) @@ -469,6 +472,9 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs where combined they exceed maxBytes forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) // This would be deferred @@ -549,6 +555,9 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First call returns large forced txs largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -887,6 +896,10 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 101 — close to sequencer start (100), no catch-up needed. + // Use Maybe() since two sequencer instances share this mock. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(101), nil).Maybe() + // Create forced inclusion txs at DA height 100 // Use sizes that all fit in one batch to test checkpoint advancing forcedTx1 := make([]byte, 50) @@ -986,6 +999,9 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First DA epoch returns empty transactions mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, @@ -1239,27 +1255,17 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // DA epoch at height 100 with a timestamp far in the past (simulating sequencer downtime) + // DA head is at height 105 — sequencer starts at 100 with epoch size 1, + // so it has missed 5 epochs (>1), triggering catch-up. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // DA epoch at height 100 oldTimestamp := time.Now().Add(-10 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, Data: [][]byte{[]byte("forced-tx-1")}, }).Once() - // Next DA epoch at height 101 also in the past (still catching up) - // Use .Maybe() since this test only calls GetNextBatch once (processing epoch 100), - // so epoch 101 may not be retrieved. - oldTimestamp2 := time.Now().Add(-9 * time.Minute) - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, - Data: [][]byte{[]byte("forced-tx-2")}, - }).Maybe() - - // DA epoch at height 102 is from the future (DA head reached) - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1287,7 +1293,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { assert.False(t, seq.IsCatchingUp(), "should not be catching up initially") - // First GetNextBatch — epoch 100 is far in the past, should enter catch-up + // First GetNextBatch — DA head is far ahead, should enter catch-up req := coresequencer.GetNextBatchRequest{ Id: []byte("test-chain"), MaxBytes: 1000000, @@ -1297,7 +1303,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp.Batch) - assert.True(t, seq.IsCatchingUp(), "should be catching up after fetching old epoch") + assert.True(t, seq.IsCatchingUp(), "should be catching up after detecting epoch gap") // During catch-up, batch should contain only forced inclusion tx, no mempool tx assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") @@ -1319,21 +1325,26 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch at height 100: old timestamp (catching up) + // DA head is at 105 — sequencer starts at 100 with epoch size 1, + // so it has missed multiple epochs, triggering catch-up. + // Called once on first fetchNextDAEpoch; subsequent fetches skip the check. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: two forced txs oldTimestamp := time.Now().Add(-5 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, }).Once() - // Epoch at height 101: also old (still catching up) + // Epoch at height 101: one forced tx oldTimestamp2 := time.Now().Add(-4 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, Data: [][]byte{[]byte("forced-3")}, }).Once() - // Epoch at height 102: from the future (head) + // Epoch at height 102: from the future (head reached during replay) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, }).Maybe() @@ -1406,6 +1417,9 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch at height 100: timestamp 5 minutes ago epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -1413,11 +1427,6 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Next epoch from future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1467,22 +1476,19 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch 100: old (catch-up) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, Data: [][]byte{[]byte("forced-old")}, }).Once() - // Epoch 101: recent timestamp (current epoch at DA head) + // Epoch 101: fetched during catch-up, but returns HeightFromFuture to exit catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, - Data: [][]byte{[]byte("forced-current")}, - }).Once() - - // Epoch 102: from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1522,23 +1528,18 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) - // Second batch: recent epoch 101 — should exit catch-up + // Second batch: epoch 101 returns HeightFromFuture — should exit catch-up resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching recent epoch") + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching DA head") - // Should include both forced tx and mempool tx now - hasForcedTx := false + // Should include mempool tx now (no forced txs available) hasMempoolTx := false for _, tx := range resp2.Batch.Transactions { - if bytes.Equal(tx, []byte("forced-current")) { - hasForcedTx = true - } if bytes.Equal(tx, []byte("mempool-tx")) { hasMempoolTx = true } } - assert.True(t, hasForcedTx, "should contain forced tx from current epoch") assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") } @@ -1556,16 +1557,19 @@ func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch 100: old, triggers catch-up + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: success, fetched during catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Epoch 101: from the future — DA head reached + // Epoch 101: from the future — DA head reached, exits catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1591,7 +1595,7 @@ func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { LastBatchData: nil, } - // First call: fetches epoch 100 (old), enters catch-up + // First call: fetches epoch 100, enters catch-up via epoch gap detection resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) assert.True(t, seq.IsCatchingUp()) @@ -1619,17 +1623,16 @@ func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch at height 100: RECENT timestamp (sequencer was NOT down for long) + // DA head is at 100 — sequencer starts at 100 with epoch size 1, + // so it is within the same epoch (0 missed). No catch-up. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Once() + + // Epoch at height 100: current epoch mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Next epoch from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1663,7 +1666,7 @@ func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when epoch is recent") + assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when within one epoch of DA head") // Should have both forced and mempool txs (normal operation) assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") @@ -1686,6 +1689,11 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 106 — sequencer starts at 100 with epoch size 1, + // so it has missed 6 epochs (>1), triggering catch-up. + // Called once on first fetchNextDAEpoch. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(106), nil).Once() + // 3 old epochs (100, 101, 102) — all with timestamps far in the past for h := uint64(100); h <= 102; h++ { ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past @@ -1696,16 +1704,10 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { }).Once() } - // Epoch 103: recent (DA head) + // Epoch 103: returns HeightFromFuture — DA head reached, exits catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, - Data: [][]byte{[]byte("forced-current")}, - }).Once() - - // Epoch 104: from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(104), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1755,27 +1757,24 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { // DA height should have advanced through the 3 old epochs assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") - // Next batch: epoch 103 is recent — should exit catch-up and include mempool + // Next batch: epoch 103 returns HeightFromFuture — should exit catch-up and include mempool resp4, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at recent epoch") + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at DA head") - hasForcedCurrent := false hasMempoolTx := false for _, tx := range resp4.Batch.Transactions { - if bytes.Equal(tx, []byte("forced-current")) { - hasForcedCurrent = true - } if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) { hasMempoolTx = true } } - assert.True(t, hasForcedCurrent, "should include forced tx from current epoch") assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") } func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { // When forced inclusion is not configured, catch-up should never activate. + // GetLatestDAHeight should NOT be called because DAEpochForcedInclusion == 0 + // causes updateCatchUpState to bail out early. ctx := context.Background() db := ds.NewMapDatastore() @@ -1791,7 +1790,7 @@ func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, - DAEpochForcedInclusion: 1, + DAEpochForcedInclusion: 0, // no epoch-based forced inclusion } seq, err := NewSequencer( @@ -1841,6 +1840,9 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch 100: old mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49c..c7d17d5bbf 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,7 +7,7 @@ package mocks import ( "context" - "github.com/evstack/ev-node/pkg/da/types" + da "github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -251,6 +251,63 @@ func (_c *MockClient_GetHeaderNamespace_Call) RunAndReturn(run func() []byte) *M } // HasForcedInclusionNamespace provides a mock function for the type MockClient +func (_mock *MockClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLatestDAHeight") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetLatestDAHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestDAHeight' +type MockClient_GetLatestDAHeight_Call struct { + *mock.Call +} + +// GetLatestDAHeight is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) GetLatestDAHeight(ctx interface{}) *MockClient_GetLatestDAHeight_Call { + return &MockClient_GetLatestDAHeight_Call{Call: _e.mock.On("GetLatestDAHeight", ctx)} +} + +func (_c *MockClient_GetLatestDAHeight_Call) Run(run func(ctx context.Context)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run(arg0) + }) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) Return(height uint64, err error) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(height, err) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) RunAndReturn(run func(context.Context) (uint64, error)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(run) + return _c +} + func (_mock *MockClient) HasForcedInclusionNamespace() bool { ret := _mock.Called() diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 684d3fcee5..648021b76a 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -184,6 +184,11 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// GetLatestDAHeight returns the current DA height (the latest height available). +func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} + // Get retrieves blobs by ID (stub implementation). func (d *DummyDA) Get(_ context.Context, _ []datypes.ID, _ []byte) ([]datypes.Blob, error) { return nil, nil From 42f04058156dfac1db5fc498df837a1b1926dbe6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 10 Feb 2026 15:43:33 +0100 Subject: [PATCH 3/5] cl --- CHANGELOG.md | 6 ++++++ apps/evm/go.mod | 8 ++++---- apps/evm/go.sum | 4 ---- apps/grpc/go.mod | 8 ++++---- apps/grpc/go.sum | 4 ---- apps/testapp/go.mod | 2 +- apps/testapp/go.sum | 2 -- 7 files changed, 15 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf7ce9a519..0f6d66dabc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add disaster recovery for sequencer + - Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) + - Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061) + ## v1.0.0-rc.4 ### Changes diff --git a/apps/evm/go.mod b/apps/evm/go.mod index ed4b6c5126..4052c3afb2 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.6 -//replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -//) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.16.8 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 00e5995e9a..49e723062b 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -411,12 +411,8 @@ github.com/ethereum/go-ethereum v1.16.8 h1:LLLfkZWijhR5m6yrAXbdlTeXoqontH+Ga2f9i github.com/ethereum/go-ethereum v1.16.8/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3 h1:3o8H1TNywnst56lo2RlS2SXulDfp9yZJtkYYh7ZJrdM= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3/go.mod h1:VUEEklKoclg45GL7dzLoDwu3UQ4ptT3rF8bw5zUmnRk= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= diff --git a/apps/grpc/go.mod b/apps/grpc/go.mod index e7e57b2bfe..19611a2624 100644 --- a/apps/grpc/go.mod +++ b/apps/grpc/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/grpc go 1.25.6 -//replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/grpc => ../../execution/grpc -//) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/grpc => ../../execution/grpc +) require ( github.com/evstack/ev-node v1.0.0-rc.4 diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index cd9dc009b9..13b8f9adce 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -367,12 +367,8 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1 h1:OzrWLDDY6/9+LWx0XmUqPzxs/CHZRJICOwQ0Me/i6dY= -github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1/go.mod h1:Pr/sF6Zx8am9ZeWFcoz1jYPs0kXmf+OmL8Tz2Gyq7E4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index befa3aa536..d4e7306a95 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp go 1.25.6 -//replace github.com/evstack/ev-node => ../../ +replace github.com/evstack/ev-node => ../../ require ( github.com/evstack/ev-node v1.0.0-rc.4 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index f07cb58dc1..13b8f9adce 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -367,8 +367,6 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= From 014510be905fab25e1058a1f6c5f266e4e8cfcd6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 10 Feb 2026 17:20:50 +0100 Subject: [PATCH 4/5] align timestamping --- pkg/sequencers/single/sequencer.go | 9 +- pkg/sequencers/single/sequencer_test.go | 206 ++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 3 deletions(-) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 4874e77fc1..fdf21e4024 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -363,11 +363,14 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validMempoolTxs...) // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. - // This ensures blocks produced during catch-up have timestamps consistent with - // what base sequencing nodes would have produced. + // Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp. timestamp := time.Now() if c.catchingUp && !c.currentDAEndTime.IsZero() { - timestamp = c.currentDAEndTime + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } return &coresequencer.GetNextBatchResponse{ diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index e4dcfb1250..fc71f6bf08 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -1909,6 +1909,212 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { assert.Equal(t, uint64(102), seq.GetDAHeight()) } +func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { + // When a single DA epoch has more forced txs than fit in one block, + // catch-up must produce strictly monotonic timestamps across the + // resulting blocks. This uses the same jitter scheme as the based + // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is far ahead — triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch at height 100: 3 forced txs, each 100 bytes + epochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + tx1 := make([]byte, 100) + tx2 := make([]byte, 100) + tx3 := make([]byte, 100) + copy(tx1, "forced-tx-1") + copy(tx2, "forced-tx-2") + copy(tx3, "forced-tx-3") + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{tx1, tx2, tx3}, + }).Once() + + // Epoch at height 101: single tx (to verify cross-epoch monotonicity) + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 10, 0, time.UTC) // 10 seconds later + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-4")}, + }).Once() + + // Epoch 102: future — exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + // Custom executor: only 1 tx fits per block (gas-limited) + mockExec := mocks.NewMockExecutor(t) + mockExec.On("GetExecutionInfo", mock.Anything).Return(execution.ExecutionInfo{MaxGas: 1000000}, nil).Maybe() + mockExec.On("FilterTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) []execution.FilterStatus { + result := make([]execution.FilterStatus, len(txs)) + // Only first tx fits, rest are postponed + for i := range result { + if i == 0 { + result[i] = execution.FilterOK + } else { + result[i] = execution.FilterPostpone + } + } + return result + }, + nil, + ).Maybe() + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + mockExec, + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Produce 3 blocks from epoch 100 (1 tx each due to gas filter) + var timestamps []time.Time + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during block %d", i) + assert.Equal(t, 1, len(resp.Batch.Transactions), "block %d: exactly 1 forced tx", i) + timestamps = append(timestamps, resp.Timestamp) + } + + // All 3 timestamps must be strictly monotonically increasing + for i := 1; i < len(timestamps); i++ { + assert.True(t, timestamps[i].After(timestamps[i-1]), + "timestamp[%d] (%v) must be strictly after timestamp[%d] (%v)", + i, timestamps[i], i-1, timestamps[i-1]) + } + + // Verify exact jitter values: + // Block 0: 3 txs total, 1 consumed → 2 remaining → T - 2ms + // Block 1: 1 consumed → 1 remaining → T - 1ms + // Block 2: 1 consumed → 0 remaining → T + assert.Equal(t, epochTimestamp.Add(-2*time.Millisecond), timestamps[0], "block 0: T - 2ms") + assert.Equal(t, epochTimestamp.Add(-1*time.Millisecond), timestamps[1], "block 1: T - 1ms") + assert.Equal(t, epochTimestamp, timestamps[2], "block 2: T (exact epoch end time)") + + // Block from epoch 101 should also be monotonically after epoch 100's last block + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should still be catching up") + assert.Equal(t, 1, len(resp4.Batch.Transactions)) + assert.True(t, resp4.Timestamp.After(timestamps[2]), + "epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)", + resp4.Timestamp, timestamps[2]) + assert.Equal(t, epoch2Timestamp, resp4.Timestamp, "single-tx epoch gets exact DA end time") +} + +func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { + // Verify that an empty DA epoch (no forced txs) still advances the + // checkpoint and updates currentDAEndTime so subsequent epochs get + // correct timestamps. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch 100: empty (no forced txs) but valid timestamp + emptyEpochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: emptyEpochTimestamp}, + Data: [][]byte{}, + }).Once() + + // Epoch 101: has a forced tx with a later timestamp + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 15, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-after-empty")}, + }).Once() + + // Epoch 102: future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call processes the empty epoch 100 — empty batch, but checkpoint advances + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 0, len(resp1.Batch.Transactions), "empty epoch should produce empty batch") + assert.Equal(t, emptyEpochTimestamp, resp1.Timestamp, + "empty epoch batch should use epoch DA end time (0 remaining)") + + // Second call processes epoch 101 — should have later timestamp + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + assert.True(t, resp2.Timestamp.After(resp1.Timestamp), + "epoch 101 timestamp (%v) must be after empty epoch 100 timestamp (%v)", + resp2.Timestamp, resp1.Timestamp) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) From a30ea6885a63299e3d3b48e43841e86f54c5bd97 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 11 Feb 2026 13:28:29 +0100 Subject: [PATCH 5/5] updates --- pkg/sequencers/single/sequencer.go | 36 ++++++++++++++----------- pkg/sequencers/single/sequencer_test.go | 18 ++++++------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index fdf21e4024..27a36c71ad 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -63,7 +63,8 @@ type Sequencer struct { // It is set when we detect (via GetLatestDAHeight) that the DA layer is more // than one epoch ahead of our checkpoint, and cleared when we hit // ErrHeightFromFuture (meaning we've reached the DA head). - catchingUp bool + + catchingUp atomic.Bool // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. // Used as the block timestamp during catch-up to match based sequencing behavior. currentDAEndTime time.Time @@ -235,7 +236,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // During catch-up, the sequencer must produce blocks identical to what base // sequencing would produce (forced inclusion txs only, no mempool). var mempoolBatch *coresequencer.Batch - if !c.catchingUp { + if !c.catchingUp.Load() { var err error mempoolBatch, err = c.queue.Next(ctx) if err != nil { @@ -353,7 +354,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -365,12 +366,15 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. // Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp. timestamp := time.Now() - if c.catchingUp && !c.currentDAEndTime.IsZero() { - var remainingForcedTxs uint64 - if len(c.cachedForcedInclusionTxs) > 0 { - remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + if c.catchingUp.Load() { + daEndTime := c.currentDAEndTime + if !daEndTime.IsZero() { + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = daEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } - timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } return &coresequencer.GetNextBatchResponse{ @@ -426,7 +430,7 @@ func (c *Sequencer) GetDAHeight() uint64 { // with only forced inclusion transactions (no mempool), matching the blocks // that base sequencing nodes would have produced during sequencer downtime. func (c *Sequencer) IsCatchingUp() bool { - return c.catchingUp + return c.catchingUp.Load() } // fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. @@ -447,7 +451,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -458,18 +462,18 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Msg("DA height from future, waiting for DA to produce block") // We've reached the DA head — exit catch-up mode - if c.catchingUp { + if c.catchingUp.Load() { c.logger.Info(). Uint64("da_height", currentDAHeight). Msg("catch-up complete: reached DA head, resuming normal sequencing") - c.catchingUp = false + c.catchingUp.Store(false) } return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} - c.catchingUp = false + c.catchingUp.Store(false) return 0, nil } @@ -502,7 +506,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -528,7 +532,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint // overhead. func (c *Sequencer) updateCatchUpState(ctx context.Context) { // Already catching up — nothing to do. We'll exit via ErrHeightFromFuture. - if c.catchingUp { + if c.catchingUp.Load() { return } @@ -574,7 +578,7 @@ func (c *Sequencer) updateCatchUpState(ctx context.Context) { } // The DA layer is more than one epoch ahead. Enter catch-up mode. - c.catchingUp = true + c.catchingUp.Store(true) c.logger.Warn(). Uint64("checkpoint_da_height", currentDAHeight). Uint64("latest_da_height", latestDAHeight). diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index fc71f6bf08..f3ac6a01c0 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -365,7 +365,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) // Create in-memory datastore db := ds.NewMapDatastore() @@ -458,7 +458,7 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -541,7 +541,7 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -882,7 +882,7 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1242,7 +1242,7 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1312,7 +1312,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1463,7 +1463,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1676,7 +1676,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { // Simulates a sequencer that missed 3 DA epochs and must replay them all // before resuming normal operation. ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1915,7 +1915,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { // resulting blocks. This uses the same jitter scheme as the based // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close()