@@ -12,24 +12,40 @@ class CosineSimilarityBuilder(RelationshipBuilder):
1212 property_name : str = "embedding"
1313 new_property_name : str = "cosine_similarity"
1414 threshold : float = 0.9
15+ block_size : int = 1024
16+
17+ def _block_cosine_similarity (self , i : np .ndarray , j : np .ndarray ):
18+ """Calculate cosine similarity matrix between two sets of embeddings."""
19+ i_norm = i / np .linalg .norm (i , axis = 1 , keepdims = True )
20+ j_norm = j / np .linalg .norm (j , axis = 1 , keepdims = True )
21+ return np .dot (i_norm , j_norm .T )
1522
1623 def _find_similar_embedding_pairs (
1724 self , embeddings : np .ndarray , threshold : float
1825 ) -> t .List [t .Tuple [int , int , float ]]:
19- # Normalize the embeddings
20- normalized = embeddings / np .linalg .norm (embeddings , axis = 1 )[:, np .newaxis ]
26+ """Sharded computation of cosine similarity to find similar pairs."""
2127
22- # Calculate cosine similarity matrix
23- similarity_matrix = np .dot (normalized , normalized .T )
24- # Find pairs with similarity >= threshold
25- similar_pairs = np .argwhere (similarity_matrix >= threshold )
28+ def process_block (i : int , j : int ) -> t .Set [t .Tuple [int , int , float ]]:
29+ end_i = min (i + self .block_size , n_embeddings )
30+ end_j = min (j + self .block_size , n_embeddings )
31+ block = self ._block_cosine_similarity (
32+ embeddings [i :end_i , :], embeddings [j :end_j , :]
33+ )
34+ similar_idx = np .argwhere (block >= threshold )
35+ return {
36+ (int (i + ii ), int (j + jj ), float (block [ii , jj ]))
37+ for ii , jj in similar_idx
38+ if int (i + ii ) < int (j + jj )
39+ }
2640
27- # Filter out self-comparisons and duplicate pairs
28- return [
29- (pair [0 ], pair [1 ], similarity_matrix [pair [0 ], pair [1 ]])
30- for pair in similar_pairs
31- if pair [0 ] < pair [1 ]
32- ]
41+ n_embeddings , _dimension = embeddings .shape
42+ triplets = set ()
43+
44+ for i in range (0 , n_embeddings , self .block_size ):
45+ for j in range (i , n_embeddings , self .block_size ):
46+ triplets .update (process_block (i , j ))
47+
48+ return list (triplets )
3349
3450 def _validate_embedding_shapes (self , embeddings : t .List [t .Any ]):
3551 if not embeddings :
@@ -43,40 +59,66 @@ def _validate_embedding_shapes(self, embeddings: t.List[t.Any]):
4359 )
4460
4561 async def transform (self , kg : KnowledgeGraph ) -> t .List [Relationship ]:
46- if self .property_name is None :
47- self .property_name = "embedding"
48-
4962 embeddings = []
5063 for node in kg .nodes :
5164 embedding = node .get_property (self .property_name )
5265 if embedding is None :
5366 raise ValueError (f"Node { node .id } has no { self .property_name } " )
5467 embeddings .append (embedding )
55-
5668 self ._validate_embedding_shapes (embeddings )
5769 similar_pairs = self ._find_similar_embedding_pairs (
5870 np .array (embeddings ), self .threshold
5971 )
60-
6172 return [
6273 Relationship (
6374 source = kg .nodes [i ],
6475 target = kg .nodes [j ],
65- type = "cosine_similarity" ,
76+ type = self . new_property_name ,
6677 properties = {self .new_property_name : similarity_float },
6778 bidirectional = True ,
6879 )
6980 for i , j , similarity_float in similar_pairs
7081 ]
7182
83+ def generate_execution_plan (self , kg : KnowledgeGraph ) -> t .List [t .Coroutine ]:
84+ """
85+ Generates a coroutine task for finding similar embedding pairs, which can be scheduled/executed by an Executor.
86+ """
87+ filtered_kg = self .filter (kg )
88+
89+ embeddings = []
90+ for node in filtered_kg .nodes :
91+ embedding = node .get_property (self .property_name )
92+ if embedding is None :
93+ raise ValueError (f"Node { node .id } has no { self .property_name } " )
94+ embeddings .append (embedding )
95+ self ._validate_embedding_shapes (embeddings )
96+
97+ async def find_and_add_relationships ():
98+ similar_pairs = self ._find_similar_embedding_pairs (
99+ np .array (embeddings ), self .threshold
100+ )
101+ for i , j , similarity_float in similar_pairs :
102+ rel = Relationship (
103+ source = filtered_kg .nodes [i ],
104+ target = filtered_kg .nodes [j ],
105+ type = self .new_property_name ,
106+ properties = {self .new_property_name : similarity_float },
107+ bidirectional = True ,
108+ )
109+ kg .relationships .append (rel )
110+
111+ return [find_and_add_relationships ()]
112+
72113
73114@dataclass
74115class SummaryCosineSimilarityBuilder (CosineSimilarityBuilder ):
75116 property_name : str = "summary_embedding"
76117 new_property_name : str = "summary_cosine_similarity"
77118 threshold : float = 0.1
119+ block_size : int = 1024
78120
79- def filter (self , kg : KnowledgeGraph ) -> KnowledgeGraph :
121+ def _document_summary_filter (self , kg : KnowledgeGraph ) -> KnowledgeGraph :
80122 """
81123 Filters the knowledge graph to only include nodes with a summary embedding.
82124 """
@@ -90,22 +132,22 @@ def filter(self, kg: KnowledgeGraph) -> KnowledgeGraph:
90132 return KnowledgeGraph (nodes = nodes )
91133
92134 async def transform (self , kg : KnowledgeGraph ) -> t .List [Relationship ]:
135+ filtered_kg = self ._document_summary_filter (kg )
93136 embeddings = [
94137 node .get_property (self .property_name )
95- for node in kg .nodes
138+ for node in filtered_kg .nodes
96139 if node .get_property (self .property_name ) is not None
97140 ]
98141 if not embeddings :
99142 raise ValueError (f"No nodes have a valid { self .property_name } " )
100- self ._validate_embedding_shapes (embeddings )
101143 similar_pairs = self ._find_similar_embedding_pairs (
102144 np .array (embeddings ), self .threshold
103145 )
104146 return [
105147 Relationship (
106- source = kg .nodes [i ],
107- target = kg .nodes [j ],
108- type = "summary_cosine_similarity" ,
148+ source = filtered_kg .nodes [i ],
149+ target = filtered_kg .nodes [j ],
150+ type = self . new_property_name ,
109151 properties = {self .new_property_name : similarity_float },
110152 bidirectional = True ,
111153 )
0 commit comments