Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion app/service/data_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@

class DataService(DataServiceInterface, BaseService):

_DEFAULT_TTL_OPERATION_DAYS = 7

def __init__(self):
self.log = self.add_service('data_svc', self)
self.schema = dict(agents=[], planners=[], adversaries=[], abilities=[], sources=[], operations=[],
schedules=[], plugins=[], obfuscators=[], objectives=[], data_encoders=[])
self.ram = copy.deepcopy(self.schema)
self._ttl_config = {'operations': self._DEFAULT_TTL_OPERATION_DAYS * 86400}
self._eviction_task = None # Guard against spawning multiple eviction loops

@staticmethod
def _iter_data_files():
Expand Down Expand Up @@ -120,9 +124,35 @@ async def apply(self, collection):
if collection not in self.ram:
self.ram[collection] = []

async def _evict_expired_objects(self):
"""Background task to evict expired objects based on TTL config."""
while True:
await asyncio.sleep(3600) # Run every hour
try:
ttl = self._ttl_config.get('operations')
if ttl and 'operations' in self.ram:
now = datetime.datetime.utcnow()
before = len(self.ram['operations'])
self.ram['operations'] = [
op for op in self.ram['operations']
if not (getattr(op, 'finish', None) and
hasattr(op, 'start') and op.start and
(now - op.start).total_seconds() > ttl)
]
evicted = before - len(self.ram['operations'])
if evicted:
self.log.info('TTL eviction: removed %d expired operations', evicted)
except Exception:
# Log full traceback to aid production debugging; swallowing
# exceptions is intentional here so the background loop survives.
self.log.exception('TTL eviction error')

async def load_data(self, plugins=()):
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
loop.create_task(self._load(plugins))
# Only start one eviction loop even if load_data() is called multiple times.
if self._eviction_task is None or self._eviction_task.done():
self._eviction_task = loop.create_task(self._evict_expired_objects())

async def reload_data(self, plugins=()):
await self._load(plugins)
Expand Down
98 changes: 98 additions & 0 deletions tests/security/test_ttl_eviction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Tests for TTL-based eviction in DataService.

Rather than duplicating the production filtering predicate, these tests
drive the actual _evict_expired_objects() coroutine so that any future
changes to the eviction logic are automatically exercised.
"""
import asyncio
import datetime
import unittest
from unittest.mock import MagicMock, patch


def _make_data_svc():
"""Return a DataService instance with a minimal stub environment."""
from app.service.data_svc import DataService
svc = DataService.__new__(DataService)
svc.log = MagicMock()
svc._ttl_config = {'operations': 7 * 86400} # 7 days
svc.ram = {'operations': []}
svc._eviction_task = None
return svc


def _make_op(finish_marker, age_days):
"""Build a mock operation object."""
now = datetime.datetime.utcnow()
op = MagicMock()
op.finish = finish_marker
op.start = now - datetime.timedelta(days=age_days)
return op


class TestTTLEviction(unittest.TestCase):
def _run_one_eviction_cycle(self, svc):
"""Run one pass of the eviction coroutine by patching asyncio.sleep."""
async def run():
# Patch sleep so the infinite loop runs only once then raises to exit.
call_count = 0

async def fake_sleep(_):
nonlocal call_count
call_count += 1
if call_count >= 1:
raise asyncio.CancelledError

with patch('asyncio.sleep', side_effect=fake_sleep):
try:
await svc._evict_expired_objects()
except asyncio.CancelledError:
pass

asyncio.get_event_loop().run_until_complete(run())

def test_old_finished_operations_evicted(self):
"""Finished operations older than TTL must be removed."""
svc = _make_data_svc()
old_op = _make_op('done', age_days=10)
new_op = _make_op('done', age_days=1)
running_op = _make_op(None, age_days=30) # no finish marker – not expired
svc.ram['operations'] = [old_op, new_op, running_op]

self._run_one_eviction_cycle(svc)

self.assertNotIn(old_op, svc.ram['operations'])
self.assertIn(new_op, svc.ram['operations'])
self.assertIn(running_op, svc.ram['operations'])

def test_running_operations_not_evicted(self):
"""Operations without a finish marker must never be evicted regardless of age."""
svc = _make_data_svc()
running_op = _make_op(None, age_days=100)
svc.ram['operations'] = [running_op]

self._run_one_eviction_cycle(svc)

self.assertIn(running_op, svc.ram['operations'])

def test_no_eviction_when_ttl_not_set(self):
"""When no TTL is configured for operations, no eviction should occur."""
svc = _make_data_svc()
svc._ttl_config = {}
old_op = _make_op('done', age_days=365)
svc.ram['operations'] = [old_op]

self._run_one_eviction_cycle(svc)

self.assertIn(old_op, svc.ram['operations'])

def test_eviction_exception_is_logged_not_propagated(self):
"""An exception during eviction must be logged without crashing the loop."""
svc = _make_data_svc()
# Make .ram raise on access to trigger error path.
svc.ram = MagicMock()
svc.ram.__contains__ = MagicMock(side_effect=RuntimeError('boom'))

self._run_one_eviction_cycle(svc)

svc.log.exception.assert_called_once()
Loading