From 9a7b1f8daf1c8e6217e721e1713c900f9bc6de50 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 19 Feb 2026 11:12:04 +0100 Subject: [PATCH 1/2] sync: some fixes and better testing --- src/lean_spec/subspecs/chain/clock.py | 3 +- src/lean_spec/subspecs/chain/config.py | 4 - src/lean_spec/subspecs/chain/service.py | 23 ++- src/lean_spec/subspecs/sync/__init__.py | 6 +- src/lean_spec/subspecs/sync/backfill_sync.py | 31 +-- src/lean_spec/subspecs/sync/block_cache.py | 26 +-- .../subspecs/sync/checkpoint_sync.py | 3 +- src/lean_spec/subspecs/sync/config.py | 3 + src/lean_spec/subspecs/sync/head_sync.py | 35 ++-- src/lean_spec/subspecs/sync/peer_manager.py | 37 +++- src/lean_spec/subspecs/sync/service.py | 60 +++--- src/lean_spec/subspecs/sync/states.py | 28 ++- tests/lean_spec/helpers/mocks.py | 8 + .../subspecs/sync/test_backfill_sync.py | 92 ++++++++- .../subspecs/sync/test_block_cache.py | 3 - .../subspecs/sync/test_checkpoint_sync.py | 15 ++ .../lean_spec/subspecs/sync/test_head_sync.py | 170 ++++++++++++++++- .../subspecs/sync/test_peer_manager.py | 72 ++++++- tests/lean_spec/subspecs/sync/test_service.py | 176 +++++++++++++++++- tests/lean_spec/subspecs/sync/test_states.py | 26 --- 20 files changed, 663 insertions(+), 158 deletions(-) diff --git a/src/lean_spec/subspecs/chain/clock.py b/src/lean_spec/subspecs/chain/clock.py index 80afaffd..aaa6410e 100644 --- a/src/lean_spec/subspecs/chain/clock.py +++ b/src/lean_spec/subspecs/chain/clock.py @@ -1,6 +1,5 @@ """ -Slot Clock -========== +Slot Clock. Time-to-slot conversion for Lean Consensus. diff --git a/src/lean_spec/subspecs/chain/config.py b/src/lean_spec/subspecs/chain/config.py index f56da712..45d8aeeb 100644 --- a/src/lean_spec/subspecs/chain/config.py +++ b/src/lean_spec/subspecs/chain/config.py @@ -4,8 +4,6 @@ from lean_spec.types.uint import Uint64 -# --- Time Parameters --- - INTERVALS_PER_SLOT = Uint64(5) """Number of intervals per slot for forkchoice processing.""" @@ -21,8 +19,6 @@ JUSTIFICATION_LOOKBACK_SLOTS: Final = Uint64(3) """The number of slots to lookback for justification.""" -# --- State List Length Presets --- - HISTORICAL_ROOTS_LIMIT: Final = Uint64(2**18) """ The maximum number of historical block roots to store in the state. diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 54072f1f..791d6c1b 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -1,19 +1,18 @@ """ Chain service that drives consensus timing. -The Chain Problem ------------------ -Ethereum consensus runs on a clock. Every 4 seconds (1 slot), validators: -- Interval 0: Propose blocks -- Interval 1: Create attestations -- Interval 2: Update safe target -- Interval 3: Accept attestations into fork choice +Ethereum consensus runs on a clock. Every 4 seconds (1 slot), validators act +at each of the 5 intervals: + +- Interval 0: Block proposal +- Interval 1: Vote propagation +- Interval 2: Aggregation +- Interval 3: Safe target update +- Interval 4: Attestation acceptance into fork choice The Store has all this logic built in. But nothing drives the clock. -ChainService is that driver - a simple timer loop. +ChainService is that driver — a simple timer loop: -How It Works ------------- 1. Sleep until next interval boundary 2. Get current wall-clock time 3. 
Tick the store forward to current time @@ -154,8 +153,8 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte Between each remaining interval tick, yields to the event loop so gossip messages can be processed. - Updates ``self.sync_service.store`` in place after each tick so - concurrent gossip handlers see current time. + Updates the sync service's store after each tick so concurrent + gossip handlers see current time. Returns aggregated attestations produced during the ticks. """ diff --git a/src/lean_spec/subspecs/sync/__init__.py b/src/lean_spec/subspecs/sync/__init__.py index bdf3c0ee..b65141b2 100644 --- a/src/lean_spec/subspecs/sync/__init__.py +++ b/src/lean_spec/subspecs/sync/__init__.py @@ -2,13 +2,13 @@ Sync service for the lean Ethereum consensus client. What Is Sync? -------------- + When an Ethereum node starts, it needs to catch up with the network. The chain may be millions of blocks ahead. Sync is the process of downloading and validating those blocks until the node reaches the chain head. The Challenge -------------- + Sync is harder than it sounds: 1. **Ordering**: Blocks reference parents; children arrive before parents @@ -16,7 +16,7 @@ 3. **Progress tracking**: Need to know when we are "done" How It Works ------------- + - Blocks arrive via gossip subscription - If parent is known, process immediately - If parent is unknown, cache block and fetch parent (backfill) diff --git a/src/lean_spec/subspecs/sync/backfill_sync.py b/src/lean_spec/subspecs/sync/backfill_sync.py index c8a48922..5be599eb 100644 --- a/src/lean_spec/subspecs/sync/backfill_sync.py +++ b/src/lean_spec/subspecs/sync/backfill_sync.py @@ -6,7 +6,7 @@ is called "backfill" because we are filling in gaps going backward in time. The Challenge -------------- + Blocks can arrive out of order for several reasons: 1. **Gossip timing**: A child block gossips faster than its parent @@ -17,7 +17,7 @@ resolve them once their parents arrive or are explicitly fetched. How It Works ------------- + 1. Track orphan blocks in the BlockCache 2. When an orphan is detected, request its parent from peers 3. If the fetched parent is also an orphan, request its parent @@ -28,7 +28,7 @@ and handles dynamic gaps naturally. Depth Limiting --------------- + Backfill depth is limited to prevent attacks and resource exhaustion: - An attacker could send a block claiming to have a parent millions of slots ago @@ -92,23 +92,23 @@ class BackfillSync: arrive with unknown parents, this class orchestrates fetching those parents. How It Works - ------------ + 1. **Detection**: BlockCache marks blocks as orphans when added 2. **Request**: BackfillSync requests missing parents from peers 3. **Recursion**: If fetched parents are also orphans, continue fetching 4. **Resolution**: When parent chain is complete, blocks become processable Integration - ----------- + BackfillSync does not process blocks itself. It only ensures parents exist in the BlockCache. The SyncService is responsible for: - - Calling `fill_missing()` when orphans are detected + - Triggering backfill when orphans are detected - Processing blocks when they become processable - Integrating blocks into the Store Thread Safety - ------------- + This class is designed for single-threaded async operation. The `_pending` set tracks in-flight requests to avoid duplicate fetches. """ @@ -208,15 +208,15 @@ async def _fetch_batch( ) if blocks: - # Request succeeded. + # Request succeeded with data. 
self.peer_manager.on_request_success(peer.peer_id) # Add blocks to cache and check for further orphans. await self._process_received_blocks(blocks, peer.peer_id, depth) else: # Empty response. Peer may not have the blocks. - # This is not necessarily a failure (blocks may not exist). - pass + # Still a completed request — release the in-flight slot. + self.peer_manager.on_request_success(peer.peer_id) except Exception: # Network error. @@ -264,17 +264,6 @@ async def _process_received_blocks( if new_orphan_parents: await self.fill_missing(new_orphan_parents, depth=depth + 1) - async def fill_all_orphans(self) -> None: - """ - Fetch parents for all current orphan blocks. - - Convenience method that fetches the parent roots of all blocks - currently marked as orphans in the cache. - """ - orphan_parents = self.block_cache.get_orphan_parents() - if orphan_parents: - await self.fill_missing(orphan_parents) - def reset(self) -> None: """Clear all pending state.""" self._pending.clear() diff --git a/src/lean_spec/subspecs/sync/block_cache.py b/src/lean_spec/subspecs/sync/block_cache.py index fc803df7..8ce51871 100644 --- a/src/lean_spec/subspecs/sync/block_cache.py +++ b/src/lean_spec/subspecs/sync/block_cache.py @@ -2,7 +2,7 @@ Block cache for downloaded blocks awaiting parent resolution. Why Cache Blocks? ------------------ + In an ideal world, blocks arrive in perfect order: parent before child, always. Reality differs. Network latency, parallel downloads, and gossip propagation mean blocks often arrive before their parents are known. @@ -16,7 +16,7 @@ arrives, then process both. How It Works ------------- + The cache maintains three data structures: 1. **Block storage**: Maps block root to PendingBlock (the block + metadata) @@ -31,7 +31,7 @@ 4. Recursively check if processed children have their own waiting children Memory Safety -------------- + The cache is bounded by MAX_CACHED_BLOCKS (1024). When full, FIFO eviction removes the oldest blocks. This prevents memory exhaustion from attacks or prolonged network partitions that could otherwise grow the cache unboundedly. @@ -251,7 +251,7 @@ def remove(self, root: Bytes32) -> PendingBlock | None: return None # Clean up orphan tracking. - self._orphans.discard(root) + self.unmark_orphan(root) # Clean up parent index. # @@ -374,29 +374,11 @@ def get_processable(self, store: "Store") -> list[PendingBlock]: # Sort ensures parent-before-child processing order. return sorted(processable, key=lambda p: p.slot) - def get_highest_slot(self) -> Slot | None: - """ - Get the highest slot among cached blocks. - - This is useful for progress reporting without exposing internal storage. - - Returns: - The highest slot in the cache, or None if the cache is empty. 
- """ - if not self._blocks: - return None - return max(p.slot for p in self._blocks.values()) - @property def orphan_count(self) -> int: """Number of orphan blocks in the cache.""" return len(self._orphans) - @property - def is_empty(self) -> bool: - """Check if the cache is empty.""" - return len(self._blocks) == 0 - def clear(self) -> None: """Remove all blocks from the cache.""" self._blocks.clear() diff --git a/src/lean_spec/subspecs/sync/checkpoint_sync.py b/src/lean_spec/subspecs/sync/checkpoint_sync.py index 9e88558c..9710f68d 100644 --- a/src/lean_spec/subspecs/sync/checkpoint_sync.py +++ b/src/lean_spec/subspecs/sync/checkpoint_sync.py @@ -151,8 +151,7 @@ async def verify_checkpoint_state(state: "State") -> bool: # If the data was corrupted, hashing will likely fail or produce # an unexpected result. We log the root for debugging. state_root = hash_tree_root(state) - root_preview = state_root.hex()[:16] - logger.info(f"Checkpoint state verified: slot={state.slot}, root={root_preview}...") + logger.info(f"Checkpoint state verified: slot={state.slot}, root={state_root}...") return True except Exception as e: diff --git a/src/lean_spec/subspecs/sync/config.py b/src/lean_spec/subspecs/sync/config.py index f1dc0733..cb7fd2b4 100644 --- a/src/lean_spec/subspecs/sync/config.py +++ b/src/lean_spec/subspecs/sync/config.py @@ -22,3 +22,6 @@ MAX_BACKFILL_DEPTH: Final[int] = 512 """Maximum depth for backfill parent chain resolution.""" + +MAX_PENDING_ATTESTATIONS: Final[int] = 1024 +"""Maximum buffered attestations awaiting block processing.""" diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index 96610c9e..a04cf282 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -6,7 +6,7 @@ of operation. How It Works ------------- + 1. **Gossip arrives**: A new block is received via gossip subscription 2. **Check parent**: Does the parent exist in our Store? 3. **If yes**: Process immediately and check for cached descendants @@ -18,7 +18,7 @@ - Natural handling of out-of-order arrivals Descendant Processing ---------------------- + When a parent block arrives (either via gossip or backfill), there may be cached children waiting for it: @@ -31,7 +31,7 @@ common ancestor arrives. Error Handling --------------- + Block processing can fail for various reasons: - Invalid signatures @@ -95,11 +95,11 @@ class HeadSync: possible or caching them for later processing. How It Works - ------------ + When a gossip block arrives: 1. **Check Store**: Is the parent already in our Store? - - Yes: Process immediately via `store.on_block()` + - Yes: Process immediately - No: Cache the block and trigger backfill 2. **If processed**: Check cache for descendants @@ -109,10 +109,10 @@ class HeadSync: 3. **Return result**: Report what happened for state machine decisions Integration - ----------- + HeadSync receives blocks but does not own the Store. The SyncService must: - - Call `on_gossip_block()` when gossip blocks arrive + - Forward gossip blocks when they arrive - Apply the returned Store updates - Use the result to update sync state @@ -131,7 +131,7 @@ class HeadSync: This is injected to allow flexibility in block processing. - The default implementation uses `store.on_block()`, but tests can inject mocks. + The default implementation uses the store's block processing, but tests can inject mocks. 
Signature: (store, block) -> new_store Raises: Exception on validation failure @@ -169,8 +169,8 @@ async def on_gossip_block( logger.debug( "on_gossip_block: slot=%s root=%s parent=%s", slot, - block_root.hex()[:8], - parent_root.hex()[:8], + block_root.hex(), + parent_root.hex(), ) # Skip if already processing (reentrant call). @@ -263,7 +263,7 @@ async def _process_block_with_descendants( ), store # Process cached descendants. - descendants_count = await self._process_cached_descendants( + descendants_count, store = await self._process_cached_descendants( parent_root=block_root, store=store, peer_id=peer_id, @@ -284,7 +284,7 @@ async def _process_cached_descendants( parent_root: Bytes32, store: Store, peer_id: PeerId | None, - ) -> int: + ) -> tuple[int, Store]: """ Process any cached blocks that descend from the given parent. @@ -300,7 +300,7 @@ async def _process_cached_descendants( peer_id: Peer ID for error attribution. Returns: - Number of descendants successfully processed. + Tuple of (descendants successfully processed, updated store). """ processed_count = 0 @@ -328,15 +328,13 @@ async def _process_cached_descendants( # Remove from cache after successful processing. self.block_cache.remove(child_root) - # Unmark orphan status. - self.block_cache.unmark_orphan(child_root) - # Recursively process this child's descendants. - processed_count += await self._process_cached_descendants( + desc_count, store = await self._process_cached_descendants( parent_root=child_root, store=store, peer_id=peer_id, ) + processed_count += desc_count except Exception: # Processing failed. Leave in cache for retry or discard. @@ -346,7 +344,7 @@ async def _process_cached_descendants( finally: self._processing.discard(child_root) - return processed_count + return processed_count, store async def _cache_and_backfill( self, @@ -422,7 +420,6 @@ async def process_all_processable(self, store: Store) -> tuple[int, Store]: store = self.process_block(store, pending.block) processed_count += 1 self.block_cache.remove(pending.root) - self.block_cache.unmark_orphan(pending.root) except Exception: # Processing failed. Remove from cache to avoid infinite loop. diff --git a/src/lean_spec/subspecs/sync/peer_manager.py b/src/lean_spec/subspecs/sync/peer_manager.py index f67f9a85..0f4b9b8d 100644 --- a/src/lean_spec/subspecs/sync/peer_manager.py +++ b/src/lean_spec/subspecs/sync/peer_manager.py @@ -6,6 +6,7 @@ from __future__ import annotations +import random from collections import Counter from dataclasses import dataclass, field @@ -16,6 +17,21 @@ from .config import MAX_CONCURRENT_REQUESTS +INITIAL_PEER_SCORE: int = 100 +"""Starting score for newly added peers.""" + +MIN_PEER_SCORE: int = 0 +"""Minimum peer score (floor).""" + +MAX_PEER_SCORE: int = 200 +"""Maximum peer score (ceiling).""" + +SCORE_SUCCESS_BONUS: int = 10 +"""Score increase for a successful request.""" + +SCORE_FAILURE_PENALTY: int = 20 +"""Score decrease for a failed request.""" + @dataclass(slots=True) class SyncPeer: @@ -34,6 +50,9 @@ class SyncPeer: requests_in_flight: int = 0 """Number of active requests to this peer.""" + score: int = INITIAL_PEER_SCORE + """Peer reputation score. Higher means more reliable.""" + @property def peer_id(self) -> PeerId: """Get the peer's ID.""" @@ -105,7 +124,10 @@ def update_status(self, peer_id: PeerId, status: Status) -> None: def select_peer_for_request(self, min_slot: Slot | None = None) -> SyncPeer | None: """ - Select an available peer for a request. 
+ Select an available peer for a request using weighted random selection. + + Peers with higher scores are more likely to be selected. This avoids + concentrating all load on one peer and naturally prefers reliable peers. Args: min_slot: Optional minimum slot the peer must have. @@ -113,13 +135,20 @@ def select_peer_for_request(self, min_slot: Slot | None = None) -> SyncPeer | No Returns: An available SyncPeer, or None if no suitable peer exists. """ + candidates: list[SyncPeer] = [] for peer in self._peers.values(): if not peer.is_available(): continue if min_slot is not None and not peer.has_slot(min_slot): continue - return peer - return None + candidates.append(peer) + + if not candidates: + return None + + # Weight by score. A score of 0 still gets weight 1 to avoid exclusion. + weights = [max(peer.score, 1) for peer in candidates] + return random.choices(candidates, weights=weights, k=1)[0] def get_network_finalized_slot(self) -> Slot | None: """ @@ -142,12 +171,14 @@ def on_request_success(self, peer_id: PeerId) -> None: peer = self._peers.get(peer_id) if peer is not None: peer.on_request_complete() + peer.score = min(peer.score + SCORE_SUCCESS_BONUS, MAX_PEER_SCORE) def on_request_failure(self, peer_id: PeerId) -> None: """Record a failed request to a peer.""" peer = self._peers.get(peer_id) if peer is not None: peer.on_request_complete() + peer.score = max(peer.score - SCORE_FAILURE_PENALTY, MIN_PEER_SCORE) def get_all_peers(self) -> list[SyncPeer]: """Get all tracked peers.""" diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 9dbe0094..db7e7e5e 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -4,25 +4,25 @@ This is the main entry point for synchronization. The Core Problem ----------------- + When an Ethereum node starts, it has no chain history. Before it can validate -new blocks or produce attestations, it must synchronize with the network. This -involves: +new blocks or produce attestations, it must synchronize with the network. +This involves: 1. **Discovery**: Finding peers with chain data 2. **Assessment**: Determining how far behind we are 3. **Download**: Fetching missing blocks when they arrive out of order 4. 
**Validation**: Verifying and integrating blocks into our Store How It Works ------------- + - Blocks arrive via gossip subscription - If parent is known, process immediately - If parent is unknown, cache block and fetch parent (backfill) - When parents arrive, process waiting children State Machine -------------- + :: IDLE --> SYNCING --> SYNCED @@ -50,6 +50,7 @@ SignedAttestation, SignedBlockWithAttestation, ) +from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.forkchoice.store import Store from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.transport.peer_id import PeerId @@ -57,6 +58,7 @@ from .backfill_sync import BackfillSync, NetworkRequester from .block_cache import BlockCache +from .config import MAX_PENDING_ATTESTATIONS from .head_sync import HeadSync from .peer_manager import PeerManager from .states import SyncState @@ -66,10 +68,6 @@ logger = logging.getLogger(__name__) -BlockProcessor = Callable[[Store, SignedBlockWithAttestation], Store] - -PublishAggFn = Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] - def default_block_processor( store: Store, @@ -94,10 +92,10 @@ class SyncProgress: state: SyncState """Current sync state machine state.""" - local_head_slot: int | None = None + local_head_slot: Slot | None = None """Slot of our current chain head.""" - network_finalized_slot: int | None = None + network_finalized_slot: Slot | None = None """Network consensus on finalized slot (mode of peer reports).""" blocks_processed: int = 0 @@ -127,7 +125,7 @@ class SyncService: - Maintains the forkchoice Store Design Philosophy - ----------------- + The service is designed to be: **Reactive**: Responds to gossip blocks rather than proactively fetching. @@ -160,10 +158,14 @@ class SyncService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" - process_block: BlockProcessor = field(default=default_block_processor) - """Block processor function. Defaults to Store.on_block().""" + process_block: Callable[[Store, SignedBlockWithAttestation], Store] = field( + default=default_block_processor + ) + """Block processor function. Defaults to the store's block processing.""" - _publish_agg_fn: PublishAggFn = field(default=_noop_publish_agg) + _publish_agg_fn: Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] = field( + default=_noop_publish_agg + ) """Callback for publishing aggregated attestations to the network.""" _state: SyncState = field(default=SyncState.IDLE) @@ -236,7 +238,7 @@ def _process_block_wrapper( This wrapper is injected into HeadSync to track processed blocks and optionally persist them to the database. """ - # Delegate to the actual block processor (typically Store.on_block). + # Delegate to the actual block processor. # # The processor validates the block and updates forkchoice state. with metrics.block_processing_time.time(): @@ -313,7 +315,7 @@ def is_syncing(self) -> bool: @property def is_synced(self) -> bool: """Check if synced with network.""" - return self._state == SyncState.SYNCED + return self._state.is_synced def get_progress(self) -> SyncProgress: """ @@ -335,8 +337,8 @@ def get_progress(self) -> SyncProgress: return SyncProgress( state=self._state, - local_head_slot=int(head_slot), - network_finalized_slot=int(network_slot) if network_slot else None, + local_head_slot=head_slot, + network_finalized_slot=network_slot, blocks_processed=self._blocks_processed, # Only count peers that have an active connection. 
peers_connected=sum(1 for p in self.peer_manager.get_all_peers() if p.is_connected()), @@ -473,6 +475,8 @@ async def on_gossip_attestation( # This handles the common case where attestations arrive # slightly before the block they reference. self._pending_attestations.append(attestation) + if len(self._pending_attestations) > MAX_PENDING_ATTESTATIONS: + self._pending_attestations = self._pending_attestations[-MAX_PENDING_ATTESTATIONS:] async def on_gossip_aggregated_attestation( self, @@ -496,14 +500,18 @@ async def on_gossip_aggregated_attestation( except (AssertionError, KeyError): # Target block not yet processed. Buffer for replay. self._pending_aggregated_attestations.append(signed_attestation) + if len(self._pending_aggregated_attestations) > MAX_PENDING_ATTESTATIONS: + self._pending_aggregated_attestations = self._pending_aggregated_attestations[ + -MAX_PENDING_ATTESTATIONS: + ] def _replay_pending_attestations(self) -> None: """Retry buffered attestations after a block is processed. Drains both pending queues, attempting each attestation against the updated store. Attestations that still fail (e.g., referencing a block - not yet received) are discarded — they will arrive again via gossip - or be included in a future block. + not yet received) are kept in the buffer for the next replay attempt. + The buffer is bounded by MAX_PENDING_ATTESTATIONS to prevent unbounded growth. """ is_aggregator_role = self.store.validator_id is not None and self.is_aggregator @@ -516,7 +524,7 @@ def _replay_pending_attestations(self) -> None: is_aggregator=is_aggregator_role, ) except (AssertionError, KeyError): - pass + self._pending_attestations.append(attestation) pending_agg = self._pending_aggregated_attestations self._pending_aggregated_attestations = [] @@ -524,7 +532,7 @@ def _replay_pending_attestations(self) -> None: try: self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) except (AssertionError, KeyError): - pass + self._pending_aggregated_attestations.append(signed_attestation) async def publish_aggregated_attestation( self, @@ -587,7 +595,7 @@ async def _check_sync_trigger(self) -> None: # # If already SYNCING, we should not re-trigger. # This prevents redundant state transitions. - if self._state not in (SyncState.IDLE, SyncState.SYNCED): + if self._state.is_syncing: return # Guard: Require peer information before syncing. @@ -606,7 +614,7 @@ async def _check_sync_trigger(self) -> None: # network has finalized past our head, we are definitely behind. if network_finalized > head_slot: await self._transition_to(SyncState.SYNCING) - elif self._state == SyncState.IDLE: + elif self._state.is_idle: # Transition from IDLE even if caught up. # # IDLE -> SYNCING enables gossip processing. Even if our head matches @@ -621,7 +629,7 @@ async def _check_sync_complete(self) -> None: finalized slot and there are no orphan blocks. """ # Guard: Only check completion while actively syncing. - if self._state != SyncState.SYNCING: + if not self._state.is_syncing: return # Invariant: All orphan blocks must be resolved before declaring synced. 
diff --git a/src/lean_spec/subspecs/sync/states.py b/src/lean_spec/subspecs/sync/states.py index 14613dba..20fb1317 100644 --- a/src/lean_spec/subspecs/sync/states.py +++ b/src/lean_spec/subspecs/sync/states.py @@ -12,7 +12,7 @@ class SyncState(Enum): This is a simple three-state machine for reactive synchronization: State Machine Diagram - --------------------- + :: IDLE --> SYNCING --> SYNCED @@ -20,7 +20,7 @@ class SyncState(Enum): +---------+-----------+ The Lifecycle - ------------- + A newly started node follows this progression: 1. **IDLE**: Node starts, no peers connected yet @@ -28,14 +28,14 @@ class SyncState(Enum): 3. **SYNCED**: Local head reaches network finalized slot; fully synchronized How It Works - ------------ + - Blocks arrive via gossip - If parent is known, process immediately - If parent is unknown, cache block and fetch parent (backfill) - Backfill happens naturally within SYNCING, not as a separate state Transitions - ----------- + IDLE -> SYNCING - Triggered when: Peers connected and we need to sync - Action: Start processing gossip blocks @@ -114,6 +114,16 @@ def can_transition_to(self, target: "SyncState") -> bool: """ return target in _VALID_TRANSITIONS.get(self, set()) + @property + def is_idle(self) -> bool: + """ + Check if this state represents inactivity. + + Returns: + True if no synchronization is in progress. + """ + return self == SyncState.IDLE + @property def is_syncing(self) -> bool: """ @@ -124,6 +134,16 @@ def is_syncing(self) -> bool: """ return self == SyncState.SYNCING + @property + def is_synced(self) -> bool: + """ + Check if this state represents full synchronization. + + Returns: + True if the node is caught up with the network. + """ + return self == SyncState.SYNCED + @property def accepts_gossip(self) -> bool: """ diff --git a/tests/lean_spec/helpers/mocks.py b/tests/lean_spec/helpers/mocks.py index 55f12672..0e25f906 100644 --- a/tests/lean_spec/helpers/mocks.py +++ b/tests/lean_spec/helpers/mocks.py @@ -10,6 +10,7 @@ from lean_spec.subspecs.containers import SignedBlockWithAttestation from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.networking import PeerId from lean_spec.subspecs.networking.service.events import NetworkEvent @@ -184,3 +185,10 @@ def on_gossip_attestation( """Track attestation additions. Returns self for assignment chaining.""" self._attestations_received.append(signed_attestation) return self + + def on_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> MockForkchoiceStore: + """Track aggregated attestation additions. 
Returns self for assignment chaining.""" + return self diff --git a/tests/lean_spec/subspecs/sync/test_backfill_sync.py b/tests/lean_spec/subspecs/sync/test_backfill_sync.py index 4362f21f..ce246457 100644 --- a/tests/lean_spec/subspecs/sync/test_backfill_sync.py +++ b/tests/lean_spec/subspecs/sync/test_backfill_sync.py @@ -194,13 +194,13 @@ async def test_network_failure_handled_gracefully( class TestOrphanHandling: """Tests for orphan block management during backfill.""" - async def test_fill_all_orphans_fetches_missing_parents( + async def test_orphan_parents_fetched_via_fill_missing( self, backfill_system: BackfillSync, network: MockNetworkRequester, peer_id: PeerId, ) -> None: - """fill_all_orphans fetches parents for all orphan blocks in cache.""" + """Fetching orphan parents via get_orphan_parents + fill_missing resolves orphans.""" parent = make_signed_block( slot=Slot(1), proposer_index=ValidatorIndex(0), @@ -220,7 +220,8 @@ async def test_fill_all_orphans_fetches_missing_parents( assert backfill_system.block_cache.orphan_count == 1 - await backfill_system.fill_all_orphans() + orphan_parents = backfill_system.block_cache.get_orphan_parents() + await backfill_system.fill_missing(orphan_parents) assert parent_root in backfill_system.block_cache @@ -257,7 +258,8 @@ async def test_shared_parent_deduplicated( backfill_system.block_cache.mark_orphan(pending1.root) backfill_system.block_cache.mark_orphan(pending2.root) - await backfill_system.fill_all_orphans() + orphan_parents = backfill_system.block_cache.get_orphan_parents() + await backfill_system.fill_missing(orphan_parents) all_requested_roots = [root for _, roots in network.request_log for root in roots] assert all_requested_roots.count(parent_root) == 1 @@ -265,3 +267,85 @@ async def test_shared_parent_deduplicated( assert parent_root in network.request_log[0][1] assert parent_root in backfill_system.block_cache + + +class TestRequestTracking: + """Tests for request in-flight tracking.""" + + async def test_empty_response_does_not_leak_requests_in_flight( + self, + peer_id: PeerId, + network: MockNetworkRequester, + ) -> None: + """Empty response completes the request, keeping the peer available.""" + manager = PeerManager() + manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) + backfill = BackfillSync( + peer_manager=manager, + block_cache=BlockCache(), + network=network, + ) + + # Request a root the network doesn't have (returns empty). + unknown_root = Bytes32(b"\xff" * 32) + await backfill.fill_missing([unknown_root]) + + peer = manager.get_peer(peer_id) + assert peer is not None + assert peer.requests_in_flight == 0 + assert peer.is_available() + + async def test_in_flight_deduplication( + self, + backfill_system: BackfillSync, + network: MockNetworkRequester, + ) -> None: + """Duplicate fill_missing calls for the same root make only one request.""" + block = make_signed_block( + slot=Slot(5), + proposer_index=ValidatorIndex(0), + parent_root=Bytes32.zero(), + state_root=Bytes32.zero(), + ) + root = network.add_block(block) + + # First call fetches the block (may also try to fetch its parent). + await backfill_system.fill_missing([root]) + requests_after_first = len(network.request_log) + assert requests_after_first >= 1 + + # Second call: root is now in cache, so no new request. 
+ await backfill_system.fill_missing([root]) + assert len(network.request_log) == requests_after_first + + async def test_retry_after_failure_clears_pending( + self, + peer_id: PeerId, + ) -> None: + """Failed request clears pending so a retry can succeed.""" + network = MockNetworkRequester() + manager = PeerManager() + manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) + backfill = BackfillSync( + peer_manager=manager, + block_cache=BlockCache(), + network=network, + ) + + block = make_signed_block( + slot=Slot(5), + proposer_index=ValidatorIndex(0), + parent_root=Bytes32.zero(), + state_root=Bytes32.zero(), + ) + root = network.add_block(block) + + # First call fails. + network.should_fail = True + await backfill.fill_missing([root]) + assert root not in backfill._pending + + # Second call succeeds. + network.should_fail = False + await backfill.fill_missing([root]) + assert root in backfill.block_cache diff --git a/tests/lean_spec/subspecs/sync/test_block_cache.py b/tests/lean_spec/subspecs/sync/test_block_cache.py index bfa37a8e..01bb5e83 100644 --- a/tests/lean_spec/subspecs/sync/test_block_cache.py +++ b/tests/lean_spec/subspecs/sync/test_block_cache.py @@ -95,7 +95,6 @@ def test_empty_cache(self) -> None: cache = BlockCache() assert len(cache) == 0 - assert cache.is_empty assert cache.orphan_count == 0 def test_add_block(self, peer_id: PeerId) -> None: @@ -111,7 +110,6 @@ def test_add_block(self, peer_id: PeerId) -> None: pending = cache.add(block, peer_id) assert len(cache) == 1 - assert not cache.is_empty assert pending.block == block assert pending.received_from == peer_id @@ -196,7 +194,6 @@ def test_clear_cache(self, peer_id: PeerId) -> None: cache.clear() assert len(cache) == 0 - assert cache.is_empty assert cache.orphan_count == 0 diff --git a/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py b/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py index 0b7b12b5..fb94209f 100644 --- a/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py +++ b/tests/lean_spec/subspecs/sync/test_checkpoint_sync.py @@ -3,8 +3,10 @@ from __future__ import annotations import asyncio +from unittest.mock import MagicMock from lean_spec.subspecs.api import ApiServer, ApiServerConfig +from lean_spec.subspecs.chain.config import VALIDATOR_REGISTRY_LIMIT from lean_spec.subspecs.containers import State from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.containers.state import Validators @@ -41,6 +43,19 @@ async def test_state_without_validators_fails_verification(self, genesis_state: result = await verify_checkpoint_state(empty_state) assert result is False + async def test_state_exceeding_validator_limit_fails(self) -> None: + """State with more validators than VALIDATOR_REGISTRY_LIMIT fails.""" + # Use a mock because SSZList enforces LIMIT at construction time, + # preventing creation of a real State with too many validators. 
+ mock_state = MagicMock() + mock_state.slot = Slot(0) + mock_validators = MagicMock() + mock_validators.__len__ = MagicMock(return_value=int(VALIDATOR_REGISTRY_LIMIT) + 1) + mock_state.validators = mock_validators + + result = await verify_checkpoint_state(mock_state) + assert result is False + class TestCheckpointSyncClientServerIntegration: """Integration tests for checkpoint sync client fetching from server.""" diff --git a/tests/lean_spec/subspecs/sync/test_head_sync.py b/tests/lean_spec/subspecs/sync/test_head_sync.py index 32a6dadf..292abce9 100644 --- a/tests/lean_spec/subspecs/sync/test_head_sync.py +++ b/tests/lean_spec/subspecs/sync/test_head_sync.py @@ -15,7 +15,7 @@ from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.sync.backfill_sync import BackfillSync from lean_spec.subspecs.sync.block_cache import BlockCache -from lean_spec.subspecs.sync.head_sync import HeadSync +from lean_spec.subspecs.sync.head_sync import HeadSync, HeadSyncResult from lean_spec.types import Bytes32 from tests.lean_spec.helpers import make_signed_block @@ -424,3 +424,171 @@ def fail_first(s: Any, block: SignedBlockWithAttestation) -> Any: assert call_count == 2 # Both attempted assert count == 1 # One succeeded assert len(successful_roots) == 1 + + +class TestStorePropagation: + """Tests for store propagation through descendant processing.""" + + async def test_store_propagated_through_descendant_chain( + self, + genesis_block, + peer_id: PeerId, + ) -> None: + """Returned store contains ALL processed blocks, not just the parent.""" + genesis_root = hash_tree_root(genesis_block) + store = cast(Store, MockStore({genesis_root})) + store.blocks[genesis_root] = genesis_block + block_cache = BlockCache() + + # Build chain: parent -> child1 -> child2 + parent = make_signed_block( + slot=Slot(1), + proposer_index=ValidatorIndex(0), + parent_root=genesis_root, + state_root=Bytes32(b"\x01" * 32), + ) + parent_root = hash_tree_root(parent.message.block) + + child1 = make_signed_block( + slot=Slot(2), + proposer_index=ValidatorIndex(0), + parent_root=parent_root, + state_root=Bytes32(b"\x02" * 32), + ) + child1_root = hash_tree_root(child1.message.block) + + child2 = make_signed_block( + slot=Slot(3), + proposer_index=ValidatorIndex(0), + parent_root=child1_root, + state_root=Bytes32(b"\x03" * 32), + ) + child2_root = hash_tree_root(child2.message.block) + + # Pre-cache descendants. 
+ block_cache.add(child1, peer_id) + block_cache.add(child2, peer_id) + + def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: + root = hash_tree_root(block.message.block) + new_store = MockStore(set(s.blocks.keys()) | {root}) + return new_store + + head_sync = HeadSync( + block_cache=block_cache, + backfill=MagicMock(spec=BackfillSync), + process_block=track_processing, + ) + + result, new_store = await head_sync.on_gossip_block(parent, peer_id, store) + + assert result.processed is True + assert result.descendants_processed == 2 + assert parent_root in new_store.blocks + assert child1_root in new_store.blocks + assert child2_root in new_store.blocks + + +class TestReentrantGuard: + """Tests for reentrant processing guard.""" + + async def test_reentrant_call_returns_not_processed( + self, + genesis_block, + peer_id: PeerId, + ) -> None: + """Block already in _processing returns processed=False, cached=False.""" + genesis_root = hash_tree_root(genesis_block) + store = cast(Store, MockStore({genesis_root})) + store.blocks[genesis_root] = genesis_block + + head_sync = HeadSync( + block_cache=BlockCache(), + backfill=MagicMock(spec=BackfillSync), + process_block=MagicMock(), + ) + + block = make_signed_block( + slot=Slot(1), + proposer_index=ValidatorIndex(0), + parent_root=genesis_root, + state_root=Bytes32.zero(), + ) + block_root = hash_tree_root(block.message.block) + + # Simulate reentrant call by pre-adding to _processing. + head_sync._processing.add(block_root) + + result, returned_store = await head_sync.on_gossip_block(block, peer_id, store) + + assert result == HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ) + assert returned_store is store + cast(MagicMock, head_sync.process_block).assert_not_called() + + +class TestProcessAllProcessableConvergence: + """Tests for process_all_processable convergence.""" + + async def test_chain_processed_in_single_call( + self, + genesis_block, + peer_id: PeerId, + ) -> None: + """Chain A -> B -> C all processed in one process_all_processable call.""" + genesis_root = hash_tree_root(genesis_block) + store = cast(Store, MockStore({genesis_root})) + store.blocks[genesis_root] = genesis_block + block_cache = BlockCache() + + # Build chain: A -> B -> C, all with genesis as ultimate ancestor. 
+ block_a = make_signed_block( + slot=Slot(1), + proposer_index=ValidatorIndex(0), + parent_root=genesis_root, + state_root=Bytes32(b"\x01" * 32), + ) + root_a = hash_tree_root(block_a.message.block) + + block_b = make_signed_block( + slot=Slot(2), + proposer_index=ValidatorIndex(0), + parent_root=root_a, + state_root=Bytes32(b"\x02" * 32), + ) + root_b = hash_tree_root(block_b.message.block) + + block_c = make_signed_block( + slot=Slot(3), + proposer_index=ValidatorIndex(0), + parent_root=root_b, + state_root=Bytes32(b"\x03" * 32), + ) + + block_cache.add(block_a, peer_id) + block_cache.add(block_b, peer_id) + block_cache.add(block_c, peer_id) + + processing_order: list[int] = [] + + def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: + processing_order.append(int(block.message.block.slot)) + root = hash_tree_root(block.message.block) + new_store = MockStore(set(s.blocks.keys()) | {root}) + return new_store + + head_sync = HeadSync( + block_cache=block_cache, + backfill=MagicMock(spec=BackfillSync), + process_block=track_processing, + ) + + count, _ = await head_sync.process_all_processable(store) + + assert count == 3 + assert processing_order == [1, 2, 3] + assert len(block_cache) == 0 diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py index 1e61b3bd..28cc639c 100644 --- a/tests/lean_spec/subspecs/sync/test_peer_manager.py +++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py @@ -9,7 +9,14 @@ from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.types import ConnectionState from lean_spec.subspecs.sync.config import MAX_CONCURRENT_REQUESTS -from lean_spec.subspecs.sync.peer_manager import PeerManager, SyncPeer +from lean_spec.subspecs.sync.peer_manager import ( + INITIAL_PEER_SCORE, + MIN_PEER_SCORE, + SCORE_FAILURE_PENALTY, + SCORE_SUCCESS_BONUS, + PeerManager, + SyncPeer, +) from lean_spec.types import Bytes32 @@ -287,13 +294,14 @@ class TestPeerManagerRequestCallbacks: """Tests for PeerManager request callbacks.""" def test_on_request_success(self, connected_peer_info: PeerInfo) -> None: - """on_request_success decrements in-flight count.""" + """on_request_success decrements in-flight count and increases score.""" manager = PeerManager() sync_peer = manager.add_peer(connected_peer_info) sync_peer.requests_in_flight = 1 manager.on_request_success(connected_peer_info.peer_id) assert sync_peer.requests_in_flight == 0 + assert sync_peer.score == INITIAL_PEER_SCORE + SCORE_SUCCESS_BONUS def test_on_request_success_nonexistent_peer(self) -> None: """on_request_success does nothing for nonexistent peer.""" @@ -302,13 +310,14 @@ def test_on_request_success_nonexistent_peer(self) -> None: manager.on_request_success(peer("16Uiu2HAmNonexistent")) def test_on_request_failure(self, connected_peer_info: PeerInfo) -> None: - """on_request_failure decrements in-flight count.""" + """on_request_failure decrements in-flight count and decreases score.""" manager = PeerManager() sync_peer = manager.add_peer(connected_peer_info) sync_peer.requests_in_flight = 1 manager.on_request_failure(connected_peer_info.peer_id) assert sync_peer.requests_in_flight == 0 + assert sync_peer.score == INITIAL_PEER_SCORE - SCORE_FAILURE_PENALTY def test_on_request_failure_nonexistent_peer(self) -> None: """on_request_failure does nothing for nonexistent peer.""" @@ -335,3 +344,60 @@ def test_get_all_peers_returns_all(self, peer_id: PeerId, peer_id_2: PeerId) -> peers = 
manager.get_all_peers() assert len(peers) == 2 + + +class TestPeerScoring: + """Tests for peer scoring and weighted selection.""" + + def test_score_starts_at_initial(self, connected_peer_info: PeerInfo) -> None: + """New peers start with INITIAL_PEER_SCORE.""" + manager = PeerManager() + sync_peer = manager.add_peer(connected_peer_info) + assert sync_peer.score == INITIAL_PEER_SCORE + + def test_repeated_failures_lower_score(self, connected_peer_info: PeerInfo) -> None: + """Repeated failures reduce score toward MIN_PEER_SCORE.""" + manager = PeerManager() + sync_peer = manager.add_peer(connected_peer_info) + sync_peer.requests_in_flight = 10 + + for _ in range(20): + manager.on_request_failure(connected_peer_info.peer_id) + + assert sync_peer.score == MIN_PEER_SCORE + + def test_failed_peer_deprioritized(self, peer_id: PeerId, peer_id_2: PeerId) -> None: + """Peer with low score is selected less often than high-score peer.""" + manager = PeerManager() + info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) + info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.CONNECTED) + + peer1 = manager.add_peer(info1) + manager.add_peer(info2) + + # Drive peer1 score to minimum. + peer1.requests_in_flight = 10 + for _ in range(20): + manager.on_request_failure(peer_id) + + assert peer1.score == MIN_PEER_SCORE + + # Sample selections — peer2 should dominate. + selections = [manager.select_peer_for_request() for _ in range(100)] + peer2_count = sum(1 for s in selections if s is not None and s.peer_id == peer_id_2) + assert peer2_count > 80 + + def test_overloaded_peer_skipped(self, peer_id: PeerId, peer_id_2: PeerId) -> None: + """Peer at MAX_CONCURRENT_REQUESTS is skipped in favor of available peer.""" + manager = PeerManager() + info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) + info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.CONNECTED) + + peer1 = manager.add_peer(info1) + manager.add_peer(info2) + + peer1.requests_in_flight = MAX_CONCURRENT_REQUESTS + + selected = manager.select_peer_for_request() + assert selected is not None + assert selected.peer_id == peer_id_2 diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index 85f580ba..b42bd955 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -2,6 +2,8 @@ from __future__ import annotations +from typing import cast + import pytest from lean_spec.subspecs.containers.checkpoint import Checkpoint @@ -13,7 +15,12 @@ from lean_spec.subspecs.sync.service import SyncService from lean_spec.subspecs.sync.states import SyncState from lean_spec.types import Bytes32 -from tests.lean_spec.helpers import create_mock_sync_service, make_signed_block +from tests.lean_spec.helpers import ( + MockForkchoiceStore, + create_mock_sync_service, + make_signed_attestation, + make_signed_block, +) @pytest.fixture @@ -231,7 +238,7 @@ def test_progress_reflects_current_state( # Initial progress progress = sync_service.get_progress() assert progress.state == SyncState.IDLE - assert progress.local_head_slot == 0 + assert progress.local_head_slot == Slot(0) assert progress.blocks_processed == 0 assert progress.peers_connected == 1 # We added one peer in fixture @@ -256,7 +263,7 @@ def test_progress_includes_network_consensus( sync_service.peer_manager.update_status(peer_id, status) progress = sync_service.get_progress() - assert progress.network_finalized_slot == 100 + assert progress.network_finalized_slot == Slot(100) 
def test_progress_tracks_cache_state( self, @@ -321,3 +328,166 @@ def test_reset_clears_all_state( assert len(sync_service.block_cache) == 0 assert sync_service._backfill is not None assert len(sync_service._backfill._pending) == 0 + + +class TestAttestationGossipHandling: + """Tests for attestation gossip handling.""" + + async def test_attestation_accepted_when_synced( + self, + sync_service: SyncService, + peer_id: PeerId, + ) -> None: + """Attestation is processed when in SYNCED state.""" + sync_service._state = SyncState.SYNCED + + target = Checkpoint(root=sync_service.store.head, slot=Slot(0)) + attestation = make_signed_attestation( + validator=ValidatorIndex(0), + target=target, + ) + + await sync_service.on_gossip_attestation(attestation) + + mock_store = cast(MockForkchoiceStore, sync_service.store) + assert mock_store._attestations_received == [attestation] + + async def test_attestation_rejected_when_idle( + self, + sync_service: SyncService, + ) -> None: + """Attestation is ignored when in IDLE state.""" + assert sync_service.state == SyncState.IDLE + + target = Checkpoint(root=sync_service.store.head, slot=Slot(0)) + attestation = make_signed_attestation( + validator=ValidatorIndex(0), + target=target, + ) + + await sync_service.on_gossip_attestation(attestation) + + mock_store = cast(MockForkchoiceStore, sync_service.store) + assert mock_store._attestations_received == [] + + async def test_attestation_buffered_when_block_unknown( + self, + sync_service: SyncService, + ) -> None: + """Attestation referencing unknown block is buffered for replay.""" + sync_service._state = SyncState.SYNCING + + # Make the mock store reject this attestation. + unknown_root = Bytes32(b"\xab" * 32) + target = Checkpoint(root=unknown_root, slot=Slot(99)) + attestation = make_signed_attestation( + validator=ValidatorIndex(0), + target=target, + ) + + # Override on_gossip_attestation to raise for unknown blocks. + original_fn = sync_service.store.on_gossip_attestation + + def reject_unknown(signed_attestation, *, is_aggregator=False): + if signed_attestation.message.target.root == unknown_root: + raise KeyError("Unknown block") + return original_fn(signed_attestation, is_aggregator=is_aggregator) + + sync_service.store.on_gossip_attestation = reject_unknown # type: ignore[assignment] + + await sync_service.on_gossip_attestation(attestation) + + assert len(sync_service._pending_attestations) == 1 + assert sync_service._pending_attestations[0] is attestation + + async def test_buffered_attestation_replayed_after_block( + self, + sync_service: SyncService, + peer_id: PeerId, + ) -> None: + """Buffered attestation is replayed when a new block is processed.""" + sync_service._state = SyncState.SYNCING + + target = Checkpoint(root=sync_service.store.head, slot=Slot(0)) + attestation = make_signed_attestation( + validator=ValidatorIndex(0), + target=target, + ) + + # Manually buffer an attestation. + sync_service._pending_attestations.append(attestation) + + # Process a gossip block to trigger replay. + genesis_root = sync_service.store.head + block = make_signed_block( + slot=Slot(1), + proposer_index=ValidatorIndex(0), + parent_root=genesis_root, + state_root=Bytes32.zero(), + ) + + await sync_service.on_gossip_block(block, peer_id) + + # Attestation was replayed (accepted by mock store). 
+ assert len(sync_service._pending_attestations) == 0 + mock_store = cast(MockForkchoiceStore, sync_service.store) + assert attestation in mock_store._attestations_received + + +class TestSyncedGossipBlocks: + """Tests for gossip block handling in SYNCED state.""" + + async def test_processes_gossip_in_synced_state( + self, + sync_service: SyncService, + peer_id: PeerId, + ) -> None: + """Gossip blocks are processed when in SYNCED state.""" + sync_service._state = SyncState.SYNCED + + genesis_root = sync_service.store.head + block = make_signed_block( + slot=Slot(1), + proposer_index=ValidatorIndex(0), + parent_root=genesis_root, + state_root=Bytes32.zero(), + ) + + await sync_service.on_gossip_block(block, peer_id) + + assert sync_service._blocks_processed == 1 + + +class TestInvalidStateTransition: + """Tests for invalid state transitions.""" + + async def test_idle_to_synced_raises_value_error( + self, + sync_service: SyncService, + ) -> None: + """Direct IDLE -> SYNCED transition raises ValueError.""" + assert sync_service.state == SyncState.IDLE + + with pytest.raises(ValueError, match="Invalid state transition"): + await sync_service._transition_to(SyncState.SYNCED) + + +class TestIdleToCaughtUp: + """Tests for IDLE-to-SYNCING when already caught up.""" + + async def test_idle_transitions_to_syncing_when_caught_up( + self, + sync_service: SyncService, + peer_id: PeerId, + ) -> None: + """IDLE transitions to SYNCING even when peer reports same head.""" + assert sync_service.state == SyncState.IDLE + + # Peer reports finalized at slot 0 (same as our head). + status = Status( + finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(0)), + head=Checkpoint(root=Bytes32.zero(), slot=Slot(0)), + ) + await sync_service.on_peer_status(peer_id, status) + + assert sync_service.state == SyncState.SYNCING diff --git a/tests/lean_spec/subspecs/sync/test_states.py b/tests/lean_spec/subspecs/sync/test_states.py index a04fe80e..4f226c87 100644 --- a/tests/lean_spec/subspecs/sync/test_states.py +++ b/tests/lean_spec/subspecs/sync/test_states.py @@ -37,33 +37,16 @@ def test_idle_cannot_transition_to_synced(self) -> None: """IDLE cannot transition directly to SYNCED.""" assert not SyncState.IDLE.can_transition_to(SyncState.SYNCED) - def test_idle_cannot_transition_to_itself(self) -> None: - """IDLE cannot transition to IDLE.""" - assert not SyncState.IDLE.can_transition_to(SyncState.IDLE) - def test_syncing_valid_transitions(self) -> None: """SYNCING can transition to SYNCED or IDLE.""" assert SyncState.SYNCING.can_transition_to(SyncState.SYNCED) assert SyncState.SYNCING.can_transition_to(SyncState.IDLE) - def test_syncing_cannot_transition_to_itself(self) -> None: - """SYNCING cannot transition to itself.""" - assert not SyncState.SYNCING.can_transition_to(SyncState.SYNCING) - def test_synced_valid_transitions(self) -> None: """SYNCED can transition to SYNCING or IDLE.""" assert SyncState.SYNCED.can_transition_to(SyncState.SYNCING) assert SyncState.SYNCED.can_transition_to(SyncState.IDLE) - def test_synced_cannot_transition_to_itself(self) -> None: - """SYNCED cannot transition to itself.""" - assert not SyncState.SYNCED.can_transition_to(SyncState.SYNCED) - - def test_all_active_states_can_transition_to_idle(self) -> None: - """SYNCING and SYNCED can transition to IDLE (loss of peers).""" - assert SyncState.SYNCING.can_transition_to(SyncState.IDLE) - assert SyncState.SYNCED.can_transition_to(SyncState.IDLE) - class TestSyncStateIsSyncing: """Tests for the is_syncing property.""" @@ -136,15 +119,6 @@ def 
test_synced_to_syncing_cycle(self) -> None: assert current == SyncState.SYNCED - def test_any_active_state_to_idle_on_disconnect(self) -> None: - """Test that any active state can return to IDLE on peer disconnect.""" - active_states = [SyncState.SYNCING, SyncState.SYNCED] - - for state in active_states: - assert state.can_transition_to(SyncState.IDLE), ( - f"{state.name} should transition to IDLE" - ) - class TestSyncStateEdgeCases: """Tests for edge cases and invariants.""" From 9aeb92c89e0dccc9539bc60722d3c5225f3bdc02 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 19 Feb 2026 11:38:10 +0100 Subject: [PATCH 2/2] better assertions on tests --- .../subspecs/sync/test_backfill_sync.py | 85 ++++++++++---- .../subspecs/sync/test_block_cache.py | 108 +++++++++++------- .../lean_spec/subspecs/sync/test_head_sync.py | 63 +++++++--- .../subspecs/sync/test_peer_manager.py | 72 +++++++----- tests/lean_spec/subspecs/sync/test_service.py | 57 ++++++--- 5 files changed, 256 insertions(+), 129 deletions(-) diff --git a/tests/lean_spec/subspecs/sync/test_backfill_sync.py b/tests/lean_spec/subspecs/sync/test_backfill_sync.py index ce246457..aec7fcb6 100644 --- a/tests/lean_spec/subspecs/sync/test_backfill_sync.py +++ b/tests/lean_spec/subspecs/sync/test_backfill_sync.py @@ -10,9 +10,14 @@ from lean_spec.subspecs.networking.peer import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState from lean_spec.subspecs.sync.backfill_sync import BackfillSync -from lean_spec.subspecs.sync.block_cache import BlockCache +from lean_spec.subspecs.sync.block_cache import BlockCache, PendingBlock from lean_spec.subspecs.sync.config import MAX_BACKFILL_DEPTH, MAX_BLOCKS_PER_REQUEST -from lean_spec.subspecs.sync.peer_manager import PeerManager +from lean_spec.subspecs.sync.peer_manager import ( + INITIAL_PEER_SCORE, + SCORE_SUCCESS_BONUS, + PeerManager, + SyncPeer, +) from lean_spec.types import Bytes32 from tests.lean_spec.helpers import MockNetworkRequester, make_signed_block @@ -55,10 +60,17 @@ async def test_fetch_single_missing_block( await backfill_system.fill_missing([block_root]) - assert block_root in backfill_system.block_cache cached = backfill_system.block_cache.get(block_root) assert cached is not None - assert cached.slot == Slot(10) + assert cached == PendingBlock( + block=block, + root=block_root, + parent_root=Bytes32.zero(), + slot=Slot(10), + received_from=peer_id, + received_at=cached.received_at, + backfill_depth=1, + ) async def test_recursive_parent_chain_resolution( self, @@ -93,20 +105,42 @@ async def test_recursive_parent_chain_resolution( await backfill_system.fill_missing([child_root]) - assert child_root in backfill_system.block_cache - assert parent_root in backfill_system.block_cache - assert grandparent_root in backfill_system.block_cache - child_cached = backfill_system.block_cache.get(child_root) parent_cached = backfill_system.block_cache.get(parent_root) grandparent_cached = backfill_system.block_cache.get(grandparent_root) assert child_cached is not None + assert child_cached == PendingBlock( + block=child, + root=child_root, + parent_root=parent_root, + slot=Slot(3), + received_from=peer_id, + received_at=child_cached.received_at, + backfill_depth=1, + ) + assert parent_cached is not None + assert parent_cached == PendingBlock( + block=parent, + root=parent_root, + parent_root=grandparent_root, + slot=Slot(2), + received_from=peer_id, + received_at=parent_cached.received_at, + backfill_depth=2, + ) + assert grandparent_cached is not None - assert 
child_cached.backfill_depth == 1 - assert parent_cached.backfill_depth == 2 - assert grandparent_cached.backfill_depth == 3 + assert grandparent_cached == PendingBlock( + block=grandparent, + root=grandparent_root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=grandparent_cached.received_at, + backfill_depth=3, + ) async def test_depth_limit_stops_infinite_recursion( self, @@ -118,7 +152,7 @@ async def test_depth_limit_stops_infinite_recursion( await backfill_system.fill_missing([root], depth=MAX_BACKFILL_DEPTH) - assert len(network.request_log) == 0 + assert network.request_log == [] assert root not in backfill_system.block_cache async def test_skips_already_cached_blocks( @@ -140,7 +174,7 @@ async def test_skips_already_cached_blocks( await backfill_system.fill_missing([block_root]) - assert len(network.request_log) == 0 + assert network.request_log == [] class TestBatchingAndPeerManagement: @@ -150,6 +184,7 @@ async def test_large_request_split_into_batches( self, backfill_system: BackfillSync, network: MockNetworkRequester, + peer_id: PeerId, ) -> None: """Requests larger than MAX_BLOCKS_PER_REQUEST are split.""" num_roots = MAX_BLOCKS_PER_REQUEST + 5 @@ -157,9 +192,10 @@ async def test_large_request_split_into_batches( await backfill_system.fill_missing(roots) - assert len(network.request_log) == 2 - assert len(network.request_log[0][1]) == MAX_BLOCKS_PER_REQUEST - assert len(network.request_log[1][1]) == 5 + assert network.request_log == [ + (peer_id, roots[:MAX_BLOCKS_PER_REQUEST]), + (peer_id, roots[MAX_BLOCKS_PER_REQUEST:]), + ] async def test_no_requests_without_available_peer( self, @@ -175,7 +211,7 @@ async def test_no_requests_without_available_peer( await backfill.fill_missing([Bytes32(b"\x01" * 32)]) - assert len(network.request_log) == 0 + assert network.request_log == [] async def test_network_failure_handled_gracefully( self, @@ -188,7 +224,7 @@ async def test_network_failure_handled_gracefully( await backfill_system.fill_missing([root]) - assert root not in backfill_system._pending + assert backfill_system._pending == set() class TestOrphanHandling: @@ -279,7 +315,8 @@ async def test_empty_response_does_not_leak_requests_in_flight( ) -> None: """Empty response completes the request, keeping the peer available.""" manager = PeerManager() - manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) + info = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) + manager.add_peer(info) backfill = BackfillSync( peer_manager=manager, block_cache=BlockCache(), @@ -291,9 +328,11 @@ async def test_empty_response_does_not_leak_requests_in_flight( await backfill.fill_missing([unknown_root]) peer = manager.get_peer(peer_id) - assert peer is not None - assert peer.requests_in_flight == 0 - assert peer.is_available() + assert peer == SyncPeer( + info=info, + requests_in_flight=0, + score=INITIAL_PEER_SCORE + SCORE_SUCCESS_BONUS, + ) async def test_in_flight_deduplication( self, @@ -343,7 +382,7 @@ async def test_retry_after_failure_clears_pending( # First call fails. network.should_fail = True await backfill.fill_missing([root]) - assert root not in backfill._pending + assert backfill._pending == set() # Second call succeeds. 
network.should_fail = False diff --git a/tests/lean_spec/subspecs/sync/test_block_cache.py b/tests/lean_spec/subspecs/sync/test_block_cache.py index 01bb5e83..d84a71fb 100644 --- a/tests/lean_spec/subspecs/sync/test_block_cache.py +++ b/tests/lean_spec/subspecs/sync/test_block_cache.py @@ -36,12 +36,15 @@ def test_create_pending_block(self, peer_id: PeerId) -> None: received_from=peer_id, ) - assert pending.block == block - assert pending.root == root - assert pending.parent_root == Bytes32.zero() - assert pending.slot == Slot(1) - assert pending.received_from == peer_id - assert pending.backfill_depth == 0 + assert pending == PendingBlock( + block=block, + root=root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=pending.received_at, + backfill_depth=0, + ) def test_pending_block_default_received_at(self, peer_id: PeerId) -> None: """PendingBlock sets received_at to current time by default.""" @@ -84,7 +87,15 @@ def test_pending_block_custom_backfill_depth(self, peer_id: PeerId) -> None: backfill_depth=5, ) - assert pending.backfill_depth == 5 + assert pending == PendingBlock( + block=block, + root=root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=pending.received_at, + backfill_depth=5, + ) class TestBlockCacheBasicOperations: @@ -109,9 +120,17 @@ def test_add_block(self, peer_id: PeerId) -> None: pending = cache.add(block, peer_id) + root = hash_tree_root(block.message.block) assert len(cache) == 1 - assert pending.block == block - assert pending.received_from == peer_id + assert pending == PendingBlock( + block=block, + root=root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=pending.received_at, + backfill_depth=0, + ) def test_contains_block(self, peer_id: PeerId) -> None: """Contains check works for cached blocks.""" @@ -141,7 +160,6 @@ def test_get_block(self, peer_id: PeerId) -> None: pending = cache.add(block, peer_id) retrieved = cache.get(pending.root) - assert retrieved is not None assert retrieved == pending def test_get_nonexistent_block(self) -> None: @@ -337,8 +355,7 @@ def test_get_orphan_parents(self, peer_id: PeerId) -> None: orphan_parents = cache.get_orphan_parents() - assert len(orphan_parents) == 1 - assert parent_root in orphan_parents + assert orphan_parents == [parent_root] def test_get_orphan_parents_deduplicates(self, peer_id: PeerId) -> None: """get_orphan_parents deduplicates when multiple orphans share a parent.""" @@ -367,8 +384,7 @@ def test_get_orphan_parents_deduplicates(self, peer_id: PeerId) -> None: orphan_parents = cache.get_orphan_parents() # Should return the common parent only once - assert len(orphan_parents) == 1 - assert common_parent in orphan_parents + assert orphan_parents == [common_parent] def test_get_orphan_parents_excludes_cached_parents(self, peer_id: PeerId) -> None: """get_orphan_parents excludes parents that are already in the cache.""" @@ -396,7 +412,7 @@ def test_get_orphan_parents_excludes_cached_parents(self, peer_id: PeerId) -> No orphan_parents = cache.get_orphan_parents() # Parent is in cache, so should not be returned - assert len(orphan_parents) == 0 + assert orphan_parents == [] class TestBlockCacheParentChildIndex: @@ -425,8 +441,7 @@ def test_get_children_single_child(self, peer_id: PeerId) -> None: children = cache.get_children(parent_root) - assert len(children) == 1 - assert children[0] == pending + assert children == [pending] def test_get_children_multiple_children(self, peer_id: PeerId) -> None: 
"""get_children returns all children of a parent.""" @@ -452,10 +467,10 @@ def test_get_children_multiple_children(self, peer_id: PeerId) -> None: children = cache.get_children(parent_root) - assert len(children) == 2 - roots = {c.root for c in children} - assert pending1.root in roots - assert pending2.root in roots + # Both blocks have the same slot, so order within that slot is non-deterministic. + assert sorted(children, key=lambda p: p.root) == sorted( + [pending1, pending2], key=lambda p: p.root + ) def test_get_children_sorted_by_slot(self, peer_id: PeerId) -> None: """get_children returns children sorted by slot.""" @@ -482,16 +497,13 @@ def test_get_children_sorted_by_slot(self, peer_id: PeerId) -> None: state_root=Bytes32(b"\x02" * 32), ) - cache.add(block_slot3, peer_id) - cache.add(block_slot1, peer_id) - cache.add(block_slot2, peer_id) + pending3 = cache.add(block_slot3, peer_id) + pending1 = cache.add(block_slot1, peer_id) + pending2 = cache.add(block_slot2, peer_id) children = cache.get_children(parent_root) - assert len(children) == 3 - assert children[0].slot == Slot(1) - assert children[1].slot == Slot(2) - assert children[2].slot == Slot(3) + assert children == [pending1, pending2, pending3] def test_remove_clears_parent_index(self, peer_id: PeerId) -> None: """Removing a block clears it from the parent-to-children index.""" @@ -505,11 +517,11 @@ def test_remove_clears_parent_index(self, peer_id: PeerId) -> None: ) pending = cache.add(block, peer_id) - assert len(cache.get_children(parent_root)) == 1 + assert cache.get_children(parent_root) == [pending] cache.remove(pending.root) - assert len(cache.get_children(parent_root)) == 0 + assert cache.get_children(parent_root) == [] class TestBlockCacheCapacityManagement: @@ -647,8 +659,7 @@ def test_get_processable_finds_block_with_parent_in_store(self, peer_id: PeerId) processable = cache.get_processable(mock_store) - assert len(processable) == 1 - assert processable[0] == pending + assert processable == [pending] def test_get_processable_sorted_by_slot(self, peer_id: PeerId) -> None: """get_processable returns blocks sorted by slot.""" @@ -678,16 +689,13 @@ def test_get_processable_sorted_by_slot(self, peer_id: PeerId) -> None: state_root=Bytes32(b"\x02" * 32), ) - cache.add(block3, peer_id) - cache.add(block1, peer_id) - cache.add(block2, peer_id) + pending3 = cache.add(block3, peer_id) + pending1 = cache.add(block1, peer_id) + pending2 = cache.add(block2, peer_id) processable = cache.get_processable(mock_store) - assert len(processable) == 3 - assert processable[0].slot == Slot(1) - assert processable[1].slot == Slot(2) - assert processable[2].slot == Slot(3) + assert processable == [pending1, pending2, pending3] class TestBlockCacheBackfillDepth: @@ -705,7 +713,16 @@ def test_add_with_backfill_depth(self, peer_id: PeerId) -> None: pending = cache.add(block, peer_id, backfill_depth=10) - assert pending.backfill_depth == 10 + root = hash_tree_root(block.message.block) + assert pending == PendingBlock( + block=block, + root=root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=pending.received_at, + backfill_depth=10, + ) def test_add_default_backfill_depth_is_zero(self, peer_id: PeerId) -> None: """Adding a block without backfill depth defaults to 0.""" @@ -719,4 +736,13 @@ def test_add_default_backfill_depth_is_zero(self, peer_id: PeerId) -> None: pending = cache.add(block, peer_id) - assert pending.backfill_depth == 0 + root = hash_tree_root(block.message.block) + assert pending == 
PendingBlock( + block=block, + root=root, + parent_root=Bytes32.zero(), + slot=Slot(1), + received_from=peer_id, + received_at=pending.received_at, + backfill_depth=0, + ) diff --git a/tests/lean_spec/subspecs/sync/test_head_sync.py b/tests/lean_spec/subspecs/sync/test_head_sync.py index 292abce9..92a1446a 100644 --- a/tests/lean_spec/subspecs/sync/test_head_sync.py +++ b/tests/lean_spec/subspecs/sync/test_head_sync.py @@ -73,9 +73,12 @@ def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: result, new_store = await head_sync.on_gossip_block(block, peer_id, store) - assert result.processed is True - assert result.cached is False - assert result.backfill_triggered is False + assert result == HeadSyncResult( + processed=True, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ) assert block_root in processed_blocks async def test_block_with_unknown_parent_cached_and_triggers_backfill( @@ -104,9 +107,12 @@ async def test_block_with_unknown_parent_cached_and_triggers_backfill( result, _ = await head_sync.on_gossip_block(block, peer_id, store) - assert result.processed is False - assert result.cached is True - assert result.backfill_triggered is True + assert result == HeadSyncResult( + processed=False, + cached=True, + backfill_triggered=True, + descendants_processed=0, + ) assert block_root in head_sync.block_cache assert head_sync.block_cache.orphan_count == 1 backfill.fill_missing.assert_called_once_with([unknown_parent]) @@ -138,8 +144,12 @@ async def test_duplicate_block_skipped( result, _ = await head_sync.on_gossip_block(block, peer_id, store) - assert result.processed is False - assert result.cached is False + assert result == HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + ) process_block.assert_not_called() @@ -194,8 +204,12 @@ def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: # Process parent - should trigger child processing result, _ = await head_sync.on_gossip_block(parent, peer_id, store) - assert result.processed is True - assert result.descendants_processed == 1 + assert result == HeadSyncResult( + processed=True, + cached=False, + backfill_triggered=False, + descendants_processed=1, + ) assert processing_order == [parent_root, child_root] assert child_root not in block_cache # Removed after processing @@ -244,8 +258,12 @@ def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: # Process first block - should cascade to all descendants result, _ = await head_sync.on_gossip_block(blocks[0], peer_id, store) - assert result.processed is True - assert result.descendants_processed == 3 + assert result == HeadSyncResult( + processed=True, + cached=False, + backfill_triggered=False, + descendants_processed=3, + ) assert processing_order == [1, 2, 3, 4] @@ -367,8 +385,13 @@ def fail_processing(s: Any, b: SignedBlockWithAttestation) -> Any: result, returned_store = await head_sync.on_gossip_block(block, peer_id, store) - assert result.processed is False - assert result.error == "State transition failed" + assert result == HeadSyncResult( + processed=False, + cached=False, + backfill_triggered=False, + descendants_processed=0, + error="State transition failed", + ) assert returned_store is store # Original store returned on error async def test_sibling_error_does_not_block_other_siblings( @@ -482,11 +505,13 @@ def track_processing(s: Any, block: SignedBlockWithAttestation) -> Any: result, new_store = await head_sync.on_gossip_block(parent, peer_id, 
store) - assert result.processed is True - assert result.descendants_processed == 2 - assert parent_root in new_store.blocks - assert child1_root in new_store.blocks - assert child2_root in new_store.blocks + assert result == HeadSyncResult( + processed=True, + cached=False, + backfill_triggered=False, + descendants_processed=2, + ) + assert {parent_root, child1_root, child2_root} <= set(new_store.blocks.keys()) class TestReentrantGuard: diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py index 28cc639c..086cee4b 100644 --- a/tests/lean_spec/subspecs/sync/test_peer_manager.py +++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py @@ -32,10 +32,12 @@ def test_create_sync_peer(self, connected_peer_info: PeerInfo) -> None: """SyncPeer can be created from PeerInfo.""" sync_peer = SyncPeer(info=connected_peer_info) - assert sync_peer.info == connected_peer_info - assert sync_peer.peer_id == connected_peer_info.peer_id - assert sync_peer.status is None - assert sync_peer.requests_in_flight == 0 + assert sync_peer == SyncPeer( + info=connected_peer_info, + status=None, + requests_in_flight=0, + score=INITIAL_PEER_SCORE, + ) def test_is_connected(self, connected_peer_info: PeerInfo) -> None: """is_connected returns True when peer is connected.""" @@ -82,20 +84,20 @@ def test_on_request_start(self, connected_peer_info: PeerInfo) -> None: """on_request_start increments requests_in_flight.""" sync_peer = SyncPeer(info=connected_peer_info) sync_peer.on_request_start() - assert sync_peer.requests_in_flight == 1 + assert sync_peer == SyncPeer(info=connected_peer_info, requests_in_flight=1) def test_on_request_complete(self, connected_peer_info: PeerInfo) -> None: """on_request_complete decrements requests_in_flight.""" sync_peer = SyncPeer(info=connected_peer_info) sync_peer.requests_in_flight = 2 sync_peer.on_request_complete() - assert sync_peer.requests_in_flight == 1 + assert sync_peer == SyncPeer(info=connected_peer_info, requests_in_flight=1) def test_on_request_complete_does_not_go_negative(self, connected_peer_info: PeerInfo) -> None: """on_request_complete does not let in_flight go negative.""" sync_peer = SyncPeer(info=connected_peer_info) sync_peer.on_request_complete() - assert sync_peer.requests_in_flight == 0 + assert sync_peer == SyncPeer(info=connected_peer_info) class TestPeerManagerBasicOperations: @@ -113,7 +115,7 @@ def test_add_peer(self, connected_peer_info: PeerInfo) -> None: assert len(manager) == 1 assert connected_peer_info.peer_id in manager - assert sync_peer.info == connected_peer_info + assert sync_peer == SyncPeer(info=connected_peer_info) def test_add_peer_updates_existing(self, peer_id: PeerId) -> None: """Adding a peer with same ID updates the existing entry.""" @@ -126,7 +128,7 @@ def test_add_peer_updates_existing(self, peer_id: PeerId) -> None: assert len(manager) == 1 assert sync_peer1 is sync_peer2 - assert sync_peer2.info == info2 + assert sync_peer2 == SyncPeer(info=info2) def test_remove_peer(self, connected_peer_info: PeerInfo) -> None: """Removing a peer returns it and removes from manager.""" @@ -135,7 +137,7 @@ def test_remove_peer(self, connected_peer_info: PeerInfo) -> None: removed = manager.remove_peer(connected_peer_info.peer_id) - assert removed is not None + assert removed == SyncPeer(info=connected_peer_info) assert len(manager) == 0 def test_remove_nonexistent_peer(self) -> None: @@ -149,8 +151,7 @@ def test_get_peer(self, connected_peer_info: PeerInfo) -> None: 
manager.add_peer(connected_peer_info) peer = manager.get_peer(connected_peer_info.peer_id) - assert peer is not None - assert peer.peer_id == connected_peer_info.peer_id + assert peer == SyncPeer(info=connected_peer_info) def test_get_nonexistent_peer(self) -> None: """Getting a nonexistent peer returns None.""" @@ -176,8 +177,7 @@ def test_update_status(self, connected_peer_info: PeerInfo, sample_status: Statu manager.update_status(connected_peer_info.peer_id, sample_status) peer = manager.get_peer(connected_peer_info.peer_id) - assert peer is not None - assert peer.status == sample_status + assert peer == SyncPeer(info=connected_peer_info, status=sample_status) def test_update_status_nonexistent_peer(self, sample_status: Status) -> None: """update_status does nothing for nonexistent peer.""" @@ -194,8 +194,8 @@ def test_select_peer_for_request(self, connected_peer_info: PeerInfo) -> None: manager = PeerManager() manager.add_peer(connected_peer_info) - peer = manager.select_peer_for_request() - assert peer is not None + selected = manager.select_peer_for_request() + assert selected == SyncPeer(info=connected_peer_info) def test_select_peer_for_request_none_when_empty(self) -> None: """select_peer_for_request returns None when no peers.""" @@ -233,8 +233,7 @@ def test_select_peer_for_request_filters_by_min_slot( # Request slot 150 - only peer2 has it selected = manager.select_peer_for_request(min_slot=Slot(150)) - assert selected is not None - assert selected.peer_id == peer_id_2 + assert selected == sync_peer2 class TestPeerManagerNetworkConsensus: @@ -300,8 +299,11 @@ def test_on_request_success(self, connected_peer_info: PeerInfo) -> None: sync_peer.requests_in_flight = 1 manager.on_request_success(connected_peer_info.peer_id) - assert sync_peer.requests_in_flight == 0 - assert sync_peer.score == INITIAL_PEER_SCORE + SCORE_SUCCESS_BONUS + assert sync_peer == SyncPeer( + info=connected_peer_info, + requests_in_flight=0, + score=INITIAL_PEER_SCORE + SCORE_SUCCESS_BONUS, + ) def test_on_request_success_nonexistent_peer(self) -> None: """on_request_success does nothing for nonexistent peer.""" @@ -316,8 +318,11 @@ def test_on_request_failure(self, connected_peer_info: PeerInfo) -> None: sync_peer.requests_in_flight = 1 manager.on_request_failure(connected_peer_info.peer_id) - assert sync_peer.requests_in_flight == 0 - assert sync_peer.score == INITIAL_PEER_SCORE - SCORE_FAILURE_PENALTY + assert sync_peer == SyncPeer( + info=connected_peer_info, + requests_in_flight=0, + score=INITIAL_PEER_SCORE - SCORE_FAILURE_PENALTY, + ) def test_on_request_failure_nonexistent_peer(self) -> None: """on_request_failure does nothing for nonexistent peer.""" @@ -338,12 +343,13 @@ def test_get_all_peers_returns_all(self, peer_id: PeerId, peer_id_2: PeerId) -> """get_all_peers returns all tracked peers.""" manager = PeerManager() - for pid in [peer_id, peer_id_2]: - info = PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED) - manager.add_peer(info) + info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) + info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.CONNECTED) + manager.add_peer(info1) + manager.add_peer(info2) peers = manager.get_all_peers() - assert len(peers) == 2 + assert peers == [SyncPeer(info=info1), SyncPeer(info=info2)] class TestPeerScoring: @@ -353,7 +359,7 @@ def test_score_starts_at_initial(self, connected_peer_info: PeerInfo) -> None: """New peers start with INITIAL_PEER_SCORE.""" manager = PeerManager() sync_peer = manager.add_peer(connected_peer_info) - assert 
sync_peer.score == INITIAL_PEER_SCORE + assert sync_peer == SyncPeer(info=connected_peer_info) def test_repeated_failures_lower_score(self, connected_peer_info: PeerInfo) -> None: """Repeated failures reduce score toward MIN_PEER_SCORE.""" @@ -364,7 +370,11 @@ def test_repeated_failures_lower_score(self, connected_peer_info: PeerInfo) -> N for _ in range(20): manager.on_request_failure(connected_peer_info.peer_id) - assert sync_peer.score == MIN_PEER_SCORE + assert sync_peer == SyncPeer( + info=connected_peer_info, + requests_in_flight=0, + score=MIN_PEER_SCORE, + ) def test_failed_peer_deprioritized(self, peer_id: PeerId, peer_id_2: PeerId) -> None: """Peer with low score is selected less often than high-score peer.""" @@ -380,7 +390,7 @@ def test_failed_peer_deprioritized(self, peer_id: PeerId, peer_id_2: PeerId) -> for _ in range(20): manager.on_request_failure(peer_id) - assert peer1.score == MIN_PEER_SCORE + assert peer1 == SyncPeer(info=info1, requests_in_flight=0, score=MIN_PEER_SCORE) # Sample selections — peer2 should dominate. selections = [manager.select_peer_for_request() for _ in range(100)] @@ -398,6 +408,6 @@ def test_overloaded_peer_skipped(self, peer_id: PeerId, peer_id_2: PeerId) -> No peer1.requests_in_flight = MAX_CONCURRENT_REQUESTS + peer2 = manager.get_peer(peer_id_2) selected = manager.select_peer_for_request() - assert selected is not None - assert selected.peer_id == peer_id_2 + assert selected == peer2 diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index b42bd955..1b5b795c 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -12,7 +12,7 @@ from lean_spec.subspecs.networking import PeerId from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.subspecs.sync.service import SyncService +from lean_spec.subspecs.sync.service import SyncProgress, SyncService from lean_spec.subspecs.sync.states import SyncState from lean_spec.types import Bytes32 from tests.lean_spec.helpers import ( @@ -174,8 +174,9 @@ async def test_ignores_gossip_in_idle_state( await sync_service.on_gossip_block(block, peer_id) - # Block should not be processed or cached + # Block should not be processed or cached. 
assert sync_service._blocks_processed == 0 + assert sync_service.block_cache.orphan_count == 0 assert len(sync_service.block_cache) == 0 async def test_processes_gossip_in_syncing_state( @@ -237,18 +238,30 @@ def test_progress_reflects_current_state( """get_progress accurately reflects service state.""" # Initial progress progress = sync_service.get_progress() - assert progress.state == SyncState.IDLE - assert progress.local_head_slot == Slot(0) - assert progress.blocks_processed == 0 - assert progress.peers_connected == 1 # We added one peer in fixture + assert progress == SyncProgress( + state=SyncState.IDLE, + local_head_slot=Slot(0), + network_finalized_slot=None, + blocks_processed=0, + peers_connected=1, + cache_size=0, + orphan_count=0, + ) # After processing some blocks sync_service._state = SyncState.SYNCING sync_service._blocks_processed = 42 progress = sync_service.get_progress() - assert progress.state == SyncState.SYNCING - assert progress.blocks_processed == 42 + assert progress == SyncProgress( + state=SyncState.SYNCING, + local_head_slot=Slot(0), + network_finalized_slot=None, + blocks_processed=42, + peers_connected=1, + cache_size=0, + orphan_count=0, + ) def test_progress_includes_network_consensus( self, @@ -263,7 +276,15 @@ def test_progress_includes_network_consensus( sync_service.peer_manager.update_status(peer_id, status) progress = sync_service.get_progress() - assert progress.network_finalized_slot == Slot(100) + assert progress == SyncProgress( + state=SyncState.IDLE, + local_head_slot=Slot(0), + network_finalized_slot=Slot(100), + blocks_processed=0, + peers_connected=1, + cache_size=0, + orphan_count=0, + ) def test_progress_tracks_cache_state( self, @@ -290,8 +311,15 @@ def test_progress_tracks_cache_state( sync_service.block_cache.mark_orphan(pending1.root) progress = sync_service.get_progress() - assert progress.cache_size == 2 - assert progress.orphan_count == 1 + assert progress == SyncProgress( + state=SyncState.IDLE, + local_head_slot=Slot(0), + network_finalized_slot=None, + blocks_processed=0, + peers_connected=1, + cache_size=2, + orphan_count=1, + ) class TestReset: @@ -327,7 +355,7 @@ def test_reset_clears_all_state( assert sync_service._blocks_processed == 0 assert len(sync_service.block_cache) == 0 assert sync_service._backfill is not None - assert len(sync_service._backfill._pending) == 0 + assert sync_service._backfill._pending == set() class TestAttestationGossipHandling: @@ -397,8 +425,7 @@ def reject_unknown(signed_attestation, *, is_aggregator=False): await sync_service.on_gossip_attestation(attestation) - assert len(sync_service._pending_attestations) == 1 - assert sync_service._pending_attestations[0] is attestation + assert sync_service._pending_attestations == [attestation] async def test_buffered_attestation_replayed_after_block( self, @@ -429,7 +456,7 @@ async def test_buffered_attestation_replayed_after_block( await sync_service.on_gossip_block(block, peer_id) # Attestation was replayed (accepted by mock store). - assert len(sync_service._pending_attestations) == 0 + assert sync_service._pending_attestations == [] mock_store = cast(MockForkchoiceStore, sync_service.store) assert attestation in mock_store._attestations_received
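
The assertions throughout this patch replace field-by-field checks with equality against a fully constructed expected object, copying any non-deterministic field (such as a received-at timestamp) from the actual value. A minimal, self-contained sketch of that pattern follows; the `Pending` dataclass and `make_pending` helper are hypothetical stand-ins for value objects like `PendingBlock` or `SyncPeer`, not the repository's actual types.

import time
from dataclasses import dataclass


@dataclass(frozen=True)
class Pending:
    root: bytes
    slot: int
    depth: int
    received_at: float


def make_pending(root: bytes, slot: int) -> Pending:
    # Stand-in for a cache insert that stamps the current wall-clock time.
    return Pending(root=root, slot=slot, depth=0, received_at=time.time())


actual = make_pending(b"\x01" * 32, 10)

# Field-by-field style: a failure reports only the single attribute that broke.
assert actual.slot == 10
assert actual.depth == 0

# Whole-object style: pytest renders a diff of every field on failure, and the
# non-deterministic timestamp is copied from the actual object so the
# comparison stays deterministic.
assert actual == Pending(
    root=b"\x01" * 32,
    slot=10,
    depth=0,
    received_at=actual.received_at,
)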