Skip to content

Commit 437f4d2

Browse files
committed
Update rollback & ping & bugfix
1 parent 0ba6a00 commit 437f4d2

File tree

7 files changed

+145
-15
lines changed

7 files changed

+145
-15
lines changed

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
run: | # sleep to make sure Oracle server is fully loaded
4848
sleep 60
4949
python -m pip install --upgrade pip
50-
python -m pip install flake8 pytest pytest-asyncio
50+
python -m pip install flake8 pytest pytest-asyncio async_timeout
5151
pip install -r requirements.txt
5252
- name: Lint with flake8
5353
run: |

cx_Oracle_async/connections.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,17 @@ async def gettype(self , *args , **kwargs):
5252
return await self._loop.run_in_executor(self._thread_pool , self._conn.gettype , *args , **kwargs)
5353

5454
async def commit(self):
55-
await self._loop.run_in_executor(self._thread_pool , self._conn.commit)
55+
return await self._loop.run_in_executor(self._thread_pool , self._conn.commit)
5656

5757
async def release(self):
5858
self._pool_wrapper._unoccupied(self._conn)
5959
return await self._loop.run_in_executor(self._thread_pool , self._pool.release , self._conn)
6060

6161
async def cancel(self):
62-
await self._loop.run_in_executor(self._thread_pool , self._conn.cancel)
62+
return await self._loop.run_in_executor(self._thread_pool , self._conn.cancel)
63+
64+
async def ping(self):
65+
return await self._loop.run_in_executor(self._thread_pool , self._conn.ping)
66+
67+
async def rollback(self):
68+
return await self._loop.run_in_executor(self._thread_pool , self._conn.rollback)

cx_Oracle_async/pools.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def _acquire(self):
4949
self._occupied.update((_conn , ))
5050
return AsyncConnectionWrapper(_conn , self._loop , self._thread_pool , self._pool , self)
5151

52-
def _unoccupied(self , obj):
52+
def _unoccupied(self , obj: Connection):
5353
self._occupied.remove(obj)
5454

5555
async def release(self , conn: Connection):
@@ -58,10 +58,17 @@ async def release(self , conn: Connection):
5858
async def drop(self , conn: Connection):
5959
return await self._loop.run_in_executor(self._thread_pool , self._pool.drop , conn)
6060

61-
async def close(self , force = False):
62-
if force:
63-
while self._occupied:
64-
_conn = self._occupied.pop()
61+
async def close(self , force: bool = False , interrupt: bool = False):
62+
'''
63+
WARNING: option `interrupt` will force cancel all running connections before close
64+
the pool. This may cause fetching thread no response forever in some legacy version
65+
of oracle database such as 11 or lower.
66+
67+
Do make sure this option works fine with your working enviornment.
68+
'''
69+
while self._occupied:
70+
_conn = self._occupied.pop()
71+
if interrupt:
6572
await self._loop.run_in_executor(self._thread_pool , _conn.cancel)
6673

6774
return await self._loop.run_in_executor(self._thread_pool , self._pool.close , force)

tests/test_behavior.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def test_new_table():
2424
ret = await cursor.fetchone()
2525
assert ret
2626
if ret[0] > 0:
27-
await cursor.execute("DTOP TABLE DEPT")
27+
await cursor.execute("DROP TABLE DEPT")
2828

