Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configuration/SNT_config_BFA.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"ERA5_DATASET_CLIMATE": "snt-era5-climate",
"SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall",
"SNT_SEASONALITY_CASES": "snt-seasonality-cases",
"DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care",
"SNT_MAP_EXTRACTS": "snt-map-extracts",
"SNT_RESULTS": "snt-results"
},
Expand Down
1 change: 1 addition & 0 deletions configuration/SNT_config_COD.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"ERA5_DATASET_CLIMATE": "snt-era5-climate",
"SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall",
"SNT_SEASONALITY_CASES": "snt-seasonality-cases",
"DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care",
"SNT_MAP_EXTRACTS": "snt-map-extracts",
"SNT_RESULTS": "snt-results"
},
Expand Down
1 change: 1 addition & 0 deletions configuration/SNT_config_NER.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"ERA5_DATASET_CLIMATE": "snt-era5-climate",
"SNT_SEASONALITY_RAINFALL": "snt-seasonality-rainfall",
"SNT_SEASONALITY_CASES": "snt-seasonality-cases",
"DHIS2_QUALITY_OF_CARE": "snt-dhis2-quality-of-care",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we include an extra dataset in the configuration file, we should also make sure all config files (for all other countries) have this information available. For this we use a function called `validate_config()`, part of `snt_lib`.

I suggest including the verification of this additional dataset in the list `required_dataset_keys`; this will raise an error if the entry does not exist or if it is empty. It should be just:

  required_dataset_keys = [
       ...
       "SNT_RESULTS",
       "DHIS2_QUALITY_OF_CARE"
   ]

"SNT_MAP_EXTRACTS": "snt-map-extracts",
"SNT_RESULTS": "snt-results"
},
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

74 changes: 71 additions & 3 deletions snt_dhis2_outliers_imputation_iqr/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
import tempfile

from openhexa.sdk import current_run, parameter, pipeline, workspace
from snt_lib.snt_pipeline_utils import (
Expand All @@ -13,6 +14,68 @@
)


def preserve_and_add_files_to_dataset(
    dataset_id: str,
    country_code: str,
    new_files: list[Path],
    method_prefix: str,
):
    """
    Add new files to a dataset while preserving existing files from other methods.

    Files in the dataset's latest version that do NOT belong to the current
    method are downloaded to a temporary directory (keeping their original
    filenames) and re-uploaded together with the new files. Files that DO
    belong to the current method are dropped, i.e. replaced by `new_files`.
    If anything goes wrong while listing/downloading existing files, the
    function falls back to uploading only the new files (best effort).

    Args:
        dataset_id: Dataset identifier.
        country_code: Country code used as the filename prefix.
        new_files: List of new file paths to add.
        method_prefix: Prefix identifying files from this method
            (e.g., "mean", "median", "iqr", "magic_glasses").
    """
    try:
        dataset = workspace.get_dataset(dataset_id)
        latest_version = dataset.latest_version
        existing_files = latest_version.list_files()

        # Download into a dedicated directory so preserved files keep their
        # original names; NamedTemporaryFile would give them random names,
        # which would then be re-uploaded under the wrong filename.
        tmp_dir = Path(tempfile.mkdtemp(prefix="snt_preserved_files_"))

        # Keep files from other methods; drop files from this method
        # (they are superseded by the new outputs).
        preserved_files = []
        for file_obj in existing_files:
            filename = file_obj.name

            # Determine if this file belongs to the current method
            if method_prefix == "magic_glasses":
                # Magic Glasses outputs use two naming patterns:
                # {code}_flagged_outliers_magic_glasses.parquet and
                # {code}_outlier_magic_glasses_*.parquet
                is_current_method = (
                    filename == f"{country_code}_flagged_outliers_magic_glasses.parquet"
                    or filename.startswith(f"{country_code}_outlier_magic_glasses_")
                )
            else:
                # Other methods: {code}_routine_outliers-{method}*.parquet
                is_current_method = filename.startswith(
                    f"{country_code}_routine_outliers-{method_prefix}"
                )

            if not is_current_method:
                try:
                    tmp_path = tmp_dir / filename
                    file_obj.download(tmp_path)
                    preserved_files.append(tmp_path)
                    current_run.log_info(f"Preserving existing file: {filename}")
                except Exception as e:
                    # Best effort: skip files that fail to download rather
                    # than aborting the whole upload.
                    current_run.log_warning(f"Could not preserve file {filename}: {e}")

        # Combine preserved files with new files
        all_files = preserved_files + new_files
        current_run.log_info(
            f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files"
        )
    except Exception as e:
        current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}")
        all_files = new_files

    add_files_to_dataset(
        dataset_id=dataset_id,
        country_code=country_code,
        file_paths=all_files,
    )


