Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2746,18 +2746,23 @@ async def _history_clear_handler(self, request):
"""
logger.info("Clearing the plan execution history ...")
try:
supported_param_names = ["lock_key", "size"]
supported_param_names = ["lock_key", "size", "item_uid"]
self._check_request_for_unsupported_params(request=request, param_names=supported_param_names)

self._validate_lock_key(request.get("lock_key", None), check_queue=True)

size = request.get("size", None)
if size is not None:
item_uid = request.get("item_uid", None)
if size is not None and item_uid is not None:
raise ValueError("Parameters 'size' and 'item_uid' are mutually exclusive.")
elif size is not None:
if not isinstance(size, int):
raise ValueError("The 'size' parameter must be an integer.")
if size < 0:
raise ValueError("The 'size' paramter cannot be negative.")
raise ValueError(f"The 'size' parameter must be an integer: size={size!r}")
await self._plan_queue.trim_history(new_size=size)
elif item_uid is not None:
if not isinstance(item_uid, str):
raise ValueError(f"The 'item_uid' parameter must be a string: item_uid={item_uid!r}")
await self._plan_queue.trim_history(item_uid=item_uid)
else:
await self._plan_queue.clear_history()

Expand Down
41 changes: 33 additions & 8 deletions bluesky_queueserver/manager/plan_queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,23 +1515,48 @@ async def clear_history(self):
async with self._lock:
await self._clear_history()

async def _trim_history(self, *, new_size=None):
async def _trim_history(self, *, new_size=None, item_uid=None):
"""
See ``self.trim_history()`` method.
"""
self._plan_history_uid = self.new_item_uid()
if new_size is None:
new_size = -1
await self._r_pool.ltrim(self._name_plan_history, 0, new_size - 1)

async def trim_history(self, *, new_size=None):
if new_size is None and item_uid is None:
raise ValueError("At least one of the parameters 'new_size' or 'item_uid' must be specified.")
elif new_size is not None and item_uid is not None:
raise ValueError("The parameters 'new_size' and 'item_uid' are mutually exclusive.")

update_uid = False
if new_size is not None:
history_size = await self._get_history_size()
if new_size < history_size:
new_size = max(min(new_size, history_size), 0)
n_delete = history_size - new_size
update_uid = True
await self._r_pool.ltrim(self._name_plan_history, n_delete, -1)
else: # item_uid is not None
# The following code is not efficient if the history is huge, but it should work fine for now.
history, _ = await self._get_history()

n_delete = None
for i, item in enumerate(history):
if item["item_uid"] == item_uid:
n_delete = i
break

if n_delete is not None:
update_uid = True
await self._r_pool.ltrim(self._name_plan_history, n_delete + 1, -1)

if update_uid:
self._plan_history_uid = self.new_item_uid()

async def trim_history(self, *, new_size=None, item_uid=None):
"""
Remove all entries from the plan queue older than the given index.
Does not touch the running item. The item (plan) may be pushed back
into the queue if it is stopped.
"""
async with self._lock:
await self._trim_history(new_size=new_size)
await self._trim_history(new_size=new_size, item_uid=item_uid)

# ----------------------------------------------------------------------
# Standard item operations during queue execution
Expand Down
100 changes: 80 additions & 20 deletions bluesky_queueserver/manager/tests/test_plan_queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1682,7 +1682,23 @@ async def testing():
asyncio.run(testing())


def test_trim_history_functions():
# fmt: off
@pytest.mark.parametrize("new_size, item_uid, exp_size, new_items", [
(-1, None, 0, []),
(0, None, 0, []),
(1, None, 1, [{"item_uid": "d"}]),
(2, None, 2, [{"item_uid": "c"}, {"item_uid": "d"}]),
(3, None, 3, [{"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]),
(4, None, 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]),
(5, None, 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]),
(None, "d", 0, []),
(None, "c", 1, [{"item_uid": "d"}]),
(None, "b", 2, [{"item_uid": "c"}, {"item_uid": "d"}]),
(None, "a", 3, [{"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]),
(None, "?", 4, [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]),
])
# fmt: on
def test_trim_history_functions_1(new_size, item_uid, exp_size, new_items):
"""
Test for ``PlanQueueOperations._trim_history()`` method.
"""
Expand All @@ -1691,7 +1707,9 @@ async def testing():
async with PQ() as pq:
assert await pq.get_history_size() == 0

plans = [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]
plans = [{"item_uid": "a"}, {"item_uid": "b"}, {"item_uid": "c"}, {"item_uid": "d"}]
update_uid = exp_size != len(plans)

ph_uid = pq.plan_history_uid
for plan in plans:
await pq._add_to_history(plan)
Expand All @@ -1707,29 +1725,71 @@ async def testing():
assert plan_history == plans

ph_uid = pq.plan_history_uid
await pq.trim_history(new_size=2)
assert pq.plan_history_uid != ph_uid

plan_history, _ = await pq.get_history()
assert len(plan_history) == 2
assert plan_history == [{"name": "a"}, {"name": "b"}]

ph_uid = pq.plan_history_uid
await pq.trim_history()
assert pq.plan_history_uid != ph_uid
if item_uid is None:
await pq.trim_history(new_size=new_size)
else:
await pq.trim_history(item_uid=item_uid)
if update_uid:
assert pq.plan_history_uid != ph_uid
else:
assert pq.plan_history_uid == ph_uid

plan_history, _ = await pq.get_history()
assert len(plan_history) == 1
assert plan_history == [{"name": "a"}]
assert len(plan_history) == exp_size
assert plan_history == new_items

ph_uid = pq.plan_history_uid
await pq.trim_history()
assert pq.plan_history_uid != ph_uid
asyncio.run(testing())

plan_history, _ = await pq.get_history()
assert len(plan_history) == 0

asyncio.run(testing())
# # fmt: off
# @pytest.mark.parametrize("new_size, exp_size, new_items, success", [
# (-1, 0, []),
# (0, 0, []),
# (1, 1, [{"name": "d"}]),
# (2, 2, [{"name": "c"}, {"name": "d"}]),
# (3, 3, [{"name": "b"}, {"name": "c"}, {"name": "d"}]),
# (4, 4, [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]),
# (5, 4, [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]),
# ])
# # fmt: on
# def test_trim_history_functions_2(new_size, exp_size, new_items, success):
# """
# Test for ``PlanQueueOperations._trim_history()`` method.
# """

