diff --git a/singlestoredb/__init__.py b/singlestoredb/__init__.py
index 3d290695e..4a78f31a1 100644
--- a/singlestoredb/__init__.py
+++ b/singlestoredb/__init__.py
@@ -25,7 +25,7 @@
     DataError,
     ManagementError,
 )
 from .management import (
-    manage_cluster, manage_workspaces, manage_files,
+    manage_cluster, manage_workspaces, manage_files, manage_regions,
 )
 from .types import (
     Date, Time, Timestamp, DateFromTicks, TimeFromTicks, TimestampFromTicks,
diff --git a/singlestoredb/config.py b/singlestoredb/config.py
index 61b79f298..a5e4805de 100644
--- a/singlestoredb/config.py
+++ b/singlestoredb/config.py
@@ -438,6 +438,11 @@
     environ=['SINGLESTOREDB_EXT_FUNC_PORT'],
 )
 
+register_option(
+    'external_function.timeout', 'int', check_int, 24*60*60,
+    'Specifies the timeout in seconds for processing a batch of rows.',
+    environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
+)
 
 #
 # Debugging options
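
Note: the new option plugs into the same `register_option` machinery as the surrounding settings, so it can be set per process or through the registered environment variable. A minimal sketch, assuming the `get_option`/`set_option` helpers exported at the package top level (exact read timing of the environment variable depends on the options system):

    import os

    import singlestoredb as s2

    # Environment route: picked up by the options system on lookup.
    os.environ['SINGLESTOREDB_EXT_FUNC_TIMEOUT'] = '300'

    # Programmatic route: overrides the 24*60*60-second (24 hour) default.
    s2.set_option('external_function.timeout', 300)
    assert s2.get_option('external_function.timeout') == 300
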
diff --git a/singlestoredb/functions/decorator.py b/singlestoredb/functions/decorator.py
index 2280ed401..687211368 100644
--- a/singlestoredb/functions/decorator.py
+++ b/singlestoredb/functions/decorator.py
@@ -1,3 +1,4 @@
+import asyncio
 import functools
 import inspect
 from typing import Any
@@ -19,6 +20,7 @@
 ]
 
 ReturnType = ParameterType
+UDFType = Callable[..., Any]
 
 
 def is_valid_type(obj: Any) -> bool:
@@ -100,7 +102,8 @@ def _func(
     name: Optional[str] = None,
     args: Optional[ParameterType] = None,
     returns: Optional[ReturnType] = None,
-) -> Callable[..., Any]:
+    timeout: Optional[int] = None,
+) -> UDFType:
     """Generic wrapper for UDF and TVF decorators."""
 
     _singlestoredb_attrs = {  # type: ignore
@@ -108,6 +111,7 @@ def _func(
         name=name,
         args=expand_types(args),
         returns=expand_types(returns),
+        timeout=timeout,
     ).items() if v is not None
     }
 
@@ -115,23 +119,33 @@ def _func(
     # called later, so the wrapper much be created with the func passed
     # in at that time.
     if func is None:
-        def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
+        def decorate(func: UDFType) -> UDFType:
 
-            def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
-                return func(*args, **kwargs)  # type: ignore
+            if asyncio.iscoroutinefunction(func):
+                async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
+                    return await func(*args, **kwargs)  # type: ignore
+                async_wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
+                return functools.wraps(func)(async_wrapper)
 
-            wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
-
-            return functools.wraps(func)(wrapper)
+            else:
+                def wrapper(*args: Any, **kwargs: Any) -> UDFType:
+                    return func(*args, **kwargs)  # type: ignore
+                wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
+                return functools.wraps(func)(wrapper)
 
         return decorate
 
-    def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
-        return func(*args, **kwargs)  # type: ignore
-
-    wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
+    if asyncio.iscoroutinefunction(func):
+        async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
+            return await func(*args, **kwargs)  # type: ignore
+        async_wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
+        return functools.wraps(func)(async_wrapper)
 
-    return functools.wraps(func)(wrapper)
+    else:
+        def wrapper(*args: Any, **kwargs: Any) -> UDFType:
+            return func(*args, **kwargs)  # type: ignore
+        wrapper._singlestoredb_attrs = _singlestoredb_attrs  # type: ignore
+        return functools.wraps(func)(wrapper)
 
 
 def udf(
@@ -140,7 +154,8 @@ def udf(
     name: Optional[str] = None,
     args: Optional[ParameterType] = None,
     returns: Optional[ReturnType] = None,
-) -> Callable[..., Any]:
+    timeout: Optional[int] = None,
+) -> UDFType:
     """
     Define a user-defined function (UDF).
@@ -167,6 +182,9 @@ def udf(
         Specifies the return data type of the function. This parameter
         works the same way as `args`. If the function is a table-valued
         function, the return type should be a `Table` object.
+    timeout : int, optional
+        The timeout in seconds for the UDF execution. If not specified,
+        the global default timeout is used.
 
     Returns
     -------
 
@@ -178,4 +196,5 @@ def udf(
         name=name,
         args=args,
         returns=returns,
+        timeout=timeout,
     )
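
Note: with this change the decorator accepts a per-function timeout and returns an `async` wrapper whenever it is applied to a coroutine function. A usage sketch (the function names are illustrative, not part of the diff):

    from singlestoredb.functions import udf

    @udf(timeout=30)
    def slow_mult(x: float, y: float) -> float:
        # Cancelled by the server if a batch takes longer than 30 seconds.
        return x * y

    @udf
    async def async_mult(x: float, y: float) -> float:
        # Runs on the server's event loop; no timeout override here,
        # so the external_function.timeout default applies.
        return x * y
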
diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py
index 97997ffc9..69b498bd4 100755
--- a/singlestoredb/functions/ext/asgi.py
+++ b/singlestoredb/functions/ext/asgi.py
@@ -24,7 +24,10 @@
 """
 import argparse
 import asyncio
+import contextvars
 import dataclasses
+import datetime
+import functools
 import importlib.util
 import inspect
 import io
@@ -37,8 +40,11 @@
 import sys
 import tempfile
 import textwrap
+import threading
+import time
 import typing
 import urllib
+import uuid
 import zipfile
 import zipimport
 from types import ModuleType
@@ -66,6 +72,7 @@
 from ..signature import signature_to_sql
 from ..typing import Masked
 from ..typing import Table
+from .timer import Timer
 
 try:
     import cloudpickle
@@ -95,6 +102,15 @@
 func_map = itertools.starmap
 
 
+async def to_thread(
+    func: Any, /, *args: Any, **kwargs: Dict[str, Any],
+) -> Any:
+    loop = asyncio.get_running_loop()
+    ctx = contextvars.copy_context()
+    func_call = functools.partial(ctx.run, func, *args, **kwargs)
+    return await loop.run_in_executor(None, func_call)
+
+
 # Use negative values to indicate unsigned ints / binary data / usec time precision
 rowdat_1_type_map = {
     'bool': ft.LONGLONG,
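
Note: `to_thread` is effectively a backport of `asyncio.to_thread`, which entered the standard library in Python 3.9; copying the active `contextvars` context into the executor thread preserves context-local state across the hop. On 3.9+ the same call could be written roughly as:

    import asyncio
    import time

    async def main() -> None:
        # Stdlib equivalent of the to_thread() helper above (Python 3.9+).
        result = await asyncio.to_thread(time.sleep, 0.1)
        print(result)  # None; sleep ran on a worker thread, loop stayed free

    asyncio.run(main())
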
@@ -251,6 +267,32 @@ def build_tuple(x: Any) -> Any:
     return tuple(x) if isinstance(x, Masked) else (x, None)
 
 
+def cancel_on_event(
+    cancel_event: threading.Event,
+) -> None:
+    """
+    Cancel the function call if the cancel event is set.
+
+    Parameters
+    ----------
+    cancel_event : threading.Event
+        The event to check for cancellation
+
+    Raises
+    ------
+    asyncio.CancelledError
+        If the cancel event is set
+
+    """
+    if cancel_event.is_set():
+        task = asyncio.current_task()
+        if task is not None:
+            task.cancel()
+        raise asyncio.CancelledError(
+            'Function call was cancelled by client',
+        )
+
+
 def build_udf_endpoint(
     func: Callable[..., Any],
     returns_data_format: str,
@@ -273,12 +315,24 @@ def build_udf_endpoint(
     """
     if returns_data_format in ['scalar', 'list']:
+        is_async = asyncio.iscoroutinefunction(func)
+
         async def do_func(
+            cancel_event: threading.Event,
+            timer: Timer,
             row_ids: Sequence[int],
             rows: Sequence[Sequence[Any]],
         ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
             '''Call function on given rows of data.'''
-            return row_ids, [as_tuple(x) for x in zip(func_map(func, rows))]
+            out = []
+            async with timer('call_function'):
+                for row in rows:
+                    cancel_on_event(cancel_event)
+                    if is_async:
+                        out.append(await func(*row))
+                    else:
+                        out.append(func(*row))
+            return row_ids, list(zip(out))
 
         return do_func
 
@@ -307,8 +361,11 @@ def build_vector_udf_endpoint(
     """
     masks = get_masked_params(func)
     array_cls = get_array_class(returns_data_format)
+    is_async = asyncio.iscoroutinefunction(func)
 
     async def do_func(
+        cancel_event: threading.Event,
+        timer: Timer,
         row_ids: Sequence[int],
         cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
     ) -> Tuple[
@@ -319,10 +376,19 @@ def build_vector_udf_endpoint(
         row_ids = array_cls(row_ids)
 
         # Call the function with `cols` as the function parameters
-        if cols and cols[0]:
-            out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
-        else:
-            out = func()
+        async with timer('call_function'):
+            if cols and cols[0]:
+                if is_async:
+                    out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])
+                else:
+                    out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
+            else:
+                if is_async:
+                    out = await func()
+                else:
+                    out = func()
+
+        cancel_on_event(cancel_event)
 
         # Single masked value
         if isinstance(out, Masked):
@@ -360,7 +426,11 @@ def build_tvf_endpoint(
     """
     if returns_data_format in ['scalar', 'list']:
+        is_async = asyncio.iscoroutinefunction(func)
+
         async def do_func(
+            cancel_event: threading.Event,
+            timer: Timer,
             row_ids: Sequence[int],
             rows: Sequence[Sequence[Any]],
         ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
@@ -368,9 +438,15 @@ def build_tvf_endpoint(
             out_ids: List[int] = []
             out = []
             # Call function on each row of data
-            for i, res in zip(row_ids, func_map(func, rows)):
-                out.extend(as_list_of_tuples(res))
-                out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
+            async with timer('call_function'):
+                for i, row in zip(row_ids, rows):
+                    cancel_on_event(cancel_event)
+                    if is_async:
+                        res = await func(*row)
+                    else:
+                        res = func(*row)
+                    out.extend(as_list_of_tuples(res))
+                    out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
             return out_ids, out
 
         return do_func
@@ -402,6 +478,8 @@ def build_vector_tvf_endpoint(
     array_cls = get_array_class(returns_data_format)
 
     async def do_func(
+        cancel_event: threading.Event,
+        timer: Timer,
         row_ids: Sequence[int],
         cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
     ) -> Tuple[
@@ -413,13 +491,28 @@
         # each result row, so we just have to use the same
         # row ID for all rows in the result.
 
+        is_async = asyncio.iscoroutinefunction(func)
+
         # Call function on each column of data
-        if cols and cols[0]:
-            res = get_dataframe_columns(
-                func(*[x if m else x[0] for x, m in zip(cols, masks)]),
-            )
-        else:
-            res = get_dataframe_columns(func())
+        async with timer('call_function'):
+            if cols and cols[0]:
+                if is_async:
+                    func_res = await func(
+                        *[x if m else x[0] for x, m in zip(cols, masks)],
+                    )
+                else:
+                    func_res = func(
+                        *[x if m else x[0] for x, m in zip(cols, masks)],
+                    )
+            else:
+                if is_async:
+                    func_res = await func()
+                else:
+                    func_res = func()
+
+        res = get_dataframe_columns(func_res)
+
+        cancel_on_event(cancel_event)
 
         # Generate row IDs
         if isinstance(res[0], Masked):
@@ -458,6 +551,10 @@ def make_func(
     function_type = sig.get('function_type', 'udf')
     args_data_format = sig.get('args_data_format', 'scalar')
     returns_data_format = sig.get('returns_data_format', 'scalar')
+    timeout = (
+        func._singlestoredb_attrs.get('timeout') or  # type: ignore
+        get_option('external_function.timeout')
+    )
 
     if function_type == 'tvf':
         do_func = build_tvf_endpoint(func, returns_data_format)
@@ -477,6 +574,12 @@ def make_func(
     # Set function type
     info['function_type'] = function_type
 
+    # Set timeout
+    info['timeout'] = max(timeout, 1)
+
+    # Set async flag
+    info['is_async'] = asyncio.iscoroutinefunction(func)
+
     # Setup argument types for rowdat_1 parser
     colspec = []
     for x in sig['args']:
@@ -498,6 +601,43 @@ def make_func(
     return do_func, info
 
 
+async def cancel_on_timeout(timeout: int) -> None:
+    """Cancel request if it takes too long."""
+    await asyncio.sleep(timeout)
+    raise asyncio.CancelledError(
+        'Function call was cancelled due to timeout',
+    )
+
+
+async def cancel_on_disconnect(
+    receive: Callable[..., Awaitable[Any]],
+) -> None:
+    """Cancel request if client disconnects."""
+    while True:
+        message = await receive()
+        if message['type'] == 'http.disconnect':
+            raise asyncio.CancelledError(
+                'Function call was cancelled by client',
+            )
+
+
+async def cancel_all_tasks(tasks: Iterable[asyncio.Task[Any]]) -> None:
+    """Cancel all tasks."""
+    for task in tasks:
+        task.cancel()
+    await asyncio.gather(*tasks, return_exceptions=True)
+
+
+def start_counter() -> float:
+    """Start a timer and return the start time."""
+    return time.perf_counter()
+
+
+def end_counter(start: float) -> float:
+    """End a timer and return the elapsed time."""
+    return time.perf_counter() - start
+
+
 class Application(object):
     """
     Create an external function application.
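
Note: a task running a plain (non-async) UDF executes on a worker thread, and Python offers no way to kill such a thread from outside, so cancellation has to be cooperative: the server sets a `threading.Event` when it wants the call to stop, and the generated endpoints call `cancel_on_event` between rows so the loop bails out at the next row boundary. An illustrative reduction of that contract (not the generated code itself):

    import asyncio
    import threading
    from typing import Any, List, Sequence

    async def process_rows(
        cancel_event: threading.Event,
        rows: Sequence[Sequence[Any]],
    ) -> List[Any]:
        out = []
        for row in rows:
            # A running thread cannot be killed, so poll the flag per row.
            if cancel_event.is_set():
                raise asyncio.CancelledError('cancelled between rows')
            out.append(sum(row))  # stand-in for the user's function
        return out
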
@@ -824,6 +964,21 @@
             Function to send response information
 
         '''
+        request_id = str(uuid.uuid4())
+
+        timer = Timer(
+            id=request_id,
+            timestamp=datetime.datetime.now(
+                datetime.timezone.utc,
+            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
+        )
+        call_timer = Timer(
+            id=request_id,
+            timestamp=datetime.datetime.now(
+                datetime.timezone.utc,
+            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
+        )
+
         assert scope['type'] == 'http'
 
         method = scope['method']
@@ -838,6 +993,9 @@
         func_name = headers.get(b's2-ef-name', b'')
         func_endpoint = self.endpoints.get(func_name)
 
+        timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''
+        call_timer.metadata['function'] = timer.metadata['function']
+
         func = None
         func_info: Dict[str, Any] = {}
         if func_endpoint is not None:
@@ -845,35 +1003,120 @@
 
         # Call the endpoint
         if method == 'POST' and func is not None and path == self.invoke_path:
+
+            logger.info(
+                json.dumps({
+                    'type': 'function_call',
+                    'id': request_id,
+                    'name': func_name.decode('utf-8'),
+                    'content_type': content_type.decode('utf-8'),
+                    'accepts': accepts.decode('utf-8'),
+                }),
+            )
+
             args_data_format = func_info['args_data_format']
             returns_data_format = func_info['returns_data_format']
             data = []
             more_body = True
-            while more_body:
-                request = await receive()
-                data.append(request['body'])
-                more_body = request.get('more_body', False)
+            with timer('receive_data'):
+                while more_body:
+                    request = await receive()
+                    if request['type'] == 'http.disconnect':
+                        raise RuntimeError('client disconnected')
+                    data.append(request['body'])
+                    more_body = request.get('more_body', False)
 
             data_version = headers.get(b's2-ef-version', b'')
             input_handler = self.handlers[(content_type, data_version, args_data_format)]
             output_handler = self.handlers[(accepts, data_version, returns_data_format)]
 
             try:
-                out = await func(
-                    *input_handler['load'](  # type: ignore
+                all_tasks = []
+                result = []
+
+                cancel_event = threading.Event()
+
+                with timer('parse_input'):
+                    inputs = input_handler['load'](  # type: ignore
                         func_info['colspec'], b''.join(data),
+                    )
+
+                func_task = asyncio.create_task(
+                    func(cancel_event, call_timer, *inputs)
+                    if func_info['is_async']
+                    else to_thread(
+                        lambda: asyncio.run(
+                            func(cancel_event, call_timer, *inputs),
+                        ),
                     ),
                 )
-                body = output_handler['dump'](
-                    [x[1] for x in func_info['returns']], *out,  # type: ignore
+                disconnect_task = asyncio.create_task(
+                    cancel_on_disconnect(receive),
+                )
+                timeout_task = asyncio.create_task(
+                    cancel_on_timeout(func_info['timeout']),
                 )
+
+                all_tasks += [func_task, disconnect_task, timeout_task]
+
+                async with timer('function_wrapper'):
+                    done, pending = await asyncio.wait(
+                        all_tasks, return_when=asyncio.FIRST_COMPLETED,
+                    )
+
+                    await cancel_all_tasks(pending)
+
+                    for task in done:
+                        if task is disconnect_task:
+                            cancel_event.set()
+                            raise asyncio.CancelledError(
+                                'Function call was cancelled by client disconnect',
+                            )
+
+                        elif task is timeout_task:
+                            cancel_event.set()
+                            raise asyncio.TimeoutError(
+                                'Function call was cancelled due to timeout',
+                            )
+
+                        elif task is func_task:
+                            result.extend(task.result())
+
+                with timer('format_output'):
+                    body = output_handler['dump'](
+                        [x[1] for x in func_info['returns']], *result,  # type: ignore
+                    )
+
                 await send(output_handler['response'])
+
+            except asyncio.TimeoutError:
+                logging.exception(
+                    'Timeout in function call: ' + func_name.decode('utf-8'),
+                )
+                body = (
+                    '[TimeoutError] Function call timed out after ' +
+                    str(func_info['timeout']) +
+                    ' seconds'
+                ).encode('utf-8')
+                await send(self.error_response_dict)
+
+            except asyncio.CancelledError:
+                logging.exception(
+                    'Function call cancelled: ' + func_name.decode('utf-8'),
+                )
+                body = b'[CancelledError] Function call was cancelled'
+                await send(self.error_response_dict)
+
             except Exception as e:
-                logging.exception('Error in function call')
+                logging.exception(
+                    'Error in function call: ' + func_name.decode('utf-8'),
+                )
                 body = f'[{type(e).__name__}] {str(e).strip()}'.encode('utf-8')
                 await send(self.error_response_dict)
 
+            finally:
+                await cancel_all_tasks(all_tasks)
+
         # Handle api reflection
         elif method == 'GET' and path == self.show_create_function_path:
             host = headers.get(b'host', b'localhost:80')
@@ -905,9 +1148,15 @@
             await send(self.path_not_found_response_dict)
 
         # Send body
-        out = self.body_response_dict.copy()
-        out['body'] = body
-        await send(out)
+        with timer('send_response'):
+            out = self.body_response_dict.copy()
+            out['body'] = body
+            await send(out)
+
+        for k, v in call_timer.metrics.items():
+            timer.metrics[k] = v
+
+        timer.finish()
 
     def _create_link(
         self,
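
Note: the handler races the function task against two watchdog tasks and reacts to whichever finishes first, then tears down the losers. The same `asyncio.wait(..., return_when=FIRST_COMPLETED)` shape in isolation, with toy coroutines standing in for the real tasks:

    import asyncio

    async def main() -> None:
        work = asyncio.create_task(asyncio.sleep(5, result='rows'))
        watchdog = asyncio.create_task(asyncio.sleep(1))  # stand-in timeout

        done, pending = await asyncio.wait(
            {work, watchdog}, return_when=asyncio.FIRST_COMPLETED,
        )

        # Mirror cancel_all_tasks(): cancel the losers, then reap them.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        print('timed out' if watchdog in done else work.result())

    asyncio.run(main())  # prints 'timed out'
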
diff --git a/singlestoredb/functions/ext/timer.py b/singlestoredb/functions/ext/timer.py
new file mode 100644
index 000000000..cb1f234e3
--- /dev/null
+++ b/singlestoredb/functions/ext/timer.py
@@ -0,0 +1,98 @@
+import json
+import time
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+from . import utils
+
+logger = utils.get_logger('singlestoredb.functions.ext.metrics')
+
+
+class RoundedFloatEncoder(json.JSONEncoder):
+
+    def encode(self, obj: Any) -> str:
+        if isinstance(obj, dict):
+            return '{' + ', '.join(
+                f'"{k}": {self._format_value(v)}'
+                for k, v in obj.items()
+            ) + '}'
+        return super().encode(obj)
+
+    def _format_value(self, value: Any) -> str:
+        if isinstance(value, float):
+            return f'{value:.2f}'
+        return json.dumps(value)
+
+
+class Timer:
+    """
+    Timer context manager that supports nested timing using a stack.
+
+    Example
+    -------
+    timer = Timer()
+
+    with timer('total'):
+        with timer('receive_data'):
+            time.sleep(0.1)
+        with timer('parse_input'):
+            time.sleep(0.2)
+        with timer('call_function'):
+            with timer('inner_operation'):
+                time.sleep(0.05)
+            time.sleep(0.3)
+
+    print(timer.metrics)
+    # {'receive_data': 0.1, 'parse_input': 0.2, 'inner_operation': 0.05,
+    #  'call_function': 0.35, 'total': 0.65}
+
+    """
+
+    def __init__(self, **kwargs: Any) -> None:
+        self.metadata: Dict[str, Any] = kwargs
+        self.metrics: Dict[str, float] = dict()
+        self.entries: Dict[str, float] = dict()
+        self._current_key: Optional[str] = None
+        self.start_time = time.perf_counter()
+
+    def __call__(self, key: str) -> 'Timer':
+        self._current_key = key
+        return self
+
+    def __enter__(self) -> 'Timer':
+        if self._current_key is None:
+            raise ValueError(
+                "No key specified. Use timer('key_name') as context manager.",
+            )
+        self.entries[self._current_key] = time.perf_counter()
+        return self
+
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        key = self._current_key
+        if key and key in self.entries:
+            start = self.entries.pop(key)
+            elapsed = time.perf_counter() - start
+            self.metrics[key] = elapsed
+        self._current_key = None
+
+    async def __aenter__(self) -> 'Timer':
+        return self.__enter__()
+
+    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        self.__exit__(exc_type, exc_val, exc_tb)
+
+    def reset(self) -> None:
+        self.metrics.clear()
+        self.entries.clear()
+        self._current_key = None
+
+    def finish(self) -> None:
+        """Finish the current timing context and store the elapsed time."""
+        self.metrics['total'] = time.perf_counter() - self.start_time
+        self.log_metrics()
+
+    def log_metrics(self) -> None:
+        if self.metadata.get('function'):
+            result = dict(type='function_metrics', **self.metadata, **self.metrics)
+            logger.info(json.dumps(result, cls=RoundedFloatEncoder))
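
Note: because `__aenter__`/`__aexit__` simply delegate to the synchronous protocol, one `Timer` instance can wrap both plain blocks (`with timer('parse_input')`) and awaited ones (`async with timer('call_function')`), which is how `asgi.py` uses it. A small sketch against the new module:

    import asyncio

    from singlestoredb.functions.ext.timer import Timer

    async def main() -> None:
        timer = Timer(id='example-request')  # kwargs land in timer.metadata
        async with timer('call_function'):
            await asyncio.sleep(0.1)
        print(timer.metrics)  # {'call_function': 0.10...}

    asyncio.run(main())
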
diff --git a/singlestoredb/functions/typing.py b/singlestoredb/functions/typing/__init__.py
similarity index 100%
rename from singlestoredb/functions/typing.py
rename to singlestoredb/functions/typing/__init__.py
diff --git a/singlestoredb/functions/typing/numpy.py b/singlestoredb/functions/typing/numpy.py
new file mode 100644
index 000000000..fb3954d2f
--- /dev/null
+++ b/singlestoredb/functions/typing/numpy.py
@@ -0,0 +1,20 @@
+import numpy as np
+import numpy.typing as npt
+
+NDArray = npt.NDArray
+
+StringArray = StrArray = npt.NDArray[np.str_]
+BytesArray = npt.NDArray[np.bytes_]
+Float32Array = FloatArray = npt.NDArray[np.float32]
+Float64Array = DoubleArray = npt.NDArray[np.float64]
+IntArray = npt.NDArray[np.int_]
+Int8Array = npt.NDArray[np.int8]
+Int16Array = npt.NDArray[np.int16]
+Int32Array = npt.NDArray[np.int32]
+Int64Array = npt.NDArray[np.int64]
+UInt8Array = npt.NDArray[np.uint8]
+UInt16Array = npt.NDArray[np.uint16]
+UInt32Array = npt.NDArray[np.uint32]
+UInt64Array = npt.NDArray[np.uint64]
+DateTimeArray = npt.NDArray[np.datetime64]
+TimeDeltaArray = npt.NDArray[np.timedelta64]
diff --git a/singlestoredb/functions/typing/pandas.py b/singlestoredb/functions/typing/pandas.py
new file mode 100644
index 000000000..23a662c55
--- /dev/null
+++ b/singlestoredb/functions/typing/pandas.py
@@ -0,0 +1,2 @@
+from pandas import DataFrame  # noqa: F401
+from pandas import Series  # noqa: F401
diff --git a/singlestoredb/functions/typing/polars.py b/singlestoredb/functions/typing/polars.py
new file mode 100644
index 000000000..d7556a1e0
--- /dev/null
+++ b/singlestoredb/functions/typing/polars.py
@@ -0,0 +1,2 @@
+from polars import DataFrame  # noqa: F401
+from polars import Series  # noqa: F401
diff --git a/singlestoredb/functions/typing/pyarrow.py b/singlestoredb/functions/typing/pyarrow.py
new file mode 100644
index 000000000..7c7fce94e
--- /dev/null
+++ b/singlestoredb/functions/typing/pyarrow.py
@@ -0,0 +1,2 @@
+from pyarrow import Array  # noqa: F401
+from pyarrow import Table  # noqa: F401
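
Note: these modules give the optional-dependency annotations one canonical home, so signatures read the same across the test suite and user code. For example, the numpy aliases can replace direct `numpy.typing` spellings (the function below is illustrative):

    from singlestoredb.functions import udf
    from singlestoredb.functions.typing import numpy as npt

    @udf
    def scaled_sum(x: npt.Float64Array, y: npt.Float64Array) -> npt.Float64Array:
        # Array annotations select the vectorized calling convention,
        # so x and y arrive as whole numpy columns rather than scalars.
        return x + y * 2.0
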
diff --git a/singlestoredb/management/__init__.py b/singlestoredb/management/__init__.py
index 0f4887fcb..8a87d2840 100644
--- a/singlestoredb/management/__init__.py
+++ b/singlestoredb/management/__init__.py
@@ -2,6 +2,7 @@
 from .cluster import manage_cluster
 from .files import manage_files
 from .manager import get_token
+from .region import manage_regions
 from .workspace import get_organization
 from .workspace import get_secret
 from .workspace import get_stage
diff --git a/singlestoredb/tests/ext_funcs/__init__.py b/singlestoredb/tests/ext_funcs/__init__.py
index d481af9e5..f5ea9e419 100644
--- a/singlestoredb/tests/ext_funcs/__init__.py
+++ b/singlestoredb/tests/ext_funcs/__init__.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 # mypy: disable-error-code="type-arg"
+import asyncio
+import time
 import typing
 from typing import List
 from typing import NamedTuple
@@ -7,10 +9,6 @@
 from typing import Tuple
 
 import numpy as np
-import numpy.typing as npt
-import pandas as pd
-import polars as pl
-import pyarrow as pa
 
 import singlestoredb.functions.dtypes as dt
 from singlestoredb.functions import Masked
@@ -24,6 +22,10 @@
 from singlestoredb.functions.dtypes import SMALLINT
 from singlestoredb.functions.dtypes import TEXT
 from singlestoredb.functions.dtypes import TINYINT
+from singlestoredb.functions.typing import numpy as npt
+from singlestoredb.functions.typing import pandas as pdt
+from singlestoredb.functions.typing import polars as plt
+from singlestoredb.functions.typing import pyarrow as pat
 
 
 @udf
@@ -36,19 +38,44 @@ def double_mult(x: float, y: float) -> float:
     return x * y
 
 
+@udf(timeout=2)
+def timeout_double_mult(x: float, y: float) -> float:
+    time.sleep(5)
+    return x * y
+
+
+@udf
+async def async_double_mult(x: float, y: float) -> float:
+    return x * y
+
+
+@udf(timeout=2)
+async def async_timeout_double_mult(x: float, y: float) -> float:
+    await asyncio.sleep(5)
+    return x * y
+
+
 @udf(
     args=[DOUBLE(nullable=False), DOUBLE(nullable=False)],
     returns=DOUBLE(nullable=False),
 )
-def pandas_double_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_double_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @udf
 def numpy_double_mult(
-    x: npt.NDArray[np.float64],
-    y: npt.NDArray[np.float64],
-) -> npt.NDArray[np.float64]:
+    x: npt.Float64Array,
+    y: npt.Float64Array,
+) -> npt.Float64Array:
+    return x * y
+
+
+@udf
+async def async_numpy_double_mult(
+    x: npt.Float64Array,
+    y: npt.Float64Array,
+) -> npt.Float64Array:
     return x * y
 
 
@@ -56,7 +83,7 @@
     args=[DOUBLE(nullable=False), DOUBLE(nullable=False)],
     returns=DOUBLE(nullable=False),
 )
-def arrow_double_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_double_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -65,7 +92,7 @@
     args=[DOUBLE(nullable=False), DOUBLE(nullable=False)],
     returns=DOUBLE(nullable=False),
 )
-def polars_double_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_double_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -106,12 +133,12 @@ def tinyint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @tinyint_udf
-def pandas_tinyint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_tinyint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @tinyint_udf
-def polars_tinyint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_tinyint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -121,7 +148,7 @@ def numpy_tinyint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @tinyint_udf
-def arrow_tinyint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_tinyint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -144,12 +171,12 @@ def smallint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @smallint_udf
-def pandas_smallint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_smallint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @smallint_udf
-def polars_smallint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_smallint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -159,7 +186,7 @@ def numpy_smallint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @smallint_udf
-def arrow_smallint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_smallint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -183,12 +210,12 @@ def mediumint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @mediumint_udf
-def pandas_mediumint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_mediumint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @mediumint_udf
-def polars_mediumint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_mediumint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -198,7 +225,7 @@ def numpy_mediumint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @mediumint_udf
-def arrow_mediumint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_mediumint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -222,12 +249,12 @@ def bigint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @bigint_udf
-def pandas_bigint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_bigint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @bigint_udf
-def polars_bigint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_bigint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -237,7 +264,7 @@ def numpy_bigint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @bigint_udf
-def arrow_bigint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_bigint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -261,12 +288,12 @@ def nullable_tinyint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @nullable_tinyint_udf
-def pandas_nullable_tinyint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_nullable_tinyint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @nullable_tinyint_udf
-def polars_nullable_tinyint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_nullable_tinyint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -276,7 +303,7 @@ def numpy_nullable_tinyint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @nullable_tinyint_udf
-def arrow_nullable_tinyint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_nullable_tinyint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -299,12 +326,12 @@ def nullable_smallint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @nullable_smallint_udf
-def pandas_nullable_smallint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_nullable_smallint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @nullable_smallint_udf
-def polars_nullable_smallint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_nullable_smallint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -314,7 +341,7 @@ def numpy_nullable_smallint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @nullable_smallint_udf
-def arrow_nullable_smallint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_nullable_smallint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
 
@@ -338,12 +365,12 @@ def nullable_mediumint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]
 
 
 @nullable_mediumint_udf
-def pandas_nullable_mediumint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_nullable_mediumint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @nullable_mediumint_udf
-def polars_nullable_mediumint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_nullable_mediumint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -353,7 +380,7 @@ def numpy_nullable_mediumint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @nullable_mediumint_udf
-def arrow_nullable_mediumint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_nullable_mediumint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -377,12 +404,12 @@ def nullable_bigint_mult(x: Optional[int], y: Optional[int]) -> Optional[int]:
 
 
 @nullable_bigint_udf
-def pandas_nullable_bigint_mult(x: pd.Series, y: pd.Series) -> pd.Series:
+def pandas_nullable_bigint_mult(x: pdt.Series, y: pdt.Series) -> pdt.Series:
     return x * y
 
 
 @nullable_bigint_udf
-def polars_nullable_bigint_mult(x: pl.Series, y: pl.Series) -> pl.Series:
+def polars_nullable_bigint_mult(x: plt.Series, y: plt.Series) -> plt.Series:
     return x * y
 
 
@@ -392,7 +419,7 @@ def numpy_nullable_bigint_mult(x: np.ndarray, y: np.ndarray) -> np.ndarray:
 
 
 @nullable_bigint_udf
-def arrow_nullable_bigint_mult(x: pa.Array, y: pa.Array) -> pa.Array:
+def arrow_nullable_bigint_mult(x: pat.Array, y: pat.Array) -> pat.Array:
     import pyarrow.compute as pc
     return pc.multiply(x, y)
 
@@ -410,7 +437,7 @@ def string_mult(x: str, times: int) -> str:
 
 
 @udf(args=[TEXT(nullable=False), BIGINT(nullable=False)], returns=TEXT(nullable=False))
-def pandas_string_mult(x: pd.Series, times: pd.Series) -> pd.Series:
+def pandas_string_mult(x: pdt.Series, times: pdt.Series) -> pdt.Series:
     return x * times
 
 
@@ -447,8 +474,8 @@ def nullable_string_mult(x: Optional[str], times: Optional[int]) -> Optional[str
     returns=TINYINT(nullable=True),
 )
 def pandas_nullable_tinyint_mult_with_masks(
-    x: Masked[pd.Series], y: Masked[pd.Series],
-) -> Masked[pd.Series]:
+    x: Masked[pdt.Series], y: Masked[pdt.Series],
+) -> Masked[pdt.Series]:
     x_data, x_nulls = x
     y_data, y_nulls = y
     return Masked(x_data * y_data, x_nulls | y_nulls)
@@ -468,8 +495,8 @@ def numpy_nullable_tinyint_mult_with_masks(
     returns=TINYINT(nullable=True),
 )
 def polars_nullable_tinyint_mult_with_masks(
-    x: Masked[pl.Series], y: Masked[pl.Series],
-) -> Masked[pl.Series]:
+    x: Masked[plt.Series], y: Masked[plt.Series],
+) -> Masked[plt.Series]:
     x_data, x_nulls = x
     y_data, y_nulls = y
     return Masked(x_data * y_data, x_nulls | y_nulls)
@@ -480,8 +507,8 @@ def polars_nullable_tinyint_mult_with_masks(
     returns=TINYINT(nullable=True),
 )
 def arrow_nullable_tinyint_mult_with_masks(
-    x: Masked[pa.Array], y: Masked[pa.Array],
-) -> Masked[pa.Array]:
+    x: Masked[pat.Array], y: Masked[pat.Array],
+) -> Masked[pat.Array]:
     import pyarrow.compute as pc
     x_data, x_nulls = x
     y_data, y_nulls = y
@@ -489,7 +516,7 @@ def arrow_nullable_tinyint_mult_with_masks(
 
 
 @udf(returns=[TEXT(nullable=False, name='res')])
-def numpy_fixed_strings() -> Table[npt.NDArray[np.str_]]:
+def numpy_fixed_strings() -> Table[npt.StrArray]:
     out = np.array(
         [
             'hello',
@@ -502,7 +529,7 @@ def numpy_fixed_strings() -> Table[npt.NDArray[np.str_]]:
 
 
 @udf(returns=[TEXT(nullable=False, name='res'), TINYINT(nullable=False, name='res2')])
-def numpy_fixed_strings_2() -> Table[npt.NDArray[np.str_], npt.NDArray[np.int8]]:
+def numpy_fixed_strings_2() -> Table[npt.StrArray, npt.Int8Array]:
     out = np.array(
         [
             'hello',
@@ -515,7 +542,7 @@ def numpy_fixed_strings_2() -> Table[npt.NDArray[np.str_], npt.NDArray[np.int8]]
 
 
 @udf(returns=[BLOB(nullable=False, name='res')])
-def numpy_fixed_binary() -> Table[npt.NDArray[np.bytes_]]:
+def numpy_fixed_binary() -> Table[npt.BytesArray]:
     out = np.array(
         [
             'hello'.encode('utf8'),
@@ -537,6 +564,11 @@ def table_function(n: int) -> Table[List[int]]:
     return Table([10] * n)
 
 
+@udf
+async def async_table_function(n: int) -> Table[List[int]]:
+    return Table([10] * n)
+
+
 @udf(
     returns=[
         dt.INT(name='c_int', nullable=False),
@@ -561,8 +593,8 @@ def table_function_struct(n: int) -> Table[List[MyTable]]:
 
 
 @udf
 def vec_function(
-    x: npt.NDArray[np.float64], y: npt.NDArray[np.float64],
-) -> npt.NDArray[np.float64]:
+    x: npt.Float64Array, y: npt.Float64Array,
+) -> npt.Float64Array:
     return x * y
 
 
@@ -577,8 +609,8 @@ class VecOutputs(typing.NamedTuple):
 
 
 @udf(args=VecInputs, returns=VecOutputs)
 def vec_function_ints(
-    x: npt.NDArray[np.int_], y: npt.NDArray[np.int_],
-) -> npt.NDArray[np.int_]:
+    x: npt.IntArray, y: npt.IntArray,
+) -> npt.IntArray:
     return x * y
 
 
@@ -589,9 +621,16 @@ class DFOutputs(typing.NamedTuple):
 
 
 @udf(args=VecInputs, returns=DFOutputs)
 def vec_function_df(
-    x: npt.NDArray[np.int_], y: npt.NDArray[np.int_],
-) -> Table[pd.DataFrame]:
-    return pd.DataFrame(dict(res=[1, 2, 3], res2=[1.1, 2.2, 3.3]))
+    x: npt.IntArray, y: npt.IntArray,
+) -> Table[pdt.DataFrame]:
+    return pdt.DataFrame(dict(res=[1, 2, 3], res2=[1.1, 2.2, 3.3]))
+
+
+@udf(args=VecInputs, returns=DFOutputs)
+async def async_vec_function_df(
+    x: npt.IntArray, y: npt.IntArray,
+) -> Table[pdt.DataFrame]:
+    return pdt.DataFrame(dict(res=[1, 2, 3], res2=[1.1, 2.2, 3.3]))
 
 
 class MaskOutputs(typing.NamedTuple):
@@ -600,8 +639,8 @@ class MaskOutputs(typing.NamedTuple):
 
 
 @udf(args=VecInputs, returns=MaskOutputs)
 def vec_function_ints_masked(
-    x: Masked[npt.NDArray[np.int_]], y: Masked[npt.NDArray[np.int_]],
-) -> Table[Masked[npt.NDArray[np.int_]]]:
+    x: Masked[npt.IntArray], y: Masked[npt.IntArray],
+) -> Table[Masked[npt.IntArray]]:
     x_data, x_nulls = x
     y_data, y_nulls = y
     return Table(Masked(x_data * y_data, x_nulls | y_nulls))
@@ -614,8 +653,8 @@ class MaskOutputs2(typing.NamedTuple):
 
 
 @udf(args=VecInputs, returns=MaskOutputs2)
 def vec_function_ints_masked2(
-    x: Masked[npt.NDArray[np.int_]], y: Masked[npt.NDArray[np.int_]],
-) -> Table[Masked[npt.NDArray[np.int_]], Masked[npt.NDArray[np.int_]]]:
+    x: Masked[npt.IntArray], y: Masked[npt.IntArray],
+) -> Table[Masked[npt.IntArray], Masked[npt.IntArray]]:
     x_data, x_nulls = x
     y_data, y_nulls = y
     return Table(
diff --git a/singlestoredb/tests/test.sql b/singlestoredb/tests/test.sql
index fd7d26f25..ab3cf955a 100644
--- a/singlestoredb/tests/test.sql
+++ b/singlestoredb/tests/test.sql
@@ -14,6 +14,28 @@
 INSERT INTO data SET id='e', name='elephants', value=0;
 
 COMMIT;
 
+CREATE ROWSTORE TABLE IF NOT EXISTS longer_data (
+    id VARCHAR(255) NOT NULL,
+    name VARCHAR(255) NOT NULL,
+    value BIGINT NOT NULL,
+    PRIMARY KEY (id) USING HASH
+) DEFAULT CHARSET = utf8 COLLATE = utf8_unicode_ci;
+
+INSERT INTO longer_data SET id='a', name='antelopes', value=2;
+INSERT INTO longer_data SET id='b', name='bears', value=2;
+INSERT INTO longer_data SET id='c', name='cats', value=5;
+INSERT INTO longer_data SET id='d', name='dogs', value=4;
+INSERT INTO longer_data SET id='e', name='elephants', value=0;
+INSERT INTO longer_data SET id='f', name='ferrets', value=2;
+INSERT INTO longer_data SET id='g', name='gorillas', value=4;
+INSERT INTO longer_data SET id='h', name='horses', value=6;
+INSERT INTO longer_data SET id='i', name='iguanas', value=2;
+INSERT INTO longer_data SET id='j', name='jaguars', value=0;
+INSERT INTO longer_data SET id='k', name='kiwis', value=0;
+INSERT INTO longer_data SET id='l', name='leopards', value=1;
+
+COMMIT;
+
 CREATE ROWSTORE TABLE IF NOT EXISTS data_with_nulls (
     id VARCHAR(255) NOT NULL,
     name VARCHAR(255),
diff --git a/singlestoredb/tests/test_ext_func.py b/singlestoredb/tests/test_ext_func.py
index 60e1ecf2a..d3e680e58 100755
--- a/singlestoredb/tests/test_ext_func.py
+++ b/singlestoredb/tests/test_ext_func.py
@@ -162,6 +162,43 @@ def test_double_mult(self):
             'from data order by id',
         )
 
+    def test_timeout_double_mult(self):
+        with self.assertRaises(self.conn.OperationalError) as exc:
+            self.cur.execute(
+                'select timeout_double_mult(value, 100) as res '
+                'from longer_data order by id',
+            )
+        assert 'timeout' in str(exc.exception).lower()
+
+    def test_async_double_mult(self):
+        self.cur.execute(
+            'select async_double_mult(value, 100) as res from data order by id',
+        )
+
+        assert [tuple(x) for x in self.cur] == \
+            [(200.0,), (200.0,), (500.0,), (400.0,), (0.0,)]
+
+        desc = self.cur.description
+        assert len(desc) == 1
+        assert desc[0].name == 'res'
+        assert desc[0].type_code == ft.DOUBLE
+        assert desc[0].null_ok is False
+
+        # NULL is not valid
+        with self.assertRaises(self.conn.OperationalError):
+            self.cur.execute(
+                'select async_double_mult(value, NULL) as res '
+                'from data order by id',
+            )
+
+    def test_async_timeout_double_mult(self):
+        with self.assertRaises(self.conn.OperationalError) as exc:
+            self.cur.execute(
+                'select async_timeout_double_mult(value, 100) as res '
+                'from longer_data order by id',
+            )
+        assert 'timeout' in str(exc.exception).lower()
+
     def test_pandas_double_mult(self):
         self.cur.execute(
             'select pandas_double_mult(value, 100) as res '
@@ -206,6 +243,28 @@ def test_numpy_double_mult(self):
             'from data order by id',
         )
 
+    def test_async_numpy_double_mult(self):
+        self.cur.execute(
+            'select async_numpy_double_mult(value, 100) as res '
+            'from data order by id',
+        )
+
+        assert [tuple(x) for x in self.cur] == \
+            [(200.0,), (200.0,), (500.0,), (400.0,), (0.0,)]
+
+        desc = self.cur.description
+        assert len(desc) == 1
+        assert desc[0].name == 'res'
+        assert desc[0].type_code == ft.DOUBLE
+        assert desc[0].null_ok is False
+
+        # NULL is not valid
+        with self.assertRaises(self.conn.OperationalError):
+            self.cur.execute(
+                'select async_numpy_double_mult(value, NULL) as res '
+                'from data order by id',
+            )
+
     def test_arrow_double_mult(self):
         self.cur.execute(
             'select arrow_double_mult(value, 100) as res '
@@ -1246,6 +1305,17 @@ def test_table_function(self):
         assert desc[0].type_code == ft.LONGLONG
         assert desc[0].null_ok is False
 
+    def test_async_table_function(self):
+        self.cur.execute('select * from async_table_function(5)')
+
+        assert [x[0] for x in self.cur] == [10, 10, 10, 10, 10]
+
+        desc = self.cur.description
+        assert len(desc) == 1
+        assert desc[0].name == 'a'
+        assert desc[0].type_code == ft.LONGLONG
+        assert desc[0].null_ok is False
+
     def test_table_function_tuple(self):
         self.cur.execute('select * from table_function_tuple(3)')
 
@@ -1310,6 +1380,26 @@ def test_vec_function_df(self):
         assert desc[1].type_code == ft.DOUBLE
         assert desc[1].null_ok is False
 
+    def test_async_vec_function_df(self):
+        self.cur.execute('select * from async_vec_function_df(5, 10)')
+
+        out = list(self.cur)
+
+        assert out == [
+            (1, 1.1),
+            (2, 2.2),
+            (3, 3.3),
+        ]
+
+        desc = self.cur.description
+        assert len(desc) == 2
+        assert desc[0].name == 'res'
+        assert desc[0].type_code == ft.SHORT
+        assert desc[0].null_ok is False
+        assert desc[1].name == 'res2'
+        assert desc[1].type_code == ft.DOUBLE
+        assert desc[1].null_ok is False
+
     def test_vec_function_ints_masked(self):
         self.cur.execute('select * from vec_function_ints_masked(5, 10)')
 
diff --git a/singlestoredb/tests/test_management.py b/singlestoredb/tests/test_management.py
index 2e77e6e61..2cbe12b0a 100755
--- a/singlestoredb/tests/test_management.py
+++ b/singlestoredb/tests/test_management.py
@@ -14,6 +14,7 @@
 from singlestoredb.management.job import Status
 from singlestoredb.management.job import TargetType
 from singlestoredb.management.region import Region
+from singlestoredb.management.region import RegionManager
 from singlestoredb.management.utils import NamedList
 
 
@@ -365,6 +366,7 @@ def test_connect(self):
         assert 'endpoint' in cm.exception.msg, cm.exception.msg
 
 
+@pytest.mark.skip(reason='Not implemented in server yet')
 @pytest.mark.management
 class TestStarterWorkspace(unittest.TestCase):
 
@@ -1491,6 +1493,7 @@ def test_file_object(self):
         space.remove('obj_test_2.ipynb')
 
 
+@pytest.mark.skip(reason='Not implemented in server yet')
 @pytest.mark.management
 class TestRegions(unittest.TestCase):
     """Test cases for region management."""
@@ -1591,5 +1594,5 @@ def test_no_manager(self):
         # Verify from_dict class method
         with self.assertRaises(s2.ManagementError) as cm:
-            Region.get_shared_tier_regions(None)
+            RegionManager.list_shared_tier_regions(None)
         assert 'No workspace manager' in str(cm.exception)