From 2a55a2676a688d8a8af4445568bdea6e6b699dae Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 12:12:32 -0400 Subject: [PATCH 01/19] Split up some tests that have many variants Helps when debugging to know which variants failed. --- lightning/src/ln/monitor_tests.rs | 60 +++++++++++++++++++++++++++++-- lightning/src/ln/payment_tests.rs | 26 +++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index efd2084a38e..1a68e5165a8 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3801,27 +3801,83 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b } #[test] -fn test_lost_timeout_monitor_events() { +fn test_lost_timeout_monitor_events_a() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_b() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_c() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_d() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_e() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_f() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_g() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_h() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_i() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_j() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, false); - +} +#[test] +fn test_lost_timeout_monitor_events_k() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_l() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_m() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_n() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_o() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_p() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_q() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_r() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_s() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_t() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, true); } diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 7d198d2d70d..d405567473d 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -1423,15 +1423,39 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } #[test] -fn test_dup_htlc_onchain_doesnt_fail_on_reload() { +fn test_dup_htlc_onchain_doesnt_fail_on_reload_a() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_b() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_c() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_d() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_e() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_f() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_g() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_h() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_i() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false, false); } From 0430d7e9091eecff11592717f70a19399ace0905 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 19 Mar 2026 16:08:46 -0400 Subject: [PATCH 02/19] Remove unnecessary pending_monitor_events clone --- lightning/src/chain/channelmonitor.rs | 31 +++++++++++++-------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 1eb1484d07d..ec33db6be56 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -68,8 +68,8 @@ use crate::util::byte_utils; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; use crate::util::ser::{ - MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, Writeable, Writer, - U48, + Iterable, MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, + Writeable, Writer, U48, }; #[allow(unused_imports)] @@ -1719,24 +1719,23 @@ pub(crate) fn write_chanmon_internal( channel_monitor.lockdown_from_offchain.write(writer)?; channel_monitor.holder_tx_signed.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = - match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) { - Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); - pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); - pending_monitor_events - }, - _ => channel_monitor.pending_monitor_events.clone(), - }; + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` + // for backwards compatibility. + let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events = Iterable( + channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), + ); write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), + (5, pending_monitor_events, required), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), From bf76c56384597fd15b42b2eaad1ebd90c48f13e1 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:26:43 -0400 Subject: [PATCH 03/19] Add persistent_monitor_events flag to monitors/manager Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. As a first step towards this, here we persist a flag in the ChannelManager and ChannelMonitors indicating whether this new feature is enabled. It will be used in upcoming commits to maintain compatibility and create an upgrade/downgrade path between LDK versions. --- lightning/src/chain/channelmonitor.rs | 22 +++++++++++++++++++++ lightning/src/ln/channelmanager.rs | 28 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index ec33db6be56..42395205785 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1281,6 +1281,13 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec, + /// When set, monitor events are retained until explicitly acked rather than cleared on read. + /// + /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on + /// startup, rather than from complexly polling and reconciling Channel{Monitor} APIs, as well as + /// move the responsibility of off-chain payment resolution from the Channel to the monitor. Will + /// be always set once support is complete. + persistent_events_enabled: bool, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1732,8 +1739,12 @@ pub(crate) fn write_chanmon_internal( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(true); + write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), (5, pending_monitor_events, required), (7, channel_monitor.funding_spend_seen, required), @@ -1937,6 +1948,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + persistent_events_enabled: false, pending_events: Vec::new(), is_processing_pending_events: false, @@ -6693,8 +6705,10 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut alternative_funding_confirmed = None; let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); + let mut persistent_events_enabled = false; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), + (2, persistent_events_enabled, (default_value, false)), (3, htlcs_resolved_on_chain, optional_vec), (5, pending_monitor_events, optional_vec), (7, funding_spend_seen, option), @@ -6716,6 +6730,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), }); + + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_events_enabled { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue) + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. let written_by_0_1_or_later = payment_preimages_with_info.is_some(); @@ -6864,6 +6885,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events: pending_monitor_events.unwrap(), + persistent_events_enabled, pending_events, is_processing_pending_events: false, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 30eb7f85d71..5d9ce18fff6 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2952,6 +2952,15 @@ pub struct ChannelManager< /// offer they resolve to to the given one. pub testing_dnssec_proof_offer_resolution_override: Mutex>, + /// When set, monitors will repeatedly provide an event back to the `ChannelManager` on restart + /// until the event is explicitly acknowledged as processed. + /// + /// Allows us to reconstruct pending HTLC state by replaying monitor events on startup, rather + /// than from complexly polling and reconciling Channel{Monitor} APIs, as well as move the + /// responsibility of off-chain payment resolution from the Channel to the monitor. Will be + /// always set once support is complete. + persistent_monitor_events: bool, + #[cfg(test)] pub(super) entropy_source: ES, #[cfg(not(test))] @@ -3641,6 +3650,8 @@ impl< logger, + persistent_monitor_events: false, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), } @@ -18189,6 +18200,9 @@ impl< } } + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_monitor_events = self.persistent_monitor_events.then_some(true); + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -18201,6 +18215,7 @@ impl< (9, htlc_purposes, required_vec), (10, legacy_in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), + (12, persistent_monitor_events, option), (13, htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_opt, option), (15, self.inbound_payment_id_secret, required), @@ -18300,6 +18315,7 @@ pub(super) struct ChannelManagerData { forward_htlcs_legacy: HashMap>, pending_intercepted_htlcs_legacy: HashMap, decode_update_add_htlcs_legacy: HashMap>, + persistent_monitor_events: bool, // The `ChannelManager` version that was written. version: u8, } @@ -18485,6 +18501,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut inbound_payment_id_secret = None; let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); + let mut persistent_monitor_events = false; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18497,6 +18514,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (9, claimable_htlc_purposes, optional_vec), (10, legacy_in_flight_monitor_updates, option), (11, probing_cookie_secret, option), + (12, persistent_monitor_events, (default_value, false)), (13, amountless_claimable_htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_legacy, option), (15, inbound_payment_id_secret, option), @@ -18505,6 +18523,12 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), }); + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_monitor_events { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue); + } + // Merge legacy pending_outbound_payments fields into a single HashMap. // Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) // > pending_outbound_payments_compat (non-TLV legacy) @@ -18621,6 +18645,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, version, + persistent_monitor_events, }) } } @@ -18922,6 +18947,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + persistent_monitor_events, version: _version, } = data; @@ -20185,6 +20211,8 @@ impl< logger: args.logger, config: RwLock::new(args.config), + persistent_monitor_events, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), }; From 16a291d5d62c04e9fce4df9b2792b489a0593424 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 15:08:54 -0400 Subject: [PATCH 04/19] Add helper to push monitor events Cleans up the next commit --- lightning/src/chain/chainmonitor.rs | 53 +++++++-------------------- lightning/src/chain/channelmonitor.rs | 26 +++++++++---- 2 files changed, 32 insertions(+), 47 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 07d835dc785..8f54c1a85e8 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -367,9 +367,6 @@ pub struct ChainMonitor< fee_estimator: F, persister: P, _entropy_source: ES, - /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly - /// from the user and not from a [`ChannelMonitor`]. - pending_monitor_events: Mutex, PublicKey)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -437,7 +434,6 @@ where logger, fee_estimator: feeest, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::clone(&event_notifier), persister: AsyncPersister { persister, event_notifier }, @@ -656,7 +652,6 @@ where fee_estimator: feeest, persister, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), @@ -801,16 +796,11 @@ where return Ok(()); } let funding_txo = monitor_data.monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( + monitor_data.monitor.push_monitor_event(MonitorEvent::Completed { funding_txo, channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }], - monitor_data.monitor.get_counterparty_node_id(), - )); + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); self.event_notifier.notify(); Ok(()) @@ -823,14 +813,11 @@ where pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let monitor = &monitors.get(&channel_id).unwrap().monitor; - let counterparty_node_id = monitor.get_counterparty_node_id(); - let funding_txo = monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), channel_id, - vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }], - counterparty_node_id, - )); + monitor_update_id, + }); self.event_notifier.notify(); } @@ -1265,21 +1252,13 @@ where // The channel is post-close (funding spend seen, lockdown, or // holder tx signed). Return InProgress so ChannelManager freezes // the channel until the force-close MonitorEvents are processed. - // Push a Completed event into pending_monitor_events so it gets - // picked up after the per-monitor events in the next - // release_pending_monitor_events call. - let funding_txo = monitor.get_funding_txo(); - let channel_id = monitor.channel_id(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, - channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor.get_latest_update_id(), - }], - monitor.get_counterparty_node_id(), - )); + // Push a Completed event into the monitor so it gets picked up + // in the next release_pending_monitor_events call. + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + monitor_update_id: monitor.get_latest_update_id(), + }); log_debug!( logger, "Deferring completion of ChannelMonitorUpdate id {:?} (channel is post-close)", @@ -1664,10 +1643,6 @@ where )); } } - // Drain pending_monitor_events (which includes deferred post-close - // completions) after per-monitor events so that force-close - // MonitorEvents are processed by ChannelManager first. - pending_monitor_events.extend(self.pending_monitor_events.lock().unwrap().split_off(0)); pending_monitor_events } } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 42395205785..f3cd89b39b1 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,6 +183,10 @@ impl Readable for ChannelMonitorUpdate { } } +fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { + pending_monitor_events.push(event); +} + /// An event to be processed by the ChannelManager. #[derive(Clone, PartialEq, Eq)] pub enum MonitorEvent { @@ -226,8 +230,6 @@ pub enum MonitorEvent { }, } impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, - // Note that Completed is currently never serialized to disk as it is generated only in - // ChainMonitor. (0, Completed) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -2166,6 +2168,10 @@ impl ChannelMonitor { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } + pub(super) fn push_monitor_event(&self, event: MonitorEvent) { + self.inner.lock().unwrap().push_monitor_event(event); + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3891,7 +3897,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - self.pending_monitor_events.push(event); + push_monitor_event(&mut self.pending_monitor_events, event); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4552,6 +4558,10 @@ impl ChannelMonitorImpl { &self.outputs_to_watch } + fn push_monitor_event(&mut self, event: MonitorEvent) { + push_monitor_event(&mut self.pending_monitor_events, event); + } + fn get_and_clear_pending_monitor_events(&mut self) -> Vec { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); @@ -5611,7 +5621,7 @@ impl ChannelMonitorImpl { ); log_info!(logger, "Channel closed by funding output spend in txid {txid}"); if !self.funding_spend_seen { - self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(())); + self.push_monitor_event(MonitorEvent::CommitmentTxConfirmed(())); } self.funding_spend_seen = true; @@ -5786,7 +5796,7 @@ impl ChannelMonitorImpl { log_debug!(logger, "HTLC {} failure update in {} has got enough confirmations to be passed upstream", &payment_hash, entry.txid); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { payment_hash, payment_preimage: None, source, @@ -5896,7 +5906,7 @@ impl ChannelMonitorImpl { log_error!(logger, "Failing back HTLC {} upstream to preserve the \ channel as the forward HTLC hasn't resolved and our backward HTLC \ expires soon at {}", log_bytes!(htlc.payment_hash.0), inbound_htlc_expiry); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source: source.clone(), payment_preimage: None, payment_hash: htlc.payment_hash, @@ -6313,7 +6323,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, @@ -6337,7 +6347,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, From fe77aecfb3428ca2a339c5082dba8c9c128dcd60 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:18:26 -0400 Subject: [PATCH 05/19] Rename pending_monitor_events to _legacy This field will be deprecated in upcoming commits when we start persisting MonitorEvent ids alongside the MonitorEvents. --- lightning/src/chain/channelmonitor.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index f3cd89b39b1..9315bab3533 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1737,7 +1737,7 @@ pub(crate) fn write_chanmon_internal( None } }); - let pending_monitor_events = Iterable( + let pending_monitor_events_legacy = Iterable( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); @@ -1748,7 +1748,7 @@ pub(crate) fn write_chanmon_internal( (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required), + (5, pending_monitor_events_legacy, required), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -6645,16 +6645,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Some( - Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); - for _ in 0..pending_monitor_events_len { + let pending_monitor_events_legacy_len: u64 = Readable::read(reader)?; + let mut pending_monitor_events_legacy = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_legacy_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); + for _ in 0..pending_monitor_events_legacy_len { let ev = match ::read(reader)? { 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::HolderForceClosed(outpoint), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.as_mut().unwrap().push(ev); + pending_monitor_events_legacy.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -6720,7 +6720,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (1, funding_spend_confirmed, option), (2, persistent_events_enabled, (default_value, false)), (3, htlcs_resolved_on_chain, optional_vec), - (5, pending_monitor_events, optional_vec), + (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), @@ -6769,11 +6769,11 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both // events, we can remove the `HolderForceClosed` event and just keep the `HolderForceClosedWithInfo`. - if let Some(ref mut pending_monitor_events) = pending_monitor_events { - if pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && - pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) + if let Some(ref mut evs) = pending_monitor_events_legacy { + if evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && + evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) { - pending_monitor_events.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); + evs.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); } } @@ -6894,7 +6894,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events.unwrap(), + pending_monitor_events: pending_monitor_events_legacy.unwrap(), persistent_events_enabled, pending_events, is_processing_pending_events: false, From 12d72ed93243253c2ee69fabdbc18df6106e0c94 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:37:38 -0400 Subject: [PATCH 06/19] Add chain::Watch ack_monitor_event API Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we add an as-yet-unused API to chain::Watch to allow the ChannelManager to tell the a ChannelMonitor that a MonitorEvent has been irrevocably processed and can be deleted. --- fuzz/src/chanmon_consistency.rs | 5 +++++ lightning/src/chain/chainmonitor.rs | 22 ++++++++++++++++++++++ lightning/src/chain/channelmonitor.rs | 5 +++++ lightning/src/chain/mod.rs | 14 ++++++++++++++ lightning/src/util/test_utils.rs | 6 +++++- 5 files changed, 51 insertions(+), 1 deletion(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index e4fd3475024..6ad1fa7c150 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,6 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; +use lightning::chain::chainmonitor::MonitorEventSource; use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; use lightning::chain::transaction::OutPoint; use lightning::chain::{ @@ -366,6 +367,10 @@ impl chain::Watch for TestChainMonitor { ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 8f54c1a85e8..946ddcc0571 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -67,6 +67,21 @@ use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// Identifies the source of a [`MonitorEvent`] for acknowledgment via +/// [`chain::Watch::ack_monitor_event`] once the event has been processed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MonitorEventSource { + /// The event ID assigned by the [`ChannelMonitor`]. + pub event_id: u64, + /// The channel from which the [`MonitorEvent`] originated. + pub channel_id: ChannelId, +} + +impl_writeable_tlv_based!(MonitorEventSource, { + (0, event_id, required), + (2, channel_id, required), +}); + /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. enum PendingMonitorOp { /// A new monitor to insert and persist. @@ -1645,6 +1660,13 @@ where } pending_monitor_events } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + let monitors = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitors.get(&source.channel_id) { + monitor_state.monitor.ack_monitor_event(source.event_id); + } + } } impl< diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 9315bab3533..7c104cbf1cd 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -2172,6 +2172,11 @@ impl ChannelMonitor { self.inner.lock().unwrap().push_monitor_event(event); } + /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. + pub(super) fn ack_monitor_event(&self, _event_id: u64) { + // TODO: once events have ids, remove the corresponding event here + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 99e184d8fda..fd61d492509 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,6 +18,7 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; @@ -341,6 +342,15 @@ pub trait Watch { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + + /// Acknowledges and removes a [`MonitorEvent`] previously returned by + /// [`Watch::release_pending_monitor_events`] by its event ID. + /// + /// Once acknowledged, the event will no longer be returned by future calls to + /// [`Watch::release_pending_monitor_events`] and will not be replayed on restart. + /// + /// Events may be acknowledged in any order. + fn ack_monitor_event(&self, source: MonitorEventSource); } impl + ?Sized, W: Deref> @@ -363,6 +373,10 @@ impl + ?Sized, W: Der ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { self.deref().release_pending_monitor_events() } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.deref().ack_monitor_event(source) + } } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index abcc24adf8d..6a68f42254e 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -15,7 +15,7 @@ use crate::chain::chaininterface; #[cfg(any(test, feature = "_externalize_tests"))] use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW; use crate::chain::chaininterface::{ConfirmationTarget, TransactionType}; -use crate::chain::chainmonitor::{ChainMonitor, Persist}; +use crate::chain::chainmonitor::{ChainMonitor, MonitorEventSource, Persist}; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; @@ -733,6 +733,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } #[cfg(any(test, feature = "_externalize_tests"))] From 3945038389c345e9db66720cd3539c6727f7e98e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:24:04 -0400 Subject: [PATCH 07/19] Add monitor event ids Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To allow the ChannelManager to ack specific monitor events once they are resolved in upcoming commits, here we give each MonitorEvent a corresponding unique id. It's implemented in such a way that we can delete legacy monitor event code in the future when the new persistent monitor events flag is enabled by default. --- fuzz/src/chanmon_consistency.rs | 2 +- lightning/src/chain/chainmonitor.rs | 2 +- lightning/src/chain/channelmonitor.rs | 200 +++++++++++++----- lightning/src/chain/mod.rs | 8 +- lightning/src/ln/chanmon_update_fail_tests.rs | 6 +- lightning/src/ln/channelmanager.rs | 2 +- lightning/src/util/test_utils.rs | 6 +- 7 files changed, 161 insertions(+), 65 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 6ad1fa7c150..ef84b7ded3c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -364,7 +364,7 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 946ddcc0571..0e6fa06498b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1638,7 +1638,7 @@ where fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { let _ = self.channel_monitor_updated(channel_id, update_id); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 7c104cbf1cd..cd8bde9662d 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,8 +183,13 @@ impl Readable for ChannelMonitorUpdate { } } -fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { - pending_monitor_events.push(event); +fn push_monitor_event( + pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent, + next_monitor_event_id: &mut u64, +) { + let id = *next_monitor_event_id; + *next_monitor_event_id += 1; + pending_monitor_events.push((id, event)); } /// An event to be processed by the ChannelManager. @@ -1282,7 +1287,7 @@ pub(crate) struct ChannelMonitorImpl { // Note that because the `event_lock` in `ChainMonitor` is only taken in // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. - pending_monitor_events: Vec, + pending_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on @@ -1290,6 +1295,7 @@ pub(crate) struct ChannelMonitorImpl { /// move the responsibility of off-chain payment resolution from the Channel to the monitor. Will /// be always set once support is complete. persistent_events_enabled: bool, + next_monitor_event_id: u64, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1670,32 +1676,38 @@ pub(crate) fn write_chanmon_internal( writer.write_all(&payment_preimage.0[..])?; } - writer.write_all( - &(channel_monitor - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in channel_monitor.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below + if !channel_monitor.persistent_events_enabled { + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|(_, ev)| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. } => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for (_, event) in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. + MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below + } } + } else { + // If `persistent_events_enabled` is set, we'll write the events with their event ids in the + // TLV section below. + writer.write_all(&(0u64).to_be_bytes())?; } writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; @@ -1730,25 +1742,40 @@ pub(crate) fn write_chanmon_internal( // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` // for backwards compatibility. - let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { - if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { - Some(MonitorEvent::HolderForceClosed(*outpoint)) - } else { - None - } - }); - let pending_monitor_events_legacy = Iterable( - channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), - ); + let holder_force_closed_compat = + channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled { + Some(Iterable( + channel_monitor + .pending_monitor_events + .iter() + .map(|(_, ev)| ev) + .chain(holder_force_closed_compat.as_ref()), + )) + } else { + None + }; // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(true); + let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { + Some(Iterable(channel_monitor.pending_monitor_events.iter())) + } else { + None + }; write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events_legacy, required), + (4, pending_mon_evs_with_ids, option), + (5, pending_monitor_events_legacy, option), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -1767,6 +1794,7 @@ pub(crate) fn write_chanmon_internal( (34, channel_monitor.alternative_funding_confirmed, option), (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), + (39, channel_monitor.next_monitor_event_id, required), }); Ok(()) @@ -1951,6 +1979,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), persistent_events_enabled: false, + next_monitor_event_id: 0, pending_events: Vec::new(), is_processing_pending_events: false, @@ -2164,7 +2193,7 @@ impl ChannelMonitor { /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. - pub fn get_and_clear_pending_monitor_events(&self) -> Vec { + pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } @@ -2177,6 +2206,20 @@ impl ChannelMonitor { // TODO: once events have ids, remove the corresponding event here } + /// Copies [`MonitorEvent`] state from `other` into `self`. + /// Used in tests to align transient runtime state before equality comparison after a + /// serialization round-trip. + #[cfg(any(test, feature = "_test_utils"))] + pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { + let (pending, next_id) = { + let other_inner = other.inner.lock().unwrap(); + (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + }; + let mut self_inner = self.inner.lock().unwrap(); + self_inner.pending_monitor_events = pending; + self_inner.next_monitor_event_id = next_id; + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3902,7 +3945,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4493,12 +4536,16 @@ impl ChannelMonitorImpl { "Failing HTLC from late counterparty commitment update immediately \ (funding spend already confirmed)" ); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash, - payment_preimage: None, - source: source.clone(), - htlc_value_satoshis, - })); + push_monitor_event( + &mut self.pending_monitor_events, + MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash, + payment_preimage: None, + source: source.clone(), + htlc_value_satoshis, + }), + &mut self.next_monitor_event_id, + ); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx: None, resolving_txid: Some(confirmed_txid), @@ -4564,10 +4611,14 @@ impl ChannelMonitorImpl { } fn push_monitor_event(&mut self, event: MonitorEvent) { - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event( + &mut self.pending_monitor_events, + event, + &mut self.next_monitor_event_id, + ); } - fn get_and_clear_pending_monitor_events(&mut self) -> Vec { + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); ret @@ -5898,7 +5949,7 @@ impl ChannelMonitorImpl { continue; } let duplicate_event = self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -5916,7 +5967,7 @@ impl ChannelMonitorImpl { payment_preimage: None, payment_hash: htlc.payment_hash, htlc_value_satoshis: Some(htlc.amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } } @@ -6315,7 +6366,7 @@ impl ChannelMonitorImpl { if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), height, @@ -6333,11 +6384,11 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { @@ -6357,7 +6408,7 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else { self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { @@ -6721,10 +6772,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); let mut persistent_events_enabled = false; + let mut next_monitor_event_id: Option = None; + let mut pending_mon_evs_with_ids: Option> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (2, persistent_events_enabled, (default_value, false)), (3, htlcs_resolved_on_chain, optional_vec), + (4, pending_mon_evs_with_ids, optional_vec), (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), @@ -6744,6 +6798,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (34, alternative_funding_confirmed, option), (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), + (39, next_monitor_event_id, option), }); #[cfg(not(any(feature = "_test_utils", test)))] @@ -6782,6 +6837,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } + // If persistent events are enabled, use the events with their persisted IDs from TLV 4. + // Otherwise, use the legacy events from TLV 5 and assign sequential IDs. + let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) = + if persistent_events_enabled { + let evs = pending_mon_evs_with_ids.unwrap_or_default() + .into_iter().map(|ev| (ev.0, ev.1)).collect(); + (next_monitor_event_id.unwrap_or(0), evs) + } else if let Some(events) = pending_monitor_events_legacy { + let next_id = next_monitor_event_id.unwrap_or(events.len() as u64); + let evs = events.into_iter().enumerate() + .map(|(i, ev)| (i as u64, ev)).collect(); + (next_id, evs) + } else { + (next_monitor_event_id.unwrap_or(0), Vec::new()) + }; + let channel_parameters = channel_parameters.unwrap_or_else(|| { onchain_tx_handler.channel_parameters().clone() }); @@ -6899,8 +6970,9 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events_legacy.unwrap(), + pending_monitor_events, persistent_events_enabled, + next_monitor_event_id, pending_events, is_processing_pending_events: false, @@ -6949,6 +7021,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +/// Deserialization wrapper for reading a `(u64, MonitorEvent)`. +/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait +/// conflicts. +struct ReadableIdMonitorEvent(u64, MonitorEvent); + +impl MaybeReadable for ReadableIdMonitorEvent { + fn read(reader: &mut R) -> Result, DecodeError> { + let id: u64 = Readable::read(reader)?; + let event_opt: Option = MaybeReadable::read(reader)?; + match event_opt { + Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))), + None => Ok(None), + } + } +} + #[cfg(test)] pub(super) fn dummy_monitor( channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index fd61d492509..9d61bf5aa10 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -333,6 +333,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Each event comes with a corresponding id. Once the event is processed, call + /// [`Watch::ack_monitor_event`] with the corresponding id and channel id. Unacknowledged events + /// will be re-provided by this method after startup. + /// /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. @@ -341,7 +345,7 @@ pub trait Watch { /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)>; /// Acknowledges and removes a [`MonitorEvent`] previously returned by /// [`Watch::release_pending_monitor_events`] by its event ID. @@ -370,7 +374,7 @@ impl + ?Sized, W: Der fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { self.deref().release_pending_monitor_events() } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 0d8a4a020f0..30507928aba 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -5004,7 +5004,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but // separately. @@ -5052,7 +5052,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both @@ -5098,7 +5098,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] { + if let (_, MonitorEvent::Completed { monitor_update_id, .. }) = &completed_persist[0].2[0] { assert_eq!(*monitor_update_id, 4); } else { panic!(); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5d9ce18fff6..4d2b3f82e12 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13493,7 +13493,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for monitor_event in monitor_events.drain(..) { + for (_event_id, monitor_event) in monitor_events.drain(..) { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 6a68f42254e..9ad9aec8bf0 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -648,6 +648,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { ) .unwrap() .1; + new_monitor.copy_monitor_event_state(&monitor); assert!(new_monitor == monitor); self.latest_monitor_update_id .lock() @@ -709,6 +710,9 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // it so it doesn't leak into the rest of the test. let failed_back = monitor.inner.lock().unwrap().failed_back_htlc_ids.clone(); new_monitor.inner.lock().unwrap().failed_back_htlc_ids = failed_back; + // The deserialized monitor will reset the monitor event state, so copy it from the live + // monitor before comparing. + new_monitor.copy_monitor_event_state(&monitor); if let Some(chan_id) = self.expect_monitor_round_trip_fail.lock().unwrap().take() { assert_eq!(chan_id, channel_id); assert!(new_monitor != *monitor); @@ -722,7 +726,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { // Auto-flush pending operations so that the ChannelManager can pick up monitor // completion events. When not in deferred mode the queue is empty so this only // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) From 5baeb9c77257ee5417c70b3a0f802bac90255fdc Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:42:19 -0400 Subject: [PATCH 08/19] Add MonitorUpdateCompletionAction::AckMonitorEvents Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we add a MonitorUpdateCompletionAction that will allow the ChannelManager to tell a monitor that an event is complete, upon completion of a particular ChannelMonitorUpdate. For example, this will be used when an HTLC is irrevocably failed backwards post-channel-close, to delete the corresponding MonitorEvent::HTLCEvent. --- lightning/src/ln/channelmanager.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4d2b3f82e12..8eae02d725e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -42,6 +42,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, @@ -1492,6 +1493,10 @@ pub(crate) enum MonitorUpdateCompletionAction { blocking_action: RAAMonitorUpdateBlockingAction, downstream_channel_id: ChannelId, }, + /// Indicates that one or more [`MonitorEvent`]s should be acknowledged via + /// [`chain::Watch::ack_monitor_event`] once the associated [`ChannelMonitorUpdate`] has been + /// durably persisted. + AckMonitorEvents { monitor_events_to_ack: Vec }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, @@ -1513,6 +1518,9 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, event, upgradable_option), (1, downstream_counterparty_and_funding_outpoint, upgradable_required), }, + (3, AckMonitorEvents) => { + (0, monitor_events_to_ack, required_vec), + }, ); /// Result of attempting to resume a channel after a monitor update completes while locks are held. @@ -10345,6 +10353,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(blocking_action), ); }, + MonitorUpdateCompletionAction::AckMonitorEvents { monitor_events_to_ack } => { + for source in monitor_events_to_ack { + self.chain_monitor.ack_monitor_event(source); + } + }, } } From 66da610c4b03ed4c6895d79742b6d8220863cc59 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:45:21 -0400 Subject: [PATCH 09/19] Ack all monitor events besides HTLCUpdate Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we take care of the monitor events that are trivial to ACK, since all except HTLCEvents can be ACK'd immediately after the event is handled and don't need to be re-processed on startup. --- lightning/src/ln/channelmanager.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 8eae02d725e..1895c2c5b18 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13506,7 +13506,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for (_event_id, monitor_event) in monitor_events.drain(..) { + for (event_id, monitor_event) in monitor_events.drain(..) { + let monitor_event_source = MonitorEventSource { event_id, channel_id }; match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( @@ -13589,6 +13590,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::CommitmentTxConfirmed(_) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -13610,6 +13614,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::Completed { channel_id, monitor_update_id, .. } => { self.channel_monitor_updated( @@ -13617,6 +13624,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(monitor_update_id), &counterparty_node_id, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); }, } } From 6a12b20f94710b48d0b02b7b63177175856bd573 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 14:44:59 -0400 Subject: [PATCH 10/19] Fix replay of monitor events for outbounds Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. If an outbound payment resolved on-chain and we get a monitor event for it, we want to mark that monitor event as resolved once the user has processed a PaymentSent or a PaymentFailed event. --- lightning/src/ln/channelmanager.rs | 33 +++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 1895c2c5b18..d1351455dec 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1545,12 +1545,29 @@ enum PostMonitorUpdateChanResume { }, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq)] pub(crate) struct PaymentCompleteUpdate { counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, channel_id: ChannelId, htlc_id: SentHTLCId, + /// The id of the [`MonitorEvent`] that triggered this update. + /// + /// Used when the [`Event`] corresponding to this update's [`EventCompletionAction`] has been + /// processed by the user, to remove the [`MonitorEvent`] from the [`ChannelMonitor`] and prevent + /// it from being re-provided on startup. + monitor_event_id: Option, +} + +impl PartialEq for PaymentCompleteUpdate { + fn eq(&self, other: &Self) -> bool { + self.counterparty_node_id == other.counterparty_node_id + && self.channel_funding_outpoint == other.channel_funding_outpoint + && self.channel_id == other.channel_id + && self.htlc_id == other.htlc_id + // monitor_event_id is excluded: it's metadata about the event source, + // not part of the semantic identity of the payment resolution. + } } impl_writeable_tlv_based!(PaymentCompleteUpdate, { @@ -1558,6 +1575,7 @@ impl_writeable_tlv_based!(PaymentCompleteUpdate, { (3, counterparty_node_id, required), (5, channel_id, required), (7, htlc_id, required), + (9, monitor_event_id, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -9965,6 +9983,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option, attribution_data: Option, send_timestamp: Option, + monitor_event_id: Option, ) { let startup_replay = !self.background_events_processed_since_startup.load(Ordering::Acquire); @@ -9983,6 +10002,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_funding_outpoint: next_channel_outpoint, channel_id: next_channel_id, htlc_id, + monitor_event_id, }; Some(EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(release)) } else { @@ -12611,6 +12631,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(next_user_channel_id), msg.attribution_data, send_timestamp, + None, ); Ok(()) @@ -13536,6 +13557,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ None, None, None, + Some(event_id), ); } else { log_trace!(logger, "Failing HTLC from our monitor"); @@ -13548,6 +13570,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_funding_outpoint: funding_outpoint, channel_id, htlc_id: SentHTLCId::from_source(&htlc_update.source), + monitor_event_id: Some(event_id), }); self.fail_htlc_backwards_internal( &htlc_update.source, @@ -15325,8 +15348,13 @@ impl< channel_funding_outpoint, channel_id, htlc_id, + monitor_event_id, }, ) => { + if let Some(event_id) = monitor_event_id { + self.chain_monitor + .ack_monitor_event(MonitorEventSource { channel_id, event_id }); + } let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock = per_peer_state .get(&counterparty_node_id) @@ -19730,6 +19758,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id, + monitor_event_id: None, }; let mut compl_action = Some( EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(update) @@ -19809,6 +19838,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id: SentHTLCId::from_source(&htlc_source), + monitor_event_id: None, }); failed_htlcs.push(( @@ -20566,6 +20596,7 @@ impl< downstream_user_channel_id, None, None, + None, ); } From 1ac4e6859b12758c246720f279094ce0fa854f57 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 15:05:54 -0400 Subject: [PATCH 11/19] Fix replay of preimage monitor events for fwds Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. If the outbound edge of a forwarded payment is resolved on-chain via preimage, and we get a monitor event for it, we want to mark that monitor event resolved once the preimage is durably persisted in the inbound edge. Hence, we add the monitor event resolution to the completion of the inbound edge's preimage monitor update. --- lightning/src/ln/channelmanager.rs | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d1351455dec..f66446cab1f 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1475,6 +1475,9 @@ pub(crate) enum MonitorUpdateCompletionAction { EmitEventOptionAndFreeOtherChannel { event: Option, downstream_counterparty_and_funding_outpoint: EventUnblockedChannel, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, /// Indicates we should immediately resume the operation of another channel, unless there is /// some other reason why the channel is blocked. In practice this simply means immediately @@ -1492,6 +1495,9 @@ pub(crate) enum MonitorUpdateCompletionAction { downstream_counterparty_node_id: PublicKey, blocking_action: RAAMonitorUpdateBlockingAction, downstream_channel_id: ChannelId, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, /// Indicates that one or more [`MonitorEvent`]s should be acknowledged via /// [`chain::Watch::ack_monitor_event`] once the associated [`ChannelMonitorUpdate`] has been @@ -1510,6 +1516,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, downstream_counterparty_node_id, required), (4, blocking_action, upgradable_required), (5, downstream_channel_id, required), + (7, monitor_event_source, option), }, (2, EmitEventOptionAndFreeOtherChannel) => { // LDK prior to 0.3 required this field. It will not be present for trampoline payments @@ -1517,6 +1524,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, // are in the process of being resolved. (0, event, upgradable_option), (1, downstream_counterparty_and_funding_outpoint, upgradable_required), + (3, monitor_event_source, option), }, (3, AckMonitorEvents) => { (0, monitor_events_to_ack, required_vec), @@ -9535,6 +9543,7 @@ impl< startup_replay: bool, next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, hop_data: HTLCPreviousHopData, attribution_data: Option, send_timestamp: Option, + monitor_event_source: Option, ) { let _prev_channel_id = hop_data.channel_id; let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); @@ -9624,6 +9633,7 @@ impl< downstream_counterparty_node_id: chan_to_release.counterparty_node_id, downstream_channel_id: chan_to_release.channel_id, blocking_action: chan_to_release.blocking_action, + monitor_event_source, }), None, ) @@ -9639,6 +9649,7 @@ impl< Some(MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint: chan_to_release, + monitor_event_source, }), None, ) @@ -9823,8 +9834,12 @@ impl< downstream_counterparty_node_id: node_id, blocking_action: blocker, downstream_channel_id: channel_id, + monitor_event_source, } = action { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { let mut peer_state = peer_state_mtx.lock().unwrap(); if let Some(blockers) = peer_state @@ -10083,6 +10098,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ hop_data, attribution_data, send_timestamp, + monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: next_channel_id }), ); }, HTLCSource::TrampolineForward { previous_hop_data, .. } => { @@ -10126,6 +10143,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ current_previous_hop_data, attribution_data.clone(), send_timestamp, + // Only pass the monitor event ID for the first hop. Once one + // inbound channel durably has the preimage, the outbound + // monitor event can be acked. The preimage remains in the + // outbound monitor's storage and will be replayed to all + // remaining inbound hops on restart via pending_claims_to_replay. + if i == 0 { + monitor_event_id.map(|id| MonitorEventSource { + event_id: id, + channel_id: next_channel_id, + }) + } else { + None + }, ); } }, @@ -10352,7 +10382,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(event) = event { self.pending_events.lock().unwrap().push_back((event, None)); } @@ -10366,7 +10400,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ downstream_counterparty_node_id, downstream_channel_id, blocking_action, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } self.handle_monitor_update_release( downstream_counterparty_node_id, downstream_channel_id, From 6778698a9c3ce65236046954aef1e52bd5216441 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:03:46 -0400 Subject: [PATCH 12/19] Track monitor event id in HTLCForwardInfo::Fail* Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline, starting from when we first handle the monitor event and call fail_htlc_backwards. To get it from there to the next stage, here we store the monitor event id in the manager's HTLCForwardInfo::Fail*. In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. --- lightning/src/ln/channelmanager.rs | 61 +++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f66446cab1f..b01d4bc5f17 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -497,8 +497,21 @@ impl PendingAddHTLCInfo { #[cfg_attr(test, derive(Clone, Debug, PartialEq))] pub(super) enum HTLCForwardInfo { AddHTLC(PendingAddHTLCInfo), - FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket }, - FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32] }, + FailHTLC { + htlc_id: u64, + err_packet: msgs::OnionErrorPacket, + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. + monitor_event_source: Option, + }, + FailMalformedHTLC { + htlc_id: u64, + failure_code: u16, + sha256_of_onion: [u8; 32], + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. + monitor_event_source: Option, + }, } /// Whether this blinded HTLC is being failed backwards by the introduction node or a blinded node, @@ -7626,12 +7639,14 @@ impl< HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC { htlc_id: fail_htlc.htlc_id, err_packet: fail_htlc.into(), + monitor_event_source: None, }, HTLCFailureMsg::Malformed(fail_malformed_htlc) => { HTLCForwardInfo::FailMalformedHTLC { htlc_id: fail_malformed_htlc.htlc_id, sha256_of_onion: fail_malformed_htlc.sha256_of_onion, failure_code: fail_malformed_htlc.failure_code.into(), + monitor_event_source: None, } }, }; @@ -8161,7 +8176,7 @@ impl< } None }, - HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => { + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, .. } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8181,7 +8196,12 @@ impl< break; } }, - HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + HTLCForwardInfo::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + .. + } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -9219,6 +9239,10 @@ impl< onion_error ); + let monitor_event_source = from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); push_forward_htlcs_failure( *prev_outbound_scid_alias, get_htlc_forward_failure( @@ -9228,6 +9252,7 @@ impl< trampoline_shared_secret, phantom_shared_secret, *htlc_id, + monitor_event_source, ), ); @@ -9263,7 +9288,12 @@ impl< // necessarily want to fail all of our incoming HTLCs back yet. We may have other // outgoing HTLCs that need to resolve first. This will be tracked in our // pending_outbound_payments in a followup. - for current_hop_data in previous_hop_data { + let trampoline_monitor_event_source = + from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); + for (i, current_hop_data) in previous_hop_data.iter().enumerate() { let HTLCPreviousHopData { prev_outbound_scid_alias, htlc_id, @@ -9290,6 +9320,7 @@ impl< &incoming_trampoline_shared_secret, &None, *htlc_id, + if i == 0 { trampoline_monitor_event_source } else { None }, ), ); } @@ -14270,6 +14301,7 @@ fn get_htlc_forward_failure( blinded_failure: &Option, onion_error: &HTLCFailReason, incoming_packet_shared_secret: &[u8; 32], trampoline_shared_secret: &Option<[u8; 32]>, phantom_shared_secret: &Option<[u8; 32]>, htlc_id: u64, + monitor_event_source: Option, ) -> HTLCForwardInfo { // TODO: Correctly wrap the error packet twice if failing back a trampoline + phantom HTLC. let secondary_shared_secret = trampoline_shared_secret.or(*phantom_shared_secret); @@ -14281,19 +14313,20 @@ fn get_htlc_forward_failure( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, Some(BlindedFailure::FromBlindedNode) => HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source, }, None => { let err_packet = onion_error.get_encrypted_failure_packet( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, } } @@ -17963,15 +17996,21 @@ impl Writeable for HTLCForwardInfo { 0u8.write(w)?; info.write(w)?; }, - Self::FailHTLC { htlc_id, err_packet } => { + Self::FailHTLC { htlc_id, err_packet, monitor_event_source } => { FAIL_HTLC_VARIANT_ID.write(w)?; write_tlv_fields!(w, { (0, htlc_id, required), (2, err_packet.data, required), (5, err_packet.attribution_data, option), + (7, monitor_event_source, option), }); }, - Self::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + Self::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + monitor_event_source, + } => { // Since this variant was added in 0.0.119, write this as `::FailHTLC` with an empty error // packet so older versions have something to fail back with, but serialize the real data as // optional TLVs for the benefit of newer versions. @@ -17981,6 +18020,7 @@ impl Writeable for HTLCForwardInfo { (1, failure_code, required), (2, Vec::::new(), required), (3, sha256_of_onion, required), + (7, monitor_event_source, option), }); }, } @@ -18001,6 +18041,7 @@ impl Readable for HTLCForwardInfo { (2, err_packet, required), (3, sha256_of_onion, option), (5, attribution_data, option), + (7, monitor_event_source, option), }); if let Some(failure_code) = malformed_htlc_failure_code { if attribution_data.is_some() { @@ -18010,6 +18051,7 @@ impl Readable for HTLCForwardInfo { htlc_id: _init_tlv_based_struct_field!(htlc_id, required), failure_code, sha256_of_onion: sha256_of_onion.ok_or(DecodeError::InvalidValue)?, + monitor_event_source, } } else { Self::FailHTLC { @@ -18018,6 +18060,7 @@ impl Readable for HTLCForwardInfo { data: _init_tlv_based_struct_field!(err_packet, required), attribution_data: _init_tlv_based_struct_field!(attribution_data, option), }, + monitor_event_source, } } }, From ccf63760fb97e32ad167718f2418479d070e3319 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:29:47 -0400 Subject: [PATCH 13/19] Add monitor_event_source to holding cell HTLC fails Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline. We started this process in the previous commit, and here we continue by storing the monitor event id in the Channel's holding cell HTLC failures. In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. --- lightning/src/ln/channel.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 8b05d984e30..ad398d3be9c 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -30,6 +30,7 @@ use crate::blinded_path::message::BlindedMessagePath; use crate::chain::chaininterface::{ ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, CommitmentHTLCData, LATENCY_GRACE_PERIOD_BLOCKS, @@ -551,11 +552,21 @@ enum HTLCUpdateAwaitingACK { FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket, + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). + /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32], + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). + /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, } @@ -6566,7 +6577,7 @@ impl FailHTLCContents for msgs::OnionErrorPacket { InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(self)) } fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { - HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self } + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source: None } } } impl FailHTLCContents for ([u8; 32], u16) { @@ -6590,6 +6601,7 @@ impl FailHTLCContents for ([u8; 32], u16) { htlc_id, sha256_of_onion: self.0, failure_code: self.1, + monitor_event_source: None, } } } @@ -8437,7 +8449,7 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => Some( + &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet, .. } => Some( self.fail_htlc(htlc_id, err_packet.clone(), false, logger) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), @@ -8445,6 +8457,7 @@ where htlc_id, failure_code, sha256_of_onion, + .. } => Some( self.fail_htlc(htlc_id, (sha256_of_onion, failure_code), false, logger) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), @@ -15091,7 +15104,7 @@ impl Writeable for FundedChannel { // Store the attribution data for later writing. holding_cell_attribution_data.push(attribution_data.as_ref()); }, - &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => { + &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet, .. } => { 2u8.write(writer)?; htlc_id.write(writer)?; err_packet.data.write(writer)?; @@ -15103,6 +15116,7 @@ impl Writeable for FundedChannel { htlc_id, failure_code, sha256_of_onion, + .. } => { // We don't want to break downgrading by adding a new variant, so write a dummy // `::FailHTLC` variant and write the real malformed error as an optional TLV. @@ -15542,6 +15556,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> data: Readable::read(reader)?, attribution_data: None, }, + monitor_event_source: None, }, _ => return Err(DecodeError::InvalidValue), }); @@ -15996,7 +16011,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> let htlc_idx = holding_cell_htlc_updates .iter() .position(|htlc| { - if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet } = htlc { + if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet, .. } = htlc { let matches = *htlc_id == malformed_htlc_id; if matches { debug_assert!(err_packet.data.is_empty()) @@ -16011,6 +16026,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> htlc_id: malformed_htlc_id, failure_code, sha256_of_onion, + monitor_event_source: None, }; let _ = core::mem::replace(&mut holding_cell_htlc_updates[htlc_idx], malformed_htlc); @@ -17036,12 +17052,14 @@ mod tests { |htlc_id, attribution_data| HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: msgs::OnionErrorPacket { data: vec![42], attribution_data }, + monitor_event_source: None, }; let dummy_holding_cell_malformed_htlc = |htlc_id| HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source: None, }; let mut holding_cell_htlc_updates = Vec::with_capacity(12); for i in 0..16 { From 7977454eaa972625ad1f586755ebe8433bec3d0e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:37:55 -0400 Subject: [PATCH 14/19] Pipe monitor event source HTLCForwardInfo -> holding cell htlcs Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline. Here we pipe the monitor event source from the manager to the channel, so it can be put in the channel's holding cell. In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. --- lightning/src/ln/channel.rs | 66 ++++++++++++++++++++++-------- lightning/src/ln/channelmanager.rs | 15 +++++-- 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index ad398d3be9c..dca0bf893cf 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -6561,7 +6561,9 @@ trait FailHTLCContents { type Message: FailHTLCMessageName; fn to_message(self, htlc_id: u64, channel_id: ChannelId) -> Self::Message; fn to_inbound_htlc_state(self) -> InboundHTLCState; - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK; + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK; } impl FailHTLCContents for msgs::OnionErrorPacket { type Message = msgs::UpdateFailHTLC; @@ -6576,8 +6578,10 @@ impl FailHTLCContents for msgs::OnionErrorPacket { fn to_inbound_htlc_state(self) -> InboundHTLCState { InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(self)) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { - HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source: None } + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source } } } impl FailHTLCContents for ([u8; 32], u16) { @@ -6596,12 +6600,14 @@ impl FailHTLCContents for ([u8; 32], u16) { failure_code: self.1, }) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, sha256_of_onion: self.0, failure_code: self.1, - monitor_event_source: None, + monitor_event_source, } } } @@ -7343,9 +7349,10 @@ where /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. /// if it was already resolved). Otherwise returns `Ok`. pub fn queue_fail_htlc( - &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L, + &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, err_packet, true, logger) + self.fail_htlc(htlc_id_arg, err_packet, true, monitor_event_source, logger) .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } @@ -7354,10 +7361,17 @@ where /// /// See [`Self::queue_fail_htlc`] for more info. pub fn queue_fail_malformed_htlc( - &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], logger: &L, + &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, (sha256_of_onion, failure_code), true, logger) - .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) + self.fail_htlc( + htlc_id_arg, + (sha256_of_onion, failure_code), + true, + monitor_event_source, + logger, + ) + .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. @@ -7365,7 +7379,7 @@ where #[rustfmt::skip] fn fail_htlc( &mut self, htlc_id_arg: u64, err_contents: E, mut force_holding_cell: bool, - logger: &L + monitor_event_source: Option, logger: &L ) -> Result, ChannelError> { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { panic!("Was asked to fail an HTLC when channel was not in an operational state"); @@ -7420,7 +7434,7 @@ where } } log_trace!(logger, "Placing failure for HTLC ID {} in holding cell.", htlc_id_arg); - self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg)); + self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg, monitor_event_source)); return Ok(None); } @@ -8449,18 +8463,34 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet, .. } => Some( - self.fail_htlc(htlc_id, err_packet.clone(), false, logger) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), + &HTLCUpdateAwaitingACK::FailHTLC { + htlc_id, + ref err_packet, + monitor_event_source, + } => Some( + self.fail_htlc( + htlc_id, + err_packet.clone(), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, - .. + monitor_event_source, } => Some( - self.fail_htlc(htlc_id, (sha256_of_onion, failure_code), false, logger) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), + self.fail_htlc( + htlc_id, + (sha256_of_onion, failure_code), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), }; if let Some(res) = fail_htlc_res { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b01d4bc5f17..5cb614b92ba 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -8176,7 +8176,7 @@ impl< } None }, - HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, .. } => { + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, monitor_event_source } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8184,7 +8184,15 @@ impl< { let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id)) + Some(( + chan.queue_fail_htlc( + htlc_id, + err_packet.clone(), + monitor_event_source, + &&logger, + ), + htlc_id, + )) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8200,7 +8208,7 @@ impl< htlc_id, failure_code, sha256_of_onion, - .. + monitor_event_source, } => { if let Some(chan) = peer_state .channel_by_id @@ -8213,6 +8221,7 @@ impl< htlc_id, failure_code, sha256_of_onion, + monitor_event_source, &&logger, ); Some((res, htlc_id)) From b9f276292428289a54a496c5baa205f8af8c6393 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 17:40:44 -0400 Subject: [PATCH 15/19] Ack monitor events on check_free_peer_holding_cells Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures once the monitor update that marks them as failed on the inbound edge is complete. --- lightning/src/ln/channel.rs | 62 +++++++++++++++++++----------- lightning/src/ln/channelmanager.rs | 11 +++++- 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index dca0bf893cf..dc34ccf3aa4 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -8328,7 +8328,7 @@ where /// returns `(None, Vec::new())`. pub fn maybe_free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) && self.context.channel_state.can_generate_new_commitment() { @@ -8342,7 +8342,7 @@ where /// for our counterparty. fn free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { assert!(matches!(self.context.channel_state, ChannelState::ChannelReady(_))); assert!(!self.context.channel_state.is_monitor_update_in_progress()); assert!(!self.context.channel_state.is_quiescent()); @@ -8372,6 +8372,7 @@ where let mut update_fulfill_count = 0; let mut update_fail_count = 0; let mut htlcs_to_fail = Vec::new(); + let mut monitor_events_to_ack = Vec::new(); for htlc_update in htlc_updates.drain(..) { // Note that this *can* fail, though it should be due to rather-rare conditions on // fee races with adding too many outputs which push our total payments just over @@ -8467,31 +8468,41 @@ where htlc_id, ref err_packet, monitor_event_source, - } => Some( - self.fail_htlc( - htlc_id, - err_packet.clone(), - false, - monitor_event_source, - logger, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + err_packet.clone(), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + }, &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, monitor_event_source, - } => Some( - self.fail_htlc( - htlc_id, - (sha256_of_onion, failure_code), - false, - monitor_event_source, - logger, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + (sha256_of_onion, failure_code), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + }, }; if let Some(res) = fail_htlc_res { match res { @@ -8543,7 +8554,11 @@ where Vec::new(), logger, ); - (self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail) + ( + self.push_ret_blockable_mon_update(monitor_update) + .map(|upd| (upd, monitor_events_to_ack)), + htlcs_to_fail, + ) } else { (None, Vec::new()) } @@ -8896,7 +8911,10 @@ where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { - (Some(mut additional_update), htlcs_to_fail) => { + // TODO: Thread monitor_events_to_ack through the revoke_and_ack return + // value so the ChannelManager can attach an AckMonitorEvents completion + // action to this monitor update. + (Some((mut additional_update, _monitor_events_to_ack)), htlcs_to_fail) => { // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5cb614b92ba..4189a13412c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13779,7 +13779,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ); if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() { let update_res = monitor_opt - .map(|monitor_update| { + .map(|(monitor_update, monitor_events_to_ack)| { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(*chan_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, &mut peer_state.monitor_update_blocked_actions, From 6bea82ecbcc5e16969bf8e5f53dec69f41d5a75a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:17:48 -0400 Subject: [PATCH 16/19] Ack monitor events on revoke_and_ack Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures once the monitor update that marks them as failed on the inbound edge is complete, when the HTLC was irrevocably failed via revoke_and_ack. --- lightning/src/ln/channel.rs | 16 ++++++++++------ lightning/src/ln/channelmanager.rs | 11 ++++++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index dc34ccf3aa4..d6075180c7c 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -8584,7 +8584,7 @@ where ( Vec<(HTLCSource, PaymentHash)>, Vec<(StaticInvoice, BlindedMessagePath)>, - Option, + Option<(ChannelMonitorUpdate, Vec)>, ), ChannelError, > { @@ -8895,6 +8895,7 @@ where } else { "Blocked" }; + let mut monitor_events_to_ack = Vec::new(); macro_rules! return_with_htlcs_to_fail { ($htlcs_to_fail: expr) => { if !release_monitor { @@ -8903,7 +8904,12 @@ where .push(PendingChannelMonitorUpdate { update: monitor_update }); return Ok(($htlcs_to_fail, static_invoices, None)); } else { - return Ok(($htlcs_to_fail, static_invoices, Some(monitor_update))); + let events_to_ack = core::mem::take(&mut monitor_events_to_ack); + return Ok(( + $htlcs_to_fail, + static_invoices, + Some((monitor_update, events_to_ack)), + )); } }; } @@ -8911,10 +8917,8 @@ where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { - // TODO: Thread monitor_events_to_ack through the revoke_and_ack return - // value so the ChannelManager can attach an AckMonitorEvents completion - // action to this monitor update. - (Some((mut additional_update, _monitor_events_to_ack)), htlcs_to_fail) => { + (Some((mut additional_update, holding_cell_monitor_events_to_ack)), htlcs_to_fail) => { + monitor_events_to_ack = holding_cell_monitor_events_to_ack; // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4189a13412c..ded7e1ab5da 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -12978,7 +12978,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ *counterparty_node_id); let (htlcs_to_fail, static_invoices, monitor_update_opt) = try_channel_entry!(self, peer_state, chan.revoke_and_ack(&msg, &self.fee_estimator, &&logger, mon_update_blocked), chan_entry); - if let Some(monitor_update) = monitor_update_opt { + if let Some((monitor_update, monitor_events_to_ack)) = monitor_update_opt { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(msg.channel_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); if let Some(data) = self.handle_new_monitor_update( From 42823457bec8771bb1b8adb28fb53247dd3d010a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:21:44 -0400 Subject: [PATCH 17/19] Ack monitor event when inbound edge is closed Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures if try to fail backwards and discover the inbound edge is closed. If that's the case, there's nothing for us to do as the HTLC will be resolved via timeout by the inbound edge counterparty. --- lightning/src/ln/channelmanager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index ded7e1ab5da..028bb8b201e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7971,11 +7971,15 @@ impl< continue; } }, - HTLCForwardInfo::FailHTLC { .. } | HTLCForwardInfo::FailMalformedHTLC { .. } => { + HTLCForwardInfo::FailHTLC { monitor_event_source, .. } + | HTLCForwardInfo::FailMalformedHTLC { monitor_event_source, .. } => { // Channel went away before we could fail it. This implies // the channel is now on chain and our counterparty is // trying to broadcast the HTLC-Timeout, but that's their // problem, not ours. + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } }, } } From 20ebcd3d655478d2de9b26df685d470f68549513 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:30:12 -0400 Subject: [PATCH 18/19] Ack monitor events when failing-to-fail-backwards Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures if we go to fail on the inbound edge and get an error when queueing the backwards failure. If we get an error in this case it means the failure is duplicate or the channel is closed, and in either case it's fine to mark the event as resolved. --- lightning/src/ln/channelmanager.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 028bb8b201e..4abc748ed3a 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -8196,6 +8196,7 @@ impl< &&logger, ), htlc_id, + monitor_event_source, )) } else { self.forwarding_channel_not_found( @@ -8228,7 +8229,7 @@ impl< monitor_event_source, &&logger, ); - Some((res, htlc_id)) + Some((res, htlc_id, monitor_event_source)) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8241,8 +8242,12 @@ impl< } }, }; - if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res { + if let Some((queue_fail_htlc_res, htlc_id, monitor_event_source)) = queue_fail_htlc_res + { if let Err(e) = queue_fail_htlc_res { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let ChannelError::Ignore(msg) = e { if let Some(chan) = peer_state .channel_by_id From bbf1786c618af9395a3c33fc43d7ecb8eda98e8b Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:31:39 -0400 Subject: [PATCH 19/19] Support persistent monitor events Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we complete work that was built on recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in prod yet, so this new code will run randomly in tests, to ensure we still support the old paths. --- lightning/src/chain/channelmonitor.rs | 47 ++++++++++++++++++++++----- lightning/src/ln/channelmanager.rs | 27 ++++++++++++++- lightning/src/ln/monitor_tests.rs | 3 ++ lightning/src/util/config.rs | 6 ++++ 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index cd8bde9662d..0eb5559cd05 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1288,6 +1288,11 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec<(u64, MonitorEvent)>, + // `MonitorEvent`s that have been provided to the `ChannelManager` via + // [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting + // [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it + // will be re-provided to the `ChannelManager` on startup. + provided_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on @@ -1765,7 +1770,12 @@ pub(crate) fn write_chanmon_internal( // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(true); let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { - Some(Iterable(channel_monitor.pending_monitor_events.iter())) + Some(Iterable( + channel_monitor + .provided_monitor_events + .iter() + .chain(channel_monitor.pending_monitor_events.iter()), + )) } else { None }; @@ -1978,6 +1988,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + provided_monitor_events: Vec::new(), persistent_events_enabled: false, next_monitor_event_id: 0, pending_events: Vec::new(), @@ -2202,8 +2213,15 @@ impl ChannelMonitor { } /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. - pub(super) fn ack_monitor_event(&self, _event_id: u64) { - // TODO: once events have ids, remove the corresponding event here + pub(super) fn ack_monitor_event(&self, event_id: u64) { + let inner = &mut *self.inner.lock().unwrap(); + inner.provided_monitor_events.retain(|(id, _)| *id != event_id); + } + + /// Enables persistent monitor events mode. When enabled, monitor events are retained until + /// explicitly acked rather than cleared on read. + pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) { + self.inner.lock().unwrap().persistent_events_enabled = enabled; } /// Copies [`MonitorEvent`] state from `other` into `self`. @@ -2211,11 +2229,16 @@ impl ChannelMonitor { /// serialization round-trip. #[cfg(any(test, feature = "_test_utils"))] pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { - let (pending, next_id) = { + let (provided, pending, next_id) = { let other_inner = other.inner.lock().unwrap(); - (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + ( + other_inner.provided_monitor_events.clone(), + other_inner.pending_monitor_events.clone(), + other_inner.next_monitor_event_id, + ) }; let mut self_inner = self.inner.lock().unwrap(); + self_inner.provided_monitor_events = provided; self_inner.pending_monitor_events = pending; self_inner.next_monitor_event_id = next_id; } @@ -4619,9 +4642,16 @@ impl ChannelMonitorImpl { } fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_monitor_events); - ret + if self.persistent_events_enabled { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + self.provided_monitor_events.extend(ret.iter().cloned()); + ret + } else { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + ret + } } /// Gets the set of events that are repeated regularly (e.g. those which RBF bump @@ -6971,6 +7001,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events, + provided_monitor_events: Vec::new(), persistent_events_enabled, next_monitor_event_id, pending_events, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4abc748ed3a..9b290c2fe61 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3641,6 +3641,8 @@ impl< our_network_pubkey, current_timestamp, expanded_inbound_key, node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(), ); + #[cfg(test)] + let override_persistent_monitor_events = config.override_persistent_monitor_events; ChannelManager { config: RwLock::new(config), @@ -3697,7 +3699,27 @@ impl< logger, - persistent_monitor_events: false, + persistent_monitor_events: { + #[cfg(not(test))] + { false } + #[cfg(test)] + { + override_persistent_monitor_events.unwrap_or_else(|| { + use core::hash::{BuildHasher, Hasher}; + match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") { + Ok(val) => match val.as_str() { + "1" => true, + "0" => false, + _ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val), + }, + Err(_) => { + let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish(); + rand_val % 2 == 0 + }, + } + }) + } + }, #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), @@ -11718,6 +11740,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ fail_chan!("Already had channel with the new channel_id"); }, hash_map::Entry::Vacant(e) => { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { // There's no problem signing a counterparty's funding transaction if our monitor @@ -11888,6 +11911,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match chan .funding_signed(&msg, best_block, &self.signer_provider, &self.logger) .and_then(|(funded_chan, monitor)| { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); self.chain_monitor .watch_channel(funded_chan.context.channel_id(), monitor) .map_err(|()| { @@ -12803,6 +12827,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(chan) = chan.as_funded_mut() { if let Some(monitor) = monitor_opt { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 1a68e5165a8..4c97df2f42e 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3592,6 +3592,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b let mut cfg = test_default_channel_config(); cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + // This test specifically tests lost monitor events, which requires the legacy + // (non-persistent) monitor event behavior. + cfg.override_persistent_monitor_events = Some(false); let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())]; let chanmon_cfgs = create_chanmon_cfgs(3); diff --git a/lightning/src/util/config.rs b/lightning/src/util/config.rs index e4158910b9a..6b0f7942423 100644 --- a/lightning/src/util/config.rs +++ b/lightning/src/util/config.rs @@ -1103,6 +1103,10 @@ pub struct UserConfig { /// /// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel pub reject_inbound_splices: bool, + /// If set to `Some`, overrides the random selection of whether to use persistent monitor + /// events. Only available in tests. + #[cfg(test)] + pub override_persistent_monitor_events: Option, } impl Default for UserConfig { @@ -1119,6 +1123,8 @@ impl Default for UserConfig { enable_htlc_hold: false, hold_outbound_htlcs_at_next_hop: false, reject_inbound_splices: true, + #[cfg(test)] + override_persistent_monitor_events: None, } } }