diff --git a/bitnet_tools/multi_csv.py b/bitnet_tools/multi_csv.py
index b42887b..87e5381 100644
--- a/bitnet_tools/multi_csv.py
+++ b/bitnet_tools/multi_csv.py
@@ -4,7 +4,9 @@
 import hashlib
 import json
 import math
+import os
 import random
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from collections import Counter, defaultdict
 from datetime import datetime
@@ -17,6 +19,8 @@
 CACHE_DIR = Path('.bitnet_cache')
 UNIQUE_BITMAP_SIZE = 65536
 TOP_VALUE_TRACK_CAP = 5000
+CACHE_ENTRY_TTL_SECONDS = 60 * 60 * 24
+CACHE_MAX_TOTAL_BYTES = 256 * 1024 * 1024
 
 
 def _quantile(sorted_values: list[float], q: float) -> float:
@@ -310,21 +314,148 @@ def _cache_key(path: Path, group_column: str | None, target_column: str | None)
     return hashlib.sha256(raw.encode('utf-8')).hexdigest()
 
 
+def _cache_path(path: Path, group_column: str | None, target_column: str | None) -> Path:
+    return CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
+
+
+def _cache_index_path() -> Path:
+    return CACHE_DIR / 'multi_csv_cache_index.json'
+
+
+def _load_cache_index() -> dict[str, Any]:
+    index_path = _cache_index_path()
+    if not index_path.exists():
+        return {'entries': {}}
+    try:
+        with index_path.open('r', encoding='utf-8') as f:
+            loaded = json.load(f)
+        entries = loaded.get('entries')
+        if isinstance(entries, dict):
+            return {'entries': entries}
+    except Exception:
+        pass
+    return {'entries': {}}
+
+
+def _save_cache_index(index: dict[str, Any]) -> None:
+    index_path = _cache_index_path()
+    tmp_path = index_path.with_name(f"{index_path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
+    tmp_path.write_text(json.dumps(index, ensure_ascii=False), encoding='utf-8')
+    tmp_path.replace(index_path)
+
+
+def _record_cache_access(cache_path: Path, size_bytes: int, kind: str = 'profile') -> None:
+    now = datetime.now().timestamp()
+    index = _load_cache_index()
+    entries = index.setdefault('entries', {})
+    entries[str(cache_path)] = {
+        'last_access': now,
+        'size_bytes': int(size_bytes),
+        'kind': kind,
+    }
+    _save_cache_index(index)
+
+
+def _remove_cache_entry(cache_path: Path, index: dict[str, Any] | None = None) -> None:
+    should_save = index is None
+    if cache_path.exists():
+        cache_path.unlink(missing_ok=True)
+    if index is None:
+        index = _load_cache_index()
+    entries = index.setdefault('entries', {})
+    entries.pop(str(cache_path), None)
+    if should_save:
+        _save_cache_index(index)
+
+
+def _cleanup_rebuildable_intermediates(index: dict[str, Any]) -> None:
+    candidates = [
+        *CACHE_DIR.glob('*.tmp'),
+        *CACHE_DIR.glob('*.partial'),
+        *CACHE_DIR.glob('*.bak'),
+    ]
+    for p in candidates:
+        _remove_cache_entry(p, index=index)
+
+
+def _enforce_cache_limits() -> None:
+    index = _load_cache_index()
+    entries = index.setdefault('entries', {})
+    _cleanup_rebuildable_intermediates(index)
+
+    now = datetime.now().timestamp()
+    total_size = 0
+    stale_paths: list[Path] = []
+    existing_entries: list[tuple[Path, dict[str, Any]]] = []
+
+    for raw_path, meta in list(entries.items()):
+        p = Path(raw_path)
+        if not p.exists():
+            entries.pop(raw_path, None)
+            continue
+        last_access = float(meta.get('last_access', 0.0))
+        if CACHE_ENTRY_TTL_SECONDS > 0 and now - last_access > CACHE_ENTRY_TTL_SECONDS:
+            stale_paths.append(p)
+            continue
+        size_bytes = p.stat().st_size
+        meta['size_bytes'] = size_bytes
+        total_size += size_bytes
+        existing_entries.append((p, meta))
+
+    for p in stale_paths:
+        _remove_cache_entry(p, index=index)
+
+    if CACHE_MAX_TOTAL_BYTES > 0 and total_size > CACHE_MAX_TOTAL_BYTES:
+        # LRU eviction: drop the least recently used entries first
+        existing_entries.sort(key=lambda item: float(item[1].get('last_access', 0.0)))
+        for p, meta in existing_entries:
+            if total_size <= CACHE_MAX_TOTAL_BYTES:
+                break
+            total_size -= int(meta.get('size_bytes', p.stat().st_size if p.exists() else 0))
+            _remove_cache_entry(p, index=index)
+
+    _save_cache_index(index)
+
+
+def _write_json_maybe_stream(path: Path, data: dict[str, Any]) -> None:
+    encoder = json.JSONEncoder(ensure_ascii=False, separators=(',', ':'))
+    tmp_path = path.with_suffix('.tmp')
+    with tmp_path.open('w', encoding='utf-8') as f:
+        for piece in encoder.iterencode(data):
+            f.write(piece)
+    tmp_path.replace(path)
+
+
 def _load_cached_profile(path: Path, group_column: str | None, target_column: str | None) -> dict[str, Any] | None:
     CACHE_DIR.mkdir(exist_ok=True)
-    cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
+    cp = _cache_path(path, group_column, target_column)
     if not cp.exists():
         return None
     try:
-        return json.loads(cp.read_text(encoding='utf-8'))
+        stat = cp.stat()
+        if CACHE_ENTRY_TTL_SECONDS > 0:
+            age_seconds = datetime.now().timestamp() - stat.st_mtime
+            if age_seconds > CACHE_ENTRY_TTL_SECONDS:
+                _remove_cache_entry(cp)
+                return None
+        with cp.open('r', encoding='utf-8') as f:
+            loaded = json.load(f)
+        _record_cache_access(cp, stat.st_size)
+        return loaded
     except Exception:
         return None
 
 
 def _save_cached_profile(path: Path, group_column: str | None, target_column: str | None, data: dict[str, Any]) -> None:
     CACHE_DIR.mkdir(exist_ok=True)
-    cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
-    cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
+    cp = _cache_path(path, group_column, target_column)
+    # Stream the write by default to avoid memory spikes when serializing large caches
+    if os.getenv('BITNET_CACHE_STREAM_WRITE', '1') != '0':
+        _write_json_maybe_stream(cp, data)
+    else:
+        cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
+    _record_cache_access(cp, cp.stat().st_size)
+    _enforce_cache_limits()
 
 
 def _generate_insights(files: list[dict[str, Any]], schema_drift: dict[str, Any]) -> list[str]:
diff --git a/tests/test_analysis.py b/tests/test_analysis.py
index d80072e..1eb07ec 100644
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -149,6 +149,58 @@ def test_multi_csv_top_values_capped_marker(monkeypatch, tmp_path):
     assert any(x["value"] == "__OTHER__" for x in prof["top_values"])
 
 
+def test_multi_csv_cache_ttl_expires_entry(tmp_path, monkeypatch):
+    import bitnet_tools.multi_csv as multi
+
+    monkeypatch.setattr(multi, "CACHE_DIR", tmp_path / ".cache")
+    monkeypatch.setattr(multi, "CACHE_ENTRY_TTL_SECONDS", 1)
+
+    p = tmp_path / "ttl.csv"
+    p.write_text("a,b\n1,2\n", encoding="utf-8")
+
+    first = multi.analyze_multiple_csv([p], "ttl")
+    assert first["file_count"] == 1
+    cache_files = list((tmp_path / ".cache").glob("*.json"))
+    assert cache_files
+
+    cache_file = next(x for x in cache_files if x.name != "multi_csv_cache_index.json")
+    stale = cache_file.stat().st_mtime - 3600
+    import os
+
+    os.utime(cache_file, (stale, stale))
+    _ = multi._load_cached_profile(p, None, None)
+
+    assert not cache_file.exists()
+
+
+def test_multi_csv_cache_capacity_uses_lru_eviction(tmp_path, monkeypatch):
+    import bitnet_tools.multi_csv as multi
+
+    monkeypatch.setattr(multi, "CACHE_DIR", tmp_path / ".cache")
+    monkeypatch.setattr(multi, "CACHE_MAX_TOTAL_BYTES", 220)
+    monkeypatch.setattr(multi, "CACHE_ENTRY_TTL_SECONDS", 60 * 60)
+
+    p1 = tmp_path / "a.csv"
+    p2 = tmp_path / "b.csv"
+    p3 = tmp_path / "c.csv"
+    p1.write_text("a,b\n1,2\n", encoding="utf-8")
+    p2.write_text("a,b\n3,4\n", encoding="utf-8")
+    p3.write_text("a,b\n5,6\n", encoding="utf-8")
+
+    multi._save_cached_profile(p1, None, None, {"v": "x" * 120})
+    multi._save_cached_profile(p2, None, None, {"v": "y" * 120})
+    multi._save_cached_profile(p3, None, None, {"v": "z" * 120})
+
+    cp1 = multi._cache_path(p1, None, None)
+    cp2 = multi._cache_path(p2, None, None)
+    cp3 = multi._cache_path(p3, None, None)
+
+    # When over capacity, the oldest entries are evicted first, so only the newest one remains
+    assert not cp1.exists()
+    assert not cp2.exists()
+    assert cp3.exists()
+
+
 def test_multi_csv_with_parallel_workers(tmp_path):
     p1 = tmp_path / "a.csv"
     p2 = tmp_path / "b.csv"