Skip to content

Commit 9c649cf

Browse files
authored
Add the ability to cancel UDFs by client disconnect or timeout (#64)
* Ability to cancel a UDF on disconnect or timeout * Add missing decorator arg * Add async support * Add testing * Add type aliases * Add metrics * Add metrics * Add layers of timings * Add logging * Improve UDF cancellation * Use separate timer for function call; not thread safe * Disable starter workspace for now
1 parent 89c8e93 commit 9c649cf

File tree

15 files changed

+648
-96
lines changed

15 files changed

+648
-96
lines changed

singlestoredb/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
DataError, ManagementError,
2626
)
2727
from .management import (
28-
manage_cluster, manage_workspaces, manage_files,
28+
manage_cluster, manage_workspaces, manage_files, manage_regions,
2929
)
3030
from .types import (
3131
Date, Time, Timestamp, DateFromTicks, TimeFromTicks, TimestampFromTicks,

singlestoredb/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@
438438
environ=['SINGLESTOREDB_EXT_FUNC_PORT'],
439439
)
440440

441+
register_option(
442+
'external_function.timeout', 'int', check_int, 24*60*60,
443+
'Specifies the timeout in seconds for processing a batch of rows.',
444+
environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
445+
)
441446

442447
#
443448
# Debugging options

singlestoredb/functions/decorator.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import functools
23
import inspect
34
from typing import Any
@@ -19,6 +20,7 @@
1920
]
2021

2122
ReturnType = ParameterType
23+
UDFType = Callable[..., Any]
2224

2325

2426
def is_valid_type(obj: Any) -> bool:
@@ -100,38 +102,50 @@ def _func(
100102
name: Optional[str] = None,
101103
args: Optional[ParameterType] = None,
102104
returns: Optional[ReturnType] = None,
103-
) -> Callable[..., Any]:
105+
timeout: Optional[int] = None,
106+
) -> UDFType:
104107
"""Generic wrapper for UDF and TVF decorators."""
105108

106109
_singlestoredb_attrs = { # type: ignore
107110
k: v for k, v in dict(
108111
name=name,
109112
args=expand_types(args),
110113
returns=expand_types(returns),
114+
timeout=timeout,
111115
).items() if v is not None
112116
}
113117

114118
# No func was specified, this is an uncalled decorator that will get
115119
# called later, so the wrapper much be created with the func passed
116120
# in at that time.
117121
if func is None:
118-
def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
122+
def decorate(func: UDFType) -> UDFType:
119123

120-
def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
121-
return func(*args, **kwargs) # type: ignore
124+
if asyncio.iscoroutinefunction(func):
125+
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
126+
return await func(*args, **kwargs) # type: ignore
127+
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
128+
return functools.wraps(func)(async_wrapper)
122129

123-
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
124-
125-
return functools.wraps(func)(wrapper)
130+
else:
131+
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
132+
return func(*args, **kwargs) # type: ignore
133+
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
134+
return functools.wraps(func)(wrapper)
126135

127136
return decorate
128137

129-
def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
130-
return func(*args, **kwargs) # type: ignore
131-
132-
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
138+
if asyncio.iscoroutinefunction(func):
139+
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
140+
return await func(*args, **kwargs) # type: ignore
141+
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
142+
return functools.wraps(func)(async_wrapper)
133143

134-
return functools.wraps(func)(wrapper)
144+
else:
145+
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
146+
return func(*args, **kwargs) # type: ignore
147+
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
148+
return functools.wraps(func)(wrapper)
135149

136150

137151
def udf(
@@ -140,7 +154,8 @@ def udf(
140154
name: Optional[str] = None,
141155
args: Optional[ParameterType] = None,
142156
returns: Optional[ReturnType] = None,
143-
) -> Callable[..., Any]:
157+
timeout: Optional[int] = None,
158+
) -> UDFType:
144159
"""
145160
Define a user-defined function (UDF).
146161
@@ -167,6 +182,9 @@ def udf(
167182
Specifies the return data type of the function. This parameter
168183
works the same way as `args`. If the function is a table-valued
169184
function, the return type should be a `Table` object.
185+
timeout : int, optional
186+
The timeout in seconds for the UDF execution. If not specified,
187+
the global default timeout is used.
170188
171189
Returns
172190
-------
@@ -178,4 +196,5 @@ def udf(
178196
name=name,
179197
args=args,
180198
returns=returns,
199+
timeout=timeout,
181200
)

0 commit comments

Comments
 (0)