Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
#[cfg(not(c_bindings))]
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
#[cfg(not(c_bindings))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit msg typo

use lightning::util::native_async::MaybeSend;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down
1 change: 1 addition & 0 deletions lightning-block-sync/src/async_poll.rs
264 changes: 128 additions & 136 deletions lightning-block-sync/src/init.rs

Large diffs are not rendered by default.

194 changes: 138 additions & 56 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

extern crate alloc;
extern crate core;

#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
pub mod http;

Expand All @@ -42,6 +44,9 @@ mod test_utils;
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod utils;

#[allow(unused)]
mod async_poll;

use crate::poll::{ChainTip, Poll, ValidatedBlockHeader};

use bitcoin::block::{Block, Header};
Expand Down Expand Up @@ -170,18 +175,13 @@ pub enum BlockData {
/// sources for the best chain tip. During this process it detects any chain forks, determines which
/// constitutes the best chain, and updates the listener accordingly with any blocks that were
/// connected or disconnected since the last poll.
///
/// Block headers for the best chain are maintained in the parameterized cache, allowing for a
/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage.
/// Hence, there is a trade-off between a lower memory footprint and potentially increased network
/// I/O as headers are re-fetched during fork detection.
pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref>
pub struct SpvClient<P: Poll, L: Deref>
where
L::Target: chain::Listen,
{
chain_tip: ValidatedBlockHeader,
chain_poller: P,
chain_notifier: ChainNotifier<'a, C, L>,
chain_notifier: ChainNotifier<HeaderCache, L>,
}

/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
Expand All @@ -194,37 +194,86 @@ where
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
/// needed to disconnect a block. In cases where block sources provide access to headers on stale
/// forks reliably, caches may be entirely unnecessary.
pub trait Cache {
pub(crate) trait Cache {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the trait still needed or can it be removed if fixed types are used internally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually yea, see new commit, we do :/

/// Retrieves the block header keyed by the given block hash.
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;

/// Inserts the given block header during a find_difference operation, implying it might not be
/// the best header.
fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);

/// Called when a block has been connected to the best chain to ensure it is available to be
/// disconnected later if needed.
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);

/// Called when a block has been disconnected from the best chain. Once disconnected, a block's
/// header is no longer needed and thus can be removed.
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
/// Called when blocks have been disconnected from the best chain. Only the fork point
/// (best comon ancestor) is provided.
///
/// Once disconnected, a block's header is no longer needed and thus can be removed.
fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader);
}

