Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions snt_dhis2_formatting/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_notebook,
run_report_notebook,
validate_config,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -70,7 +71,17 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool):
if country_code is None:
current_run.log_warning("COUNTRY_CODE is not specified in the configuration.")

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_formatting",
parameters={"run_report_only": run_report_only, "pull_scripts": pull_scripts},
output_path=snt_dhis2_formatted_path,
country_code=country_code,
)

# format data for SNT
dhis2_pyramid_formatting(
snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict
)
dhis2_analytics_formatting(
snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict
)
Expand All @@ -82,9 +93,6 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool):
dhis2_shapes_formatting(
snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict
)
dhis2_pyramid_formatting(
snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict
)
dhis2_reporting_rates_formatting(
snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict
)
Expand All @@ -102,6 +110,7 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool):
snt_dhis2_formatted_path / f"{country_code}_pyramid.csv",
snt_dhis2_formatted_path / f"{country_code}_reporting.parquet",
snt_dhis2_formatted_path / f"{country_code}_reporting.csv",
parameters_file,
],
)

Expand Down
16 changes: 8 additions & 8 deletions snt_dhis2_incidence/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ def snt_dhis2_incidence(
}

if not run_report_only:
params_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_incidence",
parameters=notebook_params,
output_path=data_path,
country_code=country_code,
)
current_run.log_info(f"Saved pipeline parameters to {params_file}")

run_notebook(
nb_path=pipeline_path / "code" / "snt_dhis2_incidence.ipynb",
out_nb_path=pipeline_path / "papermill_outputs",
Expand All @@ -147,6 +139,14 @@ def snt_dhis2_incidence(
country_code=country_code,
)

params_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_incidence",
parameters=notebook_params,
output_path=data_path,
country_code=country_code,
)
current_run.log_info(f"Saved pipeline parameters to {params_file}")

add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_INCIDENCE"],
country_code=country_code,
Expand Down
18 changes: 18 additions & 0 deletions snt_dhis2_outliers_detection/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
run_notebook,
run_report_notebook,
validate_config,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -117,6 +118,21 @@ def run_pipeline_task(
"RUN_MAGIC_GLASSES_COMPLETE": run_mg_complete,
},
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_detection",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
"DEVIATION_MEDIAN": deviation_median,
"DEVIATION_IQR": deviation_iqr,
"RUN_MAGIC_GLASSES_PARTIAL": run_mg_partial,
"RUN_MAGIC_GLASSES_COMPLETE": run_mg_complete,
},
output_path=data_path,
country_code=country_code,
)

# Add files to Dataset
Expand All @@ -126,6 +142,7 @@ def run_pipeline_task(
file_paths=[
*[p for p in (data_path.glob(f"{country_code}_flagged_outliers_allmethods.parquet"))],
*[p for p in (data_path.glob(f"{country_code}_outlier_*.parquet"))],
parameters_file,
],
)

Expand All @@ -138,6 +155,7 @@ def run_pipeline_task(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_detection_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

current_run.log_info("Pipeline finished!")
Expand Down
16 changes: 16 additions & 0 deletions snt_dhis2_outliers_imputation_classic/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_report_notebook,
validate_config,
create_outliers_db_table,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -115,6 +116,19 @@ def snt_dhis2_outliers_imputation_classic(
"DEVIATION_IQR": deviation_iqr,
},
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_imputation_classic",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
"DEVIATION_MEDIAN": deviation_median,
"DEVIATION_IQR": deviation_iqr,
},
output_path=data_path,
country_code=country_code,
)

# Add files to Dataset
Expand All @@ -123,6 +137,7 @@ def snt_dhis2_outliers_imputation_classic(
country_code=country_code,
file_paths=[
*data_path.glob(f"{country_code}_routine_outliers*.parquet"),
parameters_file,
],
)

