Skip to content

Commit 28f713c

Browse files
authored
fix: Fix a bug of reset connection (#3085)
issue: #3066 Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 3da1040 commit 28f713c

File tree

2 files changed

+64
-52
lines changed

2 files changed

+64
-52
lines changed

pymilvus/client/grpc_handler.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import base64
22
import json
3+
import logging
34
import socket
5+
import threading
46
import time
57
from pathlib import Path
68
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union
@@ -80,6 +82,58 @@
8082
len_of,
8183
)
8284

85+
logger = logging.getLogger(__name__)
86+
87+
88+
class ReconnectHandler:
89+
def __init__(self, conns: object, connection_name: str, kwargs: object) -> None:
90+
self.connection_name = connection_name
91+
self.conns = conns
92+
self._kwargs = kwargs
93+
self.is_idle_state = False
94+
self.reconnect_lock = threading.Lock()
95+
96+
def reset_db_name(self, db_name: str):
97+
self._kwargs["db_name"] = db_name
98+
99+
def check_state_and_reconnect_later(self):
100+
check_after_seconds = 3
101+
logger.debug(f"state is idle, schedule reconnect in {check_after_seconds} seconds")
102+
time.sleep(check_after_seconds)
103+
if not self.is_idle_state:
104+
logger.debug("idle state changed, skip reconnect")
105+
return
106+
with self.reconnect_lock:
107+
logger.info("reconnect on idle state")
108+
self.is_idle_state = False
109+
try:
110+
logger.debug("try disconnecting old connection...")
111+
self.conns.disconnect(self.connection_name)
112+
except Exception:
113+
logger.warning("disconnect failed: {e}")
114+
finally:
115+
reconnected = False
116+
while not reconnected:
117+
try:
118+
logger.debug("try reconnecting...")
119+
self.conns.connect(self.connection_name, **self._kwargs)
120+
reconnected = True
121+
except Exception as e:
122+
logger.warning(
123+
f"reconnect failed: {e}, try again after {check_after_seconds} seconds"
124+
)
125+
time.sleep(check_after_seconds)
126+
logger.info("reconnected")
127+
128+
def reconnect_on_idle(self, state: object):
129+
logger.debug(f"state change to: {state}")
130+
with self.reconnect_lock:
131+
if state.value[1] != "idle":
132+
self.is_idle_state = False
133+
return
134+
self.is_idle_state = True
135+
threading.Thread(target=self.check_state_and_reconnect_later).start()
136+
83137

84138
class GrpcHandler:
85139
# pylint: disable=too-many-instance-attributes
@@ -104,6 +158,12 @@ def __init__(
104158
self._setup_grpc_channel()
105159
self.callbacks = []
106160
self.schema_cache = {}
161+
self._reconnect_handler = None
162+
163+
def register_reconnect_handler(self, handler: ReconnectHandler):
164+
if handler is not None:
165+
self._reconnect_handler = handler
166+
self.register_state_change_callback(handler.reconnect_on_idle)
107167

108168
def register_state_change_callback(self, callback: Callable):
109169
self.callbacks.append(callback)
@@ -179,6 +239,8 @@ def reset_db_name(self, db_name: str):
179239
self._setup_db_interceptor(db_name)
180240
self._setup_grpc_channel()
181241
self._setup_identifier_interceptor(self._user)
242+
if self._reconnect_handler is not None:
243+
self._reconnect_handler.reset_db_name(db_name)
182244

183245
def _setup_authorization_interceptor(self, user: str, password: str, token: str):
184246
keys = []

pymilvus/orm/connections.py

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
import logging
1515
import pathlib
1616
import threading
17-
import time
1817
from typing import Callable, Tuple, Union
1918
from urllib import parse
2019

2120
from pymilvus.client.async_grpc_handler import AsyncGrpcHandler
2221
from pymilvus.client.check import is_legal_address, is_legal_host, is_legal_port
23-
from pymilvus.client.grpc_handler import GrpcHandler
22+
from pymilvus.client.grpc_handler import GrpcHandler, ReconnectHandler
2423
from pymilvus.exceptions import (
2524
ConnectionConfigException,
2625
ConnectionNotExistException,
@@ -64,53 +63,6 @@ def __new__(cls, *args, **kwargs):
6463
return super().__new__(cls, *args, **kwargs)
6564

6665

67-
class ReconnectHandler:
68-
def __init__(self, conns: object, connection_name: str, kwargs: object) -> None:
69-
self.connection_name = connection_name
70-
self.conns = conns
71-
self._kwargs = kwargs
72-
self.is_idle_state = False
73-
self.reconnect_lock = threading.Lock()
74-
75-
def check_state_and_reconnect_later(self):
76-
check_after_seconds = 3
77-
logger.debug(f"state is idle, schedule reconnect in {check_after_seconds} seconds")
78-
time.sleep(check_after_seconds)
79-
if not self.is_idle_state:
80-
logger.debug("idle state changed, skip reconnect")
81-
return
82-
with self.reconnect_lock:
83-
logger.info("reconnect on idle state")
84-
self.is_idle_state = False
85-
try:
86-
logger.debug("try disconnecting old connection...")
87-
self.conns.disconnect(self.connection_name)
88-
except Exception:
89-
logger.warning("disconnect failed: {e}")
90-
finally:
91-
reconnected = False
92-
while not reconnected:
93-
try:
94-
logger.debug("try reconnecting...")
95-
self.conns.connect(self.connection_name, **self._kwargs)
96-
reconnected = True
97-
except Exception as e:
98-
logger.warning(
99-
f"reconnect failed: {e}, try again after {check_after_seconds} seconds"
100-
)
101-
time.sleep(check_after_seconds)
102-
logger.info("reconnected")
103-
104-
def reconnect_on_idle(self, state: object):
105-
logger.debug(f"state change to: {state}")
106-
with self.reconnect_lock:
107-
if state.value[1] != "idle":
108-
self.is_idle_state = False
109-
return
110-
self.is_idle_state = True
111-
threading.Thread(target=self.check_state_and_reconnect_later).start()
112-
113-
11466
class Connections(metaclass=SingleInstanceMetaClass):
11567
"""Class for managing all connections of milvus. Used as a singleton in this module."""
11668

@@ -418,9 +370,7 @@ def connect_milvus(**kwargs):
418370
gh._wait_for_channel_ready(timeout=timeout)
419371

420372
if kwargs.pop("keep_alive", False):
421-
gh.register_state_change_callback(
422-
ReconnectHandler(self, alias, kwargs_copy).reconnect_on_idle
423-
)
373+
gh.register_reconnect_handler(ReconnectHandler(self, alias, kwargs_copy))
424374
except Exception as e:
425375
self.remove_connection(alias)
426376
raise e from e

0 commit comments

Comments
 (0)