Skip to content

Commit 401b1f9

Browse files
committed
fix(block/syncing): broadcast on sync nodes
1 parent b646e66 commit 401b1f9

10 files changed

Lines changed: 102 additions & 70 deletions

File tree

block/components.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"errors"
66
"fmt"
77

8-
goheader "github.com/celestiaorg/go-header"
98
"github.com/rs/zerolog"
109

1110
"github.com/evstack/ev-node/block/internal/cache"
11+
"github.com/evstack/ev-node/block/internal/common"
1212
"github.com/evstack/ev-node/block/internal/executing"
1313
"github.com/evstack/ev-node/block/internal/reaping"
1414
"github.com/evstack/ev-node/block/internal/submitting"
@@ -122,11 +122,6 @@ func (bc *Components) Stop() error {
122122
return errs
123123
}
124124

125-
// broadcaster interface for P2P broadcasting
126-
type broadcaster[T any] interface {
127-
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
128-
}
129-
130125
// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
131126
// Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
132127
// They have more sync capabilities than light nodes but no block production. No signer required.
@@ -136,8 +131,8 @@ func NewSyncComponents(
136131
store store.Store,
137132
exec coreexecutor.Executor,
138133
da coreda.DA,
139-
headerStore goheader.Store[*types.SignedHeader],
140-
dataStore goheader.Store[*types.Data],
134+
headerBroadcaster common.Broadcaster[*types.SignedHeader],
135+
dataBroadcaster common.Broadcaster[*types.Data],
141136
logger zerolog.Logger,
142137
metrics *Metrics,
143138
blockOpts BlockOptions,
@@ -158,8 +153,8 @@ func NewSyncComponents(
158153
metrics,
159154
config,
160155
genesis,
161-
headerStore,
162-
dataStore,
156+
headerBroadcaster,
157+
dataBroadcaster,
163158
logger,
164159
blockOpts,
165160
errorCh,
@@ -199,8 +194,8 @@ func NewAggregatorComponents(
199194
sequencer coresequencer.Sequencer,
200195
da coreda.DA,
201196
signer signer.Signer,
202-
headerBroadcaster broadcaster[*types.SignedHeader],
203-
dataBroadcaster broadcaster[*types.Data],
197+
headerBroadcaster common.Broadcaster[*types.SignedHeader],
198+
dataBroadcaster common.Broadcaster[*types.Data],
204199
logger zerolog.Logger,
205200
metrics *Metrics,
206201
blockOpts BlockOptions,
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package common
2+
3+
import (
4+
"context"
5+
6+
goheader "github.com/celestiaorg/go-header"
7+
)
8+
9+
// Broadcaster interface for handling P2P stores and broadcasting
10+
type Broadcaster[H goheader.Header[H]] interface {
11+
WriteToStoreAndBroadcast(ctx context.Context, payload H) error
12+
Store() goheader.Store[H]
13+
}

block/internal/executing/executor.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ import (
2323
"github.com/evstack/ev-node/types"
2424
)
2525

26-
// broadcaster interface for P2P broadcasting
27-
type broadcaster[T any] interface {
28-
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
29-
}
30-
3126
// Executor handles block production, transaction processing, and state management
3227
type Executor struct {
3328
// Core components
@@ -41,8 +36,8 @@ type Executor struct {
4136
metrics *common.Metrics
4237

4338
// Broadcasting
44-
headerBroadcaster broadcaster[*types.SignedHeader]
45-
dataBroadcaster broadcaster[*types.Data]
39+
headerBroadcaster common.Broadcaster[*types.SignedHeader]
40+
dataBroadcaster common.Broadcaster[*types.Data]
4641

4742
// Configuration
4843
config config.Config
@@ -81,8 +76,8 @@ func NewExecutor(
8176
metrics *common.Metrics,
8277
config config.Config,
8378
genesis genesis.Genesis,
84-
headerBroadcaster broadcaster[*types.SignedHeader],
85-
dataBroadcaster broadcaster[*types.Data],
79+
headerBroadcaster common.Broadcaster[*types.SignedHeader],
80+
dataBroadcaster common.Broadcaster[*types.Data],
8681
logger zerolog.Logger,
8782
options common.BlockOptions,
8883
errorCh chan<- error,

block/internal/executing/executor_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
goheader "github.com/celestiaorg/go-header"
89
"github.com/ipfs/go-datastore"
910
"github.com/ipfs/go-datastore/sync"
1011
"github.com/rs/zerolog"
@@ -20,7 +21,7 @@ import (
2021
)
2122

2223
// mockBroadcaster for testing
23-
type mockBroadcaster[T any] struct {
24+
type mockBroadcaster[T goheader.Header[T]] struct {
2425
called bool
2526
payload T
2627
}
@@ -31,6 +32,10 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo
3132
return nil
3233
}
3334

35+
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
36+
panic("should not need to be needed")
37+
}
38+
3439
func TestExecutor_BroadcasterIntegration(t *testing.T) {
3540
// Create in-memory store
3641
ds := sync.MutexWrap(datastore.NewMapDatastore())

block/internal/syncing/syncer.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
"sync/atomic"
1010
"time"
1111

12-
goheader "github.com/celestiaorg/go-header"
1312
"github.com/rs/zerolog"
13+
"golang.org/x/sync/errgroup"
1414

1515
"github.com/evstack/ev-node/block/internal/cache"
1616
"github.com/evstack/ev-node/block/internal/common"
@@ -25,6 +25,7 @@ import (
2525
type daRetriever interface {
2626
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
2727
}
28+
2829
type p2pHandler interface {
2930
ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
3031
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
@@ -53,9 +54,9 @@ type Syncer struct {
5354
// DA state
5455
daHeight uint64
5556

56-
// P2P stores
57-
headerStore goheader.Store[*types.SignedHeader]
58-
dataStore goheader.Store[*types.Data]
57+
// P2P handling
58+
headerBroadcaster common.Broadcaster[*types.SignedHeader]
59+
dataBroadcaster common.Broadcaster[*types.Data]
5960

6061
// Channels for coordination
6162
heightInCh chan common.DAHeightEvent
@@ -83,27 +84,27 @@ func NewSyncer(
8384
metrics *common.Metrics,
8485
config config.Config,
8586
genesis genesis.Genesis,
86-
headerStore goheader.Store[*types.SignedHeader],
87-
dataStore goheader.Store[*types.Data],
87+
headerBroadcaster common.Broadcaster[*types.SignedHeader],
88+
dataBroadcaster common.Broadcaster[*types.Data],
8889
logger zerolog.Logger,
8990
options common.BlockOptions,
9091
errorCh chan<- error,
9192
) *Syncer {
9293
return &Syncer{
93-
store: store,
94-
exec: exec,
95-
da: da,
96-
cache: cache,
97-
metrics: metrics,
98-
config: config,
99-
genesis: genesis,
100-
options: options,
101-
headerStore: headerStore,
102-
dataStore: dataStore,
103-
lastStateMtx: &sync.RWMutex{},
104-
heightInCh: make(chan common.DAHeightEvent, 10_000),
105-
errorCh: errorCh,
106-
logger: logger.With().Str("component", "syncer").Logger(),
94+
store: store,
95+
exec: exec,
96+
da: da,
97+
cache: cache,
98+
metrics: metrics,
99+
config: config,
100+
genesis: genesis,
101+
options: options,
102+
headerBroadcaster: headerBroadcaster,
103+
dataBroadcaster: dataBroadcaster,
104+
lastStateMtx: &sync.RWMutex{},
105+
heightInCh: make(chan common.DAHeightEvent, 10_000),
106+
errorCh: errorCh,
107+
logger: logger.With().Str("component", "syncer").Logger(),
107108
}
108109
}
109110

@@ -118,7 +119,7 @@ func (s *Syncer) Start(ctx context.Context) error {
118119

119120
// Initialize handlers
120121
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger)
121-
s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger)
122+
s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger)
122123

123124
// Start main processing loop
124125
s.wg.Add(1)
@@ -327,7 +328,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
327328
select {
328329
case <-blockTicker:
329330
// Process headers
330-
newHeaderHeight := s.headerStore.Height()
331+
newHeaderHeight := s.headerBroadcaster.Store().Height()
331332
if newHeaderHeight > *lastHeaderHeight {
332333
events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight)
333334
for _, event := range events {
@@ -344,7 +345,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
344345
}
345346

346347
// Process data
347-
newDataHeight := s.dataStore.Height()
348+
newDataHeight := s.headerBroadcaster.Store().Height()
348349
if newDataHeight == newHeaderHeight {
349350
*lastDataHeight = newDataHeight
350351
} else if newDataHeight > *lastDataHeight {
@@ -407,6 +408,15 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
407408
}
408409
return
409410
}
411+
412+
// broadcast header and data to P2P network
413+
g, ctx := errgroup.WithContext(s.ctx)
414+
g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) })
415+
g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) })
416+
if err := g.Wait(); err != nil {
417+
s.logger.Error().Err(err).Msg("failed to broadcast header and/data")
418+
// don't fail block production on broadcast error
419+
}
410420
}
411421

