Skip to content

Commit 0362405

Browse files
authored
Merge pull request #13 from GoodManWEN/dev
Dev
2 parents 5b165ea + 2c2f69c commit 0362405

File tree

9 files changed

+279
-12
lines changed

9 files changed

+279
-12
lines changed

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
run: | # sleep to make sure Oracle server is fully loaded
4848
sleep 60
4949
python -m pip install --upgrade pip
50-
python -m pip install flake8 pytest pytest-asyncio
50+
python -m pip install flake8 pytest pytest-asyncio async_timeout
5151
pip install -r requirements.txt
5252
- name: Lint with flake8
5353
run: |

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ import cx_Oracle_async
9191
async def main():
9292
# same api as cx_Oracle.makedsn with 4 limited parameters(host , port , sid , service_name).
9393
dsn = cx_Oracle_async.makedsn(host = 'localhost' , port = '1521' , service_name = 'orcl')
94-
async with await cx_Oracle_async.create_pool(user='', password='',dsn = dsn) as pool:
94+
async with cx_Oracle_async.create_pool(user='', password='',dsn = dsn) as pool:
9595
...
9696

9797
asyncio.run(main())

cx_Oracle_async/connections.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77
if TYPE_CHECKING:
88
from asyncio.windows_events import ProactorEventLoop
9+
from .pools import AsyncPoolWrapper
910

1011
class AsyncConnectionWrapper_context(BaseManager):
1112

@@ -19,12 +20,14 @@ async def __aexit__(self, exc_type, exc, tb):
1920

2021
class AsyncConnectionWrapper:
2122

22-
def __init__(self , conn: Connection, loop: 'ProactorEventLoop', thread_pool: ThreadPoolExecutor, pool: SessionPool):
23+
def __init__(self , conn: Connection, loop: 'ProactorEventLoop', thread_pool: ThreadPoolExecutor, pool: SessionPool, pool_wrapper:'AsyncPoolWrapper'):
2324
self._conn = conn
2425
self._loop = loop
2526
self._pool = pool
27+
self._pool_wrapper = pool_wrapper
2628
self._thread_pool = thread_pool
2729

30+
2831
def cursor(self):
2932
coro = self._loop.run_in_executor(self._thread_pool , self._cursor)
3033
return AsyncCursorWrapper_context(coro)
@@ -39,6 +42,42 @@ def msgproperties(self , *args , **kwargs):
3942
def encoding(self):
4043
return self._conn.encoding
4144

45+
@property
46+
def dsn(self):
47+
return self._conn.dsn
48+
49+
@property
50+
def module(self):
51+
return self._conn.module
52+
53+
@module.setter
54+
def module(self , arg):
55+
self._conn.module = arg
56+
57+
@property
58+
def action(self):
59+
return self._conn.action
60+
61+
@action.setter
62+
def action(self , arg):
63+
self._conn.action = arg
64+
65+
@property
66+
def client_identifier(self):
67+
return self._conn.client_identifier
68+
69+
@client_identifier.setter
70+
def client_identifier(self , arg):
71+
self._conn.client_identifier = arg
72+
73+
@property
74+
def clientinfo(self):
75+
return self._conn.clientinfo
76+
77+
@clientinfo.setter
78+
def clientinfo(self , arg):
79+
self._conn.clientinfo = arg
80+
4281
async def queue(self , *args , **kwargs):
4382
return AsyncQueueWrapper(self._conn.queue(*args , **kwargs) , self._loop , self._thread_pool , self)
4483

@@ -49,7 +88,17 @@ async def gettype(self , *args , **kwargs):
4988
return await self._loop.run_in_executor(self._thread_pool , self._conn.gettype , *args , **kwargs)
5089

5190
async def commit(self):
52-
await self._loop.run_in_executor(self._thread_pool , self._conn.commit)
91+
return await self._loop.run_in_executor(self._thread_pool , self._conn.commit)
5392

