Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changes

- Skip draining when exec client unavailable. [#3060](https://github.com/evstack/ev-node/pull/3060)

## v1.0.0-rc.3

### Added
Expand Down
197 changes: 100 additions & 97 deletions block/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"errors"
"testing"
"testing/synctest"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -189,101 +190,103 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
// This test verifies that when the executor's execution client calls fail,
// the error is properly propagated through the error channel and stops the node

ds := sync.MutexWrap(datastore.NewMapDatastore())
memStore := store.New(ds)

cfg := config.DefaultConfig()
cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing

// Create test signer
priv, _, err := crypto.GenerateEd25519Key(crand.Reader)
require.NoError(t, err)
testSigner, err := noop.NewNoopSigner(priv)
require.NoError(t, err)
addr, err := testSigner.GetAddress()
require.NoError(t, err)

gen := genesis.Genesis{
ChainID: "test-chain",
InitialHeight: 1,
StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution
ProposerAddress: addr,
}

// Create mock executor that will fail on ExecuteTxs
mockExec := testmocks.NewMockExecutor(t)
mockSeq := testmocks.NewMockSequencer(t)
daClient := testmocks.NewMockClient(t)
daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe()
daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe()
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()

// Mock InitChain to succeed initially
mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]byte("state-root"), nil).Once()

// Mock SetDAHeight to be called during initialization
mockSeq.On("SetDAHeight", uint64(0)).Return().Once()

// Mock GetNextBatch to return empty batch
mockSeq.On("GetNextBatch", mock.Anything, mock.Anything).
Return(&coresequencer.GetNextBatchResponse{
Batch: &coresequencer.Batch{Transactions: nil},
Timestamp: time.Now(),
}, nil).Maybe()

// Mock GetTxs for reaper (return empty to avoid interfering with test)
mockExec.On("GetTxs", mock.Anything).
Return([][]byte{}, nil).Maybe()

// Mock ExecuteTxs to fail with a critical error
criticalError := errors.New("execution client RPC connection failed")
mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, criticalError).Maybe()

// Create aggregator node
components, err := NewAggregatorComponents(
cfg,
gen,
memStore,
mockExec,
mockSeq,
daClient,
testSigner,
nil, // header broadcaster
nil, // data broadcaster
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
nil,
)
require.NoError(t, err)

// Start should return with error when execution client fails
// Timeout accounts for retry delays: 3 retries × 10s timeout = ~30s plus buffer
ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second)
defer cancel()

// Run Start in a goroutine to handle the blocking call
startErrCh := make(chan error, 1)
go func() {
startErrCh <- components.Start(ctx)
}()

// Wait for either the error or timeout
select {
case err = <-startErrCh:
// We expect an error containing the critical execution client failure
require.Error(t, err)
assert.Contains(t, err.Error(), "critical execution client failure")
assert.Contains(t, err.Error(), "execution client RPC connection failed")
case <-ctx.Done():
t.Fatal("timeout waiting for critical error to propagate")
}

// Clean up
stopErr := components.Stop()
assert.NoError(t, stopErr)
synctest.Test(t, func(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
memStore := store.New(ds)

cfg := config.DefaultConfig()
cfg.Node.BlockTime.Duration = 50 * time.Millisecond // Fast for testing

// Create test signer
priv, _, err := crypto.GenerateEd25519Key(crand.Reader)
require.NoError(t, err)
testSigner, err := noop.NewNoopSigner(priv)
require.NoError(t, err)
addr, err := testSigner.GetAddress()
require.NoError(t, err)

gen := genesis.Genesis{
ChainID: "test-chain",
InitialHeight: 1,
StartTime: time.Now().Add(-time.Second), // Start in past to trigger immediate execution
ProposerAddress: addr,
}

// Create mock executor that will fail on ExecuteTxs
mockExec := testmocks.NewMockExecutor(t)
mockSeq := testmocks.NewMockSequencer(t)
daClient := testmocks.NewMockClient(t)
daClient.On("GetHeaderNamespace").Return(datypes.NamespaceFromString("ns").Bytes()).Maybe()
daClient.On("GetDataNamespace").Return(datypes.NamespaceFromString("data-ns").Bytes()).Maybe()
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()

// Mock InitChain to succeed initially
mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]byte("state-root"), nil).Once()

// Mock SetDAHeight to be called during initialization
mockSeq.On("SetDAHeight", uint64(0)).Return().Once()

// Mock GetNextBatch to return empty batch
mockSeq.On("GetNextBatch", mock.Anything, mock.Anything).
Return(&coresequencer.GetNextBatchResponse{
Batch: &coresequencer.Batch{Transactions: nil},
Timestamp: time.Now(),
}, nil).Maybe()

