From 48b5c76612a04170af49f6c725d2759abb70d721 Mon Sep 17 00:00:00 2001 From: Daniel Liu <139250065@qq.com> Date: Fri, 13 Mar 2026 11:53:41 +0800 Subject: [PATCH] fix(core,consensus/XDPoS): split header verification by consensus version, close XFN-12 Problem - Mixed v1/v2 header batches around the switch boundary caused ambiguous verification flow and error attribution. - Early failures could leave unnecessary verification work running without explicit batch-level stop coverage. Fix - Split header verification into contiguous consensus-version batches in core import paths. - Keep strict mixed-batch rejection in XDPoS adaptor as a defensive guard (ErrMixedConsensusBatch). - Restore explicit abort propagation in blockchain batch verification loop. - Add empty-chain guard in HeaderChain.ValidateHeaderChain. Tests - Add consensus batch split unit tests for zero/single/multi/mixed v1-v2 scenarios. - Add adaptor test for mixed-batch rejection. - Add abort regression tests to ensure second batch does not start and first-batch trailing results are not emitted after abort. --- consensus/XDPoS/XDPoS.go | 20 +- consensus/XDPoS/XDPoS_test.go | 20 ++ .../engine_v2_tests/verify_header_test.go | 37 ++- core/blockchain.go | 28 ++- core/blockchain_abort_test.go | 213 ++++++++++++++++++ core/consensus_batch.go | 45 ++++ core/consensus_batch_test.go | 90 ++++++++ core/headerchain.go | 42 ++-- core/headerchain_validation_test.go | 28 +++ 9 files changed, 482 insertions(+), 41 deletions(-) create mode 100644 core/blockchain_abort_test.go create mode 100644 core/consensus_batch.go create mode 100644 core/consensus_batch_test.go create mode 100644 core/headerchain_validation_test.go diff --git a/consensus/XDPoS/XDPoS.go b/consensus/XDPoS/XDPoS.go index 0edf37312005..36cfeb019b40 100644 --- a/consensus/XDPoS/XDPoS.go +++ b/consensus/XDPoS/XDPoS.go @@ -44,6 +44,8 @@ const ( newRoundChanSize = 1 ) +var ErrMixedConsensusBatch = errors.New("mixed v1 and v2 headers in one batch") + func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) { switch x.config.BlockConsensusVersion(header.Number) { case params.ConsensusEngineVersion2: @@ -231,12 +233,24 @@ func (x *XDPoS) VerifyHeaders(chain consensus.ChainReader, headers []*types.Head v1fullVerifies = append(v1fullVerifies, fullVerifies[i]) } } + v1Count, v2Count := len(v1headers), len(v2headers) - if v1headers != nil { + switch { + case v1Count != 0 && v2Count == 0: x.EngineV1.VerifyHeaders(chain, v1headers, v1fullVerifies, abort, results) - } - if v2headers != nil { + case v1Count == 0 && v2Count != 0: x.EngineV2.VerifyHeaders(chain, v2headers, v2fullVerifies, abort, results) + case v1Count != 0 && v2Count != 0: + go func() { + for range headers { + select { + case <-abort: + return + case results <- ErrMixedConsensusBatch: + } + } + }() + return abort, results } return abort, results diff --git a/consensus/XDPoS/XDPoS_test.go b/consensus/XDPoS/XDPoS_test.go index c4c555d49403..b26621714c69 100644 --- a/consensus/XDPoS/XDPoS_test.go +++ b/consensus/XDPoS/XDPoS_test.go @@ -1,9 +1,11 @@ package XDPoS import ( + "math/big" "testing" "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/params" "github.com/stretchr/testify/assert" ) @@ -16,3 +18,21 @@ func TestAdaptorShouldShareDbWithV1Engine(t *testing.T) { assert := assert.New(t) assert.Equal(engine.EngineV1.GetDb(), engine.GetDb()) } + +func TestVerifyHeadersRejectsMixedConsensusBatch(t *testing.T) { + database := rawdb.NewMemoryDatabase() + config := params.TestXDPoSMockChainConfig + engine := New(config, database) + + headers := []*types.Header{ + {Number: big.NewInt(900)}, + {Number: big.NewInt(901)}, + } + fullVerifies := []bool{false, false} + + abort, results := engine.VerifyHeaders(nil, headers, fullVerifies) + defer close(abort) + + assert.ErrorIs(t, <-results, ErrMixedConsensusBatch) + assert.ErrorIs(t, <-results, ErrMixedConsensusBatch) +} diff --git a/consensus/tests/engine_v2_tests/verify_header_test.go b/consensus/tests/engine_v2_tests/verify_header_test.go index 96466b58557c..3e8970cf1aed 100644 --- a/consensus/tests/engine_v2_tests/verify_header_test.go +++ b/consensus/tests/engine_v2_tests/verify_header_test.go @@ -412,29 +412,26 @@ func TestShouldVerifyHeaders(t *testing.T) { blockchain, _, _, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 910, &config, nil) adaptor := blockchain.Engine().(*XDPoS.XDPoS) - // Happy path - var happyPathHeaders []*types.Header - happyPathHeaders = append(happyPathHeaders, blockchain.GetBlockByNumber(899).Header(), blockchain.GetBlockByNumber(900).Header(), blockchain.GetBlockByNumber(901).Header(), blockchain.GetBlockByNumber(902).Header()) - // Randomly set full verify - var fullVerifies []bool - fullVerifies = append(fullVerifies, false, true, true, false) - _, results := adaptor.VerifyHeaders(blockchain, happyPathHeaders, fullVerifies) - var verified []bool - for { - select { - case result := <-results: - if result != nil { - panic("Error received while verifying headers") - } - verified = append(verified, true) - case <-time.After(time.Duration(5) * time.Second): // It should be very fast to verify headers - if len(verified) == len(happyPathHeaders) { - return - } else { - panic("Suppose to have verified 3 block headers") + verifyBatch := func(headers []*types.Header, fullVerifies []bool) { + _, results := adaptor.VerifyHeaders(blockchain, headers, fullVerifies) + for i := 0; i < len(headers); i++ { + select { + case result := <-results: + assert.Nil(t, result) + case <-time.After(5 * time.Second): // Header verification should finish quickly. + t.Fatalf("timed out waiting for verify result at index %d", i) } } } + + // VerifyHeaders now rejects mixed v1/v2 batches, so verify each version in its own batch. + v1Headers := []*types.Header{blockchain.GetBlockByNumber(899).Header(), blockchain.GetBlockByNumber(900).Header()} + v1FullVerifies := []bool{false, true} + verifyBatch(v1Headers, v1FullVerifies) + + v2Headers := []*types.Header{blockchain.GetBlockByNumber(901).Header(), blockchain.GetBlockByNumber(902).Header()} + v2FullVerifies := []bool{true, false} + verifyBatch(v2Headers, v2FullVerifies) } func TestShouldVerifyHeadersEvenIfParentsNotYetWrittenIntoDB(t *testing.T) { diff --git a/core/blockchain.go b/core/blockchain.go index 3a4ec2ae473a..2c4aa12725d9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1525,8 +1525,34 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] seals[i] = verifySeals bc.downloadingBlock.Add(block.Hash(), struct{}{}) } - abort, results := bc.engine.VerifyHeaders(bc, headers, seals) + abort := make(chan struct{}) defer close(abort) + results := make(chan error, len(headers)) + go func() { + for _, batch := range splitHeadersByConsensusVersion(bc.chainConfig, headers) { + batchAbort, batchResults := bc.engine.VerifyHeaders(bc, headers[batch.start:batch.end], seals[batch.start:batch.end]) + stopped := false + for i := batch.start; i < batch.end; i++ { + select { + case <-abort: + stopped = true + case err := <-batchResults: + select { + case <-abort: + stopped = true + case results <- err: + } + } + if stopped { + break + } + } + close(batchAbort) + if stopped { + return + } + } + }() // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) diff --git a/core/blockchain_abort_test.go b/core/blockchain_abort_test.go new file mode 100644 index 000000000000..d88395c7f742 --- /dev/null +++ b/core/blockchain_abort_test.go @@ -0,0 +1,213 @@ +package core + +import ( + "errors" + "math/big" + "sync" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/ethash" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/core/vm" + "github.com/XinFinOrg/XDPoSChain/params" +) + +// stopAwareVerifyEngine overrides VerifyHeaders to make the first batch block +// after its first result, so we can assert insertChain aborts before starting +// the second batch. +type stopAwareVerifyEngine struct { + consensus.Engine + + mu sync.Mutex + verifyCalls int + secondBatchStarted chan struct{} + secondBatchOnce sync.Once + firstBatchStopped chan struct{} +} + +func (e *stopAwareVerifyEngine) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { + e.mu.Lock() + e.verifyCalls++ + call := e.verifyCalls + e.mu.Unlock() + + abort := make(chan struct{}) + results := make(chan error, len(headers)) + + switch call { + case 1: + go func() { + if len(headers) > 0 { + results <- errors.New("forced first-batch failure") + } + <-abort + close(e.firstBatchStopped) + }() + default: + e.secondBatchOnce.Do(func() { close(e.secondBatchStarted) }) + go func() { + for range headers { + select { + case <-abort: + return + case results <- nil: + } + } + }() + } + + return abort, results +} + +// firstBatchContinuationEngine tries to emit a second result from the same +// batch after a delay; if outer abort propagation works, that send must not +// happen. +type firstBatchContinuationEngine struct { + consensus.Engine + + secondResultSent chan struct{} + allowSecondDecision chan struct{} + secondDecisionDone chan struct{} + abortObserved chan struct{} +} + +func (e *firstBatchContinuationEngine) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { + abort := make(chan struct{}) + results := make(chan error, len(headers)) + go func() { + <-abort + close(e.abortObserved) + }() + + go func() { + if len(headers) == 0 { + close(e.secondDecisionDone) + return + } + results <- errors.New("forced first-result failure") + if len(headers) == 1 { + close(e.secondDecisionDone) + return + } + + <-e.allowSecondDecision + select { + case <-abort: + default: + close(e.secondResultSent) + } + close(e.secondDecisionDone) + }() + + return abort, results +} + +func waitForSignal(t *testing.T, ch <-chan struct{}, name string) { + t.Helper() + timeout := 5 * time.Second + if deadline, ok := t.Deadline(); ok { + if remaining := time.Until(deadline) / 2; remaining > 0 && remaining < timeout { + timeout = remaining + } + } + select { + case <-ch: + case <-time.After(timeout): + t.Fatalf("timed out waiting for %s", name) + } +} + +func TestInsertChainAbortStopsBeforeSecondConsensusBatch(t *testing.T) { + testdb := rawdb.NewMemoryDatabase() + + cfg := *params.TestXDPoSMockChainConfig + xdposCfg := *cfg.XDPoS + v2Src := xdposCfg.V2 + v2Cfg := ¶ms.V2{ + SwitchEpoch: v2Src.SwitchEpoch, + SwitchBlock: big.NewInt(2), // #1,#2 -> v1 batch; #3+ -> v2 batch + CurrentConfig: v2Src.CurrentConfig, + AllConfigs: v2Src.AllConfigs, + } + v2Cfg.BuildConfigIndex() + xdposCfg.V2 = v2Cfg + cfg.XDPoS = &xdposCfg + + engine := &stopAwareVerifyEngine{ + Engine: ethash.NewFaker(), + secondBatchStarted: make(chan struct{}), + firstBatchStopped: make(chan struct{}), + } + + gspec := &Genesis{Config: &cfg, ExtraData: make([]byte, 32+65)} + genesis := gspec.MustCommit(testdb) + blocks, _ := GenerateChain(&cfg, genesis, engine, testdb, 3, nil) + + bc, err := NewBlockChain(testdb, nil, gspec, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create blockchain: %v", err) + } + defer bc.Stop() + + if _, err := bc.InsertChain(blocks); err == nil { + t.Fatal("expected InsertChain to fail on forced first-batch error") + } + waitForSignal(t, engine.firstBatchStopped, "first batch stop after abort") + + select { + case <-engine.secondBatchStarted: + t.Fatal("second consensus batch started despite early abort") + default: + } +} + +func TestInsertChainAbortStopsFirstBatchTrailingResults(t *testing.T) { + testdb := rawdb.NewMemoryDatabase() + + cfg := *params.TestXDPoSMockChainConfig + xdposCfg := *cfg.XDPoS + v2Src := xdposCfg.V2 + v2Cfg := ¶ms.V2{ + SwitchEpoch: v2Src.SwitchEpoch, + SwitchBlock: big.NewInt(10), // all generated blocks stay in the first batch + CurrentConfig: v2Src.CurrentConfig, + AllConfigs: v2Src.AllConfigs, + } + v2Cfg.BuildConfigIndex() + xdposCfg.V2 = v2Cfg + cfg.XDPoS = &xdposCfg + + engine := &firstBatchContinuationEngine{ + Engine: ethash.NewFaker(), + secondResultSent: make(chan struct{}), + allowSecondDecision: make(chan struct{}), + secondDecisionDone: make(chan struct{}), + abortObserved: make(chan struct{}), + } + + gspec := &Genesis{Config: &cfg, ExtraData: make([]byte, 32+65)} + genesis := gspec.MustCommit(testdb) + blocks, _ := GenerateChain(&cfg, genesis, engine, testdb, 3, nil) + + bc, err := NewBlockChain(testdb, nil, gspec, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create blockchain: %v", err) + } + defer bc.Stop() + + if _, err := bc.InsertChain(blocks); err == nil { + t.Fatal("expected InsertChain to fail on forced first-result failure") + } + waitForSignal(t, engine.abortObserved, "batch abort propagation") + close(engine.allowSecondDecision) + waitForSignal(t, engine.secondDecisionDone, "second-result decision") + + select { + case <-engine.secondResultSent: + t.Fatal("trailing result from first batch was emitted after abort") + default: + } +} diff --git a/core/consensus_batch.go b/core/consensus_batch.go new file mode 100644 index 000000000000..8905cfd19608 --- /dev/null +++ b/core/consensus_batch.go @@ -0,0 +1,45 @@ +package core + +import ( + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/params" +) + +type consensusHeaderBatch struct { + start int + end int // [start, end) +} + +// splitHeadersByConsensusVersion groups contiguous headers by consensus version. +// For non-XDPoS chains, the full input is returned as a single batch. +func splitHeadersByConsensusVersion(config *params.ChainConfig, headers []*types.Header) []consensusHeaderBatch { + if len(headers) == 0 { + return nil + } + if config == nil || config.XDPoS == nil { + return []consensusHeaderBatch{{start: 0, end: len(headers)}} + } + + versionOf := func(header *types.Header) string { + if header == nil || header.Number == nil { + return params.ConsensusEngineVersion1 + } + return config.XDPoS.BlockConsensusVersion(header.Number) + } + + batches := make([]consensusHeaderBatch, 0, 2) + start := 0 + currentVersion := versionOf(headers[0]) + + for i := 1; i < len(headers); i++ { + version := versionOf(headers[i]) + if version != currentVersion { + batches = append(batches, consensusHeaderBatch{start: start, end: i}) + start = i + currentVersion = version + } + } + batches = append(batches, consensusHeaderBatch{start: start, end: len(headers)}) + + return batches +} diff --git a/core/consensus_batch_test.go b/core/consensus_batch_test.go new file mode 100644 index 000000000000..89aa6d1dff6b --- /dev/null +++ b/core/consensus_batch_test.go @@ -0,0 +1,90 @@ +package core + +import ( + "math/big" + "reflect" + "testing" + + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/params" +) + +func TestSplitHeadersByConsensusVersion_XDPoS(t *testing.T) { + makeHeaders := func(numbers ...int64) []*types.Header { + headers := make([]*types.Header, 0, len(numbers)) + for _, n := range numbers { + headers = append(headers, &types.Header{Number: big.NewInt(n)}) + } + return headers + } + + tests := []struct { + name string + headers []*types.Header + expected []consensusHeaderBatch + }{ + { + name: "zero blocks", + headers: makeHeaders(), + expected: nil, + }, + { + name: "single v1 block", + headers: makeHeaders(900), + expected: []consensusHeaderBatch{{start: 0, end: 1}}, + }, + { + name: "single v2 block", + headers: makeHeaders(901), + expected: []consensusHeaderBatch{{start: 0, end: 1}}, + }, + { + name: "multiple v1 blocks", + headers: makeHeaders(899, 900), + expected: []consensusHeaderBatch{{start: 0, end: 2}}, + }, + { + name: "multiple v2 blocks", + headers: makeHeaders(901, 902), + expected: []consensusHeaderBatch{{start: 0, end: 2}}, + }, + { + name: "single v1 and multiple v2 blocks", + headers: makeHeaders(900, 901, 902), + expected: []consensusHeaderBatch{{start: 0, end: 1}, {start: 1, end: 3}}, + }, + { + name: "multiple v1 and single v2 block", + headers: makeHeaders(899, 900, 901), + expected: []consensusHeaderBatch{{start: 0, end: 2}, {start: 2, end: 3}}, + }, + { + name: "multiple v1 and multiple v2 blocks", + headers: makeHeaders(899, 900, 901, 902), + expected: []consensusHeaderBatch{{start: 0, end: 2}, {start: 2, end: 4}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitHeadersByConsensusVersion(params.TestXDPoSMockChainConfig, tt.headers) + if !reflect.DeepEqual(got, tt.expected) { + t.Fatalf("unexpected batch split, got=%+v expected=%+v", got, tt.expected) + } + }) + } +} + +func TestSplitHeadersByConsensusVersion_NonXDPoS(t *testing.T) { + headers := []*types.Header{ + {Number: big.NewInt(1)}, + {Number: big.NewInt(2)}, + } + batches := splitHeadersByConsensusVersion(nil, headers) + if len(batches) != 1 { + t.Fatalf("expected 1 batch, got %d", len(batches)) + } + if batches[0].start != 0 || batches[0].end != len(headers) { + t.Fatalf("unexpected batch range: %+v", batches[0]) + } +} diff --git a/core/headerchain.go b/core/headerchain.go index c1d6150d6da0..a40723ce7976 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -219,6 +219,10 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er type WhCallback func(*types.Header) error func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) { + if len(chain) == 0 { + return 0, nil + } + // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() { @@ -246,24 +250,28 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) seals[len(seals)-1] = true } - abort, results := hc.engine.VerifyHeaders(hc, chain, seals) - defer close(abort) - - // Iterate over the headers and ensure they all check out - for i, header := range chain { - // If the chain is terminating, stop processing blocks - if hc.procInterrupt() { - log.Debug("Premature abort during headers verification") - return 0, errors.New("aborted") - } - // If the header is a banned one, straight out abort - if BadHashes[header.Hash()] { - return i, ErrDenylistedHash - } - // Otherwise wait for headers checks and ensure they pass - if err := <-results; err != nil { - return i, err + for _, batch := range splitHeadersByConsensusVersion(hc.config, chain) { + abort, results := hc.engine.VerifyHeaders(hc, chain[batch.start:batch.end], seals[batch.start:batch.end]) + for i := batch.start; i < batch.end; i++ { + header := chain[i] + // If the chain is terminating, stop processing blocks + if hc.procInterrupt() { + close(abort) + log.Debug("Premature abort during headers verification") + return 0, errors.New("aborted") + } + // If the header is a banned one, straight out abort + if BadHashes[header.Hash()] { + close(abort) + return i, ErrDenylistedHash + } + // Otherwise wait for headers checks and ensure they pass + if err := <-results; err != nil { + close(abort) + return i, err + } } + close(abort) } return 0, nil diff --git a/core/headerchain_validation_test.go b/core/headerchain_validation_test.go new file mode 100644 index 000000000000..0e01c9fd6edc --- /dev/null +++ b/core/headerchain_validation_test.go @@ -0,0 +1,28 @@ +package core + +import ( + "testing" + + "github.com/XinFinOrg/XDPoSChain/consensus/ethash" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/core/vm" + "github.com/XinFinOrg/XDPoSChain/params" +) + +func TestValidateHeaderChain_EmptyChain(t *testing.T) { + testdb := rawdb.NewMemoryDatabase() + gspec := &Genesis{Config: params.TestChainConfig} + if _, err := gspec.Commit(testdb); err != nil { + t.Fatalf("failed to commit genesis: %v", err) + } + + bc, err := NewBlockChain(testdb, nil, gspec, ethash.NewFaker(), vm.Config{}) + if err != nil { + t.Fatalf("failed to create blockchain: %v", err) + } + defer bc.Stop() + + if idx, err := bc.hc.ValidateHeaderChain(nil, 1); err != nil || idx != 0 { + t.Fatalf("empty chain should pass, idx=%d err=%v", idx, err) + } +}