Skip to content

Commit 882ff57

Browse files
committed
Add AQ options support
1 parent 24e2a7f commit 882ff57

File tree

3 files changed

+162
-8
lines changed

3 files changed

+162
-8
lines changed

cx_Oracle_async/AQ.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import cx_Oracle
2+
from asyncio import Lock as aioLock
3+
24

35
class AsyncQueueWrapper:
46

57
def __init__(self , queue , loop , thread_pool , ):
68
self._queue = queue
79
self._loop = loop
810
self._thread_pool = thread_pool
11+
self._deqlock = aioLock()
912

1013
async def enqOne(self , *args , **kwargs):
1114
return await self._loop.run_in_executor(self._thread_pool , self._queue.enqOne , *args , **kwargs)
@@ -14,19 +17,30 @@ async def enqMany(self , *args , **kwargs):
1417
return await self._loop.run_in_executor(self._thread_pool , self._queue.enqMany , *args , **kwargs)
1518

1619
async def deqOne(self , *args , **kwargs):
17-
return await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne , *args , **kwargs)
20+
async with self._deqlock:
21+
return await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne , *args , **kwargs)
1822

1923
def deqMany(self , maxMessages):
20-
return DeqManyWrapper(self._loop , self._thread_pool , self._queue , maxMessages)
24+
return DeqManyWrapper(self._loop , self._thread_pool , self._queue , self._deqlock , maxMessages)
25+
26+
@property
27+
def enqOptions(self):
28+
return self._queue.enqOptions
29+
30+
@property
31+
def deqOptions(self):
32+
return self._queue.deqOptions
33+
2134

2235
class DeqManyWrapper:
2336

24-
def __init__(self , loop , thread_pool , queue , maxMessages):
37+
def __init__(self , loop , thread_pool , queue , deqlock ,maxMessages):
2538
self._loop = loop
2639
self._thread_pool = thread_pool
2740
self._queue = queue
2841
self._count = 0
2942
self._max = maxMessages
43+
self._deqlock = deqlock
3044

3145
def __aiter__(self):
3246
return self
@@ -35,9 +49,10 @@ async def __anext__(self):
3549
self._count += 1
3650
if self._count <= self._max:
3751
_tmp = self._queue.deqOptions.wait
38-
self._queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
39-
data = await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne)
40-
self._queue.deqOptions.wait = _tmp
52+
async with self._deqlock:
53+
self._queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
54+
data = await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne)
55+
self._queue.deqOptions.wait = _tmp
4156
if data:
4257
return data
4358
else:

docs/temporary_document_of_AQ.md

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,5 +97,95 @@ async def main():
9797
async with cx_Oracle_async.create_pool(user = '' , password = '' , dsn = dsn) as oracle_pool:
9898
await features(oracle_pool)
9999