@pipeline("snt_dhis2_outliers_imputation_iqr")
@parameter(
"deviation_iqr",
Expand Down Expand Up @@ -100,10 +163,15 @@ def snt_dhis2_outliers_imputation_iqr(
)

iqr_files = list(data_path.glob(f"{country_code}_routine_outliers-iqr*.parquet"))
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
new_files = [*iqr_files, parameters_file]

# Preserve existing files from other methods and add new ones
dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"]
preserve_and_add_files_to_dataset(
dataset_id=dataset_id,
country_code=country_code,
file_paths=[*iqr_files, parameters_file],
new_files=new_files,
method_prefix="iqr",
)

if push_db:
Expand Down
81 changes: 74 additions & 7 deletions snt_dhis2_outliers_imputation_magic_glasses/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
import time
import tempfile

from openhexa.sdk import current_run, parameter, pipeline, workspace
from snt_lib.snt_pipeline_utils import (
Expand All @@ -14,6 +15,68 @@
)


def preserve_and_add_files_to_dataset(
    dataset_id: str,
    country_code: str,
    new_files: list[Path],
    method_prefix: str,
):
    """
    Add new files to a dataset while preserving existing files from other methods.

    Files in the dataset's latest version that do NOT belong to the current
    method are downloaded to a temporary directory (keeping their original
    filenames) and re-uploaded together with the new files. Files that DO
    belong to the current method are dropped, i.e. replaced by `new_files`.
    If anything goes wrong while listing/downloading existing files, the
    function falls back to uploading only the new files (best effort).

    Args:
        dataset_id: Dataset identifier.
        country_code: Country code used as the filename prefix.
        new_files: List of new file paths to add.
        method_prefix: Prefix identifying files from this method
            (e.g., "mean", "median", "iqr", "magic_glasses").
    """
    try:
        dataset = workspace.get_dataset(dataset_id)
        latest_version = dataset.latest_version
        existing_files = latest_version.list_files()

        # Download into a dedicated directory so preserved files keep their
        # original names; NamedTemporaryFile would give them random names,
        # which would then be re-uploaded under the wrong filename.
        tmp_dir = Path(tempfile.mkdtemp(prefix="snt_preserved_files_"))

        # Keep files from other methods; drop files from this method
        # (they are superseded by the new outputs).
        preserved_files = []
        for file_obj in existing_files:
            filename = file_obj.name

            # Determine if this file belongs to the current method
            if method_prefix == "magic_glasses":
                # Magic Glasses outputs use two naming patterns:
                # {code}_flagged_outliers_magic_glasses.parquet and
                # {code}_outlier_magic_glasses_*.parquet
                is_current_method = (
                    filename == f"{country_code}_flagged_outliers_magic_glasses.parquet"
                    or filename.startswith(f"{country_code}_outlier_magic_glasses_")
                )
            else:
                # Other methods: {code}_routine_outliers-{method}*.parquet
                is_current_method = filename.startswith(
                    f"{country_code}_routine_outliers-{method_prefix}"
                )

            if not is_current_method:
                try:
                    tmp_path = tmp_dir / filename
                    file_obj.download(tmp_path)
                    preserved_files.append(tmp_path)
                    current_run.log_info(f"Preserving existing file: {filename}")
                except Exception as e:
                    # Best effort: skip files that fail to download rather
                    # than aborting the whole upload.
                    current_run.log_warning(f"Could not preserve file {filename}: {e}")

        # Combine preserved files with new files
        all_files = preserved_files + new_files
        current_run.log_info(
            f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files"
        )
    except Exception as e:
        current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}")
        all_files = new_files

    add_files_to_dataset(
        dataset_id=dataset_id,
        country_code=country_code,
        file_paths=all_files,
    )


