Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2742,16 +2742,25 @@ async def _history_get_handler(self, request):

async def _history_clear_handler(self, request):
"""
Remove all entries from the plan history
Remove some or all entries from the plan history
"""
logger.info("Clearing the plan execution history ...")
try:
supported_param_names = ["lock_key"]
supported_param_names = ["lock_key", "size"]
self._check_request_for_unsupported_params(request=request, param_names=supported_param_names)

self._validate_lock_key(request.get("lock_key", None), check_queue=True)

await self._plan_queue.clear_history()
size = request.get("size", None)
if size is not None:
if not isinstance(size, int):
raise ValueError("The 'size' parameter must be an integer.")
if size < 0:
raise ValueError("The 'size' paramter cannot be negative.")
await self._plan_queue.trim_history(new_size=size)
else:
await self._plan_queue.clear_history()

success, msg = True, ""
except Exception as ex:
success, msg = False, f"Error: {ex}"
Expand Down
18 changes: 18 additions & 0 deletions bluesky_queueserver/manager/plan_queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,24 @@ async def clear_history(self):
async with self._lock:
await self._clear_history()

async def _trim_history(self, *, new_size=None):
"""
See ``self.trim_history()`` method.
"""
self._plan_history_uid = self.new_item_uid()
if new_size is None:
new_size = -1
await self._r_pool.ltrim(self._name_plan_history, 0, new_size - 1)

async def trim_history(self, *, new_size=None):
"""
Remove all entries from the plan queue older than the given index.
Does not touch the running item. The item (plan) may be pushed back
into the queue if it is stopped.
"""
async with self._lock:
await self._trim_history(new_size=new_size)

# ----------------------------------------------------------------------
# Standard item operations during queue execution

Expand Down
50 changes: 50 additions & 0 deletions bluesky_queueserver/manager/tests/test_plan_queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,56 @@ async def testing():
asyncio.run(testing())


def test_trim_history_functions():
"""
Test for ``PlanQueueOperations._trim_history()`` method.
"""

async def testing():
async with PQ() as pq:
assert await pq.get_history_size() == 0

plans = [{"name": "a"}, {"name": "b"}, {"name": "c"}, {"name": "d"}]
ph_uid = pq.plan_history_uid
for plan in plans:
await pq._add_to_history(plan)
assert await pq.get_history_size() == 4
assert pq.plan_history_uid != ph_uid

ph_uid = pq.plan_history_uid
plan_history, plan_history_uid_1 = await pq.get_history()
assert pq.plan_history_uid == plan_history_uid_1
assert pq.plan_history_uid == ph_uid

assert len(plan_history) == 4
assert plan_history == plans

ph_uid = pq.plan_history_uid
await pq.trim_history(new_size=2)
assert pq.plan_history_uid != ph_uid

plan_history, _ = await pq.get_history()
assert len(plan_history) == 2
assert plan_history == [{"name": "a"}, {"name": "b"}]

ph_uid = pq.plan_history_uid
await pq.trim_history()
assert pq.plan_history_uid != ph_uid

plan_history, _ = await pq.get_history()
assert len(plan_history) == 1
assert plan_history == [{"name": "a"}]

ph_uid = pq.plan_history_uid
await pq.trim_history()
assert pq.plan_history_uid != ph_uid

plan_history, _ = await pq.get_history()
assert len(plan_history) == 0

asyncio.run(testing())


@pytest.mark.parametrize("immediate_execution", [False, True])
@pytest.mark.parametrize("func", ["process_next_item", "set_next_item_as_running"])
@pytest.mark.parametrize("loop_mode", [False, True])
Expand Down
Loading