/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`].
pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7;

/// Bounded cache of block headers keyed by block hash.
///
/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height.
pub struct HeaderCache(std::collections::HashMap<BlockHash, ValidatedBlockHeader>);

impl HeaderCache {
/// Creates a new empty header cache.
pub fn new() -> Self {
Self(std::collections::HashMap::new())
}
}

/// Unbounded cache of block headers keyed by block hash.
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
impl Cache for HeaderCache {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.0.get(block_hash)
}

fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.0.insert(block_hash, block_header);

// Remove headers older than our newest header minus a week.
let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0);
let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT);
self.0.retain(|_, header| header.height >= cutoff_height);
}

fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.0.insert(block_hash, block_header);

// Remove headers older than a week.
let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT);
self.0.retain(|_, header| header.height >= cutoff_height);
}

fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) {
self.0.retain(|_, block_info| block_info.height <= fork_point.height);
}
}

impl Cache for UnboundedCache {
impl Cache for &mut HeaderCache {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.get(block_hash)
self.0.get(block_hash)
}

fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
(*self).insert_during_diff(block_hash, block_header);
}

fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.insert(block_hash, block_header);
(*self).block_connected(block_hash, block_header);
}

fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
self.remove(block_hash)
fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) {
self.0.retain(|_, block_info| block_info.height <= fork_point.height);
}
}

impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L>
impl<P: Poll, L: Deref> SpvClient<P, L>
where
L::Target: chain::Listen,
{
Expand All @@ -239,7 +288,7 @@ where
///
/// [`poll_best_tip`]: SpvClient::poll_best_tip
pub fn new(
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C,
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache,
chain_listener: L,
) -> Self {
let chain_notifier = ChainNotifier { header_cache, chain_listener };
Expand Down Expand Up @@ -293,15 +342,15 @@ where
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
///
/// [listeners]: lightning::chain::Listen
pub struct ChainNotifier<'a, C: Cache, L: Deref>
pub(crate) struct ChainNotifier<C: Cache, L: Deref>
where
L::Target: chain::Listen,
{
/// Cache for looking up headers before fetching from a block source.
header_cache: &'a mut C,
pub(crate) header_cache: C,

/// Listener that will be notified of connected or disconnected blocks.
chain_listener: L,
pub(crate) chain_listener: L,
}

/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
Expand All @@ -315,14 +364,11 @@ struct ChainDifference {
/// If there are any disconnected blocks, this is where the chain forked.
common_ancestor: ValidatedBlockHeader,

/// Blocks that were disconnected from the chain since the last poll.
disconnected_blocks: Vec<ValidatedBlockHeader>,

/// Blocks that were connected to the chain since the last poll.
connected_blocks: Vec<ValidatedBlockHeader>,
}

impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L>
impl<C: Cache, L: Deref> ChainNotifier<C, L>
where
L::Target: chain::Listen,
{
Expand All @@ -338,23 +384,66 @@ where
chain_poller: &mut P,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
let difference = self
.find_difference(new_header, old_header, chain_poller)
.find_difference_from_header(new_header, old_header, chain_poller)
.await
.map_err(|e| (e, None))?;
self.disconnect_blocks(difference.disconnected_blocks);
if difference.common_ancestor != *old_header {
self.disconnect_blocks(difference.common_ancestor);
}
self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller)
.await
}

/// Returns the changes needed to produce the chain with `current_header` as its tip from the
/// chain with `prev_best_block` as its tip.
///
/// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks`
/// field as fallback if needed, then finds the common ancestor.
async fn find_difference_from_best_block<P: Poll>(
&mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock,
chain_poller: &mut P,
) -> BlockSourceResult<ChainDifference> {
// Try to resolve the header for the previous best block. First try the block_hash,
// then fall back to previous_blocks if that fails.
let cur_tip = core::iter::once((0, &prev_best_block.block_hash));
let prev_tips =
prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| {
if let Some(block_hash) = hash_opt {
Some((idx as u32 + 1, block_hash))
} else {
None
}
});
let mut found_header = None;
for (height_diff, block_hash) in cur_tip.chain(prev_tips) {
if let Some(header) = self.header_cache.look_up(block_hash) {
found_header = Some(*header);
break;
}
let height = prev_best_block.height.checked_sub(height_diff).ok_or(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be validated just as well once outside the loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean sure but it seemed like cleaner code to do it in the loop where we need to calculate the height for each block anyway?

BlockSourceError::persistent("BestBlock had more previous_blocks than its height"),
)?;
if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await {
found_header = Some(header);
self.header_cache.insert_during_diff(*block_hash, header);
break;
}
}
let found_header = found_header.ok_or_else(|| {
BlockSourceError::persistent("could not resolve any block from BestBlock")
})?;

self.find_difference_from_header(current_header, &found_header, chain_poller).await
}

/// Returns the changes needed to produce the chain with `current_header` as its tip from the
/// chain with `prev_header` as its tip.
///
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
async fn find_difference<P: Poll>(
async fn find_difference_from_header<P: Poll>(
&self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader,
chain_poller: &mut P,
) -> BlockSourceResult<ChainDifference> {
let mut disconnected_blocks = Vec::new();
let mut connected_blocks = Vec::new();
let mut current = current_header;
let mut previous = *prev_header;
Expand All @@ -369,7 +458,6 @@ where
let current_height = current.height;
let previous_height = previous.height;
if current_height <= previous_height {
disconnected_blocks.push(previous);
previous = self.look_up_previous_header(chain_poller, &previous).await?;
}
if current_height >= previous_height {
Expand All @@ -379,7 +467,7 @@ where
}

let common_ancestor = current;
Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks })
Ok(ChainDifference { common_ancestor, connected_blocks })
}

/// Returns the previous header for the given header, either by looking it up in the cache or
Expand All @@ -394,16 +482,10 @@ where
}

/// Notifies the chain listeners of disconnected blocks.
fn disconnect_blocks(&mut self, disconnected_blocks: Vec<ValidatedBlockHeader>) {
for header in disconnected_blocks.iter() {
if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
assert_eq!(cached_header, *header);
}
}
if let Some(block) = disconnected_blocks.last() {
let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1);
self.chain_listener.blocks_disconnected(fork_point);
}
fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) {
self.header_cache.blocks_disconnected(&fork_point);
let best_block = BestBlock::new(fork_point.block_hash, fork_point.height);
self.chain_listener.blocks_disconnected(best_block);
}

/// Notifies the chain listeners of connected blocks.
Expand Down Expand Up @@ -447,9 +529,9 @@ mod spv_client_tests {
let best_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -466,9 +548,9 @@ mod spv_client_tests {
let common_tip = chain.tip();

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(common_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -486,9 +568,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -506,9 +588,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -526,9 +608,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -547,9 +629,9 @@ mod spv_client_tests {
let worse_tip = chain.tip();

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand Down
Loading
Loading