Skip to content

Commit e2ec1cd

Browse files
authored
Merge pull request #2258 from danielaskdd/pipeline-cancelllation
Feat: Add Pipeline Cancellation Feature with Enhanced Reliability
2 parents 6a29b5d + 3eb3a07 commit e2ec1cd

File tree

15 files changed

+736
-220
lines changed

15 files changed

+736
-220
lines changed

lightrag/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
22

3-
__version__ = "1.4.9.4"
3+
__version__ = "1.4.9.5"
44
__author__ = "Zirui Guo"
55
__url__ = "https://github.com/HKUDS/LightRAG"

lightrag/api/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__api_version__ = "0244"
1+
__api_version__ = "0245"

lightrag/api/routers/document_routes.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,28 @@ class Config:
161161
}
162162

163163

164+
class CancelPipelineResponse(BaseModel):
165+
"""Response model for pipeline cancellation operation
166+
167+
Attributes:
168+
status: Status of the cancellation request
169+
message: Message describing the operation result
170+
"""
171+
172+
status: Literal["cancellation_requested", "not_busy"] = Field(
173+
description="Status of the cancellation request"
174+
)
175+
message: str = Field(description="Human-readable message describing the operation")
176+
177+
class Config:
178+
json_schema_extra = {
179+
"example": {
180+
"status": "cancellation_requested",
181+
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
182+
}
183+
}
184+
185+
164186
class InsertTextRequest(BaseModel):
165187
"""Request model for inserting a single text document
166188
@@ -1534,7 +1556,19 @@ async def background_delete_documents(
15341556
try:
15351557
# Loop through each document ID and delete them one by one
15361558
for i, doc_id in enumerate(doc_ids, 1):
1559+
# Check for cancellation at the start of each document deletion
15371560
async with pipeline_status_lock:
1561+
if pipeline_status.get("cancellation_requested", False):
1562+
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
1563+
logger.info(cancel_msg)
1564+
pipeline_status["latest_message"] = cancel_msg
1565+
pipeline_status["history_messages"].append(cancel_msg)
1566+
# Add remaining documents to failed list with cancellation reason
1567+
failed_deletions.extend(
1568+
doc_ids[i - 1 :]
1569+
) # i-1 because enumerate starts at 1
1570+
break # Exit the loop, remaining documents unchanged
1571+
15381572
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
15391573
logger.info(start_msg)
15401574
pipeline_status["cur_batch"] = i
@@ -1697,6 +1731,10 @@ async def background_delete_documents(
16971731
# Final summary and check for pending requests
16981732
async with pipeline_status_lock:
16991733
pipeline_status["busy"] = False
1734+
pipeline_status["pending_requests"] = False # Reset pending requests flag
1735+
pipeline_status["cancellation_requested"] = (
1736+
False # Always reset cancellation flag
1737+
)
17001738
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
17011739
pipeline_status["latest_message"] = completion_msg
17021740
pipeline_status["history_messages"].append(completion_msg)
@@ -2230,7 +2268,7 @@ async def get_pipeline_status() -> PipelineStatusResponse:
22302268
logger.error(traceback.format_exc())
22312269
raise HTTPException(status_code=500, detail=str(e))
22322270

2233-
# TODO: Deprecated
2271+
# TODO: Deprecated, use /documents/paginated instead
22342272
@router.get(
22352273
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
22362274
)
@@ -2754,4 +2792,63 @@ async def reprocess_failed_documents(background_tasks: BackgroundTasks):
27542792
logger.error(traceback.format_exc())
27552793
raise HTTPException(status_code=500, detail=str(e))
27562794

2795+
@router.post(
2796+
"/cancel_pipeline",
2797+
response_model=CancelPipelineResponse,
2798+
dependencies=[Depends(combined_auth)],
2799+
)
2800+
async def cancel_pipeline():
2801+
"""
2802+
Request cancellation of the currently running pipeline.
2803+
2804+
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
2805+
1. Check this flag at key processing points
2806+
2. Stop processing new documents
2807+
3. Cancel all running document processing tasks
2808+
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
2809+
2810+
The cancellation is graceful and ensures data consistency. Documents that have
2811+
completed processing will remain in PROCESSED status.
2812+
2813+
Returns:
2814+
CancelPipelineResponse: Response with status and message
2815+
- status="cancellation_requested": Cancellation flag has been set
2816+
- status="not_busy": Pipeline is not currently running
2817+
2818+
Raises:
2819+
HTTPException: If an error occurs while setting cancellation flag (500).
2820+
"""
2821+
try:
2822+
from lightrag.kg.shared_storage import (
2823+
get_namespace_data,
2824+
get_pipeline_status_lock,
2825+
)
2826+
2827+
pipeline_status = await get_namespace_data("pipeline_status")
2828+
pipeline_status_lock = get_pipeline_status_lock()
2829+
2830+
async with pipeline_status_lock:
2831+
if not pipeline_status.get("busy", False):
2832+
return CancelPipelineResponse(
2833+
status="not_busy",
2834+
message="Pipeline is not currently running. No cancellation needed.",
2835+
)
2836+
2837+
# Set cancellation flag
2838+
pipeline_status["cancellation_requested"] = True
2839+
cancel_msg = "Pipeline cancellation requested by user"
2840+
logger.info(cancel_msg)
2841+
pipeline_status["latest_message"] = cancel_msg
2842+
pipeline_status["history_messages"].append(cancel_msg)
2843+
2844+
return CancelPipelineResponse(
2845+
status="cancellation_requested",
2846+
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
2847+
)
2848+
2849+
except Exception as e:
2850+
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
2851+
logger.error(traceback.format_exc())
2852+
raise HTTPException(status_code=500, detail=str(e))
2853+
27572854
return router

lightrag/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,11 @@ def __init__(self, namespace: str = ""):
9696
f" await initialize_pipeline_status()"
9797
)
9898
super().__init__(msg)
99+
100+
101+
class PipelineCancelledException(Exception):
102+
"""Raised when pipeline processing is cancelled by user request."""
103+
104+
def __init__(self, message: str = "User cancelled"):
105+
super().__init__(message)
106+
self.message = message

0 commit comments

Comments
 (0)