Prefect tasks with added functionality to enforce type checking and aid debugging, helping to reduce negative engineering!
Using pip:

    pip install supreme_task

Simply swap out prefect for supreme_task when importing the task decorator.
i.e.

    from supreme_task import task

instead of

    from prefect import task

and you're good to go!
Get runtime type checking thanks to typeguard by importing the @task decorator from supreme_task instead of prefect.
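To make the idea of runtime type checking concrete, here is a minimal standard-library sketch of a decorator that compares arguments against their annotations. It is an illustration only, not how supreme_task or typeguard is implemented: the `check_types` name is hypothetical, and the sketch handles only plain classes such as `int` and `str`, whereas typeguard supports far richer annotations.

```python
import functools
import inspect
from typing import get_type_hints


def check_types(func):
    """Hypothetical decorator: check arguments and return value against plain-class annotations."""
    hints = get_type_hints(func)
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only plain classes (int, str, ...) are checked in this sketch.
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError(
                    f'argument "{name}" ({type(value).__name__}) '
                    f"is not an instance of {expected.__name__}"
                )
        result = func(*args, **kwargs)
        expected = hints.get("return")
        if isinstance(expected, type) and not isinstance(result, expected):
            raise TypeError(
                f"return value ({type(result).__name__}) "
                f"is not an instance of {expected.__name__}"
            )
        return result

    return wrapper


@check_types
def add(x: int, y: int) -> int:
    return x + y
```

Calling `add(1, 2)` passes the check, while `add(x="1", y=2)` raises a `TypeError` before the function body runs. With supreme_task, the equivalent check is delegated to typeguard.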
See the example run.py file:

    from supreme_task import task


    @task
    def add(x: int, y: int) -> int:
        return x + y


    add.fn(x="1", y=2)

Running python run.py will raise the following exception:

    Traceback (most recent call last):
      File "run.py", line 9, in <module>
        add.fn(x="1", y=2)
      File "run.py", line 5, in add
        def add(x: int, y: int) -> int:
      File "supreme-task-py38/lib/python3.8/site-packages/typeguard/_functions.py", line 135, in check_argument_types
        check_type_internal(value, annotation, memo)
      File "supreme-task-py38/lib/python3.8/site-packages/typeguard/_checkers.py", line 761, in check_type_internal
        raise TypeCheckError(f"is not an instance of {qualified_name(origin_type)}")
    typeguard.TypeCheckError: argument "x" (str) is not an instance of int

Get persistence of flow run inputs by importing the @task decorator from supreme_task instead of prefect to help with debugging.
For example, given a file run.py:

    from supreme_task import task
    from prefect import flow
    from prefect.filesystems import LocalFileSystem


    @task
    def faulty_add(x: int, y: int) -> int:
        if x == 1:
            raise ValueError("x is 1")
        return x + y


    @flow(result_storage=LocalFileSystem(basepath="results/"))
    def my_flow() -> None:
        faulty_add(x=1, y=2)


    my_flow()

We update our prefect configuration to enable result persistence:

    prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

We then run the flow by running python run.py.
We now inspect the results directory:

    $ tree results
    results
    ├── 514aaa4ae0134405a639cbd9a17365da
    ├── b662c63ff9854b0e9383d7f6cf0a5b76
    └── inputs
        └── faulty_add
            └── 2023-06-10T10-47-04+0000

The inputs for failed task runs are saved under results/inputs/<task_name>/<start_run_time>.
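The general idea of saving inputs when a call fails can be sketched with the standard library alone. The decorator below is a hypothetical illustration, not supreme_task's implementation: the name `persist_inputs_on_failure`, the JSON serialization, and the keyword-only calling convention are all assumptions made for the example. Only the `inputs/<task_name>/<start_run_time>` layout mirrors the directory listing above.

```python
import functools
import json
from datetime import datetime, timezone
from pathlib import Path


def persist_inputs_on_failure(base_dir="results"):
    """Hypothetical decorator factory: write keyword inputs to disk when the call raises."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            start = datetime.now(timezone.utc)
            try:
                return func(**kwargs)
            except Exception:
                target_dir = Path(base_dir) / "inputs" / func.__name__
                target_dir.mkdir(parents=True, exist_ok=True)
                # File name resembles the timestamps in the listing, e.g. 2023-06-10T10-47-04+0000.
                target = target_dir / start.strftime("%Y-%m-%dT%H-%M-%S%z")
                # JSON is an assumption of this sketch; it only works for JSON-serializable inputs.
                target.write_text(json.dumps(kwargs))
                raise  # re-raise so the failure stays visible to the caller

        return wrapper

    return decorator


@persist_inputs_on_failure(base_dir="results_sketch")
def faulty_add(x: int, y: int) -> int:
    if x == 1:
        raise ValueError("x is 1")
    return x + y
```

Calling `faulty_add(x=1, y=2)` would still raise `ValueError`, but the inputs `{"x": 1, "y": 2}` would first be written under `results_sketch/inputs/faulty_add/`, so the failing call can be reproduced later.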