diff --git a/vm-migration/src/protocol.rs b/vm-migration/src/protocol.rs index 88e7e81a57..e1fc874bd5 100644 --- a/vm-migration/src/protocol.rs +++ b/vm-migration/src/protocol.rs @@ -271,12 +271,30 @@ impl Response { } #[repr(C)] -#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct MemoryRange { pub gpa: u64, pub length: u64, } +impl MemoryRange { + /// Tries to merge `next` into `current` if they overlap or touch. + /// Returns the extended range on success, or `None` if they are disjoint. + /// + /// Assumes `next.gpa >= current.gpa` (i.e. ranges are sorted). + fn try_merge(current: MemoryRange, next: MemoryRange) -> Option { + let current_end = current.gpa + current.length; + if next.gpa <= current_end { + Some(MemoryRange { + gpa: current.gpa, + length: (next.gpa + next.length).max(current_end) - current.gpa, + }) + } else { + None + } + } +} + #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct MemoryRangeTable { data: Vec, @@ -355,6 +373,57 @@ impl MemoryRangeTable { &self.data } + /// Merges a [`MemoryRangeTable`] into the current table and collapses overlapping ranges into + /// a single range. + /// + /// It expects that `self` and `other` are sorted and hold each `gpa` once, i.e., unique entries + /// per gpa. + pub fn merge_in_place(&mut self, mut other: MemoryRangeTable) { + if other.data.is_empty() { + return; + } + + if self.data.is_empty() { + self.data = other.data; + return; + } + + // Check invariants we require, which makes the algorithm much simpler + { + debug_assert!(self.data.is_sorted_by_key(|r| r.gpa)); + debug_assert!(other.data.is_sorted_by_key(|r| r.gpa)); + + debug_assert!( + self.data.windows(2).all(|w| w[0].gpa != w[1].gpa), + "gpa not unique!" + ); + debug_assert!( + other.data.windows(2).all(|w| w[0].gpa != w[1].gpa), + "gpa not unique!" 
+ ); + } + + // Algorithm: Combine both tables, sort by gpa, then do a single pass + // collapsing overlapping or touching ranges. + self.data.append(&mut other.data); + self.data.sort_unstable_by_key(|r| r.gpa); + + let mut write = 0_usize; + + // For each gpa, we check if we can merge it with the next range + for read in 1..self.data.len() { + match MemoryRange::try_merge(self.data[write], self.data[read]) { + Some(merged) => self.data[write] = merged, + None => { + write += 1; + self.data[write] = self.data[read]; + } + } + } + + self.data.truncate(write + 1); + } + /// Partitions the table into chunks of at most `chunk_size` bytes. pub fn partition(&self, chunk_size: u64) -> impl Iterator { MemoryRangeTableIterator::new(self, chunk_size) @@ -540,10 +609,10 @@ mod unit_tests { assert_eq!( chunks, &[ - [expected_regions[0].clone()].to_vec(), - [expected_regions[1].clone()].to_vec(), - [expected_regions[2].clone()].to_vec(), - [expected_regions[3].clone()].to_vec(), + [expected_regions[0]].to_vec(), + [expected_regions[1]].to_vec(), + [expected_regions[2]].to_vec(), + [expected_regions[3]].to_vec(), ] ); } @@ -593,4 +662,119 @@ mod unit_tests { ); } } + + fn table(ranges: &[(u64, u64)]) -> MemoryRangeTable { + MemoryRangeTable { + data: ranges + .iter() + .map(|&(gpa, length)| MemoryRange { gpa, length }) + .collect(), + } + } + + fn ranges(t: &MemoryRangeTable) -> Vec<(u64, u64)> { + t.data.iter().map(|r| (r.gpa, r.length)).collect() + } + + fn assert_canonical(t: &MemoryRangeTable) { + for w in t.data.windows(2) { + let a = &w[0]; + let b = &w[1]; + let a_end = a.gpa + a.length; + + assert!(a.length > 0); + assert!( + a_end < b.gpa, + "Ranges overlap or touch (and should have been merged)" + ); + } + } + + #[test] + fn merge_disjoint() { + let mut a = table(&[(0, 10)]); + let b = table(&[(20, 5)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(0, 10), (20, 5)]); + assert_canonical(&a); + } + + #[test] + fn merge_overlap() { + let mut a = table(&[(0, 
10)]); + let b = table(&[(5, 10)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(0, 15)]); + assert_canonical(&a); + } + + #[test] + fn merge_adjacent() { + let mut a = table(&[(0, 10)]); + let b = table(&[(10, 5)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(0, 15)]); + assert_canonical(&a); + } + + #[test] + fn merge_contained() { + let mut a = table(&[(0, 20)]); + let b = table(&[(5, 5)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(0, 20)]); + assert_canonical(&a); + } + + #[test] + fn merge_chain_across_tables() { + let mut a = table(&[(0, 5), (20, 5)]); + let b = table(&[(5, 15)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(0, 25)]); + assert_canonical(&a); + } + + #[test] + fn merge_self_empty() { + let mut a = table(&[]); + let b = table(&[(10, 5)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(10, 5)]); + assert_canonical(&a); + } + + #[test] + fn merge_other_empty() { + let mut a = table(&[(10, 5)]); + let b = table(&[]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(10, 5)]); + assert_canonical(&a); + } + + #[test] + fn merge_duplicates() { + let mut a = table(&[(2, 1), (3, 2), (10, 5)]); + let b = table(&[(2, 1), (10, 5), (20, 9)]); + + a.merge_in_place(b); + + assert_eq!(ranges(&a), vec![(2, 3), (10, 5), (20, 9)]); + assert_canonical(&a); + } } diff --git a/vmm/src/api/mod.rs b/vmm/src/api/mod.rs index fdc6fce760..9c7a217e8b 100644 --- a/vmm/src/api/mod.rs +++ b/vmm/src/api/mod.rs @@ -1602,8 +1602,6 @@ impl ApiAction for VmMigrationProgress { fn request(&self, _: Self::RequestBody, response_sender: Sender) -> ApiRequest { Box::new(move |vmm| { - debug!("API request event: VmMigrationProgress"); - let snapshot = Ok(vmm.vm_migration_progress()); let response = snapshot .map(Box::new) diff --git a/vmm/src/dirty_log_worker.rs b/vmm/src/dirty_log_worker.rs new file mode 100644 index 0000000000..fab38432f3 --- /dev/null +++ b/vmm/src/dirty_log_worker.rs @@ -0,0 +1,290 @@ +// 
Copyright © 2026 Cyberus Technology GmbH +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::{Arc, Condvar, Mutex, Weak}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; +use std::{io, mem, thread}; + +use arch::PAGE_SIZE; +use log::{error, info}; +use vm_migration::protocol::MemoryRangeTable; +use vm_migration::{Migratable, MigratableError}; + +use crate::cpu::CpuManager; +use crate::memory_manager::MemoryManager; + +/// Throttle (sleep) time of the thread per iteration. +/// +/// 33ms means roughly 10-30 effective iterations per second. If this value is +/// too small, we take computation time from the vCPUs away. If we set too big, +/// the dirty memory ranges computations might take too long as there is too +/// much data to process - this might delay the final iteration. +/// +/// If the thread is in its sleep, it can be interrupted. +const THREAD_THROTTLE: Duration = Duration::from_millis(33); + +/// The timeslice in which we calculate the current dirty rate of the VM. +/// +/// Every [`THREAD_THROTTLE`] interval, we get a set of dirtied memory and +/// calculate the dirty rate. This constant specifies how many samples we +/// look back into the past to calculate the average dirty rate. +/// +/// In other words: If this is 1s, we always return the average dirty rate of +/// the past second. +const DIRTY_RATE_CALC_TIMESLICE: Duration = Duration::from_secs(1); + +type JoinError = Box<dyn Any + Send>; + +/// All shared state of [`DirtyLogWorker`] that is behind the same lock. +struct DirtyLogWorkerSharedState { + /// The dirty rates measured in the past [`DIRTY_RATE_CALC_TIMESLICE`]. + /// + /// Used to calculate the dirty rate. + dirty_rates_pps: VecDeque<u64>, + /// The constantly updated (and merged) memory range table since the data + /// was moved out of the struct the last time. + table: MemoryRangeTable, + /// The timestamp of the last processing, used to calculate the dirty rate. 
+ last_timestamp: Instant, + /// Set to true to signal the worker thread to stop and exit. + stop: bool, +} + +impl DirtyLogWorkerSharedState { + /// Adds a new dirty rate measurement to the underlying vector. + /// + /// Removes old elements from the vector. + fn update_dirty_rate(&mut self, dirty_rate_pps: u64) { + self.dirty_rates_pps.push_front(dirty_rate_pps); + if self.dirty_rates_pps.len() > Self::dirty_rate_vec_capacity() { + self.dirty_rates_pps.pop_back(); + } + } + + /// Returns the average dirty rate in pages per second. + fn average_dirty_rate_pps(&self) -> u64 { + if self.dirty_rates_pps.is_empty() { + 0 + } else { + self.dirty_rates_pps + .iter() + .sum::<u64>() + .div_ceil(self.dirty_rates_pps.len() as u64) + } + } + + /// Returns the number of samples that are needed to cover the + /// past [`DIRTY_RATE_CALC_TIMESLICE`]. + const fn dirty_rate_vec_capacity() -> usize { + DIRTY_RATE_CALC_TIMESLICE + .as_millis() + .div_ceil(THREAD_THROTTLE.as_millis()) as usize + } +} + +/// Worker thread that continuously fetches the dirty log. +pub struct DirtyLogWorker { + stop_condvar: Arc<Condvar>, + shared_state: Arc<Mutex<DirtyLogWorkerSharedState>>, + cpu_manager: Weak<Mutex<CpuManager>>, + memory_manager: Weak<Mutex<MemoryManager>>, +} + +impl DirtyLogWorker { + /// Spawns a new [`DirtyLogWorker`] and returns a [`DirtyLogWorkerHandle`] to it. 
+ pub fn spawn( + cpu_manager: &Arc<Mutex<CpuManager>>, + memory_manager: &Arc<Mutex<MemoryManager>>, + ) -> Result<DirtyLogWorkerHandle, io::Error> { + let stop_condvar = Arc::new(Condvar::new()); + let table = MemoryRangeTable::from_dirty_bitmap([], 0, 0); + + let shared_state = DirtyLogWorkerSharedState { + last_timestamp: Instant::now(), + dirty_rates_pps: VecDeque::new(), + table, + stop: false, + }; + let shared_state = Arc::new(Mutex::new(shared_state)); + + let worker = Self { + stop_condvar: stop_condvar.clone(), + shared_state: shared_state.clone(), + cpu_manager: Arc::downgrade(cpu_manager), + memory_manager: Arc::downgrade(memory_manager), + }; + + let inner_handle = thread::Builder::new() + .name("dirty-log-worker".to_string()) + .spawn(|| worker.run())?; + + let handle = DirtyLogWorkerHandle { + handle: Some(inner_handle), + stop_condvar, + shared_state, + }; + + Ok(handle) + } + + /// Fetches the latest snapshot of all dirty tables and merges them into a single one. + fn fetch_table(&self) -> Result<MemoryRangeTable, MigratableError> { + let mut cpu_table = self + .cpu_manager + .upgrade() + .expect("VM's CpuManager should outlive this thread") + .lock() + .unwrap() + .dirty_log()?; + + let memory_table = self + .memory_manager + .upgrade() + .expect("VM's MemoryManager should outlive this thread") + .lock() + .unwrap() + .dirty_log()?; + + // Extend here is fine as they won't overlap. + cpu_table.extend(memory_table); + Ok(cpu_table) + } + + /// Updates internal metrics, such as the dirty rate. Also merges the new table + /// with the table of the previous iteration. 
+ fn calc_metrics_and_update_table(&self, new_table: MemoryRangeTable) { + let mut state_lock = self.shared_state.lock().unwrap(); + + let elapsed = state_lock.last_timestamp.elapsed(); + let new_dirty_size = new_table + .regions() + .iter() + .map(|range| range.length) + .sum::<u64>(); + + // Calc dirty rate for current cycle + let dirty_rate_pps = if elapsed.is_zero() { + 0 + } else { + let dirty_rate_f64 = (new_dirty_size / PAGE_SIZE as u64) as f64 / elapsed.as_secs_f64(); + dirty_rate_f64.ceil() as u64 + }; + + state_lock.update_dirty_rate(dirty_rate_pps); + state_lock.table.merge_in_place(new_table); + state_lock.last_timestamp = Instant::now(); + } + + /// Starts the thread and lets it run until [`DirtyLogWorkerHandle::stop`] is called. + pub fn run(self) -> Result<(), MigratableError /* dirty log error */> { + info!("thread started"); + + let worker_res = loop { + // Fetch the latest dirty log and release locks ASAP + let new_table = self.fetch_table()?; + self.calc_metrics_and_update_table(new_table); + + // Rate limiting plus better resolution for dirty rate calculation. + // Uses the condvar so we can be woken up early if stop is requested. + // To ensure the last call returns the freshest data, we exit the + // thread after querying the latest data. + let state_lock = self.shared_state.lock().unwrap(); + + // We sleep but might get woken up by our handler to exit. + let (guard, _timed_out) = self + .stop_condvar + .wait_timeout_while(state_lock, THREAD_THROTTLE, |state| !state.stop) + .unwrap(); + + let stop_requested = guard.stop; drop(guard); if stop_requested { + // At this point, we assume the VM is stopped and perform one last fetch. + let new_table = self.fetch_table()?; + self.calc_metrics_and_update_table(new_table); + + info!("thread exiting"); + break Ok(()); + } + }; + + if let Err(e) = &worker_res { + error!("Thread experienced an error and stopped its work: {e:?}"); + } + + worker_res + } +} + +/// Handle to a [`DirtyLogWorker`] thread. 
+pub struct DirtyLogWorkerHandle { + // Option so that we can take the inner handle. + handle: Option<JoinHandle<Result<(), MigratableError>>>, + stop_condvar: Arc<Condvar>, + shared_state: Arc<Mutex<DirtyLogWorkerSharedState>>, +} + +impl DirtyLogWorkerHandle { + fn exit_and_join_thread(&mut self) -> Result<(), JoinError> { + info!("stopping thread ..."); + let begin = Instant::now(); + + // Tells the thread that it should exit ASAP. + self.shared_state.lock().unwrap().stop = true; + // We kick it out of a potential sleep() + self.stop_condvar.notify_one(); + + let thread_res = self + .handle + .take() + .expect("should have thread handle") + .join()?; + + match thread_res { + Ok(_) => { + info!("stopped thread after {}ms", begin.elapsed().as_millis()); + } + Err(e) => { + error!( + "Thread encountered an error: {e} (stopped thread after {}ms)", + begin.elapsed().as_millis() + ); + } + } + + Ok(()) + } + + /// Stops and terminates the thread gracefully. + + /// + /// You must call this **after the VM is paused** and **before dirty logging is stopped**! + /// The call will then return the final memory range table of dirtied memory. + pub fn stop(mut self) -> Result<(MemoryRangeTable, u64 /* dirty rate */), JoinError> { + self.exit_and_join_thread()?; + Ok(self.get()) + } + + /// Gets the latest [`MemoryRangeTable`] of dirtied memory and the latest dirty rate. + /// + /// It replaces the internal state with an empty table. Callers are expected to call this once + /// per precopy iteration. + pub fn get(&self) -> (MemoryRangeTable, u64 /* dirty rate */) { + let mut lock = self.shared_state.lock().unwrap(); + let table = mem::take(&mut lock.table); + (table, lock.average_dirty_rate_pps()) + } +} + +impl Drop for DirtyLogWorkerHandle { + fn drop(&mut self) { + if self.handle.is_some() { + // We end up here in case of canceled or failed migrations. 
+ if let Err(e) = self.exit_and_join_thread() { + error!("Failed to join thread: {e:?}"); + } + } + } +} diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index 82bc5ffc8a..b5e463714d 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -81,6 +81,7 @@ use crate::config::{RestoreConfig, add_to_config}; use crate::coredump::GuestDebuggable; #[cfg(feature = "kvm")] use crate::cpu::IS_IN_SHUTDOWN; +use crate::dirty_log_worker::{DirtyLogWorker, DirtyLogWorkerHandle}; use crate::landlock::Landlock; use crate::memory_manager::MemoryManager; use crate::migration::{get_vm_snapshot, recv_vm_config, recv_vm_state}; @@ -102,6 +103,7 @@ mod coredump; pub mod cpu; pub mod device_manager; pub mod device_tree; +mod dirty_log_worker; #[cfg(feature = "guest_debug")] mod gdb; #[cfg(feature = "igvm")] @@ -2433,6 +2435,7 @@ impl Vmm { migrate_downtime_limit: Duration, postponed_lifecycle_event: &Mutex>, return_if_cancelled_cb: &impl Fn(&mut SocketStream) -> result::Result<(), MigratableError>, + dirty_log_aggregator: &DirtyLogWorkerHandle, ) -> result::Result { let mut iteration_table; let total_memory_size_bytes = vm @@ -2515,7 +2518,9 @@ impl Vmm { iteration_table = if s.iteration == 0 { vm.memory_range_table()? } else { - vm.dirty_log()? + let (dirty_table, new_dirty_rate_pps) = dirty_log_aggregator.get(); + s.dirty_rate_pps = new_dirty_rate_pps; + dirty_table }; // Update the pending size (amount of data to transfer) @@ -2633,6 +2638,11 @@ impl Vmm { // Start logging dirty pages vm.start_dirty_log()?; + + // Start dirty log aggregator after dirty log was started. 
+ let dirty_log_aggregator = DirtyLogWorker::spawn(vm.cpu_manager(), vm.memory_manager()) + .map_err(|e| MigratableError::MigrateSend(e.into()))?; + let iteration_table = Self::memory_copy_iterations( vm, &mem_send, @@ -2642,6 +2652,7 @@ impl Vmm { migrate_downtime_limit, postponed_lifecycle_event, return_if_cancelled_cb, + &dirty_log_aggregator, )?; info!("Entering downtime phase"); @@ -2654,11 +2665,16 @@ impl Vmm { vm.pause()?; info!("paused VM"); - // Send last batch of dirty pages - let mut final_table = vm.dirty_log()?; - final_table.extend(iteration_table.clone()); + // Send last batch of dirty pages: + let (mut final_table, _) = dirty_log_aggregator.stop().map_err(|e| { + MigratableError::MigrateSend(anyhow!("Failed to join dirty log worker: {e:?}")) + })?; + final_table.merge_in_place(iteration_table); mem_send.send_memory(&final_table, socket, return_if_cancelled_cb)?; + // Must happen after the dirty log aggregator is stopped + vm.stop_dirty_log()?; + // Update statistics s.bytes_to_transmit = final_table.regions().iter().map(|range| range.length).sum(); s.pages_to_transmit = s.bytes_to_transmit.div_ceil(PAGE_SIZE as u64); diff --git a/vmm/src/memory_manager.rs b/vmm/src/memory_manager.rs index 2fa847a696..ec093670bd 100644 --- a/vmm/src/memory_manager.rs +++ b/vmm/src/memory_manager.rs @@ -25,7 +25,7 @@ use devices::ioapic; #[cfg(target_arch = "aarch64")] use hypervisor::HypervisorVmError; use libc::_SC_NPROCESSORS_ONLN; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tracer::trace_scoped; @@ -2664,15 +2664,6 @@ impl Migratable for MemoryManager { let sub_table = MemoryRangeTable::from_dirty_bitmap(dirty_bitmap, r.gpa, 4096); - if sub_table.regions().is_empty() { - debug!("Dirty Memory Range Table is empty"); - } else { - debug!("Dirty Memory Range Table:"); - for range in sub_table.regions() { - trace!("GPA: {:x} size: {} (KiB)", range.gpa, range.length / 
1024); - } - } - table.extend(sub_table); } Ok(table) diff --git a/vmm/src/vm.rs b/vmm/src/vm.rs index f647409a29..69feaaf84f 100644 --- a/vmm/src/vm.rs +++ b/vmm/src/vm.rs @@ -3038,6 +3038,14 @@ impl Vm { .nmi() .map_err(|_| Error::ErrorNmi); } + + pub fn memory_manager(&self) -> &Arc<Mutex<MemoryManager>> { + &self.memory_manager + } + + pub fn cpu_manager(&self) -> &Arc<Mutex<CpuManager>> { + &self.cpu_manager + } } impl Pausable for Vm { @@ -3279,6 +3287,8 @@ impl Migratable for Vm { } fn dirty_log(&mut self) -> std::result::Result<MemoryRangeTable, MigratableError> { + // Please note that this is unused, since we have a different thread + // that operates directly on the underlying data sources! Ok(MemoryRangeTable::new_from_tables(vec![ self.memory_manager.lock().unwrap().dirty_log()?, self.device_manager.lock().unwrap().dirty_log()?,