-
Notifications
You must be signed in to change notification settings - Fork 22
Add the ability to cancel UDFs by client disconnect or timeout #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds the ability to cancel User-Defined Functions (UDFs) by client disconnect or timeout. The changes introduce async UDF support, timeout configuration, cancellation mechanisms, and comprehensive testing for the new timeout and async functionality.
- Adds async UDF support with
async/awaitsyntax - Implements timeout-based cancellation with configurable timeouts
- Introduces client disconnect detection and cancellation
- Adds comprehensive metrics and logging for function execution
Reviewed Changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| singlestoredb/functions/decorator.py | Adds timeout parameter and async function wrapper support |
| singlestoredb/functions/ext/asgi.py | Implements cancellation logic, timeout handling, and execution metrics |
| singlestoredb/functions/ext/timer.py | New timer utility for function execution metrics |
| singlestoredb/config.py | Adds external_function.timeout configuration option |
| singlestoredb/tests/ext_funcs/init.py | Updates type imports and adds async test functions |
| singlestoredb/tests/test_ext_func.py | Adds comprehensive tests for timeout and async functionality |
| singlestoredb/tests/test.sql | Adds longer_data table for timeout testing |
| singlestoredb/management/init.py | Adds manage_regions import |
| singlestoredb/functions/typing/*.py | New typing modules for cleaner imports |
Comments suppressed due to low confidence (2)
singlestoredb/tests/test_ext_func.py:171
- The assertion is inside the context manager but should be outside. The exception won't be available until after the context manager exits.
assert 'timeout' in str(exc.exception).lower()
singlestoredb/tests/test_ext_func.py:200
- The assertion is inside the context manager but should be outside. The exception won't be available until after the context manager exits.
assert 'timeout' in str(exc.exception).lower()
| for i, res in zip(row_ids, func_map(func, rows)): | ||
| out.extend(as_list_of_tuples(res)) | ||
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) | ||
| async with timer('call_function'): |
Copilot
AI
Jul 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable i is used in the loop but row_ids[i] should be i since we're iterating with zip(row_ids, rows) and i is already the row_id value.
| with timer('receive_data'): | ||
| while more_body: | ||
| request = await receive() | ||
| if request['type'] == 'http.disconnect': |
Copilot
AI
Jul 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This disconnect check during data reception will raise a RuntimeError, but the main cancellation logic expects asyncio.CancelledError. This inconsistency could lead to unhandled exceptions.
| else: | ||
| res = func(*row) | ||
| out.extend(as_list_of_tuples(res)) | ||
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) |
Copilot
AI
Jul 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The calculation [row_ids[i]] * (len(out)-len(out_ids)) is complex and error-prone. Consider storing the previous length and using that for clarity.
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) | |
| prev_len_out = len(out) | |
| out_ids.extend([row_ids[i]] * (prev_len_out - len(out_ids))) |
No description provided.