diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 42e016d1..4091477f 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, sync::LazyLock, + time::Duration, }; use anyhow::{bail, Context}; @@ -31,6 +32,7 @@ pub struct FactConfig { json: Option, ringbuf_size: Option, hotreload: Option, + scan_interval: Option, } impl FactConfig { @@ -95,6 +97,10 @@ impl FactConfig { if let Some(hotreload) = from.hotreload { self.hotreload = Some(hotreload); } + + if let Some(scan_interval) = from.scan_interval { + self.scan_interval = Some(scan_interval); + } } pub fn paths(&self) -> &[PathBuf] { @@ -117,6 +123,10 @@ impl FactConfig { self.hotreload.unwrap_or(true) } + pub fn scan_interval(&self) -> Duration { + self.scan_interval.unwrap_or(Duration::from_secs(30)) + } + #[cfg(test)] pub fn set_paths(&mut self, paths: Vec) { self.paths = Some(paths); @@ -216,6 +226,21 @@ impl TryFrom> for FactConfig { }; config.hotreload = Some(hotreload); } + "scan_interval" => { + if let Some(scan_interval) = v.as_f64() { + if scan_interval <= 0.0 { + bail!("invalid scan_interval: {scan_interval}"); + } + config.scan_interval = Some(Duration::from_secs_f64(scan_interval)); + } else if let Some(scan_interval) = v.as_i64() { + if scan_interval <= 0 { + bail!("invalid scan_interval: {scan_interval}"); + } + config.scan_interval = Some(Duration::from_secs(scan_interval as u64)) + } else { + bail!("scan_interval field has incorrect type: {v:?}"); + } + } name => bail!("Invalid field '{name}' with value: {v:?}"), } } @@ -429,6 +454,15 @@ pub struct FactCli { hotreload: bool, #[arg(long, overrides_with = "hotreload", hide(true))] no_hotreload: bool, + + /// Interval at which scanning of monitored directories should + /// happen in seconds. + /// + /// The seconds can use a decimal point for fractions of seconds. + /// + /// Default value is 30 seconds + #[arg(long, short, env = "FACT_SCAN_INTERVAL")] + scan_interval: Option, } impl FactCli { @@ -448,6 +482,7 @@ impl FactCli { json: resolve_bool_arg(self.json, self.no_json), ringbuf_size: self.ringbuf_size, hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload), + scan_interval: self.scan_interval.map(Duration::from_secs_f64), } } } diff --git a/fact/src/config/reloader.rs b/fact/src/config/reloader.rs index 0994a7c9..1f33c37a 100644 --- a/fact/src/config/reloader.rs +++ b/fact/src/config/reloader.rs @@ -17,6 +17,7 @@ pub struct Reloader { grpc: watch::Sender, paths: watch::Sender>, files: HashMap<&'static str, i64>, + scan_interval: watch::Sender, trigger: Arc, } @@ -75,6 +76,12 @@ impl Reloader { self.paths.subscribe() } + /// Subscribe to get notifications when scan_interval configuration + /// is changed. + pub fn scan_interval(&self) -> watch::Receiver { + self.scan_interval.subscribe() + } + /// Get a reference to the internal trigger for manual reloading of /// configuration. /// @@ -171,6 +178,17 @@ impl Reloader { } }); + self.scan_interval.send_if_modified(|old| { + let new = new.scan_interval(); + if *old != new { + debug!("Sending new scan interval configuration..."); + *old = new; + true + } else { + false + } + }); + if self.config.hotreload() != new.hotreload() { warn!("Changes to the hotreload field only take effect on startup"); } @@ -203,6 +221,7 @@ impl From for Reloader { let (endpoint, _) = watch::channel(config.endpoint.clone()); let (grpc, _) = watch::channel(config.grpc.clone()); let (paths, _) = watch::channel(config.paths().to_vec()); + let (scan_interval, _) = watch::channel(config.scan_interval()); let trigger = Arc::new(Notify::new()); Reloader { @@ -210,6 +229,7 @@ impl From for Reloader { endpoint, grpc, paths, + scan_interval, files, trigger, } diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 46b2b973..1573a509 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -203,6 +203,20 @@ fn parsing() { ..Default::default() }, ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 30.5", + FactConfig { + scan_interval: Some(Duration::from_secs_f64(30.5)), + ..Default::default() + }, + ), ( r#" paths: @@ -218,6 +232,7 @@ fn parsing() { json: false ringbuf_size: 8192 hotreload: false + scan_interval: 60 "#, FactConfig { paths: Some(vec![PathBuf::from("/etc")]), @@ -234,6 +249,7 @@ fn parsing() { json: Some(false), ringbuf_size: Some(8192), hotreload: Some(false), + scan_interval: Some(Duration::from_secs(60)), }, ), ]; @@ -387,6 +403,14 @@ paths: "hotreload: 4", "hotreload field has incorrect type: Integer(4)", ), + ( + "scan_interval: true", + "scan_interval field has incorrect type: Boolean(true)", + ), + ("scan_interval: 0", "invalid scan_interval: 0"), + ("scan_interval: 0.0", "invalid scan_interval: 0"), + ("scan_interval: -128", "invalid scan_interval: -128"), + ("scan_interval: -128.5", "invalid scan_interval: -128.5"), ("unknown:", "Invalid field 'unknown' with value: Null"), ]; for (input, expected) in tests { @@ -726,6 +750,36 @@ fn update() { ..Default::default() }, ), + ( + "ringbuf_size: 16384", + FactConfig::default(), + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), + ( + "ringbuf_size: 16384", + FactConfig { + ringbuf_size: Some(8192), + ..Default::default() + }, + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), + ( + "ringbuf_size: 16384", + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + FactConfig { + ringbuf_size: Some(16384), + ..Default::default() + }, + ), ( "hotreload: false", FactConfig::default(), @@ -756,6 +810,55 @@ fn update() { ..Default::default() }, ), + ( + "scan_interval: 60", + FactConfig::default(), + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 0.5", + FactConfig::default(), + FactConfig { + scan_interval: Some(Duration::from_secs_f64(0.5)), + ..Default::default() + }, + ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(30)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), + ( + "scan_interval: 25.5", + FactConfig { + scan_interval: Some(Duration::from_secs(30)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs_f64(25.5)), + ..Default::default() + }, + ), + ( + "scan_interval: 60", + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + FactConfig { + scan_interval: Some(Duration::from_secs(60)), + ..Default::default() + }, + ), ( r#" paths: @@ -771,6 +874,7 @@ fn update() { json: false ringbuf_size: 16384 hotreload: false + scan_interval: 60 "#, FactConfig { paths: Some(vec![PathBuf::from("/etc"), PathBuf::from("/bin")]), @@ -787,6 +891,7 @@ fn update() { json: Some(true), ringbuf_size: Some(64), hotreload: Some(true), + scan_interval: Some(Duration::from_secs(30)), }, FactConfig { paths: Some(vec![PathBuf::from("/etc")]), @@ -803,6 +908,7 @@ fn update() { json: Some(false), ringbuf_size: Some(16384), hotreload: Some(false), + scan_interval: Some(Duration::from_secs(60)), }, ), ]; diff --git a/fact/src/host_scanner.rs b/fact/src/host_scanner.rs index 2adbb58d..73893967 100644 --- a/fact/src/host_scanner.rs +++ b/fact/src/host_scanner.rs @@ -23,6 +23,7 @@ use std::{ os::linux::fs::MetadataExt, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use anyhow::Context; @@ -30,29 +31,39 @@ use aya::maps::MapData; use fact_ebpf::{inode_key_t, inode_value_t}; use log::{debug, info, warn}; use tokio::{ - sync::{broadcast, mpsc, watch}, + sync::{broadcast, mpsc, watch, Notify}, task::JoinHandle, }; -use crate::{bpf::Bpf, event::Event, host_info}; +use crate::{ + bpf::Bpf, + event::Event, + host_info, + metrics::host_scanner::{HostScannerMetrics, ScanLabels}, +}; pub struct HostScanner { kernel_inode_map: RefCell>, inode_map: RefCell>, - config: watch::Receiver>, + paths: watch::Receiver>, + scan_interval: watch::Receiver, running: watch::Receiver, rx: mpsc::Receiver, tx: broadcast::Sender>, + + metrics: HostScannerMetrics, } impl HostScanner { pub fn new( bpf: &mut Bpf, rx: mpsc::Receiver, - config: watch::Receiver>, + paths: watch::Receiver>, + scan_interval: watch::Receiver, running: watch::Receiver, + metrics: HostScannerMetrics, ) -> anyhow::Result { let kernel_inode_map = RefCell::new(bpf.take_inode_map()?); let inode_map = RefCell::new(std::collections::HashMap::new()); @@ -61,10 +72,12 @@ impl HostScanner { let host_scanner = HostScanner { kernel_inode_map, inode_map, - config, + paths, + scan_interval, running, rx, tx, + metrics, }; // Run an initial scan to fill in the inode map @@ -75,7 +88,26 @@ impl HostScanner { fn scan(&self) -> anyhow::Result<()> { debug!("Host scan started"); - for path in self.config.borrow().iter() { + self.metrics.scan_inc(ScanLabels::Scans); + let config = self.paths.borrow(); + + // Cleanup any items that are either: + // * Not configured to be monitored anymore. + // * Are configured to be monitored but no longer are found in + // the file system. + self.inode_map.borrow_mut().retain(|inode, path| { + if config.iter().any(|prefix| path.starts_with(prefix)) + && host_info::prepend_host_mount(path).exists() + { + true + } else { + let _ = self.kernel_inode_map.borrow_mut().remove(inode); + self.metrics.scan_inc(ScanLabels::InodeRemoved); + false + } + }); + + for path in config.iter() { let path = host_info::prepend_host_mount(path); self.scan_inner(&path)?; } @@ -85,15 +117,21 @@ impl HostScanner { } fn scan_inner(&self, path: &Path) -> anyhow::Result<()> { + self.metrics.scan_inc(ScanLabels::ElementsScanned); + if path.is_dir() { + self.metrics.scan_inc(ScanLabels::DirectoryScanned); for entry in path.read_dir()?.flatten() { let entry = entry.path(); self.scan_inner(&entry) .with_context(|| format!("Failed to scan {}", entry.display()))?; } } else if path.is_file() { + self.metrics.scan_inc(ScanLabels::FileScanned); self.update_entry(path) .with_context(|| format!("Failed to update entry for {}", path.display()))?; + } else { + self.metrics.scan_inc(ScanLabels::FsItemIgnored); } Ok(()) } @@ -101,6 +139,7 @@ impl HostScanner { fn update_entry(&self, path: &Path) -> anyhow::Result<()> { if !path.exists() { // If path does not exist, we don't have anything to update + self.metrics.scan_inc(ScanLabels::FileRemoved); return Ok(()); } @@ -118,6 +157,8 @@ impl HostScanner { let entry = inode_map.entry(inode).or_default(); *entry = host_info::remove_host_mount(path); + self.metrics.scan_inc(ScanLabels::FileUpdated); + debug!("Added entry for {}: {inode:?}", path.display()); Ok(()) } @@ -132,7 +173,35 @@ impl HostScanner { self.inode_map.borrow().get(inode?).cloned() } + /// Periodically notify the host scanner main task that a scan needs + /// to happen. + /// + /// This is needed because `tokio::time::Interval::tick` will create + /// a new future every time it is called, if used in a + /// `tokio::select` with other events that trigger more often, the + /// tick will never happen. This way we have a separate task that + /// will reliably send a notification to the main one. + fn start_scan_notifier(&self, scan_trigger: Arc) { + let mut running = self.running.clone(); + let mut scan_interval = self.scan_interval.clone(); + tokio::spawn(async move { + while *running.borrow() { + let mut interval = tokio::time::interval(*scan_interval.borrow()); + loop { + tokio::select! { + _ = interval.tick() => scan_trigger.notify_one(), + _ = running.changed() => break, + _ = scan_interval.changed() => break, + } + } + } + }); + } + pub fn start(mut self) -> JoinHandle> { + let scan_trigger = Arc::new(Notify::new()); + self.start_scan_notifier(scan_trigger.clone()); + tokio::spawn(async move { info!("Starting host scanner..."); @@ -143,20 +212,26 @@ impl HostScanner { info!("No more events to process"); break; }; + self.metrics.events.added(); if let Some(host_path) = self.get_host_path(Some(event.get_inode())) { + self.metrics.scan_inc(ScanLabels::InodeHit); event.set_host_path(host_path); } if let Some(host_path) = self.get_host_path(event.get_old_inode()) { + self.metrics.scan_inc(ScanLabels::InodeHit); event.set_old_host_path(host_path); } let event = Arc::new(event); if let Err(e) = self.tx.send(event) { + self.metrics.events.dropped(); warn!("Failed to send event: {e}"); } }, + _ = scan_trigger.notified() => self.scan()?, + _ = self.paths.changed() => self.scan()?, _ = self.running.changed() => { if !*self.running.borrow() { break; diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 880813bb..422df3b2 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -83,7 +83,14 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { let mut bpf = Bpf::new(reloader.paths(), reloader.config().ringbuf_size(), tx)?; let exporter = Exporter::new(bpf.take_metrics()?); - let host_scanner = HostScanner::new(&mut bpf, rx, reloader.paths(), running.subscribe())?; + let host_scanner = HostScanner::new( + &mut bpf, + rx, + reloader.paths(), + reloader.scan_interval(), + running.subscribe(), + exporter.metrics.host_scanner.clone(), + )?; output::start( host_scanner.subscribe(), diff --git a/fact/src/metrics/host_scanner.rs b/fact/src/metrics/host_scanner.rs new file mode 100644 index 00000000..f5197cf5 --- /dev/null +++ b/fact/src/metrics/host_scanner.rs @@ -0,0 +1,73 @@ +use prometheus_client::{ + encoding::{EncodeLabelSet, EncodeLabelValue}, + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; + +use crate::metrics::{EventCounter, LabelValues as EventLabels}; + +#[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelValue, Copy)] +pub enum ScanLabels { + Scans, + ElementsScanned, + InodeRemoved, + InodeHit, + DirectoryScanned, + FileScanned, + FileRemoved, + FileUpdated, + FsItemIgnored, +} + +#[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelSet)] +pub struct ScanEvents { + label: ScanLabels, +} + +#[derive(Debug, Clone)] +/// Metrics for the HostScanner component +pub struct HostScannerMetrics { + pub events: EventCounter, + pub scan: Family>, +} + +impl HostScannerMetrics { + pub(super) fn new() -> Self { + let labels = [EventLabels::Total, EventLabels::Added, EventLabels::Dropped]; + let events = EventCounter::new( + "host_scanner_events", + "Events processed by the host scanner component", + &labels, + ); + + let scan: Family> = Default::default(); + for label in [ + ScanLabels::Scans, + ScanLabels::ElementsScanned, + ScanLabels::InodeRemoved, + ScanLabels::InodeHit, + ScanLabels::DirectoryScanned, + ScanLabels::FileScanned, + ScanLabels::FileRemoved, + ScanLabels::FileUpdated, + ScanLabels::FsItemIgnored, + ] { + let _ = scan.get_or_create(&ScanEvents { label }); + } + + HostScannerMetrics { events, scan } + } + + pub(super) fn register(&self, reg: &mut Registry) { + self.events.register(reg); + reg.register( + "host_scanner_scan", + "Counter of events by scans from the host scanner component", + self.scan.clone(), + ); + } + + pub fn scan_inc(&self, label: ScanLabels) { + self.scan.get_or_create(&ScanEvents { label }).inc(); + } +} diff --git a/fact/src/metrics/mod.rs b/fact/src/metrics/mod.rs index 38e579b8..97213c83 100644 --- a/fact/src/metrics/mod.rs +++ b/fact/src/metrics/mod.rs @@ -4,7 +4,10 @@ use prometheus_client::{ registry::Registry, }; +use host_scanner::HostScannerMetrics; + pub mod exporter; +pub mod host_scanner; mod kernel_metrics; #[derive(Clone, Hash, Eq, Debug, PartialEq, EncodeLabelValue, Copy)] @@ -135,6 +138,7 @@ impl OutputMetrics { pub struct Metrics { pub bpf_worker: EventCounter, pub output: OutputMetrics, + pub host_scanner: HostScannerMetrics, } impl Metrics { @@ -153,9 +157,13 @@ impl Metrics { let output_metrics = OutputMetrics::new(); output_metrics.register(registry); + let host_scanner = HostScannerMetrics::new(); + host_scanner.register(registry); + Metrics { bpf_worker, output: output_metrics, + host_scanner, } } } diff --git a/tests/test_config_hotreload.py b/tests/test_config_hotreload.py index 9a875228..1989f8f1 100644 --- a/tests/test_config_hotreload.py +++ b/tests/test_config_hotreload.py @@ -145,7 +145,7 @@ def test_paths(fact, fact_config, monitored_dir, ignored_dir, server): f.write('This is another test') e = Event(process=p, event_type=EventType.OPEN, - file=ignored_file, host_path='') + file=ignored_file, host_path=ignored_file) # File Under Test with open(fut, 'w') as f: @@ -185,8 +185,8 @@ def test_paths_addition(fact, fact_config, monitored_dir, ignored_dir, server): events = [ Event(process=p, event_type=EventType.OPEN, - file=ignored_file, host_path=''), - Event(process=p, event_type=EventType.OPEN, file=fut, host_path='') + file=ignored_file, host_path=ignored_file), + Event(process=p, event_type=EventType.OPEN, file=fut, host_path=fut) ] server.wait_events(events)