Skip to content

Commit ec81cde

Browse files
authored
Merge pull request #10 from GoodManWEN/dev
Performance optimization
2 parents 8f39726 + e3df39f commit ec81cde

File tree

3 files changed

+78
-24
lines changed

3 files changed

+78
-24
lines changed

cx_Oracle_async/AQ.py

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from asyncio import Lock as aioLock
44
from collections.abc import Iterable
55
from typing import Union , TYPE_CHECKING
6+
from collections import deque
67
if TYPE_CHECKING:
78
from .connections import AsyncConnectionWrapper
89
from cx_Oracle import Queue
@@ -60,34 +61,68 @@ def __init__(self , loop : 'ProactorEventLoop' , thread_pool : ThreadPoolExecuto
6061
self._thread_pool = thread_pool
6162
self._queue = queue
6263
self._count = 0
63-
self._max = maxMessages if maxMessages > -1 else (1 << 16 - 1)
64-
self._max = self._max if self._max <= (1 << 16 - 1) else (1 << 16 - 1)
64+
self._max_messages = maxMessages
6565
self._deqlock = deqlock
66+
self._buffer = deque()
67+
self._soft_max = ((1 << 16) - 1)
68+
self._max_limit = maxMessages if maxMessages > -1 else self._soft_max
69+
self._max_limit = self._max_limit if self._max_limit <= self._soft_max else self._soft_max
70+
self._deqcount = 0
71+
self._closed = False
72+
73+
@property
74+
def _fetch_num(self):
75+
return self._soft_max if self._max_messages < 0 else min(self._max_limit , self._max_messages - self._deqcount)
6676

6777
def __await__(self):
78+
if self._closed:
79+
raise RuntimeError('Current query has closed , you cannot activate it twice.')
6880
yield from self._deqlock.acquire().__await__()
6981
try:
70-
ret = yield from self._loop.run_in_executor(self._thread_pool , self._queue.deqMany , self._max).__await__()
82+
ret = yield from self._loop.run_in_executor(self._thread_pool , self._queue.deqMany , self._fetch_num).__await__()
7183
except Exception as exc:
7284
raise exc
7385
finally:
7486
self._deqlock.release()
87+
self._closed = True
7588
return ret
7689

7790
def __aiter__(self):
7891
return self
7992

8093
async def __anext__(self):
81-
self._count += 1
82-
if self._count <= self._max:
83-
_tmp = self._queue.deqOptions.wait
84-
async with self._deqlock:
94+
if self._closed:
95+
raise RuntimeError('Current query has closed , you cannot activate it twice.')
96+
97+
if self._max_messages == 0:
98+
self._closed = True
99+
raise StopAsyncIteration
100+
101+
# Fetch off
102+
if self._max_messages > 0 and self._deqcount >= self._max_messages:
103+
if self._buffer:
104+
return self._buffer.popleft()
105+
self._closed = True
106+
raise StopAsyncIteration
107+
108+
# Fetch on
109+
if self._buffer:
110+
return self._buffer.popleft()
111+
_tmp = self._queue.deqOptions.wait
112+
async with self._deqlock:
113+
try:
85114
self._queue.deqOptions.wait = DEQ_NO_WAIT
86-
data = await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne)
115+
data = await self._loop.run_in_executor(self._thread_pool , self._queue.deqMany , self._fetch_num)
116+
except Exception as exc:
117+
raise exc
118+
finally:
87119
self._queue.deqOptions.wait = _tmp
120+
88121
if data:
89-
return data
90-
else:
91-
raise StopAsyncIteration
92-
else:
122+
self._buffer.extend(data)
123+
self._deqcount += len(data)
124+
return self._buffer.popleft()
125+
126+
# No data return , close iteration.
127+
self._closed = True
93128
raise StopAsyncIteration

docs/temporary_document_of_AQ.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,10 @@ async def main():
258258
assert list(map(queue.unpack , ret)) == list(map(str , range(10)))
259259

260260
# The second way , you can call deqMany as a asynchronous generator.
261-
# This is a self implemented method which yield a single queue.deqOne
262-
# each time. The benifits is you will get immediate response.
261+
# This is a self implemented method which yield queue.deqMany() with
262+
# queue.deqOptions = DEQ_NO_WAIT until it reaches the message limit or
263+
# there's nothing in the queue. The benifits is you will get immediate
264+
# response.
263265

264266
await queue.enqMany(queue.pack(m) for m in map(str , range(10)))
265267
await conn.commit()
@@ -283,17 +285,16 @@ async def main():
283285
# Of course you can change deqOptions into non-blocking mode like
284286
# `queue.deqOptions.wait = cx_Oracle_async.DEQ_NO_WAIT` to aviod it.
285287

286-
# On the other hand , If you are using the `async with` mode , it will never
287-
# block your main thread , however it's low efficiency , and it will not be affected by
288-
# `Queue.deqOptions` , no matter what setting `Queue.deqOptions` is , it will return
289-
# immediately when there's nothing in the queue.
288+
# On the other hand , If you are using the `async with` mode , it will
289+
# never block your main thread , however it will not be affected by
290+
# `Queue.deqOptions` , no matter what setting `Queue.deqOptions` is ,
291+
# it will return immediately when there's nothing in the queue.
290292

291-
# So taking into consideration that when argument maxMessages equals to -1 (default value),
292-
# it means unlimit fetch untill the queue is empty (whose "unlimit" do has a soft upper bound
293-
# of maximum queue length of 65535). It's convenient to clear the whole queue with
294-
# the following code:
293+
# So taking into consideration that when argument maxMessages equals to
294+
# -1 (default value), it means unlimit fetch untill the queue is empty.
295+
# It's convenient to clear the whole queue with the following code:
295296

296-
messages = list(map(str , range(random.randint(0,10))))
297+
messages = list(map(str , range(random.randint(0,10000))))
297298
await queue.enqMany(queue.pack(m) for m in messages)
298299
await conn.commit()
299300

tests/test_aq.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,22 @@ async def test_block_behavior():
238238
ret = await queue.deqMany(-1)
239239
ed_time = time.time()
240240
assert queue.unpack(ret) == ["Hello World",]
241-
assert (2 - INAQ) <= (ed_time - st_time) <= (2 + INAQ)
241+
assert (2 - INAQ) <= (ed_time - st_time) <= (2 + INAQ)
242+
243+
await queue.enqOne(queue.pack('Hello World'))
244+
await conn.commit()
245+
obj = queue.deqMany(65535)
246+
ret = await obj
247+
try:
248+
ret = await obj
249+
raise TypeError()
250+
except Exception as exc:
251+
assert isinstance(exc , RuntimeError)
252+
253+
obj = queue.deqMany(65535)
254+
async for _ in obj:...
255+
try:
256+
async for _ in obj:...
257+
raise TypeError()
258+
except Exception as exc:
259+
assert isinstance(exc , RuntimeError)

0 commit comments

Comments
 (0)