Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions interactions/query.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
import logging
import csv
import sys
from sqlalchemy import create_engine, text

from scystream.sdk.env.settings import (
EnvSettings,
from scystream.sdk.database_handling.database_manager import (
PandasDatabaseOperations,
)


def execute_query_to_csv(query: str, dsn: str, output_file: str) -> None:
    """Run *query* against the database at *dsn* and write the result to CSV.

    Uses the SDK's pandas-backed database layer: the result set is loaded
    into a DataFrame and written to *output_file* without the index column.

    Args:
        query: SQL text to execute.
        dsn: SQLAlchemy-style database connection string.
        output_file: Path of the CSV file to create/overwrite.

    Exits the process with status 1 on any failure (connection, query,
    or file write) after logging the error — this is a CLI-style helper,
    not a library function.
    """
    try:
        db = PandasDatabaseOperations(dsn)
        df = db.read(query=query)
        df.to_csv(output_file, index=False)
    except Exception as e:
        # Broad catch is deliberate: any DB or I/O failure is fatal here.
        logging.error(f"Database query failed: {e}")
        sys.exit(1)
55 changes: 25 additions & 30 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
FileSettings,
)
from scystream.sdk.file_handling.s3_manager import S3Operations
from interactions.query import query_db
from interactions.query import execute_query_to_csv


def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:
Expand All @@ -22,7 +22,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:
f"{output_settings.FILE_PATH}/"
f"{output_settings.FILE_NAME}."
f"{output_settings.FILE_EXT}"
)
),
)
except Exception as e:
logging.error(f"Failed to upload CSV to S3: {e}")
Expand All @@ -31,7 +31,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:

def read_query_file(file_path: str) -> str:
try:
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
return f.read().strip()
except Exception as e:
logging.error(f"Failed to read query file: {e}")
Expand Down Expand Up @@ -68,54 +68,49 @@ class QueryDatabaseEntrypointSettings(EnvSettings):

@entrypoint(QueryDatabaseEntrypointSettings)
def run_query_from_string(settings):
    """Entrypoint: run the SQL given directly in the settings.

    Executes ``settings.query_str.QUERY`` against ``settings.DB_DSN``,
    writes the result to a local CSV, then uploads that CSV to S3 using
    the ``csv_output`` file settings.
    """
    target_csv = "output.csv"
    execute_query_to_csv(
        query=settings.query_str.QUERY,
        dsn=settings.DB_DSN,
        output_file=target_csv,
    )
    upload_to_s3(target_csv, settings.csv_output)


@entrypoint(QueryDatabaseFromFileEntrypointSettings)
def run_query_from_file(settings):
    """Entrypoint: run SQL stored in a file on S3.

    Downloads the query file described by ``settings.query_file`` to a
    local path, reads the SQL from it, executes it against
    ``settings.DB_DSN``, and uploads the resulting CSV to S3.

    Exits with status 1 if the query file cannot be downloaded.
    """
    local_file = "query_file.txt"

    try:
        # NOTE(review): called as a class-level method per the updated SDK
        # API — confirm S3Operations.download's exact signature.
        S3Operations.download(settings.query_file, local_file)
    except Exception as e:
        logging.error(f"Failed to download query file: {e}")
        sys.exit(1)

    query = read_query_file(local_file)
    target_csv = "output.csv"

    execute_query_to_csv(
        query=query, dsn=settings.DB_DSN, output_file=target_csv
    )
    upload_to_s3(target_csv, settings.csv_output)


"""
if __name__ == "__main__":
test = QueryDatabaseEntrypointSettings(
DB_DSN="postgresql://guest:guest@localhost:5432/patstat",
query_str=QueryStrInput(
QUERY="SELECT name FROM employees;"
),
csv_output=CSVOutput(
S3_HOST="http://localhost",
S3_PORT="9000",
S3_ACCESS_KEY="minioadmin",
S3_SECRET_KEY="minioadmin",
BUCKET_NAME="output-bucket",
FILE_PATH="output_file_path",
FILE_NAME="csv_file",
FILE_EXT="csv"
)
DB_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/postgres",
query_str=QueryStrInput(QUERY="SELECT * FROM test_table;"),
csv_output=CSVOutput(
S3_HOST="http://localhost",
S3_PORT="9000",
S3_ACCESS_KEY="minioadmin",
S3_SECRET_KEY="minioadmin",
BUCKET_NAME="output-bucket",
FILE_PATH="output_file_path",
FILE_NAME="csv_file",
FILE_EXT="csv",
),
)

run_query_from_string(test)
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
scystream-sdk==1.2.1
SQLAlchemy==2.0.43
scystream-sdk[all]==1.4.0
psycopg2-binary==2.9.10
PyMySQL==1.1.2
duckdb==1.4.1
Expand Down
Loading