diff --git a/app/service/data_svc.py b/app/service/data_svc.py
index d5e9d8ccf..c01ef8fac 100644
--- a/app/service/data_svc.py
+++ b/app/service/data_svc.py
@@ -46,11 +46,15 @@
 
 class DataService(DataServiceInterface, BaseService):
 
+    _DEFAULT_TTL_OPERATION_DAYS = 7
+
     def __init__(self):
         self.log = self.add_service('data_svc', self)
         self.schema = dict(agents=[], planners=[], adversaries=[], abilities=[], sources=[], operations=[],
                            schedules=[], plugins=[], obfuscators=[], objectives=[], data_encoders=[])
         self.ram = copy.deepcopy(self.schema)
+        self._ttl_config = {'operations': self._DEFAULT_TTL_OPERATION_DAYS * 86400}
+        self._eviction_task = None  # Guard against spawning multiple eviction loops
 
     @staticmethod
     def _iter_data_files():
@@ -120,9 +124,49 @@ async def apply(self, collection):
         if collection not in self.ram:
             self.ram[collection] = []
 
+    @staticmethod
+    def _is_expired_operation(op, now, ttl):
+        """Return True when *op* is finished and started more than *ttl* seconds before *now*.
+
+        :param op: object exposing ``finish`` and ``start`` attributes
+        :param now: timezone-aware current time
+        :param ttl: maximum age in seconds
+        """
+        start = getattr(op, 'start', None)
+        if not (getattr(op, 'finish', None) and start):
+            return False  # Unfinished or never-started operations are never evicted
+        if start.tzinfo is None:
+            # Naive timestamps are assumed to be UTC so they can be compared with the aware `now`
+            start = start.replace(tzinfo=datetime.timezone.utc)
+        return (now - start).total_seconds() > ttl
+
+    async def _evict_expired_objects(self):
+        """Background task to evict expired objects based on TTL config."""
+        while True:
+            await asyncio.sleep(3600)  # Run every hour
+            try:
+                ttl = self._ttl_config.get('operations')
+                if ttl and 'operations' in self.ram:
+                    now = datetime.datetime.now(datetime.timezone.utc)
+                    before = len(self.ram['operations'])
+                    self.ram['operations'] = [
+                        op for op in self.ram['operations']
+                        if not self._is_expired_operation(op, now, ttl)
+                    ]
+                    evicted = before - len(self.ram['operations'])
+                    if evicted:
+                        self.log.info('TTL eviction: removed %d expired operations', evicted)
+            except Exception:
+                # Log full traceback to aid production debugging; swallowing
+                # exceptions is intentional here so the background loop survives.
+                self.log.exception('TTL eviction error')
+
     async def load_data(self, plugins=()):
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         loop.create_task(self._load(plugins))
+        # Only start one eviction loop even if load_data() is called multiple times.
+        if self._eviction_task is None or self._eviction_task.done():
+            self._eviction_task = loop.create_task(self._evict_expired_objects())
 
     async def reload_data(self, plugins=()):
         await self._load(plugins)
diff --git a/tests/security/test_ttl_eviction.py b/tests/security/test_ttl_eviction.py
new file mode 100644
index 000000000..fc015ee8b
--- /dev/null
+++ b/tests/security/test_ttl_eviction.py
@@ -0,0 +1,111 @@
+"""Tests for TTL-based eviction in DataService.
+
+Rather than duplicating the production filtering predicate, these tests
+drive the actual _evict_expired_objects() coroutine so that any future
+changes to the eviction logic are automatically exercised.
+"""
+import asyncio
+import datetime
+import unittest
+from unittest.mock import MagicMock, patch
+
+
+def _make_data_svc():
+    """Return a DataService instance with a minimal stub environment."""
+    from app.service.data_svc import DataService
+    svc = DataService.__new__(DataService)
+    svc.log = MagicMock()
+    svc._ttl_config = {'operations': 7 * 86400}  # 7 days
+    svc.ram = {'operations': []}
+    svc._eviction_task = None
+    return svc
+
+
+def _make_op(finish_marker, age_days):
+    """Build a mock operation object."""
+    now = datetime.datetime.now(datetime.timezone.utc)
+    op = MagicMock()
+    op.finish = finish_marker
+    op.start = now - datetime.timedelta(days=age_days)
+    return op
+
+
+class TestTTLEviction(unittest.TestCase):
+    def _run_one_eviction_cycle(self, svc):
+        """Run one pass of the eviction coroutine by patching asyncio.sleep."""
+        async def run():
+            # The loop sleeps before each pass: let the first sleep return so
+            # exactly one eviction pass runs, then cancel on the second sleep.
+            call_count = 0
+
+            async def fake_sleep(_):
+                nonlocal call_count
+                call_count += 1
+                if call_count > 1:
+                    raise asyncio.CancelledError
+
+            with patch('asyncio.sleep', side_effect=fake_sleep):
+                try:
+                    await svc._evict_expired_objects()
+                except asyncio.CancelledError:
+                    pass
+
+        asyncio.run(run())
+
+    def test_old_finished_operations_evicted(self):
+        """Finished operations older than TTL must be removed."""
+        svc = _make_data_svc()
+        old_op = _make_op('done', age_days=10)
+        new_op = _make_op('done', age_days=1)
+        running_op = _make_op(None, age_days=30)  # no finish marker – not expired
+        svc.ram['operations'] = [old_op, new_op, running_op]
+
+        self._run_one_eviction_cycle(svc)
+
+        self.assertNotIn(old_op, svc.ram['operations'])
+        self.assertIn(new_op, svc.ram['operations'])
+        self.assertIn(running_op, svc.ram['operations'])
+
+    def test_running_operations_not_evicted(self):
+        """Operations without a finish marker must never be evicted regardless of age."""
+        svc = _make_data_svc()
+        running_op = _make_op(None, age_days=100)
+        svc.ram['operations'] = [running_op]
+
+        self._run_one_eviction_cycle(svc)
+
+        self.assertIn(running_op, svc.ram['operations'])
+
+    def test_no_eviction_when_ttl_not_set(self):
+        """When no TTL is configured for operations, no eviction should occur."""
+        svc = _make_data_svc()
+        svc._ttl_config = {}
+        old_op = _make_op('done', age_days=365)
+        svc.ram['operations'] = [old_op]
+
+        self._run_one_eviction_cycle(svc)
+
+        self.assertIn(old_op, svc.ram['operations'])
+
+    def test_eviction_exception_is_logged_not_propagated(self):
+        """An exception during eviction must be logged without crashing the loop."""
+        svc = _make_data_svc()
+        # Make .ram raise on access to trigger error path.
+        svc.ram = MagicMock()
+        svc.ram.__contains__ = MagicMock(side_effect=RuntimeError('boom'))
+
+        self._run_one_eviction_cycle(svc)
+
+        svc.log.exception.assert_called_once()
+
+    def test_naive_start_timestamp_treated_as_utc(self):
+        """A naive (tz-less) start timestamp must still be evicted once past the TTL."""
+        svc = _make_data_svc()
+        old_op = _make_op('done', age_days=10)
+        old_op.start = old_op.start.replace(tzinfo=None)
+        svc.ram['operations'] = [old_op]
+
+        self._run_one_eviction_cycle(svc)
+
+        self.assertNotIn(old_op, svc.ram['operations'])
+        svc.log.exception.assert_not_called()