Skip to content
This repository was archived by the owner on Aug 22, 2025. It is now read-only.

Commit 329cf8e

Browse files
committed
make friendly to any user
better example updates fix kwargs
1 parent 85fa4fd commit 329cf8e

File tree

4 files changed

+175
-135
lines changed

4 files changed

+175
-135
lines changed

examples/prefect_deploy.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

examples/prefect-deploy.Dockerfile renamed to examples/read-hn.Dockerfile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ RUN rm -rf /var/lib/apt/lists/*
77
WORKDIR /app
88

99
ENV UV_SYSTEM_PYTHON=1
10-
ENV PATH="/root/.local/bin:$PATH"
1110

1211
RUN uv pip install controlflow
1312

examples/read_hn.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# /// script
2+
# dependencies = ["controlflow"]
3+
# ///
4+
5+
import os
6+
import sys
7+
from pathlib import Path
8+
from typing import Annotated, TypedDict
9+
10+
import httpx
11+
from prefect.artifacts import create_markdown_artifact
12+
from prefect.blocks.system import Secret
13+
from prefect.docker import DockerImage
14+
from prefect.runner.storage import GitCredentials, GitRepository
15+
from pydantic import AnyHttpUrl, Field
16+
17+
import controlflow as cf
18+
19+
20+
class HNArticleSummary(TypedDict):
21+
link: AnyHttpUrl
22+
title: str
23+
main_topics: Annotated[set[str], Field(min_length=1, max_length=5)]
24+
key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)]
25+
tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)]
26+
27+
28+
@cf.task(instructions="concise, main details")
29+
def analyze_article(id: str) -> HNArticleSummary:
30+
"""Analyze a HackerNews article and return structured insights"""
31+
content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json()
32+
return f"here is the article content: {content}" # type: ignore
33+
34+
35+
@cf.task()
36+
def summarize_article_briefs(
37+
briefs: list[HNArticleSummary],
38+
) -> Annotated[str, Field(description="markdown summary")]:
39+
"""Summarize a list of article briefs"""
40+
return f"here are the article briefs: {briefs}" # type: ignore
41+
42+
43+
@cf.flow(retries=2)
44+
def analyze_hn_articles(n: int = 5):
45+
top_article_ids = httpx.get(
46+
"https://hacker-news.firebaseio.com/v0/topstories.json"
47+
).json()[:n]
48+
briefs = analyze_article.map(top_article_ids).result()
49+
create_markdown_artifact(
50+
key="hn-article-exec-summary",
51+
markdown=summarize_article_briefs(briefs),
52+
)
53+
54+
55+
if __name__ == "__main__":
56+
EVERY_12_HOURS_CRON = "0 */12 * * *"
57+
if len(sys.argv) > 1 and sys.argv[1] == "serve":
58+
analyze_hn_articles.serve(
59+
parameters={"n": 5},
60+
cron=EVERY_12_HOURS_CRON,
61+
)
62+
elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy":
63+
analyze_hn_articles.from_source(
64+
source=str((p := Path(__file__)).parent.resolve()),
65+
entrypoint=f"{p.name}:analyze_hn_articles",
66+
).deploy(
67+
name="local-deployment",
68+
work_pool_name="local",
69+
cron=EVERY_12_HOURS_CRON,
70+
)
71+
elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy":
72+
repo = GitRepository(
73+
url="https://github.com/PrefectHQ/controlflow.git",
74+
branch="main",
75+
credentials=None, # replace with `dict(username="", access_token="")` for private repos
76+
)
77+
analyze_hn_articles.from_source(
78+
source=repo,
79+
entrypoint="examples/read_hn.py:analyze_articles",
80+
).deploy(
81+
name="docker-deployment",
82+
# image=DockerImage( # uncomment and replace with your own image if desired
83+
# name="zzstoatzz/cf-read-hn",
84+
# tag="latest",
85+
# dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"),
86+
# ),
87+
work_pool_name="docker-work", # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker
88+
cron=EVERY_12_HOURS_CRON,
89+
parameters={"n": 5},
90+
job_variables={
91+
"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")},
92+
"image": "zzstoatzz/cf-read-hn:latest", # publicly available image on dockerhub
93+
},
94+
build=False,
95+
push=False,
96+
)
97+
else:
98+
print(f"just running the code\n\n\n\n\n\n")
99+
analyze_hn_articles(5)