412422
// errInvalidBlock is returned when a block is failing validation

block/internal/syncing/syncer_backoff_test.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
goheader "github.com/celestiaorg/go-header"
910
"github.com/ipfs/go-datastore"
1011
dssync "github.com/ipfs/go-datastore/sync"
1112
"github.com/rs/zerolog"
@@ -24,6 +25,19 @@ import (
2425
"github.com/evstack/ev-node/types"
2526
)
2627

28+
// mockBroadcaster for testing
29+
type mockBroadcaster[T goheader.Header[T]] struct {
30+
store goheader.Store[T]
31+
}
32+
33+
func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error {
34+
return nil
35+
}
36+
37+
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
38+
return m.store
39+
}
40+
2741
// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff
2842
// behavior when encountering different types of DA layer errors.
2943
func TestSyncer_BackoffOnDAError(t *testing.T) {
@@ -76,11 +90,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) {
7690

7791
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
7892
headerStore.On("Height").Return(uint64(0)).Maybe()
79-
syncer.headerStore = headerStore
93+
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
8094

8195
dataStore := mocks.NewMockStore[*types.Data](t)
8296
dataStore.On("Height").Return(uint64(0)).Maybe()
83-
syncer.dataStore = dataStore
97+
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
8498

8599
var callTimes []time.Time
86100
callCount := 0
@@ -167,11 +181,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
167181

168182
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
169183
headerStore.On("Height").Return(uint64(0)).Maybe()
170-
syncer.headerStore = headerStore
184+
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
171185

172186
dataStore := mocks.NewMockStore[*types.Data](t)
173187
dataStore.On("Height").Return(uint64(0)).Maybe()
174-
syncer.dataStore = dataStore
188+
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
175189

176190
var callTimes []time.Time
177191

@@ -253,11 +267,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) {
253267

254268
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
255269
headerStore.On("Height").Return(uint64(0)).Maybe()
256-
syncer.headerStore = headerStore
270+
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
257271

258272
dataStore := mocks.NewMockStore[*types.Data](t)
259273
dataStore.On("Height").Return(uint64(0)).Maybe()
260-
syncer.dataStore = dataStore
274+
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
261275

262276
var callTimes []time.Time
263277

@@ -335,8 +349,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer {
335349
common.NopMetrics(),
336350
cfg,
337351
gen,
338-
nil,
339-
nil,
352+
&mockBroadcaster[*types.SignedHeader]{},
353+
&mockBroadcaster[*types.Data]{},
340354
zerolog.Nop(),
341355
common.DefaultBlockOptions(),
342356
make(chan error, 1),

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
108108
common.NopMetrics(),
109109
cfg,
110110
gen,
111-
nil, // headerStore not used; we inject P2P directly to channel when needed
112-
nil, // dataStore not used
111+
nil, // we inject P2P directly to channel when needed
112+
nil, // injected when needed
113113
zerolog.Nop(),
114114
common.DefaultBlockOptions(),
115115
make(chan error, 1),
@@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
152152
s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path
153153
headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b)
154154
headerP2PStore.On("Height").Return(uint64(0)).Maybe()
155-
s.headerStore = headerP2PStore
155+
s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore}
156156
dataP2PStore := mocks.NewMockStore[*types.Data](b)
157157
dataP2PStore.On("Height").Return(uint64(0)).Maybe()
158-
s.dataStore = dataP2PStore
158+
s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore}
159159
return &benchFixture{s: s, st: st, cm: cm, cancel: cancel}
160160
}

0 commit comments

Comments
 (0)