|
| 1 | +import os |
| 2 | +import shutil |
| 3 | +import pytest |
| 4 | +import networkx as nx |
| 5 | +import numpy as np |
| 6 | +import asyncio |
| 7 | +import json |
| 8 | +from nano_graphrag import GraphRAG |
| 9 | +from nano_graphrag._storage import NetworkXStorage |
| 10 | +from nano_graphrag._utils import wrap_embedding_func_with_attrs |
| 11 | + |
# Scratch directory used by every test in this module; created fresh and removed
# by the setup_teardown fixture for each test function.
WORKING_DIR = "./tests/nano_graphrag_cache_networkx_storage_test"
| 13 | + |
| 14 | + |
@pytest.fixture(scope="function")
def setup_teardown():
    """Provide a clean WORKING_DIR for each test and remove it afterwards.

    Yields:
        None. Tests run with an empty, freshly-created working directory.
    """
    if os.path.exists(WORKING_DIR):
        shutil.rmtree(WORKING_DIR)
    # makedirs (not mkdir): also creates the missing "./tests" parent so the
    # fixture works from a clean checkout or a different CWD.
    os.makedirs(WORKING_DIR)

    yield

    # ignore_errors: a test that already removed the directory must not turn
    # fixture teardown into a spurious failure.
    shutil.rmtree(WORKING_DIR, ignore_errors=True)
| 24 | + |
| 25 | + |
@wrap_embedding_func_with_attrs(embedding_dim=384, max_token_size=8192)
async def mock_embedding(texts: list[str]) -> np.ndarray:
    """Stand-in embedder: one random 384-dim vector per input text."""
    batch_size = len(texts)
    return np.random.rand(batch_size, 384)
| 29 | + |
| 30 | + |
@pytest.fixture
def networkx_storage(setup_teardown):
    """Build a NetworkXStorage backed by a fresh GraphRAG config in WORKING_DIR."""
    graph_rag = GraphRAG(working_dir=WORKING_DIR, embedding_func=mock_embedding)
    storage = NetworkXStorage(
        namespace="test",
        global_config=graph_rag.__dict__,
    )
    return storage
| 38 | + |
| 39 | + |
@pytest.mark.asyncio
async def test_upsert_and_get_node(networkx_storage):
    """An upserted node is retrievable with its attributes intact."""
    key = "node1"
    attrs = {"attr1": "value1", "attr2": "value2"}

    await networkx_storage.upsert_node(key, attrs)

    fetched = await networkx_storage.get_node(key)
    assert fetched == attrs
    assert await networkx_storage.has_node(key) is True
| 52 | + |
| 53 | + |
@pytest.mark.asyncio
async def test_upsert_and_get_edge(networkx_storage):
    """An edge upserted between two existing nodes is retrievable."""
    src = "node1"
    dst = "node2"
    payload = {"weight": 1.0, "type": "connection"}

    for endpoint in (src, dst):
        await networkx_storage.upsert_node(endpoint, {})
    await networkx_storage.upsert_edge(src, dst, payload)

    assert await networkx_storage.get_edge(src, dst) == payload
    assert await networkx_storage.has_edge(src, dst) is True
| 69 | + |
| 70 | + |
@pytest.mark.asyncio
async def test_node_degree(networkx_storage):
    """A hub node's degree equals the number of spokes attached to it."""
    hub = "center"
    await networkx_storage.upsert_node(hub, {})

    spoke_count = 5
    for idx in range(spoke_count):
        spoke = f"neighbor{idx}"
        await networkx_storage.upsert_node(spoke, {})
        await networkx_storage.upsert_edge(hub, spoke, {})

    assert await networkx_storage.node_degree(hub) == spoke_count
| 84 | + |
| 85 | + |
@pytest.mark.asyncio
async def test_edge_degree(networkx_storage):
    """edge_degree(u, v) is the sum of both endpoint degrees."""
    src = "node1"
    dst = "node2"

    await networkx_storage.upsert_node(src, {})
    await networkx_storage.upsert_node(dst, {})
    await networkx_storage.upsert_edge(src, dst, {})

    src_extra = 3
    for idx in range(src_extra):
        extra = f"neighbor{idx}"
        await networkx_storage.upsert_node(extra, {})
        await networkx_storage.upsert_edge(src, extra, {})

    dst_extra = 2
    for idx in range(dst_extra):
        extra = f"target_neighbor{idx}"
        await networkx_storage.upsert_node(extra, {})
        await networkx_storage.upsert_edge(dst, extra, {})

    # The +1 on each side accounts for the src--dst edge itself.
    expected = (src_extra + 1) + (dst_extra + 1)
    assert await networkx_storage.edge_degree(src, dst) == expected
