Skip to content

Commit 2e2b525

Browse files
committed
add connection pool param "idle_seconds", if connection idle time timeout, then it will be closed.
1 parent 37e1da9 commit 2e2b525

File tree

2 files changed

+68
-39
lines changed

2 files changed

+68
-39
lines changed

tormysql/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def __init__(self, *args, **kwargs):
1414
self._kwargs = kwargs
1515
self._connection = None
1616
self._closed = False
17+
self._close_callback = None
1718

1819
if "cursorclass" in kwargs and issubclass(kwargs["cursorclass"], Cursor):
1920
kwargs["cursorclass"] = kwargs["cursorclass"].__delegate_class__
@@ -33,6 +34,12 @@ def _(connection_future):
3334

3435
def on_close(self):
3536
self._closed = True
37+
if self._close_callback:
38+
self._close_callback(self)
39+
self._close_callback = None
40+
41+
def set_close_callback(self, callback):
42+
self._close_callback = callback
3643

3744
def close(self):
3845
if self._closed:

tormysql/pool.py

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# 14-8-8
33
# create by: snower
44

5+
import time
56
from collections import deque
67
from tornado.concurrent import TracebackFuture
78
from tornado.ioloop import IOLoop
@@ -10,37 +11,36 @@
1011
class ConnectionPoolClosedError(Exception):pass
1112
class ConnectionNotFoundError(Exception):pass
1213
class ConnectionNotUsedError(Exception):pass
14+
class ConnectionUsedError(Exception):pass
1315

1416
class Connection(Client):
1517
def __init__(self, pool, *args, **kwargs):
1618
self._pool = pool
19+
self.idle_time = time.time()
1720
super(Connection, self).__init__(*args, **kwargs)
1821

19-
def close(self):
22+
def close(self, remote_close = False):
23+
if remote_close:
24+
return self.do_close()
2025
return self._pool.release_connection(self)
2126

22-
def on_close(self):
23-
if self._closed:return
24-
self._closed = True
25-
self._pool.close_connection(self)
26-
2727
def do_close(self):
2828
return super(Connection, self).close()
2929

3030

3131
class ConnectionPool(object):
3232
def __init__(self, *args, **kwargs):
3333
self._max_connections = kwargs.pop("max_connections") if "max_connections" in kwargs else 1
34+
self._idle_seconds = kwargs.pop("idle_seconds") if "idle_seconds" in kwargs else 0
3435
self._args = args
3536
self._kwargs = kwargs
3637
self._connections = deque()
3738
self._used_connections = deque()
3839
self._connections_count = 0
3940
self._wait_connections = deque()
4041
self._closed = False
41-
self._close_results = []
4242
self._close_future = None
43-
self._close_future_count = 0
43+
self._check_idle_callback = False
4444

4545
def init_connection(self, callback):
4646
def _(connection_future):
@@ -50,10 +50,16 @@ def _(connection_future):
5050
else:
5151
callback(False, connection_future._exc_info)
5252
connection = Connection(self, *self._args, **self._kwargs)
53+
connection.set_close_callback(self.connection_close_callback)
54+
self._connections_count += 1
55+
self._used_connections.append(connection)
5356
connection_future = connection.connect()
54-
self._connections_count +=1
5557
IOLoop.current().add_future(connection_future, _)
5658

59+
if self._idle_seconds > 0 and not self._check_idle_callback:
60+
IOLoop.current().add_timeout(time.time() + self._idle_seconds, self.check_idle_connections)
61+
self._check_idle_callback = True
62+
5763
def get_connection(self):
5864
future = TracebackFuture()
5965
if self._closed:
@@ -64,7 +70,6 @@ def get_connection(self):
6470
if self._connections_count < self._max_connections:
6571
def _(succed, result):
6672
if succed:
67-
self._used_connections.append(result)
6873
future.set_result(result)
6974
else:
7075
future.set_exc_info(result)
@@ -80,16 +85,17 @@ def _(succed, result):
8085
Connection = get_connection
8186

8287
def release_connection(self, connection):
83-
if not self._closed and self._wait_connections:
88+
if self._closed:
89+
return connection.do_close()
90+
91+
if self._wait_connections:
8492
future = self._wait_connections.popleft()
8593
IOLoop.current().add_callback(lambda :future.set_result(connection))
8694
else:
8795
try:
8896
self._used_connections.remove(connection)
89-
if self._closed:
90-
self.do_close_connection(connection)
91-
else:
92-
self._connections.append(connection)
97+
self._connections.append(connection)
98+
connection.idle_time = time.time()
9399
except ValueError:
94100
if connection not in self._connections:
95101
connection.do_close()
@@ -98,6 +104,14 @@ def release_connection(self, connection):
98104
raise ConnectionNotUsedError()
99105

100106
def close_connection(self, connection):
107+
try:
108+
self._connections.remove(connection)
109+
self._used_connections.append(connection)
110+
return connection.do_close()
111+
except ValueError:
112+
raise ConnectionUsedError()
113+
114+
def connection_close_callback(self, connection):
101115
try:
102116
self._used_connections.remove(connection)
103117
self._connections_count -= 1
@@ -107,30 +121,38 @@ def close_connection(self, connection):
107121
self._connections_count -= 1
108122
except ValueError:
109123
pass
124+
if self._close_future and not self._used_connections and not self._connections:
125+
def do_close():
126+
self._close_future.set_result(None)
127+
self._close_future = None
128+
IOLoop.current().add_callback(do_close)
110129

111-
def _close_connection_callback(self, future):
112-
if future._exc_info is None:
113-
self._close_results.append(future._result)
114-
if len(self._close_results) == self._close_future_count:
115-
self._close_future.set_result(self._close_results)
116-
else:
117-
self._close_future.set_exc_info(future._exc_info)
130+
def close(self):
131+
if self._closed:
132+
raise ConnectionPoolClosedError()
118133

119-
def do_close_connection(self, connection):
120-
future = connection.do_close()
121-
future.add_done_callback(self._close_connection_callback)
134+
self._closed = True
135+
self._close_future = TracebackFuture()
122136

123-
def close(self):
124-
if not self._closed:
125-
self._closed = True
126-
self._close_future = TracebackFuture()
127-
self._close_future_count = len(self._used_connections) + len(self._connections)
128-
129-
while len(self._wait_connections):
130-
future = self._wait_connections.popleft()
131-
IOLoop.current().add_callback(lambda :future.set_exception(ConnectionPoolClosedError()))
132-
133-
while len(self._connections):
134-
connection = self._connections.popleft()
135-
self.do_close_connection(connection)
136-
return self._close_future
137+
while len(self._wait_connections):
138+
future = self._wait_connections.popleft()
139+
IOLoop.current().add_callback(lambda :future.set_exception(ConnectionPoolClosedError()))
140+
141+
while len(self._connections):
142+
connection = self._connections.popleft()
143+
self._used_connections.append(connection)
144+
connection.do_close()
145+
return self._close_future
146+
147+
def check_idle_connections(self):
148+
next_check_time = time.time() + self._idle_seconds
149+
for connection in tuple(self._connections):
150+
if time.time() - connection.idle_time > self._idle_seconds:
151+
self.close_connection(connection)
152+
elif connection.idle_time + self._idle_seconds < next_check_time:
153+
next_check_time = connection.idle_time + self._idle_seconds
154+
155+
if not self._closed and self._connections or self._used_connections:
156+
IOLoop.current().add_timeout(next_check_time, self.check_idle_connections)
157+
else:
158+
self._check_idle_callback = False

0 commit comments

Comments
 (0)