diff --git a/backend/score.py b/backend/score.py
index e668788c3..31e2c76d9 100644
--- a/backend/score.py
+++ b/backend/score.py
@@ -894,7 +894,7 @@ async def retry_processing(uri=Form(None), userName=Form(None), password=Form(No
     try:
         start = time.time()
         graph = create_graph_database_connection(uri, userName, password, database)
-        chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})
+        chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
         end = time.time()
         elapsed_time = end - start
         json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition,
diff --git a/backend/src/graphDB_dataAccess.py b/backend/src/graphDB_dataAccess.py
index 6f5365498..fde17859a 100644
--- a/backend/src/graphDB_dataAccess.py
+++ b/backend/src/graphDB_dataAccess.py
@@ -1,5 +1,7 @@
 import logging
 import os
+import time
+from neo4j.exceptions import TransientError
 from langchain_neo4j import Neo4jGraph
 from src.shared.common_fn import create_gcs_bucket_folder_name_hashed, delete_uploaded_local_file, load_embedding_model
 from src.document_sources.gcs_bucket import delete_file_from_gcs
@@ -254,8 +256,20 @@ def connection_check_and_get_vector_dimensions(self,database):
         else:
             return {'message':"Connection Successful","gds_status": gds_status,"write_access":write_access}
 
-    def execute_query(self, query, param=None):
-        return self.graph.query(query, param)
+    def execute_query(self, query, param=None,max_retries=3, delay=2):
+        retries = 0
+        while retries < max_retries:
+            try:
+                return self.graph.query(query, param)
+            except TransientError as e:
+                if "DeadlockDetected" in str(e):
+                    retries += 1
+                    logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                    time.sleep(delay)  # Wait before retrying
+                else:
+                    raise
+        logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+        raise RuntimeError("Query execution failed after multiple retries due to deadlock.")
 
     def get_current_status_document_node(self, file_name):
         query = """
diff --git a/backend/src/main.py b/backend/src/main.py
index c21e26f5a..54cfff298 100644
--- a/backend/src/main.py
+++ b/backend/src/main.py
@@ -400,7 +400,7 @@ async def processing_source(uri, userName, password, database, model, file_name,
       obj_source_node.processing_time = processed_time
       obj_source_node.processed_chunk = select_chunks_upto+select_chunks_with_retry
       if retry_condition == START_FROM_BEGINNING:
-        result = graph.query(QUERY_TO_GET_NODES_AND_RELATIONS_OF_A_DOCUMENT, params={"filename":file_name})
+        result = execute_graph_query(graph,QUERY_TO_GET_NODES_AND_RELATIONS_OF_A_DOCUMENT, params={"filename":file_name})
         obj_source_node.node_count = result[0]['nodes']
         obj_source_node.relationship_count = result[0]['rels']
       else:
@@ -539,7 +539,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o
 
   else:
     chunkId_chunkDoc_list=[]
-    chunks = graph.query(QUERY_TO_GET_CHUNKS, params={"filename":file_name})
+    chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS, params={"filename":file_name})
     if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
       raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
 
@@ -550,13 +550,13 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o
 
     if retry_condition == START_FROM_LAST_PROCESSED_POSITION:
       logging.info(f"Retry : start_from_last_processed_position")
-      starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION, params={"filename":file_name})
+      starting_chunk = execute_graph_query(graph,QUERY_TO_GET_LAST_PROCESSED_CHUNK_POSITION, params={"filename":file_name})
 
       if starting_chunk and starting_chunk[0]["position"] < len(chunkId_chunkDoc_list):
         return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]
 
       elif starting_chunk and starting_chunk[0]["position"] == len(chunkId_chunkDoc_list):
-        starting_chunk = graph.query(QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY, params={"filename":file_name})
+        starting_chunk = execute_graph_query(graph,QUERY_TO_GET_LAST_PROCESSED_CHUNK_WITHOUT_ENTITY, params={"filename":file_name})
         return len(chunks), chunkId_chunkDoc_list[starting_chunk[0]["position"] - 1:]
 
       else:
@@ -741,7 +741,7 @@ def set_status_retry(graph, file_name, retry_condition):
   if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
     obj_source_node.processed_chunk=0
     if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
-      graph.query(QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
+      execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
       obj_source_node.node_count=0
       obj_source_node.relationship_count=0
   logging.info(obj_source_node)
diff --git a/backend/src/make_relationships.py b/backend/src/make_relationships.py
index bccfa1ddd..a07f29c9c 100644
--- a/backend/src/make_relationships.py
+++ b/backend/src/make_relationships.py
@@ -1,6 +1,6 @@
 from langchain_neo4j import Neo4jGraph
 from langchain.docstore.document import Document
-from src.shared.common_fn import load_embedding_model
+from src.shared.common_fn import load_embedding_model,execute_graph_query
 import logging
 from typing import List
 import os
@@ -33,7 +33,7 @@ def merge_relationship_between_chunk_and_entites(graph: Neo4jGraph, graph_docume
                 CALL apoc.merge.node([data.node_type], {id: data.node_id}) YIELD node AS n
                 MERGE (c)-[:HAS_ENTITY]->(n)
             """
-    graph.query(unwind_query, params={"batch_data": batch_data})
+    execute_graph_query(graph,unwind_query, params={"batch_data": batch_data})
 
 
 def create_chunk_embeddings(graph, chunkId_chunkDoc_list, file_name):
@@ -59,7 +59,7 @@ def create_chunk_embeddings(graph, chunkId_chunkDoc_list, file_name):
         SET c.embedding = row.embeddings
         MERGE (c)-[:PART_OF]->(d)
     """
-    graph.query(query_to_create_embedding, params={"fileName":file_name, "data":data_for_query})
+    execute_graph_query(graph,query_to_create_embedding, params={"fileName":file_name, "data":data_for_query})
 
 def create_relation_between_chunks(graph, file_name, chunks: List[Document])->list:
     logging.info("creating FIRST_CHUNK and NEXT_CHUNK relationships between chunks")
@@ -127,7 +127,7 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
             MATCH (d:Document {fileName: data.f_name})
             MERGE (c)-[:PART_OF]->(d)
         """
-    graph.query(query_to_create_chunk_and_PART_OF_relation, params={"batch_data": batch_data})
+    execute_graph_query(graph,query_to_create_chunk_and_PART_OF_relation, params={"batch_data": batch_data})
 
     query_to_create_FIRST_relation = """
         UNWIND $relationships AS relationship
@@ -136,7 +136,7 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
         FOREACH(r IN CASE WHEN relationship.type = 'FIRST_CHUNK' THEN [1] ELSE [] END |
                 MERGE (d)-[:FIRST_CHUNK]->(c))
         """
-    graph.query(query_to_create_FIRST_relation, params={"f_name": file_name, "relationships": relationships})
+    execute_graph_query(graph,query_to_create_FIRST_relation, params={"f_name": file_name, "relationships": relationships})
 
     query_to_create_NEXT_CHUNK_relation = """
         UNWIND $relationships AS relationship
@@ -145,17 +145,16 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li
         MATCH (pc:Chunk {id: relationship.previous_chunk_id})
         FOREACH(r IN CASE WHEN relationship.type = 'NEXT_CHUNK' THEN [1] ELSE [] END |
                 MERGE (c)<-[:NEXT_CHUNK]-(pc))
-        """
-    graph.query(query_to_create_NEXT_CHUNK_relation, params={"relationships": relationships})
-
+        """
+    execute_graph_query(graph,query_to_create_NEXT_CHUNK_relation, params={"relationships": relationships})
     return lst_chunks_including_hash
 
 
 def create_chunk_vector_index(graph):
     start_time = time.time()
     try:
-        vector_index = graph.query("SHOW INDEXES YIELD * WHERE labelsOrTypes = ['Chunk'] and type = 'VECTOR' AND name = 'vector' return options")
-
+        vector_index_query = "SHOW INDEXES YIELD * WHERE labelsOrTypes = ['Chunk'] and type = 'VECTOR' AND name = 'vector' return options"
+        vector_index = execute_graph_query(graph,vector_index_query)
         if not vector_index:
             vector_store = Neo4jVector(embedding=EMBEDDING_FUNCTION,
                                        graph=graph,
diff --git a/backend/src/post_processing.py b/backend/src/post_processing.py
index cdc8b06d3..0865c5ad3 100644
--- a/backend/src/post_processing.py
+++ b/backend/src/post_processing.py
@@ -4,7 +4,7 @@
 from langchain_neo4j import Neo4jGraph
 import os
 from src.graph_query import get_graphDB_driver
-from src.shared.common_fn import load_embedding_model
+from src.shared.common_fn import load_embedding_model,execute_graph_query
 from langchain_core.output_parsers import JsonOutputParser
 from langchain_core.prompts import ChatPromptTemplate
 from src.shared.constants import GRAPH_CLEANUP_PROMPT
@@ -179,8 +179,8 @@ def fetch_entities_for_embedding(graph):
     MATCH (e)
     WHERE NOT (e:Chunk OR e:Document OR e:`__Community__`) AND e.embedding IS NULL AND e.id IS NOT NULL
     RETURN elementId(e) AS elementId, e.id + " " + coalesce(e.description, "") AS text
-    """
-    result = graph.query(query)
+    """
+    result = execute_graph_query(graph,query)
     return [{"elementId": record["elementId"], "text": record["text"]} for record in result]
 
 def update_embeddings(rows, graph):
@@ -194,7 +194,7 @@ def update_embeddings(rows, graph):
     MATCH (e) WHERE elementId(e) = row.elementId
     CALL db.create.setNodeVectorProperty(e, "embedding", row.embedding)
     """
-    return graph.query(query,params={'rows':rows})
+    return execute_graph_query(graph,query,params={'rows':rows})
 
 def graph_schema_consolidation(graph):
     graphDb_data_Access = graphDBdataAccess(graph)
@@ -223,14 +223,14 @@
             SET n:`{new_label}`
             REMOVE n:`{old_label}`
         """
-        graph.query(query)
-
+        execute_graph_query(graph,query)
+
     for old_label, new_label in relation_mapping.items():
         query = f"""
             MATCH (n)-[r:`{old_label}`]->(m)
             CREATE (n)-[r2:`{new_label}`]->(m)
             DELETE r
         """
-        graph.query(query)
+        execute_graph_query(graph,query)
 
     return None
diff --git a/backend/src/shared/common_fn.py b/backend/src/shared/common_fn.py
index d95626bb3..6f394bb24 100644
--- a/backend/src/shared/common_fn.py
+++ b/backend/src/shared/common_fn.py
@@ -5,10 +5,12 @@
 from langchain_google_vertexai import VertexAIEmbeddings
 from langchain_openai import OpenAIEmbeddings
 from langchain_neo4j import Neo4jGraph
+from neo4j.exceptions import TransientError
 from langchain_community.graphs.graph_document import GraphDocument
 from typing import List
 import re
 import os
+import time
 from pathlib import Path
 from urllib.parse import urlparse
 import boto3
@@ -90,10 +92,22 @@ def load_embedding_model(embedding_model_name: str):
         logging.info(f"Embedding: Using Langchain HuggingFaceEmbeddings , Dimension:{dimension}")
     return embeddings, dimension
 
-def save_graphDocuments_in_neo4j(graph:Neo4jGraph, graph_document_list:List[GraphDocument]):
-  graph.add_graph_documents(graph_document_list, baseEntityLabel=True)
-  # graph.add_graph_documents(graph_document_list)
-
+def save_graphDocuments_in_neo4j(graph: Neo4jGraph, graph_document_list: List[GraphDocument], max_retries=3, delay=1):
+    retries = 0
+    while retries < max_retries:
+        try:
+            graph.add_graph_documents(graph_document_list, baseEntityLabel=True)
+            return
+        except TransientError as e:
+            if "DeadlockDetected" in str(e):
+                retries += 1
+                logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                time.sleep(delay)  # Wait before retrying
+            else:
+                raise
+    logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+    raise RuntimeError("Query execution failed after multiple retries due to deadlock.")
+
 def handle_backticks_nodes_relationship_id_type(graph_document_list:List[GraphDocument]):
   for graph_document in graph_document_list:
     # Clean node id and types
@@ -114,6 +128,21 @@ def handle_backticks_nodes_relationship_id_type(graph_document_list:List[GraphDo
     graph_document.nodes = cleaned_nodes
   return graph_document_list
 
+def execute_graph_query(graph: Neo4jGraph, query, params=None, max_retries=3, delay=2):
+    retries = 0
+    while retries < max_retries:
+        try:
+            return graph.query(query, params)
+        except TransientError as e:
+            if "DeadlockDetected" in str(e):
+                retries += 1
+                logging.info(f"Deadlock detected. Retrying {retries}/{max_retries} in {delay} seconds...")
+                time.sleep(delay)  # Wait before retrying
+            else:
+                raise
+    logging.error("Failed to execute query after maximum retries due to persistent deadlocks.")
+    raise RuntimeError("Query execution failed after multiple retries due to deadlock.")
+
 def delete_uploaded_local_file(merged_file_path, file_name):
   file_path = Path(merged_file_path)
   if file_path.exists():
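
All three retry wrappers above follow the same pattern: retry only when the TransientError message contains "DeadlockDetected", sleep for a fixed delay between attempts, and re-raise anything else immediately. Below is a minimal sketch (not part of this diff) of how execute_graph_query could be exercised against a stub graph; the FlakyGraph class and the test are hypothetical and assume the backend package is importable as src.shared.common_fn:

from neo4j.exceptions import TransientError
from src.shared.common_fn import execute_graph_query


class FlakyGraph:
    """Stub standing in for Neo4jGraph: deadlocks a few times, then succeeds."""
    def __init__(self, failures=2):
        self.failures = failures
        self.calls = 0

    def query(self, query, params=None):
        self.calls += 1
        if self.calls <= self.failures:
            # Constructing TransientError directly is only for this sketch; in production
            # the driver raises it from the server's Neo.TransientError.* status codes.
            raise TransientError("Neo.TransientError.Transaction.DeadlockDetected")
        return [{"ok": True}]


def test_execute_graph_query_retries_on_deadlock():
    graph = FlakyGraph(failures=2)
    # delay=0 keeps the test fast; the production default backs off for 2 seconds.
    result = execute_graph_query(graph, "RETURN 1", max_retries=3, delay=0)
    assert result == [{"ok": True}]
    assert graph.calls == 3  # two deadlocks, third attempt succeeds

Because non-deadlock TransientErrors are re-raised on the first attempt, the stub only needs to simulate the DeadlockDetected case; if the deadlock persists for max_retries attempts, the helper raises the RuntimeError instead of returning.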