Add TTL/LRU/cap-managed cache behavior for multi CSV profiling #53
@@ -4,7 +4,9 @@
import hashlib
import json
import math
import os
import random
import threading
from concurrent.futures import ThreadPoolExecutor
from collections import Counter, defaultdict
from datetime import datetime

@@ -17,6 +19,8 @@
CACHE_DIR = Path('.bitnet_cache')
UNIQUE_BITMAP_SIZE = 65536
TOP_VALUE_TRACK_CAP = 5000
CACHE_ENTRY_TTL_SECONDS = 60 * 60 * 24
CACHE_MAX_TOTAL_BYTES = 256 * 1024 * 1024


def _quantile(sorted_values: list[float], q: float) -> float:

@@ -310,21 +314,148 @@ def _cache_key(path: Path, group_column: str | None, target_column: str | None)
    return hashlib.sha256(raw.encode('utf-8')).hexdigest()


def _cache_path(path: Path, group_column: str | None, target_column: str | None) -> Path:
    return CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"


def _cache_index_path() -> Path:
    return CACHE_DIR / 'multi_csv_cache_index.json'


def _load_cache_index() -> dict[str, Any]:
    index_path = _cache_index_path()
    if not index_path.exists():
        return {'entries': {}}
    try:
        with index_path.open('r', encoding='utf-8') as f:
            loaded = json.load(f)
        entries = loaded.get('entries')
        if isinstance(entries, dict):
            return {'entries': entries}
    except Exception:
        pass
    return {'entries': {}}


def _save_cache_index(index: dict[str, Any]) -> None:
    index_path = _cache_index_path()
    tmp_path = index_path.with_name(f"{index_path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    tmp_path.write_text(json.dumps(index, ensure_ascii=False), encoding='utf-8')
    tmp_path.replace(index_path)


def _record_cache_access(cache_path: Path, size_bytes: int, kind: str = 'profile') -> None:
    now = datetime.utcnow().timestamp()
    index = _load_cache_index()
    entries = index.setdefault('entries', {})
    entries[str(cache_path)] = {
        'last_access': now,
        'size_bytes': int(size_bytes),
        'kind': kind,
    }
    _save_cache_index(index)


def _remove_cache_entry(cache_path: Path, index: dict[str, Any] | None = None) -> None:
    should_save = index is None
    if cache_path.exists():
        cache_path.unlink(missing_ok=True)
    if index is None:
        index = _load_cache_index()
    entries = index.setdefault('entries', {})
    entries.pop(str(cache_path), None)
    if should_save:
        _save_cache_index(index)


def _cleanup_rebuildable_intermediates(index: dict[str, Any]) -> None:
    candidates = [
        *CACHE_DIR.glob('*.tmp'),
        *CACHE_DIR.glob('*.partial'),
        *CACHE_DIR.glob('*.bak'),
    ]
    for p in candidates:
        _remove_cache_entry(p, index=index)


def _enforce_cache_limits() -> None:
    index = _load_cache_index()
    entries = index.setdefault('entries', {})
    _cleanup_rebuildable_intermediates(index)

    now = datetime.utcnow().timestamp()
    total_size = 0
    stale_paths: list[Path] = []
    existing_entries: list[tuple[Path, dict[str, Any]]] = []

    for raw_path, meta in list(entries.items()):
        p = Path(raw_path)
        if not p.exists():
            entries.pop(raw_path, None)
            continue
        last_access = float(meta.get('last_access', 0.0))
        if CACHE_ENTRY_TTL_SECONDS > 0 and now - last_access > CACHE_ENTRY_TTL_SECONDS:
            stale_paths.append(p)
            continue
        size_bytes = p.stat().st_size
        meta['size_bytes'] = size_bytes
        total_size += size_bytes
        existing_entries.append((p, meta))

    for p in stale_paths:
        _remove_cache_entry(p, index=index)

    if CACHE_MAX_TOTAL_BYTES > 0 and total_size > CACHE_MAX_TOTAL_BYTES:
        # LRU eviction: remove the least recently used entries first
        existing_entries.sort(key=lambda item: float(item[1].get('last_access', 0.0)))
        for p, meta in existing_entries:
            if total_size <= CACHE_MAX_TOTAL_BYTES:
                break
            total_size -= int(meta.get('size_bytes', p.stat().st_size if p.exists() else 0))
            _remove_cache_entry(p, index=index)

    _save_cache_index(index)

def _write_json_maybe_stream(path: Path, data: dict[str, Any]) -> None:
    encoder = json.JSONEncoder(ensure_ascii=False, separators=(',', ':'))
    tmp_path = path.with_suffix('.tmp')
    with tmp_path.open('w', encoding='utf-8') as f:
        for piece in encoder.iterencode(data):
            f.write(piece)
    tmp_path.replace(path)

def _load_cached_profile(path: Path, group_column: str | None, target_column: str | None) -> dict[str, Any] | None:
    CACHE_DIR.mkdir(exist_ok=True)
-   cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
+   cp = _cache_path(path, group_column, target_column)
    if not cp.exists():
        return None
    try:
-       return json.loads(cp.read_text(encoding='utf-8'))
+       stat = cp.stat()
+       if CACHE_ENTRY_TTL_SECONDS > 0:
+           age_seconds = datetime.utcnow().timestamp() - stat.st_mtime
+           if age_seconds > CACHE_ENTRY_TTL_SECONDS:
+               _remove_cache_entry(cp)
+               return None
+       with cp.open('r', encoding='utf-8') as f:
+           loaded = json.load(f)
+       _record_cache_access(cp, stat.st_size)
+       return loaded
    except Exception:
        return None

def _save_cached_profile(path: Path, group_column: str | None, target_column: str | None, data: dict[str, Any]) -> None:
    CACHE_DIR.mkdir(exist_ok=True)
-   cp = CACHE_DIR / f"{_cache_key(path, group_column, target_column)}.json"
-   cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
+   cp = _cache_path(path, group_column, target_column)
+   # Default to streaming writes to avoid memory spikes when serializing large cache payloads
+   if os.getenv('BITNET_CACHE_STREAM_WRITE', '1') != '0':
+       _write_json_maybe_stream(cp, data)
+   else:
+       cp.write_text(json.dumps(data, ensure_ascii=False), encoding='utf-8')
+   _record_cache_access(cp, cp.stat().st_size)
+   _enforce_cache_limits()


def _generate_insights(files: list[dict[str, Any]], schema_drift: dict[str, Any]) -> list[str]:
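For orientation, the cache helpers added above would typically be driven as in the sketch below. This is illustrative only and not part of the diff: the module name `profiler`, the CSV path, and the placeholder profile dict are assumptions.

```python
# Illustrative usage only; the module name `profiler` and the file path are assumptions.
from pathlib import Path

import profiler  # hypothetical module containing the cache helpers shown in this PR

csv_path = Path('data/sales_2024.csv')  # placeholder input file

# Returns None when no cache entry exists or the entry is older than CACHE_ENTRY_TTL_SECONDS.
cached = profiler._load_cached_profile(csv_path, group_column='region', target_column='sales')

if cached is None:
    result = {'rows': 1000, 'columns': ['region', 'sales']}  # stand-in for a real profile
    # Saving records the entry in the cache index, then runs TTL/LRU/size-cap enforcement.
    profiler._save_cached_profile(csv_path, 'region', 'sales', result)
```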
Review comment:

`_record_cache_access` does a read-modify-write of the whole index without synchronization, so concurrent workers can each load an older snapshot and then overwrite each other's updates in `_save_cache_index`. This drops valid cache entries from the index, which breaks LRU/TTL accounting and allows on-disk cache files to evade capacity eviction.
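One way the race described above could be closed (not something this PR does) is to make the index read-modify-write a critical section guarded by a cross-process lock, re-reading the index inside the lock. The sketch below assumes the PR's `_load_cache_index` and `_save_cache_index` are importable unchanged; the `_index_lock` helper, its lock-file name, the `profiler` module path, and the timeout values are hypothetical.

```python
# Sketch only: a lock-file based critical section around cache-index updates.
# _index_lock, its lock-file name, the timeouts, and the `profiler` module are assumptions.
import os
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path

from profiler import _load_cache_index, _save_cache_index  # hypothetical import path

CACHE_DIR = Path('.bitnet_cache')


@contextmanager
def _index_lock(timeout: float = 5.0, poll: float = 0.05):
    """Cross-process mutex via atomic lock-file creation (O_CREAT | O_EXCL)."""
    CACHE_DIR.mkdir(exist_ok=True)
    lock_path = CACHE_DIR / 'multi_csv_cache_index.lock'
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the lock file already exists,
            # so only one process at a time enters the critical section.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f'could not acquire {lock_path}')
            time.sleep(poll)
    try:
        yield
    finally:
        os.close(fd)
        lock_path.unlink(missing_ok=True)


def _record_cache_access_locked(cache_path: Path, size_bytes: int, kind: str = 'profile') -> None:
    # Re-load the index inside the critical section so a concurrent writer's
    # update cannot be overwritten by a stale snapshot.
    with _index_lock():
        index = _load_cache_index()
        entries = index.setdefault('entries', {})
        entries[str(cache_path)] = {
            'last_access': datetime.utcnow().timestamp(),
            'size_bytes': int(size_bytes),
            'kind': kind,
        }
        _save_cache_index(index)
```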