From 44d6f8fe13838303dfca009aec1497c1ea8653cb Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Tue, 24 Feb 2026 18:13:37 +0100 Subject: [PATCH] ROX-33197: implement periodic scanning of host paths The purpose of this scan is to mitigate any inconsistencies in the inode tracking caused by events missed in the kernel; it also updates the maps on a configuration change. The existing scan method is extended to first clean up any items that are no longer monitored or no longer exist on the host; then the directories that are to be monitored are scanned and the inode maps are populated from the files found. The scan interval is set to 30 seconds by default, but can be modified through the regular configuration methods (file, env var, CLI arg). Unit tests for parsing of this new configuration value have been added. Some basic metrics have been added to the host_scanner module; these keep track of: - Events coming through that get their host path filled in. - Elements being updated/removed during a scan. Finally, some integration tests that changed the monitored path are fixed. With these changes, the scanning of newly monitored paths now causes the host path in those tests to be populated correctly, showing the scan works as expected. 
--- fact/src/config/mod.rs | 35 ++++++++++ fact/src/config/reloader.rs | 20 ++++++ fact/src/config/tests.rs | 106 +++++++++++++++++++++++++++++++ fact/src/host_scanner.rs | 87 +++++++++++++++++++++++-- fact/src/lib.rs | 9 ++- fact/src/metrics/host_scanner.rs | 73 +++++++++++++++++++++ fact/src/metrics/mod.rs | 8 +++ tests/test_config_hotreload.py | 6 +- 8 files changed, 334 insertions(+), 10 deletions(-) create mode 100644 fact/src/metrics/host_scanner.rs diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 42e016d1..4091477f 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, sync::LazyLock, + time::Duration, }; use anyhow::{bail, Context}; @@ -31,6 +32,7 @@ pub struct FactConfig { json: Option, ringbuf_size: Option, hotreload: Option, + scan_interval: Option, } impl FactConfig { @@ -95,6 +97,10 @@ impl FactConfig { if let Some(hotreload) = from.hotreload { self.hotreload = Some(hotreload); } + + if let Some(scan_interval) = from.scan_interval { + self.scan_interval = Some(scan_interval); + } } pub fn paths(&self) -> &[PathBuf] { @@ -117,6 +123,10 @@ impl FactConfig { self.hotreload.unwrap_or(true) } + pub fn scan_interval(&self) -> Duration { + self.scan_interval.unwrap_or(Duration::from_secs(30)) + } + #[cfg(test)] pub fn set_paths(&mut self, paths: Vec) { self.paths = Some(paths); @@ -216,6 +226,21 @@ impl TryFrom> for FactConfig { }; config.hotreload = Some(hotreload); } + "scan_interval" => { + if let Some(scan_interval) = v.as_f64() { + if scan_interval <= 0.0 { + bail!("invalid scan_interval: {scan_interval}"); + } + config.scan_interval = Some(Duration::from_secs_f64(scan_interval)); + } else if let Some(scan_interval) = v.as_i64() { + if scan_interval <= 0 { + bail!("invalid scan_interval: {scan_interval}"); + } + config.scan_interval = Some(Duration::from_secs(scan_interval as u64)) + } else { + bail!("scan_interval field has incorrect type: {v:?}"); + } + } name 
=> bail!("Invalid field '{name}' with value: {v:?}"), } } @@ -429,6 +454,15 @@ pub struct FactCli { hotreload: bool, #[arg(long, overrides_with = "hotreload", hide(true))] no_hotreload: bool, + + /// Interval at which scanning of monitored directories should + /// happen in seconds. + /// + /// The seconds can use a decimal point for fractions of seconds. + /// + /// Default value is 30 seconds + #[arg(long, short, env = "FACT_SCAN_INTERVAL")] + scan_interval: Option, } impl FactCli { @@ -448,6 +482,7 @@ impl FactCli { json: resolve_bool_arg(self.json, self.no_json), ringbuf_size: self.ringbuf_size, hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload), + scan_interval: self.scan_interval.map(Duration::from_secs_f64), } } } diff --git a/fact/src/config/reloader.rs b/fact/src/config/reloader.rs index 0994a7c9..1f33c37a 100644 --- a/fact/src/config/reloader.rs +++ b/fact/src/config/reloader.rs @@ -17,6 +17,7 @@ pub struct Reloader { grpc: watch::Sender, paths: watch::Sender>, files: HashMap<&'static str, i64>, + scan_interval: watch::Sender, trigger: Arc, } @@ -75,6 +76,12 @@ impl Reloader { self.paths.subscribe() } + /// Subscribe to get notifications when scan_interval configuration + /// is changed. + pub fn scan_interval(&self) -> watch::Receiver { + self.scan_interval.subscribe() + } + /// Get a reference to the internal trigger for manual reloading of /// configuration. 
/// @@ -171,6 +178,17 @@ impl Reloader { } }); + self.scan_interval.send_if_modified(|old| { + let new = new.scan_interval(); + if *old != new { + debug!("Sending new scan interval configuration..."); + *old = new; + true + } else { + false + } + }); + if self.config.hotreload() != new.hotreload() { warn!("Changes to the hotreload field only take effect on startup"); } @@ -203,6 +221,7 @@ impl From for Reloader { let (endpoint, _) = watch::channel(config.endpoint.clone()); let (grpc, _) = watch::channel(config.grpc.clone()); let (paths, _) = watch::channel(config.paths().to_vec()); + let (scan_interval, _) = watch::channel(config.scan_interval()); let trigger = Arc::new(Notify::new()); Reloader { @@ -210,6 +229,7 @@ impl From for Reloader { endpoint, grpc, paths, + scan_interval, files, trigger, } diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 46b2b973..1573a509 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -203,6 +203,20 @@ fn parsing() { ..Default::default() }, ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 30.5", + FactConfig { + scan_interval: Some(Duration::from_secs_f64(30.5)), + ..Default::default() + }, + ), ( r#" paths: @@ -218,6 +232,7 @@ fn parsing() { json: false ringbuf_size: 8192 hotreload: false + scan_interval: 60 "#, FactConfig { paths: Some(vec![PathBuf::from("/etc")]), @@ -234,6 +249,7 @@ fn parsing() { json: Some(false), ringbuf_size: Some(8192), hotreload: Some(false), + scan_interval: Some(Duration::from_secs(60)), }, ), ]; @@ -387,6 +403,14 @@ paths: "hotreload: 4", "hotreload field has incorrect type: Integer(4)", ), + ( + "scan_interval: true", + "scan_interval field has incorrect type: Boolean(true)", + ), + ("scan_interval: 0", "invalid scan_interval: 0"), + ("scan_interval: 0.0", "invalid scan_interval: 0"), + ("scan_interval: -128", "invalid scan_interval: -128"), + ("scan_interval: 
-128.5", "invalid scan_interval: -128.5"), ("unknown:", "Invalid field 'unknown' with value: Null"), ]; for (input, expected) in tests { @@ -726,6 +750,36 @@ fn update() { ..Default::default() }, ), + ( + "ringbuf_size: 16384", + FactConfig::default(), + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), + ( + "ringbuf_size: 16384", + FactConfig { + ringbuf_size: Some(8192), + ..Default::default() + }, + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), + ( + "ringbuf_size: 16384", + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), ( "hotreload: false", FactConfig::default(), @@ -756,6 +810,55 @@ fn update() { ..Default::default() }, ), + ( + "scan_interval: 60", + FactConfig::default(), + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 0.5", + FactConfig::default(), + FactConfig { + scan_interval: Some(Duration::from_secs_f64(0.5)), + ..Default::default() + }, + ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(30)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 25.5", + FactConfig { + scan_interval: Some(Duration::from_secs(30)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs_f64(25.5)), + ..Default::default() + }, + ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), ( r#" paths: @@ -771,6 +874,7 @@ fn update() { json: false ringbuf_size: 16384 hotreload: false + scan_interval: 60 "#, FactConfig { paths: Some(vec![PathBuf::from("/etc"), PathBuf::from("/bin")]), @@ -787,6 +891,7 @@ fn update() { json: Some(true), 
ringbuf_size: Some(64), hotreload: Some(true), + scan_interval: Some(Duration::from_secs(30)), }, FactConfig { paths: Some(vec![PathBuf::from("/etc")]), @@ -803,6 +908,7 @@ fn update() { json: Some(false), ringbuf_size: Some(16384), hotreload: Some(false), + scan_interval: Some(Duration::from_secs(60)), }, ), ]; diff --git a/fact/src/host_scanner.rs b/fact/src/host_scanner.rs index 2adbb58d..73893967 100644 --- a/fact/src/host_scanner.rs +++ b/fact/src/host_scanner.rs @@ -23,6 +23,7 @@ use std::{ os::linux::fs::MetadataExt, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use anyhow::Context; @@ -30,29 +31,39 @@ use aya::maps::MapData; use fact_ebpf::{inode_key_t, inode_value_t}; use log::{debug, info, warn}; use tokio::{ - sync::{broadcast, mpsc, watch}, + sync::{broadcast, mpsc, watch, Notify}, task::JoinHandle, }; -use crate::{bpf::Bpf, event::Event, host_info}; +use crate::{ + bpf::Bpf, + event::Event, + host_info, + metrics::host_scanner::{HostScannerMetrics, ScanLabels}, +}; pub struct HostScanner { kernel_inode_map: RefCell>, inode_map: RefCell>, - config: watch::Receiver>, + paths: watch::Receiver>, + scan_interval: watch::Receiver, running: watch::Receiver, rx: mpsc::Receiver, tx: broadcast::Sender>, + + metrics: HostScannerMetrics, } impl HostScanner { pub fn new( bpf: &mut Bpf, rx: mpsc::Receiver, - config: watch::Receiver>, + paths: watch::Receiver>, + scan_interval: watch::Receiver, running: watch::Receiver, + metrics: HostScannerMetrics, ) -> anyhow::Result { let kernel_inode_map = RefCell::new(bpf.take_inode_map()?); let inode_map = RefCell::new(std::collections::HashMap::new()); @@ -61,10 +72,12 @@ impl HostScanner { let host_scanner = HostScanner { kernel_inode_map, inode_map, - config, + paths, + scan_interval, running, rx, tx, + metrics, }; // Run an initial scan to fill in the inode map @@ -75,7 +88,26 @@ impl HostScanner { fn scan(&self) -> anyhow::Result<()> { debug!("Host scan started"); - for path in self.config.borrow().iter() { + 
self.metrics.scan_inc(ScanLabels::Scans); + let config = self.paths.borrow(); + + // Cleanup any items that are either: + // * Not configured to be monitored anymore. + // * Are configured to be monitored but no longer are found in + // the file system. + self.inode_map.borrow_mut().retain(|inode, path| { + if config.iter().any(|prefix| path.starts_with(prefix)) + && host_info::prepend_host_mount(path).exists() + { + true + } else { + let _ = self.kernel_inode_map.borrow_mut().remove(inode); + self.metrics.scan_inc(ScanLabels::InodeRemoved); + false + } + }); + + for path in config.iter() { let path = host_info::prepend_host_mount(path); self.scan_inner(&path)?; } @@ -85,15 +117,21 @@ impl HostScanner { } fn scan_inner(&self, path: &Path) -> anyhow::Result<()> { + self.metrics.scan_inc(ScanLabels::ElementsScanned); + if path.is_dir() { + self.metrics.scan_inc(ScanLabels::DirectoryScanned); for entry in path.read_dir()?.flatten() { let entry = entry.path(); self.scan_inner(&entry) .with_context(|| format!("Failed to scan {}", entry.display()))?; } } else if path.is_file() { + self.metrics.scan_inc(ScanLabels::FileScanned); self.update_entry(path) .with_context(|| format!("Failed to update entry for {}", path.display()))?; + } else { + self.metrics.scan_inc(ScanLabels::FsItemIgnored); } Ok(()) } @@ -101,6 +139,7 @@ impl HostScanner { fn update_entry(&self, path: &Path) -> anyhow::Result<()> { if !path.exists() { // If path does not exist, we don't have anything to update + self.metrics.scan_inc(ScanLabels::FileRemoved); return Ok(()); } @@ -118,6 +157,8 @@ impl HostScanner { let entry = inode_map.entry(inode).or_default(); *entry = host_info::remove_host_mount(path); + self.metrics.scan_inc(ScanLabels::FileUpdated); + debug!("Added entry for {}: {inode:?}", path.display()); Ok(()) } @@ -132,7 +173,35 @@ impl HostScanner { self.inode_map.borrow().get(inode?).cloned() } + /// Periodically notify the host scanner main task that a scan needs + /// to happen. 
+ /// + /// This is needed because `tokio::time::Interval::tick` will create + /// a new future every time it is called, if used in a + /// `tokio::select` with other events that trigger more often, the + /// tick will never happen. This way we have a separate task that + /// will reliably send a notification to the main one. + fn start_scan_notifier(&self, scan_trigger: Arc) { + let mut running = self.running.clone(); + let mut scan_interval = self.scan_interval.clone(); + tokio::spawn(async move { + while *running.borrow() { + let mut interval = tokio::time::interval(*scan_interval.borrow()); + loop { + tokio::select! { + _ = interval.tick() => scan_trigger.notify_one(), + _ = running.changed() => break, + _ = scan_interval.changed() => break, + } + } + } + }); + } + pub fn start(mut self) -> JoinHandle> { + let scan_trigger = Arc::new(Notify::new()); + self.start_scan_notifier(scan_trigger.clone()); + tokio::spawn(async move { info!("Starting host scanner..."); @@ -143,20 +212,26 @@ impl HostScanner { info!("No more events to process"); break; }; + self.metrics.events.added(); if let Some(host_path) = self.get_host_path(Some(event.get_inode())) { + self.metrics.scan_inc(ScanLabels::InodeHit); event.set_host_path(host_path); } if let Some(host_path) = self.get_host_path(event.get_old_inode()) { + self.metrics.scan_inc(ScanLabels::InodeHit); event.set_old_host_path(host_path); } let event = Arc::new(event); if let Err(e) = self.tx.send(event) { + self.metrics.events.dropped(); warn!("Failed to send event: {e}"); } }, + _ = scan_trigger.notified() => self.scan()?, + _ = self.paths.changed() => self.scan()?, _ = self.running.changed() => { if !*self.running.borrow() { break; diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 880813bb..422df3b2 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -83,7 +83,14 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { let mut bpf = Bpf::new(reloader.paths(), reloader.config().ringbuf_size(), tx)?; let exporter 
= Exporter::new(bpf.take_metrics()?); - let host_scanner = HostScanner::new(&mut bpf, rx, reloader.paths(), running.subscribe())?; + let host_scanner = HostScanner::new( + &mut bpf, + rx, + reloader.paths(), + reloader.scan_interval(), + running.subscribe(), + exporter.metrics.host_scanner.clone(), + )?; output::start( host_scanner.subscribe(), diff --git a/fact/src/metrics/host_scanner.rs b/fact/src/metrics/host_scanner.rs new file mode 100644 index 00000000..f5197cf5 --- /dev/null +++ b/fact/src/metrics/host_scanner.rs @@ -0,0 +1,73 @@ +use prometheus_client::{ + encoding::{EncodeLabelSet, EncodeLabelValue}, + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; + +use crate::metrics::{EventCounter, LabelValues as EventLabels}; + +#[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelValue, Copy)] +pub enum ScanLabels { + Scans, + ElementsScanned, + InodeRemoved, + InodeHit, + DirectoryScanned, + FileScanned, + FileRemoved, + FileUpdated, + FsItemIgnored, +} + +#[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelSet)] +pub struct ScanEvents { + label: ScanLabels, +} + +#[derive(Debug, Clone)] +/// Metrics for the HostScanner component +pub struct HostScannerMetrics { + pub events: EventCounter, + pub scan: Family>, +} + +impl HostScannerMetrics { + pub(super) fn new() -> Self { + let labels = [EventLabels::Total, EventLabels::Added, EventLabels::Dropped]; + let events = EventCounter::new( + "host_scanner_events", + "Events processed by the host scanner component", + &labels, + ); + + let scan: Family> = Default::default(); + for label in [ + ScanLabels::Scans, + ScanLabels::ElementsScanned, + ScanLabels::InodeRemoved, + ScanLabels::InodeHit, + ScanLabels::DirectoryScanned, + ScanLabels::FileScanned, + ScanLabels::FileRemoved, + ScanLabels::FileUpdated, + ScanLabels::FsItemIgnored, + ] { + let _ = scan.get_or_create(&ScanEvents { label }); + } + + HostScannerMetrics { events, scan } + } + + pub(super) fn register(&self, reg: &mut Registry) { 
+ self.events.register(reg); + reg.register( + "host_scanner_scan", + "Counter of events by scans from the host scanner component", + self.scan.clone(), + ); + } + + pub fn scan_inc(&self, label: ScanLabels) { + self.scan.get_or_create(&ScanEvents { label }).inc(); + } +} diff --git a/fact/src/metrics/mod.rs b/fact/src/metrics/mod.rs index 38e579b8..97213c83 100644 --- a/fact/src/metrics/mod.rs +++ b/fact/src/metrics/mod.rs @@ -4,7 +4,10 @@ use prometheus_client::{ registry::Registry, }; +use host_scanner::HostScannerMetrics; + pub mod exporter; +pub mod host_scanner; mod kernel_metrics; #[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelValue, Copy)] @@ -135,6 +138,7 @@ impl OutputMetrics { pub struct Metrics { pub bpf_worker: EventCounter, pub output: OutputMetrics, + pub host_scanner: HostScannerMetrics, } impl Metrics { @@ -153,9 +157,13 @@ impl Metrics { let output_metrics = OutputMetrics::new(); output_metrics.register(registry); + let host_scanner = HostScannerMetrics::new(); + host_scanner.register(registry); + Metrics { bpf_worker, output: output_metrics, + host_scanner, } } } diff --git a/tests/test_config_hotreload.py b/tests/test_config_hotreload.py index 9a875228..1989f8f1 100644 --- a/tests/test_config_hotreload.py +++ b/tests/test_config_hotreload.py @@ -145,7 +145,7 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): f.write('This is another test') e = Event(process=p, event_type=EventType.OPEN, - file=ignored_file, host_path='') + file=ignored_file, host_path=ignored_file) # File Under Test with open(fut, 'w') as f: @@ -185,8 +185,8 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): events = [ Event(process=p, event_type=EventType.OPEN, - file=ignored_file, host_path=''), - Event(process=p, event_type=EventType.OPEN, file=fut, host_path='') + file=ignored_file, host_path=ignored_file), + Event(process=p, event_type=EventType.OPEN, file=fut, host_path=fut) ] server.wait_events(events)