src/controlflow/decorators.py

Lines changed: 76 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any, Callable, Optional, TypeVar, Union, cast
55

66
from prefect import Flow as PrefectFlow
7+
from prefect import Task as PrefectTask
78
from prefect.utilities.asyncutils import run_coro_as_sync
89
from typing_extensions import ParamSpec
910

@@ -34,7 +35,7 @@ def flow(
3435
timeout_seconds: Optional[Union[float, int]] = None,
3536
prefect_kwargs: Optional[dict[str, Any]] = None,
3637
context_kwargs: Optional[list[str]] = None,
37-
**kwargs: Optional[dict[str, Any]],
38+
**kwargs: Any,
3839
) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]:
3940
"""
4041
A decorator that wraps a function as a ControlFlow flow.
@@ -75,13 +76,15 @@ def flow(
7576
sig = inspect.signature(fn)
7677

7778
def create_flow_context(bound_args):
78-
flow_kwargs = kwargs.copy()
79+
flow_kwargs: dict[str, Any] = kwargs.copy()
7980
if thread is not None:
80-
flow_kwargs.setdefault("thread_id", thread) # type: ignore
81+
flow_kwargs["thread_id"] = thread
8182
if tools is not None:
82-
flow_kwargs.setdefault("tools", tools) # type: ignore
83+
flow_kwargs["tools"] = tools
8384
if default_agent is not None:
84-
flow_kwargs.setdefault("default_agent", default_agent) # type: ignore
85+
flow_kwargs["default_agent"] = default_agent
86+
87+
flow_kwargs.update(kwargs)
8588

8689
context = {}
8790
if context_kwargs:
@@ -117,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs):
117120
):
118121
return fn(*wrapper_args, **wrapper_kwargs)
119122

120-
prefect_wrapper = prefect_flow(
121-
timeout_seconds=timeout_seconds,
122-
retries=retries,
123-
retry_delay_seconds=retry_delay_seconds,
124-
**(prefect_kwargs or {}),
125-
)(wrapper)
126-
return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper)
123+
return cast(
124+
Callable[[Callable[P, R]], PrefectFlow[P, R]],
125+
prefect_flow(
126+
timeout_seconds=timeout_seconds,
127+
retries=retries,
128+
retry_delay_seconds=retry_delay_seconds,
129+
**(prefect_kwargs or {}),
130+
)(wrapper),
131+
)
127132

128133

129134
def task(
130-
fn: Optional[Callable[..., Any]] = None,
135+
fn: Optional[Callable[P, R]] = None,
131136
*,
132137
objective: Optional[str] = None,
133138
instructions: Optional[str] = None,
@@ -138,8 +143,8 @@ def task(
138143
retries: Optional[int] = None,
139144
retry_delay_seconds: Optional[Union[float, int]] = None,
140145
timeout_seconds: Optional[Union[float, int]] = None,
141-
**task_kwargs: Optional[dict[str, Any]],
142-
):
146+
**task_kwargs: Any,
147+
) -> Callable[[Callable[P, R]], PrefectTask[P, R]]:
143148
"""
144149
A decorator that turns a Python function into a Task. The Task objective is
145150
set to the function name, and the instructions are set to the function
@@ -162,78 +167,68 @@ def task(
162167
callable: The wrapped function or a new task decorator if `fn` is not provided.
163168
"""
164169

165-
if fn is None:
166-
return functools.partial(
167-
task,
168-
objective=objective,
169-
instructions=instructions,
170-
name=name,
171-
agents=agents,
172-
tools=tools,
173-
interactive=interactive,
174-
retries=retries,
175-
retry_delay_seconds=retry_delay_seconds,
176-
timeout_seconds=timeout_seconds,
177-
**task_kwargs,
178-
)
179-
180-
sig = inspect.signature(fn)
181-
182-
if name is None:
183-
name = fn.__name__
184-
185-
if objective is None:
186-
objective = fn.__doc__ or ""
170+
def decorator(func: Callable[P, R]) -> PrefectTask[P, R]:
171+
sig = inspect.signature(func)
187172

188-
result_type = fn.__annotations__.get("return")
189-
190-
def _get_task(*args, **kwargs) -> Task:
191-
# first process callargs
192-
bound = sig.bind(*args, **kwargs)
193-
bound.apply_defaults()
194-
context = bound.arguments.copy()
195-
196-
# call the function to see if it produces an updated objective
197-
maybe_coro = fn(*args, **kwargs)
198-
if asyncio.iscoroutine(maybe_coro):
199-
result = run_coro_as_sync(maybe_coro)
173+
if name is None:
174+
task_name = func.__name__
200175
else:
201-
result = maybe_coro
202-
if result is not None:
203-
context["Additional context"] = result
176+
task_name = name
204177

205-
return Task(
206-
objective=objective,
207-
instructions=instructions,
208-
name=name,
209-
agents=agents,
210-
context=context,
211-
result_type=result_type,
212-
interactive=interactive or False,
213-
tools=tools or [],
214-
**task_kwargs,
215-
)
178+
if objective is None:
179+
task_objective = func.__doc__ or ""
180+
else:
181+
task_objective = objective
216182

217-
if asyncio.iscoroutinefunction(fn):
183+
result_type = func.__annotations__.get("return")
218184

219-
@functools.wraps(fn)
220-
async def wrapper(*args, **kwargs):
221-
task = _get_task(*args, **kwargs)
222-
return await task.run_async()
223-
else:
185+
def _get_task(*args, **kwargs) -> Task:
186+
bound = sig.bind(*args, **kwargs)
187+
bound.apply_defaults()
188+
context = bound.arguments.copy()
189+
190+
maybe_coro = func(*args, **kwargs)
191+
if asyncio.iscoroutine(maybe_coro):
192+
result = run_coro_as_sync(maybe_coro)
193+
else:
194+
result = maybe_coro
195+
if result is not None:
196+
context["Additional context"] = result
197+
198+
return Task(
199+
objective=task_objective,
200+
instructions=instructions,
201+
name=task_name,
202+
agents=agents,
203+
context=context,
204+
result_type=result_type,
205+
interactive=interactive or False,
206+
tools=tools or [],
207+
**task_kwargs,
208+
)
209+
210+
if asyncio.iscoroutinefunction(func):
211+
212+
@functools.wraps(func)
213+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # type: ignore
214+
task = _get_task(*args, **kwargs)
215+
return await task.run_async() # type: ignore
216+
else:
224217

225-
@functools.wraps(fn)
226-
def wrapper(*args, **kwargs):
227-
task = _get_task(*args, **kwargs)
228-
return task.run()
218+
@functools.wraps(func)
219+
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
220+
task = _get_task(*args, **kwargs)
221+
return task.run() # type: ignore
229222

230-
prefect_wrapper = prefect_task(
231-
timeout_seconds=timeout_seconds,
232-
retries=retries,
233-
retry_delay_seconds=retry_delay_seconds,
234-
)(wrapper)
223+
prefect_wrapper = prefect_task(
224+
timeout_seconds=timeout_seconds,
225+
retries=retries,
226+
retry_delay_seconds=retry_delay_seconds,
227+
)(wrapper)
235228

236-
# store the `as_task` method for loading the task object
237-
prefect_wrapper.as_task = _get_task
229+
setattr(prefect_wrapper, "as_task", _get_task)
230+
return cast(PrefectTask[P, R], prefect_wrapper)
238231

239-
return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper)
232+
if fn is None:
233+
return decorator
234+
return decorator(fn) # type: ignore

0 commit comments

Comments
 (0)