diff --git a/configuration/SNT_config_BFA.json b/configuration/SNT_config_BFA.json index 75f8b40..dbadf72 100644 --- a/configuration/SNT_config_BFA.json +++ b/configuration/SNT_config_BFA.json @@ -21,6 +21,7 @@ "ERA5_DATASET_CLIMATE": "snt-era5-climate", "SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall", "SNT_SEASONALITY_CASES": "snt-seasonality-cases", + "DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care", "SNT_MAP_EXTRACTS": "snt-map-extracts", "SNT_RESULTS": "snt-results" }, diff --git a/configuration/SNT_config_COD.json b/configuration/SNT_config_COD.json index 6a5570a..39daf5c 100644 --- a/configuration/SNT_config_COD.json +++ b/configuration/SNT_config_COD.json @@ -21,6 +21,7 @@ "ERA5_DATASET_CLIMATE": "snt-era5-climate", "SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall", "SNT_SEASONALITY_CASES": "snt-seasonality-cases", + "DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care", "SNT_MAP_EXTRACTS": "snt-map-extracts", "SNT_RESULTS": "snt-results" }, diff --git a/configuration/SNT_config_NER.json b/configuration/SNT_config_NER.json index c2503b8..70e41a7 100644 --- a/configuration/SNT_config_NER.json +++ b/configuration/SNT_config_NER.json @@ -1,150 +1,151 @@ -{ - "SNT_CONFIG": { - "COUNTRY_CODE": "NER", - "COUNTRY_NAME": "NIGER", - "NA_TREATMENT": "SET_0_TO_NA", - "DHIS2_ADMINISTRATION_1": "level_2_name", - "DHIS2_ADMINISTRATION_2": "level_3_name", - "ANALYTICS_ORG_UNITS_LEVEL": 6, - "REPORTING_RATE_PRODUCT_UID": ["LUo8tmRIJc9", "ki7YKOfyxjf"] - }, - "SNT_DATASET_IDENTIFIERS": { - "DHIS2_DATASET_EXTRACTS": "snt-dhis2-extracts", - "DHIS2_DATASET_FORMATTED": "snt-dhis2-formatted", - "DHIS2_POPULATION_TRANSFORMATION": "snt-dhis2-pop-transformation", - "DHIS2_REPORTING_RATE": "snt-dhis2-reporting-rate", - "DHIS2_OUTLIERS_DETECTION": "snt-dhis2-outliers-detection", - "DHIS2_OUTLIERS_REMOVAL_IMPUTATION": "snt-dhis2-outliers-removal-imputation", - "DHIS2_OUTLIERS_IMPUTATION": "snt-dhis2-outliers-imputation", - "DHIS2_INCIDENCE": "snt-dhis2-incidence", - 
"DHS_INDICATORS": "snt-dhs-indicators", - "MIS_INDICATORS": "snt-mis-indicators", - "WORLDPOP_DATASET_EXTRACT": "snt-worldpop-extract", - "SNT_HEALTHCARE_ACCESS": "snt-healthcare-access", - "ERA5_DATASET_CLIMATE": "snt-era5-climate", - "SNT_SEASONALITY": "snt-seasonality", - "SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall", - "SNT_SEASONALITY_CASES": "snt-seasonality-cases", - "SNT_MAP_EXTRACTS": "snt-map-extracts", - "SNT_RESULTS": "snt-results" - }, - "DHIS2_DATA_DEFINITIONS": { - "POPULATION_DEFINITIONS": { - "TOTAL_POPULATION_REF": 27032412, - "GROWTH_FACTOR": 0.0334, - "REFERENCE_YEAR": 2024, - "POPULATION_INDICATORS": { - "POPULATION": { "ids": ["iIS0MGyL3EL"], "type": "dataElement" }, - "POP_UNDER_5": { - "ids": ["tiaX3KnlpRt", "BjCyMF2Thxn"], - "type": "dataElement" - }, - "POP_PREGNANT_WOMAN": { "ids": ["mEyVFwBw2QV"], "type": "dataElement" } - }, - "POPULATION_DISAGGREGATIONS": { - "POP_UNDER_5": 0.195, - "POP_PREGNANT_WOMAN": 0.052, - "POP_0_1_Y": 0.04, - "POP_1_2_Y": 0.079 - } - }, - "DHIS2_INDICATOR_DEFINITIONS": { - "SUSP": ["SE8vUT7xeWO"], - "SUSP_UNDER_5": ["SE8vUT7xeWO.KdO1NS2OI0c"], - "SUSP_ABOVE_5": ["SE8vUT7xeWO.CkWBRP9TqZd"], - "SUSP_PREGNANT_WOMAN": ["SE8vUT7xeWO.HfUn1dkLunb"], - "TEST": ["Hc1bP08WsJf", "IcxzDQ2iViL"], - "TEST_UNDER_5": ["Hc1bP08WsJf.KdO1NS2OI0c", "IcxzDQ2iViL.KdO1NS2OI0c"], - "TEST_ABOVE_5": ["Hc1bP08WsJf.CkWBRP9TqZd", "IcxzDQ2iViL.CkWBRP9TqZd"], - "TEST_PREGNANT_WOMAN": [ - "Hc1bP08WsJf.HfUn1dkLunb", - "IcxzDQ2iViL.HfUn1dkLunb" - ], - "CONF": ["S4S3iPZMvPb", "ScFIzTswCih"], - "CONF_UNDER_5": ["S4S3iPZMvPb.KdO1NS2OI0c", "ScFIzTswCih.KdO1NS2OI0c"], - "CONF_ABOVE_5": ["S4S3iPZMvPb.CkWBRP9TqZd", "ScFIzTswCih.CkWBRP9TqZd"], - "CONF_PREGNANT_WOMAN": [ - "S4S3iPZMvPb.HfUn1dkLunb", - "ScFIzTswCih.HfUn1dkLunb" - ], - "PRES": ["Dq6jCpvgXjN"], - "PRES_UNDER_5": ["Dq6jCpvgXjN.KdO1NS2OI0c"], - "PRES_ABOVE_5": ["Dq6jCpvgXjN.CkWBRP9TqZd"], - "PRES_PREGNANT_WOMAN": ["Dq6jCpvgXjN.HfUn1dkLunb"], - "MALTREAT": ["DLrfnBPUmH2"], - 
"MALTREAT_UNDER_5": ["DLrfnBPUmH2.KdO1NS2OI0c"], - "MALTREAT_ABOVE_5": ["DLrfnBPUmH2.CkWBRP9TqZd"], - "MALTREAT_PREGNANT_WOMAN": ["DLrfnBPUmH2.HfUn1dkLunb"], - "MALSIMP": ["GWVBGrF5JOd"], - "MALSIMP_UNDER_5": ["GWVBGrF5JOd.KdO1NS2OI0c"], - "MALSIMP_ABOVE_5": ["GWVBGrF5JOd.CkWBRP9TqZd"], - "MALSIMP_PREGNANT_WOMAN": ["GWVBGrF5JOd.HfUn1dkLunb"], - "MALSEV": ["VJN9TulWOl7"], - "MALSEV_UNDER_5": ["VJN9TulWOl7.KdO1NS2OI0c"], - "MALSEV_ABOVE_5": ["VJN9TulWOl7.CkWBRP9TqZd"], - "MALSEV_PREGNANT_WOMAN": ["VJN9TulWOl7.HfUn1dkLunb"], - "MALDTH": ["dl7lZ8HoO1K", "nJI3Bjb2bwx"], - "MALDTH_UNDER_5": ["dl7lZ8HoO1K.KdO1NS2OI0c", "nJI3Bjb2bwx.KdO1NS2OI0c"], - "MALDTH_ABOVE_5": ["dl7lZ8HoO1K.CkWBRP9TqZd", "nJI3Bjb2bwx.CkWBRP9TqZd"], - "MALDTH_PREGNANT_WOMAN": [ - "dl7lZ8HoO1K.HfUn1dkLunb", - "nJI3Bjb2bwx.HfUn1dkLunb" - ], - "MALADM": ["f00QZis2J3n"], - "MALADM_UNDER_5": ["f00QZis2J3n.KdO1NS2OI0c"], - "MALADM_ABOVE_5": ["f00QZis2J3n.CkWBRP9TqZd"], - "MALADM_PREGNANT_WOMAN": ["f00QZis2J3n.HfUn1dkLunb"], - "ALLADM": [ - "UqVkeBwgB06.bDk33CXuRo9", - "UqVkeBwgB06.vxmwRpP1cBe", - "UqVkeBwgB06.MLGLuR5CK0f" - ], - "ALLDTH": ["UqVkeBwgB06.vgdSzy5iJbZ"], - "ALLOUT": ["aAMNlG1Qjfn"], - "TPI1": ["PavbxlgreSC"], - "TPI3": ["tEqFtDVL2Y6"], - "CPN": ["gzj9zjZrNPi"], - "MILDA_CPN": ["hL3WOkhfxo3"], - "MILDA_VAR": ["azdUYKPccDe"], - "VAR1": ["Uk6UmYQp0Pd"], - "UTILISATION": [ - "aAMNlG1Qjfn.OfgifKSFiib", - "aAMNlG1Qjfn.Xj9ayPI0QYh", - "aAMNlG1Qjfn.AkcjRa8rdqa", - "aAMNlG1Qjfn.LqxtDRM0nb1", - "mvmPcDnpbqE.OfgifKSFiib", - "mvmPcDnpbqE.Xj9ayPI0QYh", - "mvmPcDnpbqE.AkcjRa8rdqa", - "mvmPcDnpbqE.LqxtDRM0nb1", - "wuIvQlLQsmx.XXvecDe14Bn", - "wuIvQlLQsmx.mOzOcdV7a8c", - "DPyo6EUCfmU.OfgifKSFiib", - "DPyo6EUCfmU.Xj9ayPI0QYh", - "DPyo6EUCfmU.AkcjRa8rdqa", - "DPyo6EUCfmU.LqxtDRM0nb1" - ] - }, - "DHIS2_REPORTING_RATES": { - "REPORTING_DATASETS": [ - { - "DATASET": "LUo8tmRIJc9", - "METRICS": { - "ACTUAL_REPORTS": "float", - "EXPECTED_REPORTS": "float" - } - }, - { - "DATASET": "ki7YKOfyxjf", - "METRICS": { - 
"ACTUAL_REPORTS": "float", - "EXPECTED_REPORTS": "float" - } - } - ], - "REPORTING_INDICATORS": { - "ACTUAL_REPORTS": "", - "EXPECTED_REPORTS": "" - } - } - } -} +{ + "SNT_CONFIG": { + "COUNTRY_CODE": "NER", + "COUNTRY_NAME": "NIGER", + "NA_TREATMENT": "SET_0_TO_NA", + "DHIS2_ADMINISTRATION_1": "level_2_name", + "DHIS2_ADMINISTRATION_2": "level_3_name", + "ANALYTICS_ORG_UNITS_LEVEL": 6, + "REPORTING_RATE_PRODUCT_UID": ["LUo8tmRIJc9", "ki7YKOfyxjf"] + }, + "SNT_DATASET_IDENTIFIERS": { + "DHIS2_DATASET_EXTRACTS": "snt-dhis2-extracts", + "DHIS2_DATASET_FORMATTED": "snt-dhis2-formatted", + "DHIS2_POPULATION_TRANSFORMATION": "snt-dhis2-pop-transformation", + "DHIS2_REPORTING_RATE": "snt-dhis2-reporting-rate", + "DHIS2_OUTLIERS_DETECTION": "snt-dhis2-outliers-detection", + "DHIS2_OUTLIERS_REMOVAL_IMPUTATION": "snt-dhis2-outliers-removal-imputation", + "DHIS2_OUTLIERS_IMPUTATION": "snt-dhis2-outliers-imputation", + "DHIS2_INCIDENCE": "snt-dhis2-incidence", + "DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care", + "DHS_INDICATORS": "snt-dhs-indicators", + "MIS_INDICATORS": "snt-mis-indicators", + "WORLDPOP_DATASET_EXTRACT": "snt-worldpop-extract", + "SNT_HEALTHCARE_ACCESS": "snt-healthcare-access", + "ERA5_DATASET_CLIMATE": "snt-era5-climate", + "SNT_SEASONALITY": "snt-seasonality", + "SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall", + "SNT_SEASONALITY_CASES": "snt-seasonality-cases", + "SNT_MAP_EXTRACTS": "snt-map-extracts", + "SNT_RESULTS": "snt-results" + }, + "DHIS2_DATA_DEFINITIONS": { + "POPULATION_DEFINITIONS": { + "TOTAL_POPULATION_REF": 27032412, + "GROWTH_FACTOR": 0.0334, + "REFERENCE_YEAR": 2024, + "POPULATION_INDICATORS": { + "POPULATION": { "ids": ["iIS0MGyL3EL"], "type": "dataElement" }, + "POP_UNDER_5": { + "ids": ["tiaX3KnlpRt", "BjCyMF2Thxn"], + "type": "dataElement" + }, + "POP_PREGNANT_WOMAN": { "ids": ["mEyVFwBw2QV"], "type": "dataElement" } + }, + "POPULATION_DISAGGREGATIONS": { + "POP_UNDER_5": 0.195, + "POP_PREGNANT_WOMAN": 0.052, + 
"POP_0_1_Y": 0.04, + "POP_1_2_Y": 0.079 + } + }, + "DHIS2_INDICATOR_DEFINITIONS": { + "SUSP": ["SE8vUT7xeWO"], + "SUSP_UNDER_5": ["SE8vUT7xeWO.KdO1NS2OI0c"], + "SUSP_ABOVE_5": ["SE8vUT7xeWO.CkWBRP9TqZd"], + "SUSP_PREGNANT_WOMAN": ["SE8vUT7xeWO.HfUn1dkLunb"], + "TEST": ["Hc1bP08WsJf", "IcxzDQ2iViL"], + "TEST_UNDER_5": ["Hc1bP08WsJf.KdO1NS2OI0c", "IcxzDQ2iViL.KdO1NS2OI0c"], + "TEST_ABOVE_5": ["Hc1bP08WsJf.CkWBRP9TqZd", "IcxzDQ2iViL.CkWBRP9TqZd"], + "TEST_PREGNANT_WOMAN": [ + "Hc1bP08WsJf.HfUn1dkLunb", + "IcxzDQ2iViL.HfUn1dkLunb" + ], + "CONF": ["S4S3iPZMvPb", "ScFIzTswCih"], + "CONF_UNDER_5": ["S4S3iPZMvPb.KdO1NS2OI0c", "ScFIzTswCih.KdO1NS2OI0c"], + "CONF_ABOVE_5": ["S4S3iPZMvPb.CkWBRP9TqZd", "ScFIzTswCih.CkWBRP9TqZd"], + "CONF_PREGNANT_WOMAN": [ + "S4S3iPZMvPb.HfUn1dkLunb", + "ScFIzTswCih.HfUn1dkLunb" + ], + "PRES": ["Dq6jCpvgXjN"], + "PRES_UNDER_5": ["Dq6jCpvgXjN.KdO1NS2OI0c"], + "PRES_ABOVE_5": ["Dq6jCpvgXjN.CkWBRP9TqZd"], + "PRES_PREGNANT_WOMAN": ["Dq6jCpvgXjN.HfUn1dkLunb"], + "MALTREAT": ["DLrfnBPUmH2"], + "MALTREAT_UNDER_5": ["DLrfnBPUmH2.KdO1NS2OI0c"], + "MALTREAT_ABOVE_5": ["DLrfnBPUmH2.CkWBRP9TqZd"], + "MALTREAT_PREGNANT_WOMAN": ["DLrfnBPUmH2.HfUn1dkLunb"], + "MALSIMP": ["GWVBGrF5JOd"], + "MALSIMP_UNDER_5": ["GWVBGrF5JOd.KdO1NS2OI0c"], + "MALSIMP_ABOVE_5": ["GWVBGrF5JOd.CkWBRP9TqZd"], + "MALSIMP_PREGNANT_WOMAN": ["GWVBGrF5JOd.HfUn1dkLunb"], + "MALSEV": ["VJN9TulWOl7"], + "MALSEV_UNDER_5": ["VJN9TulWOl7.KdO1NS2OI0c"], + "MALSEV_ABOVE_5": ["VJN9TulWOl7.CkWBRP9TqZd"], + "MALSEV_PREGNANT_WOMAN": ["VJN9TulWOl7.HfUn1dkLunb"], + "MALDTH": ["dl7lZ8HoO1K", "nJI3Bjb2bwx"], + "MALDTH_UNDER_5": ["dl7lZ8HoO1K.KdO1NS2OI0c", "nJI3Bjb2bwx.KdO1NS2OI0c"], + "MALDTH_ABOVE_5": ["dl7lZ8HoO1K.CkWBRP9TqZd", "nJI3Bjb2bwx.CkWBRP9TqZd"], + "MALDTH_PREGNANT_WOMAN": [ + "dl7lZ8HoO1K.HfUn1dkLunb", + "nJI3Bjb2bwx.HfUn1dkLunb" + ], + "MALADM": ["f00QZis2J3n"], + "MALADM_UNDER_5": ["f00QZis2J3n.KdO1NS2OI0c"], + "MALADM_ABOVE_5": ["f00QZis2J3n.CkWBRP9TqZd"], + "MALADM_PREGNANT_WOMAN": 
["f00QZis2J3n.HfUn1dkLunb"], + "ALLADM": [ + "UqVkeBwgB06.bDk33CXuRo9", + "UqVkeBwgB06.vxmwRpP1cBe", + "UqVkeBwgB06.MLGLuR5CK0f" + ], + "ALLDTH": ["UqVkeBwgB06.vgdSzy5iJbZ"], + "ALLOUT": ["aAMNlG1Qjfn"], + "TPI1": ["PavbxlgreSC"], + "TPI3": ["tEqFtDVL2Y6"], + "CPN": ["gzj9zjZrNPi"], + "MILDA_CPN": ["hL3WOkhfxo3"], + "MILDA_VAR": ["azdUYKPccDe"], + "VAR1": ["Uk6UmYQp0Pd"], + "UTILISATION": [ + "aAMNlG1Qjfn.OfgifKSFiib", + "aAMNlG1Qjfn.Xj9ayPI0QYh", + "aAMNlG1Qjfn.AkcjRa8rdqa", + "aAMNlG1Qjfn.LqxtDRM0nb1", + "mvmPcDnpbqE.OfgifKSFiib", + "mvmPcDnpbqE.Xj9ayPI0QYh", + "mvmPcDnpbqE.AkcjRa8rdqa", + "mvmPcDnpbqE.LqxtDRM0nb1", + "wuIvQlLQsmx.XXvecDe14Bn", + "wuIvQlLQsmx.mOzOcdV7a8c", + "DPyo6EUCfmU.OfgifKSFiib", + "DPyo6EUCfmU.Xj9ayPI0QYh", + "DPyo6EUCfmU.AkcjRa8rdqa", + "DPyo6EUCfmU.LqxtDRM0nb1" + ] + }, + "DHIS2_REPORTING_RATES": { + "REPORTING_DATASETS": [ + { + "DATASET": "LUo8tmRIJc9", + "METRICS": { + "ACTUAL_REPORTS": "float", + "EXPECTED_REPORTS": "float" + } + }, + { + "DATASET": "ki7YKOfyxjf", + "METRICS": { + "ACTUAL_REPORTS": "float", + "EXPECTED_REPORTS": "float" + } + } + ], + "REPORTING_INDICATORS": { + "ACTUAL_REPORTS": "", + "EXPECTED_REPORTS": "" + } + } + } +} diff --git a/pipelines/snt_dhis2_quality_of_care/code/snt_dhis2_quality_of_care.ipynb b/pipelines/snt_dhis2_quality_of_care/code/snt_dhis2_quality_of_care.ipynb new file mode 100644 index 0000000..3d650ea --- /dev/null +++ b/pipelines/snt_dhis2_quality_of_care/code/snt_dhis2_quality_of_care.ipynb @@ -0,0 +1,430 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "fad6c24e", + "metadata": {}, + "source": [ + "## Quality of Care Indicators\n", + "\n", + "Compute district-year quality-of-care indicators from DHIS2 outliers-imputed routine data.\n", + "\n", + "Indicators:\n", + "- testing_rate = TEST / SUSP\n", + "- treatment_rate = MALTREAT / CONF\n", + "- case_fatality_rate = MALDTH / MALADM\n", + "- prop_adm_malaria = MALADM / ALLADM\n", + "- prop_malaria_deaths = MALDTH / ALLDTH\n", + "- 
non_malaria_all_cause_outpatients = ALLOUT (absolute)\n", + "- presumed_cases = PRES (absolute)\n", + "\n", + "Stock-out indicators are not implemented yet (on hold, NMDR data pending)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "317c4085", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Preliminaries\n", + "options(scipen=999)\n", + "\n", + "ROOT_PATH <- \"~/workspace\"\n", + "CONFIG_PATH <- file.path(ROOT_PATH, \"configuration\")\n", + "CODE_PATH <- file.path(ROOT_PATH, \"code\")\n", + "DATA_PATH <- file.path(ROOT_PATH, \"data\")\n", + "OUTPUT_DATA_PATH <- file.path(DATA_PATH, \"dhis2\", \"quality_of_care\")\n", + "FIGURES_PATH <- file.path(ROOT_PATH, \"pipelines\", \"snt_dhis2_quality_of_care\", \"reporting\", \"outputs\", \"figures\")\n", + "\n", + "dir.create(OUTPUT_DATA_PATH, recursive = TRUE, showWarnings = FALSE)\n", + "dir.create(FIGURES_PATH, recursive = TRUE, showWarnings = FALSE)\n", + "\n", + "source(file.path(CODE_PATH, \"snt_utils.r\"))\n", + "required_packages <- c(\"jsonlite\", \"data.table\", \"arrow\", \"sf\", \"ggplot2\", \"glue\", \"reticulate\", \"RColorBrewer\", \"dplyr\")\n", + "install_and_load(required_packages)\n", + "\n", + "Sys.setenv(RETICULATE_PYTHON = \"/opt/conda/bin/python\")\n", + "openhexa <- reticulate::import(\"openhexa.sdk\")\n", + "\n", + "config_json <- jsonlite::fromJSON(file.path(CONFIG_PATH, \"SNT_config.json\"))\n", + "COUNTRY_CODE <- config_json$SNT_CONFIG$COUNTRY_CODE\n", + "DHIS2_FORMATTED_DATASET <- config_json$SNT_DATASET_IDENTIFIERS$DHIS2_DATASET_FORMATTED\n", + "OUTLIERS_DATASET <- config_json$SNT_DATASET_IDENTIFIERS$DHIS2_OUTLIERS_IMPUTATION" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "98b78bf7", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Validate data_action parameter\n", + "if (!exists(\"data_action\")) {\n", + " data_action <- \"imputed\"\n", + "}\n", + 
"\n", + "allowed_actions <- c(\"imputed\", \"removed\")\n", + "if (!(data_action %in% allowed_actions)) {\n", + " stop(glue::glue(\"Invalid data_action: {data_action}. Allowed: {paste(allowed_actions, collapse=', ')}\"))\n", + "}\n", + "\n", + "# Automatically find the latest routine outliers-imputed file in the dataset\n", + "# Pattern: {COUNTRY_CODE}_routine_outliers-*_{data_action}.parquet\n", + "log_msg(glue::glue(\"Searching for latest routine outliers-imputed file in dataset (data_action: {data_action})...\"))\n", + "\n", + "dataset_last_version <- openhexa$workspace$get_dataset(OUTLIERS_DATASET)$latest_version\n", + "if (is.null(dataset_last_version)) {\n", + " stop(glue::glue(\"[ERROR] No version available in dataset `{OUTLIERS_DATASET}`. Process stopped.\"))\n", + "}\n", + "\n", + "# Pattern to match: {COUNTRY_CODE}_routine_outliers-*_{data_action}.parquet\n", + "pattern_prefix <- glue::glue(\"{COUNTRY_CODE}_routine_outliers-\")\n", + "pattern_suffix <- glue::glue(\"_{data_action}.parquet\")\n", + "routine_filename <- NULL\n", + "files_list <- reticulate::iterate(dataset_last_version$files)\n", + "\n", + "# Find all matching files and select the latest one\n", + "matching_files <- c()\n", + "for (file in files_list) {\n", + " filename <- file$filename\n", + " if (startsWith(filename, pattern_prefix) && endsWith(filename, pattern_suffix)) {\n", + " matching_files <- c(matching_files, filename)\n", + " }\n", + "}\n", + "\n", + "if (length(matching_files) == 0) {\n", + " stop(glue::glue(\"[ERROR] No file matching pattern `{pattern_prefix}*{pattern_suffix}` found in dataset `{OUTLIERS_DATASET}`. 
\",\n", + " \"Please run an outlier imputation pipeline first (e.g., snt_dhis2_outliers_imputation_mean) with `data_action=\\\"{data_action}\\\"`.\"))\n", + "}\n", + "\n", + "# Select the latest file (alphabetically sorted, which should correspond to most recent method)\n", + "routine_filename <- sort(matching_files, decreasing = TRUE)[1]\n", + "\n", + "log_msg(glue::glue(\"Found {length(matching_files)} matching file(s). Using latest: {routine_filename}\"))\n", + "\n", + "# Load the routine file\n", + "routine <- tryCatch({\n", + " get_latest_dataset_file_in_memory(OUTLIERS_DATASET, routine_filename)\n", + "}, error = function(e) {\n", + " msg <- paste0(\"[ERROR] 🛑 Error while loading DHIS2 routine data file `\", routine_filename, \n", + " \"` from `\", OUTLIERS_DATASET, \"`. [ERROR DETAILS] \", conditionMessage(e))\n", + " stop(msg)\n", + "})\n", + "\n", + "shapes <- get_latest_dataset_file_in_memory(DHIS2_FORMATTED_DATASET, paste0(COUNTRY_CODE, \"_shapes.geojson\"))\n", + "\n", + "setDT(routine)\n", + "\n", + "# Core required columns (must exist)\n", + "core_cols <- c(\"ADM2_ID\", \"YEAR\")\n", + "core_missing <- setdiff(core_cols, names(routine))\n", + "if (length(core_missing) > 0) {\n", + " stop(glue::glue(\"Missing core required columns in routine data: {paste(core_missing, collapse=', ')}\"))\n", + "}\n", + "\n", + "# Optional indicator columns (will be checked and handled gracefully)\n", + "indicator_cols <- c(\"TEST\", \"SUSP\", \"MALTREAT\", \"CONF\", \"MALDTH\", \"MALADM\", \"ALLADM\", \"ALLDTH\", \"ALLOUT\", \"PRES\")\n", + "available_cols <- intersect(indicator_cols, names(routine))\n", + "missing_cols <- setdiff(indicator_cols, names(routine))\n", + "\n", + "if (length(missing_cols) > 0) {\n", + " log_msg(glue::glue(\"[WARNING] Some indicator columns are missing: {paste(missing_cols, collapse=', ')}. 
These indicators will not be calculated.\"), level = \"warning\")\n", + "}\n", + "\n", + "# Convert available numeric columns\n", + "# Handle \"-\" and other non-numeric values by converting them to NA first\n", + "num_cols <- intersect(available_cols, names(routine))\n", + "if (length(num_cols) > 0) {\n", + " for (col in num_cols) {\n", + " # First convert to character to handle \"-\" strings, then replace with NA, then convert to numeric\n", + " col_vals <- as.character(routine[[col]])\n", + " col_vals[is.na(col_vals) | col_vals == \"\" | col_vals == \"-\"] <- NA_character_\n", + " routine[, (col) := as.numeric(col_vals)]\n", + " }\n", + "}\n", + "routine[, YEAR := as.integer(YEAR)]\n", + "routine[, ADM2_ID := as.character(ADM2_ID)]\n", + "\n", + "# Aggregate available columns only using lapply\n", + "if (length(available_cols) > 0) {\n", + " qoc <- routine[, lapply(.SD, function(x) sum(x, na.rm = TRUE)), \n", + " .SDcols = available_cols, \n", + " by = .(ADM2_ID, YEAR)]\n", + "} else {\n", + " # If no indicator columns available, create empty structure\n", + " qoc <- routine[, .(ADM2_ID, YEAR)]\n", + " qoc <- unique(qoc)\n", + "}\n", + "\n", + "# Calculate indicators only if required columns are available\n", + "if (\"TEST\" %in% names(qoc) && \"SUSP\" %in% names(qoc)) {\n", + " qoc[, testing_rate := fifelse(SUSP > 0, TEST / SUSP, NA_real_)]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate testing_rate: missing TEST or SUSP columns\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"MALTREAT\" %in% names(qoc) && \"CONF\" %in% names(qoc)) {\n", + " qoc[, treatment_rate := fifelse(CONF > 0, MALTREAT / CONF, NA_real_)]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate treatment_rate: missing MALTREAT or CONF columns\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"MALDTH\" %in% names(qoc) && \"MALADM\" %in% names(qoc)) {\n", + " qoc[, case_fatality_rate := fifelse(MALADM > 0, MALDTH / MALADM, NA_real_)]\n", + "} else {\n", + " 
log_msg(\"[WARNING] Cannot calculate case_fatality_rate: missing MALDTH or MALADM columns\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"MALADM\" %in% names(qoc) && \"ALLADM\" %in% names(qoc)) {\n", + " qoc[, prop_adm_malaria := fifelse(ALLADM > 0, MALADM / ALLADM, NA_real_)]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate prop_adm_malaria: missing MALADM or ALLADM columns\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"MALDTH\" %in% names(qoc) && \"ALLDTH\" %in% names(qoc)) {\n", + " qoc[, prop_malaria_deaths := fifelse(ALLDTH > 0, MALDTH / ALLDTH, NA_real_)]\n", + " # Compatibility alias to match historical notebook export naming\n", + " qoc[, prop_deaths_malaria := prop_malaria_deaths]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate prop_malaria_deaths: missing MALDTH or ALLDTH columns\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"ALLOUT\" %in% names(qoc)) {\n", + " qoc[, non_malaria_all_cause_outpatients := ALLOUT]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate non_malaria_all_cause_outpatients: missing ALLOUT column\", level = \"warning\")\n", + "}\n", + "\n", + "if (\"PRES\" %in% names(qoc)) {\n", + " qoc[, presumed_cases := PRES]\n", + "} else {\n", + " log_msg(\"[WARNING] Cannot calculate presumed_cases: missing PRES column\", level = \"warning\")\n", + "}\n", + "\n", + "shapes_dt <- as.data.table(sf::st_drop_geometry(shapes))\n", + "if (\"ADM2_ID\" %in% names(shapes_dt) && \"ADM2_NAME\" %in% names(shapes_dt)) {\n", + " shapes_dt[, ADM2_ID := as.character(ADM2_ID)]\n", + " qoc <- merge(qoc, unique(shapes_dt[, .(ADM2_ID, ADM2_NAME)]), by = \"ADM2_ID\", all.x = TRUE)\n", + "}\n", + "\n", + "# Persist only district-year outputs (requested)\n", + "out_district_parquet <- file.path(OUTPUT_DATA_PATH, glue::glue(\"{COUNTRY_CODE}_quality_of_care_district_year_{data_action}.parquet\"))\n", + "out_district_csv <- file.path(OUTPUT_DATA_PATH, 
glue::glue(\"{COUNTRY_CODE}_quality_of_care_district_year_{data_action}.csv\"))\n", + "\n", + "arrow::write_parquet(qoc, out_district_parquet)\n", + "data.table::fwrite(qoc, out_district_csv)\n", + "\n", + "log_msg(glue::glue(\"Saved outputs: {out_district_parquet}, {out_district_csv}\"))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "984689b0", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Yearly maps by ADM2\n", + "# Ensure ADM2_ID is character in both objects (do this once before the function)\n", + "shapes$ADM2_ID <- as.character(shapes$ADM2_ID)\n", + "qoc$ADM2_ID <- as.character(qoc$ADM2_ID)\n", + "\n", + "plot_yearly_map <- function(df, sf_shapes, value_col, title_prefix, filename_prefix, is_rate = TRUE) {\n", + " # Check if value_col exists in df\n", + " if (!(value_col %in% names(df))) {\n", + " log_msg(glue::glue(\"[WARNING] Column '{value_col}' not found in data. Skipping map generation.\"), level = \"warning\")\n", + " return(invisible(NULL))\n", + " }\n", + " \n", + " # Create a local copy of sf_shapes to avoid modifying the original\n", + " sf_shapes_local <- sf_shapes\n", + " if (!is.character(sf_shapes_local$ADM2_ID)) {\n", + " sf_shapes_local$ADM2_ID <- as.character(sf_shapes_local$ADM2_ID)\n", + " }\n", + " \n", + " years <- sort(unique(df$YEAR))\n", + " for (yr in years) {\n", + " df_y <- df[YEAR == yr]\n", + " \n", + " # Check if df_y has any rows\n", + " if (nrow(df_y) == 0) {\n", + " log_msg(glue::glue(\"[WARNING] No data for '{value_col}' in year {yr}. 
Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + " \n", + " # Ensure ADM2_ID is character in df_y\n", + " df_y$ADM2_ID <- as.character(df_y$ADM2_ID)\n", + " \n", + " # Use dplyr::left_join for sf objects to preserve geometry (use local copy)\n", + " map_df <- dplyr::left_join(sf_shapes_local, df_y, by = \"ADM2_ID\")\n", + "\n", + " # Check if value_col exists in map_df after merge\n", + " if (!(value_col %in% names(map_df))) {\n", + " log_msg(glue::glue(\"[WARNING] Column '{value_col}' not found after merge for year {yr}. Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + "\n", + " vals <- map_df[[value_col]]\n", + " finite_vals <- vals[is.finite(vals) & !is.na(vals)]\n", + " \n", + " # If no valid values, skip this map\n", + " if (length(finite_vals) == 0) {\n", + " log_msg(glue::glue(\"[WARNING] No valid values for '{value_col}' in year {yr}. Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + "\n", + " # Create cat column BEFORE creating the plot\n", + " cat_vals <- NULL\n", + " fill_palette <- NULL\n", + " \n", + " if (is_rate) {\n", + " # Create cat column with proper handling of NA values\n", + " cat_result <- tryCatch({\n", + " cat_vals <- cut(\n", + " vals,\n", + " breaks = c(-Inf, 0, 0.2, 0.4, 0.6, 0.8, 1.0, Inf),\n", + " labels = c(\"<0\", \"0-0.2\", \"0.2-0.4\", \"0.4-0.6\", \"0.6-0.8\", \"0.8-1.0\", \">1.0\"),\n", + " include.lowest = TRUE\n", + " )\n", + " fill_palette <- \"YlOrRd\"\n", + " TRUE # Success\n", + " }, error = function(e) {\n", + " log_msg(glue::glue(\"[WARNING] Failed to create categories for '{value_col}' year {yr}: {conditionMessage(e)}\"), level = \"warning\")\n", + " FALSE # Failure\n", + " })\n", + " if (!cat_result) {\n", + " next\n", + " }\n", + " } else {\n", + " cat_result <- tryCatch({\n", + " if (length(finite_vals) > 4) {\n", + " br <- unique(as.numeric(quantile(finite_vals, probs = seq(0, 1, 0.2), na.rm = TRUE)))\n", + " if (length(br) < 2) {\n", + " cat_vals <- 
as.factor(rep(\"all\", nrow(map_df)))\n", + " } else {\n", + " cat_vals <- cut(vals, breaks = br, include.lowest = TRUE)\n", + " }\n", + " } else {\n", + " cat_vals <- as.factor(vals)\n", + " }\n", + " fill_palette <- \"Blues\"\n", + " TRUE # Success\n", + " }, error = function(e) {\n", + " log_msg(glue::glue(\"[WARNING] Failed to create categories for '{value_col}' year {yr}: {conditionMessage(e)}\"), level = \"warning\")\n", + " FALSE # Failure\n", + " })\n", + " if (!cat_result) {\n", + " next\n", + " }\n", + " }\n", + " \n", + " # Check if cat_vals was created successfully\n", + " if (is.null(cat_vals) || length(cat_vals) != nrow(map_df)) {\n", + " log_msg(glue::glue(\"[WARNING] Failed to create 'cat' column for '{value_col}' in year {yr}. Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + " \n", + " # Check if all values are NA (cut failed) - but allow some NA values\n", + " if (all(is.na(cat_vals))) {\n", + " log_msg(glue::glue(\"[WARNING] All 'cat' values are NA for '{value_col}' in year {yr}. Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + " \n", + " # Add cat column using dplyr::mutate to ensure it's properly added to sf object\n", + " map_df <- dplyr::mutate(map_df, cat = as.factor(cat_vals))\n", + " \n", + " # Verify cat column exists before creating plot\n", + " if (!(\"cat\" %in% names(map_df))) {\n", + " log_msg(glue::glue(\"[WARNING] Failed to add 'cat' column to map_df for '{value_col}' in year {yr}. 
Skipping map.\"), level = \"warning\")\n", + " next\n", + " }\n", + " \n", + " # Create plot AFTER cat column is added\n", + " p <- ggplot(map_df) +\n", + " geom_sf(aes(fill = cat), color = \"grey60\", size = 0.1) +\n", + " scale_fill_brewer(palette = fill_palette, na.value = \"white\", drop = FALSE)\n", + "\n", + " p <- p +\n", + " theme_void() +\n", + " labs(\n", + " title = paste0(title_prefix, \" - \", yr),\n", + " fill = value_col,\n", + " caption = \"Source: SNT DHIS2 outliers-imputed routine data\"\n", + " ) +\n", + " theme(\n", + " legend.position = \"bottom\",\n", + " plot.title = element_text(face = \"bold\", size = 12)\n", + " )\n", + "\n", + " out_png <- file.path(FIGURES_PATH, glue::glue(\"{filename_prefix}_{yr}.png\"))\n", + " \n", + " # Try to save the plot, catch any errors\n", + " tryCatch({\n", + " ggsave(out_png, plot = p, width = 9, height = 7, dpi = 300, bg = \"white\")\n", + " log_msg(glue::glue(\"Saved map: {out_png}\"))\n", + " }, error = function(e) {\n", + " log_msg(glue::glue(\"[WARNING] Failed to save map for '{value_col}' year {yr}: {conditionMessage(e)}\"), level = \"warning\")\n", + " })\n", + " }\n", + "}\n", + "\n", + "# Plot only indicators that were calculated (columns exist)\n", + "if (\"testing_rate\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"testing_rate\", \"Testing rate (TEST / SUSP)\", \"testing_rate\", TRUE)\n", + "}\n", + "if (\"treatment_rate\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"treatment_rate\", \"Treatment rate (MALTREAT / CONF)\", \"treatment_rate\", TRUE)\n", + "}\n", + "if (\"case_fatality_rate\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"case_fatality_rate\", \"In-hospital case fatality rate (MALDTH / MALADM)\", \"case_fatality_rate\", TRUE)\n", + "}\n", + "if (\"prop_adm_malaria\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"prop_adm_malaria\", \"Proportion admitted for malaria (MALADM / ALLADM)\", \"prop_adm_malaria\", TRUE)\n", + "}\n", + "if 
(\"prop_malaria_deaths\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"prop_malaria_deaths\", \"Proportion of malaria deaths (MALDTH / ALLDTH)\", \"prop_malaria_deaths\", TRUE)\n", + "}\n", + "if (\"non_malaria_all_cause_outpatients\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"non_malaria_all_cause_outpatients\", \"Non-malaria all-cause outpatients (ALLOUT)\", \"allout\", FALSE)\n", + "}\n", + "if (\"presumed_cases\" %in% names(qoc)) {\n", + " plot_yearly_map(qoc, shapes, \"presumed_cases\", \"Presumed cases (PRES)\", \"presumed_cases\", FALSE)\n", + "}\n", + "\n", + "log_msg(glue::glue(\"Saved yearly maps in: {FIGURES_PATH}\"))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "R", + "language": "R", + "name": "ir" + }, + "language_info": { + "codemirror_mode": "r", + "file_extension": ".r", + "mimetype": "text/x-r-source", + "name": "R", + "pygments_lexer": "r", + "version": "4.4.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pipelines/snt_dhis2_quality_of_care/reporting/snt_dhis2_quality_of_care_report.ipynb b/pipelines/snt_dhis2_quality_of_care/reporting/snt_dhis2_quality_of_care_report.ipynb new file mode 100644 index 0000000..045eb65 --- /dev/null +++ b/pipelines/snt_dhis2_quality_of_care/reporting/snt_dhis2_quality_of_care_report.ipynb @@ -0,0 +1,648 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "7d246ae9", + "metadata": {}, + "source": [ + "## Quality of Care Report\n", + "\n", + "This report displays a compact year-level summary of quality-of-care indicators and points to generated map outputs." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5eaa5bab", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "ROOT_PATH <- \"~/workspace\"\n", + "CONFIG_PATH <- file.path(ROOT_PATH, \"configuration\")\n", + "CODE_PATH <- file.path(ROOT_PATH, \"code\")\n", + "DATA_PATH <- file.path(ROOT_PATH, \"data\", \"dhis2\", \"quality_of_care\")\n", + "FIGURES_PATH <- file.path(ROOT_PATH, \"pipelines\", \"snt_dhis2_quality_of_care\", \"reporting\", \"outputs\", \"figures\")\n", + "\n", + "source(file.path(CODE_PATH, \"snt_utils.r\"))\n", + "install_and_load(c(\"jsonlite\", \"data.table\", \"arrow\", \"dplyr\", \"knitr\", \"glue\", \"reticulate\", \"writexl\", \"ggplot2\", \"scales\", \"gridExtra\", \"sf\"))\n", + "\n", + "# Create output directories\n", + "REPORT_OUTPUTS_PATH <- file.path(ROOT_PATH, \"pipelines\", \"snt_dhis2_quality_of_care\", \"reporting\", \"outputs\")\n", + "dir.create(REPORT_OUTPUTS_PATH, recursive = TRUE, showWarnings = FALSE)\n", + "dir.create(FIGURES_PATH, recursive = TRUE, showWarnings = FALSE)\n", + "\n", + "Sys.setenv(RETICULATE_PYTHON = \"/opt/conda/bin/python\")\n", + "openhexa <- reticulate::import(\"openhexa.sdk\")\n", + "\n", + "config_json <- jsonlite::fromJSON(file.path(CONFIG_PATH, \"SNT_config.json\"))\n", + "COUNTRY_CODE <- config_json$SNT_CONFIG$COUNTRY_CODE" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a8320f8", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Use district-year output file (latest action)\n", + "files <- list.files(DATA_PATH, pattern = paste0(\"^\", COUNTRY_CODE, \"_quality_of_care_district_year_(imputed|removed)\\\\.parquet$\"), full.names = TRUE)\n", + "if (length(files) == 0) {\n", + " stop(glue::glue(\"No quality_of_care parquet found in {DATA_PATH}\"))\n", + "}\n", + "\n", + "latest_file <- files[which.max(file.info(files)$mtime)]\n", + "qoc <- 
as.data.table(arrow::read_parquet(latest_file))\n", + "\n", + "# Build summary table with only available columns\n", + "# Start with unique YEAR values\n", + "summary_tbl <- unique(qoc[, .(YEAR)])\n", + "\n", + "# Add rate indicators (mean) - merge one by one\n", + "if (\"testing_rate\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(testing_rate = mean(testing_rate, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "if (\"treatment_rate\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(treatment_rate = mean(treatment_rate, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "if (\"case_fatality_rate\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(case_fatality_rate = mean(case_fatality_rate, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "if (\"prop_adm_malaria\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(prop_adm_malaria = mean(prop_adm_malaria, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "if (\"prop_malaria_deaths\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(prop_malaria_deaths = mean(prop_malaria_deaths, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "\n", + "# Add absolute indicators (sum)\n", + "if (\"non_malaria_all_cause_outpatients\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(non_malaria_all_cause_outpatients = sum(non_malaria_all_cause_outpatients, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "if (\"presumed_cases\" %in% names(qoc)) {\n", + " summary_tbl <- merge(summary_tbl, \n", + " qoc[, .(presumed_cases = sum(presumed_cases, na.rm = TRUE)), by = .(YEAR)], \n", + " by = \"YEAR\", all.x = TRUE)\n", + "}\n", + "\n", + "summary_tbl <- 
summary_tbl[order(YEAR)]\n", + "\n", + "# Explicitly list missing indicators so report is self-explanatory\n", + "expected_indicators <- c(\n", + " \"testing_rate\",\n", + " \"treatment_rate\",\n", + " \"case_fatality_rate\",\n", + " \"prop_adm_malaria\",\n", + " \"prop_malaria_deaths\",\n", + " \"non_malaria_all_cause_outpatients\",\n", + " \"presumed_cases\"\n", + ")\n", + "missing_indicators <- setdiff(expected_indicators, names(qoc))\n", + "if (length(missing_indicators) > 0) {\n", + " log_msg(glue::glue(\"[WARNING] Missing indicators in input file: {paste(missing_indicators, collapse=', ')}\"), level = \"warning\")\n", + " cat(glue::glue(\"\\nMissing indicators in this run: {paste(missing_indicators, collapse=', ')}\\n\"))\n", + " cat(\"Reason: required source columns are absent in the selected outliers file.\\n\")\n", + "}\n", + "\n", + "# Save summary data (parquet, csv, xlsx) - following other pipelines pattern\n", + "summary_parquet <- file.path(REPORT_OUTPUTS_PATH, glue::glue(\"{COUNTRY_CODE}_quality_of_care_summary.parquet\"))\n", + "summary_csv <- file.path(REPORT_OUTPUTS_PATH, glue::glue(\"{COUNTRY_CODE}_quality_of_care_summary.csv\"))\n", + "summary_xlsx <- file.path(REPORT_OUTPUTS_PATH, glue::glue(\"{COUNTRY_CODE}_quality_of_care_summary.xlsx\"))\n", + "\n", + "# Save as parquet (primary format, like other pipelines)\n", + "arrow::write_parquet(summary_tbl, summary_parquet)\n", + "\n", + "# Save as csv and xlsx for compatibility\n", + "data.table::fwrite(summary_tbl, summary_csv)\n", + "writexl::write_xlsx(list(summary = as.data.frame(summary_tbl)), summary_xlsx)\n", + "\n", + "log_msg(glue::glue(\"Summary data saved to: {summary_parquet}, {summary_csv}, {summary_xlsx}\"))\n", + "\n", + "knitr::kable(summary_tbl, caption = \"Quality of Care - Year-level summary\")\n", + "\n", + "cat(glue::glue(\"\\nLoaded file: {latest_file}\\n\"))\n", + "cat(glue::glue(\"Map outputs folder: {FIGURES_PATH}\\n\"))\n", + "cat(glue::glue(\"Summary data saved to: 
{summary_parquet}, {summary_csv}, {summary_xlsx}\\n\"))" + ] + }, + { + "cell_type": "markdown", + "id": "3dc318ac", + "metadata": {}, + "source": [ + "## Graphs by Year" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0e86bb0a", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Create bar charts by year (same as original notebook - 4x2 grid layout)\n", + "# Prepare data - convert rates to percentages\n", + "plot_data <- copy(summary_tbl)\n", + "\n", + "# Create the same 4x2 subplot layout as original notebook\n", + "if (nrow(plot_data) > 0) {\n", + " # Create a list to store individual plots (in order: 4x2 grid)\n", + " plots_list <- list()\n", + " \n", + " # Row 0, Col 0: Testing rate\n", + " if (\"testing_rate\" %in% names(plot_data)) {\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = testing_rate * 100)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = paste0(round(testing_rate * 100, 1), \"%\")), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Testing rate (TEST / SUSP)\", x = \"Année\", y = \"%\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"testing_rate\"]] <- p\n", + " }\n", + " \n", + " # Row 0, Col 1: Treatment rate\n", + " if (\"treatment_rate\" %in% names(plot_data)) {\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = treatment_rate * 100)) +\n", + " geom_bar(stat = 
\"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = paste0(round(treatment_rate * 100, 1), \"%\")), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Treatment rate (MALTREAT / CONF)\", x = \"Année\", y = \"%\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"treatment_rate\"]] <- p\n", + " }\n", + " \n", + " # Row 1, Col 0: Case fatality rate\n", + " if (\"case_fatality_rate\" %in% names(plot_data)) {\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = case_fatality_rate * 100)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = paste0(round(case_fatality_rate * 100, 1), \"%\")), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Case fatality rate (MALDTH / MALADM)\", x = \"Année\", y = \"%\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"case_fatality_rate\"]] <- p\n", + " }\n", + " \n", + " # Row 1, Col 
1: Proportion admissions malaria\n", + " if (\"prop_adm_malaria\" %in% names(plot_data)) {\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = prop_adm_malaria * 100)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = paste0(round(prop_adm_malaria * 100, 1), \"%\")), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Prop. admissions paludisme (MALADM / ALLADM)\", x = \"Année\", y = \"%\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"prop_adm_malaria\"]] <- p\n", + " }\n", + " \n", + " # Row 2, Col 0: Proportion deaths malaria\n", + " if (\"prop_malaria_deaths\" %in% names(plot_data)) {\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = prop_malaria_deaths * 100)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = paste0(round(prop_malaria_deaths * 100, 1), \"%\")), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Prop. 
décès paludisme (MALDTH / ALLDTH)\", x = \"Année\", y = \"%\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"prop_malaria_deaths\"]] <- p\n", + " }\n", + " \n", + " # Row 2, Col 1: Presumed cases (absolute)\n", + " if (\"presumed_cases\" %in% names(plot_data)) {\n", + " format_label <- function(v) {\n", + " ifelse(is.na(v) | v == 0, \"0\",\n", + " ifelse(v >= 1e6, paste0(round(v/1e6, 2), \"M\"),\n", + " format(round(v), big.mark = \" \", scientific = FALSE)\n", + " )\n", + " )\n", + " }\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = presumed_cases)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = format_label(presumed_cases)), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Cas présumés (PRES)\", x = \"Année\", y = \"Nombre\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(labels = scales::comma, expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"presumed_cases\"]] <- p\n", + " }\n", + " \n", + " # Row 
3, Col 0: Non-malaria all-cause outpatients (absolute)\n", + " if (\"non_malaria_all_cause_outpatients\" %in% names(plot_data)) {\n", + " format_label <- function(v) {\n", + " ifelse(is.na(v) | v == 0, \"0\",\n", + " ifelse(v >= 1e6, paste0(round(v/1e6, 2), \"M\"),\n", + " format(round(v), big.mark = \" \", scientific = FALSE)\n", + " )\n", + " )\n", + " }\n", + " p <- ggplot(plot_data, aes(x = factor(YEAR), y = non_malaria_all_cause_outpatients)) +\n", + " geom_bar(stat = \"identity\", fill = \"#2563eb\", color = \"#1e40af\", width = 0.7) +\n", + " geom_text(aes(label = format_label(non_malaria_all_cause_outpatients)), \n", + " vjust = -0.5, size = 2.5) +\n", + " labs(title = \"Consultations externes non-paludisme (ALLOUT)\", x = \"Année\", y = \"Nombre\") +\n", + " theme_minimal() +\n", + " theme(\n", + " plot.title = element_text(face = \"bold\", size = 10),\n", + " axis.text.x = element_text(angle = 45, hjust = 1, size = 9),\n", + " panel.grid.major.y = element_line(linetype = \"dashed\", color = scales::alpha(\"grey\", 0.7)),\n", + " plot.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " panel.background = element_rect(fill = \"#fafafa\", color = NA),\n", + " plot.margin = margin(5, 5, 5, 5)\n", + " ) +\n", + " scale_y_continuous(labels = scales::comma, expand = expansion(mult = c(0, 0.1)))\n", + " plots_list[[\"non_malaria_all_cause_outpatients\"]] <- p\n", + " }\n", + " \n", + " # Create and display combined plot (dynamic grid for readability)\n", + " if (length(plots_list) > 0) {\n", + " # Order plots as in original\n", + " plot_order <- c(\"testing_rate\", \"treatment_rate\", \"case_fatality_rate\", \"prop_adm_malaria\", \n", + " \"prop_malaria_deaths\", \"presumed_cases\", \"non_malaria_all_cause_outpatients\")\n", + " available_plots <- plots_list[intersect(plot_order, names(plots_list))]\n", + "\n", + " if (length(available_plots) > 0) {\n", + " n_plots <- length(available_plots)\n", + " ncol_layout <- 2\n", + " nrow_layout <- 
ceiling(n_plots / ncol_layout)\n", + "\n", + " # Bigger display in report so labels are readable\n", + " options(repr.plot.width = 14, repr.plot.height = max(7, 4.8 * nrow_layout))\n", + "\n", + " combined_plot <- do.call(grid.arrange, c(available_plots, ncol = ncol_layout, nrow = nrow_layout))\n", + " print(combined_plot)\n", + "\n", + " # Save at larger size for presentation readability\n", + " combined_file <- file.path(FIGURES_PATH, glue::glue(\"{COUNTRY_CODE}_quality_of_care_by_year.png\"))\n", + " ggsave(\n", + " combined_file,\n", + " plot = combined_plot,\n", + " width = 18,\n", + " height = max(8, 5.2 * nrow_layout),\n", + " dpi = 300,\n", + " bg = \"white\",\n", + " units = \"in\"\n", + " )\n", + " log_msg(glue::glue(\"Combined bar charts saved: {combined_file}\"))\n", + " }\n", + " }\n", + "}" + ] + }, + { + "cell_type": "markdown", + "id": "3b625d36", + "metadata": {}, + "source": [ + "## Maps by District and Year\n", + "\n", + "Maps are generated directly from the quality-of-care data and district shapes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6056a979", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [ + "# Load shapes geojson from dataset (like seasonality pipeline)\n", + "DHIS2_FORMATTED_DATASET <- config_json$SNT_DATASET_IDENTIFIERS$DHIS2_DATASET_FORMATTED\n", + "\n", + "shapes <- tryCatch({\n", + " get_latest_dataset_file_in_memory(DHIS2_FORMATTED_DATASET, paste0(COUNTRY_CODE, \"_shapes.geojson\"))\n", + "}, error = function(e) {\n", + " msg <- paste0(\"Error while loading DHIS2 Shapes data for: \", COUNTRY_CODE, \". 
\", conditionMessage(e))\n", + " log_msg(msg, level = \"error\")\n", + " stop(msg)\n", + "})\n", + "\n", + "# Ensure ADM2_ID is character in both datasets\n", + "shapes$ADM2_ID <- as.character(shapes$ADM2_ID)\n", + "qoc$ADM2_ID <- as.character(qoc$ADM2_ID)\n", + "\n", + "# Merge shapes with quality-of-care data\n", + "qoc_sf <- shapes %>%\n", + " dplyr::left_join(qoc, by = \"ADM2_ID\")\n", + "\n", + "# Helper to build readable interval labels for legends\n", + "format_interval_labels <- function(breaks_vec) {\n", + " labels <- c()\n", + " for (i in seq_len(length(breaks_vec) - 1)) {\n", + " a <- breaks_vec[i]\n", + " b <- breaks_vec[i + 1]\n", + " labels <- c(labels, paste0(scales::comma(round(a)), \" - \", scales::comma(round(b))))\n", + " }\n", + " labels\n", + "}\n", + "\n", + "# Function to plot yearly maps (similar to code notebook but inline in report)\n", + "plot_yearly_map_report <- function(sf_data, value_col, title_prefix, is_rate = TRUE) {\n", + " if (!(value_col %in% names(sf_data))) {\n", + " log_msg(glue::glue(\"[WARNING] Column '{value_col}' not found. Skipping map generation.\"), level = \"warning\")\n", + " return(invisible(NULL))\n", + " }\n", + " \n", + " years <- sort(unique(sf_data$YEAR[!is.na(sf_data$YEAR)]))\n", + " if (length(years) == 0) {\n", + " log_msg(glue::glue(\"[WARNING] No valid years for '{value_col}'. 
Skipping map.\"), level = \"warning\")\n", + " return(invisible(NULL))\n", + " }\n", + " \n", + " # Create plots for each year\n", + " plot_list <- list()\n", + " base_shapes <- sf_data %>% dplyr::select(ADM2_ID, geometry) %>% dplyr::distinct()\n", + "\n", + " for (yr in years) {\n", + " # Keep all districts on map, then join year values\n", + " year_vals <- sf_data[sf_data$YEAR == yr, c(\"ADM2_ID\", value_col), drop = FALSE]\n", + " year_vals <- sf::st_drop_geometry(year_vals)\n", + " year_vals <- year_vals[!duplicated(year_vals$ADM2_ID), , drop = FALSE]\n", + " sf_y <- dplyr::left_join(base_shapes, year_vals, by = \"ADM2_ID\")\n", + "\n", + " vals <- sf_y[[value_col]]\n", + " finite_vals <- vals[is.finite(vals) & !is.na(vals)]\n", + "\n", + " if (length(finite_vals) == 0) {\n", + " next\n", + " }\n", + "\n", + " # Create categories\n", + " if (is_rate) {\n", + " cat_vals <- cut(\n", + " vals,\n", + " breaks = c(-Inf, 0, 0.2, 0.4, 0.6, 0.8, 1.0, Inf),\n", + " labels = c(\"<0\", \"0-0.2\", \"0.2-0.4\", \"0.4-0.6\", \"0.6-0.8\", \"0.8-1.0\", \">1.0\"),\n", + " include.lowest = TRUE\n", + " )\n", + " fill_palette <- \"YlOrRd\"\n", + " } else {\n", + " # Use readable fixed-count classes for absolute values\n", + " n_classes <- 5\n", + " br <- unique(as.numeric(quantile(finite_vals, probs = seq(0, 1, length.out = n_classes + 1), na.rm = TRUE)))\n", + " br <- sort(br)\n", + " if (length(br) < 2) {\n", + " br <- c(min(finite_vals, na.rm = TRUE), max(finite_vals, na.rm = TRUE) + 1)\n", + " }\n", + " if (length(unique(br)) < 2) {\n", + " cat_vals <- as.factor(rep(\"single value\", nrow(sf_y)))\n", + " } else {\n", + " labels_abs <- format_interval_labels(br)\n", + " cat_vals <- cut(vals, breaks = br, include.lowest = TRUE, labels = labels_abs)\n", + " }\n", + " fill_palette <- \"Blues\"\n", + " }\n", + "\n", + " sf_y$cat <- as.factor(cat_vals)\n", + "\n", + " p <- ggplot(sf_y) +\n", + " geom_sf(aes(fill = cat), color = \"grey60\", size = 0.12) +\n", + " 
scale_fill_brewer(palette = fill_palette, na.value = \"#f3f4f6\", drop = FALSE) +\n", + " theme_void() +\n", + " labs(\n", + " title = paste0(title_prefix, \" - \", yr),\n", + " fill = ifelse(is_rate, \"Rate class\", \"Value class\")\n", + " ) +\n", + " guides(fill = guide_legend(nrow = 2, byrow = TRUE)) +\n", + " theme(\n", + " legend.position = \"bottom\",\n", + " legend.text = element_text(size = 9),\n", + " legend.title = element_text(size = 10, face = \"bold\"),\n", + " plot.title = element_text(face = \"bold\", size = 13)\n", + " )\n", + "\n", + " plot_list[[as.character(yr)]] <- p\n", + " }\n", + " \n", + " # Display all plots\n", + " if (length(plot_list) > 0) {\n", + " options(repr.plot.width = 10, repr.plot.height = 8)\n", + " for (yr_name in names(plot_list)) {\n", + " print(plot_list[[yr_name]])\n", + " }\n", + " }\n", + "}\n", + "\n", + "# Generate maps for each available indicator\n", + "cat(\"### Testing Rate\\n\")\n", + "if (\"testing_rate\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"testing_rate\", \"Testing rate (TEST / SUSP)\", TRUE)\n", + "}\n", + "\n", + "cat(\"\\n### Treatment Rate\\n\")\n", + "if (\"treatment_rate\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"treatment_rate\", \"Treatment rate (MALTREAT / CONF)\", TRUE)\n", + "}\n", + "\n", + "cat(\"\\n### Case Fatality Rate\\n\")\n", + "if (\"case_fatality_rate\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"case_fatality_rate\", \"In-hospital case fatality rate (MALDTH / MALADM)\", TRUE)\n", + "}\n", + "\n", + "cat(\"\\n### Proportion Admissions Malaria\\n\")\n", + "if (\"prop_adm_malaria\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"prop_adm_malaria\", \"Proportion admitted for malaria (MALADM / ALLADM)\", TRUE)\n", + "}\n", + "\n", + "cat(\"\\n### Proportion Malaria Deaths\\n\")\n", + "if (\"prop_malaria_deaths\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"prop_malaria_deaths\", \"Proportion of 
malaria deaths (MALDTH / ALLDTH)\", TRUE)\n", + "}\n", + "\n", + "cat(\"\\n### Non-malaria All-cause Outpatients\\n\")\n", + "if (\"non_malaria_all_cause_outpatients\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"non_malaria_all_cause_outpatients\", \"Non-malaria all-cause outpatients (ALLOUT)\", FALSE)\n", + "}\n", + "\n", + "cat(\"\\n### Presumed Cases\\n\")\n", + "if (\"presumed_cases\" %in% names(qoc_sf)) {\n", + " plot_yearly_map_report(qoc_sf, \"presumed_cases\", \"Presumed cases (PRES)\", FALSE)\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5b31e4c8", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "8229c37e", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "07324c1c", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "7c084da7", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9f52975", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "006866ce", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "f7225165", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "420ed27f", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67ddb838", + "metadata": { + "vscode": { + "languageId": "r" + } + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "R", + "language": "R", + "name": "ir" + }, + "language_info": { + "codemirror_mode": "r", + "file_extension": 
".r", + "mimetype": "text/x-r-source", + "name": "R", + "pygments_lexer": "r", + "version": "4.4.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/snt_dhis2_outliers_imputation_iqr/pipeline.py b/snt_dhis2_outliers_imputation_iqr/pipeline.py index 1c103c7..94dfb48 100644 --- a/snt_dhis2_outliers_imputation_iqr/pipeline.py +++ b/snt_dhis2_outliers_imputation_iqr/pipeline.py @@ -1,18 +1,81 @@ from pathlib import Path +import tempfile from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( add_files_to_dataset, - push_data_to_db_table, load_configuration_snt, pull_scripts_from_repository, run_notebook, run_report_notebook, save_pipeline_parameters, validate_config, + create_outliers_db_table, ) +def preserve_and_add_files_to_dataset( + dataset_id: str, + country_code: str, + new_files: list[Path], + method_prefix: str, +): + """ + Add new files to dataset while preserving existing files from other methods. + + Args: + dataset_id: Dataset identifier + country_code: Country code + new_files: List of new file paths to add + method_prefix: Prefix pattern to identify files from this method (e.g., "mean", "median", "magic_glasses") + """ + try: + dataset = workspace.get_dataset(dataset_id) + latest_version = dataset.latest_version + existing_files = latest_version.list_files() + + # Filter out files from this method but keep others + preserved_files = [] + for file_obj in existing_files: + filename = file_obj.name + + # Determine if this file belongs to the current method + is_current_method = False + if method_prefix == "magic_glasses": + # Magic Glasses files: flagged_outliers_magic_glasses.parquet, outlier_magic_glasses_*.parquet + is_current_method = ( + filename == f"{country_code}_flagged_outliers_magic_glasses.parquet" or + filename.startswith(f"{country_code}_outlier_magic_glasses_") + ) + else: + # Other methods: routine_outliers-{method}*.parquet + is_current_method = 
filename.startswith(f"{country_code}_routine_outliers-{method_prefix}") + + # Preserve files from other methods + if not is_current_method: + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as tmp_file: + tmp_path = Path(tmp_file.name) + file_obj.download(tmp_path) + preserved_files.append(tmp_path) + current_run.log_info(f"Preserving existing file: {filename}") + except Exception as e: + current_run.log_warning(f"Could not preserve file {filename}: {e}") + + # Combine preserved files with new files + all_files = preserved_files + new_files + current_run.log_info(f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files") + except Exception as e: + current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}") + all_files = new_files + + add_files_to_dataset( + dataset_id=dataset_id, + country_code=country_code, + file_paths=all_files, + ) + + @pipeline("snt_dhis2_outliers_imputation_iqr") @parameter( "deviation_iqr", @@ -81,7 +144,6 @@ def snt_dhis2_outliers_imputation_iqr( if not run_report_only: input_params = { "ROOT_PATH": Path(workspace.files_path).as_posix(), - "OUTLIERS_METHOD": "IQR", "DEVIATION_IQR": deviation_iqr, } run_notebook( @@ -93,18 +155,6 @@ country_code=country_code, ) - expected_outputs = [ - data_path / f"{country_code}_routine_outliers_detected.parquet", - data_path / f"{country_code}_routine_outliers_imputed.parquet", - data_path / f"{country_code}_routine_outliers_removed.parquet", - ] - missing_outputs = [path.name for path in expected_outputs if not path.exists()] - if missing_outputs: - raise RuntimeError( - "Expected output files were not generated by the notebook: " - + ", ".join(missing_outputs) - ) - parameters_file = save_pipeline_parameters( pipeline_name="snt_dhis2_outliers_imputation_iqr", parameters=input_params, @@ -112,17 +162,20 @@ 
country_code=country_code, ) - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + iqr_files = list(data_path.glob(f"{country_code}_routine_outliers-iqr*.parquet")) + new_files = [*iqr_files, parameters_file] + + # Preserve existing files from other methods and add new ones + dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"] + preserve_and_add_files_to_dataset( + dataset_id=dataset_id, country_code=country_code, - file_paths=[*expected_outputs, parameters_file], + new_files=new_files, + method_prefix="iqr", ) if push_db: - push_data_to_db_table( - table_name="outliers_detected", - file_path=data_path / f"{country_code}_routine_outliers_detected.parquet", - ) + create_outliers_db_table(country_code=country_code, data_path=data_path) else: current_run.log_info("Skipping outliers calculations, running only the reporting notebook.") diff --git a/snt_dhis2_outliers_imputation_magic_glasses/pipeline.py b/snt_dhis2_outliers_imputation_magic_glasses/pipeline.py index 52a4273..94e5bde 100644 --- a/snt_dhis2_outliers_imputation_magic_glasses/pipeline.py +++ b/snt_dhis2_outliers_imputation_magic_glasses/pipeline.py @@ -1,11 +1,12 @@ from pathlib import Path import time +import tempfile from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( add_files_to_dataset, + create_outliers_db_table, load_configuration_snt, - push_data_to_db_table, pull_scripts_from_repository, run_notebook, run_report_notebook, @@ -14,14 +15,77 @@ ) +def preserve_and_add_files_to_dataset( + dataset_id: str, + country_code: str, + new_files: list[Path], + method_prefix: str, +): + """ + Add new files to dataset while preserving existing files from other methods. 
+ + Args: + dataset_id: Dataset identifier + country_code: Country code + new_files: List of new file paths to add + method_prefix: Prefix pattern to identify files from this method (e.g., "mean", "median", "magic_glasses") + """ + try: + dataset = workspace.get_dataset(dataset_id) + latest_version = dataset.latest_version + existing_files = latest_version.list_files() + + # Filter out files from this method but keep others + preserved_files = [] + for file_obj in existing_files: + filename = file_obj.name + + # Determine if this file belongs to the current method + is_current_method = False + if method_prefix == "magic_glasses": + # Magic Glasses files: flagged_outliers_magic_glasses.parquet, outlier_magic_glasses_*.parquet + is_current_method = ( + filename == f"{country_code}_flagged_outliers_magic_glasses.parquet" or + filename.startswith(f"{country_code}_outlier_magic_glasses_") + ) + else: + # Other methods: routine_outliers-{method}*.parquet + is_current_method = filename.startswith(f"{country_code}_routine_outliers-{method_prefix}") + + # Preserve files from other methods + if not is_current_method: + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as tmp_file: + tmp_path = Path(tmp_file.name) + file_obj.download(tmp_path) + preserved_files.append(tmp_path) + current_run.log_info(f"Preserving existing file: {filename}") + except Exception as e: + current_run.log_warning(f"Could not preserve file {filename}: {e}") + + # Combine preserved files with new files + all_files = preserved_files + new_files + current_run.log_info(f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files") + except Exception as e: + current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}") + all_files = new_files + + add_files_to_dataset( + dataset_id=dataset_id, + country_code=country_code, + file_paths=all_files, + ) + + @pipeline("snt_dhis2_outliers_imputation_magic_glasses") 
@parameter( - "complete", - name="Run complete mode", - help="False (default): Partial mode (MAD15 then MAD10). True: Complete mode (Partial + seasonal detection, can take several hours).", - type=bool, - default=False, + "mode", + name="Detection mode", + help="Partial: fast (~7 min, MAD15 then MAD10). Complete: Partial + seasonal detection, can take several hours.", + type=str, + default="partial", required=False, + choices=["partial", "complete"], ) @parameter( "push_db", @@ -48,15 +112,20 @@ required=False, ) def snt_dhis2_outliers_imputation_magic_glasses( - complete: bool, + mode: str, push_db: bool, run_report_only: bool, pull_scripts: bool, ): - """Dedicated Magic Glasses outliers pipeline for SNT DHIS2 data.""" - run_mg_complete = bool(complete) + """Dedicated Magic Glasses outliers detection pipeline for SNT DHIS2 data.""" + mode_clean = (mode or "partial").strip().lower() + if mode_clean not in ("partial", "complete"): + raise ValueError('mode must be "partial" or "complete".') + run_mg_partial = True + run_mg_complete = mode_clean == "complete" + current_run.log_info(f"Selected detection mode: {mode_clean}") current_run.log_info( - f"Selected detection mode: {'complete' if run_mg_complete else 'partial'}" + f"Flags => RUN_MAGIC_GLASSES_PARTIAL={run_mg_partial}, RUN_MAGIC_GLASSES_COMPLETE={run_mg_complete}" ) if run_mg_complete: current_run.log_warning( @@ -77,17 +146,7 @@ def snt_dhis2_outliers_imputation_magic_glasses( root_path = Path(workspace.files_path) pipeline_path = root_path / "pipelines" / "snt_dhis2_outliers_imputation_magic_glasses" - data_path = root_path / "data" / "dhis2" / "outliers_imputation" - code_notebook = ( - pipeline_path - / "code" - / "snt_dhis2_outliers_imputation_magic_glasses.ipynb" - ) - report_notebook = ( - pipeline_path - / "reporting" - / "snt_dhis2_outliers_imputation_magic_glasses_report.ipynb" - ) + data_path = root_path / "data" / "dhis2" / "outliers_detection" pipeline_path.mkdir(parents=True, exist_ok=True) 
data_path.mkdir(parents=True, exist_ok=True) @@ -101,12 +160,14 @@ def snt_dhis2_outliers_imputation_magic_glasses( if not run_report_only: # Avoid publishing stale artifacts from previous runs. - for old_file in data_path.glob(f"{country_code}_routine_outliers*.parquet"): + for old_file in data_path.glob(f"{country_code}_flagged_outliers_magic_glasses.parquet"): + old_file.unlink(missing_ok=True) + for old_file in data_path.glob(f"{country_code}_outlier_magic_glasses_*.parquet"): old_file.unlink(missing_ok=True) input_params = { "ROOT_PATH": Path(workspace.files_path).as_posix(), - "OUTLIERS_METHOD": "MG_COMPLETE" if run_mg_complete else "MG_PARTIAL", + "RUN_MAGIC_GLASSES_PARTIAL": run_mg_partial, "RUN_MAGIC_GLASSES_COMPLETE": run_mg_complete, "DEVIATION_MAD15": 15, "DEVIATION_MAD10": 10, @@ -116,7 +177,7 @@ def snt_dhis2_outliers_imputation_magic_glasses( } run_start_ts = time.time() run_notebook( - nb_path=code_notebook, + nb_path=pipeline_path / "code" / "snt_dhis2_outliers_imputation_magic_glasses.ipynb", out_nb_path=pipeline_path / "papermill_outputs", kernel_name="ir", parameters=input_params, @@ -124,20 +185,17 @@ def snt_dhis2_outliers_imputation_magic_glasses( country_code=country_code, ) - expected_outputs = [ - data_path / f"{country_code}_routine_outliers_detected.parquet", - data_path / f"{country_code}_routine_outliers_imputed.parquet", - data_path / f"{country_code}_routine_outliers_removed.parquet", - ] - missing_outputs = [ - path.name - for path in expected_outputs - if (not path.exists() or path.stat().st_mtime < run_start_ts) - ] - if missing_outputs: + partial_file = data_path / f"{country_code}_outlier_magic_glasses_partial.parquet" + complete_file = data_path / f"{country_code}_outlier_magic_glasses_complete.parquet" + if not partial_file.exists() or partial_file.stat().st_mtime < run_start_ts: raise RuntimeError( - "Expected output files were not generated during this run: " - + ", ".join(missing_outputs) + "Partial output file was not 
generated during this run." + ) + if run_mg_complete and ( + not complete_file.exists() or complete_file.stat().st_mtime < run_start_ts + ): + raise RuntimeError( + "Complete mode selected but complete seasonal output was not generated during this run." ) parameters_file = save_pipeline_parameters( @@ -147,26 +205,33 @@ def snt_dhis2_outliers_imputation_magic_glasses( country_code=country_code, ) - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + # Get new files for Magic Glasses + mg_files = list(data_path.glob(f"{country_code}_flagged_outliers_magic_glasses.parquet")) + mg_files.extend(data_path.glob(f"{country_code}_outlier_magic_glasses_*.parquet")) + new_files = [*mg_files, parameters_file] + + # Preserve existing files from other methods and add new ones + dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"] + preserve_and_add_files_to_dataset( + dataset_id=dataset_id, country_code=country_code, - file_paths=[ - *data_path.glob(f"{country_code}_routine_outliers*.parquet"), - parameters_file, - ], + new_files=new_files, + method_prefix="magic_glasses", ) if push_db: - push_data_to_db_table( - table_name="outliers_detected", - file_path=data_path / f"{country_code}_routine_outliers_detected.parquet", - ) + try: + create_outliers_db_table(country_code=country_code, data_path=data_path) + except Exception as e: + current_run.log_warning( + f"MG files were produced but DB push failed with current utility: {e}" + ) else: current_run.log_info("Skipping calculations, running only the reporting notebook.") run_report_notebook( - nb_file=report_notebook, + nb_file=pipeline_path / "reporting" / "snt_dhis2_outliers_imputation_magic_glasses_report.ipynb", nb_output_path=pipeline_path / "reporting" / "outputs", error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, country_code=country_code, diff --git a/snt_dhis2_outliers_imputation_mean/pipeline.py 
b/snt_dhis2_outliers_imputation_mean/pipeline.py index 3cb15c8..2b65639 100644 --- a/snt_dhis2_outliers_imputation_mean/pipeline.py +++ b/snt_dhis2_outliers_imputation_mean/pipeline.py @@ -1,18 +1,81 @@ from pathlib import Path +import tempfile from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( add_files_to_dataset, - push_data_to_db_table, load_configuration_snt, pull_scripts_from_repository, run_notebook, run_report_notebook, save_pipeline_parameters, validate_config, + create_outliers_db_table, ) +def preserve_and_add_files_to_dataset( + dataset_id: str, + country_code: str, + new_files: list[Path], + method_prefix: str, +): + """ + Add new files to dataset while preserving existing files from other methods. + + Args: + dataset_id: Dataset identifier + country_code: Country code + new_files: List of new file paths to add + method_prefix: Prefix pattern to identify files from this method (e.g., "mean", "median", "magic_glasses") + """ + try: + dataset = workspace.get_dataset(dataset_id) + latest_version = dataset.latest_version + existing_files = latest_version.list_files() + + # Filter out files from this method but keep others + preserved_files = [] + for file_obj in existing_files: + filename = file_obj.name + + # Determine if this file belongs to the current method + is_current_method = False + if method_prefix == "magic_glasses": + # Magic Glasses files: flagged_outliers_magic_glasses.parquet, outlier_magic_glasses_*.parquet + is_current_method = ( + filename == f"{country_code}_flagged_outliers_magic_glasses.parquet" or + filename.startswith(f"{country_code}_outlier_magic_glasses_") + ) + else: + # Other methods: routine_outliers-{method}*.parquet + is_current_method = filename.startswith(f"{country_code}_routine_outliers-{method_prefix}") + + # Preserve files from other methods + if not is_current_method: + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as 
tmp_file: + tmp_path = Path(tmp_file.name) + file_obj.download(tmp_path) + preserved_files.append(tmp_path) + current_run.log_info(f"Preserving existing file: {filename}") + except Exception as e: + current_run.log_warning(f"Could not preserve file {filename}: {e}") + + # Combine preserved files with new files + all_files = preserved_files + new_files + current_run.log_info(f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files") + except Exception as e: + current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}") + all_files = new_files + + add_files_to_dataset( + dataset_id=dataset_id, + country_code=country_code, + file_paths=all_files, + ) + + @pipeline("snt_dhis2_outliers_imputation_mean") @parameter( "deviation_mean", @@ -81,7 +144,6 @@ def snt_dhis2_outliers_imputation_mean( if not run_report_only: input_params = { "ROOT_PATH": Path(workspace.files_path).as_posix(), - "OUTLIERS_METHOD": "MEAN", "DEVIATION_MEAN": deviation_mean, } run_notebook( @@ -93,18 +155,6 @@ country_code=country_code, ) - expected_outputs = [ - data_path / f"{country_code}_routine_outliers_detected.parquet", - data_path / f"{country_code}_routine_outliers_imputed.parquet", - data_path / f"{country_code}_routine_outliers_removed.parquet", - ] - missing_outputs = [path.name for path in expected_outputs if not path.exists()] - if missing_outputs: - raise RuntimeError( - "Expected output files were not generated by the notebook: " - + ", ".join(missing_outputs) - ) - parameters_file = save_pipeline_parameters( pipeline_name="snt_dhis2_outliers_imputation_mean", parameters=input_params, @@ -112,17 +162,21 @@ country_code=country_code, ) - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + # Get new files for this method + mean_files = 
list(data_path.glob(f"{country_code}_routine_outliers-mean*.parquet")) + new_files = [*mean_files, parameters_file] + + # Preserve existing files from other methods and add new ones + dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"] + preserve_and_add_files_to_dataset( + dataset_id=dataset_id, country_code=country_code, - file_paths=[*expected_outputs, parameters_file], + new_files=new_files, + method_prefix="mean", ) if push_db: - push_data_to_db_table( - table_name="outliers_detected", - file_path=data_path / f"{country_code}_routine_outliers_detected.parquet", - ) + create_outliers_db_table(country_code=country_code, data_path=data_path) else: current_run.log_info("Skipping outliers calculations, running only the reporting notebook.") diff --git a/snt_dhis2_outliers_imputation_median/pipeline.py b/snt_dhis2_outliers_imputation_median/pipeline.py index 3c19bfa..d5e0534 100644 --- a/snt_dhis2_outliers_imputation_median/pipeline.py +++ b/snt_dhis2_outliers_imputation_median/pipeline.py @@ -1,18 +1,81 @@ from pathlib import Path +import tempfile from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( add_files_to_dataset, - push_data_to_db_table, load_configuration_snt, pull_scripts_from_repository, run_notebook, run_report_notebook, save_pipeline_parameters, validate_config, + create_outliers_db_table, ) +def preserve_and_add_files_to_dataset( + dataset_id: str, + country_code: str, + new_files: list[Path], + method_prefix: str, +): + """ + Add new files to dataset while preserving existing files from other methods. 
+ + Args: + dataset_id: Dataset identifier + country_code: Country code + new_files: List of new file paths to add + method_prefix: Prefix pattern to identify files from this method (e.g., "mean", "median", "magic_glasses") + """ + try: + dataset = workspace.get_dataset(dataset_id) + latest_version = dataset.latest_version + existing_files = latest_version.list_files() + + # Filter out files from this method but keep others + preserved_files = [] + for file_obj in existing_files: + filename = file_obj.name + + # Determine if this file belongs to the current method + is_current_method = False + if method_prefix == "magic_glasses": + # Magic Glasses files: flagged_outliers_magic_glasses.parquet, outlier_magic_glasses_*.parquet + is_current_method = ( + filename == f"{country_code}_flagged_outliers_magic_glasses.parquet" or + filename.startswith(f"{country_code}_outlier_magic_glasses_") + ) + else: + # Other methods: routine_outliers-{method}*.parquet + is_current_method = filename.startswith(f"{country_code}_routine_outliers-{method_prefix}") + + # Preserve files from other methods + if not is_current_method: + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as tmp_file: + tmp_path = Path(tmp_file.name) + file_obj.download(tmp_path) + preserved_files.append(tmp_path) + current_run.log_info(f"Preserving existing file: {filename}") + except Exception as e: + current_run.log_warning(f"Could not preserve file {filename}: {e}") + + # Combine preserved files with new files + all_files = preserved_files + new_files + current_run.log_info(f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files") + except Exception as e: + current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}") + all_files = new_files + + add_files_to_dataset( + dataset_id=dataset_id, + country_code=country_code, + file_paths=all_files, + ) + + @pipeline("snt_dhis2_outliers_imputation_median") @parameter( 
"deviation_median", @@ -81,7 +144,6 @@ def snt_dhis2_outliers_imputation_median( if not run_report_only: input_params = { "ROOT_PATH": Path(workspace.files_path).as_posix(), - "OUTLIERS_METHOD": "MEDIAN", "DEVIATION_MEDIAN": deviation_median, } run_notebook( @@ -93,18 +155,6 @@ def snt_dhis2_outliers_imputation_median( country_code=country_code, ) - expected_outputs = [ - data_path / f"{country_code}_routine_outliers_detected.parquet", - data_path / f"{country_code}_routine_outliers_imputed.parquet", - data_path / f"{country_code}_routine_outliers_removed.parquet", - ] - missing_outputs = [path.name for path in expected_outputs if not path.exists()] - if missing_outputs: - raise RuntimeError( - "Expected output files were not generated by the notebook: " - + ", ".join(missing_outputs) - ) - parameters_file = save_pipeline_parameters( pipeline_name="snt_dhis2_outliers_imputation_median", parameters=input_params, @@ -112,17 +162,20 @@ def snt_dhis2_outliers_imputation_median( country_code=country_code, ) - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + median_files = list(data_path.glob(f"{country_code}_routine_outliers-median*.parquet")) + new_files = [*median_files, parameters_file] + + # Preserve existing files from other methods and add new ones + dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"] + preserve_and_add_files_to_dataset( + dataset_id=dataset_id, country_code=country_code, - file_paths=[*expected_outputs, parameters_file], + new_files=new_files, + method_prefix="median", ) if push_db: - push_data_to_db_table( - table_name="outliers_detected", - file_path=data_path / f"{country_code}_routine_outliers_detected.parquet", - ) + create_outliers_db_table(country_code=country_code, data_path=data_path) else: current_run.log_info("Skipping outliers calculations, running only the reporting notebook.") diff --git a/snt_dhis2_outliers_imputation_path/pipeline.py 
b/snt_dhis2_outliers_imputation_path/pipeline.py index 0320cbf..186aedc 100644 --- a/snt_dhis2_outliers_imputation_path/pipeline.py +++ b/snt_dhis2_outliers_imputation_path/pipeline.py @@ -1,18 +1,81 @@ from pathlib import Path +import tempfile from openhexa.sdk import current_run, parameter, pipeline, workspace from snt_lib.snt_pipeline_utils import ( add_files_to_dataset, - push_data_to_db_table, load_configuration_snt, pull_scripts_from_repository, run_notebook, run_report_notebook, validate_config, + create_outliers_db_table, save_pipeline_parameters, ) +def preserve_and_add_files_to_dataset( + dataset_id: str, + country_code: str, + new_files: list[Path], + method_prefix: str, +): + """ + Add new files to dataset while preserving existing files from other methods. + + Args: + dataset_id: Dataset identifier + country_code: Country code + new_files: List of new file paths to add + method_prefix: Prefix pattern to identify files from this method (e.g., "mean", "median", "magic_glasses") + """ + try: + dataset = workspace.get_dataset(dataset_id) + latest_version = dataset.latest_version + existing_files = latest_version.list_files() + + # Filter out files from this method but keep others + preserved_files = [] + for file_obj in existing_files: + filename = file_obj.name + + # Determine if this file belongs to the current method + is_current_method = False + if method_prefix == "magic_glasses": + # Magic Glasses files: flagged_outliers_magic_glasses.parquet, outlier_magic_glasses_*.parquet + is_current_method = ( + filename == f"{country_code}_flagged_outliers_magic_glasses.parquet" or + filename.startswith(f"{country_code}_outlier_magic_glasses_") + ) + else: + # Other methods: routine_outliers-{method}*.parquet + is_current_method = filename.startswith(f"{country_code}_routine_outliers-{method_prefix}") + + # Preserve files from other methods + if not is_current_method: + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as 
tmp_file: + tmp_path = Path(tmp_file.name) + file_obj.download(tmp_path) + preserved_files.append(tmp_path) + current_run.log_info(f"Preserving existing file: {filename}") + except Exception as e: + current_run.log_warning(f"Could not preserve file {filename}: {e}") + + # Combine preserved files with new files + all_files = preserved_files + new_files + current_run.log_info(f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files") + except Exception as e: + current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}") + all_files = new_files + + add_files_to_dataset( + dataset_id=dataset_id, + country_code=country_code, + file_paths=all_files, + ) + + @pipeline("snt_dhis2_outliers_imputation_path") @parameter( "deviation_mean", @@ -83,7 +146,6 @@ def snt_dhis2_outliers_imputation_path( if not run_report_only: input_params = { "ROOT_PATH": Path(workspace.files_path).as_posix(), - "OUTLIERS_METHOD": "PATH", "DEVIATION_MEAN": deviation_mean, } run_notebook( @@ -95,18 +157,6 @@ country_code=country_code, ) - expected_outputs = [ - data_path / f"{country_code}_routine_outliers_detected.parquet", - data_path / f"{country_code}_routine_outliers_imputed.parquet", - data_path / f"{country_code}_routine_outliers_removed.parquet", - ] - missing_outputs = [path.name for path in expected_outputs if not path.exists()] - if missing_outputs: - raise RuntimeError( - "Expected output files were not generated by the notebook: " - + ", ".join(missing_outputs) - ) - parameters_file = save_pipeline_parameters( pipeline_name="snt_dhis2_outliers_imputation_path", parameters=input_params, @@ -114,22 +164,22 @@ country_code=country_code, ) - # Add files to Dataset - add_files_to_dataset( - dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"], + # Get new files for this method (trend) + trend_files = 
list(data_path.glob(f"{country_code}_routine_outliers-trend*.parquet")) + new_files = [*trend_files, parameters_file] + + # Preserve existing files from other methods and add new ones + dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"] + preserve_and_add_files_to_dataset( + dataset_id=dataset_id, country_code=country_code, - file_paths=[ - *expected_outputs, - parameters_file, - ], + new_files=new_files, + method_prefix="trend", ) # Create consolidated outliers DB table if push_db: - push_data_to_db_table( - table_name="outliers_detected", - file_path=data_path / f"{country_code}_routine_outliers_detected.parquet", - ) + create_outliers_db_table(country_code=country_code, data_path=data_path) else: current_run.log_info("Skipping outliers calculations, running only the reporting notebook.") diff --git a/snt_dhis2_quality_of_care/.gitignore b/snt_dhis2_quality_of_care/.gitignore new file mode 100644 index 0000000..43a40cd --- /dev/null +++ b/snt_dhis2_quality_of_care/.gitignore @@ -0,0 +1,4 @@ +workspace/ +workspace.yaml +.vscode/ +__pycache__ diff --git a/snt_dhis2_quality_of_care/pipeline.py b/snt_dhis2_quality_of_care/pipeline.py new file mode 100644 index 0000000..5c066ae --- /dev/null +++ b/snt_dhis2_quality_of_care/pipeline.py @@ -0,0 +1,123 @@ +from pathlib import Path + +from openhexa.sdk import current_run, pipeline, workspace, parameter +from snt_lib.snt_pipeline_utils import ( + add_files_to_dataset, + load_configuration_snt, + validate_config, + run_report_notebook, + run_notebook, + pull_scripts_from_repository, + save_pipeline_parameters, +) + + +@pipeline("snt_dhis2_quality_of_care") +@parameter( + "data_action", + name="Data action", + help="Choose whether to use imputed data (outliers replaced) or removed data (outliers removed).", + type=str, + choices=["imputed", "removed"], + default="imputed", + required=True, +) +@parameter( + "run_report_only", + name="Run reporting only", + help="Skip computations and execute only 
the reporting notebook.", + type=bool, + default=False, + required=False, +) +@parameter( + "pull_scripts", + name="Pull scripts", + help="Pull the latest pipeline scripts from the repository.", + type=bool, + default=False, + required=False, +) +def snt_dhis2_quality_of_care( + data_action: str, + run_report_only: bool, + pull_scripts: bool, +): + """Compute quality-of-care indicators from outliers-imputed DHIS2 routine data.""" + try: + current_run.log_info("Starting SNT Quality of Care pipeline...") + root_path = Path(workspace.files_path) + pipeline_path = root_path / "pipelines" / "snt_dhis2_quality_of_care" + data_path = root_path / "data" / "dhis2" / "quality_of_care" + pipeline_path.mkdir(parents=True, exist_ok=True) + data_path.mkdir(parents=True, exist_ok=True) + + if pull_scripts: + current_run.log_info("Pulling pipeline scripts from repository.") + pull_scripts_from_repository( + pipeline_name="snt_dhis2_quality_of_care", + report_scripts=["snt_dhis2_quality_of_care_report.ipynb"], + code_scripts=["snt_dhis2_quality_of_care.ipynb"], + ) + + snt_config = load_configuration_snt(config_path=root_path / "configuration" / "SNT_config.json") + validate_config(snt_config) + country_code = snt_config["SNT_CONFIG"]["COUNTRY_CODE"] + + nb_parameters = { + "data_action": data_action, + } + + parameters_file = save_pipeline_parameters( + pipeline_name="snt_dhis2_quality_of_care", + parameters=nb_parameters, + output_path=data_path, + country_code=country_code, + ) + + if not run_report_only: + run_notebook( + nb_path=pipeline_path / "code" / "snt_dhis2_quality_of_care.ipynb", + out_nb_path=pipeline_path / "papermill_outputs", + kernel_name="ir", + parameters=nb_parameters, + error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + files_to_dataset = [ + data_path / f"{country_code}_quality_of_care_district_year_{data_action}.parquet", + data_path / f"{country_code}_quality_of_care_district_year_{data_action}.csv", 
+ parameters_file, + ] + existing_files = [f for f in files_to_dataset if f.exists()] + missing_files = [f for f in files_to_dataset if not f.exists()] + for missing in missing_files: + current_run.log_warning(f"Output file not found, skipped for dataset upload: {missing}") + + if existing_files: + add_files_to_dataset( + dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_QUALITY_OF_CARE"], + country_code=country_code, + file_paths=existing_files, + ) + else: + current_run.log_warning("No output files found for dataset upload.") + else: + current_run.log_info("Skipping computations, running only reporting notebook.") + + run_report_notebook( + nb_file=pipeline_path / "reporting" / "snt_dhis2_quality_of_care_report.ipynb", + nb_output_path=pipeline_path / "reporting" / "outputs", + error_label_severity_map={"[ERROR]": "error", "[WARNING]": "warning"}, + country_code=country_code, + ) + + current_run.log_info("Quality of Care pipeline finished successfully.") + except Exception as e: + current_run.log_error(f"Pipeline failed: {e}") + raise + + +if __name__ == "__main__": + snt_dhis2_quality_of_care() diff --git a/snt_dhis2_quality_of_care/readme.md b/snt_dhis2_quality_of_care/readme.md new file mode 100644 index 0000000..5e2b61e --- /dev/null +++ b/snt_dhis2_quality_of_care/readme.md @@ -0,0 +1,27 @@ +SNT Quality of Care Pipeline + +Description + +This pipeline computes district-year quality-of-care indicators from DHIS2 outliers-imputed routine data and generates yearly ADM2 maps. + +Parameters + + outlier_imputation_method (String, required) + Name: Outlier imputation method + Description: Select which outlier detection/imputation method to use. + Choices/Default: mean, median, iqr, trend. Default: mean. + + data_action (String, required) + Name: Data action + Description: Choose whether to use imputed data (outliers replaced) or removed data (outliers removed). + Choices/Default: imputed, removed. Default: imputed. 
+ + run_report_only (Boolean, optional) + Name: Run reporting only + Description: Skip computations and run only reporting notebook. + Choices/Default: TRUE/FALSE. Default: FALSE. + + pull_scripts (Boolean, optional) + Name: Pull scripts + Description: Pull latest scripts from repository before run. + Choices/Default: TRUE/FALSE. Default: FALSE. diff --git a/snt_dhis2_quality_of_care/requirements.txt b/snt_dhis2_quality_of_care/requirements.txt new file mode 100644 index 0000000..e278876 --- /dev/null +++ b/snt_dhis2_quality_of_care/requirements.txt @@ -0,0 +1,2 @@ +openhexa.toolbox @ git+https://github.com/BLSQ/openhexa-toolbox@main +snt_lib @ git+https://git@github.com/BLSQ/snt_utils.git