diff --git a/CHANGELOG.md b/CHANGELOG.md index e09cf5fa00..1ff76dd000 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Skip draining when exec client unavailable. [#3060](https://github.com/evstack/ev-node/pull/3060) + ## v1.0.0-rc.3 ### Added diff --git a/block/components_test.go b/block/components_test.go index f1fbac743b..93c08a655a 100644 --- a/block/components_test.go +++ b/block/components_test.go @@ -5,6 +5,7 @@ import ( crand "crypto/rand" "errors" "testing" + "testing/synctest" "time" "github.com/ipfs/go-datastore" @@ -189,101 +190,103 @@ func TestNewAggregatorComponents_Creation(t *testing.T) { func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) { // This test verifies that when the executor's execution client calls fail, // the error is properly propagated through the error channel and stops the node - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - memStore := store.New(ds) - - cfg := config.DefaultConfig() - cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing - - // Create test signer - priv, _, err := crypto.GenerateEd25519Key(crand.Reader) - require.NoError(t, err) - testSigner, err := noop.NewNoopSigner(priv) - require.NoError(t, err) - addr, err := testSigner.GetAddress() - require.NoError(t, err) - - gen := genesis.Genesis{ - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution - ProposerAddress: addr, - } - - // Create mock executor that will fail on ExecuteTxs - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - daClient := testmocks.NewMockClient(t) - daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe() - daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe() - daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() - daClient.On("HasForcedInclusionNamespace").Return(false).Maybe() - - // Mock InitChain to succeed initially - mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return([]byte("state-root"), nil).Once() - - // Mock SetDAHeight to be called during initialization - mockSeq.On("SetDAHeight", uint64(0)).Return().Once() - - // Mock GetNextBatch to return empty batch - mockSeq.On("GetNextBatch", mock.Anything, mock.Anything). - Return(&coresequencer.GetNextBatchResponse{ - Batch: &coresequencer.Batch{Transactions: nil}, - Timestamp: time.Now(), - }, nil).Maybe() - - // Mock GetTxs for reaper (return empty to avoid interfering with test) - mockExec.On("GetTxs", mock.Anything). - Return([][]byte{}, nil).Maybe() - - // Mock ExecuteTxs to fail with a critical error - criticalError := errors.New("execution client RPC connection failed") - mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
- Return(nil, criticalError).Maybe() - - // Create aggregator node - components, err := NewAggregatorComponents( - cfg, - gen, - memStore, - mockExec, - mockSeq, - daClient, - testSigner, - nil, // header broadcaster - nil, // data broadcaster - zerolog.Nop(), - NopMetrics(), - DefaultBlockOptions(), - nil, - ) - require.NoError(t, err) - - // Start should return with error when execution client fails - // Timeout accounts for retry delays: 3 retries × 10s timeout = ~30s plus buffer - ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second) - defer cancel() - - // Run Start in a goroutine to handle the blocking call - startErrCh := make(chan error, 1) - go func() { - startErrCh <- components.Start(ctx) - }() - - // Wait for either the error or timeout - select { - case err = <-startErrCh: - // We expect an error containing the critical execution client failure - require.Error(t, err) - assert.Contains(t, err.Error(), "critical execution client failure") - assert.Contains(t, err.Error(), "execution client RPC connection failed") - case <-ctx.Done(): - t.Fatal("timeout waiting for critical error to propagate") - } - - // Clean up - stopErr := components.Stop() - assert.NoError(t, stopErr) + synctest.Test(t, func(t *testing.T) { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + memStore := store.New(ds) + + cfg := config.DefaultConfig() + cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing + + // Create test signer + priv, _, err := crypto.GenerateEd25519Key(crand.Reader) + require.NoError(t, err) + testSigner, err := noop.NewNoopSigner(priv) + require.NoError(t, err) + addr, err := testSigner.GetAddress() + require.NoError(t, err) + + gen := genesis.Genesis{ + ChainID: "test-chain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution + ProposerAddress: addr, + } + + // Create mock executor that will fail on ExecuteTxs + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) + daClient := testmocks.NewMockClient(t) + daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe() + daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe() + daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() + daClient.On("HasForcedInclusionNamespace").Return(false).Maybe() + + // Mock InitChain to succeed initially + mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]byte("state-root"), nil).Once() + + // Mock SetDAHeight to be called during initialization + mockSeq.On("SetDAHeight", uint64(0)).Return().Once() + + // Mock GetNextBatch to return empty batch + mockSeq.On("GetNextBatch", mock.Anything, mock.Anything). + Return(&coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{Transactions: nil}, + Timestamp: time.Now(), + }, nil).Maybe() + + // Mock GetTxs for reaper (return empty to avoid interfering with test) + mockExec.On("GetTxs", mock.Anything). + Return([][]byte{}, nil).Maybe() + + // Mock ExecuteTxs to fail with a critical error + criticalError := errors.New("execution client RPC connection failed") + mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
+ Return(nil, criticalError).Maybe() + + // Create aggregator node + components, err := NewAggregatorComponents( + cfg, + gen, + memStore, + mockExec, + mockSeq, + daClient, + testSigner, + nil, // header broadcaster + nil, // data broadcaster + zerolog.Nop(), + NopMetrics(), + DefaultBlockOptions(), + nil, + ) + require.NoError(t, err) + + // Start should return with error when execution client fails. + // With synctest the fake clock advances the retry delays instantly. + ctx, cancel := context.WithTimeout(t.Context(), 35*time.Second) + defer cancel() + + // Run Start in a goroutine to handle the blocking call + startErrCh := make(chan error, 1) + go func() { + startErrCh <- components.Start(ctx) + }() + + // Wait for either the error or timeout + synctest.Wait() + select { + case err = <-startErrCh: + // We expect an error containing the critical execution client failure + require.Error(t, err) + assert.Contains(t, err.Error(), "critical execution client failure") + assert.Contains(t, err.Error(), "execution client RPC connection failed") + case <-ctx.Done(): + t.Fatal("timeout waiting for critical error to propagate") + } + + // Clean up + stopErr := components.Stop() + assert.NoError(t, stopErr) + }) } diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 0389bbf643..31a237315f 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -5,6 +5,7 @@ import ( crand "crypto/rand" "errors" "testing" + "testing/synctest" "time" "github.com/ipfs/go-datastore" @@ -281,49 +282,50 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() + synctest.Test(t, func(t *testing.T) { + ctx := context.Background() + execCtx := ctx + + // For context cancellation test, create a cancellable context + if tt.name == "context cancelled during retry" { + var cancel context.CancelFunc + execCtx, cancel = context.WithCancel(ctx) + // Cancel context after first failure to simulate cancellation during retry + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + } + + mockExec := testmocks.NewMockExecutor(t) + tt.setupMock(mockExec) - ctx := context.Background() - execCtx := ctx - - // For context cancellation test, create a cancellable context - if tt.name == "context cancelled during retry" { - var cancel context.CancelFunc - execCtx, cancel = context.WithCancel(ctx) - // Cancel context after first failure to simulate cancellation during retry - go func() { - time.Sleep(100 * time.Millisecond) - cancel() - }() - } - - mockExec := testmocks.NewMockExecutor(t) - tt.setupMock(mockExec) - - e := &Executor{ - exec: mockExec, - ctx: execCtx, - logger: zerolog.Nop(), - } - - rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")} - header := types.Header{ - BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())}, - } - currentState := types.State{AppHash: []byte("current-hash")} - - result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState) - - if tt.expectSuccess { - require.NoError(t, err) - assert.Equal(t, tt.expectHash, result) - } else { - require.Error(t, err) - if tt.expectError != "" { - assert.Contains(t, err.Error(), tt.expectError) + e := &Executor{ + exec: mockExec, + ctx: execCtx, + logger: zerolog.Nop(), + } + + rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")} + header := types.Header{ + BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())}, 
+ } + currentState := types.State{AppHash: []byte("current-hash")} + + result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState) + + if tt.expectSuccess { + require.NoError(t, err) + assert.Equal(t, tt.expectHash, result) + } else { + require.Error(t, err) + if tt.expectError != "" { + assert.Contains(t, err.Error(), tt.expectError) + } } - } - mockExec.AssertExpectations(t) + mockExec.AssertExpectations(t) + }) }) } } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 6420a16f71..193f543d8a 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -7,6 +7,7 @@ import ( "fmt" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/ipfs/go-datastore" @@ -77,29 +78,30 @@ func TestSubmitter_setFinalWithRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() + synctest.Test(t, func(t *testing.T) { + ctx := context.Background() + exec := testmocks.NewMockExecutor(t) + tt.setupMock(exec) + + s := &Submitter{ + exec: exec, + ctx: ctx, + logger: zerolog.Nop(), + } - ctx := context.Background() - exec := testmocks.NewMockExecutor(t) - tt.setupMock(exec) - - s := &Submitter{ - exec: exec, - ctx: ctx, - logger: zerolog.Nop(), - } - - err := s.setFinalWithRetry(100) + err := s.setFinalWithRetry(100) - if tt.expectSuccess { - require.NoError(t, err) - } else { - require.Error(t, err) - if tt.expectError != "" { - assert.Contains(t, err.Error(), tt.expectError) + if tt.expectSuccess { + require.NoError(t, err) + } else { + require.Error(t, err) + if tt.expectError != "" { + assert.Contains(t, err.Error(), tt.expectError) + } } - } - exec.AssertExpectations(t) + exec.AssertExpectations(t) + }) }) } } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 6df6b2c22e..939abb0389 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -115,9 +115,10 @@ type Syncer struct { gracePeriodConfig forcedInclusionGracePeriodConfig // Lifecycle - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + hasCriticalError atomic.Bool // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState @@ -254,28 +255,32 @@ func (s *Syncer) Stop() error { s.cancelP2PWait(0) s.wg.Wait() - drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer drainCancel() + // Skip draining if we're shutting down due to a critical error (e.g. execution + // client unavailable). 
+ if !s.hasCriticalError.Load() { + drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer drainCancel() - drained := 0 -drainLoop: - for { - select { - case event, ok := <-s.heightInCh: - if !ok { + drained := 0 + drainLoop: + for { + select { + case event, ok := <-s.heightInCh: + if !ok { + break drainLoop + } + s.processHeightEvent(drainCtx, &event) + drained++ + case <-drainCtx.Done(): + s.logger.Warn().Int("remaining", len(s.heightInCh)).Msg("timeout draining height events during shutdown") + break drainLoop + default: break drainLoop } - s.processHeightEvent(drainCtx, &event) - drained++ - case <-drainCtx.Done(): - s.logger.Warn().Int("remaining", len(s.heightInCh)).Msg("timeout draining height events during shutdown") - break drainLoop - default: - break drainLoop } - } - if drained > 0 { - s.logger.Info().Int("count", drained).Msg("drained pending height events during shutdown") + if drained > 0 { + s.logger.Info().Int("count", drained).Msg("drained pending height events during shutdown") + } } s.logger.Info().Msg("syncer stopped") @@ -680,7 +685,7 @@ func (s *Syncer) processHeightEvent(ctx context.Context, event *common.DAHeightE Msg("failed to sync next block") // If the error is not due to a validation error, re-store the event as pending switch { - case errors.Is(err, errInvalidBlock): + case errors.Is(err, errInvalidBlock) || s.hasCriticalError.Load(): // do not reschedule case errors.Is(err, errMaliciousProposer): s.sendCriticalError(fmt.Errorf("sequencer malicious. Restart the node with --node.aggregator --node.based_sequencer or keep the chain halted: %w", err)) @@ -1132,6 +1137,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type // sendCriticalError sends a critical error to the error channel without blocking func (s *Syncer) sendCriticalError(err error) { + s.hasCriticalError.Store(true) if s.errorCh != nil { select { case s.errorCh <- err: diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 7bc6b4f5c6..d6c6689f29 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "testing/synctest" "time" "github.com/ipfs/go-datastore" @@ -61,89 +62,91 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Setup syncer - syncer := setupTestSyncer(t, tc.daBlockTime) - syncer.ctx = ctx - - // Setup mocks - daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) - daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() - - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - - var callTimes []time.Time - callCount := 0 - - // First call - returns test error - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). 
- Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - callCount++ - }). - Return(nil, tc.error).Once() - - if tc.expectsBackoff { - // Second call should be delayed due to backoff + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + // Setup syncer + syncer := setupTestSyncer(t, tc.daBlockTime) + syncer.ctx = ctx + + // Setup mocks + daRetriever := NewMockDARetriever(t) + p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + syncer.daRetriever = daRetriever + syncer.p2pHandler = p2pHandler + p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() + + // Mock PopPriorityHeight to always return 0 (no priority heights) + daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() + + // Create mock stores for P2P + mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + + mockDataStore := extmocks.NewMockStore[*types.Data](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + + var callTimes []time.Time + callCount := 0 + + // First call - returns test error daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) callCount++ - // Cancel to end test - cancel() }). - Return(nil, datypes.ErrBlobNotFound).Once() - } else { - // For ErrBlobNotFound, DA height should increment - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - callCount++ - cancel() - }). - Return(nil, datypes.ErrBlobNotFound).Once() - } - - // Run sync loop - syncer.startSyncWorkers() - <-ctx.Done() - syncer.wg.Wait() - - // Verify behavior - if tc.expectsBackoff { - require.Len(t, callTimes, 2, "should make exactly 2 calls with backoff") - - timeBetweenCalls := callTimes[1].Sub(callTimes[0]) - expectedDelay := tc.daBlockTime - if expectedDelay == 0 { - expectedDelay = 2 * time.Second + Return(nil, tc.error).Once() + + if tc.expectsBackoff { + // Second call should be delayed due to backoff + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + callCount++ + // Cancel to end test + cancel() + }). + Return(nil, datypes.ErrBlobNotFound).Once() + } else { + // For ErrBlobNotFound, DA height should increment + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + callCount++ + cancel() + }). 
+ Return(nil, datypes.ErrBlobNotFound).Once() } - assert.GreaterOrEqual(t, timeBetweenCalls, expectedDelay-50*time.Millisecond, - "second call should be delayed by backoff duration (expected ~%v, got %v)", - expectedDelay, timeBetweenCalls) - } else { - assert.GreaterOrEqual(t, callCount, 2, "should continue without significant delay") - if len(callTimes) >= 2 { + // Run sync loop + syncer.startSyncWorkers() + <-ctx.Done() + syncer.wg.Wait() + + // Verify behavior + if tc.expectsBackoff { + require.Len(t, callTimes, 2, "should make exactly 2 calls with backoff") + timeBetweenCalls := callTimes[1].Sub(callTimes[0]) - assert.Less(t, timeBetweenCalls, 120*time.Millisecond, - "should not have backoff delay for ErrBlobNotFound") + expectedDelay := tc.daBlockTime + if expectedDelay == 0 { + expectedDelay = 2 * time.Second + } + + assert.GreaterOrEqual(t, timeBetweenCalls, expectedDelay, + "second call should be delayed by backoff duration (expected ~%v, got %v)", + expectedDelay, timeBetweenCalls) + } else { + assert.GreaterOrEqual(t, callCount, 2, "should continue without significant delay") + if len(callTimes) >= 2 { + timeBetweenCalls := callTimes[1].Sub(callTimes[0]) + assert.Less(t, timeBetweenCalls, 120*time.Millisecond, + "should not have backoff delay for ErrBlobNotFound") + } } - } + }) }) } } @@ -151,158 +154,162 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { // TestSyncer_BackoffResetOnSuccess verifies that backoff is properly reset // after a successful DA retrieval, allowing the syncer to continue at normal speed. func TestSyncer_BackoffResetOnSuccess(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - syncer := setupTestSyncer(t, 1*time.Second) - syncer.ctx = ctx - - addr, pub, signer := buildSyncTestSigner(t) - gen := syncer.genesis - - daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) - daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() - - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - - var callTimes []time.Time - - // First call - error (should trigger backoff) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, errors.New("temporary failure")).Once() - - // Second call - success (should reset backoff and increment DA height) - _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) - data := &types.Data{ - Metadata: &types.Metadata{ - ChainID: gen.ChainID, - Height: 1, - Time: uint64(time.Now().UnixNano()), - }, - } - event := common.DAHeightEvent{ - Header: header, - Data: data, - DaHeight: 100, - } - - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). 
- Return([]common.DAHeightEvent{event}, nil).Once() - - // Third call - should happen immediately after success (DA height incremented to 101) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - cancel() - }). - Return(nil, datypes.ErrBlobNotFound).Once() - - // Start process loop to handle events - go syncer.processLoop() - - // Run workers - syncer.startSyncWorkers() - <-ctx.Done() - syncer.wg.Wait() - - require.Len(t, callTimes, 3, "should make exactly 3 calls") - - // Verify backoff between first and second call - delay1to2 := callTimes[1].Sub(callTimes[0]) - assert.GreaterOrEqual(t, delay1to2, 950*time.Millisecond, - "should have backed off between error and success (got %v)", delay1to2) - - // Verify no backoff between second and third call (backoff reset) - delay2to3 := callTimes[2].Sub(callTimes[1]) - assert.Less(t, delay2to3, 100*time.Millisecond, - "should continue immediately after success (got %v)", delay2to3) + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + syncer := setupTestSyncer(t, 1*time.Second) + syncer.ctx = ctx + + addr, pub, signer := buildSyncTestSigner(t) + gen := syncer.genesis + + daRetriever := NewMockDARetriever(t) + p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + syncer.daRetriever = daRetriever + syncer.p2pHandler = p2pHandler + p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() + + // Mock PopPriorityHeight to always return 0 (no priority heights) + daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() + + // Create mock stores for P2P + mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + + mockDataStore := extmocks.NewMockStore[*types.Data](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + + var callTimes []time.Time + + // First call - error (should trigger backoff) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }). + Return(nil, errors.New("temporary failure")).Once() + + // Second call - success (should reset backoff and increment DA height) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) + data := &types.Data{ + Metadata: &types.Metadata{ + ChainID: gen.ChainID, + Height: 1, + Time: uint64(time.Now().UnixNano()), + }, + } + event := common.DAHeightEvent{ + Header: header, + Data: data, + DaHeight: 100, + } + + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }). + Return([]common.DAHeightEvent{event}, nil).Once() + + // Third call - should happen immediately after success (DA height incremented to 101) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + cancel() + }). 
+ Return(nil, datypes.ErrBlobNotFound).Once() + + // Start process loop to handle events + go syncer.processLoop() + + // Run workers + syncer.startSyncWorkers() + <-ctx.Done() + syncer.wg.Wait() + + require.Len(t, callTimes, 3, "should make exactly 3 calls") + + // Verify backoff between first and second call + delay1to2 := callTimes[1].Sub(callTimes[0]) + assert.GreaterOrEqual(t, delay1to2, 1*time.Second, + "should have backed off between error and success (got %v)", delay1to2) + + // Verify no backoff between second and third call (backoff reset) + delay2to3 := callTimes[2].Sub(callTimes[1]) + assert.Less(t, delay2to3, 100*time.Millisecond, + "should continue immediately after success (got %v)", delay2to3) + }) } // TestSyncer_BackoffBehaviorIntegration tests the complete backoff flow: // error -> backoff delay -> recovery -> normal operation. func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { // Test simpler backoff behavior: error -> backoff -> success -> continue - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - syncer := setupTestSyncer(t, 500*time.Millisecond) - syncer.ctx = ctx - - daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - - // Mock PopPriorityHeight to always return 0 (no priority heights) - daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() - - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - - var callTimes []time.Time - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // First call - error (triggers backoff) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, errors.New("network error")).Once() - - // Second call - should be delayed due to backoff - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, datypes.ErrBlobNotFound).Once() - - // Third call - should continue without delay (DA height incremented) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - cancel() - }). 
- Return(nil, datypes.ErrBlobNotFound).Once() - - go syncer.processLoop() - syncer.startSyncWorkers() - <-ctx.Done() - syncer.wg.Wait() - - require.Len(t, callTimes, 3, "should make exactly 3 calls") - - // First to second call should be delayed (backoff) - delay1to2 := callTimes[1].Sub(callTimes[0]) - assert.GreaterOrEqual(t, delay1to2, 450*time.Millisecond, - "should have backoff delay between first and second call (got %v)", delay1to2) - - // Second to third call should be immediate (no backoff after ErrBlobNotFound) - delay2to3 := callTimes[2].Sub(callTimes[1]) - assert.Less(t, delay2to3, 100*time.Millisecond, - "should continue immediately after ErrBlobNotFound (got %v)", delay2to3) + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + syncer := setupTestSyncer(t, 500*time.Millisecond) + syncer.ctx = ctx + + daRetriever := NewMockDARetriever(t) + p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + syncer.daRetriever = daRetriever + syncer.p2pHandler = p2pHandler + + // Mock PopPriorityHeight to always return 0 (no priority heights) + daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() + + // Create mock stores for P2P + mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + + mockDataStore := extmocks.NewMockStore[*types.Data](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + + var callTimes []time.Time + p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() + + // First call - error (triggers backoff) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }). + Return(nil, errors.New("network error")).Once() + + // Second call - should be delayed due to backoff + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }). + Return(nil, datypes.ErrBlobNotFound).Once() + + // Third call - should continue without delay (DA height incremented) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). + Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + cancel() + }). 
+ Return(nil, datypes.ErrBlobNotFound).Once() + + go syncer.processLoop() + syncer.startSyncWorkers() + <-ctx.Done() + syncer.wg.Wait() + + require.Len(t, callTimes, 3, "should make exactly 3 calls") + + // First to second call should be delayed (backoff) + delay1to2 := callTimes[1].Sub(callTimes[0]) + assert.GreaterOrEqual(t, delay1to2, 500*time.Millisecond, + "should have backoff delay between first and second call (got %v)", delay1to2) + + // Second to third call should be immediate (no backoff after ErrBlobNotFound) + delay2to3 := callTimes[2].Sub(callTimes[1]) + assert.Less(t, delay2to3, 100*time.Millisecond, + "should continue immediately after ErrBlobNotFound (got %v)", delay2to3) + }) } func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 85aa9a5978..c92f34e3c4 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -7,6 +7,7 @@ import ( "errors" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/evstack/ev-node/core/execution" @@ -539,36 +540,37 @@ func TestSyncer_executeTxsWithRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + exec := testmocks.NewMockExecutor(t) + tt.setupMock(exec) + + s := &Syncer{ + exec: exec, + ctx: ctx, + logger: zerolog.Nop(), + } - ctx := t.Context() - exec := testmocks.NewMockExecutor(t) - tt.setupMock(exec) - - s := &Syncer{ - exec: exec, - ctx: ctx, - logger: zerolog.Nop(), - } - - rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")} - header := types.Header{ - BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())}, - } - currentState := types.State{AppHash: []byte("current-hash")} - - result, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState) - - if tt.expectSuccess { - require.NoError(t, err) - assert.Equal(t, tt.expectHash, result) - } else { - require.Error(t, err) - if tt.expectError != "" { - assert.Contains(t, err.Error(), tt.expectError) + rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")} + header := types.Header{ + BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())}, + } + currentState := types.State{AppHash: []byte("current-hash")} + + result, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState) + + if tt.expectSuccess { + require.NoError(t, err) + assert.Equal(t, tt.expectHash, result) + } else { + require.Error(t, err) + if tt.expectError != "" { + assert.Contains(t, err.Error(), tt.expectError) + } } - } - exec.AssertExpectations(t) + exec.AssertExpectations(t) + }) }) } } @@ -796,3 +798,205 @@ func TestProcessHeightEvent_SkipsDAHintWhenAlreadyFetched(t *testing.T) { priorityHeight = s.daRetriever.PopPriorityHeight() assert.Equal(t, uint64(200), priorityHeight, "should queue DA hint that is above current daRetrieverHeight") } + +// TestProcessHeightEvent_ExecutionFailure_DoesNotReschedule verifies that when +// ExecuteTxs fails after all retries (execution client unavailable), the event +// is NOT re-queued as pending. 
+func TestProcessHeightEvent_ExecutionFailure_DoesNotReschedule(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + + cfg := config.DefaultConfig() + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + + // ExecuteTxs fails on all attempts — simulates unavailable execution client + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything). + Return([]byte(nil), errors.New("connection refused")).Times(common.MaxRetriesBeforeHalt) + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + cfg, + gen, + extmocks.NewMockStore[*types.P2PSignedHeader](t), + extmocks.NewMockStore[*types.P2PData](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + nil, + ) + + require.NoError(t, s.initializeState()) + s.ctx = t.Context() + + lastState := s.getLastState() + data := makeData(gen.ChainID, 1, 0) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) + + evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} + s.processHeightEvent(t.Context(), &evt) + + // A critical error must have been sent + select { + case critErr := <-errChan: + assert.ErrorContains(t, critErr, "failed to execute transactions") + default: + t.Fatal("expected a critical error on errorCh, got none") + } + + // The hasCriticalError flag must be set + assert.True(t, s.hasCriticalError.Load(), "hasCriticalError should be true after execution failure") + + // The event must NOT have been re-queued as pending + pending := cm.GetNextPendingEvent(1) + assert.Nil(t, pending, "event should not be re-queued as pending after execution client failure") + }) +} + +// TestSyncer_Stop_SkipsDrainOnCriticalError verifies that Syncer.Stop skips the +// drain loop when hasCriticalError is set. +func TestSyncer_Stop_SkipsDrainOnCriticalError(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + + cfg := config.DefaultConfig() + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + + // The executor should NOT be called during Stop's drain. 
+ mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + cfg, + gen, + extmocks.NewMockStore[*types.P2PSignedHeader](t), + extmocks.NewMockStore[*types.P2PData](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + nil, + ) + + require.NoError(t, s.initializeState()) + + ctx, cancel := context.WithCancel(t.Context()) + s.ctx = ctx + s.cancel = cancel + + // Enqueue events into heightInCh that would trigger ExecuteTxs if drained + lastState := s.getLastState() + data := makeData(gen.ChainID, 1, 0) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) + evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} + s.heightInCh <- evt + + // Simulate that a critical error was already reported (execution client died) + s.hasCriticalError.Store(true) + + // Start a no-op goroutine tracked by the WaitGroup so Stop() doesn't block on wg.Wait() + s.wg.Add(1) + go func() { defer s.wg.Done() }() + + // Stop must complete quickly — no drain, no ExecuteTxs calls + done := make(chan struct{}) + go func() { + _ = s.Stop() + close(done) + }() + + select { + case <-done: + // Stop returned promptly — drain was correctly skipped + case <-time.After(2 * time.Second): + t.Fatal("Syncer.Stop() hung — drain was not skipped despite critical error") + } + + // ExecuteTxs should never have been called (only InitChain was expected) + mockExec.AssertExpectations(t) +} + +// TestSyncer_Stop_DrainWorksWithoutCriticalError is a sanity check confirming +// that the drain still runs (including ExecuteTxs) when there is no critical error. +func TestSyncer_Stop_DrainWorksWithoutCriticalError(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + + cfg := config.DefaultConfig() + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + + mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + // The drain must call ExecuteTxs for a height-1 event that has not been processed yet. + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything). 
+ Return([]byte("app1"), nil).Once() + + errChan := make(chan error, 1) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + cfg, + gen, + extmocks.NewMockStore[*types.P2PSignedHeader](t), + extmocks.NewMockStore[*types.P2PData](t), + zerolog.Nop(), + common.DefaultBlockOptions(), + errChan, + nil, + ) + + require.NoError(t, s.initializeState()) + + ctx, cancel := context.WithCancel(t.Context()) + s.ctx = ctx + s.cancel = cancel + + // Build a valid height-1 event that will actually reach ExecuteTxs during drain + lastState := s.getLastState() + data := makeData(gen.ChainID, 1, 0) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) + evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} + s.heightInCh <- evt + + // hasCriticalError is false (default) — drain should process events including ExecuteTxs + s.wg.Add(1) + go func() { defer s.wg.Done() }() + + _ = s.Stop() + + // Verify ExecuteTxs was actually called during drain + mockExec.AssertExpectations(t) + }) +} diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 777bc0aabc..aa6a01b7ae 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -216,20 +216,22 @@ func StartNode( logger.Info().Msg("shutting down node...") cancel() case err := <-errCh: - logger.Error().Err(err).Msg("node error") + if err != nil && !errors.Is(err, context.Canceled) { + logger.Error().Err(err).Msg("node error") + } cancel() return err } - // Wait for node to finish shutting down + // Wait for node to finish shutting down after signal select { - case <-time.After(5 * time.Second): - logger.Info().Msg("Node shutdown timed out") case err := <-errCh: if err != nil && !errors.Is(err, context.Canceled) { logger.Error().Err(err).Msg("Error during shutdown") return err } + case <-time.After(10 * time.Second): + return fmt.Errorf("shutdown timeout exceeded") } return nil
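
Reviewer note on the test pattern: every converted test in this PR wraps its body in `synctest.Test` (stable in `testing/synctest` as of Go 1.25), so `time.Sleep`, timers, and context deadlines run against the bubble's fake clock and the retry/backoff paths finish instantly and deterministically. The sketch below is a minimal, self-contained illustration of that pattern; `retryThrice` and its 10s backoff are hypothetical stand-ins, not code from this repository.

```go
package example

import (
	"errors"
	"testing"
	"testing/synctest"
	"time"
)

// retryThrice is a hypothetical helper: call fn up to three times,
// sleeping 10s between failed attempts (similar in shape to the
// executeTxsWithRetry/backoff loops exercised by this PR's tests).
func retryThrice(fn func() error) error {
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < 2 {
			time.Sleep(10 * time.Second) // backoff between attempts
		}
	}
	return err
}

func TestRetryUsesFakeClock(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		start := time.Now() // fake time inside the bubble
		calls := 0
		err := retryThrice(func() error {
			calls++
			return errors.New("execution client unavailable")
		})
		if err == nil || calls != 3 {
			t.Fatalf("expected 3 failing attempts, got calls=%d err=%v", calls, err)
		}
		// The two 10s backoffs are simulated: the test finishes in real
		// microseconds, yet the bubble reports the full 20s as elapsed.
		if elapsed := time.Since(start); elapsed != 20*time.Second {
			t.Fatalf("expected exactly 20s of fake time, got %v", elapsed)
		}
	})
}
```

`synctest.Wait()` (used in components_test.go before the select on `startErrCh`) additionally blocks until every goroutine in the bubble is durably blocked, which is why the assertions can run without any real-time waiting.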
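
Reviewer note on the functional change: the new `hasCriticalError` atomic flag on the Syncer ties the pieces together — `sendCriticalError` sets it, `processHeightEvent` stops rescheduling events once it is set, and `Stop` skips the 5-second drain of `heightInCh` so shutdown does not keep calling into an execution client that is already known to be unreachable. Below is a reduced sketch of that shutdown shape; the `drainer` type and its names are illustrative only, not the actual Syncer API.

```go
package example

import (
	"context"
	"sync/atomic"
	"time"
)

// drainer is an illustrative reduction of the Syncer shutdown path.
type drainer struct {
	events           chan int
	hasCriticalError atomic.Bool
	process          func(context.Context, int)
}

// reportCritical mirrors sendCriticalError: record that further calls to the
// execution client are pointless before the error is forwarded.
func (d *drainer) reportCritical() { d.hasCriticalError.Store(true) }

// stop drains pending events only on a healthy shutdown.
func (d *drainer) stop() {
	if d.hasCriticalError.Load() {
		return // exec client unavailable: draining would only fail again
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for {
		select {
		case ev, ok := <-d.events:
			if !ok {
				return // channel closed
			}
			d.process(ctx, ev)
		case <-ctx.Done():
			return // drain timed out
		default:
			return // nothing left to drain
		}
	}
}
```

The new syncer_test.go cases cover both branches of this decision: `TestSyncer_Stop_SkipsDrainOnCriticalError` checks that Stop returns promptly with the flag set and never calls ExecuteTxs, while `TestSyncer_Stop_DrainWorksWithoutCriticalError` confirms the drain still processes a pending event when no critical error was recorded.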