Skip to content

Commit d4edbef

Browse files
committed
Add layers of timings
1 parent 640fa7b commit d4edbef

File tree

2 files changed

+52
-44
lines changed

2 files changed

+52
-44
lines changed

singlestoredb/functions/ext/asgi.py

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -313,17 +313,19 @@ def build_udf_endpoint(
313313

314314
async def do_func(
315315
cancel_event: threading.Event,
316+
timer: Timer,
316317
row_ids: Sequence[int],
317318
rows: Sequence[Sequence[Any]],
318319
) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
319320
'''Call function on given rows of data.'''
320321
out = []
321-
for row in rows:
322-
cancel_on_event(cancel_event)
323-
if is_async:
324-
out.append(await func(*row))
325-
else:
326-
out.append(func(*row))
322+
with timer('call_function'):
323+
for row in rows:
324+
cancel_on_event(cancel_event)
325+
if is_async:
326+
out.append(await func(*row))
327+
else:
328+
out.append(func(*row))
327329
return row_ids, list(zip(out))
328330

329331
return do_func
@@ -357,6 +359,7 @@ def build_vector_udf_endpoint(
357359

358360
async def do_func(
359361
cancel_event: threading.Event,
362+
timer: Timer,
360363
row_ids: Sequence[int],
361364
cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
362365
) -> Tuple[
@@ -367,16 +370,17 @@ async def do_func(
367370
row_ids = array_cls(row_ids)
368371

369372
# Call the function with `cols` as the function parameters
370-
if cols and cols[0]:
371-
if is_async:
372-
out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])
373-
else:
374-
out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
375-
else:
376-
if is_async:
377-
out = await func()
373+
with timer('call_function'):
374+
if cols and cols[0]:
375+
if is_async:
376+
out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])
377+
else:
378+
out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
378379
else:
379-
out = func()
380+
if is_async:
381+
out = await func()
382+
else:
383+
out = func()
380384

381385
cancel_on_event(cancel_event)
382386

@@ -420,21 +424,23 @@ def build_tvf_endpoint(
420424

421425
async def do_func(
422426
cancel_event: threading.Event,
427+
timer: Timer,
423428
row_ids: Sequence[int],
424429
rows: Sequence[Sequence[Any]],
425430
) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
426431
'''Call function on given rows of data.'''
427432
out_ids: List[int] = []
428433
out = []
429434
# Call function on each row of data
430-
for i, row in zip(row_ids, rows):
431-
cancel_on_event(cancel_event)
432-
if is_async:
433-
res = await func(*row)
434-
else:
435-
res = func(*row)
436-
out.extend(as_list_of_tuples(res))
437-
out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
435+
with timer('call_function'):
436+
for i, row in zip(row_ids, rows):
437+
cancel_on_event(cancel_event)
438+
if is_async:
439+
res = await func(*row)
440+
else:
441+
res = func(*row)
442+
out.extend(as_list_of_tuples(res))
443+
out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
438444
return out_ids, out
439445

440446
return do_func
@@ -467,6 +473,7 @@ def build_vector_tvf_endpoint(
467473

468474
async def do_func(
469475
cancel_event: threading.Event,
476+
timer: Timer,
470477
row_ids: Sequence[int],
471478
cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
472479
) -> Tuple[
@@ -481,20 +488,23 @@ async def do_func(
481488
is_async = asyncio.iscoroutinefunction(func)
482489

483490
# Call function on each column of data
484-
if cols and cols[0]:
485-
if is_async:
486-
res = get_dataframe_columns(
487-
await func(*[x if m else x[0] for x, m in zip(cols, masks)]),
488-
)
489-
else:
490-
res = get_dataframe_columns(
491-
func(*[x if m else x[0] for x, m in zip(cols, masks)]),
492-
)
493-
else:
494-
if is_async:
495-
res = get_dataframe_columns(await func())
491+
with timer('call_function'):
492+
if cols and cols[0]:
493+
if is_async:
494+
func_res = await func(
495+
*[x if m else x[0] for x, m in zip(cols, masks)],
496+
)
497+
else:
498+
func_res = func(
499+
*[x if m else x[0] for x, m in zip(cols, masks)],
500+
)
496501
else:
497-
res = get_dataframe_columns(func())
502+
if is_async:
503+
func_res = await func()
504+
else:
505+
func_res = func()
506+
507+
res = get_dataframe_columns(func_res)
498508

499509
cancel_on_event(cancel_event)
500510

@@ -1003,10 +1013,10 @@ async def __call__(
10031013
)
10041014

10051015
func_task = asyncio.create_task(
1006-
func(cancel_event, *inputs)
1016+
func(cancel_event, timer, *inputs)
10071017
if func_info['is_async']
10081018
else to_thread(
1009-
lambda: asyncio.run(func(cancel_event, *inputs)),
1019+
lambda: asyncio.run(func(cancel_event, timer, *inputs)),
10101020
),
10111021
)
10121022
disconnect_task = asyncio.create_task(
@@ -1018,7 +1028,7 @@ async def __call__(
10181028

10191029
all_tasks += [func_task, disconnect_task, timeout_task]
10201030

1021-
with timer('function_call'):
1031+
with timer('function_wrapper'):
10221032
done, pending = await asyncio.wait(
10231033
all_tasks, return_when=asyncio.FIRST_COMPLETED,
10241034
)

singlestoredb/functions/ext/timer.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,13 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
108108
# Pop the current timing from stack
109109
timing_info = self._stack.pop()
110110
elapsed = time.perf_counter() - timing_info['start_time']
111-
self.metrics[timing_info['key']] = elapsed
111+
self.metrics.setdefault(timing_info['key'], 0)
112+
self.metrics[timing_info['key']] += elapsed
112113

113114
def finish(self) -> None:
114115
"""Finish the current timing context and store the elapsed time."""
115116
if self._stack:
116-
raise RuntimeError(
117-
'finish() called without a matching __enter__(). '
118-
'Use the context manager instead.',
119-
)
117+
raise RuntimeError('finish() called within a `with` block.')
120118

121119
self.metrics['total'] = time.perf_counter() - self.start_time
122120

0 commit comments

Comments
 (0)