1- import cx_Oracle
1+ from cx_Oracle import MessageProperties , DEQ_NO_WAIT
2+ from ThreadPoolExecutorPlus import ThreadPoolExecutor
23from asyncio import Lock as aioLock
4+ from collections .abc import Iterable
5+ from typing import Union , TYPE_CHECKING
6+ if TYPE_CHECKING :
7+ from .connections import AsyncConnectionWrapper
8+ from cx_Oracle import Queue
9+ from asyncio .windows_events import ProactorEventLoop
310
411
512class AsyncQueueWrapper :
613
7- def __init__ (self , queue , loop , thread_pool , ):
14+ def __init__ (self , queue : 'Queue' , loop : 'ProactorEventLoop' , thread_pool : ThreadPoolExecutor , conn : 'AsyncConnectionWrapper' ):
815 self ._queue = queue
916 self ._loop = loop
1017 self ._thread_pool = thread_pool
18+ self ._conn = conn
1119 self ._deqlock = aioLock ()
1220
1321 async def enqOne (self , * args , ** kwargs ):
@@ -20,9 +28,22 @@ async def deqOne(self , *args , **kwargs):
2028 async with self ._deqlock :
2129 return await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne , * args , ** kwargs )
2230
23- def deqMany (self , maxMessages = - 1 ):
31+ def deqMany (self , maxMessages : int = - 1 ):
2432 return DeqManyWrapper (self ._loop , self ._thread_pool , self ._queue , self ._deqlock , maxMessages )
2533
34+ def _decode (self , _object : MessageProperties ):
35+ return _object .payload .decode (self ._conn .encoding )
36+
37+ def unpack (self , _object : Union [MessageProperties , list ]):
38+ if isinstance (_object , Iterable ):
39+ return list (map (self ._decode , _object ))
40+ else :
41+ return self ._decode (_object )
42+
43+ @property
44+ def pack (self ):
45+ return self ._conn .msgproperties
46+
2647 @property
2748 def enqOptions (self ):
2849 return self ._queue .enqOptions
@@ -34,16 +55,24 @@ def deqOptions(self):
3455
3556class DeqManyWrapper :
3657
37- def __init__ (self , loop , thread_pool , queue , deqlock , maxMessages ):
58+ def __init__ (self , loop : 'ProactorEventLoop' , thread_pool : ThreadPoolExecutor , queue : 'Queue' , deqlock : aioLock , maxMessages : int ):
3859 self ._loop = loop
3960 self ._thread_pool = thread_pool
4061 self ._queue = queue
4162 self ._count = 0
42- self ._max = maxMessages if maxMessages != - 1 else (1 << 16 - 1 )
63+ self ._max = maxMessages if maxMessages > - 1 else (1 << 16 - 1 )
64+ self ._max = self ._max if self ._max <= (1 << 16 - 1 ) else (1 << 16 - 1 )
4365 self ._deqlock = deqlock
4466
4567 def __await__ (self ):
46- return self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqMany , self ._max ).__await__ ()
68+ yield from self ._deqlock .acquire ().__await__ ()
69+ try :
70+ ret = yield from self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqMany , self ._max ).__await__ ()
71+ except Exception as exc :
72+ raise exc
73+ finally :
74+ self ._deqlock .release ()
75+ return ret
4776
4877 def __aiter__ (self ):
4978 return self
@@ -53,7 +82,7 @@ async def __anext__(self):
5382 if self ._count <= self ._max :
5483 _tmp = self ._queue .deqOptions .wait
5584 async with self ._deqlock :
56- self ._queue .deqOptions .wait = cx_Oracle . DEQ_NO_WAIT
85+ self ._queue .deqOptions .wait = DEQ_NO_WAIT
5786 data = await self ._loop .run_in_executor (self ._thread_pool , self ._queue .deqOne )
5887 self ._queue .deqOptions .wait = _tmp
5988 if data :
0 commit comments