70 changes: 49 additions & 21 deletions lightrag/kg/memgraph_impl.py
@@ -133,6 +133,7 @@ async def has_node(self, node_id: str) -> bool:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
@@ -146,7 +147,10 @@ async def has_node(self, node_id: str) -> bool:
logger.error(
f"[{self.workspace}] Error checking node existence for {node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise

async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
@@ -170,6 +174,7 @@ async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = (
@@ -190,7 +195,10 @@ async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
logger.error(
f"[{self.workspace}] Error checking edge existence between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise

async def get_node(self, node_id: str) -> dict[str, str] | None:
@@ -312,6 +320,7 @@ async def get_all_labels(self) -> list[str]:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"""
@@ -328,7 +337,10 @@ async def get_all_labels(self) -> list[str]:
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting all labels: {str(e)}")
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise

async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@@ -352,6 +364,7 @@ async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
results = None
try:
workspace_label = self._get_workspace_label()
query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
@@ -389,7 +402,10 @@ async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
logger.error(
f"[{self.workspace}] Error getting edges for node {source_node_id}: {str(e)}"
)
await results.consume() # Ensure results are consumed even on error
if results is not None:
await (
results.consume()
) # Ensure results are consumed even on error
raise
except Exception as e:
logger.error(
@@ -419,6 +435,7 @@ async def get_edge(
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"""
@@ -451,7 +468,10 @@ async def get_edge(
logger.error(
f"[{self.workspace}] Error getting edge between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise

async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
@@ -1030,11 +1050,12 @@ async def get_popular_labels(self, limit: int = 300) -> list[str]:
"Memgraph driver is not initialized. Call 'await initialize()' first."
)

try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None

P1: Restore outer try/except in Memgraph popular label lookup

get_popular_labels() previously wrapped _get_workspace_label() and the session acquisition in a try/except that logged failures and returned an empty list. The new implementation performs those steps outside the try block, so a driver connection error or _get_workspace_label() failure now propagates as an unhandled exception and will bubble up to callers instead of returning [] as before. This is a behavioral regression and contradicts the commit’s intent of a purely defensive change.


try:
query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
@@ -1054,9 +1075,13 @@ async def get_popular_labels(self, limit: int = 300) -> list[str]:
f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}")
return []
except Exception as e:
logger.error(
f"[{self.workspace}] Error getting popular labels: {str(e)}"
)
if result is not None:
await result.consume()
return []

P2: Preserve empty-list fallback when session acquisition fails

The previous version wrapped the entire method in a try/except and returned [] on any error. By moving the try block inside the async with context, failures raised while opening the Memgraph session (e.g. network outage or authentication error) now bypass the handler and bubble up to callers. That changes the public behaviour from β€œlog and return an empty list” to propagating exceptions, which can crash code paths that previously tolerated a down graph store. Consider restoring the broader try/except so that session acquisition errors are still logged and handled consistently.

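Both comments on this hunk point at the same fix: keep `_get_workspace_label()` and the session acquisition inside a try/except that logs and returns `[]`, while still only consuming a result that was actually created. Below is a minimal sketch of that shape, written as a free-standing coroutine rather than the project's method — the driver is assumed to be the neo4j-style async driver this file already uses, and the parameter names, the `cypher` argument, and the returned `label` field are illustrative assumptions, not the project's API.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


async def get_popular_labels_with_fallback(
    driver: Any,  # assumed: a neo4j-style AsyncDriver, as memgraph_impl already uses
    database: str | None,
    cypher: str,  # the method's existing popularity query, built from the workspace label
    workspace: str,
    limit: int = 300,
) -> list[str]:
    try:
        # Session acquisition lives inside the try, so connection/auth failures
        # are logged and converted into the empty-list fallback below.
        async with driver.session(
            database=database, default_access_mode="READ"
        ) as session:
            result = None
            try:
                result = await session.run(cypher, limit=limit)
                # Assumed: the query aliases the entity id as `label`.
                return [record["label"] async for record in result]
            finally:
                # Guarded consume: `result` is still None if session.run() raised.
                if result is not None:
                    await result.consume()
    except Exception as e:
        logger.error(f"[{workspace}] Error getting popular labels: {str(e)}")
        return []
```

In the real method the query construction from the workspace label would sit inside the outer try as well, so a failing `_get_workspace_label()` is likewise converted into the empty-list fallback rather than propagating.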


async def search_labels(self, query: str, limit: int = 50) -> list[str]:
"""Search labels with fuzzy matching
@@ -1078,11 +1103,12 @@ async def search_labels(self, query: str, limit: int = 50) -> list[str]:
if not query_lower:
return []

try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:

P1: Maintain graceful failure in Memgraph label search

search_labels() likewise moved _get_workspace_label() and the session context creation outside its try/except. When Memgraph is unreachable or the helper raises, the method now bubbles the exception instead of logging and returning an empty list as it did before, causing callers to receive hard failures rather than a safe fallback.


cypher_query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
@@ -1109,6 +1135,8 @@ async def search_labels(self, query: str, limit: int = 50) -> list[str]:
f"[{self.workspace}] Search query '{query}' returned {len(labels)} results (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error searching labels: {str(e)}")
return []
except Exception as e:
logger.error(f"[{self.workspace}] Error searching labels: {str(e)}")
if result is not None:
await result.consume()

P2: Search labels now throws on connection errors instead of returning []

Similar to get_popular_labels, the try/except is now nested inside the session block, so any exception while acquiring the Memgraph session (or resolving the workspace label) is no longer caught and the method raises instead of returning [] as it did before. Callers relying on the graceful fallback will now see unhandled exceptions when the database is unavailable. Wrapping the session creation in the same try/except would keep the previous contract.


return []
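`search_labels()` calls for the same restructuring. As a quick check that the restored shape keeps the old contract when Memgraph is unreachable, here is a tiny stand-in driver exercising the sketch above — test scaffolding only, not the project's classes, fixtures, or actual Cypher:

```python
import asyncio


class FailingDriver:
    """Stand-in driver whose session() call raises, simulating an unreachable Memgraph."""

    def session(self, **kwargs):
        raise ConnectionError("memgraph unreachable")


async def main() -> None:
    labels = await get_popular_labels_with_fallback(
        driver=FailingDriver(),
        database=None,
        cypher="MATCH (n) WHERE n.entity_id IS NOT NULL RETURN n.entity_id AS label LIMIT $limit",
        workspace="demo",
    )
    # The ConnectionError is caught by the outer handler, logged, and turned into [],
    # matching the behaviour callers relied on before this change.
    assert labels == []


asyncio.run(main())
```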
18 changes: 14 additions & 4 deletions lightrag/kg/neo4j_impl.py
@@ -371,6 +371,7 @@ async def has_node(self, node_id: str) -> bool:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
result = await session.run(query, entity_id=node_id)
@@ -381,7 +382,8 @@ async def has_node(self, node_id: str) -> bool:
logger.error(
f"[{self.workspace}] Error checking node existence for {node_id}: {str(e)}"
)
await result.consume() # Ensure results are consumed even on error
if result is not None:
await result.consume() # Ensure results are consumed even on error
raise

async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
Expand All @@ -403,6 +405,7 @@ async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = (
f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) "
@@ -420,7 +423,8 @@ async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
logger.error(
f"[{self.workspace}] Error checking edge existence between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure results are consumed even on error
if result is not None:
await result.consume() # Ensure results are consumed even on error
raise

async def get_node(self, node_id: str) -> dict[str, str] | None:
Expand Down Expand Up @@ -799,6 +803,7 @@ async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | N
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
results = None
try:
workspace_label = self._get_workspace_label()
query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
@@ -836,7 +841,10 @@ async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
logger.error(
f"[{self.workspace}] Error getting edges for node {source_node_id}: {str(e)}"
)
await results.consume() # Ensure results are consumed even on error
if results is not None:
await (
results.consume()
) # Ensure results are consumed even on error
raise
except Exception as e:
logger.error(
@@ -1592,6 +1600,7 @@ async def get_popular_labels(self, limit: int = 300) -> list[str]:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = f"""
MATCH (n:`{workspace_label}`)
@@ -1616,7 +1625,8 @@ async def get_popular_labels(self, limit: int = 300) -> list[str]:
logger.error(
f"[{self.workspace}] Error getting popular labels: {str(e)}"
)
await result.consume()
if result is not None:
await result.consume()
raise

async def search_labels(self, query: str, limit: int = 50) -> list[str]: