diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index 99e1581ac..de7fd86dc 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -84,6 +84,8 @@ jobs: rm ./libensemble/tests/regression_tests/test_gpCAM.py # needs gpcam, which doesn't build on 3.13 rm ./libensemble/tests/regression_tests/test_asktell_gpCAM.py # needs gpcam, which doesn't build on 3.13 rm ./libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py # needs ax-platform, which doesn't yet support 3.14 + rm ./libensemble/tests/regression_tests/test_optimas_ax_mf.py # needs ax-platform, which doesn't yet support 3.14 + rm ./libensemble/tests/regression_tests/test_optimas_ax_sf.py # needs ax-platform, which doesn't yet support 3.14 - name: Start Redis if: matrix.os == 'ubuntu-latest' diff --git a/libensemble/gen_classes/aposmm.py b/libensemble/gen_classes/aposmm.py index 5c92d544d..71c2fcb43 100644 --- a/libensemble/gen_classes/aposmm.py +++ b/libensemble/gen_classes/aposmm.py @@ -293,7 +293,13 @@ def __init__( def _slot_in_data(self, results): """Slot in libE_calc_in and trial data into corresponding array fields. *Initial sample only!!*""" - self._ingest_buf[self._n_buffd_results : self._n_buffd_results + len(results)] = results + for name in results.dtype.names: + if name == "_id": + self._ingest_buf["sim_id"][self._n_buffd_results : self._n_buffd_results + len(results)] = results[ + "_id" + ] + else: + self._ingest_buf[name][self._n_buffd_results : self._n_buffd_results + len(results)] = results[name] def _enough_initial_sample(self): return ( @@ -361,7 +367,11 @@ def ingest_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: # Initial sample buffering here: if self._n_buffd_results == 0: - self._ingest_buf = np.zeros(self.gen_specs["user"]["initial_sample_size"], dtype=results.dtype) + # Create a dtype that includes sim_id but excludes _id + descr = [d for d in results.dtype.descr if d[0] != "_id"] + if "sim_id" not in [d[0] for d in descr]: + descr.append(("sim_id", int)) + self._ingest_buf = np.zeros(self.gen_specs["user"]["initial_sample_size"], dtype=descr) if not self._enough_initial_sample(): self._slot_in_data(np.copy(results)) diff --git a/libensemble/history.py b/libensemble/history.py index 388ba08b1..aa16a4d77 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -109,7 +109,7 @@ def __init__( self.last_ended = -1 def _append_new_fields(self, H_f: npt.NDArray) -> None: - dtype_new = np.dtype(list(set(self.H.dtype.descr + H_f.dtype.descr))) + dtype_new = np.dtype(list(set(self.H.dtype.descr + np.lib.recfunctions.repack_fields(H_f).dtype.descr))) H_new = np.zeros(len(self.H), dtype=dtype_new) old_fields = self.H.dtype.names for field in old_fields: @@ -121,10 +121,10 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: Updates the history after points have been evaluated """ - new_inds = D["libE_info"]["H_rows"] # The list of rows (as a numpy array) + new_inds = D["libE_info"]["H_rows"] returned_H = D["calc_out"] - fields = returned_H.dtype.names if returned_H is not None else [] + fields = returned_H.dtype.names if returned_H is not None else [] if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]): self._append_new_fields(returned_H) diff --git a/libensemble/manager.py b/libensemble/manager.py index 22ae8b5d3..5c084d321 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -484,7 +484,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - calc_status = D_recv["calc_status"] keep_state = D_recv["libE_info"].get("keep_state", False) - if w not in self.persis_pending and not self.W[w]["active_recv"] and not keep_state: + if (w not in self.persis_pending and not self.W[w]["active_recv"] and not keep_state) or self.WorkerExc: self.W[w]["active"] = 0 if calc_status in [FINISHED_PERSISTENT_SIM_TAG, FINISHED_PERSISTENT_GEN_TAG]: @@ -507,7 +507,17 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - self._freeup_resources(w) else: if calc_type == EVAL_SIM_TAG: - self.hist.update_history_f(D_recv, self.kill_canceled_sims) + try: + self.hist.update_history_f(D_recv, self.kill_canceled_sims) + except AttributeError as e: + if self.WorkerExc: + logger.debug(f"Manager ignoring secondary data error from worker {w} during shutdown: {e}") + else: + self.WorkerExc = True + self._kill_workers() + raise WorkerException( + f"Error in data from worker {w}", str(e), traceback.format_exc() + ) from None if calc_type == EVAL_GEN_TAG: D = D_recv["calc_out"] self._ensure_sim_id_in_persis_in(D) diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index 83e3bf625..cbcd4c22e 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -15,7 +15,6 @@ # TESTSUITE_COMMS: local mpi tcp # TESTSUITE_NPROCS: 3 -import sys from math import gamma, pi, sqrt import numpy as np @@ -23,10 +22,8 @@ import libensemble.gen_funcs from libensemble.executors.mpi_executor import MPIExecutor from libensemble.sim_funcs import six_hump_camel -from libensemble.sim_funcs.executor_hworld import executor_hworld as sim_f_exec # Import libEnsemble items for this test -from libensemble.sim_funcs.six_hump_camel import six_hump_camel as sim_f libensemble.gen_funcs.rc.aposmm_optimizers = "nlopt" from time import time @@ -34,34 +31,46 @@ from gest_api.vocs import VOCS from libensemble import Ensemble -from libensemble.alloc_funcs.persistent_aposmm_alloc import persistent_aposmm_alloc as alloc_f +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f from libensemble.gen_classes import APOSMM +from libensemble.manager import LoggedException from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, SimSpecs from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima + +def six_hump_camel_func(x): + """ + Definition of the six-hump camel + """ + x1 = x["core"] + x2 = x["edge"] + term1 = (4 - 2.1 * x1**2 + (x1**4) / 3) * x1**2 + term2 = x1 * x2 + term3 = (-4 + 4 * x2**2) * x2**2 + + return {"energy": term1 + term2 + term3} + + # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": - for run in range(2): + for run in range(3): workflow = Ensemble(parse_args=True) if workflow.is_manager: start_time = time() - if workflow.nworkers < 2: - sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") - n = 2 workflow.alloc_specs = AllocSpecs(alloc_f=alloc_f) + workflow.libE_specs.gen_on_manager = True + vocs = VOCS( variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [-3, 3], "edge_on_cube": [-2, 2]}, objectives={"energy": "MINIMIZE"}, ) - workflow.libE_specs.gen_on_manager = True - aposmm = APOSMM( vocs, max_active_runs=workflow.nworkers, # should this match nworkers always? practically? @@ -74,34 +83,46 @@ ftol_abs=1e-6, ) - # SH TODO - dont want this stuff duplicated - pass with vocs instead workflow.gen_specs = GenSpecs( - persis_in=["x", "x_on_cube", "sim_id", "local_min", "local_pt", "f"], generator=aposmm, + vocs=vocs, batch_size=5, initial_batch_size=10, - user={"initial_sample_size": 100}, ) if run == 0: - workflow.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], outputs=[("f", float)]) + workflow.sim_specs = SimSpecs(simulator=six_hump_camel_func, vocs=vocs) workflow.exit_criteria = ExitCriteria(sim_max=2000) elif run == 1: workflow.persis_info["num_gens_started"] = 0 sim_app2 = six_hump_camel.__file__ exctr = MPIExecutor() exctr.register_app(full_path=sim_app2, app_name="six_hump_camel", calc_type="sim") # Named app + workflow.sim_specs = SimSpecs(simulator=six_hump_camel_func, vocs=vocs) + workflow.exit_criteria = ExitCriteria(sim_max=200) + elif run == 2: + workflow.persis_info["num_gens_started"] = 0 workflow.sim_specs = SimSpecs( - sim_f=sim_f_exec, inputs=["x"], outputs=[("f", float), ("cstat", int)], user={"cores": 1} - ) + sim_f=six_hump_camel_func, vocs=vocs + ) # wrong parameter, but check we get error message workflow.exit_criteria = ExitCriteria(sim_max=200) + return_flag = False + workflow.libE_specs.abort_on_exception = False workflow.add_random_streams() - H, _, _ = workflow.run() + try: + H, _, _ = workflow.run() + except Exception as e: + assert isinstance(e, LoggedException) + aposmm.finalize() + return_flag = False + continue - # Perform the run + if run == 2 and workflow.is_manager: + assert return_flag + # Perform the run if workflow.is_manager and run == 0: print("[Manager]:", H[np.where(H["local_min"])]["x"]) print("[Manager]: Time taken =", time() - start_time, flush=True) diff --git a/libensemble/tests/regression_tests/test_optimas_ax_mf.py b/libensemble/tests/regression_tests/test_optimas_ax_mf.py index b6f43b3ed..758aa1fc2 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_mf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_mf.py @@ -16,9 +16,9 @@ # TESTSUITE_COMMS: mpi local # TESTSUITE_NPROCS: 4 # TESTSUITE_EXTRA: true +# TESTSUITE_OS_SKIP: OSX import numpy as np - from gest_api.vocs import VOCS from optimas.generators import AxMultiFidelityGenerator @@ -32,10 +32,7 @@ def eval_func_mf(input_params): x0 = input_params["x0"] x1 = input_params["x1"] resolution = input_params["res"] - result = -( - (x0 + 10 * np.cos(x0 + 0.1 * resolution)) - * (x1 + 5 * np.cos(x1 - 0.2 * resolution)) - ) + result = -((x0 + 10 * np.cos(x0 + 0.1 * resolution)) * (x1 + 5 * np.cos(x1 - 0.2 * resolution))) return {"f": result} diff --git a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py index 04a2b5430..9e97dcad7 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py @@ -23,10 +23,10 @@ # TESTSUITE_NPROCS: 4 # TESTSUITE_EXTRA: true # TESTSUITE_EXCLUDE: true +# TESTSUITE_OS_SKIP: OSX import numpy as np from gest_api.vocs import VOCS - from optimas.core import Task from optimas.generators import AxMultitaskGenerator @@ -37,7 +37,7 @@ def eval_func_multitask(input_params): """Evaluation function for task1 or task2 in multitask test""" - print(f'input_params: {input_params}') + print(f"input_params: {input_params}") x0 = input_params["x0"] x1 = input_params["x1"] trial_type = input_params["trial_type"] diff --git a/libensemble/tests/regression_tests/test_optimas_ax_sf.py b/libensemble/tests/regression_tests/test_optimas_ax_sf.py index ba0b66c29..e4ee9e8a7 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_sf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_sf.py @@ -16,9 +16,9 @@ # TESTSUITE_COMMS: mpi local # TESTSUITE_NPROCS: 4 # TESTSUITE_EXTRA: true +# TESTSUITE_OS_SKIP: OSX import numpy as np - from gest_api.vocs import VOCS from optimas.generators import AxSingleFidelityGenerator @@ -28,7 +28,7 @@ def eval_func_sf(input_params): - """Evaluation function for single-fidelity test. """ + """Evaluation function for single-fidelity test.""" x0 = input_params["x0"] x1 = input_params["x1"] result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1)) diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py index 19b67f37e..83f238880 100644 --- a/libensemble/utils/misc.py +++ b/libensemble/utils/misc.py @@ -74,13 +74,7 @@ def _get_new_dtype_fields(first: dict, mapping: dict = {}) -> list: fields_to_convert = list( # combining all mapping lists chain.from_iterable(list(mapping.values())) ) # fields like ["beam_length", "beam_width"] that will become "x" - new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list( - mapping.keys() - ) # array dtype needs "x". avoid fields from mapping values since we're converting those to "x" - # We need to accommodate "_id" getting mapped to "sim_id", but if it's not present - # in the input dictionary, then perhaps we're doing an initial sample. - if "_id" not in first and "sim_id" in mapping: - new_dtype_names.remove("sim_id") + new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] return new_dtype_names @@ -139,9 +133,7 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) - new_dtype_names = _get_new_dtype_fields(first, mapping) combinable_names = _get_combinable_multidim_names(first, new_dtype_names) # [['x0', 'x1'], ['z']] - if ( - dtype is None - ): # Default value gets set upon function instantiation (default is mutable). + if dtype is None: # Default value gets set upon function instantiation (default is mutable). dtype = [] # build dtype of non-mapped fields. appending onto empty dtype @@ -152,9 +144,12 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) - if len(mapping): existing_names = [f[0] for f in dtype] for name in mapping: - if name not in existing_names: + # If the field is already in the dtype, skip it. *And* the field is present in the input data + if name not in existing_names and all(src in first for src in mapping[name]): size = len(mapping[name]) dtype.append(_decide_dtype(name, 0.0, size)) # default to float + new_dtype_names.append(name) + combinable_names.append(mapping[name]) out = np.zeros(len(list_dicts), dtype=dtype) @@ -219,10 +214,69 @@ def unmap_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray: return unmapped_array +def map_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray: + """Convert numpy array with individual scalar fields to mapped fields. + Parameters + ---------- + array : npt.NDArray + Input array with unmapped fields like x0, x1, x2 + mapping : dict + Mapping from field names to variable names + Returns + ------- + npt.NDArray + Array with mapped fields like x = [x0, x1, x2] + """ + if not mapping or array is None: + return array + + # Create new dtype with mapped fields + new_fields = [] + + # Track fields processed by mapping to avoid duplication + mapped_source_fields = set() + for key, val_list in mapping.items(): + mapped_source_fields.update(val_list) + + # First add mapped fields from the mapping definition + for mapped_name, val_list in mapping.items(): + first_var = val_list[0] + # We assume all components have the same type, take from first + base_type = array.dtype[first_var] + size = len(val_list) + if size > 1: + new_fields.append((mapped_name, base_type, (size,))) + else: + new_fields.append((mapped_name, base_type)) + + # Then add any fields from the source array that were NOT part of a mapping + for field in array.dtype.names: + if field not in mapped_source_fields: + new_fields.append((field, array.dtype[field])) + + # remove duplicates from new_fields + new_fields = list(dict.fromkeys(new_fields)) + + # Create the new array + mapped_array = np.zeros(len(array), dtype=new_fields) + + # Fill the new array + for field in mapped_array.dtype.names: + # Mapped field: stack the source columns + val_list = mapping[field] + if len(val_list) == 1: + mapped_array[field] = array[val_list[0]] + else: + # Stack columns horizontally for each row + # We need to extract each column, then stack them along axis 1 + cols = [array[val] for val in val_list] + mapped_array[field] = np.stack(cols, axis=1) + + return mapped_array + + def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]: """Convert numpy structured array to list of dicts""" - if array is None: - return None out = [] for row in array: diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 0d96b099b..ee0ebd65c 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -9,7 +9,7 @@ from libensemble.generators import LibensembleGenerator, PersistentGenInterfacer from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG from libensemble.tools.persistent_support import PersistentSupport -from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts +from libensemble.utils.misc import list_dicts_to_np, map_numpy_array, np_to_list_dicts, unmap_numpy_array logger = logging.getLogger(__name__) @@ -51,7 +51,20 @@ def shutdown(self) -> None: def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, int | None): if Work["persis_info"] is None: Work["persis_info"] = {} - return self._result(calc_in, Work["persis_info"], Work["libE_info"]) + out = self._result(calc_in, Work["persis_info"], Work["libE_info"]) + + # Help users who mixed up sim_f and simulator parameters + if isinstance(out, (tuple, list)): + calc_out = out[0] + else: + calc_out = out + + if isinstance(calc_out, dict): + raise AttributeError( + "Manager received a dictionary from a simulation. " + "Perhaps you meant to set `SimSpecs.simulator` instead of `SimSpecs.sim_f`?" + ) + return out class GlobusComputeRunner(Runner): @@ -195,14 +208,17 @@ def _convert_initial_ingest(self, x: npt.NDArray) -> list: class LibensembleGenThreadRunner(StandardGenRunner): - def _get_initial_suggest(self, libE_info) -> npt.NDArray: + def _get_initial_suggest(self, _) -> npt.NDArray: """Get initial batch from generator based on generator type""" - return self.gen.suggest_numpy() # libE really needs to receive the *entire* initial batch from a threaded gen + return unmap_numpy_array(self.gen.suggest_numpy(), mapping=getattr(self.gen, "variables_mapping", {})) + + def _convert_initial_ingest(self, x: npt.NDArray) -> list: + self.gen.ingest_numpy(map_numpy_array(x, mapping=getattr(self.gen, "variables_mapping", {}))) def _suggest_and_send(self): """Loop over generator's outbox contents, send to manager""" while not self.gen._running_gen_f.outbox.empty(): # recv/send any outstanding messages - points = self.gen.suggest_numpy() + points = unmap_numpy_array(self.gen.suggest_numpy(), mapping=getattr(self.gen, "variables_mapping", {})) if callable(getattr(self.gen, "suggest_updates", None)): updates = self.gen.suggest_updates() else: @@ -222,6 +238,8 @@ def _loop_over_gen(self, *args): while self.ps.comm.mail_flag(): # receive any new messages from Manager, give all to gen tag, _, H_in = self.ps.recv() if tag in [STOP_TAG, PERSIS_STOP]: - self.gen.ingest_numpy(H_in, PERSIS_STOP) + self.gen.ingest_numpy( + map_numpy_array(H_in, mapping=getattr(self.gen, "variables_mapping", {})), PERSIS_STOP + ) return self.gen._running_gen_f.result() - self.gen.ingest_numpy(H_in) + self.gen.ingest_numpy(map_numpy_array(H_in, mapping=getattr(self.gen, "variables_mapping", {}))) diff --git a/pixi.lock b/pixi.lock index b0c5eb203..79394ef44 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c10357cee7d3813245c607ea39fde22da46119409451949c0780225515aa5afa -size 1216383 +oid sha256:567e81eabdaf24db518cc9f93fa60e37d95dc39bc6a9b69db9eed1e8ec193997 +size 1216617