2929
sql = f"""
3030
CREATE TABLE DEPT
@@ -78,4 +78,4 @@ async def test_usage():
7878

7979
cursor1 = await conn1.cursor()
8080
async with conn1.cursor() as cursor2:
81-
assert type(cursor1) is type(cursor2)
81+
assert type(cursor1) is type(cursor2)

tests/test_drop.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,73 @@
33
import pytest
44
import asyncio
55
import time
6+
from async_timeout import timeout
67
from cx_Oracle_async import *
78
import cx_Oracle
9+
import threading
810

911
async def create_long_query(oracle_pool):
1012
async with oracle_pool.acquire() as conn:
1113
cursor = await conn.cursor()
1214
try:
13-
await cursor.execute("BEGIN DBMS_LOCK.SLEEP(:a); END;",(20,))
15+
await cursor.execute("BEGIN DBMS_LOCK.SLEEP(:a); END;",(10,))
1416
except Exception as e:
1517
assert isinstance(e , cx_Oracle.OperationalError)
1618

19+
def create_long_query_sync(oracle_pool):
20+
'''
21+
use sync function in order to avoid pytest loop never stop bug.
22+
'''
23+
try:
24+
conn = oracle_pool._pool.acquire()
25+
cursor = conn.cursor()
26+
cursor.execute("BEGIN DBMS_LOCK.SLEEP(:a); END;",(10,))
27+
except:
28+
...
29+
1730
@pytest.mark.asyncio
1831
async def test_force_close():
1932
loop = asyncio.get_running_loop()
2033
dsn = makedsn('localhost','1521',sid='xe')
2134
INAQ = 0.5
22-
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn,max=4)
23-
loop = asyncio.get_running_loop()
35+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn)
2436
loop.create_task(create_long_query(oracle_pool))
2537
st_time = time.time()
2638
await asyncio.sleep(2)
27-
await oracle_pool.close(force = True)
39+
await oracle_pool.close(force = True , interrupt = False)
40+
ed_time = time.time()
41+
assert (10 - INAQ) <= (ed_time - st_time) <= (10 + INAQ)
42+
43+
# test occupy
44+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn,max=4)
45+
conn = await oracle_pool.acquire()
46+
conn = await oracle_pool.acquire()
47+
assert len(oracle_pool._occupied) == 2
48+
conn = await oracle_pool.acquire()
49+
assert len(oracle_pool._occupied) == 3
50+
st_time = time.time()
51+
await asyncio.sleep(2)
52+
async with timeout(2):
53+
# no running task , return immediately
54+
await oracle_pool.close(force = True , interrupt = False)
55+
ed_time = time.time()
56+
assert (2 - INAQ) <= (ed_time - st_time) <= (2 + INAQ)
57+
58+
# test interrupt
59+
oracle_pool = await create_pool(user='system',password='oracle',dsn=dsn,max=4)
60+
st_time = time.time()
61+
t = threading.Thread(target = create_long_query_sync , args = (oracle_pool,))
62+
t.setDaemon(True)
63+
t.start()
64+
await asyncio.sleep(2)
65+
exception_flag = False
66+
try:
67+
async with timeout(2):
68+
# no response forever
69+
await oracle_pool.close(force = True , interrupt = True)
70+
except Exception as e:
71+
exception_flag = True
72+
assert isinstance(e , asyncio.TimeoutError)
2873
ed_time = time.time()
29-
assert (ed_time - st_time) <= (20 - INAQ)
74+
assert exception_flag
75+
assert (4 - INAQ) <= (ed_time - st_time) <= (10 - INAQ)

tests/test_ping.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import os , sys
2+
sys.path.append(os.getcwd())
3+
import pytest
4+
import asyncio
5+
from cx_Oracle_async import *
6+
import cx_Oracle
7+
8+
@pytest.mark.asyncio
9+
async def test_ping():
10+
dsn = makedsn(
11+
host = 'localhost',
12+
port = '1521',
13+
sid = 'xe'
14+
)
15+
async with create_pool(user = 'system',password = 'oracle',dsn = dsn) as oracle_pool:
16+
conn = await oracle_pool.acquire()
17+
r = await conn.ping()
18+
assert r == None
19+
20+
await conn.release()
21+
exception_flag = False
22+
try:
23+
await conn.ping()
24+
except Exception as e:
25+
exception_flag = True
26+
assert isinstance(e , cx_Oracle.InterfaceError)
27+
assert exception_flag

tests/test_rollback.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import os , sys
2+
sys.path.append(os.getcwd())
3+
import pytest
4+
import asyncio
5+
from cx_Oracle_async import *
6+
import cx_Oracle
7+
8+
@pytest.mark.asyncio
9+
async def test_ping():
10+
dsn = makedsn(
11+
host = 'localhost',
12+
port = '1521',
13+
sid = 'xe'
14+
)
15+
async with create_pool(user = 'system',password = 'oracle',dsn = dsn) as oracle_pool:
16+
async with oracle_pool.acquire() as conn:
17+
async with conn.cursor() as cursor:
18+
# check if dept exesits
19+
await cursor.execute("SELECT COUNT(*) FROM USER_TABLES WHERE TABLE_NAME = UPPER(:a)" , ('DEPT' , ))
20+
ret = await cursor.fetchone()
21+
assert ret
22+
if ret[0] > 0:
23+
await cursor.execute("DROP TABLE DEPT")
24+
25+
sql = f"""
26+
CREATE TABLE DEPT
27+
(DEPTNO NUMBER(2) CONSTRAINT PK_DEPT PRIMARY KEY,
28+
DNAME VARCHAR2(14),
29+
LOC VARCHAR2(13)
30+
)
31+
"""
32+
await cursor.execute(sql)
33+
34+
await cursor.execute(f"INSERT INTO DEPT(DEPTNO) VALUES (:a)" , (10 , ))
35+
await conn.rollback()
36+
await cursor.execute(f"INSERT INTO DEPT(DEPTNO) VALUES (:a)" , (12 , ))
37+
await conn.commit()
38+
39+
async with oracle_pool.acquire() as conn:
40+
async with conn.cursor() as cursor:
41+
await cursor.execute(f"SELECT * FROM DEPT")
42+
r = await cursor.fetchall()
43+
assert len(r) == 1
44+
assert r[0][0] == 12

0 commit comments

Comments
 (0)