Skip to content

Commit a4f5678

Browse files
committed
Add AQ support
1 parent 4fe7784 commit a4f5678

File tree

10 files changed

+339
-152
lines changed

10 files changed

+339
-152
lines changed

README.md

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ Easy to use , buy may not the best practice for efficiency concern.
1818
pip install cx_Oracle_async
1919

2020
## Usage
21-
- Nearly all the same with aiomysql (with very limited functions of cource).
21+
- Nearly all the same with aiomysql (with very limited functions) , you can do execute , executemany , commit statement etc.
2222
- If you're connecting to database which is on a different machine from python process , you need to install oracle client module in order to use this library. Check [cx-Oracle's installation guide](https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html) for further information.
2323
- No automaticly date format transition built-in.
24+
- AQ feature newly added , [docs here](https://github.com/GoodManWEN/cx_Oracle_async/blob/main/docs/temporary_document_of_AQ.md).
2425

2526
## Performance
2627
query type | asynchronous multithreading | synchronous multithreading | synchronous single thread
2728
-|-|-|-
28-
fast single line query | 4864.96 q/s | 5859.20 q/s | 8209.536 q/s
29+
fast single line query | 6259.80 q/s | 28906.93 q/s | 14805.61 q/s
2930
single line insertion | N/A (todo) | N/A | N/A
3031

3132
*/\* Test platform: \*/*<br>
@@ -36,8 +37,8 @@ single line insertion | N/A (todo) | N/A | N/A
3637

3738
## Examples
3839
Before running examples , make sure you've already installed a [oracle client](https://github.com/GoodManWEN/cx_Oracle_async#usage) on your machine.
39-
```Python3
40-
# all_usages.py
40+
```Python
41+
# basic_usages.py
4142
import asyncio
4243
import cx_Oracle_async
4344

@@ -80,23 +81,16 @@ if __name__ == '__main__':
8081
```
8182

8283
Or you can connect to database via makedsn style:
83-
```Python3
84+
```Python
8485
# makedsn.py
8586
import asyncio
8687
import cx_Oracle_async
8788

8889
async def main():
8990
# same api as cx_Oracle.makedsn with 4 limited parameters(host , port , sid , service_name).
9091
dsn = cx_Oracle_async.makedsn(host = 'localhost' , port = '1521' , service_name = 'orcl')
91-
oracle_pool = await cx_Oracle_async.create_pool(
92-
user='user',
93-
password='password',
94-
dsn = dsn
95-
)
96-
97-
...
98-
99-
await oracle_pool.close()
92+
async with await cx_Oracle_async.create_pool(user='', password='',dsn = dsn) as pool:
93+
...
10094

10195
asyncio.run(main())
10296
```

cx_Oracle_async/AQ.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import cx_Oracle
2+
3+
class AsyncQueueWrapper:
4+
5+
def __init__(self , queue , loop , thread_pool , ):
6+
self._queue = queue
7+
self._loop = loop
8+
self._thread_pool = thread_pool
9+
10+
async def enqOne(self , *args , **kwargs):
11+
return await self._loop.run_in_executor(self._thread_pool , self._queue.enqOne , *args , **kwargs)
12+
13+
async def enqMany(self , *args , **kwargs):
14+
return await self._loop.run_in_executor(self._thread_pool , self._queue.enqMany , *args , **kwargs)
15+
16+
async def deqOne(self , *args , **kwargs):
17+
return await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne , *args , **kwargs)
18+
19+
def deqMany(self , maxMessages):
20+
return DeqManyWrapper(self._loop , self._thread_pool , self._queue , maxMessages)
21+
22+
class DeqManyWrapper:
23+
24+
def __init__(self , loop , thread_pool , queue , maxMessages):
25+
self._loop = loop
26+
self._thread_pool = thread_pool
27+
self._queue = queue
28+
self._count = 0
29+
self._max = maxMessages
30+
31+
def __aiter__(self):
32+
return self
33+
34+
async def __anext__(self):
35+
self._count += 1
36+
if self._count <= self._max:
37+
_tmp = self._queue.deqOptions.wait
38+
self._queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
39+
data = await self._loop.run_in_executor(self._thread_pool , self._queue.deqOne)
40+
self._queue.deqOptions.wait = _tmp
41+
if data:
42+
return data
43+
else:
44+
raise StopAsyncIteration
45+
else:
46+
raise StopAsyncIteration

cx_Oracle_async/connections.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from .context import AbstractContextManager as BaseManager
2+
from .cursors import AsyncCursorWrapper , AsyncCursorWrapper_context
3+
from .AQ import AsyncQueueWrapper
4+
5+
6+
class AsyncConnectionWrapper_context(BaseManager):
7+
8+
def __init__(self , coro):
9+
super().__init__(coro)
10+
11+
async def __aexit__(self, exc_type, exc, tb):
12+
await self._obj.release()
13+
self._obj = None
14+
15+
16+
class AsyncConnectionWrapper:
17+
18+
def __init__(self , conn , loop , thread_pool , pool):
19+
self._conn = conn
20+
self._loop = loop
21+
self._pool = pool
22+
self._thread_pool = thread_pool
23+
self.encoding = self._conn.encoding
24+
25+
def cursor(self):
26+
coro = self._loop.run_in_executor(self._thread_pool , self._cursor)
27+
return AsyncCursorWrapper_context(coro)
28+
29+
def _cursor(self):
30+
return AsyncCursorWrapper(self._conn.cursor() , self._loop , self._thread_pool)
31+
32+
def msgproperties(self , *args , **kwargs):
33+
return self._conn.msgproperties(*args , **kwargs)
34+
35+
async def queue(self , *args , **kwargs):
36+
return AsyncQueueWrapper(self._conn.queue(*args , **kwargs) , self._loop , self._thread_pool)
37+
38+
async def gettype(self , *args , **kwargs):
39+
'''
40+
Uses the original cx_Oracle object without wrapper
41+
'''
42+
return await self._loop.run_in_executor(self._thread_pool , self._conn.gettype , *args , **kwargs)
43+
44+
async def commit(self):
45+
await self._loop.run_in_executor(self._thread_pool , self._conn.commit)
46+
47+
async def release(self):
48+
return await self._loop.run_in_executor(self._thread_pool , self._pool.release , self._conn)

cx_Oracle_async/context.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
class AbstractContextManager:
2+
3+
def __init__(self , coro):
4+
self._coro = coro
5+
self._obj = None
6+
7+
def __next__(self):
8+
return self.send(None)
9+
10+
def __iter__(self):
11+
return self._coro.__await__()
12+
13+
def __await__(self):
14+
return self._coro.__await__()
15+
16+
async def __aenter__(self):
17+
self._obj = await self._coro
18+
return self._obj
19+
20+
async def __aexit__(self, exc_type, exc, tb):
21+
...

cx_Oracle_async/cursors.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from .context import AbstractContextManager as BaseManager
2+
3+
4+
class AsyncCursorWrapper_context(BaseManager):
5+
6+
def __init__(self , coro):
7+
super().__init__(coro)
8+
9+
10+
class AsyncCursorWrapper:
11+
12+
def __init__(self , cursor , loop , thread_pool):
13+
self._cursor = cursor
14+
self._loop = loop
15+
self._thread_pool = thread_pool
16+
17+
async def execute(self , sql , *args , **kwargs):
18+
return await self._loop.run_in_executor(self._thread_pool , self._cursor.execute , sql , *args , **kwargs)
19+
20+
async def executemany(self , sql , *args , **kwargs):
21+
return await self._loop.run_in_executor(self._thread_pool , self._cursor.executemany , sql , *args , **kwargs)
22+
23+
async def fetchone(self):
24+
return await self._loop.run_in_executor(self._thread_pool , self._cursor.fetchone)
25+
26+
async def fetchall(self):
27+
# block mainly happens when fetch triggered.
28+
return await self._loop.run_in_executor(self._thread_pool , self._cursor.fetchall)
29+
30+
async def var(self, args):
31+
return await self._loop.run_in_executor(self._thread_pool , self._cursor.var, args)

cx_Oracle_async/pools.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from .context import AbstractContextManager as BaseManager
2+
from .connections import AsyncConnectionWrapper , AsyncConnectionWrapper_context
3+
from ThreadPoolExecutorPlus import ThreadPoolExecutor
4+
import asyncio
5+
import platform
6+
import os
7+
8+
pltfm = platform.system()
9+
if pltfm == 'Windows':
10+
DEFAULT_MAXIMUM_WORKER_NUM = (os.cpu_count() or 1) * 16
11+
DEFAULT_MAXIMUM_WORKER_TIMES = 2
12+
elif pltfm == 'Linux' or pltfm == 'Darwin':
13+
DEFAULT_MAXIMUM_WORKER_NUM = (os.cpu_count() or 1) * 32
14+
DEFAULT_MAXIMUM_WORKER_TIMES = 3
15+
16+
17+
class AsyncPoolWrapper_context(BaseManager):
18+
19+
def __init__(self , coro):
20+
super().__init__(coro)
21+
22+
async def __aexit__(self, exc_type, exc, tb):
23+
await self._obj.close()
24+
self._obj = None
25+
26+
27+
class AsyncPoolWrapper:
28+
29+
def __init__(self , pool , loop = None):
30+
31+
if loop == None:
32+
loop = asyncio.get_running_loop()
33+
self._thread_pool = ThreadPoolExecutor(max_workers = max(DEFAULT_MAXIMUM_WORKER_NUM , pool.max << DEFAULT_MAXIMUM_WORKER_TIMES))
34+
self._thread_pool.set_daemon_opts(min_workers = max(4 , pool.min << 1))
35+
self._loop = loop
36+
self._pool = pool
37+
38+
def acquire(self):
39+
coro = self._loop.run_in_executor(self._thread_pool , self._acquire)
40+
return AsyncConnectionWrapper_context(coro)
41+
42+
def _acquire(self):
43+
return AsyncConnectionWrapper(self._pool.acquire() , self._loop , self._thread_pool , self._pool)
44+
45+
async def close(self):
46+
return await self._loop.run_in_executor(self._thread_pool , self._pool.close)

0 commit comments

Comments
 (0)