diff --git a/snt_dhis2_formatting/pipeline.py b/snt_dhis2_formatting/pipeline.py index dd95b26..11b029f 100644 --- a/snt_dhis2_formatting/pipeline.py +++ b/snt_dhis2_formatting/pipeline.py @@ -9,6 +9,7 @@ run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) @@ -70,7 +71,17 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool): if country_code is None: current_run.log_warning("COUNTRY_CODE is not specified in the configuration.") + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_formatting", + parameters={"run_report_only": run_report_only, "pull_scripts": pull_scripts}, + output_path=snt_dhis2_formatted_path, + country_code=country_code, + ) + # format data for SNT + dhis2_pyramid_formatting( + snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict + ) dhis2_analytics_formatting( snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict ) @@ -82,9 +93,6 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool): dhis2_shapes_formatting( snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict ) - dhis2_pyramid_formatting( - snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict - ) dhis2_reporting_rates_formatting( snt_root_path=snt_root_path, pipeline_root_path=snt_pipeline_path, snt_config=snt_config_dict ) @@ -102,6 +110,7 @@ def snt_dhis2_formatting(run_report_only: bool, pull_scripts: bool): snt_dhis2_formatted_path / f"{country_code}_pyramid.csv", snt_dhis2_formatted_path / f"{country_code}_reporting.parquet", snt_dhis2_formatted_path / f"{country_code}_reporting.csv", + parameters_file, ], ) diff --git a/snt_dhis2_incidence/pipeline.py b/snt_dhis2_incidence/pipeline.py index ce83aa6..76a9dd8 100644 --- a/snt_dhis2_incidence/pipeline.py +++ b/snt_dhis2_incidence/pipeline.py @@ -131,14 +131,6 @@ def snt_dhis2_incidence( } if not run_report_only: - params_file = save_pipeline_parameters( - pipeline_name="snt_dhis2_incidence", - parameters=notebook_params, - output_path=data_path, - country_code=country_code, - ) - current_run.log_info(f"Saved pipeline parameters to {params_file}") - run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_incidence.ipynb", out_nb_path=pipeline_path / "papermill_outputs", @@ -147,6 +139,14 @@ def snt_dhis2_incidence( country_code=country_code, ) + params_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_incidence", + parameters=notebook_params, + output_path=data_path, + country_code=country_code, + ) + current_run.log_info(f"Saved pipeline parameters to {params_file}") + add_files_to_dataset( dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_INCIDENCE"], country_code=country_code, diff --git a/snt_dhis2_outliers_detection/pipeline.py b/snt_dhis2_outliers_detection/pipeline.py index ac030ba..50a7cb6 100644 --- a/snt_dhis2_outliers_detection/pipeline.py +++ b/snt_dhis2_outliers_detection/pipeline.py @@ -8,6 +8,7 @@ run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) @@ -117,6 +118,21 @@ def run_pipeline_task( "RUN_MAGIC_GLASSES_COMPLETE": run_mg_complete, }, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_detection", + parameters={ + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_MEAN": deviation_mean, + "DEVIATION_MEDIAN": deviation_median, + "DEVIATION_IQR": deviation_iqr, + "RUN_MAGIC_GLASSES_PARTIAL": run_mg_partial, + "RUN_MAGIC_GLASSES_COMPLETE": run_mg_complete, + }, + output_path=data_path, + country_code=country_code, ) # Add files to Dataset @@ -126,6 +142,7 @@ def run_pipeline_task( file_paths=[ *[p for p in (data_path.glob(f"{country_code}_flagged_outliers_allmethods.parquet"))], *[p for p in (data_path.glob(f"{country_code}_outlier_*.parquet"))], + parameters_file, ], ) @@ -138,6 +155,7 @@ def run_pipeline_task( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_detection_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished!") diff --git a/snt_dhis2_outliers_imputation_classic/pipeline.py b/snt_dhis2_outliers_imputation_classic/pipeline.py index e9bb8b0..bb34c83 100644 --- a/snt_dhis2_outliers_imputation_classic/pipeline.py +++ b/snt_dhis2_outliers_imputation_classic/pipeline.py @@ -9,6 +9,7 @@ run_report_notebook, validate_config, create_outliers_db_table, + save_pipeline_parameters, ) @@ -115,6 +116,19 @@ def snt_dhis2_outliers_imputation_classic( "DEVIATION_IQR": deviation_iqr, }, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_imputation_classic", + parameters={ + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_MEAN": deviation_mean, + "DEVIATION_MEDIAN": deviation_median, + "DEVIATION_IQR": deviation_iqr, + }, + output_path=data_path, + country_code=country_code, ) # Add files to Dataset @@ -123,6 +137,7 @@ def snt_dhis2_outliers_imputation_classic( country_code=country_code, file_paths=[ *data_path.glob(f"{country_code}_routine_outliers*.parquet"), + parameters_file, ], ) @@ -137,6 +152,7 @@ def snt_dhis2_outliers_imputation_classic( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_classic_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) # TODO: For the shiny app, we can think in a procedure to collect all diff --git a/snt_dhis2_outliers_imputation_iqr/pipeline.py b/snt_dhis2_outliers_imputation_iqr/pipeline.py index 7388a71..088d0fb 100644 --- a/snt_dhis2_outliers_imputation_iqr/pipeline.py +++ b/snt_dhis2_outliers_imputation_iqr/pipeline.py @@ -9,6 +9,7 @@ run_report_notebook, validate_config, create_outliers_db_table, + save_pipeline_parameters, ) @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_iqr( country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] if not run_report_only: + input_params = { + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_IQR": deviation_iqr, + } run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_iqr.ipynb", out_nb_path=pipeline_path / "papermill_outputs", kernel_name="ir", - parameters={ - "ROOT_PATH": Path(workspace.files_path).as_posix(), - "DEVIATION_IQR": deviation_iqr, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_imputation_iqr", + parameters=input_params, + output_path=data_path, + country_code=country_code, ) iqr_files = list(data_path.glob(f"{country_code}_routine_outliers-iqr*.parquet")) - if iqr_files: - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], - country_code=country_code, - file_paths=iqr_files, - ) + add_files_to_dataset( + dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + country_code=country_code, + file_paths=[*iqr_files, parameters_file], + ) if push_db: create_outliers_db_table(country_code=country_code, data_path=data_path) @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_iqr( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_iqr_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished successfully.") diff --git a/snt_dhis2_outliers_imputation_mean/pipeline.py b/snt_dhis2_outliers_imputation_mean/pipeline.py index e50a086..b3ca719 100644 --- a/snt_dhis2_outliers_imputation_mean/pipeline.py +++ b/snt_dhis2_outliers_imputation_mean/pipeline.py @@ -9,6 +9,7 @@ run_report_notebook, validate_config, create_outliers_db_table, + save_pipeline_parameters, ) @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_mean( country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] if not run_report_only: + input_params = { + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_MEAN": deviation_mean, + } run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_mean.ipynb", out_nb_path=pipeline_path / "papermill_outputs", kernel_name="ir", - parameters={ - "ROOT_PATH": Path(workspace.files_path).as_posix(), - "DEVIATION_MEAN": deviation_mean, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_imputation_mean", + parameters=input_params, + output_path=data_path, + country_code=country_code, ) mean_files = list(data_path.glob(f"{country_code}_routine_outliers-mean*.parquet")) - if mean_files: - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], - country_code=country_code, - file_paths=mean_files, - ) + add_files_to_dataset( + dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + country_code=country_code, + file_paths=[*mean_files, parameters_file], + ) if push_db: create_outliers_db_table(country_code=country_code, data_path=data_path) @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_mean( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_mean_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished successfully.") diff --git a/snt_dhis2_outliers_imputation_median/pipeline.py b/snt_dhis2_outliers_imputation_median/pipeline.py index f3b5503..d933266 100644 --- a/snt_dhis2_outliers_imputation_median/pipeline.py +++ b/snt_dhis2_outliers_imputation_median/pipeline.py @@ -9,6 +9,7 @@ run_report_notebook, validate_config, create_outliers_db_table, + save_pipeline_parameters, ) @@ -78,24 +79,32 @@ def snt_dhis2_outliers_imputation_median( country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] if not run_report_only: + input_params = { + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_MEDIAN": deviation_median, + } run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_median.ipynb", out_nb_path=pipeline_path / "papermill_outputs", kernel_name="ir", - parameters={ - "ROOT_PATH": Path(workspace.files_path).as_posix(), - "DEVIATION_MEDIAN": deviation_median, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_imputation_median", + parameters=input_params, + output_path=data_path, + country_code=country_code, ) median_files = list(data_path.glob(f"{country_code}_routine_outliers-median*.parquet")) - if median_files: - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], - country_code=country_code, - file_paths=median_files, - ) + add_files_to_dataset( + dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + country_code=country_code, + file_paths=[*median_files, parameters_file], + ) if push_db: create_outliers_db_table(country_code=country_code, data_path=data_path) @@ -107,6 +116,7 @@ def snt_dhis2_outliers_imputation_median( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_median_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished successfully.") diff --git a/snt_dhis2_outliers_imputation_path/pipeline.py b/snt_dhis2_outliers_imputation_path/pipeline.py index 665d4d0..1f98651 100644 --- a/snt_dhis2_outliers_imputation_path/pipeline.py +++ b/snt_dhis2_outliers_imputation_path/pipeline.py @@ -9,6 +9,7 @@ run_report_notebook, validate_config, create_outliers_db_table, + save_pipeline_parameters, ) @@ -80,15 +81,24 @@ def snt_dhis2_outliers_imputation_path( country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] if not run_report_only: + input_params = { + "ROOT_PATH": Path(workspace.files_path).as_posix(), + "DEVIATION_MEAN": deviation_mean, + } run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_path.ipynb", out_nb_path=pipeline_path / "papermill_outputs", kernel_name="ir", - parameters={ - "ROOT_PATH": Path(workspace.files_path).as_posix(), - "DEVIATION_MEAN": deviation_mean, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_imputation_path", + parameters=input_params, + output_path=data_path, + country_code=country_code, ) # Add files to Dataset @@ -97,6 +107,7 @@ def snt_dhis2_outliers_imputation_path( country_code=country_code, file_paths=[ *data_path.glob(f"{country_code}_routine_outliers*.parquet"), + parameters_file, ], ) @@ -111,6 +122,7 @@ def snt_dhis2_outliers_imputation_path( nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_path_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) # TODO: For the shiny app, we can think in a procedure to collect all diff --git a/snt_dhis2_outliers_removal_imputation/pipeline.py b/snt_dhis2_outliers_removal_imputation/pipeline.py index 47d5009..88baedc 100644 --- a/snt_dhis2_outliers_removal_imputation/pipeline.py +++ b/snt_dhis2_outliers_removal_imputation/pipeline.py @@ -8,6 +8,7 @@ run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) @@ -73,6 +74,17 @@ def run_pipeline_task(outlier_method: str, run_report_only: bool, pull_scripts: "ROOT_PATH": root_path.as_posix(), }, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_outliers_removal_imputation", + parameters={ + "OUTLIER_METHOD": outlier_method, + "ROOT_PATH": root_path.as_posix(), + }, + output_path=data_path, + country_code=country_code, ) add_files_to_dataset( @@ -81,6 +93,7 @@ def run_pipeline_task(outlier_method: str, run_report_only: bool, pull_scripts: file_paths=[ *[p for p in (data_path.glob(f"{country_code}_routine_outliers-*_*.parquet"))], *[p for p in (data_path.glob(f"{country_code}_routine_outliers-*_*.csv"))], + parameters_file, ], ) @@ -94,6 +107,7 @@ def run_pipeline_task(outlier_method: str, run_report_only: bool, pull_scripts: nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_removal_imputation_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished!") diff --git a/snt_dhis2_population_transformation/pipeline.py b/snt_dhis2_population_transformation/pipeline.py index 2725815..208a77d 100644 --- a/snt_dhis2_population_transformation/pipeline.py +++ b/snt_dhis2_population_transformation/pipeline.py @@ -110,6 +110,7 @@ def snt_dhis2_population_transformation(adjust_population: bool, run_report_only nb_file=snt_pipeline_path / "reporting" / "snt_dhis2_population_transformation_report.ipynb", nb_output_path=snt_pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) except Exception as e: @@ -148,6 +149,7 @@ def dhis2_population_transformation( out_nb_path=pipeline_root_path / "papermill_outputs", parameters=nb_parameter, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) except Exception as e: raise Exception(f"Error in formatting analytics data: {e}") from e diff --git a/snt_dhis2_reporting_rate/pipeline.py b/snt_dhis2_reporting_rate/pipeline.py index 64d6956..02b4c48 100644 --- a/snt_dhis2_reporting_rate/pipeline.py +++ b/snt_dhis2_reporting_rate/pipeline.py @@ -7,6 +7,7 @@ run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) # Pipeline for calculating DHIS2 reporting rates with configurable parameters. @pipeline("snt_dhis2_reporting_rate") @@ -101,26 +102,36 @@ def snt_dhis2_reporting_rate( country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] if not run_report_only: + input_params = { + "SNT_ROOT_PATH": root_path.as_posix(), + "REPORTING_RATE_METHOD": reporting_rate_method, + "DATAELEMENT_METHOD_NUMERATOR_CONF": dataelement_method_numerator_conf, + "DATAELEMENT_METHOD_NUMERATOR_SUSP": dataelement_method_numerator_susp, + "DATAELEMENT_METHOD_NUMERATOR_TEST": dataelement_method_numerator_test, + "DATAELEMENT_METHOD_DENOMINATOR": dataelement_method_denominator, + } run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_reporting_rate.ipynb", out_nb_path=pipeline_path / "papermill_outputs", - parameters={ - "SNT_ROOT_PATH": root_path.as_posix(), - "REPORTING_RATE_METHOD": reporting_rate_method, - "DATAELEMENT_METHOD_NUMERATOR_CONF": dataelement_method_numerator_conf, - "DATAELEMENT_METHOD_NUMERATOR_SUSP": dataelement_method_numerator_susp, - "DATAELEMENT_METHOD_NUMERATOR_TEST": dataelement_method_numerator_test, - "DATAELEMENT_METHOD_DENOMINATOR": dataelement_method_denominator - }, - error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"} - ) + parameters=input_params, + error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_reporting_rate", + parameters=input_params, + output_path=data_path, + country_code=country_code, + ) add_files_to_dataset( dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_REPORTING_RATE"], country_code=country_code, file_paths=[ *[p for p in (data_path.glob(f"{country_code}_reporting_rate_*.parquet"))], - *[p for p in (data_path.glob(f"{country_code}_reporting_rate_*.csv"))] + *[p for p in (data_path.glob(f"{country_code}_reporting_rate_*.csv"))], + parameters_file, ], ) @@ -130,6 +141,7 @@ def snt_dhis2_reporting_rate( run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_dhis2_reporting_rate_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) current_run.log_info("Pipeline completed successfully!") diff --git a/snt_dhis2_reporting_rate_dataelement/pipeline.py b/snt_dhis2_reporting_rate_dataelement/pipeline.py index 18bb47f..bfbef73 100644 --- a/snt_dhis2_reporting_rate_dataelement/pipeline.py +++ b/snt_dhis2_reporting_rate_dataelement/pipeline.py @@ -160,6 +160,14 @@ def snt_dhis2_reporting_rate_dataelement( "USE_WEIGHTED_REPORTING_RATES": use_weighted_reporting_rates, } + run_notebook( + nb_path=pipeline_path / "code" / "snt_dhis2_reporting_rate_dataelement.ipynb", + out_nb_path=pipeline_path / "papermill_outputs", + parameters=nb_parameters, + error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + params_file = save_pipeline_parameters( pipeline_name="snt_dhis2_reporting_rate_dataelement", parameters=nb_parameters, @@ -168,13 +176,6 @@ def snt_dhis2_reporting_rate_dataelement( ) current_run.log_info(f"Saved pipeline parameters to {params_file}") - run_notebook( - nb_path=pipeline_path / "code" / "snt_dhis2_reporting_rate_dataelement.ipynb", - out_nb_path=pipeline_path / "papermill_outputs", - parameters=nb_parameters, - error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, - ) - add_files_to_dataset( dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_REPORTING_RATE"], country_code=country_code, @@ -191,6 +192,7 @@ def snt_dhis2_reporting_rate_dataelement( run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_dhis2_reporting_rate_dataelement_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) current_run.log_info("Pipeline completed successfully!") diff --git a/snt_dhis2_reporting_rate_dataset/pipeline.py b/snt_dhis2_reporting_rate_dataset/pipeline.py index 889281e..123f105 100644 --- a/snt_dhis2_reporting_rate_dataset/pipeline.py +++ b/snt_dhis2_reporting_rate_dataset/pipeline.py @@ -111,21 +111,22 @@ def snt_dhis2_reporting_rate_dataset( "ROUTINE_FILE": routine_file, } - params_file = save_pipeline_parameters( - pipeline_name="snt_dhis2_reporting_rate_dataelement", - parameters=nb_parameters, - output_path=data_path, - country_code=country_code, - ) - current_run.log_info(f"Saved pipeline parameters to {params_file}") - run_notebook( nb_path=pipeline_path / "code" / "snt_dhis2_reporting_rate_dataset.ipynb", out_nb_path=pipeline_path / "papermill_outputs", parameters=nb_parameters, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) + params_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_reporting_rate_dataset", + parameters=nb_parameters, + output_path=data_path, + country_code=country_code, + ) + current_run.log_info(f"Saved pipeline parameters to {params_file}") + add_files_to_dataset( dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_REPORTING_RATE"], country_code=country_code, @@ -142,6 +143,7 @@ def snt_dhis2_reporting_rate_dataset( run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_dhis2_reporting_rate_dataset_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) current_run.log_info("Pipeline completed successfully!") diff --git a/snt_dhs_indicators/pipeline.py b/snt_dhs_indicators/pipeline.py index c8f8b5f..f3048a8 100644 --- a/snt_dhs_indicators/pipeline.py +++ b/snt_dhs_indicators/pipeline.py @@ -7,6 +7,7 @@ run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) @@ -69,11 +70,19 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: # get country identifier for naming country_code = snt_config_dict["SNT_CONFIG"].get("COUNTRY_CODE") + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhs_indicators", + parameters={"run_reports_only": run_reports_only, "pull_scripts": pull_scripts}, + output_path=data_output_path, + country_code=country_code, + ) + run_dhs_indicator_notebooks( pipeline_root_path=pipeline_path, computation_notebook_name="snt_dhs_bednets_computation.ipynb", reporting_notebook_name="snt_dhs_bednets_report.ipynb", run_report_only=run_reports_only, + country_code=country_code, ) run_dhs_indicator_notebooks( @@ -81,6 +90,7 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: computation_notebook_name="snt_dhs_careseeking_computation.ipynb", reporting_notebook_name="snt_dhs_careseeking_report.ipynb", run_report_only=run_reports_only, + country_code=country_code, ) run_dhs_indicator_notebooks( @@ -88,6 +98,7 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: computation_notebook_name="snt_dhs_mortality_computation.ipynb", reporting_notebook_name="snt_dhs_mortality_report.ipynb", run_report_only=run_reports_only, + country_code=country_code, ) run_dhs_indicator_notebooks( @@ -95,6 +106,7 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: computation_notebook_name="snt_dhs_prevalence_computation.ipynb", reporting_notebook_name="snt_dhs_prevalence_report.ipynb", run_report_only=run_reports_only, + country_code=country_code, ) run_dhs_indicator_notebooks( @@ -102,6 +114,7 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: computation_notebook_name="snt_dhs_vaccination_computation.ipynb", reporting_notebook_name="snt_dhs_vaccination_report.ipynb", run_report_only=run_reports_only, + country_code=country_code, ) # add files to a new dataset version @@ -109,6 +122,7 @@ def dhs_indicators(run_reports_only: bool, pull_scripts: bool) -> None: dataset_id=snt_config_dict["SNT_DATASET_IDENTIFIERS"].get("DHS_INDICATORS", None), country_code=country_code, file_paths=[ + parameters_file, # bednet access files data_output_path / "bednets" @@ -198,6 +212,7 @@ def run_dhs_indicator_notebooks( computation_notebook_name: str, reporting_notebook_name: str, run_report_only: bool = False, + country_code: str | None = None, ) -> None: """Execute the computation notebook and generate a report using the reporting notebook. @@ -206,6 +221,7 @@ def run_dhs_indicator_notebooks( computation_notebook_name (str): Filename of the computation notebook. reporting_notebook_name (str): Filename of the reporting notebook. run_report_only (bool): If True, only the reporting notebook will be executed. + country_code: Country code for run_notebook save-parameters functionality. """ computation_notebook_path = pipeline_root_path / "code" / computation_notebook_name @@ -217,7 +233,8 @@ def run_dhs_indicator_notebooks( run_notebook( nb_path=computation_notebook_path, out_nb_path=papermill_folder_path, - parameters=None, + parameters={}, + country_code=country_code, ) except Exception as e: raise Exception(f"Error running computation notebook '{computation_notebook_name}': {e}") from e @@ -226,6 +243,7 @@ def run_dhs_indicator_notebooks( run_report_notebook( nb_file=reporting_folder_path / reporting_notebook_name, nb_output_path=reporting_folder_path / "outputs", + country_code=country_code, ) except Exception as e: raise Exception(f"Error running reporting notebook '{reporting_notebook_name}': {e}") from e diff --git a/snt_era5_aggregate/pipeline.py b/snt_era5_aggregate/pipeline.py index 0789e2b..c6147c8 100644 --- a/snt_era5_aggregate/pipeline.py +++ b/snt_era5_aggregate/pipeline.py @@ -25,6 +25,7 @@ load_configuration_snt, run_report_notebook, validate_config, + save_pipeline_parameters, ) from openhexa.toolbox.era5.cds import VARIABLES @@ -82,6 +83,13 @@ def era5_aggregate(run_report_only: bool, pull_scripts: bool): ) era5_dataset_id = snt_config_dict["SNT_DATASET_IDENTIFIERS"].get("ERA5_DATASET_CLIMATE") + parameters_file = save_pipeline_parameters( + pipeline_name="snt_era5_aggregate", + parameters={"run_report_only": run_report_only, "pull_scripts": pull_scripts}, + output_path=output_dir, + country_code=country_code, + ) + # get boundaries geometries from formatted dataset boundaries = read_boundaries( dhis2_formatted_dataset_id, filename=f"{country_code}_shapes.geojson" @@ -169,12 +177,13 @@ def era5_aggregate(run_report_only: bool, pull_scripts: bool): add_files_to_dataset( dataset_id=era5_dataset_id, country_code=country_code, - file_paths=filename_list, + file_paths=[*filename_list, parameters_file], ) run_report_notebook( nb_file=snt_pipeline_path / "reporting" / "snt_era5_aggregate_report.ipynb", nb_output_path=snt_pipeline_path / "reporting" / "outputs", + country_code=country_code, ) except Exception as e: diff --git a/snt_healthcare_access/pipeline.py b/snt_healthcare_access/pipeline.py index d1b01c1..b3ca55f 100644 --- a/snt_healthcare_access/pipeline.py +++ b/snt_healthcare_access/pipeline.py @@ -7,6 +7,7 @@ run_report_notebook, validate_config, pull_scripts_from_repository, + save_pipeline_parameters, ) @@ -99,15 +100,24 @@ def snt_healthcare_access( country_code = snt_config_dict["SNT_CONFIG"].get("COUNTRY_CODE") if not run_report_only: + input_params = { + "FOSA_FILE": input_fosa_file.path if input_fosa_file is not None else None, + "RADIUS_METERS": input_radius_meters, + "POP_FILE": input_pop_file.path if input_pop_file is not None else None, + } run_notebook( nb_path=pipeline_path / "code" / "snt_healthcare_access.ipynb", out_nb_path=pipeline_path / "papermill_outputs", - parameters={ - "FOSA_FILE": input_fosa_file.path if input_fosa_file is not None else None, - "RADIUS_METERS": input_radius_meters, - "POP_FILE": input_pop_file.path if input_pop_file is not None else None, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_healthcare_access", + parameters=input_params, + output_path=data_output_path, + country_code=country_code, ) # add files to a new dataset version @@ -117,6 +127,7 @@ def snt_healthcare_access( file_paths=[ data_output_path / f"{country_code}_population_covered_health.parquet", data_output_path / f"{country_code}_population_covered_health.csv", + parameters_file, ], ) @@ -127,6 +138,7 @@ def snt_healthcare_access( nb_file=pipeline_path / "reporting" / "snt_healthcare_access_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) current_run.log_info("Pipeline finished!") diff --git a/snt_map_extract(DEPRECATED)/pipeline.py b/snt_map_extract(DEPRECATED)/pipeline.py index 1010483..b8295fc 100644 --- a/snt_map_extract(DEPRECATED)/pipeline.py +++ b/snt_map_extract(DEPRECATED)/pipeline.py @@ -149,6 +149,7 @@ def snt_map_extract( nb_file=pipeline_path / "reporting" / "snt_map_extract_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", nb_parameters=None, + country_code=country_code, ) current_run.log_info("Pipeline completed successfully!") diff --git a/snt_map_extracts/pipeline.py b/snt_map_extracts/pipeline.py index 47d5c66..432b480 100644 --- a/snt_map_extracts/pipeline.py +++ b/snt_map_extracts/pipeline.py @@ -19,6 +19,7 @@ run_report_notebook, get_file_from_dataset, validate_config, + save_pipeline_parameters, ) from malariaAtlasProject.map import MAPRasterExtractor, MAPExtractorError from malariaAtlasProject.map_utils import ( @@ -109,6 +110,19 @@ def snt_map_extracts( output_path = root_path / "data" / "map" output_path.mkdir(parents=True, exist_ok=True) + input_params = { + "pop_raster_selection": pop_raster_selection.path if pop_raster_selection else None, + "target_year": target_year, + "run_report_only": run_report_only, + "pull_scripts": pull_scripts, + } + parameters_file = save_pipeline_parameters( + pipeline_name="snt_map_extracts", + parameters=input_params, + output_path=output_path, + country_code=country_code, + ) + # Validate population raster (optional) if pop_raster_selection: log_message(logger, f"Population raster selected: {pop_raster_selection.path}") @@ -132,6 +146,7 @@ def snt_map_extracts( file_paths=[ output_path / "formatted" / country_code / f"{country_code}_map_data.parquet", output_path / "formatted" / country_code / f"{country_code}_map_data.csv", + parameters_file, ], ) @@ -141,6 +156,7 @@ def snt_map_extracts( run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_map_extracts_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) log_message(logger, "Pipeline completed successfully!") diff --git a/snt_seasonality/pipeline.py b/snt_seasonality/pipeline.py index fc6bc01..51317e1 100644 --- a/snt_seasonality/pipeline.py +++ b/snt_seasonality/pipeline.py @@ -1,13 +1,14 @@ -from datetime import datetime from pathlib import Path -import papermill as pm + from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( pull_scripts_from_repository, add_files_to_dataset, load_configuration_snt, + run_notebook, run_report_notebook, validate_config, + save_pipeline_parameters, ) @@ -142,11 +143,19 @@ def snt_seasonality( "maximum_month_block_size": maximum_month_block_size, "threshold_for_seasonality": threshold_for_seasonality, "threshold_proportion_seasonal_years": threshold_proportion_seasonal_years, + "run_precipitation": run_precipitation, + "run_cases": run_cases, } validate_parameters(params) if not run_report_only: - files_to_ds = [] + parameters_file = save_pipeline_parameters( + pipeline_name="snt_seasonality", + parameters=params, + output_path=data_path, + country_code=country_code, + ) + files_to_ds = [parameters_file] error_messages = ["ERROR 1", "ERROR 2", "ERROR 3"] seasonality_nb = pipeline_path / "code" / "snt_seasonality.ipynb" @@ -154,12 +163,13 @@ def snt_seasonality( if run_precipitation: current_run.log_info(f"Running precipitation analysis with notebook : {seasonality_nb}") try: - params["type_of_seasonality"] = "precipitation" + nb_params = {**params, "type_of_seasonality": "precipitation"} run_notebook_for_type( nb_path=seasonality_nb, seasonality_type="precipitation", out_nb_path=pipeline_path / "papermill_outputs", - parameters=params, + parameters=nb_params, + country_code=country_code, ) files_to_ds.append(data_path / f"{country_code}_precipitation_seasonality.parquet") files_to_ds.append(data_path / f"{country_code}_precipitation_seasonality.csv") @@ -179,12 +189,13 @@ def snt_seasonality( if run_cases: current_run.log_info(f"Running cases analysis with notebook : {seasonality_nb}") try: - params["type_of_seasonality"] = "cases" + nb_params = {**params, "type_of_seasonality": "cases"} run_notebook_for_type( nb_path=seasonality_nb, seasonality_type="cases", out_nb_path=pipeline_path / "papermill_outputs", - parameters=params, + parameters=nb_params, + country_code=country_code, ) files_to_ds.append(data_path / f"{country_code}_cases_seasonality.parquet") files_to_ds.append(data_path / f"{country_code}_cases_seasonality.csv") @@ -211,6 +222,7 @@ def snt_seasonality( run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_seasonality_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) current_run.log_info("Pipeline completed successfully!") @@ -247,42 +259,23 @@ def validate_parameters(parameters: dict): def run_notebook_for_type( - nb_path: Path, seasonality_type: str, out_nb_path: Path, parameters: dict, kernel_name: str = "ir" + nb_path: Path, + seasonality_type: str, + out_nb_path: Path, + parameters: dict, + country_code: str, + kernel_name: str = "ir", ): - """Execute a Jupyter notebook using Papermill. - - Parameters - ---------- - nb_name : str - The name of the notebook to execute (without the .ipynb extension). - nb_path : Path - The path to the directory containing the notebook. - seasonality_type : str - Type of analysis to be added in the output notebook name. - out_nb_path : Path - The path to the directory where the output notebook will be saved. - parameters : dict - A dictionary of parameters to pass to the notebook. - kernel_name : str, optional - The name of the kernel to use for execution (default is "ir" for R, python3 for Python). - """ - current_run.log_info(f"Executing notebook: {nb_path}") - file_stem = nb_path.stem - extension = nb_path.suffix - execution_timestamp = datetime.now().strftime("%Y-%m-%d_%H%M") - out_nb_full_path = out_nb_path / f"{file_stem}_{seasonality_type}_OUTPUT_{execution_timestamp}{extension}" - out_nb_path.mkdir(parents=True, exist_ok=True) - - try: - pm.execute_notebook( - input_path=nb_path, - output_path=out_nb_full_path, - parameters=parameters, - kernel_name=kernel_name, - request_save_on_cell_execute=False, - ) - except Exception as e: - raise Exception(f"Error executing the notebook {type(e)}: {e}") from e + """Execute a Jupyter notebook via snt_lib (supports notebook_.ipynb).""" + current_run.log_info(f"Executing notebook: {nb_path} (type={seasonality_type})") + run_notebook( + nb_path=nb_path, + out_nb_path=out_nb_path, + parameters=parameters, + kernel_name=kernel_name, + error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) if __name__ == "__main__": diff --git a/snt_seasonality_cases/pipeline.py b/snt_seasonality_cases/pipeline.py index 0359edb..08bf085 100644 --- a/snt_seasonality_cases/pipeline.py +++ b/snt_seasonality_cases/pipeline.py @@ -7,6 +7,7 @@ run_report_notebook, run_notebook, pull_scripts_from_repository, + save_pipeline_parameters, ) @@ -107,13 +108,16 @@ def snt_seasonality_cases( run_notebook( nb_path=pipeline_path / "code" / "snt_seasonality_cases.ipynb", out_nb_path=pipeline_path / "papermill_outputs", - parameters={ - "minimum_month_block_size": get_minimum_month_block_size, - "maximum_month_block_size": get_maximum_month_block_size, - "threshold_for_seasonality": get_threshold_for_seasonality, - "threshold_proportion_seasonal_years": get_threshold_proportion_seasonal_years, - }, + parameters=input_params, error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_seasonality_cases", + parameters=input_params, + output_path=data_path, + country_code=country_code, ) add_files_to_dataset( @@ -122,6 +126,7 @@ def snt_seasonality_cases( file_paths=[ data_path / f"{country_code}_cases_seasonality.parquet", data_path / f"{country_code}_cases_seasonality.csv", + parameters_file, ], ) @@ -131,11 +136,16 @@ def snt_seasonality_cases( else: current_run.log_info("Skipping calculations, running only the reporting.") + snt_config = load_configuration_snt( + config_path=root_path / "configuration" / "SNT_config.json" + ) + country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_seasonality_cases_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, ) diff --git a/snt_worldpop_extract/pipeline.py b/snt_worldpop_extract/pipeline.py index 27ecea3..f4a257d 100644 --- a/snt_worldpop_extract/pipeline.py +++ b/snt_worldpop_extract/pipeline.py @@ -12,6 +12,7 @@ load_configuration_snt, run_report_notebook, validate_config, + save_pipeline_parameters, ) from worlpopclient import WorldPopClient @@ -67,6 +68,13 @@ def snt_worldpop_extract(overwrite: bool = False, pull_scripts: bool = False, ye # get country identifier for file naming country_code = snt_config_dict["SNT_CONFIG"].get("COUNTRY_CODE") + parameters_file = save_pipeline_parameters( + pipeline_name="snt_worldpop_extract", + parameters={"overwrite": overwrite, "year": year, "pull_scripts": pull_scripts}, + output_path=data_path, + country_code=country_code, + ) + # Set output directory retrieve_population_data( country_code=country_code, @@ -93,6 +101,7 @@ def snt_worldpop_extract(overwrite: bool = False, pull_scripts: bool = False, ye data_path / "population" / f"{country_code}_worldpop_population.csv", data_path / "population" / f"{country_code}_worldpop_population.parquet", data_path / "raw" / f"{country_code}_worldpop_ppp_{year}.tif", + parameters_file, ] pop_unadj_tif = data_path / "raw" / f"{country_code}_worldpop_ppp_{year}_UNadj.tif" if pop_unadj_tif.exists(): @@ -108,6 +117,7 @@ def snt_worldpop_extract(overwrite: bool = False, pull_scripts: bool = False, ye run_report_notebook( nb_file=pipeline_path / "reporting" / "snt_worldpop_extract_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", + country_code=country_code, ) except Exception as e: current_run.log_error(f"An error occurred in the pipeline: {e}")