From 9627f830e10f815120740d70f9134d15f2bbe999 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 15:29:52 +0200 Subject: [PATCH 1/2] update: use scystream-sdk 1.4 --- interactions/query.py | 29 ++++++---------------- main.py | 57 ++++++++++++++++++++----------------------- requirements.txt | 3 +-- 3 files changed, 34 insertions(+), 55 deletions(-) diff --git a/interactions/query.py b/interactions/query.py index 25587c5..769481e 100644 --- a/interactions/query.py +++ b/interactions/query.py @@ -1,31 +1,16 @@ import logging -import csv import sys -from sqlalchemy import create_engine, text -from scystream.sdk.env.settings import ( - EnvSettings, +from scystream.sdk.database_handling.database_manager import ( + PandasDatabaseOperations, ) -def query_db( - query: str, - db_settings: EnvSettings, - output_file_name: str -) -> None: +def execute_query_to_csv(query: str, dsn: str, output_file: str) -> None: try: - engine = create_engine(db_settings.DB_DSN) - with engine.connect() as conn: - result = conn.execute(text(query)) - col_names = result.keys() - rows = result.fetchall() - - with open(output_file_name, "w", newline="") as csvfile: - writer = csv.writer(csvfile) - writer.writerow(col_names) - writer.writerows(rows) + db = PandasDatabaseOperations(dsn) + df = db.read(query=query) + df.to_csv(output_file, index=False) except Exception as e: - logging.error(f"Failed to execute query or write CSV: {e}") + logging.error(f"Database query failed: {e}") sys.exit(1) - finally: - conn.close() diff --git a/main.py b/main.py index 5a639b0..95d072a 100644 --- a/main.py +++ b/main.py @@ -9,7 +9,7 @@ FileSettings, ) from scystream.sdk.file_handling.s3_manager import S3Operations -from interactions.query import query_db +from interactions.query import execute_query_to_csv def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None: @@ -22,7 +22,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None: f"{output_settings.FILE_PATH}/" f"{output_settings.FILE_NAME}." f"{output_settings.FILE_EXT}" - ) + ), ) except Exception as e: logging.error(f"Failed to upload CSV to S3: {e}") @@ -31,7 +31,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None: def read_query_file(file_path: str) -> str: try: - with open(file_path, 'r') as f: + with open(file_path, "r") as f: return f.read().strip() except Exception as e: logging.error(f"Failed to read query file: {e}") @@ -66,11 +66,14 @@ class QueryDatabaseEntrypointSettings(EnvSettings): csv_output: CSVOutput -@entrypoint(QueryDatabaseEntrypointSettings) +# @entrypoint(QueryDatabaseEntrypointSettings) def run_query_from_string(settings): - query = settings.query_str.QUERY target_csv = "output.csv" - query_db(query, settings, target_csv) + execute_query_to_csv( + query=settings.query_str.QUERY, + dsn=settings.DB_DSN, + output_file=target_csv, + ) upload_to_s3(target_csv, settings.csv_output) @@ -78,44 +81,36 @@ def run_query_from_string(settings): def run_query_from_file(settings): local_file = "query_file.txt" - s3_conn = S3Operations(settings.query_file) try: - s3_conn.download_file( - bucket_name=settings.query_file.BUCKET_NAME, - s3_object_name=( - f"{settings.query_file.FILE_PATH}/" - f"{settings.query_file.FILE_NAME}." - f"{settings.query_file.FILE_EXT}" - ), - local_file_path=local_file - ) + S3Operations.download(settings.query_file, local_file) except Exception as e: logging.error(f"Failed to download query file: {e}") sys.exit(1) query = read_query_file(local_file) target_csv = "output.csv" - query_db(query, settings, target_csv) + + execute_query_to_csv( + query=query, dsn=settings.DB_DSN, output_file=target_csv + ) upload_to_s3(target_csv, settings.csv_output) """ if __name__ == "__main__": test = QueryDatabaseEntrypointSettings( - DB_DSN="postgresql://guest:guest@localhost:5432/patstat", - query_str=QueryStrInput( - QUERY="SELECT name FROM employees;" - ), - csv_output=CSVOutput( - S3_HOST="http://localhost", - S3_PORT="9000", - S3_ACCESS_KEY="minioadmin", - S3_SECRET_KEY="minioadmin", - BUCKET_NAME="output-bucket", - FILE_PATH="output_file_path", - FILE_NAME="csv_file", - FILE_EXT="csv" - ) + DB_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/postgres", + query_str=QueryStrInput(QUERY="SELECT * FROM test_table;"), + csv_output=CSVOutput( + S3_HOST="http://localhost", + S3_PORT="9000", + S3_ACCESS_KEY="minioadmin", + S3_SECRET_KEY="minioadmin", + BUCKET_NAME="output-bucket", + FILE_PATH="output_file_path", + FILE_NAME="csv_file", + FILE_EXT="csv", + ), ) run_query_from_string(test) diff --git a/requirements.txt b/requirements.txt index 8c65b43..c480122 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ -scystream-sdk==1.2.1 -SQLAlchemy==2.0.43 +scystream-sdk[all]==1.4.0 psycopg2-binary==2.9.10 PyMySQL==1.1.2 duckdb==1.4.1 From 2588db00f44a724437998410e4c1e623a3bedc92 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:10:11 +0200 Subject: [PATCH 2/2] fix: comment --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index 95d072a..8b1b902 100644 --- a/main.py +++ b/main.py @@ -66,7 +66,7 @@ class QueryDatabaseEntrypointSettings(EnvSettings): csv_output: CSVOutput -# @entrypoint(QueryDatabaseEntrypointSettings) +@entrypoint(QueryDatabaseEntrypointSettings) def run_query_from_string(settings): target_csv = "output.csv" execute_query_to_csv(