Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions tfbpapi/virtual_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from __future__ import annotations

import logging
import re
from functools import lru_cache
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -580,7 +581,6 @@ def _register_all_views(self) -> None:
if not self._is_comparative(repo_id, config_name):
self._enrich_raw_view(db_name)

# 5. Comparative expanded views (pre-parsed composite IDs)
# 5. Comparative expanded views (pre-parsed composite IDs)
for db_name, (repo_id, config_name) in self._db_name_map.items():
repo_cfg = self.config.repositories.get(repo_id)
Expand Down Expand Up @@ -833,18 +833,26 @@ def qualify(col: str) -> str:
if prop_result is not None:
_derived_exprs, _prop_raw_cols = prop_result
for expr in _derived_exprs:
# Detect CAST(<field> AS _enum_<key>) AS <key> patterns
# where <field> == <key> (in-place factor override)
if not expr.startswith("CAST("):
continue
# Detect factor CAST expressions of the form:
# CAST(CAST(<field> AS VARCHAR) AS _enum_<key>) AS <key>
# where <field> == <key> (in-place factor override).
# The output column name is the last " AS <name>" token.
parts = expr.rsplit(" AS ", 1)
if len(parts) != 2:
continue
out_col = parts[1].strip()
# Check whether the source field has the same name as
# the output column (in-place override case)
cast_inner = parts[0][len("CAST(") :]
src_field = cast_inner.split(" AS ")[0].strip()
# Extract the innermost source field from the CAST chain.
# Handles both:
# CAST(CAST(<field> AS VARCHAR) AS _enum_<key>)
# CAST(CAST(CAST(<field> AS BIGINT) AS VARCHAR) AS _enum_<key>)
m = re.match(
r"CAST\(CAST\((?:CAST\()?(\w+)(?:\s+AS\s+BIGINT\))?"
r"\s+AS\s+VARCHAR\)\s+AS\s+_enum_\w+\)",
parts[0],
)
if m is None:
continue
src_field = m.group(1)
if src_field == out_col and out_col in all_parquet_cols:
# Find a unique _orig name
candidate = f"{out_col}_orig"
Expand Down Expand Up @@ -894,8 +902,16 @@ def add_col(col: str) -> None:
# Add derived property expressions from the VirtualDB config
if prop_result is not None:
derived_exprs, prop_raw_cols = prop_result
# Ensure source columns needed by expressions are selected
for col in prop_raw_cols:
# Ensure source columns needed by expressions are selected.
# For external metadata datasets, restrict to columns physically
# present in the metadata parquet -- data columns must not bleed
# into the meta view.
allowed_raw_cols = (
[c for c in prop_raw_cols if c in actual_meta_cols]
if is_external
else prop_raw_cols
)
for col in allowed_raw_cols:
add_col(col)
# Rewrite CAST expressions to use the _orig alias when the
# source field was renamed to avoid collision.
Expand Down Expand Up @@ -1230,7 +1246,17 @@ def _resolve_property_columns(
card, config_name, mapping.field
)
self._ensure_enum_type(enum_type, levels)
expressions.append(f"CAST({mapping.field} AS {enum_type}) AS {key}")
# If all levels are integer-valued strings (e.g. '0',
# '90'), the parquet column may be DOUBLE (e.g. 90.0).
# Cast through BIGINT first to strip the decimal before
# converting to VARCHAR so '90.0' becomes '90'.
all_int = all(re.fullmatch(r"-?\d+", lv) for lv in levels)
inner = (
f"CAST({mapping.field} AS BIGINT)" if all_int else mapping.field
)
expressions.append(
f"CAST(CAST({inner} AS VARCHAR)" f" AS {enum_type}) AS {key}"
)
elif key == mapping.field:
# no-op -- column already present as raw col
pass
Expand Down
Loading