diff --git a/bluesky_queueserver/manager/manager.py b/bluesky_queueserver/manager/manager.py index 643d2d08..b1c811f8 100644 --- a/bluesky_queueserver/manager/manager.py +++ b/bluesky_queueserver/manager/manager.py @@ -2746,18 +2746,23 @@ async def _history_clear_handler(self, request): """ logger.info("Clearing the plan execution history ...") try: - supported_param_names = ["lock_key", "size"] + supported_param_names = ["lock_key", "size", "item_uid"] self._check_request_for_unsupported_params(request=request, param_names=supported_param_names) self._validate_lock_key(request.get("lock_key", None), check_queue=True) size = request.get("size", None) - if size is not None: + item_uid = request.get("item_uid", None) + if size is not None and item_uid is not None: + raise ValueError("Parameters 'size' and 'item_uid' are mutually exclusive.") + elif size is not None: if not isinstance(size, int): - raise ValueError("The 'size' parameter must be an integer.") - if size < 0: - raise ValueError("The 'size' paramter cannot be negative.") + raise ValueError(f"The 'size' parameter must be an integer: size={size!r}") await self._plan_queue.trim_history(new_size=size) + elif item_uid is not None: + if not isinstance(item_uid, str): + raise ValueError(f"The 'item_uid' parameter must be a string: item_uid={item_uid!r}") + await self._plan_queue.trim_history(item_uid=item_uid) else: await self._plan_queue.clear_history() diff --git a/bluesky_queueserver/manager/plan_queue_ops.py b/bluesky_queueserver/manager/plan_queue_ops.py index c4d73da3..ed410302 100644 --- a/bluesky_queueserver/manager/plan_queue_ops.py +++ b/bluesky_queueserver/manager/plan_queue_ops.py @@ -1515,23 +1515,48 @@ async def clear_history(self): async with self._lock: await self._clear_history() - async def _trim_history(self, *, new_size=None): + async def _trim_history(self, *, new_size=None, item_uid=None): """ See ``self.trim_history()`` method. """ - self._plan_history_uid = self.new_item_uid() - if new_size is None: - new_size = -1 - await self._r_pool.ltrim(self._name_plan_history, 0, new_size - 1) - - async def trim_history(self, *, new_size=None): + if new_size is None and item_uid is None: + raise ValueError("At least one of the parameters 'new_size' or 'item_uid' must be specified.") + elif new_size is not None and item_uid is not None: + raise ValueError("The parameters 'new_size' and 'item_uid' are mutually exclusive.") + + update_uid = False + if new_size is not None: + history_size = await self._get_history_size() + if new_size < history_size: + new_size = max(min(new_size, history_size), 0) + n_delete = history_size - new_size + update_uid = True + await self._r_pool.ltrim(self._name_plan_history, n_delete, -1) + else: # item_uid is not None + # The following code is not efficient if the history is huge, but it should work fine for now. + history, _ = await self._get_history() + + n_delete = None + for i, item in enumerate(history): + if item["item_uid"] == item_uid: + n_delete = i + break + + if n_delete is not None: + update_uid = True + await self._r_pool.ltrim(self._name_plan_history, n_delete + 1, -1) + + if update_uid: + self._plan_history_uid = self.new_item_uid() + + async def trim_history(self, *, new_size=None, item_uid=None): """ Remove all entries from the plan queue older than the given index. Does not touch the running item. The item (plan) may be pushed back into the queue if it is stopped. """ async with self._lock: - await self._trim_history(new_size=new_size) + await self._trim_history(new_size=new_size, item_uid=item_uid) # ---------------------------------------------------------------------- # Standard item operations during queue execution diff --git a/bluesky_queueserver/manager/tests/test_plan_queue_ops.py b/bluesky_queueserver/manager/tests/test_plan_queue_ops.py index 7031993d..ffac23d0 100644 --- a/bluesky_queueserver/manager/tests/test_plan_queue_ops.py +++ b/bluesky_queueserver/manager/tests/test_plan_queue_ops.py @@ -1682,7 +1682,23 @@ async def testing(): asyncio.run(testing()) -def test_trim_history_functions(): +# fmt: off +@pytest.mark.parametrize("new_size, item_uid, exp_size, new_items", [ + (-1, None, 0, []), + (0, None, 0, []), + (1, None, 1, [{"item_uid": "d"}]), + (2, None, 2, [{"item_uid": "c"}, {"item_uid": "d"}]), + (3, None, 3, [{"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]), + (4, None, 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]), + (5, None, 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]), + (None, "d", 0, []), + (None, "c", 1, [{"item_uid": "d"}]), + (None, "b", 2, [{"item_uid": "c"}, {"item_uid": "d"}]), + (None, "a", 3, [{"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]), + (None, "?", 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]), +]) +# fmt: on +def test_trim_history_functions_1(new_size, item_uid, exp_size, new_items): """ Test for ``PlanQueueOperations._trim_history()`` method. """ @@ -1691,7 +1707,9 @@ async def testing(): async with PQ() as pq: assert await pq.get_history_size() == 0 - plans = [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}] + plans = [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}] + update_uid = exp_size != len(plans) + ph_uid = pq.plan_history_uid for plan in plans: await pq._add_to_history(plan) @@ -1707,29 +1725,71 @@ async def testing(): assert plan_history == plans ph_uid = pq.plan_history_uid - await pq.trim_history(new_size=2) - assert pq.plan_history_uid != ph_uid - - plan_history, _ = await pq.get_history() - assert len(plan_history) == 2 - assert plan_history == [{"name": "a"}, {"name": "b"}] - - ph_uid = pq.plan_history_uid - await pq.trim_history() - assert pq.plan_history_uid != ph_uid + if item_uid is None: + await pq.trim_history(new_size=new_size) + else: + await pq.trim_history(item_uid=item_uid) + if update_uid: + assert pq.plan_history_uid != ph_uid + else: + assert pq.plan_history_uid == ph_uid plan_history, _ = await pq.get_history() - assert len(plan_history) == 1 - assert plan_history == [{"name": "a"}] + assert len(plan_history) == exp_size + assert plan_history == new_items - ph_uid = pq.plan_history_uid - await pq.trim_history() - assert pq.plan_history_uid != ph_uid + asyncio.run(testing()) - plan_history, _ = await pq.get_history() - assert len(plan_history) == 0 - asyncio.run(testing()) +# # fmt: off +# @pytest.mark.parametrize("new_size, exp_size, new_items, success", [ +# (-1, 0, []), +# (0, 0, []), +# (1, 1, [{"name": "d"}]), +# (2, 2, [{"name": "c"}, {"name": "d"}]), +# (3, 3, [{"name": "b"}, {"name": "c"}, {"name": "d"}]), +# (4, 4, [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]), +# (5, 4, [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]), +# ]) +# # fmt: on +# def test_trim_history_functions_2(new_size, exp_size, new_items, success): +# """ +# Test for ``PlanQueueOperations._trim_history()`` method. +# """ + +# async def testing(): +# async with PQ() as pq: +# assert await pq.get_history_size() == 0 + +# plans = [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}] +# update_uid = (new_size < len(plans)) + +# ph_uid = pq.plan_history_uid +# for plan in plans: +# await pq._add_to_history(plan) +# assert await pq.get_history_size() == 4 +# assert pq.plan_history_uid != ph_uid + +# ph_uid = pq.plan_history_uid +# plan_history, plan_history_uid_1 = await pq.get_history() +# assert pq.plan_history_uid == plan_history_uid_1 +# assert pq.plan_history_uid == ph_uid + +# assert len(plan_history) == 4 +# assert plan_history == plans + +# ph_uid = pq.plan_history_uid +# await pq.trim_history(new_size=new_size) +# if update_uid: +# assert pq.plan_history_uid != ph_uid +# else: +# assert pq.plan_history_uid == ph_uid + +# plan_history, _ = await pq.get_history() +# assert len(plan_history) == exp_size +# assert plan_history == new_items + +# asyncio.run(testing()) @pytest.mark.parametrize("immediate_execution", [False, True]) diff --git a/bluesky_queueserver/manager/tests/test_zmq_api_base.py b/bluesky_queueserver/manager/tests/test_zmq_api_base.py index 08068b02..3921b463 100644 --- a/bluesky_queueserver/manager/tests/test_zmq_api_base.py +++ b/bluesky_queueserver/manager/tests/test_zmq_api_base.py @@ -6,6 +6,7 @@ import pprint import re import time as ttime +import uuid from datetime import datetime import msgpack @@ -454,6 +455,87 @@ def test_zmq_api_environment_open_close_4(tmp_path, re_manager_cmd, startup_with assert state["worker_environment_state"] == "closed" +# ======================================================================================= +# Method 'history_clear' + + +# fmt: off +@pytest.mark.parametrize("params, n_expected, success, err_msg", [ + ({}, 0, True, ""), + ({"size": -1}, 0, True, ""), + ({"size": 0}, 0, True, ""), + ({"size": 1}, 1, True, ""), + ({"size": 3}, 3, True, ""), + ({"size": 4}, 4, True, ""), + ({"size": 5}, 4, True, ""), + ({"item_uid": 0}, 3, True, ""), + ({"item_uid": 1}, 2, True, ""), + ({"item_uid": 2}, 1, True, ""), + ({"item_uid": 3}, 0, True, ""), + ({"item_uid": -1}, 4, True, ""), # Random UUID - nothing is deleted + ({"size": "ab"}, 4, False, "Error: The 'size' parameter must be an integer: size='ab'"), + ({"item_uid": 1.5}, 4, False, "Error: The 'item_uid' parameter must be a string: item_uid=1.5"), + ({"size": 2, "item_uid": 2}, 4, False, "Error: Parameters 'size' and 'item_uid' are mutually exclusive."), +]) +# fmt: on +def test_zmq_api_history_clear_1(re_manager, params, n_expected, success, err_msg): # noqa: F811 + """ + Basic test for ``history_clear`` API. + """ + # Add 4 plans to queue + for n in range(4): + params1a = {"item": _plan1, "user": _user, "user_group": _user_group} + resp1a, _ = zmq_request("queue_item_add", params1a) + assert resp1a["success"] is True, f"resp={resp1a}" + + resp2, _ = zmq_request("environment_open") + assert resp2["success"] is True + assert wait_for_condition(time=timeout_env_open, condition=condition_environment_created) + + resp3, _ = zmq_request("queue_start") + assert resp3["success"] is True + assert wait_for_condition(time=30, condition=condition_manager_idle) + + resp4, _ = zmq_request("status") + assert resp4["items_in_queue"] == 0 + assert resp4["items_in_history"] == 4 + + history, _ = zmq_request("history_get") + h_items_1 = history["items"] + assert len(h_items_1) == 4, pprint.pformat(h_items_1) + h_uids_1 = [_["item_uid"] for _ in h_items_1] + + # Replace index with UUID if integer is provided + if "item_uid" in params and isinstance(params["item_uid"], int): + n = params["item_uid"] + if n >= 0: + # Select UUID based on the index of the item in the history + params["item_uid"] = h_uids_1[params["item_uid"]] + else: + # Generate random UUID + params["item_uid"] = str(uuid.uuid4()) + + resp5, _ = zmq_request("history_clear", params=params) + history_clear_success = resp5["success"] + assert history_clear_success is success, f"params={pprint.pformat(params)}\n{pprint.pformat(resp5)}" + assert resp5["msg"] == err_msg + + resp4, _ = zmq_request("status") + assert resp4["items_in_queue"] == 0 + assert resp4["items_in_history"] == n_expected + + history, _ = zmq_request("history_get") + h_items_2 = history["items"] + assert len(h_items_2) == n_expected, pprint.pformat(h_items_2) + if len(h_items_2) > 0: + h_uids_2 = [_["item_uid"] for _ in h_items_2] + assert h_uids_2 == h_uids_1[-n_expected:], pprint.pformat(h_uids_2) + + resp6, _ = zmq_request("environment_close") + assert resp6["success"] is True, f"resp={resp6}" + assert wait_for_condition(time=5, condition=condition_environment_closed) + + # ======================================================================================= # Method 'queue_item_add' diff --git a/docs/source/re_manager_api.rst b/docs/source/re_manager_api.rst index 7cfee55d..13d446ff 100644 --- a/docs/source/re_manager_api.rst +++ b/docs/source/re_manager_api.rst @@ -657,11 +657,23 @@ Execution Immediate: no follow-up requests are required. ============ ========================================================================================= Method **'history_clear'** ------------ ----------------------------------------------------------------------------------------- -Description Clear the contents of the plan history. +Description Clear the contents of the plan history. If the parameter **size** is specified, then + the history is trimmed to the desired size. If the parameter **item_uid** is specified, + then the item with matching UID and all older items are removed from the history. + The parameters **size** and **item_uid** are mutually exclusive. +------------ ----------------------------------------------------------------------------------------- +Parameters **size**: *int* or *None* (optional) + The new size of the history. If the size is 0 or less, then the history is + cleared. If the size is greater than or equal to size of the queue, then the queue is + not modified and **plan_history_uid** remains unchanged. Otherwise the history is + trimmed to the desired size. + + **item_uid**: *str* or *None* (optional) + If the history contains an item with **item_uid**, then the history is trimmed by + removing this item and all older items. If the item with **item_uid** is not found + then the history and **plan_history_uid** remain unchanged. - *The request always succeeds*. ------------- ----------------------------------------------------------------------------------------- -Parameters **lock_key**: *str* (optional) + **lock_key**: *str* (optional) Lock key. The API fails if **the queue** is locked and no valid key is submitted with the request. See documentation on :ref:`method_lock` API for more details. ------------ -----------------------------------------------------------------------------------------