Mopet
Exploration
df
property
readonly
Returns a dataframe with the exploration results, creating it anew if it doesn't exist yet.
Returns:

Type | Description
---|---
pandas.DataFrame | Dataframe with exploration results
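As a brief, hedged illustration (the instance name `ex` and the column `x` below are assumptions, not part of mopet), the property is typically used after results have been loaded:

```python
# `ex` is assumed to be an Exploration whose results were loaded via
# ex.load_results(); ex.df then holds one row per run.
ex.df.head()              # inspect the first few runs
ex.df[ex.df["x"] > 0.5]   # filter runs by an (assumed) explored parameter "x"
```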
__init__(self, function, explore_params, default_params=None, exploration_name=None, hdf_filename=None, num_cpus=None, num_gpus=None)
special
Defines a parameter exploration of a given `function`.
Parameters:

Name | Type | Description | Default
---|---|---|---
function | function | Function to evaluate at each run | required
explore_params | dict | Exploration parameters (individual) for each run | required
default_params | dict | Default (shared) parameters to load for each run, optional, defaults to None | None
exploration_name | str, optional | Name of the run, will create a name if left empty, defaults to None | None
hdf_filename | str, optional | Filename of the hdf storage file, defaults to None | None
num_cpus | int | Number of desired CPU cores passed to ray, defaults to None | None
num_gpus | int | Number of desired GPUs passed to ray, defaults to None | None
Returns:

Type | Description
---|---
 | Exploration instance
Source code in mopet/mopet.py
def __init__(
self,
function,
explore_params,
default_params=None,
exploration_name=None,
hdf_filename=None,
num_cpus: int = None,
num_gpus: int = None,
):
"""Defines a parameter exploration of a given `function`.
:param function: Function to evaluate at each run
:type function: function
:param explore_params: Exploration parameters (individual) for each run
:type explore_params: dict
:param default_params: Default (shared) parameters to load for each run, optional, defaults to None
:type default_params: dict
:param exploration_name: Name of the run, will create a name if left empty, defaults to None
:type exploration_name: str, optional
:param hdf_filename: Filename of the hdf storage file, defaults to None
:type hdf_filename: str, optional
:param num_cpus: Number of desired CPU cores passed to ray, defaults to None
:type num_cpus: int, optional
:param num_gpus: Number of desired GPUs passed to ray, defaults to None
:type num_gpus: int, optional
:return: Exploration instance
"""
self.function = function
self.results = {}
self.results_params = []
if default_params is not None:
self.default_params = copy.deepcopy(default_params)
self.full_params = True
else:
self.default_params = None
self.full_params = False
self.explore_params = copy.deepcopy(explore_params)
if exploration_name is None:
exploration_name = "exploration" + datetime.datetime.now().strftime("_%Y_%m_%d_%HH_%MM_%SS")
self.exploration_name = exploration_name
if hdf_filename is None:
hdf_filename = "exploration.h5"
self.hdf_filename = hdf_filename
self.dfResults = None
# status
self._hdf_open_for_reading = False
# List of all parameter combinations generated when exploration starts
self.explore_params_list = None
# Dict with runId as keys and explored parameter dict as value.
# Will be filled when exploration starts.
self.run_params_dict = {}
# Dict with runId as keys and explored parameter dict as value.
# Will be filled when calling `load_results`.
self.params = {}
# Ray configuration
self.num_gpus = num_gpus
self.num_cpus = num_cpus
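A minimal construction sketch, not taken from the mopet documentation: the evaluation function, parameter grid, exploration name, and file name below are illustrative assumptions.

```python
import numpy as np
import mopet

def evaluate(params):
    # Each run receives one parameter combination and returns a dict of results.
    x, y = params["x"], params["y"]
    return {"sum": x + y, "trace": np.linspace(0.0, x * y, 10)}

ex = mopet.Exploration(
    evaluate,
    explore_params={"x": np.linspace(0, 1, 3), "y": np.linspace(0, 1, 3)},
    exploration_name="demo_exploration",  # illustrative name
    hdf_filename="demo.h5",               # illustrative file name
)
```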
close_hdf(self)
Close a previously opened HDF file.
Source code in mopet/mopet.py
def close_hdf(self):
"""Close a previously opened HDF file."""
self.h5file.close()
self._hdf_open_for_reading = False
logging.info(f"{self.hdf_filename} closed.")
get_run(self, run_id=None, run_name=None, filename=None, exploration_name=None)
Get a single result from a previous exploration. This function will load a single result from the HDF file. Use this function if you want to avoid loading all results into memory, which you can do using `.load_results(arrays=True)`.

Note: This function will open the HDF file for reading but will not close it afterwards! This speeds up many sequential loads, but it also means that you have to close the HDF file yourself. You can do this by using `.close_hdf()`.
Parameters:

Name | Type | Description | Default
---|---|---|---
run_id | int, optional | Unique id of the run. Has to be given if run_name is not given, defaults to None | None
run_name | str, optional | The name of the run. Has to be given if run_id is not given, defaults to None | None
filename | str, optional | Filename of the HDF with previous exploration results. Previously used filename will be used if not given, defaults to None | None
exploration_name | str, optional | Name of the exploration to load data from. Previously used exploration_name will be used if not given, defaults to None | None

Returns:

Type | Description
---|---
dict | Results of the run
Source code in mopet/mopet.py
def get_run(self, run_id=None, run_name=None, filename=None, exploration_name=None):
"""Get a single result from a previous exploration. This function
will load a single result from the HDF file. Use this function
if you want to avoid loading all results to memory, which you can
do using `.load_results(arrays=True)`.
Note: This function will open the HDF for reading but will not close
it afterwards! This is to speed up many sequential loads but it also
means that you have to close the HDF file yourself. You can do this
by using `.close_hdf()`.
:param run_id: Unique id of the run. Has to be given if run_name is not given, defaults to None
:type run_id: int, optional
:param run_name: The name of the run. Has to be given if run_id is not given, defaults to None
:type run_name: str, optional
:param filename: Filename of the HDF with previous exploration results. Previously used filename will be used if not given, defaults to None
:type filename: str, optional
:param exploration_name: Name of the exploration to load data from. Previously used exploration_name will be used if not given, defaults to None
:type exploration_name: str, optional
:return: Results of the run
:rtype: dict
:raises ExplorationNotFoundError: if the HDF5 file does not contain an `exploration_name` group.
"""
# get result by id or if not then by run_name (hdf_run)
assert run_id is not None or run_name is not None, "Either use `run_id` or `run_name`."
if exploration_name:
self.exploration_name = exploration_name
if run_id is not None:
run_name = self.RUN_PREFIX + str(run_id)
if not self._hdf_open_for_reading:
self._open_hdf(filename)
try:
run_results_group = self.h5file.get_node("/" + self.exploration_name, "runs")[run_name]
except NoSuchNodeError:
raise ExplorationNotFoundError(
"Exploration {} could not be found in HDF file {}".format(self.exploration_name, self.hdf_filename)
)
result = self._read_group_as_dict(run_results_group)
return result
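A hedged retrieval sketch, continuing the illustrative names from the constructor example above (`demo.h5`, `demo_exploration`, and the run id are assumptions):

```python
# Load a single run by its id without pulling all results into memory.
result = ex.get_run(run_id=0, filename="demo.h5", exploration_name="demo_exploration")
print(result.keys())  # dict with this run's stored results

# get_run() leaves the HDF file open for fast sequential reads,
# so close it explicitly when you are done:
ex.close_hdf()
```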
load_results(self, filename=None, exploration_name=None, arrays=False, as_dict=False)
Load results from previous explorations. This function will open an HDF file and look for an exploration. It will create a Pandas `Dataframe` object (accessible through the attribute `.df`) with a list of all runs and their parameters.

You can load the exploration results using the following parameters:

- If `arrays==False`, all scalar results from the exploration will be added to the Dataframe (default).
- If `arrays==True`, then all results, including (larger) numpy arrays, will be loaded. This can take up a lot of RAM since all results will be available. Only use this option if you know that you have enough memory. Otherwise, you might want to skip this and load results separately using the method `.get_run()`.
- If `as_dict==True`, all results will be loaded and saved to the attribute `.results` regardless of their type. Will use even more memory.
Parameters:

Name | Type | Description | Default
---|---|---|---
filename | str, optional | Filename of HDF file, uses default filename or previously used filename if not given, defaults to None | None
exploration_name | str, optional | Name of the exploration, same as the group names of the explorations in the HDF file, defaults to None | None
arrays | bool, optional | Aggregate all results, including arrays, into the results Dataframe, defaults to False | False
as_dict | bool, optional | Load all results into a dictionary available as the attribute `.results`. Can use a lot of RAM, defaults to False | False

Exceptions:

Type | Description
---|---
Hdf5FileNotExistsError | if file with `filename` does not exist
Source code in mopet/mopet.py
def load_results(self, filename=None, exploration_name=None, arrays=False, as_dict=False):
"""Load results from previous explorations. This function
will open an HDF file and look for an exploration. It will
create a Pandas `Dataframe` object (accessible through the
attribute `.df`) with a list of all runs and their parameters.
You can load the exploration results using following parameters:
- If `arrays==False`, all scalar results from the exploration
will be added to the Dataframe (default).
- If `arrays==True`, then all results, including (larger) numpy arrays
will be loaded. This can take
up a lot of RAM since all results will be available. Only
use this option if you know that you have enough memory. Otherwise,
you might want to skip this and load results separately using the
method `.get_run()`.
- If `as_dict==True`, all results will be loaded and saved to the attribute
`.results` regardless of their type. Will use even more memory.
:param filename: Filename of HDF file, uses default filename or previously used filename if not given, defaults to None
:type filename: str, optional
:param exploration_name: Name of the exploration, same as the group names of the explorations in the HDF file, defaults to None
:type exploration_name: str, optional
:param arrays: Aggregate all results, including arrays into the results Dataframe, defaults to False
:type arrays: bool, optional
:param as_dict: Load all results into a dictionary available as the attribute `.results`. Can use a lot of RAM, defaults to False
:type as_dict: bool, optional
:raises Hdf5FileNotExistsError: if file with `filename` does not exist.
"""
if exploration_name is None:
exploration_name = self.exploration_name
else:
self.exploration_name = exploration_name
self._open_hdf(filename=filename)
self._load_all_results(exploration_name, as_dict=as_dict)
self._create_df()
self._aggregate_results(exploration_name, arrays=arrays)
self.close_hdf()
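The three loading modes described above might look like this in practice (a sketch reusing the illustrative `ex` instance from the earlier examples):

```python
ex.load_results()              # scalar results only; available via ex.df
ex.load_results(arrays=True)   # additionally aggregate array results into ex.df
ex.load_results(as_dict=True)  # also fill ex.results with every result, regardless of type

print(ex.df)                   # one row per run: explored parameters plus results
```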
run(self)
Start parameter exploration.
TODO: Pass kwargs in run() to the exploration function
Exceptions:

Type | Description
---|---
ExplorationExistsError | if exploration with same name already exists in HDF5 file
Source code in mopet/mopet.py
def run(self):
"""Start parameter exploration.
TODO: Pass kwargs in run() to the exploration function
:raises ExplorationExistsError: if exploration with same name already exists in HDF5 file.
"""
# Initialize ray
self._init_ray(num_cpus=self.num_cpus, num_gpus=self.num_gpus)
# Create a list of all combinations of parameters from explore_params
self.explore_params_list = self._cartesian_product_dict(self.explore_params)
# Initialize hdf storage
self._pre_storage_routine()
# -----------------------------
# Set up all simulations
# -----------------------------
# remember the time
start_time = time.time()
# a unique id for each run
run_id = 0
# contains ray objects of each run
ray_returns = {}
# contains all exploration parameters of each run
self.run_params_dict = {}
logging.info(f"Starting {len(self.explore_params_list)} jobs.")
# cycle through all parameter combinations
for update_params in tqdm.tqdm(self.explore_params_list):
if self.full_params and self.default_params is not None:
# load the default parameters
run_params = copy.deepcopy(self.default_params)
# and update them with the explored parameters
run_params.update(update_params)
else:
run_params = copy.deepcopy(update_params)
# start all ray jobs and remember the ray object
# pylint: disable=no-member
ray_returns[run_id] = _ray_remote.remote(self.function, run_params)
# store this run's explore parameters
self.run_params_dict[run_id] = copy.deepcopy(update_params)
# increment the run id
run_id += 1
# stop measuring time
end_time = time.time() - start_time
logging.info(f"Runs took {end_time} s to submit.")
# -----------------------------
# Reduce and store all results
# -----------------------------
# remember the time
start_time = time.time()
# cycle through all returned ray objects
for run_id, ray_return in tqdm.tqdm(ray_returns.items()):
# get the appropriate parameters for this run
run_param = self.run_params_dict[run_id]
# queue object for storage
self._store_result(run_id, ray_return, run_param)
# remove LOCAL_REFERENCE in form of ObjectId from ray's object store.
ray_returns[run_id] = None
# stop measuring time
end_time = time.time() - start_time
logging.info(f"Runs and storage took {end_time} s to complete.")
# tear down hdf storage
self._post_storage_routine()
self._shutdown_ray()
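Putting it together, an end-to-end sketch with illustrative names (ray is initialized and shut down internally by `run()`):

```python
import numpy as np
import mopet

def evaluate(params):
    return {"product": params["a"] * params["b"]}

ex = mopet.Exploration(
    evaluate,
    explore_params={"a": np.arange(3), "b": np.arange(3)},
    hdf_filename="demo.h5",  # illustrative file name
)
ex.run()           # evaluates every parameter combination and stores results in the HDF file
ex.load_results()  # builds ex.df from the stored results
print(ex.df)
```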