1+ import time
2+ from pymilvus import MilvusClient , DataType , IndexType
3+ import datetime
4+ import pytz
5+ import random
6+ import sys
7+
8+ # --- Configuration ---
9+ MILVUS_HOST = "http://localhost:19530"
10+
11+ # --- Test Scenario 1: Add Field Default Value ---
12+ COLLECTION_NAME_DEFAULT = "default_value_test_col"
13+ TIMESTAMP_FIELD_DEFAULT = "event_time_default"
14+ # Default value for TIMESTAMPTZ field (must be ISO 8601, UTC)
15+ DEFAULT_TS_VALUE_STR = "2025-01-01T00:00:00Z"
16+ EXPLICIT_TS_VALUE_STR = "2026-06-06T12:34:56Z"
17+ EXPECTED_DEFAULT_MATCH = "2025-01-01T00:00:00"
18+
19+ # --- Test Scenario 2: Naive Time Insertion with Collection Timezone ---
20+ COLLECTION_NAME_TZ_INSERT = "timestamptz_tz_test_col"
21+ TIMESTAMP_FIELD_TZ_INSERT = "event_time_tz"
22+ COLLECTION_TZ_STR = "America/New_York"
23+ RAW_TIME_STR = "2024-01-04 14:26:27" # Naive time
24+
25+ # Pre-calculate expected UTC based on COLLECTION_TZ_STR
26+ COLLECTION_TZ = pytz .timezone (COLLECTION_TZ_STR )
27+ correct_dt_utc = COLLECTION_TZ .localize (
28+ datetime .datetime .strptime (RAW_TIME_STR , "%Y-%m-%d %H:%M:%S" )
29+ ).astimezone (pytz .utc )
30+ EXPECTED_CORRECT_UTC = correct_dt_utc .strftime ("%Y-%m-%dT%H:%M:%SZ" )
31+ OBSERVED_ERROR_UTC_PART = "2024-01-04T14:26:27" # Naive time mistakenly stored as UTC
32+
33+
34+ # --- Core Setup Function ---
35+ def setup_base_collection (client : MilvusClient , name : str , tz_property : str = None ):
36+ """Creates a generic base collection."""
37+ schema = client .create_schema ()
38+ schema .add_field ("id" , DataType .INT64 , is_primary = True )
39+ schema .add_field ("vec" , DataType .FLOAT_VECTOR , dim = 4 )
40+
41+ if client .has_collection (name ):
42+ client .drop_collection (name )
43+
44+ properties = {"timezone" : tz_property } if tz_property else {}
45+
46+ client .create_collection (name , schema = schema , consistency_level = "Session" , properties = properties )
47+
48+ index_params = client .prepare_index_params (
49+ collection_name = name ,
50+ field_name = "vec" ,
51+ index_type = IndexType .HNSW ,
52+ metric_type = "COSINE" ,
53+ params = {"M" : 30 , "efConstruction" : 200 },
54+ )
55+ client .create_index (name , index_params )
56+
57+ return schema
58+
59+
60+ # ==============================================================================
61+ # 🚀 TEST SCENARIO 1: add_collection_field Default Value Retroactive Fill
62+ # ==============================================================================
63+ def run_default_value_retrofill_test (client : MilvusClient ):
64+ """Validates the default_value behavior of add_collection_field for TIMESTAMPTZ."""
65+
66+ print ("\n \n " + "=" * 80 )
67+ print (f"🚀 TEST 1: { COLLECTION_NAME_DEFAULT } - Add Field Default Value (Retroactive Fill)" )
68+ print ("=" * 80 )
69+
70+ # 1. Setup Base Collection (No timestamp field yet)
71+ setup_base_collection (client , COLLECTION_NAME_DEFAULT )
72+ client .load_collection (COLLECTION_NAME_DEFAULT )
73+ print (f"✅ Base collection '{ COLLECTION_NAME_DEFAULT } ' created and loaded." )
74+
75+ # 2. Inserting historical data (missing the event_time field)
76+ history_data = [
77+ {"id" : 101 , "vec" : [random .random () for _ in range (4 )]},
78+ {"id" : 102 , "vec" : [random .random () for _ in range (4 )]},
79+ ]
80+
81+ client .insert (COLLECTION_NAME_DEFAULT , history_data )
82+ client .flush (COLLECTION_NAME_DEFAULT )
83+ print (f"✅ Stage 1: Inserted historical data (ID 101, 102) before field addition." )
84+
85+ # 3. Adding TIMESTAMPTZ field with a default value
86+ try :
87+ client .add_collection_field (
88+ collection_name = COLLECTION_NAME_DEFAULT ,
89+ field_name = TIMESTAMP_FIELD_DEFAULT ,
90+ data_type = DataType .TIMESTAMPTZ ,
91+ nullable = True ,
92+ default_value = DEFAULT_TS_VALUE_STR
93+ )
94+ print (
95+ f"✅ Stage 2: Field '{ TIMESTAMP_FIELD_DEFAULT } ' added successfully with default value: { DEFAULT_TS_VALUE_STR } " )
96+ except Exception as e :
97+ print (f"❌ Stage 2: Failed to add field: { e } " )
98+ return
99+
100+ # 4. Inserting new data (Testing default vs explicit)
101+ new_data = [
102+ {"id" : 201 , "vec" : [random .random () for _ in range (4 )]}, # Missing field -> Expected Default
103+ {"id" : 202 , "vec" : [random .random () for _ in range (4 )], "event_time_default" : EXPLICIT_TS_VALUE_STR }
104+ # Explicit -> Expected Explicit
105+ ]
106+
107+ client .insert (COLLECTION_NAME_DEFAULT , new_data )
108+ client .flush (COLLECTION_NAME_DEFAULT )
109+ print ("✅ Stage 3: Inserted new data (ID 201, 202) after field addition." )
110+
111+ # 5. Verification
112+ client .load_collection (COLLECTION_NAME_DEFAULT )
113+ query_results = client .query (
114+ collection_name = COLLECTION_NAME_DEFAULT ,
115+ filter = "id in [101, 102, 201, 202]" ,
116+ output_fields = ["id" , TIMESTAMP_FIELD_DEFAULT ],
117+ timezone = "UTC"
118+ )
119+
120+ results_map = {r .get ('id' ): r .get (TIMESTAMP_FIELD_DEFAULT ) for r in query_results }
121+ print ("\n --- Verification Results ---" )
122+
123+ # ID 101 (Historical Missing) - **Key Test**
124+ actual_ts_101 = results_map .get (101 )
125+ if actual_ts_101 and EXPECTED_DEFAULT_MATCH in str (actual_ts_101 ):
126+ print (f"✅ Result 1 (ID 101, Historical): **Successfully retroactively filled** with default: { actual_ts_101 } " )
127+ else :
128+ print (f"❌ Result 1 (ID 101, Historical): **FAILED**, not retroactively filled: { actual_ts_101 } " )
129+
130+ # ID 201 (New Missing)
131+ actual_ts_201 = results_map .get (201 )
132+ if actual_ts_201 and EXPECTED_DEFAULT_MATCH in str (actual_ts_201 ):
133+ print (f"✅ Result 2 (ID 201, New Missing): Successfully filled with default: { actual_ts_201 } " )
134+ else :
135+ print (f"❌ Result 2 (ID 201, New Missing): Default value did not take effect: { actual_ts_201 } " )
136+
137+ # ID 202 (New Explicit)
138+ actual_ts_202 = results_map .get (202 )
139+ if actual_ts_202 and "2026-06-06T12:34:56" in str (actual_ts_202 ):
140+ print (f"✅ Result 3 (ID 202, New Explicit): Successfully overrode default value: { actual_ts_202 } " )
141+ else :
142+ print (f"❌ Result 3 (ID 202, New Explicit): Explicit value failed: { actual_ts_202 } " )
143+
144+
145+ # ==============================================================================
146+ # 🚀 TEST SCENARIO 2: Naive Time Insertion with Collection Timezone
147+ # ==============================================================================
148+ def run_naive_insertion_tz_test (client : MilvusClient ):
149+ """Verifies that naive time strings are correctly interpreted using the Collection's timezone setting."""
150+
151+ print ("\n \n " + "=" * 80 )
152+ print (f"🚀 TEST 2: { COLLECTION_NAME_TZ_INSERT } - Naive Time Insertion (TZ Conversion)" )
153+ print ("=" * 80 )
154+ print (f"Collection Timezone: { COLLECTION_TZ_STR } " )
155+ print (f"Raw Input (Naive): { RAW_TIME_STR } " )
156+ print (f"Expected UTC Storage: { EXPECTED_CORRECT_UTC } " )
157+
158+ # 1. Setup Collection with TIMESTAMPTZ field and Collection Timezone
159+ schema = client .create_schema ()
160+ schema .add_field ("id" , DataType .INT64 , is_primary = True )
161+ schema .add_field ("vec" , DataType .FLOAT_VECTOR , dim = 4 )
162+ schema .add_field (TIMESTAMP_FIELD_TZ_INSERT , DataType .TIMESTAMPTZ )
163+
164+ # Drop and recreate the collection to ensure TIMESTAMPTZ field is present from start
165+ if client .has_collection (COLLECTION_NAME_TZ_INSERT ):
166+ client .drop_collection (COLLECTION_NAME_TZ_INSERT )
167+
168+ client .create_collection (
169+ COLLECTION_NAME_TZ_INSERT ,
170+ schema = schema ,
171+ consistency_level = "Session" ,
172+ properties = {"timezone" : COLLECTION_TZ_STR }
173+ )
174+
175+ # Create index for vector field
176+ index_params = client .prepare_index_params (
177+ collection_name = COLLECTION_NAME_TZ_INSERT , field_name = "vec" , index_type = IndexType .HNSW ,
178+ metric_type = "COSINE" , params = {"M" : 30 , "efConstruction" : 200 },
179+ )
180+ client .create_index (COLLECTION_NAME_TZ_INSERT , index_params )
181+ client .load_collection (COLLECTION_NAME_TZ_INSERT )
182+ print ("✅ Setup complete for Naive Time Insertion Test." )
183+
184+ # 2. Insert raw data (naive time string)
185+ insert_data = [
186+ {"id" : 1 , "vec" : [random .random () for _ in range (4 )], TIMESTAMP_FIELD_TZ_INSERT : RAW_TIME_STR },
187+ ]
188+
189+ client .insert (COLLECTION_NAME_TZ_INSERT , insert_data )
190+ client .flush (COLLECTION_NAME_TZ_INSERT )
191+ print ("✅ Stage 1: Insertion of naive time string successful." )
192+
193+ # 3. Query data and verify UTC storage
194+ client .load_collection (COLLECTION_NAME_TZ_INSERT )
195+ query_results = client .query (
196+ collection_name = COLLECTION_NAME_TZ_INSERT ,
197+ filter = "id == 1" ,
198+ output_fields = ["id" , TIMESTAMP_FIELD_TZ_INSERT ],
199+ timezone = "UTC" # Retrieve internal UTC storage time
200+ )
201+
202+ if not query_results :
203+ print ("❌ Query result is empty." )
204+ return
205+
206+ actual_ts_str = query_results [0 ].get (TIMESTAMP_FIELD_TZ_INSERT )
207+
208+ print ("\n --- Verification Results ---" )
209+ print (f"Actual Query Result (UTC): { actual_ts_str } " )
210+
211+ # Verification Logic: Check if the actual result matches the expected 19:26:27Z
212+ if actual_ts_str and EXPECTED_CORRECT_UTC in actual_ts_str :
213+ print (
214+ f"✅ Verification SUCCESS: Milvus correctly converted '{ RAW_TIME_STR } ' to UTC based on '{ COLLECTION_TZ_STR } '." )
215+ print (f" (Stored UTC: { EXPECTED_CORRECT_UTC } )" )
216+ elif actual_ts_str and OBSERVED_ERROR_UTC_PART in actual_ts_str :
217+ print (f"❌ Verification FAILED (Issue Reproduced): Milvus mistakenly treated '{ RAW_TIME_STR } ' as UTC time." )
218+ print (f" (Mistaken UTC: { actual_ts_str } )" )
219+ else :
220+ print (f"⚠️ Verification FAILED: Actual result '{ actual_ts_str } ' does not match any expectation." )
221+
222+
223+ # ==============================================================================
224+ # 🏃♂️ MAIN EXECUTION (Updated Function Name)
225+ # ==============================================================================
226+ def main_timestamptz_tests ():
227+ try :
228+ client = MilvusClient (uri = MILVUS_HOST )
229+ except Exception as e :
230+ print (f"Could not connect to Milvus service { MILVUS_HOST } . Please ensure the service is running. Error: { e } " )
231+ sys .exit (1 )
232+
233+ # Run all test scenarios
234+ run_default_value_retrofill_test (client )
235+ run_naive_insertion_tz_test (client )
236+
237+ # --- Cleanup for all collections ---
238+ print ("\n \n " + "=" * 80 )
239+ print ("🧹 Cleaning up collections..." )
240+ print ("=" * 80 )
241+
242+ collections_to_clean = [COLLECTION_NAME_DEFAULT , COLLECTION_NAME_TZ_INSERT ]
243+
244+ for name in collections_to_clean :
245+ try :
246+ if client .has_collection (name ):
247+ client .release_collection (name )
248+ client .drop_collection (name )
249+ print (f"✅ Cleanup: Collection '{ name } ' dropped." )
250+ except Exception as e :
251+ print (f"❌ Failed to clean up collection '{ name } ': { e } " )
252+
253+
254+ if __name__ == "__main__" :
255+ main_timestamptz_tests ()
0 commit comments