1- import cx_Oracle
1+ from cx_Oracle import MessageProperties , DEQ_NO_WAIT
2+ from ThreadPoolExecutorPlus import ThreadPoolExecutor
3+ from asyncio import Lock as aioLock
4+ from collections .abc import Iterable
5+ from typing import Union , TYPE_CHECKING
6+ if TYPE_CHECKING :
7+ from .connections import AsyncConnectionWrapper
8+ from cx_Oracle import Queue
9+ from asyncio .windows_events import ProactorEventLoop
10+
211
312class AsyncQueueWrapper :
413
5- def __init__ (self , queue , loop , thread_pool , ):
14+ def __init__ (self , queue : 'Queue' , loop : 'ProactorEventLoop' , thread_pool : ThreadPoolExecutor , conn : 'AsyncConnectionWrapper' ):
615 self ._queue = queue
716 self ._loop = loop
817 self ._thread_pool = thread_pool
18+ self ._conn = conn
19+ self ._deqlock = aioLock ()
920
1021 async def enqOne (self , * args , ** kwargs ):
1122 return await self ._loop .run_in_executor (self ._thread_pool , self ._queue .enqOne , * args , ** kwargs )
@@ -14,19 +25,54 @@ async def enqMany(self , *args , **kwargs):
1425 return await self ._loop .run_in_executor (self ._thread_pool , self ._queue .enqMany , * args , ** kwargs )
1526
1627 async def deqOne (self , * args , ** kwargs ):
17- return await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne , * args , ** kwargs )
28+ async with self ._deqlock :
29+ return await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne , * args , ** kwargs )
30+
31+ def deqMany (self , maxMessages : int = - 1 ):
32+ return DeqManyWrapper (self ._loop , self ._thread_pool , self ._queue , self ._deqlock , maxMessages )
33+
34+ def _decode (self , _object : MessageProperties ):
35+ return _object .payload .decode (self ._conn .encoding )
36+
37+ def unpack (self , _object : Union [MessageProperties , list ]):
38+ if isinstance (_object , Iterable ):
39+ return list (map (self ._decode , _object ))
40+ else :
41+ return self ._decode (_object )
42+
43+ @property
44+ def pack (self ):
45+ return self ._conn .msgproperties
46+
47+ @property
48+ def enqOptions (self ):
49+ return self ._queue .enqOptions
50+
51+ @property
52+ def deqOptions (self ):
53+ return self ._queue .deqOptions
1854
19- def deqMany (self , maxMessages ):
20- return DeqManyWrapper (self ._loop , self ._thread_pool , self ._queue , maxMessages )
2155
2256class DeqManyWrapper :
2357
24- def __init__ (self , loop , thread_pool , queue , maxMessages ):
58+ def __init__ (self , loop : 'ProactorEventLoop' , thread_pool : ThreadPoolExecutor , queue : 'Queue' , deqlock : aioLock , maxMessages : int ):
2559 self ._loop = loop
2660 self ._thread_pool = thread_pool
2761 self ._queue = queue
2862 self ._count = 0
29- self ._max = maxMessages
63+ self ._max = maxMessages if maxMessages > - 1 else (1 << 16 - 1 )
64+ self ._max = self ._max if self ._max <= (1 << 16 - 1 ) else (1 << 16 - 1 )
65+ self ._deqlock = deqlock
66+
67+ def __await__ (self ):
68+ yield from self ._deqlock .acquire ().__await__ ()
69+ try :
70+ ret = yield from self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqMany , self ._max ).__await__ ()
71+ except Exception as exc :
72+ raise exc
73+ finally :
74+ self ._deqlock .release ()
75+ return ret
3076
3177 def __aiter__ (self ):
3278 return self
@@ -35,9 +81,10 @@ async def __anext__(self):
3581 self ._count += 1
3682 if self ._count <= self ._max :
3783 _tmp = self ._queue .deqOptions .wait
38- self ._queue .deqOptions .wait = cx_Oracle .DEQ_NO_WAIT
39- data = await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne )
40- self ._queue .deqOptions .wait = _tmp
84+ async with self ._deqlock :
85+ self ._queue .deqOptions .wait = DEQ_NO_WAIT
86+ data = await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne )
87+ self ._queue .deqOptions .wait = _tmp
4188 if data :
4289 return data
4390 else :
0 commit comments