@pipeline("snt_dhis2_outliers_imputation_magic_glasses")
@parameter(
"mode",
Expand Down Expand Up @@ -142,14 +205,18 @@ def snt_dhis2_outliers_imputation_magic_glasses(
country_code=country_code,
)

add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
# Get new files for Magic Glasses
mg_files = list(data_path.glob(f"{country_code}_flagged_outliers_magic_glasses.parquet"))
mg_files.extend(data_path.glob(f"{country_code}_outlier_magic_glasses_*.parquet"))
new_files = [*mg_files, parameters_file]

# Preserve existing files from other methods and add new ones
dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"]
preserve_and_add_files_to_dataset(
dataset_id=dataset_id,
country_code=country_code,
file_paths=[
*data_path.glob(f"{country_code}_flagged_outliers_magic_glasses.parquet"),
*data_path.glob(f"{country_code}_outlier_magic_glasses_*.parquet"),
parameters_file,
],
new_files=new_files,
method_prefix="magic_glasses",
)

if push_db:
Expand Down
75 changes: 72 additions & 3 deletions snt_dhis2_outliers_imputation_mean/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
import tempfile

from openhexa.sdk import current_run, parameter, pipeline, workspace
from snt_lib.snt_pipeline_utils import (
Expand All @@ -13,6 +14,68 @@
)


def preserve_and_add_files_to_dataset(
    dataset_id: str,
    country_code: str,
    new_files: list[Path],
    method_prefix: str,
):
    """
    Add new files to a dataset while preserving existing files from other methods.

    Files in the dataset's latest version that do NOT belong to the current
    method are downloaded to a temporary directory (keeping their original
    filenames) and re-uploaded together with the new files. Files that DO
    belong to the current method are dropped, i.e. replaced by `new_files`.
    If anything goes wrong while listing/downloading existing files, the
    function falls back to uploading only the new files (best effort).

    Args:
        dataset_id: Dataset identifier.
        country_code: Country code used as the filename prefix.
        new_files: List of new file paths to add.
        method_prefix: Prefix identifying files from this method
            (e.g., "mean", "median", "iqr", "magic_glasses").
    """
    try:
        dataset = workspace.get_dataset(dataset_id)
        latest_version = dataset.latest_version
        existing_files = latest_version.list_files()

        # Download into a dedicated directory so preserved files keep their
        # original names; NamedTemporaryFile would give them random names,
        # which would then be re-uploaded under the wrong filename.
        tmp_dir = Path(tempfile.mkdtemp(prefix="snt_preserved_files_"))

        # Keep files from other methods; drop files from this method
        # (they are superseded by the new outputs).
        preserved_files = []
        for file_obj in existing_files:
            filename = file_obj.name

            # Determine if this file belongs to the current method
            if method_prefix == "magic_glasses":
                # Magic Glasses outputs use two naming patterns:
                # {code}_flagged_outliers_magic_glasses.parquet and
                # {code}_outlier_magic_glasses_*.parquet
                is_current_method = (
                    filename == f"{country_code}_flagged_outliers_magic_glasses.parquet"
                    or filename.startswith(f"{country_code}_outlier_magic_glasses_")
                )
            else:
                # Other methods: {code}_routine_outliers-{method}*.parquet
                is_current_method = filename.startswith(
                    f"{country_code}_routine_outliers-{method_prefix}"
                )

            if not is_current_method:
                try:
                    tmp_path = tmp_dir / filename
                    file_obj.download(tmp_path)
                    preserved_files.append(tmp_path)
                    current_run.log_info(f"Preserving existing file: {filename}")
                except Exception as e:
                    # Best effort: skip files that fail to download rather
                    # than aborting the whole upload.
                    current_run.log_warning(f"Could not preserve file {filename}: {e}")

        # Combine preserved files with new files
        all_files = preserved_files + new_files
        current_run.log_info(
            f"Adding {len(new_files)} new files and preserving {len(preserved_files)} existing files"
        )
    except Exception as e:
        current_run.log_warning(f"Could not preserve existing files, adding only new files: {e}")
        all_files = new_files

    add_files_to_dataset(
        dataset_id=dataset_id,
        country_code=country_code,
        file_paths=all_files,
    )


@pipeline("snt_dhis2_outliers_imputation_mean")
@parameter(
"deviation_mean",
Expand Down Expand Up @@ -99,11 +162,17 @@ def snt_dhis2_outliers_imputation_mean(
country_code=country_code,
)

# Get new files for this method
mean_files = list(data_path.glob(f"{country_code}_routine_outliers-mean*.parquet"))
add_files_to_dataset(
dataset_id=snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"],
new_files = [*mean_files, parameters_file]

# Preserve existing files from other methods and add new ones
dataset_id = snt_config["SNT_DATASET_IDENTIFIERS"]["DHIS2_OUTLIERS_IMPUTATION"]
preserve_and_add_files_to_dataset(
dataset_id=dataset_id,
country_code=country_code,
file_paths=[*mean_files, parameters_file],
new_files=new_files,
method_prefix="mean",
)

if push_db:
Expand Down
Loading