Handled deadlock errors in executing cypher query #1187


Merged (1 commit) on Mar 19, 2025
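
Summary (inferred from the diff below): Cypher calls that previously went straight to graph.query(...) now go through a shared helper, execute_graph_query(graph, query, params=None, max_retries=3, delay=2), added in backend/src/shared/common_fn.py. The helper catches the Neo4j driver's TransientError; when the error message contains "DeadlockDetected" it waits delay seconds and retries up to max_retries times before raising a RuntimeError, and any other TransientError is re-raised immediately. The same retry loop is added to graphDBdataAccess.execute_query and to save_graphDocuments_in_neo4j (around add_graph_documents), and call sites in backend/score.py, backend/src/main.py, backend/src/make_relationships.py, and backend/src/post_processing.py are switched to the new helper.
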
backend/score.py: 1 addition & 1 deletion
@@ -894,7 +894,7 @@ async def retry_processing(uri=Form(None), userName=Form(None), password=Form(No
try:
start = time.time()
graph = create_graph_database_connection(uri, userName, password, database)
- chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})
+ chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
end = time.time()
elapsed_time = end - start
json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition,
backend/src/graphDB_dataAccess.py: 16 additions & 2 deletions
@@ -1,5 +1,7 @@
import logging
import os
+ import time
+ from neo4j.exceptions import TransientError
from langchain_neo4j import Neo4jGraph
from src.shared.common_fn import create_gcs_bucket_folder_name_hashed, delete_uploaded_local_file, load_embedding_model
from src.document_sources.gcs_bucket import delete_file_from_gcs
@@ -254,8 +256,20 @@ def connection_check_and_get_vector_dimensions(self,database):
else:
return {'message':"Connection Successful","gds_status": gds_status,"write_access":write_access}

- def execute_query(self, query, param=None):
-     return self.graph.query(query, param)
+ def execute_query(self, query, param=None,max_retries=3, delay=2):
+     retries = 0
+     while retries < max_retries:
+         try:
+             return self.graph.query(query, param)
+         except TransientError as e:
+             if "DeadlockDetected" in str(e):
+                 retries += 1
+                 logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                 time.sleep(delay) # Wait before retrying
+             else:
+                 raise
+     logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+     raise RuntimeError("Query execution failed after multiple retries due to deadlock.")

def get_current_status_document_node(self, file_name):
query = """
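
Note on the retry strategy: execute_query above (like the execute_graph_query helper added later in this diff) retries a fixed number of times with a constant delay. A common variation for deadlock handling is exponential backoff with jitter, so that concurrent writers are less likely to collide again on the retry. The sketch below is illustrative only and is not part of this PR; retry_on_deadlock and run_query are hypothetical names, and it reuses the same TransientError / "DeadlockDetected" check shown in the diff.

```python
import logging
import random
import time

from neo4j.exceptions import TransientError

def retry_on_deadlock(run_query, max_retries=3, base_delay=2):
    """Run a zero-argument callable, retrying on Neo4j deadlocks with exponential backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            return run_query()
        except TransientError as e:
            if "DeadlockDetected" not in str(e):
                raise
            # 2s, 4s, 8s, ... plus a little jitter so retries from concurrent
            # workers are less likely to line up again.
            wait = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            logging.info(f"Deadlock detected. Retry {attempt}/{max_retries} in {wait:.1f}s")
            time.sleep(wait)
    raise RuntimeError("Query execution failed after multiple retries due to deadlock.")
```

A call site would pass a closure, for example retry_on_deadlock(lambda: graph.query(query, params)).
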
backend/src/main.py: 5 additions & 5 deletions
@@ -400,7 +400,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
obj_source_node.processing_time = processed_time
obj_source_node.processed_chunk = select_chunks_upto+select_chunks_with_retry
if retry_condition == START_FROM_BEGINNING:
- result = graph.query(QUERY_TO_GET_NODES_AND_RELATIONS_OF_A_DOCUMENT, params={"filename":file_name})
+ result = execute_graph_query(graph,QUERY_TO_GET_NODES_AND_RELATIONS_OF_A_DOCUMENT, params={"filename":file_name})
obj_source_node.node_count = result[0]['nodes']
obj_source_node.relationship_count = result[0]['rels']
else:
@@ -539,7 +539,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o

else:
chunkId_chunkDoc_list=[]
- chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})
+ chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS, params={"filename":file_name})

if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
@@ -550,13 +550,13 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o

if retry_condition == START_FROM_LAST_PROCESSED_POSITION:
logging.info(f"Retry : start_from_last_processed_position")
- starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION, params={"filename":file_name})
+ starting_chunk = execute_graph_query(graph,QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION, params={"filename":file_name})

if starting_chunk and starting_chunk[0]["position"] < len(chunkId_chunkDoc_list):
return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]

elif starting_chunk and starting_chunk[0]["position"] == len(chunkId_chunkDoc_list):
- starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY, params={"filename":file_name})
+ starting_chunk = execute_graph_query(graph,QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY, params={"filename":file_name})
return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]

else:
@@ -741,7 +741,7 @@ def set_status_retry(graph, file_name, retry_condition):
if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
obj_source_node.processed_chunk=0
if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
- graph.query(QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
+ execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
obj_source_node.node_count=0
obj_source_node.relationship_count=0
logging.info(obj_source_node)
backend/src/make_relationships.py: 9 additions & 10 deletions
@@ -1,6 +1,6 @@
from langchain_neo4j import Neo4jGraph
from langchain.docstore.document import Document
- from src.shared.common_fn import load_embedding_model
+ from src.shared.common_fn import load_embedding_model,execute_graph_query
import logging
from typing import List
import os
@@ -33,7 +33,7 @@ def merge_relationship_between_chunk_and_entites(graph: Neo4jGraph, graph_docume
CALL apoc.merge.node([data.node_type], {id: data.node_id}) YIELD node AS n
MERGE (c)-[:HAS_ENTITY]->(n)
"""
- graph.query(unwind_query, params={"batch_data": batch_data})
+ execute_graph_query(graph,unwind_query, params={"batch_data": batch_data})


def create_chunk_embeddings(graph, chunkId_chunkDoc_list, file_name):
@@ -59,7 +59,7 @@ def create_chunk_embeddings(graph, chunkId_chunkDoc_list, file_name):
SET c.embedding = row.embeddings
MERGE (c)-[:PART_OF]->(d)
"""
- graph.query(query_to_create_embedding, params={"fileName":file_name, "data":data_for_query})
+ execute_graph_query(graph,query_to_create_embedding, params={"fileName":file_name, "data":data_for_query})

def create_relation_between_chunks(graph, file_name, chunks: List[Document])->list:
logging.info("creating FIRST_CHUNK and NEXT_CHUNK relationships between chunks")
@@ -127,7 +127,7 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
MATCH (d:Document {fileName: data.f_name})
MERGE (c)-[:PART_OF]->(d)
"""
- graph.query(query_to_create_chunk_and_PART_OF_relation, params={"batch_data": batch_data})
+ execute_graph_query(graph,query_to_create_chunk_and_PART_OF_relation, params={"batch_data": batch_data})

query_to_create_FIRST_relation = """
UNWIND $relationships AS relationship
@@ -136,7 +136,7 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
FOREACH(r IN CASE WHEN relationship.type = 'FIRST_CHUNK' THEN [1] ELSE [] END |
MERGE (d)-[:FIRST_CHUNK]->(c))
"""
- graph.query(query_to_create_FIRST_relation, params={"f_name": file_name, "relationships": relationships})
+ execute_graph_query(graph,query_to_create_FIRST_relation, params={"f_name": file_name, "relationships": relationships})

query_to_create_NEXT_CHUNK_relation = """
UNWIND $relationships AS relationship
@@ -145,17 +145,16 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
MATCH (pc:Chunk {id: relationship.previous_chunk_id})
FOREACH(r IN CASE WHEN relationship.type = 'NEXT_CHUNK' THEN [1] ELSE [] END |
MERGE (c)<-[:NEXT_CHUNK]-(pc))
"""
graph.query(query_to_create_NEXT_CHUNK_relation, params={"relationships": relationships})

"""
execute_graph_query(graph,query_to_create_NEXT_CHUNK_relation, params={"relationships": relationships})
return lst_chunks_including_hash


def create_chunk_vector_index(graph):
start_time = time.time()
try:
- vector_index = graph.query("SHOW INDEXES YIELD * WHERE labelsOrTypes = ['Chunk'] and type = 'VECTOR' AND name = 'vector' return options")
-
+ vector_index_query = "SHOW INDEXES YIELD * WHERE labelsOrTypes = ['Chunk'] and type = 'VECTOR' AND name = 'vector' return options"
+ vector_index = execute_graph_query(graph,vector_index_query)
if not vector_index:
vector_store = Neo4jVector(embedding=EMBEDDING_FUNCTION,
graph=graph,
backend/src/post_processing.py: 7 additions & 7 deletions
@@ -4,7 +4,7 @@
from langchain_neo4j import Neo4jGraph
import os
from src.graph_query import get_graphDB_driver
- from src.shared.common_fn import load_embedding_model
+ from src.shared.common_fn import load_embedding_model,execute_graph_query
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from src.shared.constants import GRAPH_CLEANUP_PROMPT
@@ -179,8 +179,8 @@ def fetch_entities_for_embedding(graph):
MATCH (e)
WHERE NOT (e:Chunk OR e:Document OR e:`__Community__`) AND e.embedding IS NULL AND e.id IS NOT NULL
RETURN elementId(e) AS elementId, e.id + " " + coalesce(e.description, "") AS text
"""
result = graph.query(query)
"""
result = execute_graph_query(graph,query)
return [{"elementId": record["elementId"], "text": record["text"]} for record in result]

def update_embeddings(rows, graph):
@@ -194,7 +194,7 @@ def update_embeddings(rows, graph):
MATCH (e) WHERE elementId(e) = row.elementId
CALL db.create.setNodeVectorProperty(e, "embedding", row.embedding)
"""
- return graph.query(query,params={'rows':rows})
+ return execute_graph_query(graph,query,params={'rows':rows})

def graph_schema_consolidation(graph):
graphDb_data_Access = graphDBdataAccess(graph)
@@ -223,14 +223,14 @@ def graph_schema_consolidation(graph):
SET n:`{new_label}`
REMOVE n:`{old_label}`
"""
- graph.query(query)
+ execute_graph_query(graph,query)

for old_label, new_label in relation_mapping.items():
query = f"""
MATCH (n)-[r:`{old_label}`]->(m)
CREATE (n)-[r2:`{new_label}`]->(m)
DELETE r
"""
- graph.query(query)
+ execute_graph_query(graph,query)

return None
backend/src/shared/common_fn.py: 33 additions & 4 deletions
@@ -5,10 +5,12 @@
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_neo4j import Neo4jGraph
+ from neo4j.exceptions import TransientError
from langchain_community.graphs.graph_document import GraphDocument
from typing import List
import re
import os
+ import time
from pathlib import Path
from urllib.parse import urlparse
import boto3
@@ -90,10 +92,22 @@ def load_embedding_model(embedding_model_name: str):
logging.info(f"Embedding: Using Langchain HuggingFaceEmbeddings , Dimension:{dimension}")
return embeddings, dimension

- def save_graphDocuments_in_neo4j(graph:Neo4jGraph, graph_document_list:List[GraphDocument]):
-     graph.add_graph_documents(graph_document_list, baseEntityLabel=True)
-     # graph.add_graph_documents(graph_document_list)

+ def save_graphDocuments_in_neo4j(graph: Neo4jGraph, graph_document_list: List[GraphDocument], max_retries=3, delay=1):
+     retries = 0
+     while retries < max_retries:
+         try:
+             graph.add_graph_documents(graph_document_list, baseEntityLabel=True)
+             return
+         except TransientError as e:
+             if "DeadlockDetected" in str(e):
+                 retries += 1
+                 logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                 time.sleep(delay) # Wait before retrying
+             else:
+                 raise
+     logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+     raise RuntimeError("Query execution failed after multiple retries due to deadlock.")

def handle_backticks_nodes_relationship_id_type(graph_document_list:List[GraphDocument]):
for graph_document in graph_document_list:
# Clean node id and types
@@ -114,6 +128,21 @@ def handle_backticks_nodes_relationship_id_type(graph_document_list:List[GraphDo
graph_document.nodes = cleaned_nodes
return graph_document_list

+ def execute_graph_query(graph: Neo4jGraph, query, params=None, max_retries=3, delay=2):
+     retries = 0
+     while retries < max_retries:
+         try:
+             return graph.query(query, params)
+         except TransientError as e:
+             if "DeadlockDetected" in str(e):
+                 retries += 1
+                 logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                 time.sleep(delay) # Wait before retrying
+             else:
+                 raise
+     logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+     raise RuntimeError("Query execution failed after multiple retries due to deadlock.")

def delete_uploaded_local_file(merged_file_path, file_name):
file_path = Path(merged_file_path)
if file_path.exists():
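
One way to exercise the new retry path without a live Neo4j instance is a fake graph whose first few query() calls raise a deadlock error. The test below is hypothetical and not included in this PR; FlakyGraph is a stand-in object, and it assumes execute_graph_query is importable from src.shared.common_fn as added above.

```python
from neo4j.exceptions import TransientError
from src.shared.common_fn import execute_graph_query

class FlakyGraph:
    """Stand-in for Neo4jGraph: deadlocks on the first `failures` calls, then succeeds."""
    def __init__(self, failures=2):
        self.failures = failures
        self.calls = 0

    def query(self, query, params=None):
        self.calls += 1
        if self.calls <= self.failures:
            raise TransientError("Neo.TransientError.Transaction.DeadlockDetected")
        return [{"ok": True}]

def test_execute_graph_query_retries_on_deadlock():
    graph = FlakyGraph(failures=2)
    result = execute_graph_query(graph, "RETURN 1", max_retries=3, delay=0)
    assert result == [{"ok": True}]
    assert graph.calls == 3
```

With delay=0 the retries are immediate; with the default delay=2 the same scenario would pause roughly four seconds before the third attempt succeeds.
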