diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index c38d6dfe080..942a7dd6064 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1152,6 +1152,11 @@ where
 	let mut futures = Joiner::new();
 
+	// Capture the number of pending monitor writes before persisting the channel manager.
+	// We'll only flush this many writes after the manager is persisted, to avoid flushing
+	// monitor updates that arrived after the manager state was captured.
+	let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
+
 	if channel_manager.get_cm().get_and_clear_needs_persistence() {
 		log_trace!(logger, "Persisting ChannelManager...");
@@ -1349,6 +1354,13 @@ where
 		res?;
 	}
 
+	// Flush the monitor writes that were pending before we persisted the channel manager.
+	// Any writes that arrived after are left in the queue for the next iteration.
+	if pending_monitor_writes > 0 {
+		log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+		chain_monitor.flush(pending_monitor_writes);
+	}
+
 	match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
 		sleeper(ONION_MESSAGE_HANDLER_TIMER)
 	}) {
@@ -1413,6 +1425,14 @@ where
 				channel_manager.get_cm().encode(),
 			)
 			.await?;
+
+	// Flush all pending monitor writes after final channel manager persistence.
+	let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
+	if pending_monitor_writes > 0 {
+		log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
+		chain_monitor.flush(pending_monitor_writes);
+	}
+
 	if let Some(ref scorer) = scorer {
 		kv_store
 			.write(
@@ -1722,6 +1742,10 @@ impl BackgroundProcessor {
 					channel_manager.get_cm().timer_tick_occurred();
 					last_freshness_call = Instant::now();
 				}
+
+				// Capture the number of pending monitor writes before persisting the channel manager.
+				let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
+
 				if channel_manager.get_cm().get_and_clear_needs_persistence() {
 					log_trace!(logger, "Persisting ChannelManager...");
 					(kv_store.write(
@@ -1733,6 +1757,12 @@ impl BackgroundProcessor {
 					log_trace!(logger, "Done persisting ChannelManager.");
 				}
 
+				// Flush the monitor writes that were pending before we persisted the channel manager.
+				if pending_monitor_writes > 0 {
+					log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+					chain_monitor.flush(pending_monitor_writes);
+				}
+
 				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
 					log_trace!(logger, "Persisting LiquidityManager...");
 					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1883,18 @@ impl BackgroundProcessor {
 				CHANNEL_MANAGER_PERSISTENCE_KEY,
 				channel_manager.get_cm().encode(),
 			)?;
+
+			// Flush all pending monitor writes after final channel manager persistence.
+			let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
+			if pending_monitor_writes > 0 {
+				log_trace!(
+					logger,
+					"Flushing {} monitor writes on shutdown",
+					pending_monitor_writes
+				);
+				chain_monitor.flush(pending_monitor_writes);
+			}
+
 			if let Some(ref scorer) = scorer {
 				kv_store.write(
 					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 9fd6383cf7e..625151c56b7 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -235,6 +235,14 @@ impl MonitorHolder {
 	}
 }
 
+/// A pending operation queued for later execution in [`ChainMonitor::flush`].
+enum PendingMonitorOp<ChannelSigner: EcdsaChannelSigner> {
+	/// A new monitor to insert and persist.
+	NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner> },
+	/// An update to apply and persist.
+	Update { channel_id: ChannelId, update: ChannelMonitorUpdate },
+}
+
 /// A read-only reference to a current ChannelMonitor.
 ///
 /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
@@ -395,6 +403,13 @@ pub struct ChainMonitor<
 	/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
 	pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
 
+	/// Pending monitor operations queued for later execution.
+	///
+	/// When [`chain::Watch::watch_channel`] or [`chain::Watch::update_channel`] is called,
+	/// the operation is queued here instead of being immediately executed. Operations are
+	/// executed when [`Self::flush`] is called.
+	pending_monitor_operations: Mutex<Vec<PendingMonitorOp<ChannelSigner>>>,
+
 	#[cfg(peer_storage)]
 	our_peerstorage_encryption_key: PeerStorageKey,
 }
@@ -453,6 +468,7 @@ impl<
 			event_notifier: Arc::clone(&event_notifier),
 			persister: AsyncPersister { persister, event_notifier },
 			pending_send_only_events: Mutex::new(Vec::new()),
+			pending_monitor_operations: Mutex::new(Vec::new()),
 			#[cfg(peer_storage)]
 			our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
 		}
 	}
@@ -663,6 +679,7 @@ where
 			highest_chain_height: AtomicUsize::new(0),
 			event_notifier: Arc::new(Notifier::new()),
 			pending_send_only_events: Mutex::new(Vec::new()),
+			pending_monitor_operations: Mutex::new(Vec::new()),
 			#[cfg(peer_storage)]
 			our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
 		}
 	}
@@ -747,6 +764,216 @@ where
 		self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
 	}
 
+	/// Returns the number of pending monitor operations queued for later execution.
+	///
+	/// This can be used to capture the queue size before persisting the channel manager,
+	/// then pass that count to [`Self::flush`] to only flush those specific operations.
+	pub fn pending_monitor_operation_count(&self) -> usize {
+		self.pending_monitor_operations.lock().unwrap().len()
+	}
+
+	/// Applies and persists pending monitor operations.
+	///
+	/// This method takes the first `count` pending operations from the queue. New monitors are
+	/// inserted and persisted; updates are applied to their corresponding [`ChannelMonitor`] and
+	/// the updated monitor is then persisted via the configured [`Persist`] implementation.
+	///
+	/// For each new monitor or update that is successfully persisted (i.e., the persister returns
+	/// [`ChannelMonitorUpdateStatus::Completed`]), [`Self::channel_monitor_updated`] is called
+	/// to signal completion. If the persister returns [`ChannelMonitorUpdateStatus::InProgress`],
+	/// the update is tracked as pending and the persister is expected to call
+	/// [`Self::channel_monitor_updated`] later when persistence completes.
+	///
+	/// # Panics
+	///
+	/// Panics if the persister returns [`ChannelMonitorUpdateStatus::UnrecoverableError`].
+	pub fn flush(&self, count: usize) {
+		// Drain the requested number of operations from the queue.
+		// We hold the monitors write lock while draining to prevent a race with watch_channel.
+		let pending_ops: Vec<PendingMonitorOp<ChannelSigner>> = {
+			let _monitors = self.monitors.write().unwrap();
+			let mut queue = self.pending_monitor_operations.lock().unwrap();
+			let n = count.min(queue.len());
+			queue.drain(..n).collect()
+		};
+
+		for op in pending_ops {
+			match op {
+				PendingMonitorOp::NewMonitor { channel_id, monitor } => {
+					let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
+					log_trace!(logger, "Inserting new ChannelMonitor");
+
+					let update_id = monitor.get_latest_update_id();
+					let persist_res =
+						self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
+
+					let is_pending = match persist_res {
+						ChannelMonitorUpdateStatus::Completed => {
+							log_info!(logger, "Persistence of new ChannelMonitor completed");
+							false
+						},
+						ChannelMonitorUpdateStatus::InProgress => {
+							log_info!(
+								logger,
+								"Persistence of new ChannelMonitor in progress, update_id {}",
+								update_id
+							);
+							true
+						},
+						ChannelMonitorUpdateStatus::UnrecoverableError => {
+							let err_str = "ChannelMonitor persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+							log_error!(logger, "{}", err_str);
+							panic!("{}", err_str);
+						},
+					};
+
+					// Register outputs with the chain source before inserting.
+					if let Some(ref chain_source) = self.chain_source {
+						monitor.load_outputs_to_watch(chain_source, &self.logger);
+					}
+
+					// Insert the monitor, tracking the update as pending if persistence is async.
+					let pending_updates = if is_pending { vec![update_id] } else { Vec::new() };
+					let mut monitors = self.monitors.write().unwrap();
+					match monitors.entry(channel_id) {
+						hash_map::Entry::Occupied(_) => {
+							log_error!(
+								logger,
+								"Failed to add new channel data: channel monitor for given channel ID is already present"
+							);
+							continue;
+						},
+						hash_map::Entry::Vacant(entry) => {
+							entry.insert(MonitorHolder {
+								monitor,
+								pending_monitor_updates: Mutex::new(pending_updates),
+							});
+						},
+					}
+					drop(monitors);
+
+					// Signal completion only if persistence completed synchronously.
+					if !is_pending {
+						let _ = self.channel_monitor_updated(channel_id, update_id);
+					}
+				},
+				PendingMonitorOp::Update { channel_id, update } => {
+					let monitors = self.monitors.read().unwrap();
+					let monitor_state = match monitors.get(&channel_id) {
+						Some(state) => state,
+						None => {
+							// Monitor was removed (channel closed) before we could flush.
+							log_debug!(
+								self.logger,
+								"Skipping flush for channel {} - monitor no longer exists",
+								channel_id
+							);
+							continue;
+						},
+					};
+
+					let monitor = &monitor_state.monitor;
+					let logger = WithChannelMonitor::from(&self.logger, monitor, None);
+					log_trace!(logger, "Applying ChannelMonitor update id {}", update.update_id);
+
+					// Lock pending_monitor_updates to prevent a race with channel_monitor_updated.
+					let mut pending_monitor_updates =
+						monitor_state.pending_monitor_updates.lock().unwrap();
+
+					// Apply the update to the in-memory ChannelMonitor.
+					let update_res = monitor.update_monitor(
+						&update,
+						&self.broadcaster,
+						&self.fee_estimator,
+						&self.logger,
+					);
+
+					let update_id = update.update_id;
+
+					// Persist the updated monitor.
+					let persist_res = if update_res.is_err() {
+						log_warn!(
+							logger,
+							"Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"
+						);
+						self.persister.update_persisted_channel(
+							monitor.persistence_key(),
+							None,
+							monitor,
+						)
+					} else {
+						self.persister.update_persisted_channel(
+							monitor.persistence_key(),
+							Some(&update),
+							monitor,
+						)
+					};
+
+					let is_pending = match persist_res {
+						ChannelMonitorUpdateStatus::Completed => {
+							log_debug!(
+								logger,
+								"Persistence of ChannelMonitorUpdate id {:?} completed",
+								update_id,
+							);
+							false
+						},
+						ChannelMonitorUpdateStatus::InProgress => {
+							log_debug!(
+								logger,
+								"Persistence of ChannelMonitorUpdate id {:?} in progress",
+								update_id,
+							);
+							pending_monitor_updates.push(update_id);
+							true
+						},
+						ChannelMonitorUpdateStatus::UnrecoverableError => {
+							core::mem::drop(pending_monitor_updates);
+							core::mem::drop(monitors);
+							let _poison = self.monitors.write().unwrap();
+							let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+							log_error!(logger, "{}", err_str);
+							panic!("{}", err_str);
+						},
+					};
+
+					// Drop locks before calling channel_monitor_updated to avoid deadlock.
+					drop(pending_monitor_updates);
+					drop(monitors);
+
+					// Signal completion only if persistence completed synchronously.
+					if !is_pending {
+						let _ = self.channel_monitor_updated(channel_id, update_id);
+					}
+
+					// Register any renegotiated funding outputs with the chain source.
+					if let Some(ref chain_source) = self.chain_source {
+						let monitors = self.monitors.read().unwrap();
+						if let Some(monitor_state) = monitors.get(&channel_id) {
+							let monitor = &monitor_state.monitor;
+							let logger = WithChannelMonitor::from(&self.logger, monitor, None);
+							for (funding_outpoint, funding_script) in
+								update.internal_renegotiated_funding_data()
+							{
+								log_trace!(
+									logger,
+									"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
+									funding_outpoint
+								);
+								chain_source.register_tx(&funding_outpoint.txid, &funding_script);
+								chain_source.register_output(WatchedOutput {
+									block_hash: None,
+									outpoint: funding_outpoint,
+									script_pubkey: funding_script,
+								});
+							}
+						}
+					}
+				},
+			}
+		}
+	}
+
 	/// Indicates the persistence of a [`ChannelMonitor`] has completed after
 	/// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
 	///
@@ -1344,40 +1571,30 @@ where
 		&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
 	) -> Result<ChannelMonitorUpdateStatus, ()> {
 		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
-		let mut monitors = self.monitors.write().unwrap();
-		let entry = match monitors.entry(channel_id) {
-			hash_map::Entry::Occupied(_) => {
-				log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
-				return Err(());
-			},
-			hash_map::Entry::Vacant(e) => e,
-		};
-		log_trace!(logger, "Got new ChannelMonitor");
-		let update_id = monitor.get_latest_update_id();
-		let mut pending_monitor_updates = Vec::new();
-		let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
-		match persist_res {
-			ChannelMonitorUpdateStatus::InProgress => {
-				log_info!(logger, "Persistence of new ChannelMonitor in progress",);
-				pending_monitor_updates.push(update_id);
-			},
-			ChannelMonitorUpdateStatus::Completed => {
-				log_info!(logger, "Persistence of new ChannelMonitor completed",);
-			},
-			ChannelMonitorUpdateStatus::UnrecoverableError => {
-				let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-				log_error!(logger, "{}", err_str);
-				panic!("{}", err_str);
-			},
-		}
-		if let Some(ref chain_source) = self.chain_source {
-			monitor.load_outputs_to_watch(chain_source, &self.logger);
+
+		// Hold both locks to prevent a race with flush(), which moves monitors from the
+		// pending queue into monitors.
+		let monitors = self.monitors.read().unwrap();
+		let mut pending_ops = self.pending_monitor_operations.lock().unwrap();
+
+		// Check that channel_id is not already in monitors or the pending queue.
+		if monitors.contains_key(&channel_id) {
+			log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
+			return Err(());
 		}
-		entry.insert(MonitorHolder {
-			monitor,
-			pending_monitor_updates: Mutex::new(pending_monitor_updates),
+		let already_pending = pending_ops.iter().any(|op| match op {
+			PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id,
+			_ => false,
 		});
-		Ok(persist_res)
+		if already_pending {
+			log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already pending");
+			return Err(());
+		}
+
+		log_trace!(logger, "Queueing new ChannelMonitor for later insertion");
+		pending_ops.push(PendingMonitorOp::NewMonitor { channel_id, monitor });
+
+		Ok(ChannelMonitorUpdateStatus::InProgress)
 	}
 
 	fn update_channel(
@@ -1386,112 +1603,41 @@ where
 		// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
 		// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
 		debug_assert_eq!(update.channel_id.unwrap(), channel_id);
-		// Update the monitor that watches the channel referred to by the given outpoint.
+
+		let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
+
+		// Hold both locks to check that the monitor exists either in monitors or the pending queue.
 		let monitors = self.monitors.read().unwrap();
-		match monitors.get(&channel_id) {
-			None => {
-				let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
-				log_error!(logger, "Failed to update channel monitor: no such monitor registered");
-
-				// We should never ever trigger this from within ChannelManager. Technically a
-				// user could use this object with some proxying in between which makes this
-				// possible, but in tests and fuzzing, this should be a panic.
-				#[cfg(debug_assertions)]
-				panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
-				#[cfg(not(debug_assertions))]
-				ChannelMonitorUpdateStatus::InProgress
-			},
-			Some(monitor_state) => {
-				let monitor = &monitor_state.monitor;
-				let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
-				log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,);
-
-				// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
-				// have well-ordered updates from the users' point of view. See the
-				// `pending_monitor_updates` docs for more.
-				let mut pending_monitor_updates =
-					monitor_state.pending_monitor_updates.lock().unwrap();
-				let update_res = monitor.update_monitor(
-					update,
-					&self.broadcaster,
-					&self.fee_estimator,
-					&self.logger,
-				);
+		let mut pending_ops = self.pending_monitor_operations.lock().unwrap();
-				let update_id = update.update_id;
-				let persist_res = if update_res.is_err() {
-					// Even if updating the monitor returns an error, the monitor's state will
-					// still be changed. Therefore, we should persist the updated monitor despite the error.
-					// We don't want to persist a `monitor_update` which results in a failure to apply later
-					// while reading `channel_monitor` with updates from storage. Instead, we should persist
-					// the entire `channel_monitor` here.
-					log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor");
-					self.persister.update_persisted_channel(
-						monitor.persistence_key(),
-						None,
-						monitor,
-					)
-				} else {
-					self.persister.update_persisted_channel(
-						monitor.persistence_key(),
-						Some(update),
-						monitor,
-					)
-				};
-				match persist_res {
-					ChannelMonitorUpdateStatus::InProgress => {
-						pending_monitor_updates.push(update_id);
-						log_debug!(
-							logger,
-							"Persistence of ChannelMonitorUpdate id {:?} in progress",
-							update_id,
-						);
-					},
-					ChannelMonitorUpdateStatus::Completed => {
-						log_debug!(
-							logger,
-							"Persistence of ChannelMonitorUpdate id {:?} completed",
-							update_id,
-						);
-					},
-					ChannelMonitorUpdateStatus::UnrecoverableError => {
-						// Take the monitors lock for writing so that we poison it and any future
-						// operations going forward fail immediately.
-						core::mem::drop(pending_monitor_updates);
-						core::mem::drop(monitors);
-						let _poison = self.monitors.write().unwrap();
-						let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-						log_error!(logger, "{}", err_str);
-						panic!("{}", err_str);
-					},
-				}
+		let in_monitors = monitors.contains_key(&channel_id);
+		let in_pending = pending_ops.iter().any(|op| match op {
+			PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id,
+			_ => false,
+		});
-				// We may need to start monitoring for any alternative funding transactions.
-				if let Some(ref chain_source) = self.chain_source {
-					for (funding_outpoint, funding_script) in
-						update.internal_renegotiated_funding_data()
-					{
-						log_trace!(
-							logger,
-							"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
-							funding_outpoint
-						);
-						chain_source.register_tx(&funding_outpoint.txid, &funding_script);
-						chain_source.register_output(WatchedOutput {
-							block_hash: None,
-							outpoint: funding_outpoint,
-							script_pubkey: funding_script,
-						});
-					}
-				}
+		if !in_monitors && !in_pending {
+			log_error!(logger, "Failed to update channel monitor: no such monitor registered");
-				if update_res.is_err() {
-					ChannelMonitorUpdateStatus::InProgress
-				} else {
-					persist_res
-				}
-			},
+			// We should never ever trigger this from within ChannelManager. Technically a
+			// user could use this object with some proxying in between which makes this
+			// possible, but in tests and fuzzing, this should be a panic.
+			#[cfg(debug_assertions)]
+			panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
+			#[cfg(not(debug_assertions))]
+			return ChannelMonitorUpdateStatus::InProgress;
 		}
+
+		log_trace!(
+			logger,
+			"Queueing ChannelMonitor update id {} for later application",
+			update.update_id,
+		);
+
+		// Queue the update for later application and persistence in flush().
+		pending_ops.push(PendingMonitorOp::Update { channel_id, update: update.clone() });
+
+		ChannelMonitorUpdateStatus::InProgress
 	}
 
 	fn release_pending_monitor_events(
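For reviewers, here is a small, self-contained sketch of the ordering pattern the background-processor side of this change relies on: snapshot the number of queued monitor writes before persisting the ChannelManager, then flush only that many afterwards, so writes queued later wait for the next iteration. This is not LDK code; the PendingOp, OpQueue, queue, pending_count, and flush names below are hypothetical stand-ins for the real ChainMonitor API shown in the diff.

use std::sync::Mutex;

/// Stand-in for a queued monitor write (the real type in the diff is `PendingMonitorOp`).
#[derive(Debug)]
struct PendingOp(u64);

/// Minimal model of the queue-then-flush pattern: operations are queued on update,
/// and only a caller-specified prefix of the queue is executed by `flush`.
struct OpQueue {
	pending: Mutex<Vec<PendingOp>>,
}

impl OpQueue {
	fn new() -> Self {
		Self { pending: Mutex::new(Vec::new()) }
	}

	/// Queue an operation instead of executing it immediately
	/// (mirrors `watch_channel`/`update_channel` returning `InProgress`).
	fn queue(&self, op: PendingOp) {
		self.pending.lock().unwrap().push(op);
	}

	/// Snapshot of the queue length (mirrors `pending_monitor_operation_count`).
	fn pending_count(&self) -> usize {
		self.pending.lock().unwrap().len()
	}

	/// Execute only the first `count` queued operations (mirrors `flush(count)`).
	fn flush(&self, count: usize) {
		let ops: Vec<PendingOp> = {
			let mut queue = self.pending.lock().unwrap();
			let n = count.min(queue.len());
			queue.drain(..n).collect()
		};
		for op in ops {
			// In the real code this applies the update and persists the monitor.
			println!("persisted monitor write {}", op.0);
		}
	}
}

fn main() {
	let queue = OpQueue::new();
	queue.queue(PendingOp(1));
	queue.queue(PendingOp(2));

	// 1) Capture the count *before* persisting the manager state.
	let writes_before_manager_persist = queue.pending_count();

	// 2) Persist the channel manager here (omitted); a new write can arrive meanwhile.
	queue.queue(PendingOp(3));

	// 3) Flush only the writes captured in step 1; op 3 stays queued for the next iteration.
	queue.flush(writes_before_manager_persist);
	assert_eq!(queue.pending_count(), 1);
}

The same reasoning explains why watch_channel and update_channel now return ChannelMonitorUpdateStatus::InProgress unconditionally: the actual persistence happens in flush, and completion is signalled afterwards via channel_monitor_updated.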