// Mock GetTxs for reaper (return empty to avoid interfering with test)
mockExec.On("GetTxs", mock.Anything).
Return([][]byte{}, nil).Maybe()

// Mock ExecuteTxs to fail with a critical error
criticalError := errors.New("execution client RPC connection failed")
mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, criticalError).Maybe()

// Create aggregator node
components, err := NewAggregatorComponents(
cfg,
gen,
memStore,
mockExec,
mockSeq,
daClient,
testSigner,
nil, // header broadcaster
nil, // data broadcaster
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
nil,
)
require.NoError(t, err)

// Start should return with error when execution client fails.
// With synctest the fake clock advances the retry delays instantly.
ctx, cancel := context.WithTimeout(t.Context(), 35*time.Second)
defer cancel()

// Run Start in a goroutine to handle the blocking call
startErrCh := make(chan error, 1)
go func() {
startErrCh <- components.Start(ctx)
}()

// Wait for either the error or timeout
synctest.Wait()
select {
case err = <-startErrCh:
// We expect an error containing the critical execution client failure
require.Error(t, err)
assert.Contains(t, err.Error(), "critical execution client failure")
assert.Contains(t, err.Error(), "execution client RPC connection failed")
case <-ctx.Done():
t.Fatal("timeout waiting for critical error to propagate")
}

// Clean up
stopErr := components.Stop()
assert.NoError(t, stopErr)
})
}
82 changes: 42 additions & 40 deletions block/internal/executing/executor_logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"errors"
"testing"
"testing/synctest"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -281,49 +282,50 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
ctx := context.Background()
execCtx := ctx

// For context cancellation test, create a cancellable context
if tt.name == "context cancelled during retry" {
var cancel context.CancelFunc
execCtx, cancel = context.WithCancel(ctx)
// Cancel context after first failure to simulate cancellation during retry
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
}

mockExec := testmocks.NewMockExecutor(t)
tt.setupMock(mockExec)

ctx := context.Background()
execCtx := ctx

// For context cancellation test, create a cancellable context
if tt.name == "context cancelled during retry" {
var cancel context.CancelFunc
execCtx, cancel = context.WithCancel(ctx)
// Cancel context after first failure to simulate cancellation during retry
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
}

mockExec := testmocks.NewMockExecutor(t)
tt.setupMock(mockExec)

e := &Executor{
exec: mockExec,
ctx: execCtx,
logger: zerolog.Nop(),
}

rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")}
header := types.Header{
BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())},
}
currentState := types.State{AppHash: []byte("current-hash")}

result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)

if tt.expectSuccess {
require.NoError(t, err)
assert.Equal(t, tt.expectHash, result)
} else {
require.Error(t, err)
if tt.expectError != "" {
assert.Contains(t, err.Error(), tt.expectError)
e := &Executor{
exec: mockExec,
ctx: execCtx,
logger: zerolog.Nop(),
}

rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")}
header := types.Header{
BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())},
}
currentState := types.State{AppHash: []byte("current-hash")}

result, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)

if tt.expectSuccess {
require.NoError(t, err)
assert.Equal(t, tt.expectHash, result)
} else {
require.Error(t, err)
if tt.expectError != "" {
assert.Contains(t, err.Error(), tt.expectError)
}
}
}

mockExec.AssertExpectations(t)
mockExec.AssertExpectations(t)
})
})
}
}
40 changes: 21 additions & 19 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sync/atomic"
"testing"
"testing/synctest"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -77,29 +78,30 @@ func TestSubmitter_setFinalWithRetry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
ctx := context.Background()
exec := testmocks.NewMockExecutor(t)
tt.setupMock(exec)

s := &Submitter{
exec: exec,
ctx: ctx,
logger: zerolog.Nop(),
}

ctx := context.Background()
exec := testmocks.NewMockExecutor(t)
tt.setupMock(exec)

s := &Submitter{
exec: exec,
ctx: ctx,
logger: zerolog.Nop(),
}

err := s.setFinalWithRetry(100)
err := s.setFinalWithRetry(100)

if tt.expectSuccess {
require.NoError(t, err)
} else {
require.Error(t, err)
if tt.expectError != "" {
assert.Contains(t, err.Error(), tt.expectError)
if tt.expectSuccess {
require.NoError(t, err)
} else {
require.Error(t, err)
if tt.expectError != "" {
assert.Contains(t, err.Error(), tt.expectError)
}
}
}

exec.AssertExpectations(t)
exec.AssertExpectations(t)
})
})
}
}
Expand Down
Loading