5493
async def release(self):
55-
return await self._loop.run_in_executor(self._thread_pool , self._pool.release , self._conn)
94+
self._pool_wrapper._unoccupied(self._conn)
95+
return await self._loop.run_in_executor(self._thread_pool , self._pool.release , self._conn)
96+
97+
async def cancel(self):
98+
return await self._loop.run_in_executor(self._thread_pool , self._conn.cancel)
99+
100+
async def ping(self):
101+
return await self._loop.run_in_executor(self._thread_pool , self._conn.ping)
102+
103+
async def rollback(self):
104+
return await self._loop.run_in_executor(self._thread_pool , self._conn.rollback)

cx_Oracle_async/pools.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .context import AbstractContextManager as BaseManager
22
from .connections import AsyncConnectionWrapper , AsyncConnectionWrapper_context
33
from ThreadPoolExecutorPlus import ThreadPoolExecutor
4-
from cx_Oracle import SessionPool
4+
from cx_Oracle import Connection , SessionPool
55
from types import CoroutineType
66
import asyncio
77
import platform
@@ -38,13 +38,37 @@ def __init__(self , pool : SessionPool, loop : 'ProactorEventLoop' = None):
3838
self._thread_pool.set_daemon_opts(min_workers = max(4 , pool.min << 1))
3939
self._loop = loop
4040
self._pool = pool
41+
self._occupied = set()
4142

4243
def acquire(self):
4344
coro = self._loop.run_in_executor(self._thread_pool , self._acquire)
4445
return AsyncConnectionWrapper_context(coro)
4546

4647
def _acquire(self):
47-
return AsyncConnectionWrapper(self._pool.acquire() , self._loop , self._thread_pool , self._pool)
48+
_conn = self._pool.acquire()
49+
self._occupied.update((_conn , ))
50+
return AsyncConnectionWrapper(_conn , self._loop , self._thread_pool , self._pool , self)
4851

49-
async def close(self):
50-
return await self._loop.run_in_executor(self._thread_pool , self._pool.close)
52+
def _unoccupied(self , obj: Connection):
53+
self._occupied.remove(obj)
54+
55+
async def release(self , conn: Connection):
56+
return await self._loop.run_in_executor(self._thread_pool , self._pool.release , conn)
57+
58+
async def drop(self , conn: Connection):
59+
return await self._loop.run_in_executor(self._thread_pool , self._pool.drop , conn)
60+
61+
async def close(self , force: bool = False , interrupt: bool = False):
62+
'''
63+
WARNING: option `interrupt` will force cancel all running connections before close
64+
the pool. This may cause fetching thread no response forever in some legacy version
65+
of oracle database such as 11 or lower.
66+
67+
Do make sure this option works fine with your working enviornment.
68+
'''
69+
while self._occupied:
70+
_conn = self._occupied.pop()
71+
if interrupt:
72+
await self._loop.run_in_executor(self._thread_pool , _conn.cancel)
73+
74+
return await self._loop.run_in_executor(self._thread_pool , self._pool.close , force)

tests/test_behavior.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def test_new_table():
2424
ret = await cursor.fetchone()
2525
assert ret
2626
if ret[0] > 0:
27-
await cursor.execute("DTOP TABLE DEPT")
27+
await cursor.execute("DROP TABLE DEPT")
2828

