From 153d21bc86e64054854e71f6cd0bad5b6c8551ba Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 06:41:57 +0000 Subject: [PATCH 01/13] Allow to store and load a RunManager --- adaptive_scheduler/_server_support/run_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index 22833a01..f78a0f6f 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -256,7 +256,7 @@ def __init__( interval=self.job_manager_interval, max_fails_per_job=self.max_fails_per_job, max_simultaneous_jobs=self.max_simultaneous_jobs, - # Laucher command line options + # Launcher command line options save_dataframe=self.save_dataframe, dataframe_format=self.dataframe_format, loky_start_method=self.loky_start_method, From 754d36c07b4afa0e63836e2665e6da166a44173d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 18 Apr 2023 23:51:06 -0700 Subject: [PATCH 02/13] add save and load --- .../_server_support/job_manager.py | 18 ++++----- .../_server_support/run_manager.py | 40 +++++++++++++++++++ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/adaptive_scheduler/_server_support/job_manager.py b/adaptive_scheduler/_server_support/job_manager.py index f160c2a5..d8922bb3 100644 --- a/adaptive_scheduler/_server_support/job_manager.py +++ b/adaptive_scheduler/_server_support/job_manager.py @@ -201,17 +201,12 @@ async def _manage(self) -> None: "Too many jobs failed, your Python code probably has a bug." ) raise MaxRestartsReachedError(msg) # noqa: TRY301 - if await sleep_unless_task_is_done( - self.database_manager.task, # type: ignore[arg-type] - self.interval, - ): # if true, we are done - return except asyncio.CancelledError: log.info("task was cancelled because of a CancelledError") raise except MaxRestartsReachedError as e: log.exception( - "too many jobs have failed, cancelling the job manager", + "Too many jobs failed, cancelling the job manager", n_started=self.n_started, max_fails_per_job=self.max_fails_per_job, max_job_starts=self.max_job_starts, @@ -220,8 +215,9 @@ async def _manage(self) -> None: raise except Exception as e: # noqa: BLE001 log.exception("got exception when starting a job", exception=str(e)) - if await sleep_unless_task_is_done( - self.database_manager.task, # type: ignore[arg-type] - 5, - ): # if true, we are done - return + + if await sleep_unless_task_is_done( + self.database_manager.task, # type: ignore[arg-type] + self.interval, + ): # if true, we are done + return diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index f78a0f6f..1304ffea 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable +import cloudpickle import pandas as pd from adaptive_scheduler.utils import ( @@ -14,6 +15,7 @@ GoalTypes, _at_least_adaptive_version, _time_between, + fname_to_learner, fname_to_learner_fname, load_dataframes, load_parallel, @@ -467,6 +469,44 @@ def load_dataframes(self) -> pd.DataFrame: raise ValueError(msg) return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value] + def save(self, store_fname: str | Path, *, overwrite: bool = True) -> None: + """Store the `RunManager` to a file. + + Parameters + ---------- + store_fname : str or Path + The filename to store the `RunManager` to. + overwrite : bool, default: False + If True, overwrite the file if it already exists. + + """ + keys = self.__dict__.keys() - { + "ioloop", # set in super().__init__() + "learners", # we can load them from the filenames + # below are set in __init__ + "database_manager", + "scheduler", + "job_manager", + "kill_manager", + } + to_save = {k: self.__dict__[k] for k in keys if not k.startswith("_")} + store_fname = Path(store_fname) + if store_fname.exists() and not overwrite: + msg = f"{store_fname} already exists." + raise FileExistsError(msg) + with store_fname.open("wb") as f: + cloudpickle.dump(to_save, f) + + @classmethod + def load(cls: type[RunManager], store_fname: str | Path) -> RunManager: + """Load a `RunManager` from a file.""" + store_fname = Path(store_fname) + with store_fname.open("rb") as f: + to_load = cloudpickle.load(f) + to_load["learners"] = [fname_to_learner(fn) for fn in to_load["fnames"]] + to_load["overwrite_db"] = False + return cls(**to_load) + async def _wait_for_finished( manager_first: RunManager, From 2f164120a8c18b901fe37bfe03fe8f6218fdc67a Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 18 Apr 2023 23:53:43 -0700 Subject: [PATCH 03/13] skip task --- adaptive_scheduler/_server_support/run_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index 1304ffea..d6ba9829 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -481,7 +481,8 @@ def save(self, store_fname: str | Path, *, overwrite: bool = True) -> None: """ keys = self.__dict__.keys() - { - "ioloop", # set in super().__init__() + "ioloop", # set in super().start() + "task", # set in super().start() "learners", # we can load them from the filenames # below are set in __init__ "database_manager", From b7204c0820b2723664958135e13050b4c9d2aed4 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 18 Apr 2023 23:59:20 -0700 Subject: [PATCH 04/13] fix --- .../_server_support/run_manager.py | 9 +++++++-- tests/test_run_manager.py | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index d6ba9829..cf5aa085 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -485,8 +485,8 @@ def save(self, store_fname: str | Path, *, overwrite: bool = True) -> None: "task", # set in super().start() "learners", # we can load them from the filenames # below are set in __init__ + "job_names", "database_manager", - "scheduler", "job_manager", "kill_manager", } @@ -506,7 +506,12 @@ def load(cls: type[RunManager], store_fname: str | Path) -> RunManager: to_load = cloudpickle.load(f) to_load["learners"] = [fname_to_learner(fn) for fn in to_load["fnames"]] to_load["overwrite_db"] = False - return cls(**to_load) + start_time = to_load.pop("start_time") + end_time = to_load.pop("end_time") + rm = cls(**to_load) + rm.start_time = start_time + rm.end_time = end_time + return rm async def _wait_for_finished( diff --git a/tests/test_run_manager.py b/tests/test_run_manager.py index c2e060f5..af8b4af5 100644 --- a/tests/test_run_manager.py +++ b/tests/test_run_manager.py @@ -71,6 +71,26 @@ def test_run_manager_cleanup( assert not rm.move_old_logs_to.exists() +@pytest.mark.asyncio() +async def test_run_manager_save_load( + mock_scheduler: MockScheduler, + learners: list[adaptive.Learner1D] + | list[adaptive.BalancingLearner] + | list[adaptive.SequenceLearner], + fnames: list[str] | list[Path], + tmp_path: Path, +) -> None: + """Test the cleanup method of RunManager.""" + with temporary_working_directory(tmp_path): + rm = RunManager(mock_scheduler, learners, fnames) + rm.start() + await asyncio.sleep(0.1) + fn = "run_manager.cloudpickle" + rm.save(fn) + rm2 = RunManager.load(fn) + assert rm2.fnames == rm.fnames + + def test_run_manager_parse_log_files( mock_scheduler: MockScheduler, learners: list[adaptive.Learner1D] From f5ee2d2742b8b446af020274eebbe03f53986113 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 00:06:16 -0700 Subject: [PATCH 05/13] auto store --- .../_server_support/run_manager.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index cf5aa085..a2c75bae 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -179,6 +179,7 @@ def __init__( max_log_lines: int = 500, max_fails_per_job: int = 50, max_simultaneous_jobs: int = 100, + store_fname: str | Path | None = None, ) -> None: super().__init__() @@ -204,6 +205,7 @@ def __init__( self.max_log_lines = max_log_lines self.max_fails_per_job = max_fails_per_job self.max_simultaneous_jobs = max_simultaneous_jobs + self.store_fname = store_fname # Track job start times, (job_name, start_time) -> request_time self._job_start_time_dict: dict[tuple[str, str], str] = {} @@ -303,6 +305,8 @@ def start(self, wait_for: RunManager | None = None) -> RunManager: # type: igno self._start_one_by_one_task = start_one_by_one(wait_for, self) else: super().start() + if self.store_fname is not None: + self.start() return self async def _manage(self) -> None: @@ -469,17 +473,24 @@ def load_dataframes(self) -> pd.DataFrame: raise ValueError(msg) return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value] - def save(self, store_fname: str | Path, *, overwrite: bool = True) -> None: + def save(self, store_fname: str | Path | None, *, overwrite: bool = True) -> None: """Store the `RunManager` to a file. Parameters ---------- store_fname : str or Path - The filename to store the `RunManager` to. + The filename to store the `RunManager` to, if None, use the + `store_fname` attribute. overwrite : bool, default: False If True, overwrite the file if it already exists. """ + if store_fname is None: + store_fname = self.store_fname + if store_fname is None: + msg = "No `store_fname` given and no `store_fname` attribute is set." + raise ValueError(msg) + store_fname = Path(store_fname) keys = self.__dict__.keys() - { "ioloop", # set in super().start() "task", # set in super().start() @@ -491,7 +502,6 @@ def save(self, store_fname: str | Path, *, overwrite: bool = True) -> None: "kill_manager", } to_save = {k: self.__dict__[k] for k in keys if not k.startswith("_")} - store_fname = Path(store_fname) if store_fname.exists() and not overwrite: msg = f"{store_fname} already exists." raise FileExistsError(msg) From f421186120bf3644322112a947e40612eb10d538 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 00:07:43 -0700 Subject: [PATCH 06/13] rephrase --- adaptive_scheduler/_server_support/run_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index a2c75bae..0e00f492 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -495,7 +495,7 @@ def save(self, store_fname: str | Path | None, *, overwrite: bool = True) -> Non "ioloop", # set in super().start() "task", # set in super().start() "learners", # we can load them from the filenames - # below are set in __init__ + # below are created in __init__ "job_names", "database_manager", "job_manager", From 4eade189e483c72caf464555ff41616ff0b0f319 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 00:11:48 -0700 Subject: [PATCH 07/13] revert accidental change --- .../_server_support/job_manager.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/adaptive_scheduler/_server_support/job_manager.py b/adaptive_scheduler/_server_support/job_manager.py index d8922bb3..f160c2a5 100644 --- a/adaptive_scheduler/_server_support/job_manager.py +++ b/adaptive_scheduler/_server_support/job_manager.py @@ -201,12 +201,17 @@ async def _manage(self) -> None: "Too many jobs failed, your Python code probably has a bug." ) raise MaxRestartsReachedError(msg) # noqa: TRY301 + if await sleep_unless_task_is_done( + self.database_manager.task, # type: ignore[arg-type] + self.interval, + ): # if true, we are done + return except asyncio.CancelledError: log.info("task was cancelled because of a CancelledError") raise except MaxRestartsReachedError as e: log.exception( - "Too many jobs failed, cancelling the job manager", + "too many jobs have failed, cancelling the job manager", n_started=self.n_started, max_fails_per_job=self.max_fails_per_job, max_job_starts=self.max_job_starts, @@ -215,9 +220,8 @@ async def _manage(self) -> None: raise except Exception as e: # noqa: BLE001 log.exception("got exception when starting a job", exception=str(e)) - - if await sleep_unless_task_is_done( - self.database_manager.task, # type: ignore[arg-type] - self.interval, - ): # if true, we are done - return + if await sleep_unless_task_is_done( + self.database_manager.task, # type: ignore[arg-type] + 5, + ): # if true, we are done + return From 97652c3560b1d5332e64e8ea6612a571c7a1493d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 09:45:20 -0700 Subject: [PATCH 08/13] add comment --- adaptive_scheduler/_scheduler/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adaptive_scheduler/_scheduler/slurm.py b/adaptive_scheduler/_scheduler/slurm.py index c2bfbfd0..5a6f7843 100644 --- a/adaptive_scheduler/_scheduler/slurm.py +++ b/adaptive_scheduler/_scheduler/slurm.py @@ -89,7 +89,7 @@ def __init__( self.cores_per_node = cores_per_node self.partition = partition self.exclusive = exclusive - self.__extra_scheduler = extra_scheduler + self.__extra_scheduler = extra_scheduler # stores original input msg = "Specify either `nodes` and `cores_per_node`, or only `cores`, not both." if cores is None: From 44e93f35c1028da3a432403774c62f8d62a9f7dd Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 10:02:43 -0700 Subject: [PATCH 09/13] Put buttons and status widget side by side (#157) * Put buttons and status widget side by side * Renames --- adaptive_scheduler/widgets.py | 53 ++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/adaptive_scheduler/widgets.py b/adaptive_scheduler/widgets.py index 6e752928..ec05bb47 100644 --- a/adaptive_scheduler/widgets.py +++ b/adaptive_scheduler/widgets.py @@ -18,6 +18,7 @@ import ipywidgets as ipyw + from adaptive_scheduler.scheduler import BaseScheduler from adaptive_scheduler.server_support import RunManager from adaptive_scheduler.utils import FnamesTypes @@ -573,7 +574,7 @@ def on_click(_: Any) -> None: return (vbox, update_function) -def queue_widget(run_manager: RunManager) -> ipyw.VBox: +def queue_widget(scheduler: BaseScheduler) -> ipyw.VBox: """Create a widget that shows the current queue and allows to update it.""" import ipywidgets as ipyw @@ -584,7 +585,7 @@ def queue_widget(run_manager: RunManager) -> ipyw.VBox: ) def get_queue_df() -> pd.DataFrame: - queue = run_manager.scheduler.queue(me_only=me_only_checkbox.value) + queue = scheduler.queue(me_only=me_only_checkbox.value) return pd.DataFrame(queue).transpose() # Get both the VBox and the update_function from _create_widget @@ -611,37 +612,36 @@ def get_database_df() -> pd.DataFrame: return vbox -def _remove_widget(box: ipyw.VBox, widget_to_remove: ipyw.Widget) -> None: - box.children = tuple(child for child in box.children if child != widget_to_remove) - - def _toggle_widget( - box: ipyw.VBox, widget_key: str, widget_dict: dict[str, ipyw.Widget | str], - state_dict: dict[str, dict[str, Any]], + toggle_dict: dict[str, dict[str, Any]], ) -> Callable[[Any], None]: import ipywidgets as ipyw + from IPython.display import display def on_click(_: Any) -> None: - widget = state_dict[widget_key]["widget"] + widget = toggle_dict[widget_key]["widget"] if widget is None: - widget = state_dict[widget_key]["init_func"]() - state_dict[widget_key]["widget"] = widget + widget = toggle_dict[widget_key]["init_func"]() + toggle_dict[widget_key]["widget"] = widget button = widget_dict[widget_key] assert isinstance(button, ipyw.Button) - show_description = state_dict[widget_key]["show_description"] - hide_description = state_dict[widget_key]["hide_description"] - + show_description = toggle_dict[widget_key]["show_description"] + hide_description = toggle_dict[widget_key]["hide_description"] + output = toggle_dict[widget_key]["output"] if button.description == show_description: button.description = hide_description button.button_style = "warning" - box.children = (*box.children, widget) + with output: + output.clear_output() + display(widget) else: button.description = show_description button.button_style = "info" - _remove_widget(box, widget) + with output: + output.clear_output() return on_click @@ -752,32 +752,33 @@ def info(run_manager: RunManager) -> None: "show database": show_db_button, } - box = ipyw.VBox([]) - def update(_: Any) -> None: status.value = _info_html(run_manager) def load_learners(_: Any) -> None: run_manager.load_learners() - state_dict = { + toggle_dict = { "show logs": { "widget": None, "init_func": lambda: log_explorer(run_manager), "show_description": "show logs", "hide_description": "hide logs", + "output": ipyw.Output(), }, "show queue": { "widget": None, - "init_func": lambda: queue_widget(run_manager), + "init_func": lambda: queue_widget(run_manager.scheduler), "show_description": "show queue", "hide_description": "hide queue", + "output": ipyw.Output(), }, "show database": { "widget": None, "init_func": lambda: database_widget(run_manager), "show_description": "show database", "hide_description": "hide database", + "output": ipyw.Output(), }, } @@ -793,9 +794,9 @@ def _callable() -> None: return _callable widgets["update info"].on_click(update) - toggle_logs = _toggle_widget(box, "show logs", widgets, state_dict) - toggle_queue = _toggle_widget(box, "show queue", widgets, state_dict) - toggle_database = _toggle_widget(box, "show database", widgets, state_dict) + toggle_logs = _toggle_widget("show logs", widgets, toggle_dict) + toggle_queue = _toggle_widget("show queue", widgets, toggle_dict) + toggle_database = _toggle_widget("show database", widgets, toggle_dict) widgets["show logs"].on_click(toggle_logs) widgets["show queue"].on_click(toggle_queue) widgets["show database"].on_click(toggle_database) @@ -813,8 +814,10 @@ def _callable() -> None: # Cleanup button with confirm/deny option cleanup_callable = cleanup(include_old_logs=include_old_logs) _create_confirm_deny(cleanup_button, widgets, cleanup_callable, key="cleanup") - - box.children = (status, *tuple(widgets.values())) + buttons_box = ipyw.VBox(tuple(widgets.values())) + buttons_box.layout.margin = "0 0 0 100px" + top_box = ipyw.HBox((status, buttons_box)) + box = ipyw.VBox((top_box, *(v["output"] for v in toggle_dict.values()))) display(box) From bbd8e98e630a25019a53523c2c3880c4a9716a98 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 15:26:21 -0700 Subject: [PATCH 10/13] Add a results widget that shows result DataFrames (#158) * Add a results widget * add results button to info widget * disable scrolling * set output options * prevent double triggering * Fix check * reorder * simplify * Unify code * add basic test * improve color interpolation * fix numbers * improve color interp * add test * simplify and fix bug --- adaptive_scheduler/widgets.py | 234 +++++++++++++++++++++++++++------- tests/test_widgets.py | 35 +++++ 2 files changed, 222 insertions(+), 47 deletions(-) diff --git a/adaptive_scheduler/widgets.py b/adaptive_scheduler/widgets.py index ec05bb47..18080306 100644 --- a/adaptive_scheduler/widgets.py +++ b/adaptive_scheduler/widgets.py @@ -13,6 +13,8 @@ import numpy as np import pandas as pd +from adaptive_scheduler.utils import load_dataframes + if TYPE_CHECKING: from typing import Any, Callable @@ -20,7 +22,7 @@ from adaptive_scheduler.scheduler import BaseScheduler from adaptive_scheduler.server_support import RunManager - from adaptive_scheduler.utils import FnamesTypes + from adaptive_scheduler.utils import _DATAFRAME_FORMATS, FnamesTypes def _get_fnames(run_manager: RunManager, *, only_running: bool) -> list[Path]: @@ -228,33 +230,37 @@ def _tail( # noqa: PLR0913 update_button: ipyw.Button, only_running_checkbox: ipyw.Checkbox, only_failed_checkbox: ipyw.Checkbox, + sort_by_dropdown: ipyw.Dropdown, + contains_text: ipyw.Text, ) -> Callable[[Any], None]: tail_task = None ioloop = asyncio.get_running_loop() def on_click(_: Any) -> None: nonlocal tail_task - if tail_task is None: + tailing_log = tail_task is not None + + def update_ui_state(tailing: bool) -> None: # noqa: FBT001 + tail_button.description = "cancel tail log" if tailing else "tail log" + tail_button.button_style = "danger" if tailing else "info" + tail_button.icon = "window-close" if tailing else "refresh" + dropdown.disabled = tailing + only_running_checkbox.disabled = tailing + only_failed_checkbox.disabled = tailing + update_button.disabled = tailing + sort_by_dropdown.disabled = tailing + contains_text.disabled = tailing + + if not tailing_log: fname = dropdown.options[dropdown.index] tail_task = ioloop.create_task(_tail_log(fname, textarea)) - tail_button.description = "cancel tail log" - tail_button.button_style = "danger" - tail_button.icon = "window-close" - dropdown.disabled = True - update_button.disabled = True - only_running_checkbox.disabled = True - only_failed_checkbox.disabled = True else: - tail_button.description = "tail log" - tail_button.button_style = "info" - tail_button.icon = "refresh" - dropdown.disabled = False - only_running_checkbox.disabled = False - only_failed_checkbox.disabled = False - update_button.disabled = False + assert tail_task is not None tail_task.cancel() tail_task = None + update_ui_state(not tailing_log) + return on_click def _on_dropdown_change( @@ -329,6 +335,8 @@ def on_change(change: dict[str, Any]) -> None: update_button, only_running_checkbox, only_failed_checkbox, + sort_by_dropdown, + contains_text, ), ) vbox = ipyw.VBox( @@ -427,6 +435,35 @@ def flatten( ) +def _interp_red_green( + percent: float, + pct_red: int = 30, + pct_green: int = 10, +) -> tuple[int, int, int]: + if pct_green < pct_red: + if percent <= pct_green: + return 0, 255, 0 + if percent >= pct_red: + return 255, 0, 0 + else: + if percent >= pct_green: + return 0, 255, 0 + if percent <= pct_red: + return 255, 0, 0 + + # Interpolate between green and red + factor = (percent - pct_green) / (pct_red - pct_green) + red_level = int(255 * factor) + green_level = int(255 * (1 - factor)) + return red_level, green_level, 0 + + +def _create_html_tag(value: float, color: tuple[int, int, int]) -> str: + red_level, green_level, blue_level = color + hex_color = f"#{red_level:02x}{green_level:02x}{blue_level:02x}" + return f'{value:.2f}%' + + def _info_html(run_manager: RunManager) -> str: queue = run_manager.scheduler.queue(me_only=True) dbm = run_manager.database_manager @@ -482,20 +519,22 @@ def _table_row(i: int, key: str, value: Any) -> str: df = run_manager.parse_log_files() t_last = (pd.Timestamp.now() - df.timestamp.max()).seconds - overhead = df.mem_usage.mean() - red_level = max(0, min(int(255 * overhead / 100), 255)) - overhead_color = f"#{red_level:02x}{255 - red_level:02x}{0:02x}" - overhead_html_value = f'{overhead:.2f}%' - cpu = df.cpu_usage.mean() - red_level = max(0, min(int(255 * cpu / 100), 255)) - cpu_color = f"#{red_level:02x}{red_level:02x}{0:02x}" - cpu_html_value = f'{cpu:.2f}%' + cpu_html_value = _create_html_tag(cpu, _interp_red_green(cpu)) + + mem = df.mem_usage.mean() + mem_html_value = _create_html_tag(mem, _interp_red_green(mem, 80, 50)) + + overhead = df.overhead.mean() + overhead_html_value = _create_html_tag( + overhead, + _interp_red_green(overhead, 10, 30), + ) from_logs = [ ("# of points", df.npoints.sum()), ("mean CPU usage", cpu_html_value), - ("mean memory usage", f"{df.mem_usage.mean().round(1)} %"), + ("mean memory usage", mem_html_value), ("mean overhead", overhead_html_value), ("last log-entry", f"{t_last}s ago"), ] @@ -519,12 +558,12 @@ def _create_widget( data_provider: Callable[[], pd.DataFrame], update_button_text: str, *, - use_itables_checkbox: bool = False, + itables_checkbox_default: bool = False, additional_widgets: list[ipyw.Widget] | None = None, + extra_widget_config: dict | None = None, ) -> tuple[ipyw.VBox, Callable[[Any], None]]: import ipywidgets as ipyw from IPython.display import display - from itables import show def _update_data_df( itables_checkbox: ipyw.Checkbox, @@ -535,6 +574,9 @@ def on_click(_: Any) -> None: output_widget.clear_output() df = data_provider() if itables_checkbox.value: + from itables import show + + _set_itables_opts() show(df) else: with _display_all_dataframe_rows(): @@ -547,7 +589,7 @@ def on_click(_: Any) -> None: itables_checkbox = ipyw.Checkbox( description="Use itables (interactive)", indent=False, - value=use_itables_checkbox, + value=itables_checkbox_default, ) update_button = ipyw.Button( description=update_button_text, @@ -570,6 +612,11 @@ def on_click(_: Any) -> None: widget_list = [itables_checkbox, update_button, output_widget] if additional_widgets: widget_list = additional_widgets + widget_list + + if extra_widget_config: + for _key, config in extra_widget_config.items(): + widget_list.insert(config["position"], config["widget"]) + vbox = ipyw.VBox(widget_list, layout=ipyw.Layout(border="solid 2px gray")) return (vbox, update_function) @@ -612,6 +659,64 @@ def get_database_df() -> pd.DataFrame: return vbox +def _set_itables_opts() -> None: + import itables.options as opt + + opt.maxBytes = 262_144 + + +def results_widget( + fnames: list[str] | list[Path], + dataframe_format: _DATAFRAME_FORMATS, +) -> ipyw.VBox: + """Widget that loads and displays the results as `pandas.DataFrame`s.""" + import ipywidgets as ipyw + + def on_concat_checkbox_value_change(change: dict) -> None: + if change["name"] == "value": + dropdown.layout.visibility = "hidden" if change["new"] else "visible" + update_function(None) + + def get_results_df() -> pd.DataFrame: + selected_fname = dropdown.value + dfs = [selected_fname] if not concat_checkbox.value else fnames + df = load_dataframes(dfs, format=dataframe_format) + assert isinstance(df, pd.DataFrame) + + if len(df) > max_rows.value: + sample_indices = np.linspace(0, len(df) - 1, num=max_rows.value, dtype=int) + df = df.iloc[sample_indices] + + return df # type: ignore[return-value] + + # Create widgets + dropdown = ipyw.Dropdown(options=fnames) + concat_checkbox = ipyw.Checkbox(description="Concat all dataframes", indent=False) + max_rows = ipyw.IntText(value=300, description="Max rows") + + # Observe the value change in the 'concat_checkbox' + concat_checkbox.observe(on_concat_checkbox_value_change, names="value") + + extra_widget_config = { + "concat_checkbox": {"widget": concat_checkbox, "position": 1}, + "dropdown": {"widget": dropdown, "position": 0}, + "max_rows": {"widget": max_rows, "position": 4}, + } + + vbox, update_function = _create_widget( + get_results_df, + "Update results", + extra_widget_config=extra_widget_config, + ) + + # Add observers for the 'dropdown' and 'max_rows' widgets + dropdown.observe(update_function, names="value") + max_rows.observe(update_function, names="value") + + _add_title("adaptive_scheduler.widgets.results_widget", vbox) + return vbox + + def _toggle_widget( widget_key: str, widget_dict: dict[str, ipyw.Widget | str], @@ -628,8 +733,8 @@ def on_click(_: Any) -> None: button = widget_dict[widget_key] assert isinstance(button, ipyw.Button) - show_description = toggle_dict[widget_key]["show_description"] - hide_description = toggle_dict[widget_key]["hide_description"] + show_description = f"show {widget_key}" + hide_description = f"hide {widget_key}" output = toggle_dict[widget_key]["output"] if button.description == show_description: button.description = hide_description @@ -696,6 +801,8 @@ def info(run_manager: RunManager) -> None: import ipywidgets as ipyw from IPython.display import display + _disable_widgets_output_scrollbar() + status = ipyw.HTML(value=_info_html(run_manager)) layout = ipyw.Layout(width="200px") @@ -742,14 +849,21 @@ def info(run_manager: RunManager) -> None: button_style="info", icon="database", ) + show_results_button = ipyw.Button( + description="show results", + layout=layout, + button_style="info", + icon="table", + ) widgets = { "update info": update_info_button, "cancel": ipyw.HBox([cancel_button], layout=layout), "cleanup": ipyw.HBox([cleanup_button], layout=layout), "load learners": load_learners_button, - "show logs": show_logs_button, - "show queue": show_queue_button, - "show database": show_db_button, + "logs": show_logs_button, + "queue": show_queue_button, + "database": show_db_button, + "results": show_results_button, } def update(_: Any) -> None: @@ -759,25 +873,29 @@ def load_learners(_: Any) -> None: run_manager.load_learners() toggle_dict = { - "show logs": { + "logs": { "widget": None, "init_func": lambda: log_explorer(run_manager), - "show_description": "show logs", + "show_description": "logs", "hide_description": "hide logs", "output": ipyw.Output(), }, - "show queue": { + "queue": { "widget": None, "init_func": lambda: queue_widget(run_manager.scheduler), - "show_description": "show queue", - "hide_description": "hide queue", "output": ipyw.Output(), }, - "show database": { + "database": { "widget": None, "init_func": lambda: database_widget(run_manager), - "show_description": "show database", - "hide_description": "hide database", + "output": ipyw.Output(), + }, + "results": { + "widget": None, + "init_func": lambda: results_widget( + run_manager.fnames, + run_manager.dataframe_format, + ), "output": ipyw.Output(), }, } @@ -794,12 +912,14 @@ def _callable() -> None: return _callable widgets["update info"].on_click(update) - toggle_logs = _toggle_widget("show logs", widgets, toggle_dict) - toggle_queue = _toggle_widget("show queue", widgets, toggle_dict) - toggle_database = _toggle_widget("show database", widgets, toggle_dict) - widgets["show logs"].on_click(toggle_logs) - widgets["show queue"].on_click(toggle_queue) - widgets["show database"].on_click(toggle_database) + toggle_logs = _toggle_widget("logs", widgets, toggle_dict) + toggle_queue = _toggle_widget("queue", widgets, toggle_dict) + toggle_database = _toggle_widget("database", widgets, toggle_dict) + toggle_results = _toggle_widget("results", widgets, toggle_dict) + widgets["logs"].on_click(toggle_logs) + widgets["queue"].on_click(toggle_queue) + widgets["database"].on_click(toggle_database) + widgets["results"].on_click(toggle_results) widgets["load learners"].on_click(load_learners) # Cancel button with confirm/deny option @@ -833,3 +953,23 @@ def _display_all_dataframe_rows(max_colwidth: int = 50) -> Generator[None, None, finally: pd.set_option("display.max_rows", original_max_rows) pd.set_option("display.max_colwidth", original_max_colwidth) + + +def _disable_widgets_output_scrollbar() -> None: + import ipywidgets as ipyw + from IPython.display import display + + style = """ + + """ + display(ipyw.HTML(style)) diff --git a/tests/test_widgets.py b/tests/test_widgets.py index 77aa6f03..1b072f98 100644 --- a/tests/test_widgets.py +++ b/tests/test_widgets.py @@ -10,17 +10,21 @@ from typing import TYPE_CHECKING from unittest.mock import patch +import pandas as pd + from adaptive_scheduler.widgets import ( _bytes_to_human_readable, _failed_job_logs, _files_that_contain, _get_fnames, + _interp_red_green, _sort_fnames, _timedelta_to_human_readable, _total_size, info, log_explorer, queue_widget, + results_widget, ) if TYPE_CHECKING: @@ -286,3 +290,34 @@ def test_sort_fnames(tmp_path: Path) -> None: sorted_fnames = _sort_fnames("Alphabetical", run_manager, fnames) # type: ignore[arg-type] assert sorted_fnames == fnames # In this case, they should be the same + + +def test_results_widget(tmp_path: Path) -> None: + """Test the results_widget function.""" + # Create some sample data files in DataFrame pickle format + sample_data = [ + pd.DataFrame({"col1": range(10), "col2": range(10)}), + pd.DataFrame({"col1": range(10, 20), "col2": range(10, 20)}), + ] + + data_files = [] + for idx, data in enumerate(sample_data): + file_path = tmp_path / f"sample_data_{idx}.pickle" + data.to_pickle(file_path) + data_files.append(file_path) + + # Create the widget + widget = results_widget(data_files, "pickle") + + # Test the widget's configuration + assert widget is not None + + +def test_interp_red_green() -> None: + """Test _interp_red_green.""" + assert _interp_red_green(100, pct_red=10, pct_green=90) == (0, 255, 0) + assert _interp_red_green(0, pct_red=10, pct_green=90) == (255, 0, 0) + assert _interp_red_green(50, pct_red=10, pct_green=90) == (127, 127, 0) + assert _interp_red_green(50, pct_red=90, pct_green=10) == (127, 127, 0) + assert _interp_red_green(100, pct_green=10, pct_red=90) == (255, 0, 0) + assert _interp_red_green(0, pct_green=10, pct_red=90) == (0, 255, 0) From 0a1342a802f078415587f3765241becce0d033aa Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 15:29:00 -0700 Subject: [PATCH 11/13] Fix the color of CPU usage (#159) --- adaptive_scheduler/widgets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adaptive_scheduler/widgets.py b/adaptive_scheduler/widgets.py index 18080306..bee64087 100644 --- a/adaptive_scheduler/widgets.py +++ b/adaptive_scheduler/widgets.py @@ -520,7 +520,7 @@ def _table_row(i: int, key: str, value: Any) -> str: t_last = (pd.Timestamp.now() - df.timestamp.max()).seconds cpu = df.cpu_usage.mean() - cpu_html_value = _create_html_tag(cpu, _interp_red_green(cpu)) + cpu_html_value = _create_html_tag(cpu, _interp_red_green(cpu, 50, 80)) mem = df.mem_usage.mean() mem_html_value = _create_html_tag(mem, _interp_red_green(mem, 80, 50)) From e5653a3f6790cb99f2500ad6a52a7725c49c3fb9 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 19 Apr 2023 22:59:46 +0000 Subject: [PATCH 12/13] fix --- adaptive_scheduler/_server_support/run_manager.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index 0e00f492..e352c036 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -306,7 +306,7 @@ def start(self, wait_for: RunManager | None = None) -> RunManager: # type: igno else: super().start() if self.store_fname is not None: - self.start() + self.save() return self async def _manage(self) -> None: @@ -473,12 +473,14 @@ def load_dataframes(self) -> pd.DataFrame: raise ValueError(msg) return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value] - def save(self, store_fname: str | Path | None, *, overwrite: bool = True) -> None: + def save( + self, store_fname: str | Path | None = None, *, overwrite: bool = True, + ) -> None: """Store the `RunManager` to a file. Parameters ---------- - store_fname : str or Path + store_fname : str or Path or None The filename to store the `RunManager` to, if None, use the `store_fname` attribute. overwrite : bool, default: False From 099861b80d6e32e9b249375da7124eb776462f81 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 19 Apr 2023 23:01:09 +0000 Subject: [PATCH 13/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- adaptive_scheduler/_server_support/run_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index e352c036..c3300866 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -474,7 +474,10 @@ def load_dataframes(self) -> pd.DataFrame: return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value] def save( - self, store_fname: str | Path | None = None, *, overwrite: bool = True, + self, + store_fname: str | Path | None = None, + *, + overwrite: bool = True, ) -> None: """Store the `RunManager` to a file.