| 110 | + |
| 111 | + |
@pytest.mark.asyncio
async def test_get_node_edges(networkx_storage):
    """get_node_edges returns every edge incident to the given node."""
    hub = "center"
    await networkx_storage.upsert_node(hub, {})

    wanted = set()
    for idx in range(3):
        spoke = f"neighbor{idx}"
        await networkx_storage.upsert_node(spoke, {})
        await networkx_storage.upsert_edge(hub, spoke, {})
        wanted.add((hub, spoke))

    fetched = await networkx_storage.get_node_edges(hub)
    assert set(fetched) == wanted
| 126 | + |
| 127 | + |
@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_clustering(networkx_storage, algorithm):
    """Clustering a 10-node chain yields a non-empty, well-formed community schema."""
    # [numberchiffre]: node ID is case-sensitive for clustering with leiden.
    for idx in range(10):
        await networkx_storage.upsert_node(f"NODE{idx}", {"source_id": f"chunk{idx}"})
    for idx in range(9):
        await networkx_storage.upsert_edge(f"NODE{idx}", f"NODE{idx+1}", {})

    assert networkx_storage._graph.number_of_nodes() > 0
    assert networkx_storage._graph.number_of_edges() > 0
    await networkx_storage.clustering(algorithm=algorithm)

    community_schema = await networkx_storage.community_schema()
    assert len(community_schema) > 0

    required_keys = (
        "level",
        "title",
        "edges",
        "nodes",
        "chunk_ids",
        "occurrence",
        "sub_communities",
    )
    for community in community_schema.values():
        for required in required_keys:
            assert required in community
| 154 | + |
| 155 | + |
@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_consistency(networkx_storage, algorithm):
    """Repeated clustering runs on the same graph give the same community count."""
    for idx in range(10):
        await networkx_storage.upsert_node(f"NODE{idx}", {"source_id": f"chunk{idx}"})
    for idx in range(9):
        await networkx_storage.upsert_edge(f"NODE{idx}", f"NODE{idx+1}", {})

    schemas = []
    for _ in range(3):
        await networkx_storage.clustering(algorithm=algorithm)
        schemas.append(await networkx_storage.community_schema())

    first_size = len(schemas[0])
    assert all(len(schema) == first_size for schema in schemas), "Number of communities should be consistent"
| 171 | + |
| 172 | + |
@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_community_structure(networkx_storage, algorithm):
    """Two disconnected chains must not be merged into one community."""
    for idx in range(10):
        await networkx_storage.upsert_node(f"A{idx}", {"source_id": f"chunkA{idx}"})
        await networkx_storage.upsert_node(f"B{idx}", {"source_id": f"chunkB{idx}"})
    for idx in range(9):
        await networkx_storage.upsert_edge(f"A{idx}", f"A{idx+1}", {})
        await networkx_storage.upsert_edge(f"B{idx}", f"B{idx+1}", {})

    await networkx_storage.clustering(algorithm=algorithm)
    community_schema = await networkx_storage.community_schema()

    assert len(community_schema) >= 2, "Should have at least two communities"

    # Inspect the first community: it should contain members of only one chain.
    first_community = next(iter(community_schema.values()))
    a_members = {node for node in first_community['nodes'] if node.startswith('A')}
    b_members = {node for node in first_community['nodes'] if node.startswith('B')}
    assert len(a_members) == 0 or len(b_members) == 0, "Nodes from different groups should be in different communities"
| 192 | + |
| 193 | + |
@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_hierarchical_structure(networkx_storage, algorithm):
    """After clustering, lower levels hold at least as many communities as higher ones."""
    await networkx_storage.upsert_node("NODE1", {"source_id": "chunk1", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}])})
    await networkx_storage.upsert_node("NODE2", {"source_id": "chunk2", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}])})
    await networkx_storage.upsert_edge("NODE1", "NODE2", {})
    await networkx_storage.clustering(algorithm=algorithm)
    community_schema = await networkx_storage.community_schema()

    levels = {community['level'] for community in community_schema.values()}
    assert len(levels) >= 1, "Should have at least one level in the hierarchy"

    # Count communities per level without assuming which levels exist beyond 0.
    level_counts = {}
    for community in community_schema.values():
        level_counts[community['level']] = level_counts.get(community['level'], 0) + 1
    assert level_counts[0] >= level_counts.get(max(levels), 0), "Lower levels should have more or equal number of communities"