2929
sql = f"""
3030
CREATE TABLE DEPT
@@ -78,4 +78,4 @@ async def test_usage():
7878

7979
cursor1 = await conn1.cursor()
8080
async with conn1.cursor() as cursor2:
81-
assert type(cursor1) is type(cursor2)
81+
assert type(cursor1) is type(cursor2)

tests/test_connection.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pytest
44
import asyncio
55
from cx_Oracle_async import *
6+
import time
67

78
@pytest.mark.asyncio
89
async def test_different_connect_ways():
@@ -35,4 +36,51 @@ async def test_different_connect_ways():
3536

3637
ret = await oracle_pool.close()
3738
assert ret == None
38-
39+
40+
@pytest.mark.asyncio
41+
async def test_properties():
42+
INAQ = 0.5
43+
dsn = makedsn(
44+
host = 'localhost',
45+
port = '1521',
46+
sid = 'xe'
47+
)
48+
oracle_pool = await create_pool(
49+
user = 'system',
50+
password = 'oracle',
51+
dsn = dsn
52+
)
53+
54+
async with oracle_pool.acquire() as conn:
55+
st_time = time.time()
56+
conn.module = 'hello world'
57+
conn.action = 'test_action'
58+
conn.client_identifier = 'test_identifier'
59+
conn.clientinfo = 'test_info'
60+
ed_time = time.time()
61+
assert (ed_time - st_time) <= INAQ
62+
63+
async with conn.cursor() as cursor:
64+
await cursor.execute("SELECT SID, MODULE ,ACTION ,CLIENT_IDENTIFIER , CLIENT_INFO FROM V$SESSION WHERE MODULE='hello world'")
65+
r = await cursor.fetchall()
66+
assert len(r) == 1
67+
_ , _module , _action , _ciden , _cinfo = r[0]
68+
assert _module == 'hello world'
69+
assert _action == 'test_action'
70+
assert _ciden == 'test_identifier'
71+
assert _cinfo == 'test_info'
72+
73+
# test no update
74+
st_time = time.time()
75+
conn.module = 'hello world2'
76+
conn.action = 'test_action2'
77+
conn.client_identifier = 'test_identifier2'
78+
conn.clientinfo = 'test_info2'
79+
ed_time = time.time()
80+
assert (ed_time - st_time) <= INAQ
81+
82+
conn2 = await oracle_pool.acquire()
83+
async with conn2.cursor() as cursor:
84+
await cursor.execute("SELECT SID, MODULE ,ACTION ,CLIENT_IDENTIFIER , CLIENT_INFO FROM V$SESSION WHERE MODULE='hello world2'")
85+
r = await cursor.fetchall()
86+
assert len(r) == 0

tests/test_drop.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import os , sys
2+
sys.path.append(os.getcwd())
3+
import pytest
4+
import asyncio
5+
import time
6+
from async_timeout import timeout
7+
from cx_Oracle_async import *
8+
import cx_Oracle
9+
import threading
10+
11+
async def create_long_query(oracle_pool):
12+
async with oracle_pool.acquire() as conn:
13+
cursor = await conn.cursor()
14+
try:
15+
await cursor.execute("BEGIN DBMS_LOCK.SLEEP(:a); END;",(10,))
16+
except Exception as e:
17+
assert isinstance(e , cx_Oracle.OperationalError)
18+
19+
def create_long_query_sync(oracle_pool):
20+
'''
21+
use sync function in order to avoid pytest loop never stop bug.
22+
'''
23+
try:
24+
conn = oracle_pool._pool.acquire()
25+
cursor = conn.cursor()
26+
cursor.execute("BEGIN DBMS_LOCK.SLEEP(:a); END;",(10,))
27+
except:
28+
...
29+
30+
@pytest.mark.asyncio
31+
async def test_force_close():
32+
loop = asyncio.get_running_loop()
33+
dsn = makedsn('localhost','1521',sid='xe')
34+
INAQ = 0.5
35+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn)
36+
loop.create_task(create_long_query(oracle_pool))
37+
st_time = time.time()
38+
await asyncio.sleep(2)
39+
await oracle_pool.close(force = True , interrupt = False)
40+
ed_time = time.time()
41+
assert (10 - INAQ) <= (ed_time - st_time) <= (10 + INAQ)
42+
43+
# test occupy
44+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn,max=4)
45+
conn = await oracle_pool.acquire()
46+
conn = await oracle_pool.acquire()
47+
assert len(oracle_pool._occupied) == 2
48+
conn = await oracle_pool.acquire()
49+
assert len(oracle_pool._occupied) == 3
50+
st_time = time.time()
51+
await asyncio.sleep(2)
52+
async with timeout(2):
53+
# no running task , return immediately
54+
await oracle_pool.close(force = True , interrupt = False)
55+
ed_time = time.time()
56+
assert (2 - INAQ) <= (ed_time - st_time) <= (2 + INAQ)
57+
58+
# test interrupt
59+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn,max=4)
60+
st_time = time.time()
61+
t = threading.Thread(target = create_long_query_sync , args = (oracle_pool,))
62+
t.setDaemon(True)
63+
t.start()
64+
await asyncio.sleep(2)
65+
exception_flag = False
66+
try:
67+
async with timeout(2):
68+
# no response forever
69+
await oracle_pool.close(force = True , interrupt = True)
70+
except Exception as e:
71+
exception_flag = True
72+
assert isinstance(e , asyncio.TimeoutError)
73+
ed_time = time.time()
74+
assert exception_flag
75+
assert (4 - INAQ) <= (ed_time - st_time) <= (10 - INAQ)

tests/test_ping.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import os , sys
2+
sys.path.append(os.getcwd())
3+
import pytest
4+
import asyncio
5+
from cx_Oracle_async import *
6+
import cx_Oracle
7+
8+
@pytest.mark.asyncio
9+
async def test_ping():
10+
dsn = makedsn(
11+
host = 'localhost',
12+
port = '1521',
13+
sid = 'xe'
14+
)
15+
async with create_pool(user = 'system',password = 'oracle',dsn = dsn) as oracle_pool:
16+
conn = await oracle_pool.acquire()
17+
r = await conn.ping()
18+
assert r == None
19+
20+
await conn.release()
21+
exception_flag = False
22+
try:
23+
await conn.ping()
24+
except Exception as e:
25+
exception_flag = True
26+
assert isinstance(e , cx_Oracle.InterfaceError)
27+
assert exception_flag

tests/test_rollback.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import os , sys
2+
sys.path.append(os.getcwd())
3+
import pytest
4+
import asyncio
5+
from cx_Oracle_async import *
6+
import cx_Oracle
7+
8+
@pytest.mark.asyncio
9+
async def test_ping():
10+
dsn = makedsn(
11+
host = 'localhost',
12+
port = '1521',
13+
sid = 'xe'
14+
)
15+
async with create_pool(user = 'system',password = 'oracle',dsn = dsn) as oracle_pool:
16+
async with oracle_pool.acquire() as conn:
17+
async with conn.cursor() as cursor:
18+
# check if dept exesits
19+
await cursor.execute("SELECT COUNT(*) FROM USER_TABLES WHERE TABLE_NAME = UPPER(:a)" , ('DEPT' , ))
20+
ret = await cursor.fetchone()
21+
assert ret
22+
if ret[0] > 0:
23+
await cursor.execute("DROP TABLE DEPT")
24+
25+
sql = f"""
26+
CREATE TABLE DEPT
27+
(DEPTNO NUMBER(2) CONSTRAINT PK_DEPT PRIMARY KEY,
28+
DNAME VARCHAR2(14),
29+
LOC VARCHAR2(13)
30+
)
31+
"""
32+
await cursor.execute(sql)
33+
34+
await cursor.execute(f"INSERT INTO DEPT(DEPTNO) VALUES (:a)" , (10 , ))
35+
await conn.rollback()
36+
await cursor.execute(f"INSERT INTO DEPT(DEPTNO) VALUES (:a)" , (12 , ))
37+
await conn.commit()
38+
39+
async with oracle_pool.acquire() as conn:
40+
async with conn.cursor() as cursor:
41+
await cursor.execute(f"SELECT * FROM DEPT")
42+
r = await cursor.fetchall()
43+
assert len(r) == 1
44+
assert r[0][0] == 12

0 commit comments

Comments
 (0)