From 2875802f78872af61c0c50a5f2e81ede8c1afa0f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 26 Jan 2026 13:54:37 +0100 Subject: [PATCH 1/3] Remove `rustfmt::skip` from `utxo.rs` Signed-off-by: Elias Rohrer --- lightning/src/routing/utxo.rs | 633 +++++++++++++++++++++++----------- 1 file changed, 432 insertions(+), 201 deletions(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index f46160f1f14..962f3ae691d 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -189,9 +189,8 @@ impl PendingChecks { /// Checks if there is a pending `channel_update` UTXO validation for the given channel, /// and, if so, stores the channel message for handling later and returns an `Err`. - #[rustfmt::skip] pub(super) fn check_hold_pending_channel_update( - &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate> + &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>, ) -> Result<(), LightningError> { let mut pending_checks = self.internal.lock().unwrap(); if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) { @@ -200,25 +199,32 @@ impl PendingChecks { Some(msgs_ref) => { let mut messages = msgs_ref.lock().unwrap(); let latest_update = if is_from_a { - &mut messages.latest_channel_update_a - } else { - &mut messages.latest_channel_update_b - }; - if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp { + &mut messages.latest_channel_update_a + } else { + &mut messages.latest_channel_update_b + }; + if latest_update.is_none() + || latest_update.as_ref().unwrap().timestamp() < msg.timestamp + { // If the messages we got has a higher timestamp, just blindly assume the // signatures on the new message are correct and drop the old message. This // may cause us to end up dropping valid `channel_update`s if a peer is // malicious, but we should get the correct ones when the node updates them. - *latest_update = Some( - if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) } - else { ChannelUpdate::Unsigned(msg.clone()) }); + *latest_update = Some(if let Some(msg) = full_msg { + ChannelUpdate::Full(msg.clone()) + } else { + ChannelUpdate::Unsigned(msg.clone()) + }); } return Err(LightningError { - err: "Awaiting channel_announcement validation to accept channel_update".to_owned(), + err: "Awaiting channel_announcement validation to accept channel_update" + .to_owned(), action: ErrorAction::IgnoreAndLog(Level::Gossip), }); }, - None => { e.remove(); }, + None => { + e.remove(); + }, } } Ok(()) @@ -226,45 +232,49 @@ impl PendingChecks { /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the /// given node and, if so, stores the channel message for handling later and returns an `Err`. - #[rustfmt::skip] pub(super) fn check_hold_pending_node_announcement( - &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement> + &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>, ) -> Result<(), LightningError> { let mut pending_checks = self.internal.lock().unwrap(); if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) { let mut found_at_least_one_chan = false; - e.get_mut().retain(|node_msgs| { - match Weak::upgrade(&node_msgs) { - Some(chan_mtx) => { - let mut chan_msgs = chan_mtx.lock().unwrap(); - if let Some(chan_announce) = &chan_msgs.channel_announce { - let latest_announce = - if *chan_announce.node_id_1() == msg.node_id { - &mut chan_msgs.latest_node_announce_a - } else { - &mut chan_msgs.latest_node_announce_b - }; - if latest_announce.is_none() || - latest_announce.as_ref().unwrap().timestamp() < msg.timestamp - { - *latest_announce = Some( - if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } - else { NodeAnnouncement::Unsigned(msg.clone()) }); - } - found_at_least_one_chan = true; - true + e.get_mut().retain(|node_msgs| match Weak::upgrade(&node_msgs) { + Some(chan_mtx) => { + let mut chan_msgs = chan_mtx.lock().unwrap(); + if let Some(chan_announce) = &chan_msgs.channel_announce { + let latest_announce = if *chan_announce.node_id_1() == msg.node_id { + &mut chan_msgs.latest_node_announce_a } else { - debug_assert!(false, "channel_announce is set before struct is added to node map"); - false + &mut chan_msgs.latest_node_announce_b + }; + if latest_announce.is_none() + || latest_announce.as_ref().unwrap().timestamp() < msg.timestamp + { + *latest_announce = Some(if let Some(msg) = full_msg { + NodeAnnouncement::Full(msg.clone()) + } else { + NodeAnnouncement::Unsigned(msg.clone()) + }); } - }, - None => false, - } + found_at_least_one_chan = true; + true + } else { + debug_assert!( + false, + "channel_announce is set before struct is added to node map" + ); + false + } + }, + None => false, }); - if e.get().is_empty() { e.remove(); } + if e.get().is_empty() { + e.remove(); + } if found_at_least_one_chan { return Err(LightningError { - err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(), + err: "Awaiting channel_announcement validation to accept node_announcement" + .to_owned(), action: ErrorAction::IgnoreAndLog(Level::Gossip), }); } @@ -272,10 +282,10 @@ impl PendingChecks { Ok(()) } - #[rustfmt::skip] - fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, - full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, - pending_channels: &mut HashMap>> + fn check_replace_previous_entry( + msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, + replacement: Option>>, + pending_channels: &mut HashMap>>, ) -> Result<(), msgs::LightningError> { match pending_channels.entry(msg.short_channel_id) { hash_map::Entry::Occupied(mut e) => { @@ -287,8 +297,13 @@ impl PendingChecks { // This may be called with the mutex held on a different UtxoMessages // struct, however in that case we have a global lockorder of new messages // -> old messages, which makes this safe. - let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce { - Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg, + let pending_matches = match &pending_msgs + .unsafe_well_ordered_double_lock_self() + .channel_announce + { + Some(ChannelAnnouncement::Full(pending_msg)) => { + Some(pending_msg) == full_msg + }, Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg, None => { // This shouldn't actually be reachable. We set the @@ -320,54 +335,66 @@ impl PendingChecks { // so just remove/replace it and move on. if let Some(item) = replacement { *e.get_mut() = item; - } else { e.remove(); } + } else { + e.remove(); + } }, } }, hash_map::Entry::Vacant(v) => { - if let Some(item) = replacement { v.insert(item); } + if let Some(item) = replacement { + v.insert(item); + } }, } Ok(()) } - #[rustfmt::skip] - pub(super) fn check_channel_announcement(&self, - utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement, - full_msg: Option<&msgs::ChannelAnnouncement> - ) -> Result, msgs::LightningError> where U::Target: UtxoLookup { - let handle_result = |res| { - match res { - Ok(TxOut { value, script_pubkey }) => { - let expected_script = - make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh(); - if script_pubkey != expected_script { - return Err(LightningError{ - err: format!("Channel announcement key ({}) didn't match on-chain script ({})", - expected_script.to_hex_string(), script_pubkey.to_hex_string()), - action: ErrorAction::IgnoreError - }); - } - Ok(Some(value)) - }, - Err(UtxoLookupError::UnknownChain) => { - Err(LightningError { - err: format!("Channel announced on an unknown chain ({})", - msg.chain_hash.to_bytes().as_hex()), - action: ErrorAction::IgnoreError - }) - }, - Err(UtxoLookupError::UnknownTx) => { - Err(LightningError { - err: "Channel announced without corresponding UTXO entry".to_owned(), - action: ErrorAction::IgnoreError - }) - }, - } + pub(super) fn check_channel_announcement( + &self, utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement>, + ) -> Result, msgs::LightningError> + where + U::Target: UtxoLookup, + { + let handle_result = |res| match res { + Ok(TxOut { value, script_pubkey }) => { + let expected_script = make_funding_redeemscript_from_slices( + msg.bitcoin_key_1.as_array(), + msg.bitcoin_key_2.as_array(), + ) + .to_p2wsh(); + if script_pubkey != expected_script { + return Err(LightningError { + err: format!( + "Channel announcement key ({}) didn't match on-chain script ({})", + expected_script.to_hex_string(), + script_pubkey.to_hex_string() + ), + action: ErrorAction::IgnoreError, + }); + } + Ok(Some(value)) + }, + Err(UtxoLookupError::UnknownChain) => Err(LightningError { + err: format!( + "Channel announced on an unknown chain ({})", + msg.chain_hash.to_bytes().as_hex() + ), + action: ErrorAction::IgnoreError, + }), + Err(UtxoLookupError::UnknownTx) => Err(LightningError { + err: "Channel announced without corresponding UTXO entry".to_owned(), + action: ErrorAction::IgnoreError, + }), }; - Self::check_replace_previous_entry(msg, full_msg, None, - &mut self.internal.lock().unwrap().channels)?; + Self::check_replace_previous_entry( + msg, + full_msg, + None, + &mut self.internal.lock().unwrap().channels, + )?; match utxo_lookup { &None => { @@ -386,15 +413,27 @@ impl PendingChecks { // handle the result in-line. handle_result(res) } else { - Self::check_replace_previous_entry(msg, full_msg, - Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?; - async_messages.channel_announce = Some( - if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } - else { ChannelAnnouncement::Unsigned(msg.clone()) }); - pending_checks.nodes.entry(msg.node_id_1) - .or_default().push(Arc::downgrade(&future.state)); - pending_checks.nodes.entry(msg.node_id_2) - .or_default().push(Arc::downgrade(&future.state)); + Self::check_replace_previous_entry( + msg, + full_msg, + Some(Arc::downgrade(&future.state)), + &mut pending_checks.channels, + )?; + async_messages.channel_announce = Some(if let Some(msg) = full_msg { + ChannelAnnouncement::Full(msg.clone()) + } else { + ChannelAnnouncement::Unsigned(msg.clone()) + }); + pending_checks + .nodes + .entry(msg.node_id_1) + .or_default() + .push(Arc::downgrade(&future.state)); + pending_checks + .nodes + .entry(msg.node_id_2) + .or_default() + .push(Arc::downgrade(&future.state)); Err(LightningError { err: "Channel being checked async".to_owned(), action: ErrorAction::IgnoreAndLog(Level::Gossip), @@ -402,7 +441,7 @@ impl PendingChecks { } }, } - } + }, } } @@ -419,16 +458,13 @@ impl PendingChecks { /// Returns true if there are a large number of async checks pending and future /// `channel_announcement` messages should be delayed. Note that this is only a hint and /// messages already in-flight may still have to be handled for various reasons. - #[rustfmt::skip] pub(super) fn too_many_checks_pending(&self) -> bool { let mut pending_checks = self.internal.lock().unwrap(); if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { // If we have many channel checks pending, ensure we don't have any dangling checks // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture` // instead) before we commit to applying backpressure. - pending_checks.channels.retain(|_, chan| { - Weak::upgrade(&chan).is_some() - }); + pending_checks.channels.retain(|_, chan| Weak::upgrade(&chan).is_some()); pending_checks.nodes.retain(|_, channels| { channels.retain(|chan| Weak::upgrade(&chan).is_some()); !channels.is_empty() @@ -595,11 +631,17 @@ mod tests { (chain_source, network_graph) } - #[rustfmt::skip] - fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource, - NetworkGraph>, bitcoin::ScriptBuf, msgs::NodeAnnouncement, - msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate) - { + fn get_test_objects() -> ( + msgs::ChannelAnnouncement, + TestChainSource, + NetworkGraph>, + bitcoin::ScriptBuf, + msgs::NodeAnnouncement, + msgs::NodeAnnouncement, + msgs::ChannelUpdate, + msgs::ChannelUpdate, + msgs::ChannelUpdate, + ) { let secp_ctx = Secp256k1::new(); let (chain_source, network_graph) = get_network(); @@ -607,23 +649,40 @@ mod tests { let good_script = get_channel_script(&secp_ctx); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); - let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + let valid_announcement = + get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); // Note that we have to set the "direction" flag correctly on both messages - let chan_update_a = get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx); - let chan_update_b = get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx); - let chan_update_c = get_signed_channel_update(|msg| { - msg.channel_flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx); - - (valid_announcement, chain_source, network_graph, good_script, node_a_announce, - node_b_announce, chan_update_a, chan_update_b, chan_update_c) + let chan_update_a = + get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx); + let chan_update_b = + get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx); + let chan_update_c = get_signed_channel_update( + |msg| { + msg.channel_flags = 1; + msg.timestamp += 1; + }, + node_2_privkey, + &secp_ctx, + ); + + ( + valid_announcement, + chain_source, + network_graph, + good_script, + node_a_announce, + node_b_announce, + chan_update_a, + chan_update_b, + chan_update_c, + ) } #[test] - #[rustfmt::skip] fn test_fast_async_lookup() { // Check that async lookups which resolve quicker than the future is returned to the // `get_utxo` call can read it still resolve properly. @@ -637,44 +696,81 @@ mod tests { network_graph.pending_checks.check_resolved_futures(&network_graph); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap(); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_some()); } #[test] - #[rustfmt::skip] fn test_async_lookup() { // Test a simple async lookup - let (valid_announcement, chain_source, network_graph, good_script, - node_a_announce, node_b_announce, ..) = get_test_objects(); + let ( + valid_announcement, + chain_source, + network_graph, + good_script, + node_a_announce, + node_b_announce, + .., + ) = get_test_objects(); let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); future.resolve(Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); - network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); - - assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) - .unwrap().announcement_info.is_none()); + network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .unwrap(); + network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .unwrap(); + + assert!(network_graph + .read_only() + .nodes() + .get(&valid_announcement.contents.node_id_1) + .unwrap() + .announcement_info + .is_none()); network_graph.update_node_from_announcement(&node_a_announce).unwrap(); network_graph.update_node_from_announcement(&node_b_announce).unwrap(); - assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) - .unwrap().announcement_info.is_some()); + assert!(network_graph + .read_only() + .nodes() + .get(&valid_announcement.contents.node_id_1) + .unwrap() + .announcement_info + .is_some()); } #[test] - #[rustfmt::skip] fn test_invalid_async_lookup() { // Test an async lookup which returns an incorrect script let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); @@ -684,19 +780,30 @@ mod tests { *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); let value = Amount::from_sat(1_000_000); future.resolve(Ok(TxOut { value, script_pubkey: bitcoin::ScriptBuf::new() })); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); } #[test] - #[rustfmt::skip] fn test_failing_async_lookup() { // Test an async lookup which returns an error let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); @@ -706,44 +813,78 @@ mod tests { *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); future.resolve(Err(UtxoLookupError::UnknownTx)); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); } #[test] - #[rustfmt::skip] fn test_updates_async_lookup() { // Test async lookups will process pending channel_update/node_announcements once they // complete. - let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, - node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); + let ( + valid_announcement, + chain_source, + network_graph, + good_script, + node_a_announce, + node_b_announce, + chan_update_a, + chan_update_b, + .., + ) = get_test_objects(); let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); assert_eq!( network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, - "Awaiting channel_announcement validation to accept node_announcement"); + "Awaiting channel_announcement validation to accept node_announcement" + ); assert_eq!( network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err, - "Awaiting channel_announcement validation to accept node_announcement"); + "Awaiting channel_announcement validation to accept node_announcement" + ); - assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, - "Awaiting channel_announcement validation to accept channel_update"); - assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, - "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!( + network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update" + ); + assert_eq!( + network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update" + ); assert!(!notifier.notify_pending()); future @@ -751,58 +892,114 @@ mod tests { assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph.read_only().channels() - .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); - assert!(network_graph.read_only().channels() - .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some()); - - assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) - .unwrap().announcement_info.is_some()); - assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2) - .unwrap().announcement_info.is_some()); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .unwrap() + .one_to_two + .is_some()); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .unwrap() + .two_to_one + .is_some()); + + assert!(network_graph + .read_only() + .nodes() + .get(&valid_announcement.contents.node_id_1) + .unwrap() + .announcement_info + .is_some()); + assert!(network_graph + .read_only() + .nodes() + .get(&valid_announcement.contents.node_id_2) + .unwrap() + .announcement_info + .is_some()); } #[test] - #[rustfmt::skip] fn test_latest_update_async_lookup() { // Test async lookups will process the latest channel_update if two are received while // awaiting an async UTXO lookup. - let (valid_announcement, chain_source, network_graph, good_script, _, - _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); + let ( + valid_announcement, + chain_source, + network_graph, + good_script, + _, + _, + chan_update_a, + chan_update_b, + chan_update_c, + .., + ) = get_test_objects(); let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); - assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); + assert!(network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .is_none()); - assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, - "Awaiting channel_announcement validation to accept channel_update"); - assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, - "Awaiting channel_announcement validation to accept channel_update"); - assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, - "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!( + network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update" + ); + assert_eq!( + network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update" + ); + assert_eq!( + network_graph.update_channel(&chan_update_c).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update" + ); assert!(!notifier.notify_pending()); - future.resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); let graph_lock = network_graph.read_only(); - assert!(graph_lock.channels() - .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() - .one_to_two.as_ref().unwrap().last_update != - graph_lock.channels() - .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() - .two_to_one.as_ref().unwrap().last_update); + assert!( + graph_lock + .channels() + .get(&valid_announcement.contents.short_channel_id) + .as_ref() + .unwrap() + .one_to_two + .as_ref() + .unwrap() + .last_update != graph_lock + .channels() + .get(&valid_announcement.contents.short_channel_id) + .as_ref() + .unwrap() + .two_to_one + .as_ref() + .unwrap() + .last_update + ); } #[test] - #[rustfmt::skip] fn test_no_double_lookups() { // Test that a pending async lookup will prevent a second async lookup from flying, but // only if the channel_announcement message is identical. @@ -813,8 +1010,12 @@ mod tests { *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); // If we make a second request with the same message, the call count doesn't increase... @@ -822,8 +1023,12 @@ mod tests { let future_b = UtxoFuture::new(Arc::clone(¬ifier_b)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); assert_eq!( - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel announcement is already being checked"); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel announcement is already being checked" + ); assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); // But if we make a third request with a tweaked message, we should get a second call @@ -831,10 +1036,15 @@ mod tests { let secp_ctx = Secp256k1::new(); let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap(); let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap(); - let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); + let invalid_announcement = + get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); assert_eq!( - network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err, - "Channel being checked async"); + network_graph + .update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)) + .unwrap_err() + .err, + "Channel being checked async" + ); assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); // Still, if we resolve the original future, the original channel will be accepted. @@ -843,14 +1053,20 @@ mod tests { assert!(notifier_a.notify_pending()); assert!(!notifier_b.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(!network_graph.read_only().channels() - .get(&valid_announcement.contents.short_channel_id).unwrap() - .announcement_message.as_ref().unwrap() - .contents.features.supports_unknown_test_feature()); + assert!(!network_graph + .read_only() + .channels() + .get(&valid_announcement.contents.short_channel_id) + .unwrap() + .announcement_message + .as_ref() + .unwrap() + .contents + .features + .supports_unknown_test_feature()); } #[test] - #[rustfmt::skip] fn test_checks_backpressure() { // Test that too_many_checks_pending returns true when there are many checks pending, and // returns false once they complete. @@ -867,14 +1083,22 @@ mod tests { for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { let valid_announcement = get_signed_channel_announcement( - |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + |msg| msg.short_channel_id += 1 + i as u64, + node_1_privkey, + node_2_privkey, + &secp_ctx, + ); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err(); assert!(!network_graph.pending_checks.too_many_checks_pending()); } - let valid_announcement = get_signed_channel_announcement( - |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + let valid_announcement = + get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err(); assert!(network_graph.pending_checks.too_many_checks_pending()); // Once the future completes the "too many checks" flag should reset. @@ -885,7 +1109,6 @@ mod tests { } #[test] - #[rustfmt::skip] fn test_checks_backpressure_drop() { // Test that too_many_checks_pending returns true when there are many checks pending, and // returns false if we drop some of the futures without completion. @@ -901,14 +1124,22 @@ mod tests { for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { let valid_announcement = get_signed_channel_announcement( - |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + |msg| msg.short_channel_id += 1 + i as u64, + node_1_privkey, + node_2_privkey, + &secp_ctx, + ); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err(); assert!(!network_graph.pending_checks.too_many_checks_pending()); } - let valid_announcement = get_signed_channel_announcement( - |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); - network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + let valid_announcement = + get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph + .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) + .unwrap_err(); assert!(network_graph.pending_checks.too_many_checks_pending()); // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag From 7cfc19c6d9ea821ffa72d1054fcf1d87178cb23d Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 28 Jan 2026 12:43:58 +0100 Subject: [PATCH 2/3] f Address `rustfmt` comments --- lightning/src/routing/utxo.rs | 144 ++++++++++------------------------ 1 file changed, 41 insertions(+), 103 deletions(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 962f3ae691d..f869d573d92 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -655,20 +655,6 @@ mod tests { let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); - // Note that we have to set the "direction" flag correctly on both messages - let chan_update_a = - get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx); - let chan_update_b = - get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx); - let chan_update_c = get_signed_channel_update( - |msg| { - msg.channel_flags = 1; - msg.timestamp += 1; - }, - node_2_privkey, - &secp_ctx, - ); - ( valid_announcement, chain_source, @@ -676,9 +662,17 @@ mod tests { good_script, node_a_announce, node_b_announce, - chan_update_a, - chan_update_b, - chan_update_c, + get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx), + get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx), + // Note that we have to set the "direction" flag correctly on both messages + get_signed_channel_update( + |msg| { + msg.channel_flags = 1; + msg.timestamp += 1; + }, + node_2_privkey, + &secp_ctx, + ), ) } @@ -687,6 +681,7 @@ mod tests { // Check that async lookups which resolve quicker than the future is returned to the // `get_utxo` call can read it still resolve properly. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -699,11 +694,7 @@ mod tests { network_graph .update_channel_from_announcement(&valid_announcement, &Some(&chain_source)) .unwrap(); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_some()); + assert!(network_graph.read_only().channels().get(&scid).is_some()); } #[test] @@ -718,6 +709,8 @@ mod tests { node_b_announce, .., ) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; + let node_id_1 = valid_announcement.contents.node_id_1; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -730,30 +723,18 @@ mod tests { .err, "Channel being checked async" ); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); future.resolve(Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .unwrap(); - network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .unwrap(); + network_graph.read_only().channels().get(&scid).unwrap(); + network_graph.read_only().channels().get(&scid).unwrap(); assert!(network_graph .read_only() .nodes() - .get(&valid_announcement.contents.node_id_1) + .get(&node_id_1) .unwrap() .announcement_info .is_none()); @@ -764,7 +745,7 @@ mod tests { assert!(network_graph .read_only() .nodes() - .get(&valid_announcement.contents.node_id_1) + .get(&node_id_1) .unwrap() .announcement_info .is_some()); @@ -774,6 +755,7 @@ mod tests { fn test_invalid_async_lookup() { // Test an async lookup which returns an incorrect script let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -786,27 +768,20 @@ mod tests { .err, "Channel being checked async" ); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); let value = Amount::from_sat(1_000_000); future.resolve(Ok(TxOut { value, script_pubkey: bitcoin::ScriptBuf::new() })); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); } #[test] fn test_failing_async_lookup() { // Test an async lookup which returns an error let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -819,20 +794,12 @@ mod tests { .err, "Channel being checked async" ); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); future.resolve(Err(UtxoLookupError::UnknownTx)); assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); } #[test] @@ -850,6 +817,7 @@ mod tests { chan_update_b, .., ) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -862,11 +830,7 @@ mod tests { .err, "Channel being checked async" ); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); assert_eq!( network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, @@ -892,20 +856,8 @@ mod tests { assert!(notifier.notify_pending()); network_graph.pending_checks.check_resolved_futures(&network_graph); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .unwrap() - .one_to_two - .is_some()); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .unwrap() - .two_to_one - .is_some()); + assert!(network_graph.read_only().channels().get(&scid).unwrap().one_to_two.is_some()); + assert!(network_graph.read_only().channels().get(&scid).unwrap().two_to_one.is_some()); assert!(network_graph .read_only() @@ -939,6 +891,7 @@ mod tests { chan_update_c, .., ) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); @@ -951,11 +904,7 @@ mod tests { .err, "Channel being checked async" ); - assert!(network_graph - .read_only() - .channels() - .get(&valid_announcement.contents.short_channel_id) - .is_none()); + assert!(network_graph.read_only().channels().get(&scid).is_none()); assert_eq!( network_graph.update_channel(&chan_update_a).unwrap_err().err, @@ -978,25 +927,13 @@ mod tests { assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); let graph_lock = network_graph.read_only(); - assert!( - graph_lock - .channels() - .get(&valid_announcement.contents.short_channel_id) - .as_ref() - .unwrap() - .one_to_two - .as_ref() - .unwrap() - .last_update != graph_lock - .channels() - .get(&valid_announcement.contents.short_channel_id) - .as_ref() - .unwrap() - .two_to_one - .as_ref() - .unwrap() - .last_update - ); + #[rustfmt::skip] + let one_to_two_update = + graph_lock.channels().get(&scid).as_ref().unwrap().one_to_two.as_ref().unwrap().last_update; + #[rustfmt::skip] + let two_to_one_update = + graph_lock.channels().get(&scid).as_ref().unwrap().two_to_one.as_ref().unwrap().last_update; + assert!(one_to_two_update != two_to_one_update); } #[test] @@ -1004,6 +941,7 @@ mod tests { // Test that a pending async lookup will prevent a second async lookup from flying, but // only if the channel_announcement message is identical. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + let scid = valid_announcement.contents.short_channel_id; let notifier_a = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier_a)); @@ -1056,7 +994,7 @@ mod tests { assert!(!network_graph .read_only() .channels() - .get(&valid_announcement.contents.short_channel_id) + .get(&scid) .unwrap() .announcement_message .as_ref() From 0da488399db17e2a3ae6c4d8a1a175eacb7b30a5 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 28 Jan 2026 13:42:43 +0100 Subject: [PATCH 3/3] Fix race condition in async `UtxoFuture` resolution Previously, we refactored the `GossipVerifier` to not require holding a circular reference. As part of this, we moved to a model where the `UtxoFuture`s are now polled by the background processor which checks for completion through `get_and_clear_pending_msg_events`. However, as part of this refactor we introduced race-condition: as we only held `Weak` references in `PendingChecksContext` and the `UtxoFuture` was directly dropped by the `GossipVerifier` after calling `resolve`, the actual data was dropped with the future and gone when the background processor attempted to retrieve and apply it via `check_resolved_futures`. Here, we fix this issue by simply holding on to the `state` `Arc`s in a separate `pending_states` `Vec` that is only pruned in `check_resolved_futures`, ensuring any completed results are collected first. Signed-off-by: Elias Rohrer --- lightning/src/routing/utxo.rs | 37 ++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index f869d573d92..f3351aa0a33 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -166,6 +166,7 @@ impl UtxoFuture { } struct PendingChecksContext { + pending_states: Vec>>, channels: HashMap>>, nodes: HashMap>>>, } @@ -180,6 +181,7 @@ impl PendingChecks { pub(super) fn new() -> Self { PendingChecks { internal: Mutex::new(PendingChecksContext { + pending_states: Vec::new(), channels: new_hash_map(), nodes: new_hash_map(), }), @@ -413,6 +415,20 @@ impl PendingChecks { // handle the result in-line. handle_result(res) } else { + // To avoid cases where we drop the resolved data before it can be + // collected by `check_resolved_futures`, we here track all pending + // states at least until the next call of `check_resolved_futures`. + let pending_states = &mut pending_checks.pending_states; + if pending_states + .iter() + .find(|s| Arc::ptr_eq(s, &future.state)) + .is_none() + { + // We're not already tracking the future state, keep the `Arc` + // around. + pending_states.push(Arc::clone(&future.state)); + } + Self::check_replace_previous_entry( msg, full_msg, @@ -574,6 +590,21 @@ impl PendingChecks { let mut completed_states = Vec::new(); { let mut lck = self.internal.lock().unwrap(); + lck.pending_states.retain(|state| { + if state.lock().unwrap().complete.is_some() { + // We're done, collect the result and clean up. + completed_states.push(Arc::clone(&state)); + false + } else { + if Arc::strong_count(state) == 1 { + // The future has been dropped. + false + } else { + // It's still inflight. + true + } + } + }); lck.channels.retain(|_, state| { if let Some(state) = state.upgrade() { if state.lock().unwrap().complete.is_some() { @@ -1081,8 +1112,12 @@ mod tests { assert!(network_graph.pending_checks.too_many_checks_pending()); // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag - // should reset to false. + // should not yet reset to false. *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // .. but it should once we called check_resolved_futures clearing the `pending_states`. + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(!network_graph.pending_checks.too_many_checks_pending()); } }