# async def testing():
# async with PQ() as pq:
# assert await pq.get_history_size() == 0

# plans = [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]
# update_uid = (new_size < len(plans))

# ph_uid = pq.plan_history_uid
# for plan in plans:
# await pq._add_to_history(plan)
# assert await pq.get_history_size() == 4
# assert pq.plan_history_uid != ph_uid

# ph_uid = pq.plan_history_uid
# plan_history, plan_history_uid_1 = await pq.get_history()
# assert pq.plan_history_uid == plan_history_uid_1
# assert pq.plan_history_uid == ph_uid

# assert len(plan_history) == 4
# assert plan_history == plans

# ph_uid = pq.plan_history_uid
# await pq.trim_history(new_size=new_size)
# if update_uid:
# assert pq.plan_history_uid != ph_uid
# else:
# assert pq.plan_history_uid == ph_uid

# plan_history, _ = await pq.get_history()
# assert len(plan_history) == exp_size
# assert plan_history == new_items

# asyncio.run(testing())


@pytest.mark.parametrize("immediate_execution", [False, True])
Expand Down
82 changes: 82 additions & 0 deletions bluesky_queueserver/manager/tests/test_zmq_api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pprint
import re
import time as ttime
import uuid
from datetime import datetime

import msgpack
Expand Down Expand Up @@ -454,6 +455,87 @@ def test_zmq_api_environment_open_close_4(tmp_path, re_manager_cmd, startup_with
assert state["worker_environment_state"] == "closed"


# =======================================================================================
# Method 'history_clear'


# fmt: off
@pytest.mark.parametrize("params, n_expected, success, err_msg", [
({}, 0, True, ""),
({"size": -1}, 0, True, ""),
({"size": 0}, 0, True, ""),
({"size": 1}, 1, True, ""),
({"size": 3}, 3, True, ""),
({"size": 4}, 4, True, ""),
({"size": 5}, 4, True, ""),
({"item_uid": 0}, 3, True, ""),
({"item_uid": 1}, 2, True, ""),
({"item_uid": 2}, 1, True, ""),
({"item_uid": 3}, 0, True, ""),
({"item_uid": -1}, 4, True, ""), # Random UUID - nothing is deleted
({"size": "ab"}, 4, False, "Error: The 'size' parameter must be an integer: size='ab'"),
({"item_uid": 1.5}, 4, False, "Error: The 'item_uid' parameter must be a string: item_uid=1.5"),
({"size": 2, "item_uid": 2}, 4, False, "Error: Parameters 'size' and 'item_uid' are mutually exclusive."),
])
# fmt: on
def test_zmq_api_history_clear_1(re_manager, params, n_expected, success, err_msg): # noqa: F811
"""
Basic test for ``history_clear`` API.
"""
# Add 4 plans to queue
for n in range(4):
params1a = {"item": _plan1, "user": _user, "user_group": _user_group}
resp1a, _ = zmq_request("queue_item_add", params1a)
assert resp1a["success"] is True, f"resp={resp1a}"

resp2, _ = zmq_request("environment_open")
assert resp2["success"] is True
assert wait_for_condition(time=timeout_env_open, condition=condition_environment_created)

resp3, _ = zmq_request("queue_start")
assert resp3["success"] is True
assert wait_for_condition(time=30, condition=condition_manager_idle)

resp4, _ = zmq_request("status")
assert resp4["items_in_queue"] == 0
assert resp4["items_in_history"] == 4

history, _ = zmq_request("history_get")
h_items_1 = history["items"]
assert len(h_items_1) == 4, pprint.pformat(h_items_1)
h_uids_1 = [_["item_uid"] for _ in h_items_1]

# Replace index with UUID if integer is provided
if "item_uid" in params and isinstance(params["item_uid"], int):
n = params["item_uid"]
if n >= 0:
# Select UUID based on the index of the item in the history
params["item_uid"] = h_uids_1[params["item_uid"]]
else:
# Generate random UUID
params["item_uid"] = str(uuid.uuid4())

resp5, _ = zmq_request("history_clear", params=params)
history_clear_success = resp5["success"]
assert history_clear_success is success, f"params={pprint.pformat(params)}\n{pprint.pformat(resp5)}"
assert resp5["msg"] == err_msg

resp4, _ = zmq_request("status")
assert resp4["items_in_queue"] == 0
assert resp4["items_in_history"] == n_expected

history, _ = zmq_request("history_get")
h_items_2 = history["items"]
assert len(h_items_2) == n_expected, pprint.pformat(h_items_2)
if len(h_items_2) > 0:
h_uids_2 = [_["item_uid"] for _ in h_items_2]
assert h_uids_2 == h_uids_1[-n_expected:], pprint.pformat(h_uids_2)

resp6, _ = zmq_request("environment_close")
assert resp6["success"] is True, f"resp={resp6}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


# =======================================================================================
# Method 'queue_item_add'

Expand Down
20 changes: 16 additions & 4 deletions docs/source/re_manager_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -657,11 +657,23 @@ Execution Immediate: no follow-up requests are required.
============ =========================================================================================
Method **'history_clear'**
------------ -----------------------------------------------------------------------------------------
Description Clear the contents of the plan history.
Description Clear the contents of the plan history. If the parameter **size** is specified, then
the history is trimmed to the desired size. If the parameter **item_uid** is specified,
then the item with matching UID and all older items are removed from the history.
The parameters **size** and **item_uid** are mutually exclusive.
------------ -----------------------------------------------------------------------------------------
Parameters **size**: *int* or *None* (optional)
The new size of the history. If the size is 0 or less, then the history is
cleared. If the size is greater than or equal to size of the queue, then the queue is
not modified and **plan_history_uid** remains unchanged. Otherwise the history is
trimmed to the desired size.

**item_uid**: *str* or *None* (optional)
If the history contains an item with **item_uid**, then the history is trimmed by
removing this item and all older items. If the item with **item_uid** is not found
then the history and **plan_history_uid** remain unchanged.

*The request always succeeds*.
------------ -----------------------------------------------------------------------------------------
Parameters **lock_key**: *str* (optional)
**lock_key**: *str* (optional)
Lock key. The API fails if **the queue** is locked and no valid key is submitted
with the request. See documentation on :ref:`method_lock` API for more details.
------------ -----------------------------------------------------------------------------------------
Expand Down
Loading