From c7b406ecc7bc6eb301f19df647fd88a4db23bff1 Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 11 May 2026 13:15:19 -0700 Subject: [PATCH 1/8] Wire eth_subscribe(newHeads) for Autobahn via in-process notifier Under Autobahn, app.FinalizeBlock is driven by giga_router rather than CometBFT consensus, so the legacy Tendermint event-bus subscription that backs eth_subscribe("newHeads") never fires. This adds a direct in-process notifier from giga_router's commit path into evmrpc's SubscriptionAPI fan-out (overwrite-on-full, capacity 1, so the latest head always wins if a consumer lags). The legacy path is untouched. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/app.go | 36 ++++- evmrpc/notifier.go | 74 ++++++++++ evmrpc/notifier_internal_test.go | 118 +++++++++++++++ evmrpc/server.go | 3 +- evmrpc/setup_test.go | 23 ++- evmrpc/subscribe.go | 135 ++++++++++++++---- evmrpc/subscribe_test.go | 83 +++++++++++ .../evm_module/scripts/evm_rpc_tests.sh | 7 + .../evm_module/ws_test/ws_test.go | 109 ++++++++++++++ sei-tendermint/internal/p2p/giga_router.go | 15 ++ sei-tendermint/node/node.go | 2 + sei-tendermint/node/public.go | 8 ++ sei-tendermint/node/seed.go | 1 + sei-tendermint/node/setup.go | 2 + sei-tendermint/types/block_listener.go | 17 +++ 15 files changed, 595 insertions(+), 38 deletions(-) create mode 100644 evmrpc/notifier.go create mode 100644 evmrpc/notifier_internal_test.go create mode 100644 integration_test/evm_module/ws_test/ws_test.go create mode 100644 sei-tendermint/types/block_listener.go diff --git a/app/app.go b/app/app.go index a660f3f28d..c0ffd24ea7 100644 --- a/app/app.go +++ b/app/app.go @@ -307,6 +307,14 @@ var ( const ( MinGasEVMTx = 21000 + + // NewHeadsNotifierCapacity bounds the in-process eth_newHeads + // notifier buffer. Capacity 1 pairs with the notifier's + // overwrite-on-full semantics: if a consumer lags, the latest head + // always wins and stale heads are dropped. Anything larger only + // buffers staleness — newHeads subscribers care about the current + // head, not a backlog. + NewHeadsNotifierCapacity = 1 ) func init() { @@ -445,9 +453,14 @@ type App struct { HardForkManager *upgrades.HardForkManager - encodingConfig appparams.EncodingConfig - legacyEncodingConfig appparams.EncodingConfig - evmRPCConfig evmrpcconfig.Config + encodingConfig appparams.EncodingConfig + legacyEncodingConfig appparams.EncodingConfig + evmRPCConfig evmrpcconfig.Config + // blockHeaderNotifier is non-nil only when Autobahn is enabled. It is + // fed by GigaRouter after every committed block and consumed by + // evmrpc to drive eth_subscribe("newHeads") without going through the + // Tendermint event bus. + blockHeaderNotifier *evmrpc.BlockHeaderNotifier adminConfig admin.Config adminServer *grpc.Server lightInvarianceConfig LightInvarianceConfig @@ -714,6 +727,9 @@ func New( if err != nil { panic(fmt.Sprintf("error reading EVM config due to %s", err)) } + if tmConfig != nil && tmConfig.AutobahnConfigFile != "" { + app.blockHeaderNotifier = evmrpc.NewBlockHeaderNotifier(NewHeadsNotifierCapacity) + } if app.evmRPCConfig.TraceBakeEnabled { traceDB, dbErr := evmkeeper.NewTraceDB(homePath) if dbErr != nil { @@ -1324,6 +1340,18 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess return resp, nil } +// OnBlockCommitted implements tmtypes.BlockHeaderListener. The Autobahn +// block-execution path calls this once per committed block, and the call +// is forwarded to the in-process eth_newHeads notifier consumed by +// evmrpc's SubscriptionAPI. The method is a no-op when Autobahn is not +// enabled (blockHeaderNotifier is nil) and never blocks. +func (app *App) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) { + if app.blockHeaderNotifier == nil { + return + } + app.blockHeaderNotifier.OnBlockCommitted(hash, header, response) +} + func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { startTime := time.Now() defer func() { @@ -2532,7 +2560,7 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) { } if app.evmRPCConfig.WSEnabled { - evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore()) + evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore(), app.blockHeaderNotifier) if err != nil { panic(err) } diff --git a/evmrpc/notifier.go b/evmrpc/notifier.go new file mode 100644 index 0000000000..289693aa4e --- /dev/null +++ b/evmrpc/notifier.go @@ -0,0 +1,74 @@ +package evmrpc + +import ( + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" +) + +// blockHeaderEvent is the in-process payload delivered to SubscriptionAPI +// for each committed block. It mirrors the data evmrpc needs to build an +// Ethereum block header, without going through the Tendermint event bus. +// +// hash is the canonical block hash that EVM contracts and the receipt +// store see. Under Autobahn that is the autobahn lane-block header hash +// passed as Hash to app.FinalizeBlock — NOT a hash computed over the +// (partially-populated) Tendermint Header we synthesize for this event. +// Surfacing the receipt-store hash here keeps eth_newHeads consistent +// with what eth_getTransactionReceipt and eth_getBlockBy* return. +type blockHeaderEvent struct { + hash []byte + header *tmproto.Header + response *abci.ResponseFinalizeBlock +} + +// BlockHeaderNotifier implements sei-tendermint/types.BlockHeaderListener +// and feeds eth_subscribe("newHeads") via a direct in-process channel. +// +// Producers (e.g. the Autobahn block-execution path) call OnBlockCommitted +// once per committed block. The single consumer is SubscriptionAPI's +// fan-out goroutine, which broadcasts to all per-client subscribers. +// +// OnBlockCommitted is non-blocking and uses overwrite-on-full semantics: +// if the consumer is lagging, the oldest buffered event is dropped in +// favour of the newest. For eth_newHeads, the latest head is always more +// useful than a stale one. +// +// Concurrency: assumes a single producer (the block-execution loop) and a +// single consumer. With those invariants, after the drain step there is +// guaranteed space for the new event. +type BlockHeaderNotifier struct { + ch chan blockHeaderEvent +} + +func NewBlockHeaderNotifier(capacity int) *BlockHeaderNotifier { + return &BlockHeaderNotifier{ch: make(chan blockHeaderEvent, capacity)} +} + +// OnBlockCommitted implements types.BlockHeaderListener. +func (n *BlockHeaderNotifier) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) { + if n == nil { + return + } + evt := blockHeaderEvent{hash: hash, header: header, response: response} + select { + case n.ch <- evt: + return + default: + } + // Buffer full: drain one stale event to make room for the new one. + // With a single producer, draining one slot is sufficient; the second + // send must succeed under that invariant. The default branch is + // defensive and unreachable in practice. + select { + case <-n.ch: + default: + } + select { + case n.ch <- evt: + default: + } +} + +func (n *BlockHeaderNotifier) recv() <-chan blockHeaderEvent { + return n.ch +} diff --git a/evmrpc/notifier_internal_test.go b/evmrpc/notifier_internal_test.go new file mode 100644 index 0000000000..d09394323d --- /dev/null +++ b/evmrpc/notifier_internal_test.go @@ -0,0 +1,118 @@ +package evmrpc + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" + "github.com/stretchr/testify/require" +) + +func TestBlockHeaderNotifier_DeliversEvent(t *testing.T) { + n := NewBlockHeaderNotifier(4) + + hash := []byte{0x01, 0x02, 0x03} + header := &tmproto.Header{Height: 42} + resp := &abci.ResponseFinalizeBlock{} + + n.OnBlockCommitted(hash, header, resp) + + select { + case evt := <-n.recv(): + require.Equal(t, hash, evt.hash) + require.Equal(t, header, evt.header) + require.Equal(t, resp, evt.response) + case <-time.After(time.Second): + t.Fatal("expected event on notifier channel") + } +} + +func TestBlockHeaderNotifier_OverwritesWhenFull(t *testing.T) { + n := NewBlockHeaderNotifier(1) + + // Fill the buffer with a stale event. + n.OnBlockCommitted([]byte{1}, &tmproto.Header{Height: 1}, &abci.ResponseFinalizeBlock{}) + + // A second call must not block and must replace the buffered event. + done := make(chan struct{}) + go func() { + n.OnBlockCommitted([]byte{2}, &tmproto.Header{Height: 2}, &abci.ResponseFinalizeBlock{}) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("OnBlockCommitted blocked when buffer was full") + } + + // The newest event must survive; the stale one must be gone. + evt := <-n.recv() + require.EqualValues(t, 2, evt.header.Height, "expected newest event to win on overwrite") + select { + case extra := <-n.recv(): + t.Fatalf("expected stale event to be dropped, got %+v", extra) + case <-time.After(50 * time.Millisecond): + } +} + +func TestBlockHeaderNotifier_NilReceiverIsNoOp(t *testing.T) { + var n *BlockHeaderNotifier + // Must not panic. + n.OnBlockCommitted(nil, &tmproto.Header{}, &abci.ResponseFinalizeBlock{}) +} + +func TestEncodeCommittedBlock(t *testing.T) { + hash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111").Bytes() + proposer := common.HexToAddress("0x2222222222222222222222222222222222222222").Bytes() + appHash := common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333").Bytes() + ts := time.Unix(1_700_000_000, 0).UTC() + evt := blockHeaderEvent{ + hash: hash, + header: &tmproto.Header{ + Height: 12345, + Time: ts, + ProposerAddress: proposer, + AppHash: appHash, + }, + response: &abci.ResponseFinalizeBlock{ + TxResults: []*abci.ExecTxResult{ + {GasUsed: 21000}, + {GasUsed: 100000}, + }, + ConsensusParamUpdates: &tmproto.ConsensusParams{ + Block: &tmproto.BlockParams{MaxGas: 10_000_000}, + }, + }, + } + + out := encodeCommittedBlock(evt, big.NewInt(42)) + + require.Equal(t, common.BytesToHash(hash), out["hash"]) + require.Equal(t, (*hexutil.Big)(big.NewInt(12345)), out["number"]) + require.Equal(t, common.BytesToAddress(proposer), out["miner"]) + require.Equal(t, common.BytesToHash(appHash), out["stateRoot"]) + require.Equal(t, hexutil.Uint64(ts.Unix()), out["timestamp"]) + require.Equal(t, hexutil.Uint64(121000), out["gasUsed"]) + require.Equal(t, hexutil.Uint64(10_000_000), out["gasLimit"]) + require.Equal(t, (*hexutil.Big)(big.NewInt(42)), out["baseFeePerGas"]) + // Fields not surfaced by the Autobahn path must be zero, but present. + require.Equal(t, common.Hash{}, out["parentHash"]) + require.Equal(t, common.Hash{}, out["receiptsRoot"]) + require.Equal(t, common.Hash{}, out["transactionsRoot"]) +} + +func TestEncodeCommittedBlock_NilConsensusParamUpdates(t *testing.T) { + evt := blockHeaderEvent{ + hash: []byte{0xab}, + header: &tmproto.Header{Height: 1, Time: time.Unix(0, 0)}, + response: &abci.ResponseFinalizeBlock{ + // ConsensusParamUpdates intentionally nil; must not panic. + }, + } + out := encodeCommittedBlock(evt, big.NewInt(0)) + require.Equal(t, hexutil.Uint64(0), out["gasLimit"]) +} diff --git a/evmrpc/server.go b/evmrpc/server.go index a130bb8031..0d1c392764 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -254,6 +254,7 @@ func NewEVMWebSocketServer( txConfigProvider func(int64) client.TxConfig, homeDir string, stateStore types.StateStore, + blockHeaderNotifier *BlockHeaderNotifier, ) (EVMServer, error) { // Initialize global worker pool with configuration (metrics are embedded in pool) // This is idempotent - if HTTP server already initialized it, this is a no-op @@ -327,7 +328,7 @@ func NewEVMWebSocketServer( cacheCreationMutex: cacheCreationMutex, globalLogSlicePool: globalLogSlicePool, watermarks: watermarks, - }, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS), + }, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS, blockHeaderNotifier), }, { Namespace: "web3", diff --git a/evmrpc/setup_test.go b/evmrpc/setup_test.go index 4d03f2e4e0..b8e5a76f51 100644 --- a/evmrpc/setup_test.go +++ b/evmrpc/setup_test.go @@ -49,6 +49,7 @@ const TestWSPort = 7778 const TestBadPort = 7779 const TestStrictPort = 7780 const TestArchivePort = 7782 +const TestNotifierWSPort = 7784 const GenesisBlockHeight = 0 const MockHeight8 = 8 @@ -114,6 +115,11 @@ var MockBlockIDMultiTx = tmtypes.BlockID{ var NewHeadsCalled = make(chan struct{}, 1) +// NotifierForTest backs the Autobahn-style WS server started on +// TestNotifierWSPort. Tests publish to it via OnBlockCommitted to drive +// eth_subscribe("newHeads") through the in-process notifier path. +var NotifierForTest = evmrpc.NewBlockHeaderNotifier(16) + type MockClient struct { mock.Client latestOverride int64 @@ -667,7 +673,7 @@ func init() { } // Start ws server - wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil) + wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, nil) if err != nil { panic(err) } @@ -675,6 +681,19 @@ func init() { panic(err) } fmt.Printf("wsServer started with config = %+v\n", goodConfig) + + // Start a second WS server wired to NotifierForTest, exercising the + // Autobahn (notifier-fed) eth_subscribe("newHeads") path. + notifierConfig := goodConfig + notifierConfig.HTTPPort = TestNotifierWSPort - 1 + notifierConfig.WSPort = TestNotifierWSPort + notifierWSServer, err := evmrpc.NewEVMWebSocketServer(notifierConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, NotifierForTest) + if err != nil { + panic(err) + } + if err := notifierWSServer.Start(); err != nil { + panic(err) + } time.Sleep(1 * time.Second) // Generate data @@ -1170,7 +1189,7 @@ func sendWSRequestAndListen(t *testing.T, port int, method string, params ...int headers := make(http.Header) headers.Set("Origin", "localhost") headers.Set("Content-Type", "application/json") - conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, TestWSPort), headers) + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, port), headers) require.Nil(t, err) recv := make(chan map[string]interface{}) diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 1d31d4283c..4c0adf7e59 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -40,7 +40,7 @@ type SubscriptionConfig struct { newHeadLimit uint64 } -func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig, filterConfig *FilterConfig, connectionType ConnectionType) *SubscriptionAPI { +func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig, filterConfig *FilterConfig, connectionType ConnectionType, blockHeaderNotifier *BlockHeaderNotifier) *SubscriptionAPI { logFetcher.filterConfig = filterConfig api := &SubscriptionAPI{ tmClient: tmClient, @@ -51,41 +51,63 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider newHeadListeners: make(map[rpc.ID]chan map[string]interface{}), connectionType: connectionType, } - id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity) - if err != nil { - panic(err) - } - go func() { - defer recoverAndLog() - defer func() { - _ = api.subscriptionManager.Unsubscribe(context.Background(), id) - }() - for { - res := <-subCh - eventHeader := res.Data.(tmtypes.EventDataNewBlockHeader) - ctx := ctxProvider(eventHeader.Header.Height) - baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() - ethHeader, err := encodeTmHeader(eventHeader, baseFeePerGas) - if err != nil { - fmt.Printf("error encoding new head event %#v due to %s\n", res.Data, err) - continue - } - api.newHeadListenersMtx.Lock() - toDelete := []rpc.ID{} - for id, c := range api.newHeadListeners { - if !handleListener(c, ethHeader) { - toDelete = append(toDelete, id) + if blockHeaderNotifier != nil { + // Autobahn (and any future direct-channel) path. The producer + // pushes one event per committed block; there is no Tendermint + // event-bus subscription. + go api.runNewHeadsFromNotifier(blockHeaderNotifier, k, ctxProvider) + } else { + // Legacy CometBFT path: subscribe to the Tendermint event bus. + id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity) + if err != nil { + panic(err) + } + go func() { + defer recoverAndLog() + defer func() { + _ = api.subscriptionManager.Unsubscribe(context.Background(), id) + }() + for { + res := <-subCh + eventHeader := res.Data.(tmtypes.EventDataNewBlockHeader) + ctx := ctxProvider(eventHeader.Header.Height) + baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() + ethHeader, err := encodeTmHeader(eventHeader, baseFeePerGas) + if err != nil { + fmt.Printf("error encoding new head event %#v due to %s\n", res.Data, err) + continue } + api.broadcastNewHead(ethHeader) } - for _, id := range toDelete { - delete(api.newHeadListeners, id) - } - api.newHeadListenersMtx.Unlock() - } - }() + }() + } return api } +func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, k *keeper.Keeper, ctxProvider func(int64) sdk.Context) { + defer recoverAndLog() + for evt := range notifier.recv() { + ctx := ctxProvider(evt.header.Height) + baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() + ethHeader := encodeCommittedBlock(evt, baseFeePerGas) + a.broadcastNewHead(ethHeader) + } +} + +func (a *SubscriptionAPI) broadcastNewHead(ethHeader map[string]interface{}) { + a.newHeadListenersMtx.Lock() + defer a.newHeadListenersMtx.Unlock() + toDelete := []rpc.ID{} + for id, c := range a.newHeadListeners { + if !handleListener(c, ethHeader) { + toDelete = append(toDelete, id) + } + } + for _, id := range toDelete { + delete(a.newHeadListeners, id) + } +} + func handleListener(c chan map[string]interface{}, ethHeader map[string]interface{}) bool { // if the channel is already closed, sending to it/closing it will panic defer func() { _ = recover() }() @@ -281,6 +303,57 @@ func (s *SubscriptionManager) Unsubscribe(ctx context.Context, id SubscriberID) return nil } +// encodeCommittedBlock builds the eth_newHeads payload for an Autobahn- +// committed block. It differs from encodeTmHeader in two notable ways: +// +// 1. "hash" is the explicit Autobahn block-header hash from evt.hash +// (the same value the EVM receipt store records as blockHash). See +// blockHeaderEvent's doc for the rationale. +// 2. parentHash, receiptsRoot, and transactionsRoot are zero. The +// Autobahn block-execution path does not compute a Tendermint-style +// hash chain (LastBlockID / LastResultsHash / DataHash), so there is +// nothing meaningful to surface for those fields. Subscribers that +// chain-validate the head stream will need a different mechanism +// under Autobahn. +func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int) map[string]interface{} { + blockHash := common.BytesToHash(evt.hash) + number := big.NewInt(evt.header.Height) + miner := common.BytesToAddress(evt.header.ProposerAddress) + appHash := common.BytesToHash(evt.header.AppHash) + var gasWanted int64 + for _, txRes := range evt.response.TxResults { + gasWanted += txRes.GasUsed + } + var gasLimit uint64 + if cp := evt.response.ConsensusParamUpdates; cp != nil && cp.Block != nil { + gasLimit = uint64(cp.Block.MaxGas) //nolint:gosec + } + return map[string]interface{}{ + "difficulty": (*hexutil.Big)(utils.Big0), // inapplicable to Sei + "extraData": hexutil.Bytes{}, // inapplicable to Sei + "gasLimit": hexutil.Uint64(gasLimit), + "gasUsed": hexutil.Uint64(gasWanted), //nolint:gosec + "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei + "miner": miner, + "nonce": ethtypes.BlockNonce{}, // inapplicable to Sei + "number": (*hexutil.Big)(number), + "parentHash": common.Hash{}, // see function doc + "receiptsRoot": common.Hash{}, // see function doc + "sha3Uncles": common.Hash{}, // inapplicable to Sei + "stateRoot": appHash, + "timestamp": hexutil.Uint64(evt.header.Time.Unix()), //nolint:gosec + "transactionsRoot": common.Hash{}, // see function doc + "mixHash": common.Hash{}, // inapplicable to Sei + "excessBlobGas": hexutil.Uint64(0), // inapplicable to Sei + "parentBeaconBlockRoot": common.Hash{}, // inapplicable to Sei + "hash": blockHash, + "withdrawlsRoot": common.Hash{}, // inapplicable to Sei + "baseFeePerGas": (*hexutil.Big)(baseFee), + "withdrawalsRoot": common.Hash{}, // inapplicable to Sei + "blobGasUsed": hexutil.Uint64(0), // inapplicable to Sei + } +} + func encodeTmHeader( header tmtypes.EventDataNewBlockHeader, baseFee *big.Int, diff --git a/evmrpc/subscribe_test.go b/evmrpc/subscribe_test.go index 82b4647351..fb22c9cf7a 100644 --- a/evmrpc/subscribe_test.go +++ b/evmrpc/subscribe_test.go @@ -9,6 +9,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/sei-protocol/sei-chain/evmrpc" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" "github.com/stretchr/testify/require" ) @@ -81,6 +83,87 @@ func TestSubscribeNewHeads(t *testing.T) { } } } + +// TestSubscribeNewHeadsAutobahn exercises the in-process notifier path +// (used under Autobahn) end-to-end: a WS client subscribes to newHeads +// against the notifier-backed server, the test pushes an event via +// NotifierForTest.OnBlockCommitted, and the WS subscriber must receive an +// eth_subscription notification carrying the encoded header. +func TestSubscribeNewHeadsAutobahn(t *testing.T) { + t.Parallel() + recvCh, done := sendWSRequestAndListen(t, TestNotifierWSPort, "subscribe", "newHeads") + defer func() { done <- struct{}{} }() + + hash := common.HexToHash("0x4242424242424242424242424242424242424242424242424242424242424242").Bytes() + appHash := common.HexToHash("0x3131313131313131313131313131313131313131313131313131313131313131").Bytes() + proposer := common.HexToAddress("0x9999999999999999999999999999999999999999").Bytes() + ts := time.Unix(1_700_000_500, 0).UTC() + + expectedKeys := []string{ + "parentHash", "sha3Uncles", "miner", "stateRoot", "transactionsRoot", + "receiptsRoot", "logsBloom", "difficulty", "number", "gasLimit", + "gasUsed", "timestamp", "extraData", "mixHash", "nonce", + "baseFeePerGas", "withdrawalsRoot", "blobGasUsed", "excessBlobGas", + "parentBeaconBlockRoot", "hash", + } + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + var subscriptionId string + for t.Context().Err() == nil { + select { + case resObj := <-recvCh: + if errVal, ok := resObj["error"]; ok { + t.Fatal("Received error:", errVal) + } + if subscriptionId == "" { + subscriptionId = resObj["result"].(string) + timer.Reset(5 * time.Second) + NotifierForTest.OnBlockCommitted(hash, &tmproto.Header{ + Height: MockHeight8, + Time: ts, + ProposerAddress: proposer, + AppHash: appHash, + }, &abci.ResponseFinalizeBlock{ + TxResults: []*abci.ExecTxResult{ + {GasUsed: 21000}, + {GasUsed: 50000}, + }, + ConsensusParamUpdates: &tmproto.ConsensusParams{ + Block: &tmproto.BlockParams{MaxGas: 5_000_000}, + }, + }) + continue + } + require.Equal(t, "eth_subscription", resObj["method"]) + paramMap := resObj["params"].(map[string]interface{}) + require.Equal(t, subscriptionId, paramMap["subscription"]) + resultMap := paramMap["result"].(map[string]interface{}) + for _, k := range expectedKeys { + _, ok := resultMap[k] + require.Truef(t, ok, "missing key %q in header", k) + } + require.Equal(t, common.BytesToHash(hash).Hex(), resultMap["hash"]) + require.Equal(t, fmt.Sprintf("0x%x", MockHeight8), resultMap["number"]) + require.Equal(t, common.BytesToAddress(proposer).Hex(), resultMap["miner"]) + require.Equal(t, common.BytesToHash(appHash).Hex(), resultMap["stateRoot"]) + require.Equal(t, fmt.Sprintf("0x%x", ts.Unix()), resultMap["timestamp"]) + require.Equal(t, fmt.Sprintf("0x%x", 21000+50000), resultMap["gasUsed"]) + require.Equal(t, fmt.Sprintf("0x%x", 5_000_000), resultMap["gasLimit"]) + // Fields the Autobahn path does not surface must serialize to + // the zero hash. + zeroHash := (common.Hash{}).Hex() + require.Equal(t, zeroHash, resultMap["parentHash"]) + require.Equal(t, zeroHash, resultMap["receiptsRoot"]) + require.Equal(t, zeroHash, resultMap["transactionsRoot"]) + return + case <-timer.C: + t.Fatal("No event received within 5 seconds") + } + } +} + func TestSubscribeEmptyLogs(t *testing.T) { t.Parallel() recvCh, done := sendWSRequestGood(t, "subscribe", "logs") diff --git a/integration_test/evm_module/scripts/evm_rpc_tests.sh b/integration_test/evm_module/scripts/evm_rpc_tests.sh index c0f6f29b3a..26505191e1 100755 --- a/integration_test/evm_module/scripts/evm_rpc_tests.sh +++ b/integration_test/evm_module/scripts/evm_rpc_tests.sh @@ -65,3 +65,10 @@ fi export SEI_EVM_IO_RUN_INTEGRATION=1 go test ./integration_test/evm_module/rpc_io_test/ -v -count=1 + +# WebSocket integration tests (eth_subscribe et al.). Lives in a sibling +# package because the .io/.iox framework cannot represent streaming +# methods. The test itself is consensus-mode agnostic, so the same +# invocation works under standard CometBFT and Autobahn clusters alike. +export SEI_EVM_WS_RUN_INTEGRATION=1 +go test ./integration_test/evm_module/ws_test/ -v -count=1 diff --git a/integration_test/evm_module/ws_test/ws_test.go b/integration_test/evm_module/ws_test/ws_test.go new file mode 100644 index 0000000000..a83a8a1b37 --- /dev/null +++ b/integration_test/evm_module/ws_test/ws_test.go @@ -0,0 +1,109 @@ +// Package ws_test exercises WebSocket JSON-RPC subscriptions against a +// live Sei EVM RPC. The test is consensus-mode agnostic: it dials the +// EVM WS port and asserts that eth_subscribe("newHeads") delivers a +// head notification. It runs under both standard CometBFT clusters and +// Autobahn clusters — the producer hook differs between them (legacy +// event bus vs in-process notifier), but the externally observable +// behaviour must be identical. +// +// Env: +// - SEI_EVM_WS_RUN_INTEGRATION=1 to run (set by integration scripts/CI). +// Otherwise the test skips so `go test ./...` stays cheap. +// - SEI_EVM_WS_URL overrides the default ws://127.0.0.1:8546. +package ws_test + +import ( + "os" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func wsURL() string { + if u := os.Getenv("SEI_EVM_WS_URL"); u != "" { + return u + } + return "ws://127.0.0.1:8546" +} + +func TestEthSubscribeNewHeads(t *testing.T) { + if os.Getenv("SEI_EVM_WS_RUN_INTEGRATION") != "1" { + t.Skip("EVM WS integration tests skipped (set SEI_EVM_WS_RUN_INTEGRATION=1 to run)") + } + + url := wsURL() + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + t.Fatalf("dial %s: %v", url, err) + } + defer conn.Close() + + if err := conn.WriteJSON(map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribe", + "params": []string{"newHeads"}, + }); err != nil { + t.Fatalf("write subscribe: %v", err) + } + + // First message must be the subscription confirmation. Bound the wait + // so a broken handshake fails fast. + if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { + t.Fatalf("set deadline: %v", err) + } + var ack struct { + Result string `json:"result"` + Error map[string]interface{} `json:"error"` + } + if err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("read subscribe ack: %v", err) + } + if ack.Error != nil { + t.Fatalf("subscribe error: %v", ack.Error) + } + if ack.Result == "" { + t.Fatalf("subscribe returned empty subscription id") + } + t.Logf("subscription id: %s", ack.Result) + + // Wait for at least one head notification. At Sei's block cadence + // this should arrive within a few seconds; allow generous slack. + if err := conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil { + t.Fatalf("set deadline: %v", err) + } + var note struct { + Method string `json:"method"` + Params struct { + Subscription string `json:"subscription"` + Result map[string]interface{} `json:"result"` + } `json:"params"` + } + if err := conn.ReadJSON(¬e); err != nil { + t.Fatalf("read head notification: %v", err) + } + if note.Method != "eth_subscription" { + t.Fatalf("expected eth_subscription, got %q", note.Method) + } + if note.Params.Subscription != ack.Result { + t.Fatalf("subscription id mismatch: got %q want %q", + note.Params.Subscription, ack.Result) + } + header := note.Params.Result + for _, key := range []string{"hash", "number", "timestamp", "stateRoot", "miner"} { + v, ok := header[key] + if !ok { + t.Fatalf("head notification missing key %q (got %+v)", key, header) + } + // All-zero values for hash, number, or timestamp would indicate + // the producer hook didn't fire and we're seeing a default- + // constructed header. + if s, _ := v.(string); s == "" || s == "0x" || s == "0x0" { + if key == "hash" || key == "number" || key == "timestamp" { + t.Fatalf("head notification %q has zero-ish value %q", key, s) + } + } + } + t.Logf("received head: number=%v hash=%v", header["number"], header["hash"]) +} diff --git a/sei-tendermint/internal/p2p/giga_router.go b/sei-tendermint/internal/p2p/giga_router.go index 5434979ab0..7c10d951a8 100644 --- a/sei-tendermint/internal/p2p/giga_router.go +++ b/sei-tendermint/internal/p2p/giga_router.go @@ -42,6 +42,11 @@ type GigaRouterConfig struct { Producer *producer.Config TxMempool *mempool.TxMempool GenDoc *types.GenesisDoc + + // BlockHeaderListener, if non-nil, is invoked after each block is + // committed. Used to feed evmrpc's eth_subscribe("newHeads") without + // going through the legacy Tendermint event bus. + BlockHeaderListener types.BlockHeaderListener } type GigaRouter struct { @@ -287,6 +292,16 @@ func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (* if err != nil { return nil, fmt.Errorf("r.cfg.App.Commit(): %w", err) } + if r.cfg.BlockHeaderListener != nil { + header := (&types.Header{ + ChainID: r.cfg.GenDoc.ChainID, + Height: int64(b.GlobalNumber), // nolint:gosec + Time: b.Timestamp, + ProposerAddress: proposerAddress, + AppHash: resp.AppHash, + }).ToProto() + r.cfg.BlockHeaderListener.OnBlockCommitted(hash[:], header, resp) + } blockTxs := make(types.Txs, len(b.Payload.Txs())) for i, tx := range b.Payload.Txs() { blockTxs[i] = tx diff --git a/sei-tendermint/node/node.go b/sei-tendermint/node/node.go index 0016be94da..9b2900928b 100644 --- a/sei-tendermint/node/node.go +++ b/sei-tendermint/node/node.go @@ -89,6 +89,7 @@ func makeNode( tracerProviderOptions []trace.TracerProviderOption, nodeMetrics *NodeMetrics, consensusPolicy types.ConsensusPolicy, + blockHeaderListener types.BlockHeaderListener, ) (_ service.Service, err error) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) @@ -202,6 +203,7 @@ func makeNode( utils.Some(mp), genDoc, dbProvider, + blockHeaderListener, ) closers = append(closers, peerCloser) if err != nil { diff --git a/sei-tendermint/node/public.go b/sei-tendermint/node/public.go index efc9d8881f..c5e5db4751 100644 --- a/sei-tendermint/node/public.go +++ b/sei-tendermint/node/public.go @@ -32,6 +32,13 @@ func New( nodeMetrics *NodeMetrics, consensusPolicy tmtypes.ConsensusPolicy, ) (service.Service, error) { + // Capture any optional BlockHeaderListener implementation before + // wrapping the app with the ABCI proxy, which only forwards ABCI + // methods. + var blockHeaderListener tmtypes.BlockHeaderListener + if l, ok := app.(tmtypes.BlockHeaderListener); ok { + blockHeaderListener = l + } proxyApp := proxy.New(app, nodeMetrics.proxy) nodeKey, err := tmtypes.LoadOrGenNodeKey(conf.NodeKeyFile()) if err != nil { @@ -65,6 +72,7 @@ func New( tracerProviderOptions, nodeMetrics, consensusPolicy, + blockHeaderListener, ) case config.ModeSeed: return makeSeedNode( diff --git a/sei-tendermint/node/seed.go b/sei-tendermint/node/seed.go index 1e19d9daa3..e70d960188 100644 --- a/sei-tendermint/node/seed.go +++ b/sei-tendermint/node/seed.go @@ -87,6 +87,7 @@ func makeSeedNode( utils.None[*mempool.TxMempool](), genDoc, dbProvider, + nil, ) closers = append(closers, peerCloser) if err != nil { diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index 142b9d2961..fcffa1c398 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -272,6 +272,7 @@ func createRouter( txMempool utils.Option[*mempool.TxMempool], genDoc *types.GenesisDoc, dbProvider config.DBProvider, + blockHeaderListener types.BlockHeaderListener, ) (*p2p.Router, closer, error) { closer := func() error { return nil } ep, err := p2p.ResolveEndpoint(nodeKey.ID().AddressString(cfg.P2P.ListenAddress)) @@ -369,6 +370,7 @@ func createRouter( if err != nil { return nil, closer, fmt.Errorf("buildGigaConfig: %w", err) } + gigaCfg.BlockHeaderListener = blockHeaderListener logger.Info("Autobahn config loaded", "validators", len(gigaCfg.ValidatorAddrs)) options.Giga = utils.Some(gigaCfg) } diff --git a/sei-tendermint/types/block_listener.go b/sei-tendermint/types/block_listener.go new file mode 100644 index 0000000000..bab837c63d --- /dev/null +++ b/sei-tendermint/types/block_listener.go @@ -0,0 +1,17 @@ +package types + +import ( + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" +) + +// BlockHeaderListener is invoked once a block has been finalized and +// committed by the application. It is intended as a lightweight in-process +// alternative to subscribing on the Tendermint event bus for consumers that +// only need to be notified that a new head is available. +// +// Implementations must not block: the call site sits on the block-execution +// hot path and a slow listener will stall block production. +type BlockHeaderListener interface { + OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) +} From 8bb4d41a91b3bb8b295efe6c04fdba40d56ab696 Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 11 May 2026 13:29:50 -0700 Subject: [PATCH 2/8] Drop typo'd withdrawlsRoot key from encodeCommittedBlock encodeCommittedBlock was mirroring the legacy encodeTmHeader, which emits both the typo'd "withdrawlsRoot" and the correctly-spelled "withdrawalsRoot". Since the Autobahn encoder is new code there is no back-compat reason to carry the typo; legacy encodeTmHeader is left untouched. Reported by Cursor Bugbot on #3419. Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/subscribe.go | 1 - 1 file changed, 1 deletion(-) diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 4c0adf7e59..92bc638d7e 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -347,7 +347,6 @@ func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int) map[string]int "excessBlobGas": hexutil.Uint64(0), // inapplicable to Sei "parentBeaconBlockRoot": common.Hash{}, // inapplicable to Sei "hash": blockHash, - "withdrawlsRoot": common.Hash{}, // inapplicable to Sei "baseFeePerGas": (*hexutil.Big)(baseFee), "withdrawalsRoot": common.Hash{}, // inapplicable to Sei "blobGasUsed": hexutil.Uint64(0), // inapplicable to Sei From b060d93e76f7dfbc5b19bdb9d328faa42491ece9 Mon Sep 17 00:00:00 2001 From: Wen Date: Mon, 11 May 2026 13:35:16 -0700 Subject: [PATCH 3/8] Doc + defense polish from review - Tighten hash field doc in blockHeaderEvent: receipt store records zero blockHash on disk and evmrpc overlays the autobahn hash at read time. The prior wording implied the receipt store stored it directly. - Clarify single-producer assumption in OnBlockCommitted and document multi-producer behaviour (still safe, latest-wins is acceptable). - Move SubscriptionManager construction inside the legacy event-bus branch in NewSubscriptionAPI so the field is nil under Autobahn instead of dead state. - Document the non-nil header/response contract on BlockHeaderListener, and add a defensive guard in runNewHeadsFromNotifier so a single malformed event does not kill the fan-out goroutine for all subscribers. - Annotate the listener call site in giga_router.executeBlock so the fire-after-Commit-before-mempool-update ordering is explicit. Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/notifier.go | 32 ++++++++++++++-------- evmrpc/subscribe.go | 12 +++++++- sei-tendermint/internal/p2p/giga_router.go | 5 ++++ sei-tendermint/types/block_listener.go | 3 ++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/evmrpc/notifier.go b/evmrpc/notifier.go index 289693aa4e..330242ef5f 100644 --- a/evmrpc/notifier.go +++ b/evmrpc/notifier.go @@ -9,12 +9,13 @@ import ( // for each committed block. It mirrors the data evmrpc needs to build an // Ethereum block header, without going through the Tendermint event bus. // -// hash is the canonical block hash that EVM contracts and the receipt -// store see. Under Autobahn that is the autobahn lane-block header hash -// passed as Hash to app.FinalizeBlock — NOT a hash computed over the -// (partially-populated) Tendermint Header we synthesize for this event. -// Surfacing the receipt-store hash here keeps eth_newHeads consistent -// with what eth_getTransactionReceipt and eth_getBlockBy* return. +// hash is the autobahn lane-block header hash passed as Hash to +// app.FinalizeBlock — NOT a hash computed over the (partially-populated) +// Tendermint Header we synthesize for this event. This is the same value +// the eth_getBlockBy* and receipt API surfaces report as blockHash (the +// receipt store on disk records a zero blockHash; evmrpc overlays this +// hash at read time, see evmrpc/tx.go). Surfacing the same hash here +// keeps eth_newHeads consistent with the rest of the EVM RPC surface. type blockHeaderEvent struct { hash []byte header *tmproto.Header @@ -33,9 +34,14 @@ type blockHeaderEvent struct { // favour of the newest. For eth_newHeads, the latest head is always more // useful than a stale one. // -// Concurrency: assumes a single producer (the block-execution loop) and a -// single consumer. With those invariants, after the drain step there is -// guaranteed space for the new event. +// Concurrency: designed for a single producer (the block-execution loop) +// and a single consumer. Under that invariant the drain+send sequence in +// OnBlockCommitted always lands the new event. With multiple concurrent +// producers the same drain/send sequence still terminates without +// blocking, but the "latest" survivor among any racing publishes is +// nondeterministic — which is still acceptable for newHeads (we promise +// only that some recent head wins, not strict ordering across concurrent +// publishers). type BlockHeaderNotifier struct { ch chan blockHeaderEvent } @@ -56,9 +62,11 @@ func (n *BlockHeaderNotifier) OnBlockCommitted(hash []byte, header *tmproto.Head default: } // Buffer full: drain one stale event to make room for the new one. - // With a single producer, draining one slot is sufficient; the second - // send must succeed under that invariant. The default branch is - // defensive and unreachable in practice. + // With a single producer, draining one slot is sufficient and the + // second send always succeeds. With multiple producers a racing + // publisher could refill the slot between the drain and the send, + // in which case the default branch drops the new event — that is + // still consistent with overwrite-on-full (some recent head wins). select { case <-n.ch: default: diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 92bc638d7e..4a29b6ab03 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -44,12 +44,14 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider logFetcher.filterConfig = filterConfig api := &SubscriptionAPI{ tmClient: tmClient, - subscriptionManager: NewSubscriptionManager(tmClient), subscriptonConfig: subscriptionConfig, logFetcher: logFetcher, newHeadListenersMtx: &sync.RWMutex{}, newHeadListeners: make(map[rpc.ID]chan map[string]interface{}), connectionType: connectionType, + // subscriptionManager is only constructed for the legacy + // event-bus path below; under Autobahn the notifier feeds the + // fan-out directly and the manager is unused. } if blockHeaderNotifier != nil { // Autobahn (and any future direct-channel) path. The producer @@ -58,6 +60,7 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider go api.runNewHeadsFromNotifier(blockHeaderNotifier, k, ctxProvider) } else { // Legacy CometBFT path: subscribe to the Tendermint event bus. + api.subscriptionManager = NewSubscriptionManager(tmClient) id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity) if err != nil { panic(err) @@ -87,6 +90,13 @@ func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, k *keeper.Keeper, ctxProvider func(int64) sdk.Context) { defer recoverAndLog() for evt := range notifier.recv() { + // Defend against a misbehaving producer. BlockHeaderListener's + // contract requires non-nil header/response, but a single bad + // event must not kill the fan-out goroutine for all subscribers. + if evt.header == nil || evt.response == nil { + fmt.Printf("dropping malformed newHeads event: header=%v response=%v\n", evt.header, evt.response) + continue + } ctx := ctxProvider(evt.header.Height) baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() ethHeader := encodeCommittedBlock(evt, baseFeePerGas) diff --git a/sei-tendermint/internal/p2p/giga_router.go b/sei-tendermint/internal/p2p/giga_router.go index 7c10d951a8..3922af642c 100644 --- a/sei-tendermint/internal/p2p/giga_router.go +++ b/sei-tendermint/internal/p2p/giga_router.go @@ -292,6 +292,11 @@ func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (* if err != nil { return nil, fmt.Errorf("r.cfg.App.Commit(): %w", err) } + // Block-header listener fires here — after a successful Commit (so + // subscribers only see committed state) and before the mempool + // reconciliation below (which is bookkeeping, not part of the block + // itself). On a Commit error we return early and the listener does + // not fire — there is no committed block to announce. if r.cfg.BlockHeaderListener != nil { header := (&types.Header{ ChainID: r.cfg.GenDoc.ChainID, diff --git a/sei-tendermint/types/block_listener.go b/sei-tendermint/types/block_listener.go index bab837c63d..c891a2ed97 100644 --- a/sei-tendermint/types/block_listener.go +++ b/sei-tendermint/types/block_listener.go @@ -12,6 +12,9 @@ import ( // // Implementations must not block: the call site sits on the block-execution // hot path and a slow listener will stall block production. +// +// Callers must pass non-nil header and response. Implementations are not +// required to nil-check; passing nil is a programming error. type BlockHeaderListener interface { OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) } From 3b437df416c54ffb2f8b9a4bde86afb1c92d6e0f Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 12 May 2026 06:53:57 -0700 Subject: [PATCH 4/8] Source newHeads gasLimit from SDK ConsensusParams, not response updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ResponseFinalizeBlock.ConsensusParamUpdates is only populated when the app proposes a consensus-param update — nil on the vast majority of blocks. Reading it for newHeads gasLimit means the reported gasLimit would be 0 for nearly every notification under Autobahn. Match the pattern already used in evmrpc/block.go's GetBlockByNumber: pull gasLimit from ctx.ConsensusParams() in the consumer goroutine and pass it as a parameter into encodeCommittedBlock. Reported by Cursor Bugbot on #3419 (high severity). Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/notifier_internal_test.go | 13 ++++--------- evmrpc/subscribe.go | 26 +++++++++++++++++--------- evmrpc/subscribe_test.go | 12 ++++++++---- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/evmrpc/notifier_internal_test.go b/evmrpc/notifier_internal_test.go index d09394323d..5c3a66168f 100644 --- a/evmrpc/notifier_internal_test.go +++ b/evmrpc/notifier_internal_test.go @@ -83,13 +83,10 @@ func TestEncodeCommittedBlock(t *testing.T) { {GasUsed: 21000}, {GasUsed: 100000}, }, - ConsensusParamUpdates: &tmproto.ConsensusParams{ - Block: &tmproto.BlockParams{MaxGas: 10_000_000}, - }, }, } - out := encodeCommittedBlock(evt, big.NewInt(42)) + out := encodeCommittedBlock(evt, big.NewInt(42), 10_000_000) require.Equal(t, common.BytesToHash(hash), out["hash"]) require.Equal(t, (*hexutil.Big)(big.NewInt(12345)), out["number"]) @@ -105,14 +102,12 @@ func TestEncodeCommittedBlock(t *testing.T) { require.Equal(t, common.Hash{}, out["transactionsRoot"]) } -func TestEncodeCommittedBlock_NilConsensusParamUpdates(t *testing.T) { +func TestEncodeCommittedBlock_ZeroGasLimit(t *testing.T) { evt := blockHeaderEvent{ hash: []byte{0xab}, header: &tmproto.Header{Height: 1, Time: time.Unix(0, 0)}, - response: &abci.ResponseFinalizeBlock{ - // ConsensusParamUpdates intentionally nil; must not panic. - }, + response: &abci.ResponseFinalizeBlock{}, } - out := encodeCommittedBlock(evt, big.NewInt(0)) + out := encodeCommittedBlock(evt, big.NewInt(0), 0) require.Equal(t, hexutil.Uint64(0), out["gasLimit"]) } diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 4a29b6ab03..f55c4a052d 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -99,7 +99,15 @@ func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, } ctx := ctxProvider(evt.header.Height) baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() - ethHeader := encodeCommittedBlock(evt, baseFeePerGas) + // Source gasLimit from the active SDK ConsensusParams rather than + // evt.response.ConsensusParamUpdates: the latter is only populated + // on actual updates (nil for nearly every block). See block.go's + // GetBlockByNumber for the same pattern + rationale. + var gasLimit int64 + if cp := ctx.ConsensusParams(); cp != nil && cp.Block != nil { + gasLimit = cp.Block.MaxGas + } + ethHeader := encodeCommittedBlock(evt, baseFeePerGas, gasLimit) a.broadcastNewHead(ethHeader) } } @@ -325,7 +333,11 @@ func (s *SubscriptionManager) Unsubscribe(ctx context.Context, id SubscriberID) // nothing meaningful to surface for those fields. Subscribers that // chain-validate the head stream will need a different mechanism // under Autobahn. -func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int) map[string]interface{} { +// +// gasLimit is read by the caller from the active SDK ConsensusParams +// (see runNewHeadsFromNotifier); ConsensusParamUpdates on the response +// would be nil for the vast majority of blocks. +func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int, gasLimit int64) map[string]interface{} { blockHash := common.BytesToHash(evt.hash) number := big.NewInt(evt.header.Height) miner := common.BytesToAddress(evt.header.ProposerAddress) @@ -334,16 +346,12 @@ func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int) map[string]int for _, txRes := range evt.response.TxResults { gasWanted += txRes.GasUsed } - var gasLimit uint64 - if cp := evt.response.ConsensusParamUpdates; cp != nil && cp.Block != nil { - gasLimit = uint64(cp.Block.MaxGas) //nolint:gosec - } return map[string]interface{}{ "difficulty": (*hexutil.Big)(utils.Big0), // inapplicable to Sei "extraData": hexutil.Bytes{}, // inapplicable to Sei - "gasLimit": hexutil.Uint64(gasLimit), - "gasUsed": hexutil.Uint64(gasWanted), //nolint:gosec - "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei + "gasLimit": hexutil.Uint64(gasLimit), //nolint:gosec + "gasUsed": hexutil.Uint64(gasWanted), //nolint:gosec + "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei "miner": miner, "nonce": ethtypes.BlockNonce{}, // inapplicable to Sei "number": (*hexutil.Big)(number), diff --git a/evmrpc/subscribe_test.go b/evmrpc/subscribe_test.go index fb22c9cf7a..d3a4d3eedf 100644 --- a/evmrpc/subscribe_test.go +++ b/evmrpc/subscribe_test.go @@ -130,9 +130,9 @@ func TestSubscribeNewHeadsAutobahn(t *testing.T) { {GasUsed: 21000}, {GasUsed: 50000}, }, - ConsensusParamUpdates: &tmproto.ConsensusParams{ - Block: &tmproto.BlockParams{MaxGas: 5_000_000}, - }, + // gasLimit is sourced from the SDK ConsensusParams in + // the consumer goroutine, not from the response, so + // ConsensusParamUpdates is intentionally omitted here. }) continue } @@ -150,7 +150,11 @@ func TestSubscribeNewHeadsAutobahn(t *testing.T) { require.Equal(t, common.BytesToHash(appHash).Hex(), resultMap["stateRoot"]) require.Equal(t, fmt.Sprintf("0x%x", ts.Unix()), resultMap["timestamp"]) require.Equal(t, fmt.Sprintf("0x%x", 21000+50000), resultMap["gasUsed"]) - require.Equal(t, fmt.Sprintf("0x%x", 5_000_000), resultMap["gasLimit"]) + // gasLimit comes from the SDK ConsensusParams that the test + // runtime sets; just assert it's a non-zero hex string. + gasLimitStr, _ := resultMap["gasLimit"].(string) + require.NotEmpty(t, gasLimitStr) + require.NotEqual(t, "0x0", gasLimitStr) // Fields the Autobahn path does not surface must serialize to // the zero hash. zeroHash := (common.Hash{}).Hex() From 0113ac5292277431009b2ec2b356f9d2e2eab445 Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 12 May 2026 09:00:08 -0700 Subject: [PATCH 5/8] Fix newHeads baseFeePerGas off-by-one under Autobahn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GetNextBaseFeePerGas(ctx_at_N) returns the base fee for block N+1, not block N. The previous code passed the current block's ctx, so the newHeads notification for block N reported what should have been attached to N+1. Match the pattern from block.go's GetBlockByNumber: derive baseFee from the parent block's ctx, with a genesis (height==1) fallback to DefaultMinFeePerGas. The legacy CometBFT path has the same bug but is left alone — the more important consistency to restore is between eth_subscribe(newHeads) and eth_getBlockByNumber on the same Autobahn cluster. Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/subscribe.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index f55c4a052d..596e8a5b95 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -19,6 +19,7 @@ import ( tmtypes "github.com/sei-protocol/sei-chain/sei-tendermint/types" "github.com/sei-protocol/sei-chain/utils" "github.com/sei-protocol/sei-chain/x/evm/keeper" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" ) const SleepInterval = 5 * time.Second @@ -98,7 +99,17 @@ func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, continue } ctx := ctxProvider(evt.header.Height) - baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() + // baseFeePerGas applies TO block N, but the keeper helper + // returns the *next* base fee given a context — so derive it + // from the parent block's ctx. Genesis (height 1) has no + // parent; fall back to the configured default min fee. Mirrors + // block.go's GetBlockByNumber. + var baseFeePerGas *big.Int + if evt.header.Height > 1 { + baseFeePerGas = k.GetNextBaseFeePerGas(ctxProvider(evt.header.Height - 1)).TruncateInt().BigInt() + } else { + baseFeePerGas = evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt() + } // Source gasLimit from the active SDK ConsensusParams rather than // evt.response.ConsensusParamUpdates: the latter is only populated // on actual updates (nil for nearly every block). See block.go's From c8c0d46c9f96afe33bee407567cf40631cfd2881 Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 12 May 2026 10:01:13 -0700 Subject: [PATCH 6/8] Rename gasWanted to totalGasUsed in encodeCommittedBlock ExecTxResult has both a GasWanted and a GasUsed field with different semantics. The variable was accumulating GasUsed but named "gasWanted", inviting a future field-swap regression. Rename for clarity. Legacy encodeTmHeader carries the same anti-pattern but is left alone. Reported by Cursor Bugbot on #3419 (low severity). Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/subscribe.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 596e8a5b95..7180e7cd20 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -353,16 +353,16 @@ func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int, gasLimit int64 number := big.NewInt(evt.header.Height) miner := common.BytesToAddress(evt.header.ProposerAddress) appHash := common.BytesToHash(evt.header.AppHash) - var gasWanted int64 + var totalGasUsed int64 for _, txRes := range evt.response.TxResults { - gasWanted += txRes.GasUsed + totalGasUsed += txRes.GasUsed } return map[string]interface{}{ - "difficulty": (*hexutil.Big)(utils.Big0), // inapplicable to Sei - "extraData": hexutil.Bytes{}, // inapplicable to Sei - "gasLimit": hexutil.Uint64(gasLimit), //nolint:gosec - "gasUsed": hexutil.Uint64(gasWanted), //nolint:gosec - "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei + "difficulty": (*hexutil.Big)(utils.Big0), // inapplicable to Sei + "extraData": hexutil.Bytes{}, // inapplicable to Sei + "gasLimit": hexutil.Uint64(gasLimit), //nolint:gosec + "gasUsed": hexutil.Uint64(totalGasUsed), //nolint:gosec + "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei "miner": miner, "nonce": ethtypes.BlockNonce{}, // inapplicable to Sei "number": (*hexutil.Big)(number), From aef647d86b989556553a6dc5cbaf2916d682b854 Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 12 May 2026 10:37:45 -0700 Subject: [PATCH 7/8] ws_test: reuse outer err instead of shadowing The five `if err := ...` blocks in TestEthSubscribeNewHeads each shadowed the outer err from the initial Dial call. Reuse with `err =` for clarity (the outer err is already in scope and gets overwritten on each call anyway). Reported by @amir-deris on #3419. Co-Authored-By: Claude Opus 4.7 (1M context) --- integration_test/evm_module/ws_test/ws_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_test/evm_module/ws_test/ws_test.go b/integration_test/evm_module/ws_test/ws_test.go index a83a8a1b37..13e9896508 100644 --- a/integration_test/evm_module/ws_test/ws_test.go +++ b/integration_test/evm_module/ws_test/ws_test.go @@ -39,7 +39,7 @@ func TestEthSubscribeNewHeads(t *testing.T) { } defer conn.Close() - if err := conn.WriteJSON(map[string]interface{}{ + if err = conn.WriteJSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", @@ -50,14 +50,14 @@ func TestEthSubscribeNewHeads(t *testing.T) { // First message must be the subscription confirmation. Bound the wait // so a broken handshake fails fast. - if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { + if err = conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { t.Fatalf("set deadline: %v", err) } var ack struct { Result string `json:"result"` Error map[string]interface{} `json:"error"` } - if err := conn.ReadJSON(&ack); err != nil { + if err = conn.ReadJSON(&ack); err != nil { t.Fatalf("read subscribe ack: %v", err) } if ack.Error != nil { @@ -70,7 +70,7 @@ func TestEthSubscribeNewHeads(t *testing.T) { // Wait for at least one head notification. At Sei's block cadence // this should arrive within a few seconds; allow generous slack. - if err := conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil { + if err = conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil { t.Fatalf("set deadline: %v", err) } var note struct { @@ -80,7 +80,7 @@ func TestEthSubscribeNewHeads(t *testing.T) { Result map[string]interface{} `json:"result"` } `json:"params"` } - if err := conn.ReadJSON(¬e); err != nil { + if err = conn.ReadJSON(¬e); err != nil { t.Fatalf("read head notification: %v", err) } if note.Method != "eth_subscription" { From 1cd560010380d508707a765df2a9a2e32bac86ee Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 12 May 2026 12:51:21 -0700 Subject: [PATCH 8/8] Extract pickHeadBaseFee + cover off-by-one with unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The base-fee selection in runNewHeadsFromNotifier was the actual location of the off-by-one fix (parent ctx vs current ctx) but had no direct test coverage — encodeCommittedBlock takes baseFee as an arg, so testing it only proved the encoder forwards the value it's given, not that the caller picks the right ctx. Extract pickHeadBaseFee as a free function that takes getNextBaseFee as a function pointer (rather than reaching into *keeper.Keeper), then spy on the ctxProvider in tests to verify: - TestPickHeadBaseFee_UsesParentCtx: ctxProvider called with height-1 (NOT height), and result is forwarded. - TestPickHeadBaseFee_GenesisFallback: at height 1 the keeper call is skipped entirely and DefaultMinFeePerGas is returned. Co-Authored-By: Claude Opus 4.7 (1M context) --- evmrpc/notifier_internal_test.go | 43 ++++++++++++++++++++++++++++++++ evmrpc/subscribe.go | 27 ++++++++++++-------- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/evmrpc/notifier_internal_test.go b/evmrpc/notifier_internal_test.go index 5c3a66168f..86fe589f74 100644 --- a/evmrpc/notifier_internal_test.go +++ b/evmrpc/notifier_internal_test.go @@ -7,8 +7,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" "github.com/stretchr/testify/require" ) @@ -111,3 +113,44 @@ func TestEncodeCommittedBlock_ZeroGasLimit(t *testing.T) { out := encodeCommittedBlock(evt, big.NewInt(0), 0) require.Equal(t, hexutil.Uint64(0), out["gasLimit"]) } + +// TestPickHeadBaseFee_UsesParentCtx pins down the off-by-one fix: +// GetNextBaseFeePerGas(ctx_at_N) returns the fee for block N+1, so the +// base fee for the newHeads notification of block N must come from +// ctxProvider(N-1) — NOT ctxProvider(N). We spy on the ctxProvider to +// assert which height was queried. +func TestPickHeadBaseFee_UsesParentCtx(t *testing.T) { + var captured []int64 + ctxProvider := func(h int64) sdk.Context { + captured = append(captured, h) + return sdk.Context{} + } + getNextBaseFee := func(sdk.Context) sdk.Dec { + return sdk.NewDec(42) + } + + got := pickHeadBaseFee(getNextBaseFee, ctxProvider, 5) + + require.Equal(t, big.NewInt(42), got, "should forward getNextBaseFee result") + require.Equal(t, []int64{4}, captured, "ctxProvider must be called with parent height (height-1)") +} + +// TestPickHeadBaseFee_GenesisFallback verifies that at height 1 we skip +// the keeper call entirely (there is no parent block) and return the +// configured default min fee. +func TestPickHeadBaseFee_GenesisFallback(t *testing.T) { + var captured []int64 + ctxProvider := func(h int64) sdk.Context { + captured = append(captured, h) + return sdk.Context{} + } + getNextBaseFee := func(sdk.Context) sdk.Dec { + t.Fatal("getNextBaseFee must not be called at genesis") + return sdk.ZeroDec() + } + + got := pickHeadBaseFee(getNextBaseFee, ctxProvider, 1) + + require.Equal(t, evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt(), got) + require.Empty(t, captured, "ctxProvider must not be called at height 1") +} diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 7180e7cd20..20e5f2e837 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -99,17 +99,7 @@ func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, continue } ctx := ctxProvider(evt.header.Height) - // baseFeePerGas applies TO block N, but the keeper helper - // returns the *next* base fee given a context — so derive it - // from the parent block's ctx. Genesis (height 1) has no - // parent; fall back to the configured default min fee. Mirrors - // block.go's GetBlockByNumber. - var baseFeePerGas *big.Int - if evt.header.Height > 1 { - baseFeePerGas = k.GetNextBaseFeePerGas(ctxProvider(evt.header.Height - 1)).TruncateInt().BigInt() - } else { - baseFeePerGas = evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt() - } + baseFeePerGas := pickHeadBaseFee(k.GetNextBaseFeePerGas, ctxProvider, evt.header.Height) // Source gasLimit from the active SDK ConsensusParams rather than // evt.response.ConsensusParamUpdates: the latter is only populated // on actual updates (nil for nearly every block). See block.go's @@ -123,6 +113,21 @@ func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, } } +// pickHeadBaseFee returns the baseFeePerGas to attach to the eth_newHeads +// notification for the block at `height`. Mirrors block.go's +// GetBlockByNumber: GetNextBaseFeePerGas(ctx_at_N) is the fee for N+1, so +// we call it on the *parent* ctx (height-1). Genesis (height 1) has no +// parent; return the configured default min fee instead. +// +// `getNextBaseFee` is a function pointer rather than a *keeper.Keeper +// method so tests can inject a fake without needing a full keeper. +func pickHeadBaseFee(getNextBaseFee func(sdk.Context) sdk.Dec, ctxProvider func(int64) sdk.Context, height int64) *big.Int { + if height > 1 { + return getNextBaseFee(ctxProvider(height - 1)).TruncateInt().BigInt() + } + return evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt() +} + func (a *SubscriptionAPI) broadcastNewHead(ethHeader map[string]interface{}) { a.newHeadListenersMtx.Lock() defer a.newHeadListenersMtx.Unlock()