100+
asyncio.run(main())
101+
```
102+
103+
It is noteworthy that since we were not implement this library asynchronous in a very basic level ,yet it's just a wrapper of synchronous functions via threads , that makes it not gonna work if you are doing two different things in a single connection at a time. For example in the following situation the code will **NOT** work:
104+
105+
```Python
106+
import cx_Oracle_async
107+
import asyncio
108+
109+
async def coro_to_get_from_queue(conn , queue , oracle_pool):
110+
print(f"coroutine start fetching")
111+
ret = (await queue.deqOne()).payload.decode(conn.encoding)
112+
print(f"coroutine returned , {ret=}")
113+
await conn.commit()
114+
115+
async def main():
116+
loop = asyncio.get_running_loop()
117+
dsn = cx_Oracle_async.makedsn(
118+
host = 'localhost',
119+
port = '1521',
120+
service_name='orcl'
121+
)
122+
async with cx_Oracle_async.create_pool(user = 'C##SCOTT' , password = '123456' , dsn = dsn) as oracle_pool:
123+
async with oracle_pool.acquire() as conn:
124+
queue = await conn.queue("DEMO_RAW_QUEUE")
125+
loop.create_task(coro_to_get_from_queue(conn , queue , oracle_pool))
126+
127+
await asyncio.sleep(1)
128+
129+
data = 'Hello World'
130+
print(f"mainthread put some thing in queue ,{data=}")
131+
await queue.enqOne(conn.msgproperties(payload=data))
132+
await conn.commit()
133+
print(f"mainthread put some thing done")
134+
135+
await asyncio.sleep(1)
136+
print('Process terminated.')
137+
138+
asyncio.run(main())
139+
```
140+
141+
As we planned , there should be a fetching thread(coroutine) start fetcing , this action will block since the queue is empty , and will return until there's something put into the queue. Then after one second sleep , the main thread will put 'Hello World' into AQ and that will trigger the blocked fetching thread , and then the whole program terminated.
142+
143+
However we will find the program blocking forever in real practice. That's because since `queue.deqOptions.wait` equals to `cx_Oracle.DEQ_WAIT_FOREVER` thus while there's nothing in the queue , the query will block **AND** this will take over the control of connection thread , which makes it impossible for the following code to put anything into the queue using the same thread, thus makes it a deadlock.
144+
145+
If you would like to achieve the same result , you should do that in **ANOTHER** connection thread. Simply modify the code as follow:
146+
```Python
147+
import cx_Oracle_async
148+
import asyncio
149+
from async_timeout import timeout
150+
151+
async def coro_to_get_from_queue(conn , queue , oracle_pool):
152+
try:
153+
async with timeout(2):
154+
print(f"coroutine start fetching")
155+
ret = (await queue.deqOne()).payload.decode(conn.encoding)
156+
print(f"coroutine returned , {ret=}")
157+
await conn.commit()
158+
except asyncio.TimeoutError:
159+
print('two seconds passed , timeout triggered.')
160+
async with oracle_pool.acquire() as conn2:
161+
queue2 = await conn2.queue("DEMO_RAW_QUEUE")
162+
data = 'Hello World'
163+
print(f"another connection put some thing in queue ,{data=}")
164+
await queue2.enqOne(conn2.msgproperties(payload=data))
165+
await conn2.commit()
166+
print(f"another connection put some thing done")
167+
168+
async def main():
169+
loop = asyncio.get_running_loop()
170+
dsn = cx_Oracle_async.makedsn(
171+
host = 'localhost',
172+
port = '1521',
173+
service_name='orcl'
174+
)
175+
async with cx_Oracle_async.create_pool(user = 'C##SCOTT' , password = '123456' , dsn = dsn) as oracle_pool:
176+
async with oracle_pool.acquire() as conn:
177+
queue = await conn.queue("DEMO_RAW_QUEUE")
178+
loop.create_task(coro_to_get_from_queue(conn , queue , oracle_pool))
179+
180+
await asyncio.sleep(1)
181+
182+
cursor = await conn.cursor()
183+
await cursor.execute(f"SELECT COUNT(*) FROM DEPT")
184+
fetch_result = await cursor.fetchall()
185+
print(f"main thread continue , {fetch_result=}")
186+
187+
await asyncio.sleep(1)
188+
print('Process terminated.')
189+
100190
asyncio.run(main())
101191
```

tests/test_aq.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,43 @@
44
import asyncio
55
import time
66
from cx_Oracle_async import *
7+
import cx_Oracle
8+
9+
async def modify_deqopts(queue , val):
10+
queue.deqOptions.wait = val
11+
12+
async def fetch_from_queue_no_wait(oracle_pool , loop):
13+
async with oracle_pool.acquire() as conn:
14+
queue = await conn.queue("DEMO_RAW_QUEUE")
15+
loop.create_task(modify_deqopts(queue , cx_Oracle.DEQ_NO_WAIT))
16+
ret = await queue.deqOne()
17+
if ret:
18+
ret = ret.payload.decode(conn.encoding)
19+
await conn.commit()
20+
return ret
21+
22+
async def fetch_from_queue_wait_forever(oracle_pool , loop):
23+
async with oracle_pool.acquire() as conn:
24+
queue = await conn.queue("DEMO_RAW_QUEUE")
25+
loop.create_task(modify_deqopts(queue , cx_Oracle.DEQ_WAIT_FOREVER))
26+
ret = await queue.deqOne()
27+
if ret:
28+
ret = ret.payload.decode(conn.encoding)
29+
return ret
30+
31+
async def put_into_queue(oracle_pool , loop):
32+
await asyncio.sleep(2)
33+
async with oracle_pool.acquire() as conn:
34+
queue = await conn.queue("DEMO_RAW_QUEUE")
35+
await queue.enqOne(conn.msgproperties(payload='Hello World'))
36+
await conn.commit()
737

838
@pytest.mark.asyncio
939
async def test_multiquery():
40+
loop = asyncio.get_running_loop()
1041
dsn = makedsn('localhost','1521',sid='xe')
11-
async with create_pool(user='system',password='oracle',dsn=dsn) as oracle_pool:
42+
INAQ = 0.5
43+
async with create_pool(user='system',password='oracle',dsn=dsn,max=4) as oracle_pool:
1244
async with oracle_pool.acquire() as conn:
1345
async with conn.cursor() as cursor:
1446
try:
@@ -123,4 +155,21 @@ async def test_multiquery():
123155
async for m in queue.deqMany(maxMessages=5):
124156
res.append(m.payload.decode(conn.encoding))
125157
ed_time = time.time()
126-
assert (ed_time - st_time) <= 0.5
158+
assert (ed_time - st_time) <= 0.5
159+
160+
# test aq options
161+
st_time = time.time()
162+
_task = loop.create_task(fetch_from_queue_no_wait(oracle_pool , loop))
163+
result = await _task
164+
ed_time = time.time()
165+
assert result == None
166+
assert (ed_time - st_time) <= INAQ
167+
168+
#
169+
st_time = time.time()
170+
_task = loop.create_task(fetch_from_queue_wait_forever(oracle_pool , loop))
171+
loop.create_task(put_into_queue(oracle_pool , loop))
172+
result = await _task
173+
ed_time = time.time()
174+
assert result == "Hello World"
175+
assert (2 - INAQ) <= (ed_time - st_time) <= (2 + INAQ)

0 commit comments

Comments
 (0)