diff --git a/app/app.go b/app/app.go index a660f3f28d..c0ffd24ea7 100644 --- a/app/app.go +++ b/app/app.go @@ -307,6 +307,14 @@ var ( const ( MinGasEVMTx = 21000 + + // NewHeadsNotifierCapacity bounds the in-process eth_newHeads + // notifier buffer. Capacity 1 pairs with the notifier's + // overwrite-on-full semantics: if a consumer lags, the latest head + // always wins and stale heads are dropped. Anything larger only + // buffers staleness — newHeads subscribers care about the current + // head, not a backlog. + NewHeadsNotifierCapacity = 1 ) func init() { @@ -445,9 +453,14 @@ type App struct { HardForkManager *upgrades.HardForkManager - encodingConfig appparams.EncodingConfig - legacyEncodingConfig appparams.EncodingConfig - evmRPCConfig evmrpcconfig.Config + encodingConfig appparams.EncodingConfig + legacyEncodingConfig appparams.EncodingConfig + evmRPCConfig evmrpcconfig.Config + // blockHeaderNotifier is non-nil only when Autobahn is enabled. It is + // fed by GigaRouter after every committed block and consumed by + // evmrpc to drive eth_subscribe("newHeads") without going through the + // Tendermint event bus. + blockHeaderNotifier *evmrpc.BlockHeaderNotifier adminConfig admin.Config adminServer *grpc.Server lightInvarianceConfig LightInvarianceConfig @@ -714,6 +727,9 @@ func New( if err != nil { panic(fmt.Sprintf("error reading EVM config due to %s", err)) } + if tmConfig != nil && tmConfig.AutobahnConfigFile != "" { + app.blockHeaderNotifier = evmrpc.NewBlockHeaderNotifier(NewHeadsNotifierCapacity) + } if app.evmRPCConfig.TraceBakeEnabled { traceDB, dbErr := evmkeeper.NewTraceDB(homePath) if dbErr != nil { @@ -1324,6 +1340,18 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess return resp, nil } +// OnBlockCommitted implements tmtypes.BlockHeaderListener. 
The Autobahn +// block-execution path calls this once per committed block, and the call +// is forwarded to the in-process eth_newHeads notifier consumed by +// evmrpc's SubscriptionAPI. The method is a no-op when Autobahn is not +// enabled (blockHeaderNotifier is nil) and never blocks. +func (app *App) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) { + if app.blockHeaderNotifier == nil { + return + } + app.blockHeaderNotifier.OnBlockCommitted(hash, header, response) +} + func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { startTime := time.Now() defer func() { @@ -2532,7 +2560,7 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) { } if app.evmRPCConfig.WSEnabled { - evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore()) + evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore(), app.blockHeaderNotifier) if err != nil { panic(err) } diff --git a/evmrpc/notifier.go b/evmrpc/notifier.go new file mode 100644 index 0000000000..330242ef5f --- /dev/null +++ b/evmrpc/notifier.go @@ -0,0 +1,82 @@ +package evmrpc + +import ( + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" +) + +// blockHeaderEvent is the in-process payload delivered to SubscriptionAPI +// for each committed block. It mirrors the data evmrpc needs to build an +// Ethereum block header, without going through the Tendermint event bus. 
+// +// hash is the autobahn lane-block header hash passed as Hash to +// app.FinalizeBlock — NOT a hash computed over the (partially-populated) +// Tendermint Header we synthesize for this event. This is the same value +// the eth_getBlockBy* and receipt API surfaces report as blockHash (the +// receipt store on disk records a zero blockHash; evmrpc overlays this +// hash at read time, see evmrpc/tx.go). Surfacing the same hash here +// keeps eth_newHeads consistent with the rest of the EVM RPC surface. +type blockHeaderEvent struct { + hash []byte + header *tmproto.Header + response *abci.ResponseFinalizeBlock +} + +// BlockHeaderNotifier implements sei-tendermint/types.BlockHeaderListener +// and feeds eth_subscribe("newHeads") via a direct in-process channel. +// +// Producers (e.g. the Autobahn block-execution path) call OnBlockCommitted +// once per committed block. The single consumer is SubscriptionAPI's +// fan-out goroutine, which broadcasts to all per-client subscribers. +// +// OnBlockCommitted is non-blocking and uses overwrite-on-full semantics: +// if the consumer is lagging, the oldest buffered event is dropped in +// favour of the newest. For eth_newHeads, the latest head is always more +// useful than a stale one. +// +// Concurrency: designed for a single producer (the block-execution loop) +// and a single consumer. Under that invariant the drain+send sequence in +// OnBlockCommitted always lands the new event. With multiple concurrent +// producers the same drain/send sequence still terminates without +// blocking, but the "latest" survivor among any racing publishes is +// nondeterministic — which is still acceptable for newHeads (we promise +// only that some recent head wins, not strict ordering across concurrent +// publishers). 
+type BlockHeaderNotifier struct { + ch chan blockHeaderEvent +} + +func NewBlockHeaderNotifier(capacity int) *BlockHeaderNotifier { + return &BlockHeaderNotifier{ch: make(chan blockHeaderEvent, capacity)} +} + +// OnBlockCommitted implements types.BlockHeaderListener. +func (n *BlockHeaderNotifier) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) { + if n == nil { + return + } + evt := blockHeaderEvent{hash: hash, header: header, response: response} + select { + case n.ch <- evt: + return + default: + } + // Buffer full: drain one stale event to make room for the new one. + // With a single producer, draining one slot is sufficient and the + // second send always succeeds. With multiple producers a racing + // publisher could refill the slot between the drain and the send, + // in which case the default branch drops the new event — that is + // still consistent with overwrite-on-full (some recent head wins). + select { + case <-n.ch: + default: + } + select { + case n.ch <- evt: + default: + } +} + +func (n *BlockHeaderNotifier) recv() <-chan blockHeaderEvent { + return n.ch +} diff --git a/evmrpc/notifier_internal_test.go b/evmrpc/notifier_internal_test.go new file mode 100644 index 0000000000..86fe589f74 --- /dev/null +++ b/evmrpc/notifier_internal_test.go @@ -0,0 +1,156 @@ +package evmrpc + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" + "github.com/stretchr/testify/require" +) + +func TestBlockHeaderNotifier_DeliversEvent(t *testing.T) { + n := NewBlockHeaderNotifier(4) + + hash := []byte{0x01, 0x02, 0x03} + header := &tmproto.Header{Height: 42} + resp := 
&abci.ResponseFinalizeBlock{} + + n.OnBlockCommitted(hash, header, resp) + + select { + case evt := <-n.recv(): + require.Equal(t, hash, evt.hash) + require.Equal(t, header, evt.header) + require.Equal(t, resp, evt.response) + case <-time.After(time.Second): + t.Fatal("expected event on notifier channel") + } +} + +func TestBlockHeaderNotifier_OverwritesWhenFull(t *testing.T) { + n := NewBlockHeaderNotifier(1) + + // Fill the buffer with a stale event. + n.OnBlockCommitted([]byte{1}, &tmproto.Header{Height: 1}, &abci.ResponseFinalizeBlock{}) + + // A second call must not block and must replace the buffered event. + done := make(chan struct{}) + go func() { + n.OnBlockCommitted([]byte{2}, &tmproto.Header{Height: 2}, &abci.ResponseFinalizeBlock{}) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("OnBlockCommitted blocked when buffer was full") + } + + // The newest event must survive; the stale one must be gone. + evt := <-n.recv() + require.EqualValues(t, 2, evt.header.Height, "expected newest event to win on overwrite") + select { + case extra := <-n.recv(): + t.Fatalf("expected stale event to be dropped, got %+v", extra) + case <-time.After(50 * time.Millisecond): + } +} + +func TestBlockHeaderNotifier_NilReceiverIsNoOp(t *testing.T) { + var n *BlockHeaderNotifier + // Must not panic. 
+ n.OnBlockCommitted(nil, &tmproto.Header{}, &abci.ResponseFinalizeBlock{}) +} + +func TestEncodeCommittedBlock(t *testing.T) { + hash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111").Bytes() + proposer := common.HexToAddress("0x2222222222222222222222222222222222222222").Bytes() + appHash := common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333").Bytes() + ts := time.Unix(1_700_000_000, 0).UTC() + evt := blockHeaderEvent{ + hash: hash, + header: &tmproto.Header{ + Height: 12345, + Time: ts, + ProposerAddress: proposer, + AppHash: appHash, + }, + response: &abci.ResponseFinalizeBlock{ + TxResults: []*abci.ExecTxResult{ + {GasUsed: 21000}, + {GasUsed: 100000}, + }, + }, + } + + out := encodeCommittedBlock(evt, big.NewInt(42), 10_000_000) + + require.Equal(t, common.BytesToHash(hash), out["hash"]) + require.Equal(t, (*hexutil.Big)(big.NewInt(12345)), out["number"]) + require.Equal(t, common.BytesToAddress(proposer), out["miner"]) + require.Equal(t, common.BytesToHash(appHash), out["stateRoot"]) + require.Equal(t, hexutil.Uint64(ts.Unix()), out["timestamp"]) + require.Equal(t, hexutil.Uint64(121000), out["gasUsed"]) + require.Equal(t, hexutil.Uint64(10_000_000), out["gasLimit"]) + require.Equal(t, (*hexutil.Big)(big.NewInt(42)), out["baseFeePerGas"]) + // Fields not surfaced by the Autobahn path must be zero, but present. 
+ require.Equal(t, common.Hash{}, out["parentHash"]) + require.Equal(t, common.Hash{}, out["receiptsRoot"]) + require.Equal(t, common.Hash{}, out["transactionsRoot"]) +} + +func TestEncodeCommittedBlock_ZeroGasLimit(t *testing.T) { + evt := blockHeaderEvent{ + hash: []byte{0xab}, + header: &tmproto.Header{Height: 1, Time: time.Unix(0, 0)}, + response: &abci.ResponseFinalizeBlock{}, + } + out := encodeCommittedBlock(evt, big.NewInt(0), 0) + require.Equal(t, hexutil.Uint64(0), out["gasLimit"]) +} + +// TestPickHeadBaseFee_UsesParentCtx pins down the off-by-one fix: +// GetNextBaseFeePerGas(ctx_at_N) returns the fee for block N+1, so the +// base fee for the newHeads notification of block N must come from +// ctxProvider(N-1) — NOT ctxProvider(N). We spy on the ctxProvider to +// assert which height was queried. +func TestPickHeadBaseFee_UsesParentCtx(t *testing.T) { + var captured []int64 + ctxProvider := func(h int64) sdk.Context { + captured = append(captured, h) + return sdk.Context{} + } + getNextBaseFee := func(sdk.Context) sdk.Dec { + return sdk.NewDec(42) + } + + got := pickHeadBaseFee(getNextBaseFee, ctxProvider, 5) + + require.Equal(t, big.NewInt(42), got, "should forward getNextBaseFee result") + require.Equal(t, []int64{4}, captured, "ctxProvider must be called with parent height (height-1)") +} + +// TestPickHeadBaseFee_GenesisFallback verifies that at height 1 we skip +// the keeper call entirely (there is no parent block) and return the +// configured default min fee. 
+func TestPickHeadBaseFee_GenesisFallback(t *testing.T) { + var captured []int64 + ctxProvider := func(h int64) sdk.Context { + captured = append(captured, h) + return sdk.Context{} + } + getNextBaseFee := func(sdk.Context) sdk.Dec { + t.Fatal("getNextBaseFee must not be called at genesis") + return sdk.ZeroDec() + } + + got := pickHeadBaseFee(getNextBaseFee, ctxProvider, 1) + + require.Equal(t, evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt(), got) + require.Empty(t, captured, "ctxProvider must not be called at height 1") +} diff --git a/evmrpc/server.go b/evmrpc/server.go index a130bb8031..0d1c392764 100644 --- a/evmrpc/server.go +++ b/evmrpc/server.go @@ -254,6 +254,7 @@ func NewEVMWebSocketServer( txConfigProvider func(int64) client.TxConfig, homeDir string, stateStore types.StateStore, + blockHeaderNotifier *BlockHeaderNotifier, ) (EVMServer, error) { // Initialize global worker pool with configuration (metrics are embedded in pool) // This is idempotent - if HTTP server already initialized it, this is a no-op @@ -327,7 +328,7 @@ func NewEVMWebSocketServer( cacheCreationMutex: cacheCreationMutex, globalLogSlicePool: globalLogSlicePool, watermarks: watermarks, - }, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS), + }, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS, blockHeaderNotifier), }, { Namespace: "web3", diff --git a/evmrpc/setup_test.go b/evmrpc/setup_test.go index 4d03f2e4e0..b8e5a76f51 100644 --- a/evmrpc/setup_test.go +++ b/evmrpc/setup_test.go @@ -49,6 +49,7 @@ const TestWSPort = 7778 const TestBadPort = 7779 const TestStrictPort = 7780 const TestArchivePort = 7782 +const TestNotifierWSPort = 7784 const 
GenesisBlockHeight = 0 const MockHeight8 = 8 @@ -114,6 +115,11 @@ var MockBlockIDMultiTx = tmtypes.BlockID{ var NewHeadsCalled = make(chan struct{}, 1) +// NotifierForTest backs the Autobahn-style WS server started on +// TestNotifierWSPort. Tests publish to it via OnBlockCommitted to drive +// eth_subscribe("newHeads") through the in-process notifier path. +var NotifierForTest = evmrpc.NewBlockHeaderNotifier(16) + type MockClient struct { mock.Client latestOverride int64 @@ -667,7 +673,7 @@ func init() { } // Start ws server - wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil) + wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, nil) if err != nil { panic(err) } @@ -675,6 +681,19 @@ func init() { panic(err) } fmt.Printf("wsServer started with config = %+v\n", goodConfig) + + // Start a second WS server wired to NotifierForTest, exercising the + // Autobahn (notifier-fed) eth_subscribe("newHeads") path. 
+ notifierConfig := goodConfig + notifierConfig.HTTPPort = TestNotifierWSPort - 1 + notifierConfig.WSPort = TestNotifierWSPort + notifierWSServer, err := evmrpc.NewEVMWebSocketServer(notifierConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, NotifierForTest) + if err != nil { + panic(err) + } + if err := notifierWSServer.Start(); err != nil { + panic(err) + } time.Sleep(1 * time.Second) // Generate data @@ -1170,7 +1189,7 @@ func sendWSRequestAndListen(t *testing.T, port int, method string, params ...int headers := make(http.Header) headers.Set("Origin", "localhost") headers.Set("Content-Type", "application/json") - conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, TestWSPort), headers) + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, port), headers) require.Nil(t, err) recv := make(chan map[string]interface{}) diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 1d31d4283c..20e5f2e837 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -19,6 +19,7 @@ import ( tmtypes "github.com/sei-protocol/sei-chain/sei-tendermint/types" "github.com/sei-protocol/sei-chain/utils" "github.com/sei-protocol/sei-chain/x/evm/keeper" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" ) const SleepInterval = 5 * time.Second @@ -40,52 +41,107 @@ type SubscriptionConfig struct { newHeadLimit uint64 } -func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig, filterConfig *FilterConfig, connectionType ConnectionType) *SubscriptionAPI { +func NewSubscriptionAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig, filterConfig *FilterConfig, connectionType ConnectionType, blockHeaderNotifier 
*BlockHeaderNotifier) *SubscriptionAPI { logFetcher.filterConfig = filterConfig api := &SubscriptionAPI{ tmClient: tmClient, - subscriptionManager: NewSubscriptionManager(tmClient), subscriptonConfig: subscriptionConfig, logFetcher: logFetcher, newHeadListenersMtx: &sync.RWMutex{}, newHeadListeners: make(map[rpc.ID]chan map[string]interface{}), connectionType: connectionType, + // subscriptionManager is only constructed for the legacy + // event-bus path below; under Autobahn the notifier feeds the + // fan-out directly and the manager is unused. } - id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity) - if err != nil { - panic(err) - } - go func() { - defer recoverAndLog() - defer func() { - _ = api.subscriptionManager.Unsubscribe(context.Background(), id) - }() - for { - res := <-subCh - eventHeader := res.Data.(tmtypes.EventDataNewBlockHeader) - ctx := ctxProvider(eventHeader.Header.Height) - baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() - ethHeader, err := encodeTmHeader(eventHeader, baseFeePerGas) - if err != nil { - fmt.Printf("error encoding new head event %#v due to %s\n", res.Data, err) - continue - } - api.newHeadListenersMtx.Lock() - toDelete := []rpc.ID{} - for id, c := range api.newHeadListeners { - if !handleListener(c, ethHeader) { - toDelete = append(toDelete, id) + if blockHeaderNotifier != nil { + // Autobahn (and any future direct-channel) path. The producer + // pushes one event per committed block; there is no Tendermint + // event-bus subscription. + go api.runNewHeadsFromNotifier(blockHeaderNotifier, k, ctxProvider) + } else { + // Legacy CometBFT path: subscribe to the Tendermint event bus. 
+ api.subscriptionManager = NewSubscriptionManager(tmClient) + id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity) + if err != nil { + panic(err) + } + go func() { + defer recoverAndLog() + defer func() { + _ = api.subscriptionManager.Unsubscribe(context.Background(), id) + }() + for { + res := <-subCh + eventHeader := res.Data.(tmtypes.EventDataNewBlockHeader) + ctx := ctxProvider(eventHeader.Header.Height) + baseFeePerGas := k.GetNextBaseFeePerGas(ctx).TruncateInt().BigInt() + ethHeader, err := encodeTmHeader(eventHeader, baseFeePerGas) + if err != nil { + fmt.Printf("error encoding new head event %#v due to %s\n", res.Data, err) + continue } + api.broadcastNewHead(ethHeader) } - for _, id := range toDelete { - delete(api.newHeadListeners, id) - } - api.newHeadListenersMtx.Unlock() - } - }() + }() + } return api } +func (a *SubscriptionAPI) runNewHeadsFromNotifier(notifier *BlockHeaderNotifier, k *keeper.Keeper, ctxProvider func(int64) sdk.Context) { + defer recoverAndLog() + for evt := range notifier.recv() { + // Defend against a misbehaving producer. BlockHeaderListener's + // contract requires non-nil header/response, but a single bad + // event must not kill the fan-out goroutine for all subscribers. + if evt.header == nil || evt.response == nil { + fmt.Printf("dropping malformed newHeads event: header=%v response=%v\n", evt.header, evt.response) + continue + } + ctx := ctxProvider(evt.header.Height) + baseFeePerGas := pickHeadBaseFee(k.GetNextBaseFeePerGas, ctxProvider, evt.header.Height) + // Source gasLimit from the active SDK ConsensusParams rather than + // evt.response.ConsensusParamUpdates: the latter is only populated + // on actual updates (nil for nearly every block). See block.go's + // GetBlockByNumber for the same pattern + rationale. 
+ var gasLimit int64 + if cp := ctx.ConsensusParams(); cp != nil && cp.Block != nil { + gasLimit = cp.Block.MaxGas + } + ethHeader := encodeCommittedBlock(evt, baseFeePerGas, gasLimit) + a.broadcastNewHead(ethHeader) + } +} + +// pickHeadBaseFee returns the baseFeePerGas to attach to the eth_newHeads +// notification for the block at `height`. Mirrors block.go's +// GetBlockByNumber: GetNextBaseFeePerGas(ctx_at_N) is the fee for N+1, so +// we call it on the *parent* ctx (height-1). Genesis (height 1) has no +// parent; return the configured default min fee instead. +// +// `getNextBaseFee` is a function pointer rather than a *keeper.Keeper +// method so tests can inject a fake without needing a full keeper. +func pickHeadBaseFee(getNextBaseFee func(sdk.Context) sdk.Dec, ctxProvider func(int64) sdk.Context, height int64) *big.Int { + if height > 1 { + return getNextBaseFee(ctxProvider(height - 1)).TruncateInt().BigInt() + } + return evmtypes.DefaultMinFeePerGas.TruncateInt().BigInt() +} + +func (a *SubscriptionAPI) broadcastNewHead(ethHeader map[string]interface{}) { + a.newHeadListenersMtx.Lock() + defer a.newHeadListenersMtx.Unlock() + toDelete := []rpc.ID{} + for id, c := range a.newHeadListeners { + if !handleListener(c, ethHeader) { + toDelete = append(toDelete, id) + } + } + for _, id := range toDelete { + delete(a.newHeadListeners, id) + } +} + func handleListener(c chan map[string]interface{}, ethHeader map[string]interface{}) bool { // if the channel is already closed, sending to it/closing it will panic defer func() { _ = recover() }() @@ -281,6 +337,56 @@ func (s *SubscriptionManager) Unsubscribe(ctx context.Context, id SubscriberID) return nil } +// encodeCommittedBlock builds the eth_newHeads payload for an Autobahn- +// committed block. It differs from encodeTmHeader in two notable ways: +// +// 1. "hash" is the explicit Autobahn block-header hash from evt.hash +// (the same value the EVM receipt store records as blockHash). 
See +// blockHeaderEvent's doc for the rationale. +// 2. parentHash, receiptsRoot, and transactionsRoot are zero. The +// Autobahn block-execution path does not compute a Tendermint-style +// hash chain (LastBlockID / LastResultsHash / DataHash), so there is +// nothing meaningful to surface for those fields. Subscribers that +// chain-validate the head stream will need a different mechanism +// under Autobahn. +// +// gasLimit is read by the caller from the active SDK ConsensusParams +// (see runNewHeadsFromNotifier); ConsensusParamUpdates on the response +// would be nil for the vast majority of blocks. +func encodeCommittedBlock(evt blockHeaderEvent, baseFee *big.Int, gasLimit int64) map[string]interface{} { + blockHash := common.BytesToHash(evt.hash) + number := big.NewInt(evt.header.Height) + miner := common.BytesToAddress(evt.header.ProposerAddress) + appHash := common.BytesToHash(evt.header.AppHash) + var totalGasUsed int64 + for _, txRes := range evt.response.TxResults { + totalGasUsed += txRes.GasUsed + } + return map[string]interface{}{ + "difficulty": (*hexutil.Big)(utils.Big0), // inapplicable to Sei + "extraData": hexutil.Bytes{}, // inapplicable to Sei + "gasLimit": hexutil.Uint64(gasLimit), //nolint:gosec + "gasUsed": hexutil.Uint64(totalGasUsed), //nolint:gosec + "logsBloom": ethtypes.Bloom{}, // inapplicable to Sei + "miner": miner, + "nonce": ethtypes.BlockNonce{}, // inapplicable to Sei + "number": (*hexutil.Big)(number), + "parentHash": common.Hash{}, // see function doc + "receiptsRoot": common.Hash{}, // see function doc + "sha3Uncles": common.Hash{}, // inapplicable to Sei + "stateRoot": appHash, + "timestamp": hexutil.Uint64(evt.header.Time.Unix()), //nolint:gosec + "transactionsRoot": common.Hash{}, // see function doc + "mixHash": common.Hash{}, // inapplicable to Sei + "excessBlobGas": hexutil.Uint64(0), // inapplicable to Sei + "parentBeaconBlockRoot": common.Hash{}, // inapplicable to Sei + "hash": blockHash, + "baseFeePerGas": 
(*hexutil.Big)(baseFee), + "withdrawalsRoot": common.Hash{}, // inapplicable to Sei + "blobGasUsed": hexutil.Uint64(0), // inapplicable to Sei + } +} + func encodeTmHeader( header tmtypes.EventDataNewBlockHeader, baseFee *big.Int, diff --git a/evmrpc/subscribe_test.go b/evmrpc/subscribe_test.go index 82b4647351..d3a4d3eedf 100644 --- a/evmrpc/subscribe_test.go +++ b/evmrpc/subscribe_test.go @@ -9,6 +9,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/sei-protocol/sei-chain/evmrpc" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" "github.com/stretchr/testify/require" ) @@ -81,6 +83,91 @@ func TestSubscribeNewHeads(t *testing.T) { } } } + +// TestSubscribeNewHeadsAutobahn exercises the in-process notifier path +// (used under Autobahn) end-to-end: a WS client subscribes to newHeads +// against the notifier-backed server, the test pushes an event via +// NotifierForTest.OnBlockCommitted, and the WS subscriber must receive an +// eth_subscription notification carrying the encoded header. 
+func TestSubscribeNewHeadsAutobahn(t *testing.T) { + t.Parallel() + recvCh, done := sendWSRequestAndListen(t, TestNotifierWSPort, "subscribe", "newHeads") + defer func() { done <- struct{}{} }() + + hash := common.HexToHash("0x4242424242424242424242424242424242424242424242424242424242424242").Bytes() + appHash := common.HexToHash("0x3131313131313131313131313131313131313131313131313131313131313131").Bytes() + proposer := common.HexToAddress("0x9999999999999999999999999999999999999999").Bytes() + ts := time.Unix(1_700_000_500, 0).UTC() + + expectedKeys := []string{ + "parentHash", "sha3Uncles", "miner", "stateRoot", "transactionsRoot", + "receiptsRoot", "logsBloom", "difficulty", "number", "gasLimit", + "gasUsed", "timestamp", "extraData", "mixHash", "nonce", + "baseFeePerGas", "withdrawalsRoot", "blobGasUsed", "excessBlobGas", + "parentBeaconBlockRoot", "hash", + } + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + var subscriptionId string + for t.Context().Err() == nil { + select { + case resObj := <-recvCh: + if errVal, ok := resObj["error"]; ok { + t.Fatal("Received error:", errVal) + } + if subscriptionId == "" { + subscriptionId = resObj["result"].(string) + timer.Reset(5 * time.Second) + NotifierForTest.OnBlockCommitted(hash, &tmproto.Header{ + Height: MockHeight8, + Time: ts, + ProposerAddress: proposer, + AppHash: appHash, + }, &abci.ResponseFinalizeBlock{ + TxResults: []*abci.ExecTxResult{ + {GasUsed: 21000}, + {GasUsed: 50000}, + }, + // gasLimit is sourced from the SDK ConsensusParams in + // the consumer goroutine, not from the response, so + // ConsensusParamUpdates is intentionally omitted here. 
+ }) + continue + } + require.Equal(t, "eth_subscription", resObj["method"]) + paramMap := resObj["params"].(map[string]interface{}) + require.Equal(t, subscriptionId, paramMap["subscription"]) + resultMap := paramMap["result"].(map[string]interface{}) + for _, k := range expectedKeys { + _, ok := resultMap[k] + require.Truef(t, ok, "missing key %q in header", k) + } + require.Equal(t, common.BytesToHash(hash).Hex(), resultMap["hash"]) + require.Equal(t, fmt.Sprintf("0x%x", MockHeight8), resultMap["number"]) + require.Equal(t, common.BytesToAddress(proposer).Hex(), resultMap["miner"]) + require.Equal(t, common.BytesToHash(appHash).Hex(), resultMap["stateRoot"]) + require.Equal(t, fmt.Sprintf("0x%x", ts.Unix()), resultMap["timestamp"]) + require.Equal(t, fmt.Sprintf("0x%x", 21000+50000), resultMap["gasUsed"]) + // gasLimit comes from the SDK ConsensusParams that the test + // runtime sets; just assert it's a non-zero hex string. + gasLimitStr, _ := resultMap["gasLimit"].(string) + require.NotEmpty(t, gasLimitStr) + require.NotEqual(t, "0x0", gasLimitStr) + // Fields the Autobahn path does not surface must serialize to + // the zero hash. 
+ zeroHash := (common.Hash{}).Hex() + require.Equal(t, zeroHash, resultMap["parentHash"]) + require.Equal(t, zeroHash, resultMap["receiptsRoot"]) + require.Equal(t, zeroHash, resultMap["transactionsRoot"]) + return + case <-timer.C: + t.Fatal("No event received within 5 seconds") + } + } +} + func TestSubscribeEmptyLogs(t *testing.T) { t.Parallel() recvCh, done := sendWSRequestGood(t, "subscribe", "logs") diff --git a/integration_test/evm_module/scripts/evm_rpc_tests.sh b/integration_test/evm_module/scripts/evm_rpc_tests.sh index c0f6f29b3a..26505191e1 100755 --- a/integration_test/evm_module/scripts/evm_rpc_tests.sh +++ b/integration_test/evm_module/scripts/evm_rpc_tests.sh @@ -65,3 +65,10 @@ fi export SEI_EVM_IO_RUN_INTEGRATION=1 go test ./integration_test/evm_module/rpc_io_test/ -v -count=1 + +# WebSocket integration tests (eth_subscribe et al.). Lives in a sibling +# package because the .io/.iox framework cannot represent streaming +# methods. The test itself is consensus-mode agnostic, so the same +# invocation works under standard CometBFT and Autobahn clusters alike. +export SEI_EVM_WS_RUN_INTEGRATION=1 +go test ./integration_test/evm_module/ws_test/ -v -count=1 diff --git a/integration_test/evm_module/ws_test/ws_test.go b/integration_test/evm_module/ws_test/ws_test.go new file mode 100644 index 0000000000..13e9896508 --- /dev/null +++ b/integration_test/evm_module/ws_test/ws_test.go @@ -0,0 +1,109 @@ +// Package ws_test exercises WebSocket JSON-RPC subscriptions against a +// live Sei EVM RPC. The test is consensus-mode agnostic: it dials the +// EVM WS port and asserts that eth_subscribe("newHeads") delivers a +// head notification. It runs under both standard CometBFT clusters and +// Autobahn clusters — the producer hook differs between them (legacy +// event bus vs in-process notifier), but the externally observable +// behaviour must be identical. +// +// Env: +// - SEI_EVM_WS_RUN_INTEGRATION=1 to run (set by integration scripts/CI). 
+// Otherwise the test skips so `go test ./...` stays cheap. +// - SEI_EVM_WS_URL overrides the default ws://127.0.0.1:8546. +package ws_test + +import ( + "os" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func wsURL() string { + if u := os.Getenv("SEI_EVM_WS_URL"); u != "" { + return u + } + return "ws://127.0.0.1:8546" +} + +func TestEthSubscribeNewHeads(t *testing.T) { + if os.Getenv("SEI_EVM_WS_RUN_INTEGRATION") != "1" { + t.Skip("EVM WS integration tests skipped (set SEI_EVM_WS_RUN_INTEGRATION=1 to run)") + } + + url := wsURL() + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + t.Fatalf("dial %s: %v", url, err) + } + defer conn.Close() + + if err = conn.WriteJSON(map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribe", + "params": []string{"newHeads"}, + }); err != nil { + t.Fatalf("write subscribe: %v", err) + } + + // First message must be the subscription confirmation. Bound the wait + // so a broken handshake fails fast. + if err = conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { + t.Fatalf("set deadline: %v", err) + } + var ack struct { + Result string `json:"result"` + Error map[string]interface{} `json:"error"` + } + if err = conn.ReadJSON(&ack); err != nil { + t.Fatalf("read subscribe ack: %v", err) + } + if ack.Error != nil { + t.Fatalf("subscribe error: %v", ack.Error) + } + if ack.Result == "" { + t.Fatalf("subscribe returned empty subscription id") + } + t.Logf("subscription id: %s", ack.Result) + + // Wait for at least one head notification. At Sei's block cadence + // this should arrive within a few seconds; allow generous slack. 
+	if err = conn.SetReadDeadline(time.Now().Add(15 * time.Second)); err != nil {
+		t.Fatalf("set deadline: %v", err)
+	}
+	var note struct {
+		Method string `json:"method"`
+		Params struct {
+			Subscription string                 `json:"subscription"`
+			Result       map[string]interface{} `json:"result"`
+		} `json:"params"`
+	}
+	if err = conn.ReadJSON(&note); err != nil {
+		t.Fatalf("read head notification: %v", err)
+	}
+	if note.Method != "eth_subscription" {
+		t.Fatalf("expected eth_subscription, got %q", note.Method)
+	}
+	if note.Params.Subscription != ack.Result {
+		t.Fatalf("subscription id mismatch: got %q want %q",
+			note.Params.Subscription, ack.Result)
+	}
+	header := note.Params.Result
+	for _, key := range []string{"hash", "number", "timestamp", "stateRoot", "miner"} {
+		v, ok := header[key]
+		if !ok {
+			t.Fatalf("head notification missing key %q (got %+v)", key, header)
+		}
+		// All-zero values for hash, number, or timestamp would indicate
+		// the producer hook didn't fire and we're seeing a default-
+		// constructed header.
+		if s, _ := v.(string); s == "" || s == "0x" || s == "0x0" {
+			if key == "hash" || key == "number" || key == "timestamp" {
+				t.Fatalf("head notification %q has zero-ish value %q", key, s)
+			}
+		}
+	}
+	t.Logf("received head: number=%v hash=%v", header["number"], header["hash"])
+}
diff --git a/sei-tendermint/internal/p2p/giga_router.go b/sei-tendermint/internal/p2p/giga_router.go
index 5434979ab0..3922af642c 100644
--- a/sei-tendermint/internal/p2p/giga_router.go
+++ b/sei-tendermint/internal/p2p/giga_router.go
@@ -42,6 +42,11 @@ type GigaRouterConfig struct {
 	Producer *producer.Config
 	TxMempool *mempool.TxMempool
 	GenDoc *types.GenesisDoc
+
+	// BlockHeaderListener, if non-nil, is invoked after each block is
+	// committed. Used to feed evmrpc's eth_subscribe("newHeads") without
+	// going through the legacy Tendermint event bus.
+ BlockHeaderListener types.BlockHeaderListener } type GigaRouter struct { @@ -287,6 +292,21 @@ func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (* if err != nil { return nil, fmt.Errorf("r.cfg.App.Commit(): %w", err) } + // Block-header listener fires here — after a successful Commit (so + // subscribers only see committed state) and before the mempool + // reconciliation below (which is bookkeeping, not part of the block + // itself). On a Commit error we return early and the listener does + // not fire — there is no committed block to announce. + if r.cfg.BlockHeaderListener != nil { + header := (&types.Header{ + ChainID: r.cfg.GenDoc.ChainID, + Height: int64(b.GlobalNumber), // nolint:gosec + Time: b.Timestamp, + ProposerAddress: proposerAddress, + AppHash: resp.AppHash, + }).ToProto() + r.cfg.BlockHeaderListener.OnBlockCommitted(hash[:], header, resp) + } blockTxs := make(types.Txs, len(b.Payload.Txs())) for i, tx := range b.Payload.Txs() { blockTxs[i] = tx diff --git a/sei-tendermint/node/node.go b/sei-tendermint/node/node.go index 0016be94da..9b2900928b 100644 --- a/sei-tendermint/node/node.go +++ b/sei-tendermint/node/node.go @@ -89,6 +89,7 @@ func makeNode( tracerProviderOptions []trace.TracerProviderOption, nodeMetrics *NodeMetrics, consensusPolicy types.ConsensusPolicy, + blockHeaderListener types.BlockHeaderListener, ) (_ service.Service, err error) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) @@ -202,6 +203,7 @@ func makeNode( utils.Some(mp), genDoc, dbProvider, + blockHeaderListener, ) closers = append(closers, peerCloser) if err != nil { diff --git a/sei-tendermint/node/public.go b/sei-tendermint/node/public.go index efc9d8881f..c5e5db4751 100644 --- a/sei-tendermint/node/public.go +++ b/sei-tendermint/node/public.go @@ -32,6 +32,13 @@ func New( nodeMetrics *NodeMetrics, consensusPolicy tmtypes.ConsensusPolicy, ) (service.Service, error) { + // Capture any optional BlockHeaderListener 
implementation before + // wrapping the app with the ABCI proxy, which only forwards ABCI + // methods. + var blockHeaderListener tmtypes.BlockHeaderListener + if l, ok := app.(tmtypes.BlockHeaderListener); ok { + blockHeaderListener = l + } proxyApp := proxy.New(app, nodeMetrics.proxy) nodeKey, err := tmtypes.LoadOrGenNodeKey(conf.NodeKeyFile()) if err != nil { @@ -65,6 +72,7 @@ func New( tracerProviderOptions, nodeMetrics, consensusPolicy, + blockHeaderListener, ) case config.ModeSeed: return makeSeedNode( diff --git a/sei-tendermint/node/seed.go b/sei-tendermint/node/seed.go index 1e19d9daa3..e70d960188 100644 --- a/sei-tendermint/node/seed.go +++ b/sei-tendermint/node/seed.go @@ -87,6 +87,7 @@ func makeSeedNode( utils.None[*mempool.TxMempool](), genDoc, dbProvider, + nil, ) closers = append(closers, peerCloser) if err != nil { diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index 142b9d2961..fcffa1c398 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -272,6 +272,7 @@ func createRouter( txMempool utils.Option[*mempool.TxMempool], genDoc *types.GenesisDoc, dbProvider config.DBProvider, + blockHeaderListener types.BlockHeaderListener, ) (*p2p.Router, closer, error) { closer := func() error { return nil } ep, err := p2p.ResolveEndpoint(nodeKey.ID().AddressString(cfg.P2P.ListenAddress)) @@ -369,6 +370,7 @@ func createRouter( if err != nil { return nil, closer, fmt.Errorf("buildGigaConfig: %w", err) } + gigaCfg.BlockHeaderListener = blockHeaderListener logger.Info("Autobahn config loaded", "validators", len(gigaCfg.ValidatorAddrs)) options.Giga = utils.Some(gigaCfg) } diff --git a/sei-tendermint/types/block_listener.go b/sei-tendermint/types/block_listener.go new file mode 100644 index 0000000000..c891a2ed97 --- /dev/null +++ b/sei-tendermint/types/block_listener.go @@ -0,0 +1,20 @@ +package types + +import ( + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" + tmproto 
"github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types" +) + +// BlockHeaderListener is invoked once a block has been finalized and +// committed by the application. It is intended as a lightweight in-process +// alternative to subscribing on the Tendermint event bus for consumers that +// only need to be notified that a new head is available. +// +// Implementations must not block: the call site sits on the block-execution +// hot path and a slow listener will stall block production. +// +// Callers must pass non-nil header and response. Implementations are not +// required to nil-check; passing nil is a programming error. +type BlockHeaderListener interface { + OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) +}