@@ -19,10 +19,7 @@ def __init__(
1919 self ,
2020 mongo_adapter : Optional [MongoDBAdapter ] = None ,
2121 zep_api_key : Optional [str ] = None ,
22- capture_modes : Optional [Dict [str , str ]] = None ,
2322 ):
24- self .capture_modes : Dict [str , str ] = capture_modes or {}
25-
2623 # Mongo setup
2724 if not mongo_adapter :
2825 self .mongo = None
@@ -46,18 +43,15 @@ def __init__(
4643 self .mongo .create_index (self .captures_collection , [("capture_name" , 1 )])
4744 self .mongo .create_index (self .captures_collection , [("agent_name" , 1 )])
4845 self .mongo .create_index (self .captures_collection , [("timestamp" , 1 )])
49- # Unique only when mode == 'once'
46+ # Unique per user/agent/capture combo
5047 try :
5148 self .mongo .create_index (
5249 self .captures_collection ,
5350 [("user_id" , 1 ), ("agent_name" , 1 ), ("capture_name" , 1 )],
5451 unique = True ,
55- partialFilterExpression = {"mode" : "once" },
5652 )
5753 except Exception as e :
58- logger .error (
59- f"Error creating partial unique index for captures: { e } "
60- )
54+ logger .error (f"Error creating unique index for captures: { e } " )
6155 except Exception as e :
6256 logger .error (f"Error initializing MongoDB captures collection: { e } " )
6357 self .captures_collection = "captures"
@@ -223,54 +217,39 @@ async def save_capture(
223217 raise ValueError ("data must be a dictionary" )
224218
225219 try :
226- mode = self .capture_modes .get (agent_name , "once" ) if agent_name else "once"
227220 now = datetime .now (timezone .utc )
228- if mode == "multiple" :
229- doc = {
221+ key = {
222+ "user_id" : user_id ,
223+ "agent_name" : agent_name ,
224+ "capture_name" : capture_name ,
225+ }
226+ existing = self .mongo .find_one (self .captures_collection , key )
227+ merged_data : Dict [str , Any ] = {}
228+ if existing and isinstance (existing .get ("data" ), dict ):
229+ merged_data .update (existing .get ("data" , {}))
230+ merged_data .update (data or {})
231+ update_doc = {
232+ "$set" : {
230233 "user_id" : user_id ,
231234 "agent_name" : agent_name ,
232235 "capture_name" : capture_name ,
233- "data" : data or {},
234- "schema" : schema or {},
235- "mode" : "multiple" ,
236+ "data" : merged_data ,
237+ "schema" : (
238+ schema
239+ if schema is not None
240+ else existing .get ("schema" )
241+ if existing
242+ else {}
243+ ),
236244 "timestamp" : now ,
237- "created_at" : now ,
238- }
239- return self .mongo .insert_one (self .captures_collection , doc )
240- else :
241- key = {
242- "user_id" : user_id ,
243- "agent_name" : agent_name ,
244- "capture_name" : capture_name ,
245- }
246- existing = self .mongo .find_one (self .captures_collection , key )
247- merged_data : Dict [str , Any ] = {}
248- if existing and isinstance (existing .get ("data" ), dict ):
249- merged_data .update (existing .get ("data" , {}))
250- merged_data .update (data or {})
251- update_doc = {
252- "$set" : {
253- "user_id" : user_id ,
254- "agent_name" : agent_name ,
255- "capture_name" : capture_name ,
256- "data" : merged_data ,
257- "schema" : (
258- schema
259- if schema is not None
260- else existing .get ("schema" )
261- if existing
262- else {}
263- ),
264- "mode" : "once" ,
265- "timestamp" : now ,
266- },
267- "$setOnInsert" : {"created_at" : now },
268- }
269- self .mongo .update_one (
270- self .captures_collection , key , update_doc , upsert = True
271- )
272- doc = self .mongo .find_one (self .captures_collection , key )
273- return str (doc .get ("_id" )) if doc and doc .get ("_id" ) else None
245+ },
246+ "$setOnInsert" : {"created_at" : now },
247+ }
248+ self .mongo .update_one (
249+ self .captures_collection , key , update_doc , upsert = True
250+ )
251+ doc = self .mongo .find_one (self .captures_collection , key )
252+ return str (doc .get ("_id" )) if doc and doc .get ("_id" ) else None
274253 except Exception as e : # pragma: no cover
275254 logger .error (f"MongoDB save_capture error: { e } " )
276255 return None
0 commit comments