139 changes: 135 additions & 4 deletions bitnet_tools/multi_csv.py
@@ -4,7 +4,9 @@
import hashlib
import json
import math
import os
import random
import threading
from concurrent.futures import ThreadPoolExecutor
from collections import Counter, defaultdict
from datetime import datetime
@@ -17,6 +19,8 @@
CACHE_DIR = Path('.bitnet_cache')
UNIQUE_BITMAP_SIZE = 65536
TOP_VALUE_TRACK_CAP = 5000
CACHE_ENTRY_TTL_SECONDS = 60 * 60 * 24
CACHE_MAX_TOTAL_BYTES = 256 * 1024 * 1024


def _quantile(sorted_values: list[float], q: float) -> float:
@@ -310,21 +314,148 @@ def _cache_key(path: Path, group_column: str | None, target_column: str | None)
return hashlib.sha256(raw.encode('utf-8')).hexdigest()


def _cache_path(path: Path, group_column: str | None, target_column: str | None) -> Path:
return CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"


def _cache_index_path() -> Path:
return CACHE_DIR / 'multi_csv_cache_index.json'


def _load_cache_index() -> dict[str, Any]:
index_path = _cache_index_path()
if not index_path.exists():
return {'entries': {}}
try:
with index_path.open('r', encoding='utf-8') as f:
loaded = json.load(f)
entries = loaded.get('entries')
if isinstance(entries, dict):
return {'entries': entries}
except Exception:
pass
return {'entries': {}}


def _save_cache_index(index: dict[str, Any]) -> None:
index_path = _cache_index_path()
tmp_path = index_path.with_name(f"{index_path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
tmp_path.write_text(json.dumps(index, ensure_ascii=False), encoding='utf-8')
tmp_path.replace(index_path)


def _record_cache_access(cache_path: Path, size_bytes: int, kind: str = 'profile') -> None:
now = datetime.utcnow().timestamp()
index = _load_cache_index()
entries = index.setdefault('entries', {})
Comment on lines +349 to +350
P2: Serialize cache-index updates to prevent lost entries

_record_cache_access does a read-modify-write of the whole index without synchronization, so concurrent workers can each load an older snapshot and then overwrite each other’s updates in _save_cache_index. This drops valid cache entries from the index, which breaks LRU/TTL accounting and allows on-disk cache files to evade capacity eviction.

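One possible fix is to take a lock around each load-modify-save of the index. A minimal sketch, assuming a simple lock file in CACHE_DIR is acceptable; the `_index_lock` helper and the `multi_csv_cache_index.lock` name are illustrative, not part of this PR:

```python
import os
import threading
import time
from contextlib import contextmanager

_INDEX_THREAD_LOCK = threading.Lock()  # serializes threads within one process


@contextmanager
def _index_lock(timeout: float = 5.0):
    """Best-effort cross-process lock around index read-modify-write."""
    lock_path = CACHE_DIR / 'multi_csv_cache_index.lock'
    deadline = time.monotonic() + timeout
    with _INDEX_THREAD_LOCK:
        fd = None
        while fd is None and time.monotonic() < deadline:
            try:
                # O_CREAT | O_EXCL is atomic: whoever creates the file holds the lock.
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except FileExistsError:
                time.sleep(0.01)
        try:
            yield
        finally:
            if fd is not None:
                os.close(fd)
                lock_path.unlink(missing_ok=True)


def _record_cache_access(cache_path: Path, size_bytes: int, kind: str = 'profile') -> None:
    # Hold the lock across load, mutate, and save so concurrent workers
    # cannot overwrite each other's entries.
    with _index_lock():
        index = _load_cache_index()
        entries = index.setdefault('entries', {})
        entries[str(cache_path)] = {
            'last_access': datetime.utcnow().timestamp(),
            'size_bytes': int(size_bytes),
            'kind': kind,
        }
        _save_cache_index(index)
```

`_enforce_cache_limits` could take the same lock around its own load/mutate/save cycle so eviction and access recording never interleave.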

entries[str(cache_path)] = {
'last_access': now,
'size_bytes': int(size_bytes),
'kind': kind,
}
_save_cache_index(index)


def _remove_cache_entry(cache_path: Path, index: dict[str, Any] | None = None) -> None:
should_save = index is None
if cache_path.exists():
cache_path.unlink(missing_ok=True)
if index is None:
index = _load_cache_index()
entries = index.setdefault('entries', {})
entries.pop(str(cache_path), None)
if should_save:
_save_cache_index(index)


def _cleanup_rebuildable_intermediates(index: dict[str, Any]) -> None:
candidates = [
*CACHE_DIR.glob('*.tmp'),
P1: Avoid deleting active cache temp files during cleanup

_cleanup_rebuildable_intermediates removes every *.tmp file in CACHE_DIR, but _write_json_maybe_stream uses *.tmp as the live write target before replace(). When analyze_multiple_csv(..., max_workers>1) runs, one worker can enter _enforce_cache_limits() and delete another worker’s in-flight temp file, causing tmp_path.replace(path) to raise FileNotFoundError and fail the analysis run.

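A sketch of one way to keep cleanup from racing in-flight writes: skip `*.tmp` files that were modified recently. `TMP_GRACE_SECONDS` is an illustrative name and value, not part of this PR:

```python
import time

TMP_GRACE_SECONDS = 300  # illustrative grace period for in-flight temp files


def _cleanup_rebuildable_intermediates(index: dict[str, Any]) -> None:
    now = time.time()
    candidates = [
        *CACHE_DIR.glob('*.tmp'),
        *CACHE_DIR.glob('*.partial'),
        *CACHE_DIR.glob('*.bak'),
    ]
    for p in candidates:
        try:
            age_seconds = now - p.stat().st_mtime
        except FileNotFoundError:
            continue  # another worker already replaced or removed the file
        if p.suffix == '.tmp' and age_seconds < TMP_GRACE_SECONDS:
            continue  # likely a live write target for tmp_path.replace(); leave it alone
        _remove_cache_entry(p, index=index)
```

Pairing this with per-process temp names in `_write_json_maybe_stream` (as `_save_cache_index` already does) would shrink the collision window further.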

*CACHE_DIR.glob('*.partial'),
*CACHE_DIR.glob('*.bak'),
]
for p in candidates:
_remove_cache_entry(p, index=index)


def _enforce_cache_limits() -> None:
index = _load_cache_index()
entries = index.setdefault('entries', {})
_cleanup_rebuildable_intermediates(index)

now = datetime.utcnow().timestamp()
total_size = 0
stale_paths: list[Path] = []
existing_entries: list[tuple[Path, dict[str, Any]]] = []

for raw_path, meta in list(entries.items()):
p = Path(raw_path)
if not p.exists():
entries.pop(raw_path, None)
continue
last_access = float(meta.get('last_access', 0.0))
if CACHE_ENTRY_TTL_SECONDS > 0 and now - last_access > CACHE_ENTRY_TTL_SECONDS:
stale_paths.append(p)
continue
size_bytes = p.stat().st_size
meta['size_bytes'] = size_bytes
total_size += size_bytes
existing_entries.append((p, meta))

for p in stale_paths:
_remove_cache_entry(p, index=index)

if CACHE_MAX_TOTAL_BYTES > 0 and total_size > CACHE_MAX_TOTAL_BYTES:
# LRU eviction: evict the least recently used entries first
existing_entries.sort(key=lambda item: float(item[1].get('last_access', 0.0)))
for p, meta in existing_entries:
if total_size <= CACHE_MAX_TOTAL_BYTES:
break
total_size -= int(meta.get('size_bytes', p.stat().st_size if p.exists() else 0))
_remove_cache_entry(p, index=index)

_save_cache_index(index)


def _write_json_maybe_stream(path: Path, data: dict[str, Any]) -> None:
encoder = json.JSONEncoder(ensure_ascii=False, separators=(',', ':'))
tmp_path = path.with_suffix('.tmp')
with tmp_path.open('w', encoding='utf-8') as f:
for piece in encoder.iterencode(data):
f.write(piece)
tmp_path.replace(path)


def _load_cached_profile(path: Path, group_column: str | None, target_column: str | None) -> dict[str, Any] | None:
CACHE_DIR.mkdir(exist_ok=True)
cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
cp = _cache_path(path, group_column, target_column)
if not cp.exists():
return None
try:
return json.loads(cp.read_text(encoding='utf-8'))
stat = cp.stat()
if CACHE_ENTRY_TTL_SECONDS > 0:
age_seconds = datetime.utcnow().timestamp() - stat.st_mtime
if age_seconds > CACHE_ENTRY_TTL_SECONDS:
_remove_cache_entry(cp)
return None
with cp.open('r', encoding='utf-8') as f:
loaded = json.load(f)
_record_cache_access(cp, stat.st_size)
return loaded
except Exception:
return None


def _save_cached_profile(path: Path, group_column: str | None, target_column: str | None, data: dict[str, Any]) -> None:
CACHE_DIR.mkdir(exist_ok=True)
cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
cp = _cache_path(path, group_column, target_column)
# Default to streaming writes to avoid memory spikes when serializing large caches
if os.getenv('BITNET_CACHE_STREAM_WRITE', '1') != '0':
_write_json_maybe_stream(cp, data)
else:
cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
_record_cache_access(cp, cp.stat().st_size)
_enforce_cache_limits()


def _generate_insights(files: list[dict[str, Any]], schema_drift: dict[str, Any]) -> list[str]:
52 changes: 52 additions & 0 deletions tests/test_analysis.py
@@ -149,6 +149,58 @@ def test_multi_csv_top_values_capped_marker(monkeypatch, tmp_path):
assert any(x["value"] == "__OTHER__" for x in prof["top_values"])


def test_multi_csv_cache_ttl_expires_entry(tmp_path, monkeypatch):
import bitnet_tools.multi_csv as multi

monkeypatch.setattr(multi, "CACHE_DIR", tmp_path / ".cache")
monkeypatch.setattr(multi, "CACHE_ENTRY_TTL_SECONDS", 1)

p = tmp_path / "ttl.csv"
p.write_text("a,b\n1,2\n", encoding="utf-8")

first = multi.analyze_multiple_csv([p], "ttl")
assert first["file_count"] == 1
cache_files = list((tmp_path / ".cache").glob("*.json"))
assert cache_files

cache_file = next(x for x in cache_files if x.name != "multi_csv_cache_index.json")
stale = cache_file.stat().st_mtime - 3600
import os

os.utime(cache_file, (stale, stale))
_ = multi._load_cached_profile(p, None, None)

assert not cache_file.exists()


def test_multi_csv_cache_capacity_uses_lru_eviction(tmp_path, monkeypatch):
import bitnet_tools.multi_csv as multi

monkeypatch.setattr(multi, "CACHE_DIR", tmp_path / ".cache")
monkeypatch.setattr(multi, "CACHE_MAX_TOTAL_BYTES", 220)
monkeypatch.setattr(multi, "CACHE_ENTRY_TTL_SECONDS", 60 * 60)

p1 = tmp_path / "a.csv"
p2 = tmp_path / "b.csv"
p3 = tmp_path / "c.csv"
p1.write_text("a,b\n1,2\n", encoding="utf-8")
p2.write_text("a,b\n3,4\n", encoding="utf-8")
p3.write_text("a,b\n5,6\n", encoding="utf-8")

multi._save_cached_profile(p1, None, None, {"v": "x" * 120})
multi._save_cached_profile(p2, None, None, {"v": "y" * 120})
multi._save_cached_profile(p3, None, None, {"v": "z" * 120})

cp1 = multi._cache_path(p1, None, None)
cp2 = multi._cache_path(p2, None, None)
cp3 = multi._cache_path(p3, None, None)

# When over capacity, the oldest entries are evicted first so only the newest remains
assert not cp1.exists()
assert not cp2.exists()
assert cp3.exists()


def test_multi_csv_with_parallel_workers(tmp_path):
p1 = tmp_path / "a.csv"
p2 = tmp_path / "b.csv"