Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ var (

const (
MinGasEVMTx = 21000

// NewHeadsNotifierCapacity bounds the in-process eth_newHeads
// notifier buffer. Capacity 1 pairs with the notifier's
// overwrite-on-full semantics: if a consumer lags, the latest head
// always wins and stale heads are dropped. Anything larger only
// buffers staleness — newHeads subscribers care about the current
// head, not a backlog.
NewHeadsNotifierCapacity = 1
)

func init() {
Expand Down Expand Up @@ -445,9 +453,14 @@ type App struct {

HardForkManager *upgrades.HardForkManager

encodingConfig appparams.EncodingConfig
legacyEncodingConfig appparams.EncodingConfig
evmRPCConfig evmrpcconfig.Config
encodingConfig appparams.EncodingConfig
legacyEncodingConfig appparams.EncodingConfig
evmRPCConfig evmrpcconfig.Config
// blockHeaderNotifier is non-nil only when Autobahn is enabled. It is
// fed by GigaRouter after every committed block and consumed by
// evmrpc to drive eth_subscribe("newHeads") without going through the
// Tendermint event bus.
blockHeaderNotifier *evmrpc.BlockHeaderNotifier
adminConfig admin.Config
adminServer *grpc.Server
lightInvarianceConfig LightInvarianceConfig
Expand Down Expand Up @@ -714,6 +727,9 @@ func New(
if err != nil {
panic(fmt.Sprintf("error reading EVM config due to %s", err))
}
if tmConfig != nil && tmConfig.AutobahnConfigFile != "" {
app.blockHeaderNotifier = evmrpc.NewBlockHeaderNotifier(NewHeadsNotifierCapacity)
}
if app.evmRPCConfig.TraceBakeEnabled {
traceDB, dbErr := evmkeeper.NewTraceDB(homePath)
if dbErr != nil {
Expand Down Expand Up @@ -1324,6 +1340,18 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess
return resp, nil
}

// OnBlockCommitted implements tmtypes.BlockHeaderListener. The Autobahn
// block-execution path calls this once per committed block, and the call
// is forwarded to the in-process eth_newHeads notifier consumed by
// evmrpc's SubscriptionAPI. The method is a no-op when Autobahn is not
// enabled (blockHeaderNotifier is nil) and never blocks.
func (app *App) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) {
if app.blockHeaderNotifier == nil {
return
}
app.blockHeaderNotifier.OnBlockCommitted(hash, header, response)
}

func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -2532,7 +2560,7 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) {
}

if app.evmRPCConfig.WSEnabled {
evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore())
evmWSServer, err := evmrpc.NewEVMWebSocketServer(app.evmRPCConfig, clientCtx.Client, &app.EvmKeeper, app.BeginBlockKeepers, app.BaseApp, app.TracerAnteHandler, rpcCtxProvider, txConfigProvider, DefaultNodeHome, app.GetStateStore(), app.blockHeaderNotifier)
if err != nil {
panic(err)
}
Expand Down
82 changes: 82 additions & 0 deletions evmrpc/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package evmrpc

import (
abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types"
tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types"
)

// blockHeaderEvent is the in-process payload delivered to SubscriptionAPI
// for each committed block. It mirrors the data evmrpc needs to build an
// Ethereum block header, without going through the Tendermint event bus.
//
// hash is the autobahn lane-block header hash passed as Hash to
// app.FinalizeBlock — NOT a hash computed over the (partially-populated)
// Tendermint Header we synthesize for this event. This is the same value
// the eth_getBlockBy* and receipt API surfaces report as blockHash (the
// receipt store on disk records a zero blockHash; evmrpc overlays this
// hash at read time, see evmrpc/tx.go). Surfacing the same hash here
// keeps eth_newHeads consistent with the rest of the EVM RPC surface.
type blockHeaderEvent struct {
hash []byte
header *tmproto.Header
response *abci.ResponseFinalizeBlock
}

// BlockHeaderNotifier implements sei-tendermint/types.BlockHeaderListener
// and feeds eth_subscribe("newHeads") via a direct in-process channel.
//
// Producers (e.g. the Autobahn block-execution path) call OnBlockCommitted
// once per committed block. The single consumer is SubscriptionAPI's
// fan-out goroutine, which broadcasts to all per-client subscribers.
//
// OnBlockCommitted is non-blocking and uses overwrite-on-full semantics:
// if the consumer is lagging, the oldest buffered event is dropped in
// favour of the newest. For eth_newHeads, the latest head is always more
// useful than a stale one.
//
// Concurrency: designed for a single producer (the block-execution loop)
// and a single consumer. Under that invariant the drain+send sequence in
// OnBlockCommitted always lands the new event. With multiple concurrent
// producers the same drain/send sequence still terminates without
// blocking, but the "latest" survivor among any racing publishes is
// nondeterministic — which is still acceptable for newHeads (we promise
// only that some recent head wins, not strict ordering across concurrent
// publishers).
type BlockHeaderNotifier struct {
ch chan blockHeaderEvent
}

func NewBlockHeaderNotifier(capacity int) *BlockHeaderNotifier {
return &BlockHeaderNotifier{ch: make(chan blockHeaderEvent, capacity)}
}

// OnBlockCommitted implements types.BlockHeaderListener.
func (n *BlockHeaderNotifier) OnBlockCommitted(hash []byte, header *tmproto.Header, response *abci.ResponseFinalizeBlock) {
if n == nil {
return
}
evt := blockHeaderEvent{hash: hash, header: header, response: response}
select {
case n.ch <- evt:
return
default:
}
// Buffer full: drain one stale event to make room for the new one.
// With a single producer, draining one slot is sufficient and the
// second send always succeeds. With multiple producers a racing
// publisher could refill the slot between the drain and the send,
// in which case the default branch drops the new event — that is
// still consistent with overwrite-on-full (some recent head wins).
select {
case <-n.ch:
default:
}
select {
case n.ch <- evt:
default:
}
}

func (n *BlockHeaderNotifier) recv() <-chan blockHeaderEvent {
return n.ch
}
113 changes: 113 additions & 0 deletions evmrpc/notifier_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package evmrpc

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types"
tmproto "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types"
"github.com/stretchr/testify/require"
)

func TestBlockHeaderNotifier_DeliversEvent(t *testing.T) {
n := NewBlockHeaderNotifier(4)

hash := []byte{0x01, 0x02, 0x03}
header := &tmproto.Header{Height: 42}
resp := &abci.ResponseFinalizeBlock{}

n.OnBlockCommitted(hash, header, resp)

select {
case evt := <-n.recv():
require.Equal(t, hash, evt.hash)
require.Equal(t, header, evt.header)
require.Equal(t, resp, evt.response)
case <-time.After(time.Second):
t.Fatal("expected event on notifier channel")
}
}

func TestBlockHeaderNotifier_OverwritesWhenFull(t *testing.T) {
n := NewBlockHeaderNotifier(1)

// Fill the buffer with a stale event.
n.OnBlockCommitted([]byte{1}, &tmproto.Header{Height: 1}, &abci.ResponseFinalizeBlock{})

// A second call must not block and must replace the buffered event.
done := make(chan struct{})
go func() {
n.OnBlockCommitted([]byte{2}, &tmproto.Header{Height: 2}, &abci.ResponseFinalizeBlock{})
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("OnBlockCommitted blocked when buffer was full")
}

// The newest event must survive; the stale one must be gone.
evt := <-n.recv()
require.EqualValues(t, 2, evt.header.Height, "expected newest event to win on overwrite")
select {
case extra := <-n.recv():
t.Fatalf("expected stale event to be dropped, got %+v", extra)
case <-time.After(50 * time.Millisecond):
}
}

func TestBlockHeaderNotifier_NilReceiverIsNoOp(t *testing.T) {
var n *BlockHeaderNotifier
// Must not panic.
n.OnBlockCommitted(nil, &tmproto.Header{}, &abci.ResponseFinalizeBlock{})
}

func TestEncodeCommittedBlock(t *testing.T) {
hash := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111").Bytes()
proposer := common.HexToAddress("0x2222222222222222222222222222222222222222").Bytes()
appHash := common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333").Bytes()
ts := time.Unix(1_700_000_000, 0).UTC()
evt := blockHeaderEvent{
hash: hash,
header: &tmproto.Header{
Height: 12345,
Time: ts,
ProposerAddress: proposer,
AppHash: appHash,
},
response: &abci.ResponseFinalizeBlock{
TxResults: []*abci.ExecTxResult{
{GasUsed: 21000},
{GasUsed: 100000},
},
},
}

out := encodeCommittedBlock(evt, big.NewInt(42), 10_000_000)

require.Equal(t, common.BytesToHash(hash), out["hash"])
require.Equal(t, (*hexutil.Big)(big.NewInt(12345)), out["number"])
require.Equal(t, common.BytesToAddress(proposer), out["miner"])
require.Equal(t, common.BytesToHash(appHash), out["stateRoot"])
require.Equal(t, hexutil.Uint64(ts.Unix()), out["timestamp"])
require.Equal(t, hexutil.Uint64(121000), out["gasUsed"])
require.Equal(t, hexutil.Uint64(10_000_000), out["gasLimit"])
require.Equal(t, (*hexutil.Big)(big.NewInt(42)), out["baseFeePerGas"])
// Fields not surfaced by the Autobahn path must be zero, but present.
require.Equal(t, common.Hash{}, out["parentHash"])
require.Equal(t, common.Hash{}, out["receiptsRoot"])
require.Equal(t, common.Hash{}, out["transactionsRoot"])
}

func TestEncodeCommittedBlock_ZeroGasLimit(t *testing.T) {
evt := blockHeaderEvent{
hash: []byte{0xab},
header: &tmproto.Header{Height: 1, Time: time.Unix(0, 0)},
response: &abci.ResponseFinalizeBlock{},
}
out := encodeCommittedBlock(evt, big.NewInt(0), 0)
require.Equal(t, hexutil.Uint64(0), out["gasLimit"])
}
3 changes: 2 additions & 1 deletion evmrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func NewEVMWebSocketServer(
txConfigProvider func(int64) client.TxConfig,
homeDir string,
stateStore types.StateStore,
blockHeaderNotifier *BlockHeaderNotifier,
) (EVMServer, error) {
// Initialize global worker pool with configuration (metrics are embedded in pool)
// This is idempotent - if HTTP server already initialized it, this is a no-op
Expand Down Expand Up @@ -327,7 +328,7 @@ func NewEVMWebSocketServer(
cacheCreationMutex: cacheCreationMutex,
globalLogSlicePool: globalLogSlicePool,
watermarks: watermarks,
}, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS),
}, &SubscriptionConfig{subscriptionCapacity: 100, newHeadLimit: config.MaxSubscriptionsNewHead}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}, ConnectionTypeWS, blockHeaderNotifier),
},
{
Namespace: "web3",
Expand Down
23 changes: 21 additions & 2 deletions evmrpc/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const TestWSPort = 7778
const TestBadPort = 7779
const TestStrictPort = 7780
const TestArchivePort = 7782
const TestNotifierWSPort = 7784

const GenesisBlockHeight = 0
const MockHeight8 = 8
Expand Down Expand Up @@ -114,6 +115,11 @@ var MockBlockIDMultiTx = tmtypes.BlockID{

var NewHeadsCalled = make(chan struct{}, 1)

// NotifierForTest backs the Autobahn-style WS server started on
// TestNotifierWSPort. Tests publish to it via OnBlockCommitted to drive
// eth_subscribe("newHeads") through the in-process notifier path.
var NotifierForTest = evmrpc.NewBlockHeaderNotifier(16)

type MockClient struct {
mock.Client
latestOverride int64
Expand Down Expand Up @@ -667,14 +673,27 @@ func init() {
}

// Start ws server
wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil)
wsServer, err := evmrpc.NewEVMWebSocketServer(goodConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, nil)
if err != nil {
panic(err)
}
if err := wsServer.Start(); err != nil {
panic(err)
}
fmt.Printf("wsServer started with config = %+v\n", goodConfig)

// Start a second WS server wired to NotifierForTest, exercising the
// Autobahn (notifier-fed) eth_subscribe("newHeads") path.
notifierConfig := goodConfig
notifierConfig.HTTPPort = TestNotifierWSPort - 1
notifierConfig.WSPort = TestNotifierWSPort
notifierWSServer, err := evmrpc.NewEVMWebSocketServer(notifierConfig, &MockClient{}, EVMKeeper, testApp.BeginBlockKeepers, testApp.BaseApp, testApp.TracerAnteHandler, ctxProvider, txConfigProvider, "", nil, NotifierForTest)
if err != nil {
panic(err)
}
if err := notifierWSServer.Start(); err != nil {
panic(err)
}
time.Sleep(1 * time.Second)

// Generate data
Expand Down Expand Up @@ -1170,7 +1189,7 @@ func sendWSRequestAndListen(t *testing.T, port int, method string, params ...int
headers := make(http.Header)
headers.Set("Origin", "localhost")
headers.Set("Content-Type", "application/json")
conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, TestWSPort), headers)
conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://%s:%d", TestAddr, port), headers)
require.Nil(t, err)

recv := make(chan map[string]interface{})
Expand Down
Loading
Loading