| 208 | + |
| 209 | + |
@pytest.mark.asyncio
async def test_persistence(setup_teardown):
    """Graph contents flushed via index_done_callback survive a storage reload."""
    rag = GraphRAG(working_dir=WORKING_DIR, embedding_func=mock_embedding)
    writer = NetworkXStorage(
        namespace="test_persistence",
        global_config=rag.__dict__,
    )

    await writer.upsert_node("node1", {"attr": "value"})
    await writer.upsert_node("node2", {"attr": "value"})
    await writer.upsert_edge("node1", "node2", {"weight": 1.0})

    # Persist to disk so a second instance can read the same namespace back.
    await writer.index_done_callback()

    reader = NetworkXStorage(
        namespace="test_persistence",
        global_config=rag.__dict__,
    )

    for node in ("node1", "node2"):
        assert await reader.has_node(node)
    assert await reader.has_edge("node1", "node2")

    assert await reader.get_node("node1") == {"attr": "value"}
    assert await reader.get_edge("node1", "node2") == {"weight": 1.0}
| 238 | + |
| 239 | + |
@pytest.mark.asyncio
async def test_embed_nodes(networkx_storage):
    """node2vec embedding returns one vector per node at the configured dimension."""
    for idx in range(5):
        await networkx_storage.upsert_node(f"node{idx}", {"id": f"node{idx}"})
    for idx in range(4):
        await networkx_storage.upsert_edge(f"node{idx}", f"node{idx+1}", {})

    embeddings, node_ids = await networkx_storage.embed_nodes("node2vec")

    expected_dim = networkx_storage.global_config['node2vec_params']['dimensions']
    assert embeddings.shape == (5, expected_dim)
    assert len(node_ids) == 5
    assert all(f"node{idx}" in node_ids for idx in range(5))
| 253 | + |
| 254 | + |
@pytest.mark.asyncio
async def test_stable_largest_connected_component_equal_components():
    """With several equal-size components, one is selected deterministically (A-B)."""
    graph = nx.Graph()
    graph.add_edges_from([("A", "B"), ("C", "D"), ("E", "F")])
    largest = NetworkXStorage.stable_largest_connected_component(graph)
    assert sorted(largest.nodes()) == ["A", "B"]
    assert list(largest.edges()) == [("A", "B")]
| 262 | + |
| 263 | + |
@pytest.mark.asyncio
async def test_stable_largest_connected_component_stability():
    """Calling the helper twice on the same graph yields identical node/edge order."""
    graph = nx.Graph()
    graph.add_edges_from([("A", "B"), ("B", "C"), ("C", "D"), ("E", "F")])
    first = NetworkXStorage.stable_largest_connected_component(graph)
    second = NetworkXStorage.stable_largest_connected_component(graph)
    assert nx.is_isomorphic(first, second)
    assert list(first.nodes()) == list(second.nodes())
    assert list(first.edges()) == list(second.edges())
| 273 | + |
| 274 | + |
@pytest.mark.asyncio
async def test_stable_largest_connected_component_directed_graph():
    """Directed input: the larger connected chunk (A-B-C-D) is returned intact."""
    graph = nx.DiGraph()
    graph.add_edges_from([("A", "B"), ("B", "C"), ("C", "D"), ("E", "F")])
    largest = NetworkXStorage.stable_largest_connected_component(graph)
    assert sorted(largest.nodes()) == ["A", "B", "C", "D"]
    assert sorted(largest.edges()) == [("A", "B"), ("B", "C"), ("C", "D")]
| 282 | + |
| 283 | + |
@pytest.mark.asyncio
async def test_stable_largest_connected_component_self_loops_and_parallel_edges():
    """Self-loops are kept and duplicate edges collapse to a stable edge set."""
    graph = nx.Graph()
    graph.add_edges_from([("A", "B"), ("B", "C"), ("C", "A"), ("A", "A"), ("B", "B"), ("A", "B")])
    largest = NetworkXStorage.stable_largest_connected_component(graph)
    assert sorted(largest.nodes()) == ["A", "B", "C"]
    assert sorted(largest.edges()) == [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C')]
| 291 | + |
| 292 | + |
@pytest.mark.asyncio
async def test_community_schema_with_no_clusters(networkx_storage):
    """Nodes without cluster annotations produce an empty community schema."""
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1"})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk2"})
    await networkx_storage.upsert_edge("node1", "node2", {})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 0
