Skip to content

Commit e40aa7f

Browse files
czs007XTxxxx
andauthored
feat: timestamptz support (#3002) (#3092)
issue:milvus-io/milvus#27467 --------- Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com> Co-authored-by: xtx <xtianx@smail.nju.edu.cn>
1 parent e93d548 commit e40aa7f

File tree

8 files changed

+166
-1
lines changed

8 files changed

+166
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ venv/
1919

2020
# Env
2121
.env
22+
.venv/
2223

2324
# Local Temp
2425
temp/

examples/timestamptz.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import time
2+
from pymilvus import MilvusClient, DataType, CollectionSchema, IndexType, FieldSchema, milvus_client
3+
import datetime
4+
import pytz
5+
6+
milvus_host = "http://localhost:19530"
7+
collection_name = "timestamptz_test123"
8+
9+
10+
def main():
11+
client = MilvusClient(uri=milvus_host)
12+
13+
# create collection with TIMESTAMPTZ field
14+
if client.has_collection(collection_name):
15+
client.drop_collection(collection_name)
16+
17+
schema = client.create_schema()
18+
schema.add_field("id", DataType.INT64, is_primary=True)
19+
schema.add_field("tsz", DataType.TIMESTAMPTZ)
20+
schema.add_field("vec", DataType.FLOAT_VECTOR, dim=4)
21+
print("===================alter database timezone===================")
22+
try:
23+
client.alter_database_properties("default", {"database.timezone": "Asia/Shanghai"})
24+
except Exception as e:
25+
print(e)
26+
print(client.describe_database("default"))
27+
client.create_collection(collection_name, schema=schema, consistency_level="Session")
28+
index_params = client.prepare_index_params(
29+
collection_name=collection_name,
30+
field_name="vec",
31+
index_type=IndexType.HNSW,
32+
metric_type="COSINE",
33+
params={"M": 30, "efConstruction": 200},
34+
)
35+
# add timestamptz index of STL_SORT type
36+
index_params.add_index(field_name="tsz", index_name="tsz_index", index_type="STL_SORT")
37+
38+
client.create_index(collection_name, index_params)
39+
print(client.describe_collection(collection_name))
40+
client.load_collection(collection_name)
41+
print(f"load state: {client.get_load_state(collection_name)}")
42+
43+
# insert data with timezone-aware timestamps
44+
print("===================insert timestamptz===================")
45+
data_size = 8193
46+
shanghai_tz = pytz.timezone("Asia/Shanghai")
47+
data = [
48+
{
49+
"id": i + 1,
50+
"tsz": shanghai_tz.localize(
51+
datetime.datetime(2025, 1, 1, 0, 0, 0) + datetime.timedelta(days=i)
52+
).isoformat(),
53+
"vec": [float(i) / 10 for i in range(4)],
54+
}
55+
for i in range(data_size)
56+
]
57+
client.insert(collection_name, data)
58+
client.flush(collection_name)
59+
time.sleep(1) # wait for index creation
60+
print(client.describe_index(collection_name, "tsz_index"))
61+
print("===================insert invalid string===================")
62+
data = [{"id": 114514, "tsz": "should cause an error", "vec": [1.1, 1.2, 1.3]}]
63+
try:
64+
client.insert(collection_name, data)
65+
except Exception as e:
66+
print(e)
67+
68+
# query/search data with TIMESTAMPTZ, define timezone in kwargs
69+
print("====================test query====================")
70+
results = client.query(
71+
collection_name,
72+
filter="id <= 10",
73+
output_fields=["id", "tsz", "vec"],
74+
limit=2,
75+
timezone="America/Havana",
76+
time_fields="year, month, day, hour, minute, second, microsecond",
77+
)
78+
print("\n".join([str(res) for res in results]))
79+
80+
print("====================test search====================")
81+
results = client.search(
82+
collection_name,
83+
[[0.5, 0.6, 0.7, 0.8]],
84+
output_fields=["id", "tsz", "vec"],
85+
limit=10,
86+
timezone="America/Chicago",
87+
time_fields="year, month, day, hour, minute, second, microsecond",
88+
)
89+
print("\n".join([str(res) for res in results[0]]))
90+
91+
# Alter timezone (IANA timezone)
92+
print(client.describe_collection(collection_name))
93+
print("===================alter collection timezone===================")
94+
try:
95+
client.alter_collection_properties(collection_name, {"collection.timezone": "Asia/Shanghai"})
96+
except Exception as e:
97+
print(e)
98+
print(client.describe_collection(collection_name))
99+
100+
try:
101+
client.alter_collection_properties(collection_name, {"collection.timezone": "error"})
102+
except Exception as e:
103+
print(e)
104+
print(client.describe_collection(collection_name))
105+
106+
try:
107+
client.alter_database_properties("default", {"database.timezone": "error"})
108+
except Exception as e:
109+
print(e)
110+
print(client.describe_database("default"))
111+
112+
# Query with new operator
113+
results = client.query(
114+
collection_name, limit=10, timezone="Asia/Shanghai"
115+
)
116+
print("\n".join([str(res) for res in results]))
117+
118+
expr = "tsz + INTERVAL 'P0D' != ISO '2025-01-03T00:00:00+08:00'"
119+
results = client.query(collection_name, expr, output_fields=["id", "tsz"], limit=10)
120+
print(" The first expr:")
121+
print("\n".join([str(res) for res in results]))
122+
expr = "tsz != ISO '2025-01-03T00:00:00+08:00'"
123+
print(" The second expr")
124+
results = client.query(collection_name, expr, output_fields=["id", "tsz"], limit=10)
125+
print("\n".join([str(res) for res in results]))
126+
127+
if __name__ == "__main__":
128+
main()

pymilvus/client/entity_helper.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,17 @@ def pack_field_value_to_field_data(
373373
% (field_name, "double", type(field_value))
374374
+ f" Detail: {e!s}"
375375
) from e
376+
elif field_type == DataType.TIMESTAMPTZ:
377+
try:
378+
if field_value is None:
379+
field_data.scalars.string_data.data.extend([]) # Timestamptz is passed as String
380+
else:
381+
field_data.scalars.string_data.data.append(field_value)
382+
except (TypeError, ValueError) as e:
383+
raise DataNotMatchException(
384+
message=ExceptionsMessage.FieldDataInconsistent
385+
% (field_name, "string", type(field_value))
386+
) from e
376387
elif field_type == DataType.FLOAT_VECTOR:
377388
try:
378389
f_value = field_value
@@ -787,6 +798,11 @@ def assign_scalar(data: List[Any]) -> None:
787798
assign_scalar(data)
788799
return False
789800

801+
if field_data.type == DataType.TIMESTAMPTZ:
802+
data = field_data.scalars.string_data.data
803+
assign_scalar(data)
804+
return False
805+
790806
if field_data.type == DataType.VARCHAR:
791807
data = field_data.scalars.string_data.data
792808
assign_scalar(data)

pymilvus/client/prepare.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,6 +1478,12 @@ def search_requests_with_expr(
14781478
if param.get("analyzer_name") is not None:
14791479
search_params["analyzer_name"] = param["analyzer_name"]
14801480

1481+
if kwargs.get("timezone") is not None:
1482+
search_params["timezone"] = kwargs["timezone"]
1483+
1484+
if kwargs.get("time_fields") is not None:
1485+
search_params["time_fields"] = kwargs["time_fields"]
1486+
14811487
search_params["params"] = get_params(param)
14821488

14831489
req_params = [
@@ -1913,6 +1919,14 @@ def query_request(
19131919
if offset is not None:
19141920
req.query_params.append(common_types.KeyValuePair(key="offset", value=str(offset)))
19151921

1922+
timezone = kwargs.get("timezone")
1923+
if timezone is not None:
1924+
req.query_params.append(common_types.KeyValuePair(key="timezone", value=timezone))
1925+
1926+
timefileds = kwargs.get("time_fields")
1927+
if timefileds is not None:
1928+
req.query_params.append(common_types.KeyValuePair(key="time_fields", value=timefileds))
1929+
19161930
ignore_growing = kwargs.get("ignore_growing", False)
19171931
stop_reduce_for_best = kwargs.get(REDUCE_STOP_FOR_BEST, False)
19181932
is_iterator = kwargs.get(ITERATOR_FIELD)

pymilvus/client/search_result.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(
5757
DataType.DOUBLE,
5858
DataType.VARCHAR,
5959
DataType.GEOMETRY,
60+
DataType.TIMESTAMPTZ,
6061
]:
6162
if has_valid:
6263
[
@@ -529,7 +530,7 @@ def get_field_data(field_data: FieldData):
529530
return field_data.scalars.float_data.data
530531
if field_data.type == DataType.DOUBLE:
531532
return field_data.scalars.double_data.data
532-
if field_data.type == DataType.VARCHAR:
533+
if field_data.type in (DataType.VARCHAR, DataType.TIMESTAMPTZ):
533534
return field_data.scalars.string_data.data
534535
if field_data.type == DataType.GEOMETRY:
535536
return field_data.scalars.geometry_wkt_data.data

pymilvus/client/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class DataType(IntEnum):
115115
ARRAY = schema_pb2.Array
116116
JSON = schema_pb2.JSON
117117
GEOMETRY = schema_pb2.Geometry
118+
TIMESTAMPTZ = schema_pb2.Timestamptz
118119

119120
BINARY_VECTOR = schema_pb2.BinaryVector
120121
FLOAT_VECTOR = schema_pb2.FloatVector

pymilvus/client/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ def len_of(field_data: Any) -> int:
164164
if field_data.scalars.HasField("double_data"):
165165
return len(field_data.scalars.double_data.data)
166166

167+
if field_data.scalars.HasField("timestamptz_data"):
168+
return len(field_data.scalars.timestamptz_data.data)
169+
167170
if field_data.scalars.HasField("string_data"):
168171
return len(field_data.scalars.string_data.data)
169172

tests/test_collection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def test_collection_by_DataFrame(self):
1717
FieldSchema("float16_vector", DataType.FLOAT16_VECTOR, dim=128),
1818
FieldSchema("bfloat16_vector", DataType.BFLOAT16_VECTOR, dim=128),
1919
FieldSchema("int8_vector", DataType.INT8_VECTOR, dim=128),
20+
FieldSchema("timestamptz", DataType.TIMESTAMPTZ),
2021
]
2122

2223
prefix = "pymilvus.client.grpc_handler.GrpcHandler"

0 commit comments

Comments
 (0)