Expand All @@ -137,6 +152,7 @@ def snt_dhis2_outliers_imputation_classic(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_classic_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

# TODO: For the shiny app, we can think in a procedure to collect all
Expand Down
30 changes: 20 additions & 10 deletions snt_dhis2_outliers_imputation_iqr/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_report_notebook,
validate_config,
create_outliers_db_table,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_iqr(
country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"]

if not run_report_only:
input_params = {
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_IQR": deviation_iqr,
}
run_notebook(
nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_iqr.ipynb",
out_nb_path=pipeline_path / "papermill_outputs",
kernel_name="ir",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_IQR": deviation_iqr,
},
parameters=input_params,
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_imputation_iqr",
parameters=input_params,
output_path=data_path,
country_code=country_code,
)

iqr_files = list(data_path.glob(f"{country_code}_routine_outliers-iqr*.parquet"))
if iqr_files:
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=iqr_files,
)
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=[*iqr_files, parameters_file],
)

if push_db:
create_outliers_db_table(country_code=country_code, data_path=data_path)
Expand All @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_iqr(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_iqr_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

current_run.log_info("Pipeline finished successfully.")
Expand Down
30 changes: 20 additions & 10 deletions snt_dhis2_outliers_imputation_mean/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_report_notebook,
validate_config,
create_outliers_db_table,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_mean(
country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"]

if not run_report_only:
input_params = {
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
}
run_notebook(
nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_mean.ipynb",
out_nb_path=pipeline_path / "papermill_outputs",
kernel_name="ir",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
},
parameters=input_params,
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_imputation_mean",
parameters=input_params,
output_path=data_path,
country_code=country_code,
)

mean_files = list(data_path.glob(f"{country_code}_routine_outliers-mean*.parquet"))
if mean_files:
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=mean_files,
)
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=[*mean_files, parameters_file],
)

if push_db:
create_outliers_db_table(country_code=country_code, data_path=data_path)
Expand All @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_mean(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_mean_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

current_run.log_info("Pipeline finished successfully.")
Expand Down
30 changes: 20 additions & 10 deletions snt_dhis2_outliers_imputation_median/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_report_notebook,
validate_config,
create_outliers_db_table,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_median(
country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"]

if not run_report_only:
input_params = {
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEDIAN": deviation_median,
}
run_notebook(
nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_median.ipynb",
out_nb_path=pipeline_path / "papermill_outputs",
kernel_name="ir",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEDIAN": deviation_median,
},
parameters=input_params,
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_imputation_median",
parameters=input_params,
output_path=data_path,
country_code=country_code,
)

median_files = list(data_path.glob(f"{country_code}_routine_outliers-median*.parquet"))
if median_files:
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=median_files,
)
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
country_code=country_code,
file_paths=[*median_files, parameters_file],
)

if push_db:
create_outliers_db_table(country_code=country_code, data_path=data_path)
Expand All @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_median(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_median_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

current_run.log_info("Pipeline finished successfully.")
Expand Down
20 changes: 16 additions & 4 deletions snt_dhis2_outliers_imputation_path/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
run_report_notebook,
validate_config,
create_outliers_db_table,
save_pipeline_parameters,
)


Expand Down Expand Up @@ -80,15 +81,24 @@ def snt_dhis2_outliers_imputation_path(
country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"]

if not run_report_only:
input_params = {
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
}
run_notebook(
nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_path.ipynb",
out_nb_path=pipeline_path / "papermill_outputs",
kernel_name="ir",
parameters={
"ROOT_PATH": Path(workspace.files_path).as_posix(),
"DEVIATION_MEAN": deviation_mean,
},
parameters=input_params,
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

parameters_file = save_pipeline_parameters(
pipeline_name="snt_dhis2_outliers_imputation_path",
parameters=input_params,
output_path=data_path,
country_code=country_code,
)

# Add files to Dataset
Expand All @@ -97,6 +107,7 @@ def snt_dhis2_outliers_imputation_path(
country_code=country_code,
file_paths=[
*data_path.glob(f"{country_code}_routine_outliers*.parquet"),
parameters_file,
],
)

Expand All @@ -111,6 +122,7 @@ def snt_dhis2_outliers_imputation_path(
nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_path_report.ipynb",
nb_output_path=pipeline_path / "reporting" / "outputs",
error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"},
country_code=country_code,
)

# TODO: For the shiny app, we can think in a procedure to collect all
Expand Down
Loading