| 301 | + |
| 302 | + |
@pytest.mark.asyncio
async def test_community_schema_multiple_levels(networkx_storage):
    """Two-level cluster annotations yield one level-0 parent with two level-1 children."""
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}])})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk2", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}])})
    await networkx_storage.upsert_edge("node1", "node2", {})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 3
    assert set(schema.keys()) == {"0", "1", "2"}
    assert schema["0"]["level"] == 0
    for child in ("1", "2"):
        assert schema[child]["level"] == 1
    assert set(schema["0"]["sub_communities"]) == {"1", "2"}
| 316 | + |
| 317 | + |
@pytest.mark.asyncio
async def test_community_schema_occurrence(networkx_storage):
    """Community "0" (two chunks) gets occurrence 1; community "1" (one chunk) gets 0.5.

    NOTE(review): this is consistent with occurrence being normalized by the
    largest per-community chunk count — confirm against the implementation.
    """
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1,chunk2", "clusters": json.dumps([{"level": 0, "cluster": "0"}])})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk3", "clusters": json.dumps([{"level": 0, "cluster": "0"}])})
    await networkx_storage.upsert_node("node3", {"source_id": "chunk4", "clusters": json.dumps([{"level": 0, "cluster": "1"}])})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 2
    assert schema["0"]["occurrence"] == 1
    assert schema["1"]["occurrence"] == 0.5
| 328 | + |
| 329 | + |
@pytest.mark.asyncio
async def test_community_schema_sub_communities(networkx_storage):
    """Level-1 clusters appear as sub-communities of their level-0 parents only."""
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}])})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk2", "clusters": json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}])})
    await networkx_storage.upsert_node("node3", {"source_id": "chunk3", "clusters": json.dumps([{"level": 0, "cluster": "3"}, {"level": 1, "cluster": "4"}])})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 5
    assert set(schema["0"]["sub_communities"]) == {"1", "2"}
    assert schema["3"]["sub_communities"] == ["4"]
    # Leaf-level communities have no children.
    for leaf in ("1", "2", "4"):
        assert schema[leaf]["sub_communities"] == []
| 343 | + |
| 344 | + |
@pytest.mark.asyncio
async def test_concurrent_operations(networkx_storage):
    """Two interleaved upsert tasks insert all 1000 nodes, none with edges."""

    async def insert_range(lo, hi):
        # One upsert per await point so the two tasks interleave.
        for idx in range(lo, hi):
            await networkx_storage.upsert_node(f"node{idx}", {"value": idx})

    await asyncio.gather(
        insert_range(0, 500),
        insert_range(500, 1000),
    )

    assert await networkx_storage.node_degree("node0") == 0
    assert len(networkx_storage._graph.nodes) == 1000
| 358 | + |
| 359 | + |
@pytest.mark.asyncio
async def test_nonexistent_node_and_edge(networkx_storage):
    """Lookups on missing nodes/edges return falsy defaults instead of raising."""
    missing = "nonexistent"
    assert await networkx_storage.has_node(missing) is False
    assert await networkx_storage.has_edge("node1", "node2") is False
    assert await networkx_storage.get_node(missing) is None
    assert await networkx_storage.get_edge("node1", "node2") is None
    assert await networkx_storage.get_node_edges(missing) is None
    assert await networkx_storage.node_degree(missing) == 0
    assert await networkx_storage.edge_degree("node1", "node2") == 0
| 369 | + |
| 370 | + |
@pytest.mark.asyncio
async def test_error_handling(networkx_storage):
    """Unknown algorithm names raise ValueError with a descriptive message."""
    bad_algorithm = "invalid_algo"

    with pytest.raises(ValueError, match="Clustering algorithm invalid_algo not supported"):
        await networkx_storage.clustering(bad_algorithm)

    with pytest.raises(ValueError, match="Node embedding algorithm invalid_algo not supported"):
        await networkx_storage.embed_nodes(bad_algorithm)
0 commit comments