Changes from 4 commits (of 20 total)

Commits:
2f3620b
feat: Add workspace isolation support for pipeline status
Nov 13, 2025
5f15358
fix: Add default workspace support for backward compatibility
Nov 15, 2025
af3da52
Merge branch 'main' into feature/pipeline-workspace-isolation
danielaskdd Nov 16, 2025
de404cc
Refactor workspace handling to use default workspace and namespace locks
danielaskdd Nov 16, 2025
1915d25
Fix workspace isolation for pipeline status across all operations
danielaskdd Nov 16, 2025
501008c
Refactor namespace lock to support reusable async context manager
danielaskdd Nov 16, 2025
8290876
Fix missing function call parentheses in get_all_update_flags_status
danielaskdd Nov 16, 2025
91be4cc
Refactor storage classes to use namespace instead of final_namespace
danielaskdd Nov 16, 2025
83cf878
Fix NamespaceLock concurrent coroutine safety with ContextVar
danielaskdd Nov 16, 2025
602e144
Standardize empty workspace handling from "_" to "" across storage
danielaskdd Nov 16, 2025
5f80890
Fix pipeline status namespace check to handle root case
danielaskdd Nov 16, 2025
97199b5
Fix workspace filtering logic in get_all_update_flags_status
danielaskdd Nov 16, 2025
b8fab6c
Remove final_namespace attribute for in-memory storage and use namesp…
danielaskdd Nov 16, 2025
0913857
Fix NamespaceLock context variable timing to prevent lock bricking
danielaskdd Nov 16, 2025
b7edb13
Auto-initialize pipeline status in LightRAG.initialize_storages()
danielaskdd Nov 16, 2025
71edb73
Remove manual initialize_pipeline_status() calls across codebase
danielaskdd Nov 16, 2025
04041e7
test: Add comprehensive workspace isolation test suite for PR #2366
BukeLy Nov 17, 2025
eda2f37
test: Enhance workspace isolation test suite to 100% coverage
BukeLy Nov 17, 2025
47a5bf8
test: Add real integration and E2E tests for workspace isolation
BukeLy Nov 17, 2025
7940d21
Merge pull request #2367 from BukeLy/test-workspace-isolation
danielaskdd Nov 17, 2025
.gitignore (2 changes: 2 additions & 0 deletions)
@@ -72,3 +72,5 @@ download_models_hf.py

# Cline files
memory-bank
.claude/CLAUDE.md
.claude/
env.example (18 changes: 12 additions & 6 deletions)
@@ -349,7 +349,8 @@ POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
# POSTGRES_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### POSTGRES_WORKSPACE=forced_workspace_name

### PostgreSQL Vector Storage Configuration
### Vector storage type: HNSW, IVFFlat
@@ -395,7 +396,8 @@ NEO4J_MAX_TRANSACTION_RETRY_TIME=30
NEO4J_MAX_CONNECTION_LIFETIME=300
NEO4J_LIVENESS_CHECK_TIMEOUT=30
NEO4J_KEEP_ALIVE=true
# NEO4J_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### NEO4J_WORKSPACE=forced_workspace_name

### MongoDB Configuration
MONGO_URI=mongodb://root:root@localhost:27017/
@@ -409,27 +411,31 @@ MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=your_password
# MILVUS_TOKEN=your_token
# MILVUS_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### MILVUS_WORKSPACE=forced_workspace_name

### Qdrant
QDRANT_URL=http://localhost:6333
# QDRANT_API_KEY=your-api-key
# QDRANT_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### QDRANT_WORKSPACE=forced_workspace_name

### Redis
REDIS_URI=redis://localhost:6379
REDIS_SOCKET_TIMEOUT=30
REDIS_CONNECT_TIMEOUT=10
REDIS_MAX_CONNECTIONS=100
REDIS_RETRY_ATTEMPTS=3
# REDIS_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### REDIS_WORKSPACE=forced_workspace_name

### Memgraph Configuration
MEMGRAPH_URI=bolt://localhost:7687
MEMGRAPH_USERNAME=
MEMGRAPH_PASSWORD=
MEMGRAPH_DATABASE=memgraph
# MEMGRAPH_WORKSPACE=forced_workspace_name
### A DB-specific workspace should not be set; kept for backward compatibility only
### MEMGRAPH_WORKSPACE=forced_workspace_name

############################
### Evaluation Configuration
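Taken together, the env.example changes demote every per-database workspace variable to legacy status: the shared WORKSPACE setting becomes the intended source of truth. A minimal sketch of the implied precedence — the helper name and exact fallback order are illustrative assumptions, not code from this PR:

import os

def effective_workspace(db_specific_var: str) -> str:
    # Hypothetical illustration: a DB-specific override such as
    # POSTGRES_WORKSPACE still wins if set (backward compatibility);
    # otherwise the shared WORKSPACE value applies. An empty string
    # means the global/default namespace.
    return os.getenv(db_specific_var, "").strip() or os.getenv("WORKSPACE", "").strip()

# e.g. effective_workspace("POSTGRES_WORKSPACE")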
lightrag/api/lightrag_server.py (33 changes: 30 additions & 3 deletions)
@@ -56,6 +56,8 @@
from lightrag.utils import logger, set_verbose_debug
from lightrag.kg.shared_storage import (
get_namespace_data,
get_default_workspace,
# set_default_workspace,
initialize_pipeline_status,
cleanup_keyed_lock,
finalize_share_data,
@@ -350,8 +352,9 @@ async def lifespan(app: FastAPI):

try:
# Initialize database connections
# set_default_workspace(rag.workspace)  # kept commented out to test the automatic default-workspace setup in initialize_storages
await rag.initialize_storages()
await initialize_pipeline_status()
await initialize_pipeline_status()  # uses the default workspace

# Data migration regardless of storage implementation
await rag.check_and_migrate_data()
@@ -452,6 +455,29 @@ def get_cors_origins():
# Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key)

def get_workspace_from_request(request: Request) -> str:
"""
Extract workspace from HTTP request header or use default.

This enables multi-workspace API support by checking the custom
'LIGHTRAG-WORKSPACE' header. If not present, falls back to the
server's default workspace configuration.

Args:
request: FastAPI Request object

Returns:
Workspace identifier (may be empty string for global namespace)
"""
# Check custom header first
workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()

# Fall back to server default if header not provided
if not workspace:
workspace = args.workspace

return workspace

# Create working directory if it doesn't exist
Path(args.working_dir).mkdir(parents=True, exist_ok=True)

@@ -1113,9 +1139,10 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
}

@app.get("/health", dependencies=[Depends(combined_auth)])
async def get_status():
async def get_status(request: Request):
"""Get current system status"""
try:
default_workspace = get_default_workspace()
pipeline_status = await get_namespace_data("pipeline_status")

if not auth_configured:
@@ -1147,7 +1174,7 @@ async def get_status():
"vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache,
"workspace": args.workspace,
"workspace": default_workspace,
"max_graph_nodes": args.max_graph_nodes,
# Rerank configuration
"enable_rerank": rerank_model_func is not None,
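get_workspace_from_request() lets a route resolve a per-request workspace from the custom LIGHTRAG-WORKSPACE header, falling back to the server default. A usage sketch — the host, port, and choice of endpoint are assumptions; this diff does not show which routes call the helper:

import requests

# Requests carrying the header are attributed to workspace "tenant_a"
# by any route that resolves it via get_workspace_from_request();
# without the header, the server default (args.workspace) applies.
resp = requests.get(
    "http://localhost:9621/health",  # assumed default LightRAG API address
    headers={"LIGHTRAG-WORKSPACE": "tenant_a"},
)
print(resp.status_code, resp.json())  # response includes the effective workspace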
lightrag/api/routers/document_routes.py (40 changes: 24 additions & 16 deletions)
@@ -1641,11 +1641,11 @@ async def background_delete_documents(
"""Background task to delete multiple documents"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_namespace_lock,
)

pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
pipeline_status_lock = get_namespace_lock("pipeline_status")

total_docs = len(doc_ids)
successful_deletions = []
@@ -2134,12 +2134,12 @@ async def clear_documents():
"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_namespace_lock,
)

# Get pipeline status and lock
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
pipeline_status_lock = get_namespace_lock("pipeline_status")

# Check and set status with lock
async with pipeline_status_lock:
@@ -2330,10 +2330,12 @@ async def get_pipeline_status() -> PipelineStatusResponse:
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
get_all_update_flags_status,
)

pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")

# Get update flags status for all namespaces
update_status = await get_all_update_flags_status()
@@ -2350,8 +2352,9 @@ async def get_pipeline_status() -> PipelineStatusResponse:
processed_flags.append(bool(flag))
processed_update_status[namespace] = processed_flags

# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
async with pipeline_status_lock:
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)

# Add processed update_status to the status dictionary
status_dict["update_status"] = processed_update_status
@@ -2538,17 +2541,22 @@ async def delete_document(
doc_ids = delete_request.doc_ids

try:
from lightrag.kg.shared_storage import get_namespace_data
from lightrag.kg.shared_storage import (
get_namespace_data,
get_namespace_lock,
)

pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_namespace_lock("pipeline_status")

# Check if pipeline is busy
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Check if pipeline is busy while holding the lock
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)

# Add deletion task to background tasks
background_tasks.add_task(
@@ -2944,11 +2952,11 @@ async def cancel_pipeline():
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_namespace_lock,
)

pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
pipeline_status_lock = get_namespace_lock("pipeline_status")

async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
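The edits above repeat one pattern: replace the dedicated get_pipeline_status_lock() with the generic get_namespace_lock("pipeline_status"), and move every read of the shared status dict under that lock. Condensed into one helper — a sketch assembled from the hunks above, not a function that exists in the PR:

from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

async def pipeline_is_busy() -> bool:
    # get_namespace_lock() returns a reusable async context manager,
    # so it is safe to create it fresh on every call.
    pipeline_status = await get_namespace_data("pipeline_status")
    pipeline_status_lock = get_namespace_lock("pipeline_status")
    async with pipeline_status_lock:
        # Reading under the lock avoids racing with a writer that is
        # mutating the shared dict in another coroutine or process.
        return bool(pipeline_status.get("busy", False))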
lightrag/kg/faiss_impl.py (18 changes: 13 additions & 5 deletions)
@@ -10,7 +10,7 @@
from lightrag.base import BaseVectorStorage

from .shared_storage import (
get_storage_lock,
get_namespace_lock,
get_update_flag,
set_all_update_flags,
)
@@ -73,9 +73,13 @@ def __post_init__(self):
async def initialize(self):
"""Initialize storage data"""
# Get the update flag for cross-process update notification
self.storage_updated = await get_update_flag(self.final_namespace)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
)
# Get the storage lock for use in other methods
self._storage_lock = get_storage_lock()
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
)

async def _get_index(self):
"""Check if the shtorage should be reloaded"""
@@ -400,7 +404,9 @@ async def index_done_callback(self) -> None:
# Save data to disk
self._save_faiss_index()
# Notify other processes that data has been updated
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
except Exception as e:
@@ -527,7 +533,9 @@ async def drop(self) -> dict[str, str]:
self._load_faiss_index()

# Notify other processes
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
self.storage_updated.value = False

logger.info(
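The flag calls in this file form the two halves of the cross-process reload protocol, now keyed by workspace as well as namespace. A condensed sketch of the flow — the helper names are hypothetical, and the reader half is reconstructed from _get_index()'s docstring, so the real logic may differ in detail:

from lightrag.kg.shared_storage import set_all_update_flags

async def publish_update(storage) -> None:
    # Writer half: after persisting the index, notify every other
    # process in the same workspace, then clear our own flag so we
    # do not reload our own write.
    await set_all_update_flags(storage.final_namespace, workspace=storage.workspace)
    storage.storage_updated.value = False

def maybe_reload(storage) -> None:
    # Reader half: before serving queries, reload from disk only when
    # another process has flagged an update.
    if storage.storage_updated.value:
        storage._load_faiss_index()
        storage.storage_updated.value = False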
lightrag/kg/json_doc_status_impl.py (32 changes: 23 additions & 9 deletions)
@@ -16,7 +16,7 @@
from lightrag.exceptions import StorageNotInitializedError
from .shared_storage import (
get_namespace_data,
get_storage_lock,
get_namespace_lock,
get_data_init_lock,
get_update_flag,
set_all_update_flags,
@@ -50,12 +50,20 @@ def __post_init__(self):

async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.final_namespace)
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
)
async with get_data_init_lock():
# the need_init check must happen before get_namespace_data
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
need_init = await try_initialize_namespace(
self.final_namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.final_namespace, workspace=self.workspace
)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
@@ -175,7 +183,9 @@ async def index_done_callback(self) -> None:
self._data.clear()
self._data.update(cleaned_data)

await clear_all_update_flags(self.final_namespace)
await clear_all_update_flags(
self.final_namespace, workspace=self.workspace
)

async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""
@@ -196,7 +206,7 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
if "chunks_list" not in doc_data:
doc_data["chunks_list"] = []
self._data.update(data)
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(self.final_namespace, workspace=self.workspace)

await self.index_done_callback()

@@ -350,7 +360,9 @@ async def delete(self, doc_ids: list[str]) -> None:
any_deleted = True

if any_deleted:
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)

async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
"""Get document by file path
@@ -389,7 +401,9 @@ async def drop(self) -> dict[str, str]:
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)

await self.index_done_callback()
logger.info(
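Two commits in the list above (501008c and 83cf878) describe the lock these storage classes now share: a reusable async context manager whose re-entrancy bookkeeping lives in a ContextVar, so concurrent coroutines holding the same lock object cannot corrupt each other's state. A simplified, assumed sketch of that idea — not the PR's actual NamespaceLock implementation:

import asyncio
from contextvars import ContextVar

class NamespaceLock:
    """Reusable, per-task-reentrant async lock (illustrative sketch)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        # Instance attributes are shared by all coroutines, but each
        # asyncio task sees its own copy of a ContextVar's value, so
        # the re-entrancy depth cannot leak between concurrent tasks.
        self._depth: ContextVar[int] = ContextVar("lock_depth", default=0)

    async def __aenter__(self) -> "NamespaceLock":
        if self._depth.get() == 0:
            await self._lock.acquire()
        self._depth.set(self._depth.get() + 1)
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        depth = self._depth.get() - 1
        self._depth.set(depth)
        if depth == 0:
            self._lock.release()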