From d77e3f68e4a048db42dab0cbc9c83ad93ea83604 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Tue, 10 Feb 2026 22:56:34 -0500 Subject: [PATCH 1/5] feat: overwrite input with preprocessed strings --- main.py | 61 ++++++++++++++++++---- preprocessing/loader.py | 111 ++++++++++++++++++++++++++++++---------- test/test_full.py | 39 ++++++++++---- test/test_loaders.py | 12 +++-- 4 files changed, 170 insertions(+), 53 deletions(-) diff --git a/main.py b/main.py index 3223be6..6f9c8e4 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ import pandas as pd from sqlalchemy import create_engine +from pathlib import Path from typing import List from scystream.sdk.core import entrypoint from scystream.sdk.env.settings import ( @@ -28,6 +29,11 @@ class NormalizedDocsOutput(PostgresSettings, OutputSettings): __identifier__ = "normalized_docs" +class NormalizedTXTOutput(FileSettings, OutputSettings): + __identifier__ = "normalized_overwritten_file_output" + FILE_EXT: str = "txt" + + class TXTFileInput(FileSettings, InputSettings): __identifier__ = "txt_file" FILE_EXT: str = "txt" @@ -40,6 +46,11 @@ class BIBFileInput(FileSettings, InputSettings): SELECTED_ATTRIBUTE: str = "Abstract" +class NormalizedBIBOutput(FileSettings, OutputSettings): + __identifier__ = "normalized_overwritten_file_output" + FILE_EXT: str = "bib" + + class PreprocessTXT(EnvSettings): LANGUAGE: str = "en" FILTER_STOPWORDS: bool = True @@ -52,6 +63,7 @@ class PreprocessTXT(EnvSettings): txt_input: TXTFileInput normalized_docs_output: NormalizedDocsOutput + normalized_overwritten_file_output: NormalizedTXTOutput class PreprocessBIB(EnvSettings): @@ -66,6 +78,7 @@ class PreprocessBIB(EnvSettings): bib_input: BIBFileInput normalized_docs_output: NormalizedDocsOutput + normalized_overwritten_file_output: NormalizedBIBOutput def _write_preprocessed_docs_to_postgres( @@ -93,8 +106,12 @@ def _write_preprocessed_docs_to_postgres( settings.DB_TABLE}'.") -def _preprocess_and_store(documents: List[DocumentRecord], settings): - """Shared preprocessing logic for TXT and BIB.""" +def _preprocess_and_store( + documents: List[DocumentRecord], + overwrite_callback, + settings, +) -> List[PreprocessedDocument]: + logger.info(f"Starting preprocessing with {len(documents)} documents") pre = Preprocessor( @@ -110,28 +127,50 @@ def _preprocess_and_store(documents: List[DocumentRecord], settings): result = pre.generate_normalized_output() _write_preprocessed_docs_to_postgres( - result, settings.normalized_docs_output) + result, + settings.normalized_docs_output + ) + + # Overwrite file using injected behavior + export_path = Path(f"output.{ + settings.normalized_overwritten_file_output.FILE_EXT}") + overwrite_callback(result, export_path) + + S3Operations.upload( + settings.normalized_overwritten_file_output, + export_path + ) logger.info("Preprocessing completed successfully.") + return result @entrypoint(PreprocessTXT) def preprocess_txt_file(settings): - logger.info("Downloading TXT input from S3...") + logger.info("Downloading TXT file...") S3Operations.download(settings.txt_input, settings.TXT_DOWNLOAD_PATH) - texts = TxtLoader.load(settings.TXT_DOWNLOAD_PATH) + documents = TxtLoader.load(settings.TXT_DOWNLOAD_PATH) - _preprocess_and_store(texts, settings) + _preprocess_and_store( + documents=documents, + overwrite_callback=TxtLoader.overwrite_with_results, + settings=settings + ) @entrypoint(PreprocessBIB) def preprocess_bib_file(settings): - logger.info("Downloading BIB input from S3...") + logger.info("Downloading BIB file...") S3Operations.download(settings.bib_input, settings.BIB_DOWNLOAD_PATH) - texts = BibLoader.load( - settings.BIB_DOWNLOAD_PATH, - attribute=settings.bib_input.SELECTED_ATTRIBUTE, + loader = BibLoader( + file_path=settings.BIB_DOWNLOAD_PATH, + attribute=settings.bib_input.SELECTED_ATTRIBUTE + ) + + _preprocess_and_store( + documents=loader.document_records, + overwrite_callback=loader.overwrite_with_results, + settings=settings ) - _preprocess_and_store(texts, settings) diff --git a/preprocessing/loader.py b/preprocessing/loader.py index ecb5193..4c06a39 100644 --- a/preprocessing/loader.py +++ b/preprocessing/loader.py @@ -1,8 +1,10 @@ import logging import re import bibtexparser +from typing import List +from pathlib import Path -from preprocessing.models import DocumentRecord +from preprocessing.models import DocumentRecord, PreprocessedDocument logger = logging.getLogger(__name__) @@ -12,15 +14,10 @@ def normalize_text(text: str) -> str: return "" text = re.sub(r"\\[a-zA-Z]+\{([^}]*)\}", r"\1", text) - text = re.sub(r"\\[a-zA-Z]+", "", text) - text = re.sub(r"[{}]", "", text) - text = re.sub(r'\\"([a-zA-Z])', r'\1', text) - text = re.sub(r"\\'", "", text) - text = re.sub(r"\s+", " ", text) return text.strip() @@ -40,34 +37,92 @@ def load(file_path: str) -> list[DocumentRecord]: for i, line in enumerate(lines, start=1) ] + @staticmethod + def overwrite_with_results( + preprocessed_docs: List[PreprocessedDocument], + export_path: Path, + ) -> None: + logger.info("Writing preprocessed TXT file...") + + output_path = Path.cwd() / export_path.name + + # Ensure correct order (IDs are numeric strings) + sorted_docs = sorted( + preprocessed_docs, + key=lambda d: int(d.doc_id) + ) + + with open(output_path, "w", encoding="utf-8") as f: + for doc in sorted_docs: + line = " ".join(doc.tokens) + f.write(line + "\n") + + logger.info(f"TXT file successfully written to: {output_path}") + class BibLoader: - @staticmethod - def load(file_path: str, attribute: str) -> list[DocumentRecord]: + def __init__(self, file_path: str, attribute: str): logger.info(f"Loading BIB file (attribute={attribute})...") with open(file_path, "r", encoding="utf-8") as f: - bib_database = bibtexparser.load(f) - - results = [] - attribute_lower = attribute.lower() - - for entry in bib_database.entries: - bib_id = ( - entry.get("id") - or entry.get("ID") - or entry.get("citekey") - or entry.get("entrykey") - or entry.get("Unique-ID") - or "UNKNOWN_ID" - ) + self.bib_db = bibtexparser.load(f) - raw_value = entry.get(attribute_lower, "") + self.file_path = file_path + self.attribute = attribute.lower() + + self.document_records = self._build_document_records() + + @staticmethod + def _extract_bib_id(entry: dict) -> str: + return ( + entry.get("id") + or entry.get("ID") + or entry.get("citekey") + or entry.get("entrykey") + or entry.get("Unique-ID") + or "UNKNOWN_ID" + ) + + def _build_document_records(self) -> List[DocumentRecord]: + records = [] + + for entry in self.bib_db.entries: + bib_id = self._extract_bib_id(entry) + raw_value = entry.get(self.attribute, "") normalized = normalize_text(raw_value) - results.append(DocumentRecord( - doc_id=bib_id, - text=normalized - )) + records.append( + DocumentRecord( + doc_id=bib_id, + text=normalized + ) + ) + + return records + + def overwrite_with_results( + self, + preprocessed_docs: List[PreprocessedDocument], + export_path: Path + ) -> None: + logger.info("Overwriting input documents with preprocessed text...") + + output_path = Path.cwd() / export_path.name + + preprocessed_dict = { + doc.doc_id: doc for doc in preprocessed_docs + } + + for entry in self.bib_db.entries: + bib_id = self._extract_bib_id(entry) + preprocessed = preprocessed_dict.get(bib_id) + + if not preprocessed: + continue + + entry[self.attribute] = " ".join(preprocessed.tokens) + + with open(output_path, "w", encoding="utf-8") as f: + bibtexparser.dump(self.bib_db, f) - return results + logger.info(f"BIB file successfully written to: {output_path}") diff --git a/test/test_full.py b/test/test_full.py index 6a04e61..7bee867 100644 --- a/test/test_full.py +++ b/test/test_full.py @@ -15,6 +15,9 @@ PG_USER = "postgres" PG_PASS = "postgres" +INPUT_FILE_NAME = "input" +OUTPUT_FILE_NAME = "output" + def parse_pg_array(arr: str) -> list[str]: # Convert Postgres literal → Python list @@ -55,15 +58,13 @@ def s3_minio(): def test_full_bib(s3_minio): - input_file_name = "input" - - bib_path = Path(__file__).parent / "files" / f"{input_file_name}.bib" + bib_path = Path(__file__).parent / "files" / f"{INPUT_FILE_NAME}.bib" bib_bytes = bib_path.read_bytes() # Upload to MinIO s3_minio.put_object( Bucket=BUCKET_NAME, - Key=f"{input_file_name}.bib", + Key=f"{INPUT_FILE_NAME}.bib", Body=bib_bytes ) @@ -79,7 +80,7 @@ def test_full_bib(s3_minio): "bib_file_S3_SECRET_KEY": MINIO_PWD, "bib_file_BUCKET_NAME": BUCKET_NAME, "bib_file_FILE_PATH": "", - "bib_file_FILE_NAME": input_file_name, + "bib_file_FILE_NAME": INPUT_FILE_NAME, "bib_file_SELECTED_ATTRIBUTE": "abstract", # PostgreSQL output @@ -88,6 +89,14 @@ def test_full_bib(s3_minio): "normalized_docs_PG_USER": PG_USER, "normalized_docs_PG_PASS": PG_PASS, "normalized_docs_DB_TABLE": "normalized_docs_bib", + + "normalized_overwritten_file_output_S3_HOST": "http://127.0.0.1", + "normalized_overwritten_file_output_S3_PORT": "9000", + "normalized_overwritten_file_output_S3_ACCESS_KEY": MINIO_USER, + "normalized_overwritten_file_output_S3_SECRET_KEY": MINIO_PWD, + "normalized_overwritten_file_output_BUCKET_NAME": BUCKET_NAME, + "normalized_overwritten_file_output_FILE_PATH": "", + "normalized_overwritten_file_output_FILE_NAME": OUTPUT_FILE_NAME } for k, v in env.items(): @@ -122,17 +131,17 @@ def test_full_bib(s3_minio): assert isinstance(df.iloc[0]["tokens"], list) assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) + # TODO: Test overwritten file upload -def test_full_txt(s3_minio): - input_file_name = "input" - txt_path = Path(__file__).parent / "files" / f"{input_file_name}.txt" +def test_full_txt(s3_minio): + txt_path = Path(__file__).parent / "files" / f"{INPUT_FILE_NAME}.txt" txt_bytes = txt_path.read_bytes() # Upload input to MinIO s3_minio.put_object( Bucket=BUCKET_NAME, - Key=f"{input_file_name}.txt", + Key=f"{INPUT_FILE_NAME}.txt", Body=txt_bytes ) @@ -146,7 +155,7 @@ def test_full_txt(s3_minio): "txt_file_S3_SECRET_KEY": MINIO_PWD, "txt_file_BUCKET_NAME": BUCKET_NAME, "txt_file_FILE_PATH": "", - "txt_file_FILE_NAME": input_file_name, + "txt_file_FILE_NAME": INPUT_FILE_NAME, # Postgres output "normalized_docs_PG_HOST": "localhost", @@ -154,6 +163,14 @@ def test_full_txt(s3_minio): "normalized_docs_PG_USER": PG_USER, "normalized_docs_PG_PASS": PG_PASS, "normalized_docs_DB_TABLE": "normalized_docs_txt", + + "normalized_overwritten_file_output_S3_HOST": "http://127.0.0.1", + "normalized_overwritten_file_output_S3_PORT": "9000", + "normalized_overwritten_file_output_S3_ACCESS_KEY": MINIO_USER, + "normalized_overwritten_file_output_S3_SECRET_KEY": MINIO_PWD, + "normalized_overwritten_file_output_BUCKET_NAME": BUCKET_NAME, + "normalized_overwritten_file_output_FILE_PATH": "", + "normalized_overwritten_file_output_FILE_NAME": OUTPUT_FILE_NAME } for k, v in env.items(): @@ -177,3 +194,5 @@ def test_full_txt(s3_minio): assert isinstance(df.iloc[0]["tokens"], list) assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) + + # TODO: Test overwritten file upload diff --git a/test/test_loaders.py b/test/test_loaders.py index c55827c..da105ed 100644 --- a/test/test_loaders.py +++ b/test/test_loaders.py @@ -13,7 +13,6 @@ def test_txt_loader_reads_and_normalizes(): result = TxtLoader.load(fname) os.unlink(fname) - # Expect list of DocumentRecord assert len(result) == 2 assert isinstance(result[0], DocumentRecord) @@ -33,11 +32,16 @@ def test_bib_loader_extracts_attribute(): } """ - with tempfile.NamedTemporaryFile("w+", delete=False) as f: + with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".bib") as f: f.write(bib_content) fname = f.name - result = BibLoader.load(fname, "abstract") + loader = BibLoader( + file_path=fname, + attribute="abstract" + ) + + result = loader.document_records os.unlink(fname) assert len(result) == 1 @@ -45,7 +49,7 @@ def test_bib_loader_extracts_attribute(): record = result[0] assert isinstance(record, DocumentRecord) - # ID taken from bib entry key: "@article{a,..." + # ID taken from entry key assert record.doc_id == "a" # Normalized abstract text From 5047266380cca3ec886fc4ed6a9bfb4bfc5bdea5 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Wed, 8 Apr 2026 14:14:21 +0200 Subject: [PATCH 2/5] chore: fix formatting --- cbc.yaml | 24 ++++++++++++++++ main.py | 61 ++++++++++++++++++++++++----------------- preprocessing/core.py | 25 ++++++----------- preprocessing/loader.py | 27 ++++-------------- test/test_loaders.py | 5 +--- 5 files changed, 76 insertions(+), 66 deletions(-) diff --git a/cbc.yaml b/cbc.yaml index 4df19d8..a3d61ab 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -36,6 +36,18 @@ entrypoints: normalized_docs_PG_USER: null description: Database Output, containing bib_id aswell as the normalized text type: pg_table + normalized_overwritten_file_output: + config: + normalized_overwritten_file_output_BUCKET_NAME: null + normalized_overwritten_file_output_FILE_EXT: bib + normalized_overwritten_file_output_FILE_NAME: null + normalized_overwritten_file_output_FILE_PATH: null + normalized_overwritten_file_output_S3_ACCESS_KEY: null + normalized_overwritten_file_output_S3_HOST: null + normalized_overwritten_file_output_S3_PORT: null + normalized_overwritten_file_output_S3_SECRET_KEY: null + description: The File Input Overwritten with the normalized output + type: file preprocess_txt_file: description: Entrypoint to preprocess a .txt file envs: @@ -69,4 +81,16 @@ entrypoints: normalized_docs_PG_USER: null description: Database Output, containing bib_id aswell as the normalized text type: pg_table + normalized_overwritten_file_output: + config: + normalized_overwritten_file_output_BUCKET_NAME: null + normalized_overwritten_file_output_FILE_EXT: txt + normalized_overwritten_file_output_FILE_NAME: null + normalized_overwritten_file_output_FILE_PATH: null + normalized_overwritten_file_output_S3_ACCESS_KEY: null + normalized_overwritten_file_output_S3_HOST: null + normalized_overwritten_file_output_S3_PORT: null + normalized_overwritten_file_output_S3_SECRET_KEY: null + description: The File Input Overwritten with the normalized output + type: file name: Language-Preprocessing diff --git a/main.py b/main.py index 6f9c8e4..cc42195 100644 --- a/main.py +++ b/main.py @@ -10,7 +10,7 @@ InputSettings, OutputSettings, FileSettings, - PostgresSettings + PostgresSettings, ) from scystream.sdk.file_handling.s3_manager import S3Operations @@ -18,9 +18,15 @@ from preprocessing.loader import TxtLoader, BibLoader from preprocessing.models import DocumentRecord, PreprocessedDocument +from scystream.sdk.config.config_loader import ( + get_compute_block, + generate_config_from_compute_block, +) + + logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) @@ -82,19 +88,18 @@ class PreprocessBIB(EnvSettings): def _write_preprocessed_docs_to_postgres( - preprocessed_ouput: List[PreprocessedDocument], - settings: PostgresSettings + preprocessed_ouput: List[PreprocessedDocument], settings: PostgresSettings ): - df = pd.DataFrame([ - { - "doc_id": d.doc_id, - "tokens": d.tokens - } - for d in preprocessed_ouput - ]) - - logger.info(f"Writing {len(df)} processed documents to DB table '{ - settings.DB_TABLE}'…") + df = pd.DataFrame( + [{"doc_id": d.doc_id, "tokens": d.tokens} for d in preprocessed_ouput] + ) + + logger.info( + f""" + Writing {len(df)} processed documents to + DB table '{settings.DB_TABLE}'… + """ + ) engine = create_engine( f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/" @@ -102,8 +107,9 @@ def _write_preprocessed_docs_to_postgres( df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False) - logger.info(f"Successfully stored normalized documents into '{ - settings.DB_TABLE}'.") + logger.info( + f"Successfully stored normalized documents into '{settings.DB_TABLE}'." + ) def _preprocess_and_store( @@ -127,18 +133,17 @@ def _preprocess_and_store( result = pre.generate_normalized_output() _write_preprocessed_docs_to_postgres( - result, - settings.normalized_docs_output + result, settings.normalized_docs_output ) # Overwrite file using injected behavior - export_path = Path(f"output.{ - settings.normalized_overwritten_file_output.FILE_EXT}") + export_path = Path( + f"output.{settings.normalized_overwritten_file_output.FILE_EXT}" + ) overwrite_callback(result, export_path) S3Operations.upload( - settings.normalized_overwritten_file_output, - export_path + settings.normalized_overwritten_file_output, export_path ) logger.info("Preprocessing completed successfully.") @@ -155,7 +160,7 @@ def preprocess_txt_file(settings): _preprocess_and_store( documents=documents, overwrite_callback=TxtLoader.overwrite_with_results, - settings=settings + settings=settings, ) @@ -166,11 +171,17 @@ def preprocess_bib_file(settings): loader = BibLoader( file_path=settings.BIB_DOWNLOAD_PATH, - attribute=settings.bib_input.SELECTED_ATTRIBUTE + attribute=settings.bib_input.SELECTED_ATTRIBUTE, ) _preprocess_and_store( documents=loader.document_records, overwrite_callback=loader.overwrite_with_results, - settings=settings + settings=settings, ) + + +if __name__ == "__main__": + cb = get_compute_block() + + generate_config_from_compute_block(cb, Path("test.yaml")) diff --git a/preprocessing/core.py b/preprocessing/core.py index f819839..86b3497 100644 --- a/preprocessing/core.py +++ b/preprocessing/core.py @@ -5,10 +5,7 @@ from nltk.stem.porter import PorterStemmer from preprocessing.models import PreprocessedDocument, DocumentRecord -LANG_TO_SPACY_MODELS = { - "en": "en_core_web_sm", - "de": "de_core_news_sm" -} +LANG_TO_SPACY_MODELS = {"en": "en_core_web_sm", "de": "de_core_news_sm"} logger = logging.getLogger(__name__) @@ -45,12 +42,11 @@ def __init__( self.documents: List[DocumentRecord] = [] def filter_tokens( - self, - tokens: list[spacy.tokens.Token], - filter_stopwords: bool = False + self, tokens: list[spacy.tokens.Token], filter_stopwords: bool = False ) -> list[spacy.tokens.Token]: return [ - t for t in tokens + t + for t in tokens if t.is_alpha and (not filter_stopwords or not t.is_stop) and len(t.text) > 2 @@ -80,20 +76,17 @@ def generate_normalized_output(self) -> List[PreprocessedDocument]: if self.use_ngrams and self.ngram_min > 1: for n in range(self.ngram_min, self.ngram_max + 1): for i in range(len(normalized) - n + 1): - ngram = " ".join(normalized[i:i+n]) + ngram = " ".join(normalized[i : i + n]) doc_terms.append(ngram) - processed_docs.append(PreprocessedDocument( - doc_id=record.doc_id, - tokens=doc_terms - )) + processed_docs.append( + PreprocessedDocument(doc_id=record.doc_id, tokens=doc_terms) + ) return processed_docs def normalize_token( - self, - token: spacy.tokens.Token, - porter: PorterStemmer + self, token: spacy.tokens.Token, porter: PorterStemmer ): """Apply lemma or stem normalization.""" word = token.text.lower() if not token.text.isupper() else token.text diff --git a/preprocessing/loader.py b/preprocessing/loader.py index 4c06a39..d34e36c 100644 --- a/preprocessing/loader.py +++ b/preprocessing/loader.py @@ -16,7 +16,7 @@ def normalize_text(text: str) -> str: text = re.sub(r"\\[a-zA-Z]+\{([^}]*)\}", r"\1", text) text = re.sub(r"\\[a-zA-Z]+", "", text) text = re.sub(r"[{}]", "", text) - text = re.sub(r'\\"([a-zA-Z])', r'\1', text) + text = re.sub(r'\\"([a-zA-Z])', r"\1", text) text = re.sub(r"\\'", "", text) text = re.sub(r"\s+", " ", text) @@ -30,10 +30,7 @@ def load(file_path: str) -> list[DocumentRecord]: lines = f.readlines() return [ - DocumentRecord( - doc_id=str(i), - text=normalize_text(line) - ) + DocumentRecord(doc_id=str(i), text=normalize_text(line)) for i, line in enumerate(lines, start=1) ] @@ -47,10 +44,7 @@ def overwrite_with_results( output_path = Path.cwd() / export_path.name # Ensure correct order (IDs are numeric strings) - sorted_docs = sorted( - preprocessed_docs, - key=lambda d: int(d.doc_id) - ) + sorted_docs = sorted(preprocessed_docs, key=lambda d: int(d.doc_id)) with open(output_path, "w", encoding="utf-8") as f: for doc in sorted_docs: @@ -91,27 +85,18 @@ def _build_document_records(self) -> List[DocumentRecord]: raw_value = entry.get(self.attribute, "") normalized = normalize_text(raw_value) - records.append( - DocumentRecord( - doc_id=bib_id, - text=normalized - ) - ) + records.append(DocumentRecord(doc_id=bib_id, text=normalized)) return records def overwrite_with_results( - self, - preprocessed_docs: List[PreprocessedDocument], - export_path: Path + self, preprocessed_docs: List[PreprocessedDocument], export_path: Path ) -> None: logger.info("Overwriting input documents with preprocessed text...") output_path = Path.cwd() / export_path.name - preprocessed_dict = { - doc.doc_id: doc for doc in preprocessed_docs - } + preprocessed_dict = {doc.doc_id: doc for doc in preprocessed_docs} for entry in self.bib_db.entries: bib_id = self._extract_bib_id(entry) diff --git a/test/test_loaders.py b/test/test_loaders.py index da105ed..c3575ce 100644 --- a/test/test_loaders.py +++ b/test/test_loaders.py @@ -36,10 +36,7 @@ def test_bib_loader_extracts_attribute(): f.write(bib_content) fname = f.name - loader = BibLoader( - file_path=fname, - attribute="abstract" - ) + loader = BibLoader(file_path=fname, attribute="abstract") result = loader.document_records os.unlink(fname) From c92a29ca1d090b6a18330428f310679fd7bdc000 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Wed, 8 Apr 2026 14:35:32 +0200 Subject: [PATCH 3/5] Merge main into feat/keep-input-files --- main.py | 49 +++++++------------------------------------ preprocessing/core.py | 2 +- 2 files changed, 8 insertions(+), 43 deletions(-) diff --git a/main.py b/main.py index ab78d09..24e3350 100644 --- a/main.py +++ b/main.py @@ -1,26 +1,22 @@ -import hashlib import logging +import pandas as pd +from sqlalchemy import create_engine from pathlib import Path from typing import List from scystream.sdk.core import entrypoint from scystream.sdk.env.settings import ( EnvSettings, - FileSettings, InputSettings, OutputSettings, FileSettings, PostgresSettings, ) from scystream.sdk.file_handling.s3_manager import S3Operations -from sqlalchemy import create_engine -from sqlalchemy.sql import quoted_name - -from scystream.sdk.config.config_loader import ( - get_compute_block, - generate_config_from_compute_block, -) +from preprocessing.core import Preprocessor +from preprocessing.loader import TxtLoader, BibLoader +from preprocessing.models import DocumentRecord, PreprocessedDocument logging.basicConfig( level=logging.INFO, @@ -29,21 +25,6 @@ logger = logging.getLogger(__name__) -def _normalize_table_name(table_name: str) -> str: - max_length = 63 - if len(table_name) <= max_length: - return table_name - digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:10] - prefix_length = max_length - len(digest) - 1 - return f"{table_name[:prefix_length]}_{digest}" - - -def _resolve_db_table(settings: PostgresSettings) -> str: - normalized_name = _normalize_table_name(settings.DB_TABLE) - settings.DB_TABLE = normalized_name - return normalized_name - - class NormalizedDocsOutput(PostgresSettings, OutputSettings): __identifier__ = "normalized_docs" @@ -107,26 +88,16 @@ def _write_preprocessed_docs_to_postgres( [{"doc_id": d.doc_id, "tokens": d.tokens} for d in preprocessed_ouput] ) - logger.info( - f""" - Writing {len(df)} processed documents to - DB table '{settings.DB_TABLE}'… - """ - ) - engine = create_engine( - f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" - f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/" - ) - logger.info( "Writing %s processed documents to DB table '%s'…", len(df), - resolved_table_name, + settings.DB_TABLE, ) engine = create_engine( f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/", ) + df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False) logger.info( f"Successfully stored normalized documents into '{settings.DB_TABLE}'." @@ -200,9 +171,3 @@ def preprocess_bib_file(settings): overwrite_callback=loader.overwrite_with_results, settings=settings, ) - - -if __name__ == "__main__": - cb = get_compute_block() - - generate_config_from_compute_block(cb, Path("test.yaml")) diff --git a/preprocessing/core.py b/preprocessing/core.py index 86b3497..b2fe866 100644 --- a/preprocessing/core.py +++ b/preprocessing/core.py @@ -76,7 +76,7 @@ def generate_normalized_output(self) -> List[PreprocessedDocument]: if self.use_ngrams and self.ngram_min > 1: for n in range(self.ngram_min, self.ngram_max + 1): for i in range(len(normalized) - n + 1): - ngram = " ".join(normalized[i : i + n]) + ngram = " ".join(normalized[i:i + n]) # fmt: off doc_terms.append(ngram) processed_docs.append( From 0cac07ad6cec759bfdd34fc9d17dc361b1f080ad Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:25:04 +0200 Subject: [PATCH 4/5] update: use scystream-sdk 1.4 --- cbc.yaml | 14 ++++---------- main.py | 41 ++++++++++------------------------------- requirements.txt | 2 +- test/test_full.py | 31 +++++++------------------------ 4 files changed, 22 insertions(+), 66 deletions(-) diff --git a/cbc.yaml b/cbc.yaml index 4df19d8..7a0ca6f 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -30,12 +30,9 @@ entrypoints: normalized_docs_output: config: normalized_docs_DB_TABLE: null - normalized_docs_PG_HOST: null - normalized_docs_PG_PASS: null - normalized_docs_PG_PORT: null - normalized_docs_PG_USER: null + normalized_docs_DB_DSN: null description: Database Output, containing bib_id aswell as the normalized text - type: pg_table + type: database_table preprocess_txt_file: description: Entrypoint to preprocess a .txt file envs: @@ -63,10 +60,7 @@ entrypoints: normalized_docs_output: config: normalized_docs_DB_TABLE: null - normalized_docs_PG_HOST: null - normalized_docs_PG_PASS: null - normalized_docs_PG_PORT: null - normalized_docs_PG_USER: null + normalized_docs_DB_DSN: null description: Database Output, containing bib_id aswell as the normalized text - type: pg_table + type: database_table name: Language-Preprocessing diff --git a/main.py b/main.py index b1ec208..ac14b5a 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,3 @@ -import hashlib import logging import pandas as pd @@ -11,11 +10,12 @@ FileSettings, InputSettings, OutputSettings, - PostgresSettings, + DatabaseSettings, ) from scystream.sdk.file_handling.s3_manager import S3Operations -from sqlalchemy import create_engine -from sqlalchemy.sql import quoted_name +from scystream.sdk.database_handling.database_manager import ( + PandasDatabaseOperations, +) logging.basicConfig( level=logging.INFO, @@ -24,22 +24,7 @@ logger = logging.getLogger(__name__) -def _normalize_table_name(table_name: str) -> str: - max_length = 63 - if len(table_name) <= max_length: - return table_name - digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:10] - prefix_length = max_length - len(digest) - 1 - return f"{table_name[:prefix_length]}_{digest}" - - -def _resolve_db_table(settings: PostgresSettings) -> str: - normalized_name = _normalize_table_name(settings.DB_TABLE) - settings.DB_TABLE = normalized_name - return normalized_name - - -class NormalizedDocsOutput(PostgresSettings, OutputSettings): +class NormalizedDocsOutput(DatabaseSettings, OutputSettings): __identifier__ = "normalized_docs" @@ -85,9 +70,8 @@ class PreprocessBIB(EnvSettings): def _write_preprocessed_docs_to_postgres( preprocessed_ouput: list[PreprocessedDocument], - settings: PostgresSettings, + settings: DatabaseSettings, ): - resolved_table_name = _resolve_db_table(settings) df = pd.DataFrame( [ { @@ -101,19 +85,14 @@ def _write_preprocessed_docs_to_postgres( logger.info( "Writing %s processed documents to DB table '%s'…", len(df), - resolved_table_name, - ) - engine = create_engine( - f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" - f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/", + settings.DB_TABLE, ) - - table_name = quoted_name(resolved_table_name, quote=True) - df.to_sql(table_name, engine, if_exists="replace", index=False) + db = PandasDatabaseOperations(settings.DB_DSN) + db.write(table=settings.DB_TABLE, data=df) logger.info( "Successfully stored normalized documents into '%s'.", - resolved_table_name, + settings.DB_TABLE, ) diff --git a/requirements.txt b/requirements.txt index 1804e39..f3eff98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -scystream-sdk==1.2.2 +scystream-sdk[database,postgres]==1.4.0 spacy==3.8.7 nltk==3.9.1 bibtexparser==1.4.3 diff --git a/test/test_full.py b/test/test_full.py index 6a04e61..7788896 100644 --- a/test/test_full.py +++ b/test/test_full.py @@ -48,7 +48,7 @@ def s3_minio(): "s3", endpoint_url="http://localhost:9000", aws_access_key_id=MINIO_USER, - aws_secret_access_key=MINIO_PWD + aws_secret_access_key=MINIO_PWD, ) ensure_bucket(client, BUCKET_NAME) return client @@ -62,16 +62,13 @@ def test_full_bib(s3_minio): # Upload to MinIO s3_minio.put_object( - Bucket=BUCKET_NAME, - Key=f"{input_file_name}.bib", - Body=bib_bytes + Bucket=BUCKET_NAME, Key=f"{input_file_name}.bib", Body=bib_bytes ) # ENV for preprocess_bib_file env = { # Preprocessor config "UNIGRAM_NORMALIZER": "porter", - # BIB INPUT S3 "bib_file_S3_HOST": "http://127.0.0.1", "bib_file_S3_PORT": "9000", @@ -81,12 +78,8 @@ def test_full_bib(s3_minio): "bib_file_FILE_PATH": "", "bib_file_FILE_NAME": input_file_name, "bib_file_SELECTED_ATTRIBUTE": "abstract", - # PostgreSQL output - "normalized_docs_PG_HOST": "localhost", - "normalized_docs_PG_PORT": "5432", - "normalized_docs_PG_USER": PG_USER, - "normalized_docs_PG_PASS": PG_PASS, + "normalized_docs_DB_DSN": f"postgresql://{PG_USER}:{PG_PASS}@127.0.0.1:5432/postgres", "normalized_docs_DB_TABLE": "normalized_docs_bib", } @@ -109,13 +102,10 @@ def test_full_bib(s3_minio): # doc_id increments assert len(df["doc_id"]) == len(df) # doc_id count matches rows - assert df["doc_id"].is_unique # no duplicates + assert df["doc_id"].is_unique # no duplicates assert all(isinstance(x, str) for x in df["doc_id"]) # Bib IDs are strings - assert set(df["doc_id"]) == { - "WOS:001016714700004", - "WOS:001322577100012" - } + assert set(df["doc_id"]) == {"WOS:001016714700004", "WOS:001322577100012"} df["tokens"] = df["tokens"].apply(parse_pg_array) @@ -131,14 +121,11 @@ def test_full_txt(s3_minio): # Upload input to MinIO s3_minio.put_object( - Bucket=BUCKET_NAME, - Key=f"{input_file_name}.txt", - Body=txt_bytes + Bucket=BUCKET_NAME, Key=f"{input_file_name}.txt", Body=txt_bytes ) env = { "UNIGRAM_NORMALIZER": "porter", - # TXT input S3 "txt_file_S3_HOST": "http://127.0.0.1", "txt_file_S3_PORT": "9000", @@ -147,12 +134,8 @@ def test_full_txt(s3_minio): "txt_file_BUCKET_NAME": BUCKET_NAME, "txt_file_FILE_PATH": "", "txt_file_FILE_NAME": input_file_name, - # Postgres output - "normalized_docs_PG_HOST": "localhost", - "normalized_docs_PG_PORT": "5432", - "normalized_docs_PG_USER": PG_USER, - "normalized_docs_PG_PASS": PG_PASS, + "normalized_docs_DB_DSN": f"postgresql://{PG_USER}:{PG_PASS}@127.0.0.1:5432/postgres", "normalized_docs_DB_TABLE": "normalized_docs_txt", } From 2ecaeee0c977256ea384f449b70278cb08f65d5b Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:34:42 +0200 Subject: [PATCH 5/5] chore: ignore tests for linting --- .github/workflows/ci.yaml | 6 ++- output.bib | 79 --------------------------------------- output.txt | 4 -- 3 files changed, 4 insertions(+), 85 deletions(-) delete mode 100644 output.bib delete mode 100644 output.txt diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3e28428..7572acb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,6 +24,8 @@ jobs: - name: Run flake8 uses: py-actions/flake8@v2 + with: + args: --exclude=test validate-compute-block: name: Validate Compute Block Config @@ -38,7 +40,7 @@ jobs: - name: Intall dependencies run: | pip install -r requirements.txt - + - name: Check cbcs run: | python3 - <<'EOF' @@ -132,7 +134,7 @@ jobs: tags: | type=ref, event=pr type=raw, value=latest, enable=${{ (github.ref == format('refs/heads/{0}', 'main')) }} - + - name: Build and push Docker image uses: docker/build-push-action@v5 with: diff --git a/output.bib b/output.bib deleted file mode 100644 index db85e44..0000000 --- a/output.bib +++ /dev/null @@ -1,79 +0,0 @@ -@article{WOS:001016714700004, - abstract = {modern contemporari transhuman seen recent rise academ popular relev specif naiv metaphys idea immort return rise modern contemporari contemporari transhuman transhuman seen seen recent recent rise rise academ academ popular popular relev relev specif specif naiv naiv metaphys metaphys idea idea immort immort return return rise modern contemporari transhuman contemporari transhuman seen transhuman seen recent seen recent rise recent rise academ rise academ popular academ popular relev popular relev specif relev specif naiv specif naiv metaphys naiv metaphys idea metaphys idea immort idea immort return immort return rise articl refrain ethic polit assess transhuman articl refrain refrain ethic ethic polit polit assess assess transhuman articl refrain ethic refrain ethic polit ethic polit assess polit assess transhuman critiqu exact metaphys idealist natur transhuman pursuit digit immort idea technolog advanc precis artifici gener intellig immort virtual self possibl critiqu exact exact metaphys metaphys idealist idealist natur natur transhuman transhuman pursuit pursuit digit digit immort immort idea idea technolog technolog advanc advanc precis precis artifici artifici gener gener intellig intellig immort immort virtual virtual self self possibl critiqu exact metaphys exact metaphys idealist metaphys idealist natur idealist natur transhuman natur transhuman pursuit transhuman pursuit digit pursuit digit immort digit immort idea immort idea technolog idea technolog advanc technolog advanc precis advanc precis artifici precis artifici gener artifici gener intellig gener intellig immort intellig immort virtual immort virtual self virtual self possibl articl follow form immanuel kant paralog critiqu pure reason kant concern substanti immort natur soul experienti imposs articl follow follow form form immanuel immanuel kant kant paralog paralog critiqu critiqu pure pure reason reason kant kant concern concern substanti substanti immort immort natur natur soul soul experienti experienti imposs articl follow form follow form immanuel form immanuel kant immanuel kant paralog kant paralog critiqu paralog critiqu pure critiqu pure reason pure reason kant reason kant concern kant concern substanti concern substanti immort substanti immort natur immort natur soul natur soul experienti soul experienti imposs articl offer theoret practic paralog fals logic infer argu transhumanist claim digit immort possibl fundament stem incorrect major premis articl offer offer theoret theoret practic practic paralog paralog fals fals logic logic infer infer argu argu transhumanist transhumanist claim claim digit digit immort immort possibl possibl fundament fundament stem stem incorrect incorrect major major premis articl offer theoret offer theoret practic theoret practic paralog practic paralog fals paralog fals logic fals logic infer logic infer argu infer argu transhumanist argu transhumanist claim transhumanist claim digit claim digit immort digit immort possibl immort possibl fundament possibl fundament stem fundament stem incorrect stem incorrect major incorrect major premis concern substanti natur inform inform theoret paralog second concern infinit transform pure plastic inform practic paralog concern substanti substanti natur natur inform inform inform inform theoret theoret paralog paralog second second concern concern infinit infinit transform transform pure pure plastic plastic inform inform practic practic paralog concern substanti natur substanti natur inform natur inform inform inform inform theoret inform theoret paralog theoret paralog second paralog second concern second concern infinit concern infinit transform infinit transform pure transform pure plastic pure plastic inform plastic inform practic inform practic paralog}, - address = {2-4 PARK SQUARE, MILTON PARK, ABINGDON OX14 4RN, OXON, ENGLAND}, - author = {White, Joel}, - author-email = {jhmw01@gmail.com}, - da = {2025-06-26}, - doc-delivery-number = {K5GF0}, - doi = {10.1080/20539320.2022.2150463}, - eissn = {2053-9339}, - issn = {2053-9320}, - journal = {JOURNAL OF AESTHETICS AND PHENOMENOLOGY}, - journal-iso = {J. Aesthet. Phenomenol.}, - keywords = {Transhumanism; Critical Philosophy; Immanuel Kant; Entropy; Paralogisms; -Digital Immortality}, - language = {English}, - month = {JUL 3}, - number = {2, SI}, - number-of-cited-references = {30}, - orcid-numbers = {White, Joel/0000-0001-6460-0564}, - pages = {155-172}, - publisher = {ROUTLEDGE JOURNALS, TAYLOR \& FRANCIS LTD}, - research-areas = {Philosophy}, - times-cited = {0}, - title = {Theoretical and Practical Paralogisms of Digital Immortality}, - type = {Article}, - unique-id = {WOS:001016714700004}, - usage-count-last-180-days = {3}, - usage-count-since-2013 = {15}, - volume = {9}, - web-of-science-categories = {Philosophy}, - web-of-science-index = {Emerging Sources Citation Index (ESCI)}, - year = {2022} -} - -@article{WOS:001322577100012, - abstract = {unit nation panel digit cooper emphas inclus growth digit network digit public good util multistakehold system approach unit nation nation panel panel digit digit cooper cooper emphas emphas inclus inclus growth growth digit digit network network digit digit public public good good util util multistakehold multistakehold system system approach unit nation panel nation panel digit panel digit cooper digit cooper emphas cooper emphas inclus emphas inclus growth inclus growth digit growth digit network digit network digit network digit public digit public good public good util good util multistakehold util multistakehold system multistakehold system approach similarli inform commun technolog ICT innov intervent program govern india digit north east vision emphas need inclus growth ICT northeast region similarli inform inform commun commun technolog technolog ICT ICT innov innov intervent intervent program program govern govern india india digit digit north north east east vision vision emphas emphas need need inclus inclus growth growth ICT ICT northeast northeast region similarli inform commun inform commun technolog commun technolog ICT technolog ICT innov ICT innov intervent innov intervent program intervent program govern program govern india govern india digit india digit north digit north east north east vision east vision emphas vision emphas need emphas need inclus need inclus growth inclus growth ICT growth ICT northeast ICT northeast region line articl present insight field studi conduct rural part manipur india incident found applic rural part develop world line articl articl present present insight insight field field studi studi conduct conduct rural rural part part manipur manipur india india incident incident found found applic applic rural rural part part develop develop world line articl present articl present insight present insight field insight field studi field studi conduct studi conduct rural conduct rural part rural part manipur part manipur india manipur india incident india incident found incident found applic found applic rural applic rural part rural part develop part develop world articl envis commun driven sociodigit transform northeast region india articl envis envis commun commun driven driven sociodigit sociodigit transform transform northeast northeast region region india articl envis commun envis commun driven commun driven sociodigit driven sociodigit transform sociodigit transform northeast transform northeast region northeast region india quest articl highlight sociopolit challeng digit transform provid insight inclus ICT region infrastructur util citizen smart govern servic demand digit empower citizen social welfar capac build commun engag quest articl articl highlight highlight sociopolit sociopolit challeng challeng digit digit transform transform provid provid insight insight inclus inclus ICT ICT region region infrastructur infrastructur util util citizen citizen smart smart govern govern servic servic demand demand digit digit empower empower citizen citizen social social welfar welfar capac capac build build commun commun engag quest articl highlight articl highlight sociopolit highlight sociopolit challeng sociopolit challeng digit challeng digit transform digit transform provid transform provid insight provid insight inclus insight inclus ICT inclus ICT region ICT region infrastructur region infrastructur util infrastructur util citizen util citizen smart citizen smart govern smart govern servic govern servic demand servic demand digit demand digit empower digit empower citizen empower citizen social citizen social welfar social welfar capac welfar capac build capac build commun build commun engag}, - address = {10662 LOS VAQUEROS CIRCLE, PO BOX 3014, LOS ALAMITOS, CA 90720-1314 USA}, - affiliation = {Kant, V (Corresponding Author), Indian Inst Technol Kanpur, Kanpur 208016, India. -Kant, Vivek, Indian Inst Technol Kanpur, Kanpur 208016, India. -Khanganba, Sanjram Premjit, Indian Inst Technol Indore, Indore 452020, India. -Dixit, Sudhir, Basic Internet Fdn, Oslo, Norway.}, - affiliations = {Indian Institute of Technology System (IIT System); Indian Institute of -Technology (IIT) - Kanpur; Indian Institute of Technology System (IIT -System); Indian Institute of Technology (IIT) - Indore}, - author = {Kant, Vivek and Khanganba, Sanjram Premjit and Dixit, Sudhir}, - author-email = {vkant@iitk.ac.in -sanjrampk@iiti.ac.in -sudhir.dixit@ieee.org}, - da = {2025-06-26}, - doc-delivery-number = {H3O9D}, - doi = {10.1109/MITP.2024.3433459}, - eissn = {1941-045X}, - issn = {1520-9202}, - journal = {IT PROFESSIONAL}, - journal-iso = {IT Prof.}, - keywords = {Technological innovation; Digital transformation; Government; Buildings; -Asia; Africa; Information and communication technology}, - language = {English}, - month = {JUL-AUG}, - number = {4}, - number-of-cited-references = {7}, - orcid-numbers = {/0000-0002-6215-7500}, - pages = {42-47}, - publisher = {IEEE COMPUTER SOC}, - research-areas = {Computer Science; Telecommunications}, - researcherid-numbers = {/ITU-6308-2023}, - times-cited = {0}, - title = {Sociopolitical Challenges to Digital Transformation of Rural -Communities: Learnings from a Case Study From Manipur, India}, - type = {Article}, - unique-id = {WOS:001322577100012}, - usage-count-last-180-days = {11}, - usage-count-since-2013 = {22}, - volume = {26}, - web-of-science-categories = {Computer Science, Information Systems; Computer Science, Software -Engineering; Telecommunications}, - web-of-science-index = {Science Citation Index Expanded (SCI-EXPANDED)}, - year = {2024} -} diff --git a/output.txt b/output.txt deleted file mode 100644 index 7c8d8ea..0000000 --- a/output.txt +++ /dev/null @@ -1,4 +0,0 @@ -cat chase mice cat chase chase mice cat chase mice dog chase cat dog chase chase cat dog chase cat -bird fli high bird fli fli high bird fli high cat dog coexist cat dog dog coexist cat dog coexist -mice hide cat mice hide hide cat mice hide cat bird sing loudli bird sing sing loudli bird sing loudli -cat dog coexist cat dog dog coexist cat dog coexist cat dog coexist cat dog dog coexist cat dog coexist