Skip to content

Commit e94302d

Browse files
authored
Avoid leaking connections if _can_use_connection fails (#1269)
If _can_use_connection fails (say, because of an asyncio timeout), then we may have a full connection that has been created but would be leaked by this function. (note: `_connect_addr` takes care of cleaning up after itself if it fails partway) This is particularly possible in the case of pgbouncer, where we may succeed at establishing a connection much quicker than even a trivial call to the backing database would take. I believe this failure mode was introduced in #987 I'm not certain if we should `await` here or just punt the closing of the other connections to a background task (I don't know how risky `close` is on an established connection).
1 parent 1d63bb1 commit e94302d

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

asyncpg/connect_utils.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,30 +1212,36 @@ async def _connect(*, loop, connection_class, record_class, **kwargs):
12121212
candidates = []
12131213
chosen_connection = None
12141214
last_error = None
1215-
for addr in addrs:
1216-
try:
1217-
conn = await _connect_addr(
1218-
addr=addr,
1219-
loop=loop,
1220-
params=params,
1221-
config=config,
1222-
connection_class=connection_class,
1223-
record_class=record_class,
1224-
)
1225-
candidates.append(conn)
1226-
if await _can_use_connection(conn, target_attr):
1227-
chosen_connection = conn
1228-
break
1229-
except OSError as ex:
1230-
last_error = ex
1231-
else:
1232-
if target_attr == SessionAttribute.prefer_standby and candidates:
1233-
chosen_connection = random.choice(candidates)
1215+
try:
1216+
for addr in addrs:
1217+
try:
1218+
conn = await _connect_addr(
1219+
addr=addr,
1220+
loop=loop,
1221+
params=params,
1222+
config=config,
1223+
connection_class=connection_class,
1224+
record_class=record_class,
1225+
)
1226+
candidates.append(conn)
1227+
if await _can_use_connection(conn, target_attr):
1228+
chosen_connection = conn
1229+
break
1230+
except OSError as ex:
1231+
last_error = ex
1232+
else:
1233+
if target_attr == SessionAttribute.prefer_standby and candidates:
1234+
chosen_connection = random.choice(candidates)
1235+
finally:
12341236

1235-
await asyncio.gather(
1236-
*(c.close() for c in candidates if c is not chosen_connection),
1237-
return_exceptions=True
1238-
)
1237+
async def _close_candidates(conns, chosen):
1238+
await asyncio.gather(
1239+
*(c.close() for c in conns if c is not chosen),
1240+
return_exceptions=True
1241+
)
1242+
if candidates:
1243+
asyncio.create_task(
1244+
_close_candidates(candidates, chosen_connection))
12391245

12401246
if chosen_connection:
12411247
return chosen_connection

0 commit comments

Comments
 (0)