Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow (auto) saving and loading a RunManager (e.g., for external monitoring) #156

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion adaptive_scheduler/_scheduler/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(
self.cores_per_node = cores_per_node
self.partition = partition
self.exclusive = exclusive
self.__extra_scheduler = extra_scheduler
self.__extra_scheduler = extra_scheduler # stores original input

msg = "Specify either `nodes` and `cores_per_node`, or only `cores`, not both."
if cores is None:
Expand Down
63 changes: 62 additions & 1 deletion adaptive_scheduler/_server_support/run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable

import cloudpickle
import pandas as pd

from adaptive_scheduler.utils import (
LOKY_START_METHODS,
GoalTypes,
_at_least_adaptive_version,
_time_between,
fname_to_learner,
fname_to_learner_fname,
load_dataframes,
load_parallel,
Expand Down Expand Up @@ -177,6 +179,7 @@
max_log_lines: int = 500,
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 100,
store_fname: str | Path | None = None,
) -> None:
super().__init__()

Expand All @@ -202,6 +205,7 @@
self.max_log_lines = max_log_lines
self.max_fails_per_job = max_fails_per_job
self.max_simultaneous_jobs = max_simultaneous_jobs
self.store_fname = store_fname
# Track job start times, (job_name, start_time) -> request_time
self._job_start_time_dict: dict[tuple[str, str], str] = {}

Expand Down Expand Up @@ -256,7 +260,7 @@
interval=self.job_manager_interval,
max_fails_per_job=self.max_fails_per_job,
max_simultaneous_jobs=self.max_simultaneous_jobs,
# Laucher command line options
# Launcher command line options
save_dataframe=self.save_dataframe,
dataframe_format=self.dataframe_format,
loky_start_method=self.loky_start_method,
Expand Down Expand Up @@ -301,6 +305,8 @@
self._start_one_by_one_task = start_one_by_one(wait_for, self)
else:
super().start()
if self.store_fname is not None:
self.save()

Check warning on line 309 in adaptive_scheduler/_server_support/run_manager.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/_server_support/run_manager.py#L309

Added line #L309 was not covered by tests
return self

async def _manage(self) -> None:
Expand Down Expand Up @@ -467,6 +473,61 @@
raise ValueError(msg)
return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value]

def save(
    self,
    store_fname: str | Path | None = None,
    *,
    overwrite: bool = True,
) -> None:
    """Store the `RunManager`'s picklable state to a file with cloudpickle.

    The saved state can later be restored with `RunManager.load`.

    Parameters
    ----------
    store_fname : str or Path or None
        The filename to store the `RunManager` to; if None, use the
        `store_fname` attribute.
    overwrite : bool, default: True
        If True, overwrite the file if it already exists.

    Raises
    ------
    ValueError
        If no `store_fname` is given and the `store_fname` attribute is None.
    FileExistsError
        If the file already exists and ``overwrite`` is False.

    """
    if store_fname is None:
        store_fname = self.store_fname
    if store_fname is None:
        msg = "No `store_fname` given and no `store_fname` attribute is set."
        raise ValueError(msg)
    store_fname = Path(store_fname)
    # Validate the target path *before* doing any serialization work.
    if store_fname.exists() and not overwrite:
        msg = f"{store_fname} already exists."
        raise FileExistsError(msg)
    # Attributes that are rebuilt in `__init__`/`start` (or recoverable from
    # the stored filenames) are excluded; private ("_"-prefixed) attributes
    # are excluded as well.
    skip = {
        "ioloop",  # set in super().start()
        "task",  # set in super().start()
        "learners",  # we can load them from the filenames
        # below are created in __init__
        "job_names",
        "database_manager",
        "job_manager",
        "kill_manager",
    }
    to_save = {
        k: v
        for k, v in self.__dict__.items()
        if k not in skip and not k.startswith("_")
    }
    with store_fname.open("wb") as f:
        cloudpickle.dump(to_save, f)

@classmethod
def load(cls: type[RunManager], store_fname: str | Path) -> RunManager:
    """Reconstruct a `RunManager` from a file written by `save`."""
    path = Path(store_fname)
    with path.open("rb") as f:
        state = cloudpickle.load(f)
    # Learners are not serialized; rebuild each one from its filename.
    state["learners"] = [fname_to_learner(fname) for fname in state["fnames"]]
    # Never wipe the existing database when restoring a manager.
    state["overwrite_db"] = False
    # These are plain attributes, not `__init__` parameters, so set them
    # on the instance after construction.
    start_time = state.pop("start_time")
    end_time = state.pop("end_time")
    manager = cls(**state)
    manager.start_time = start_time
    manager.end_time = end_time
    return manager


async def _wait_for_finished(
manager_first: RunManager,
Expand Down
Loading