4 changes: 2 additions & 2 deletions mypy.ini
@@ -1,5 +1,5 @@
[mypy]
exclude = (?x)(
^src/humanloop/eval_utils\.py$
| ^src/humanloop/prompt_utils\.py$
^src/humanloop/eval_utils/.*\.py$ # all files in "eval_utils" folder
| ^src/humanloop/prompt_utils\.py$ # single "prompt_utils.py" file
)
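A quick standalone check of the new pattern (not part of this PR; assumes the usual verbose-regex semantics mypy applies to relative file paths):

import re

exclude = re.compile(
    r"""(?x)(
    ^src/humanloop/eval_utils/.*\.py$   # all files in "eval_utils" folder
    | ^src/humanloop/prompt_utils\.py$  # single "prompt_utils.py" file
)"""
)

# Everything under eval_utils/ and the single prompt_utils.py file are excluded.
for path in (
    "src/humanloop/eval_utils/run.py",
    "src/humanloop/eval_utils/context.py",
    "src/humanloop/prompt_utils.py",
):
    assert exclude.search(path) is not None, path

# Other modules are still type-checked.
assert exclude.search("src/humanloop/client.py") is None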
6 changes: 3 additions & 3 deletions src/humanloop/eval_utils/context.py
@@ -4,14 +4,14 @@
class EvaluationContext(TypedDict):
"""Context Log to Humanloop.

Global state that is set when an Evaluation is ran.
Per-datapoint state that is set when an Evaluation is run.
"""

"""Required for associating a Log with the Evaluation Run."""
source_datapoint_id: str

"""Exporter calls this so the eval_utils are notified to evaluate an uploaded Log."""
upload_callback: Callable[[dict], None]
"""Overloaded .log method call."""
upload_callback: Callable[[str], None]

"""ID of the evaluated File."""
file_id: str
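Pulling the changed fields together, a condensed sketch of how the per-datapoint context is now typed and populated. The run_id and path fields and the ContextVar construction are assumptions inferred from run.py below; the IDs are hypothetical. The key change is that upload_callback now receives the created Log's ID instead of a Log dict.

from contextvars import ContextVar
from typing import Callable, Optional, TypedDict


class EvaluationContext(TypedDict):
    """Per-datapoint state set while an Evaluation is running."""

    source_datapoint_id: str                # associates a Log with the Evaluation Run
    upload_callback: Callable[[str], None]  # now receives the created Log's ID
    file_id: str                            # ID of the evaluated File
    run_id: str                             # assumed: used for the kwargs override in run.py
    path: str                               # assumed: used by _is_evaluated_file in run.py


# run.py keeps a single ContextVar and sets it per datapoint before calling the user's function.
evaluation_context_variable: ContextVar[Optional[EvaluationContext]] = ContextVar(
    "evaluation_context", default=None
)

evaluation_context_variable.set(
    EvaluationContext(
        source_datapoint_id="dp_123",
        upload_callback=lambda log_id: print("log uploaded:", log_id),
        file_id="fl_456",
        run_id="rn_789",
        path="project/flow",
    )
)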
135 changes: 56 additions & 79 deletions src/humanloop/eval_utils/run.py
@@ -54,7 +54,6 @@
from humanloop.types.create_flow_log_response import CreateFlowLogResponse
from humanloop.types.create_prompt_log_response import CreatePromptLogResponse
from humanloop.types.create_tool_log_response import CreateToolLogResponse
from humanloop.types.datapoint_response_target_value import DatapointResponseTargetValue
from humanloop.types.evaluation_run_response import EvaluationRunResponse
from humanloop.types.run_stats_response import RunStatsResponse
from pydantic import ValidationError
@@ -115,9 +114,9 @@ def _is_evaluated_file(
) == log_args.get("path")

# Copy the original log method in a hidden attribute
client._log = client.log # type: ignore
client._log = client.log

def _overloaded_log(
def _overload_log(
self,
**kwargs,
) -> Union[
@@ -132,20 +131,13 @@ def _overloaded_log(
# If the Evaluation Context is not set, an Evaluation is not running
evaluation_context = None

if _is_evaluated_file(
evaluation_context=evaluation_context, # type: ignore
log_args=kwargs,
):
if _is_evaluated_file(evaluation_context=evaluation_context, log_args=kwargs):
# If the .log API user does not provide the source_datapoint_id or run_id,
# override them with the values from the EvaluationContext
# _is_evaluated_file ensures that evaluation_context is not None
evaluation_context = typing.cast(
EvaluationContext,
evaluation_context,
)
for attribute in ["source_datapoint_id", "run_id"]:
if attribute not in kwargs or kwargs[attribute] is None:
kwargs[attribute] = evaluation_context[attribute] # type: ignore
kwargs[attribute] = evaluation_context[attribute]

# Call the original .log method
logger.debug(
@@ -156,33 +148,21 @@ def _overloaded_log(
)
response = self._log(**kwargs)

# Call the callback so the Evaluation can be updated
if _is_evaluated_file(
evaluation_context=evaluation_context, # type: ignore
evaluation_context=evaluation_context,
log_args=kwargs,
):
# Notify that the Log has been added to the Evaluation
# Call the callback so the Evaluation can be updated
# _is_evaluated_file ensures that evaluation_context is not None
evaluation_context = typing.cast(
EvaluationContext,
evaluation_context,
)
evaluation_context["upload_callback"]( # type: ignore
{
**kwargs,
# ID in kwargs refers to the File ID
# Replace it with the Log ID
"id": response.id,
}
)
evaluation_context["upload_callback"](log_id=response.id)

# Mark the Evaluation Context as consumed
evaluation_context_variable.set(None)

return response

# Replace the original log method with the overloaded one
client.log = types.MethodType(_overloaded_log, client) # type: ignore
client.log = types.MethodType(_overload_log, client)
# Return the client with the overloaded log method
logger.debug("Overloaded the .log method of %s", client)
return client
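A self-contained sketch of the patching pattern used in this hunk, with a hypothetical Client class standing in for the SDK client: the original bound .log is kept on a hidden attribute and a wrapper is rebound in its place with types.MethodType.

import types


class Client:
    def log(self, **kwargs):
        return {"id": "log_123", **kwargs}


client = Client()

# Keep the original bound method on a hidden attribute.
client._log = client.log


def _overload_log(self, **kwargs):
    # Inject Evaluation defaults when they are missing (mirrors the
    # source_datapoint_id / run_id handling above), then delegate.
    kwargs.setdefault("run_id", "rn_hypothetical")
    return self._log(**kwargs)


# Rebind the wrapper as this instance's .log method.
client.log = types.MethodType(_overload_log, client)

print(client.log(output="hello"))
# {'id': 'log_123', 'output': 'hello', 'run_id': 'rn_hypothetical'}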
@@ -316,7 +296,7 @@ def run_eval(
except ValidationError:
flow_version = {"attributes": version}
file_dict = {**file_, **flow_version}
hl_file = client.flows.upsert(**file_dict) # type: ignore
hl_file = client.flows.upsert(**file_dict)

elif type_ == "prompt":
try:
@@ -325,7 +305,7 @@
logger.error(msg="Invalid Prompt `version` in your `file` request. \n\nValidation error: \n)")
raise error_
try:
hl_file = client.prompts.upsert(**file_dict) # type: ignore
hl_file = client.prompts.upsert(**file_dict)
except ApiError as error_:
raise error_

@@ -335,10 +315,10 @@
except ValidationError as error_:
logger.error(msg="Invalid Tool `version` in your `file` request. \n\nValidation error: \n)")
raise error_
hl_file = client.tools.upsert(**file_dict) # type: ignore
hl_file = client.tools.upsert(**file_dict)

elif type_ == "evaluator":
hl_file = client.evaluators.upsert(**file_dict) # type: ignore
hl_file = client.evaluators.upsert(**file_dict)

else:
raise NotImplementedError(f"Unsupported File type: {type_}")
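The Flow branch above relies on a validate-then-fall-back step for version payloads. A standalone sketch of that pattern, with FlowVersion as a hypothetical stand-in for the SDK's request model:

from pydantic import BaseModel, ValidationError


class FlowVersion(BaseModel):  # hypothetical stand-in for the SDK's version model
    attributes: dict


def normalize_flow_version(version: dict) -> dict:
    try:
        # A payload already shaped like {"attributes": ...} validates and is used as-is.
        return FlowVersion(**version).dict()
    except ValidationError:
        # Anything else is wrapped as free-form attributes before the upsert.
        return {"attributes": version}


print(normalize_flow_version({"model": "gpt-4o", "temperature": 0.7}))
# -> {'attributes': {'model': 'gpt-4o', 'temperature': 0.7}}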
@@ -396,7 +376,7 @@ def run_eval(
break
if requires_target:
missing_target = 0
for datapoint in hl_dataset.datapoints: # type: ignore
for datapoint in hl_dataset.datapoints:
if not datapoint.target:
missing_target += 1
if missing_target > 0:
@@ -410,15 +390,15 @@
try:
evaluation = client.evaluations.create(
name=name,
evaluators=[{"path": e["path"]} for e in evaluators], # type: ignore
evaluators=[{"path": e["path"]} for e in evaluators],
file={"id": hl_file.id},
)
except ApiError as error_:
# If the name exists, go and get it # TODO: Update API GET to allow querying by name and file.
if error_.status_code == 409:
evals = client.evaluations.list(file_id=hl_file.id, size=50)
for page in evals.iter_pages():
evaluation = next((e for e in page.items if e.name == name), None) # type: ignore
evaluation = next((e for e in page.items if e.name == name), None)
else:
raise error_
if not evaluation:
@@ -433,25 +413,19 @@
# Every Run will generate a new batch of Logs
run_id = run.id

_PROGRESS_BAR = _SimpleProgressBar(len(hl_dataset.datapoints)) # type: ignore
_PROGRESS_BAR = _SimpleProgressBar(len(hl_dataset.datapoints))

# Define the function to execute the `callable` in parallel and Log to Humanloop
def process_datapoint(dp: Datapoint, file_id: str, file_path: str, run_id: str):
def upload_callback(log: dict):
def upload_callback(log_id: str):
"""Logic ran after the Log has been created."""
logger.debug(
"upload_callback on Thread %s: log %s datapoint_target %s",
threading.get_ident(),
log,
dp.target,
)
_run_local_evaluators(
client=client,
log=log,
datapoint_target=dp.target,
log_id=log_id,
datapoint=dp,
local_evaluators=local_evaluators,
)
_PROGRESS_BAR.increment() # type: ignore
_PROGRESS_BAR.increment()

datapoint_dict = dp.dict()
# Set the Evaluation Context for current datapoint
@@ -471,6 +445,7 @@ def upload_callback(log: dict):
# .get() is safe since process_datapoint is always called in the context of an Evaluation
evaluation_context_variable.get(),
)
# TODO: shouldn't this only be defined in case where we actually need to log?
log_func = _get_log_func(
client=client,
file_type=type_,
@@ -481,18 +456,12 @@
start_time = datetime.now()
try:
if "messages" in datapoint_dict and datapoint_dict["messages"] is not None:
# function_ is decorated by Humanloop, the OTel Exporter will
# handle the logging, which will call the upload_callback
# function above when it's done
output = function_( # type: ignore
output = function_(
**datapoint_dict["inputs"],
messages=datapoint_dict["messages"],
)
else:
# function_ is decorated by Humanloop, the OTel Exporter will
# handle the logging, which will call the upload_callback
# function above when it's done
output = function_(**datapoint_dict["inputs"]) # type: ignore
output = function_(**datapoint_dict["inputs"])

if not isinstance(output, str):
try:
Expand All @@ -509,7 +478,7 @@ def upload_callback(log: dict):
logger.debug(
"process_datapoint on Thread %s: function_ %s is a simple callable, context was not consumed",
threading.get_ident(),
function_.__name__, # type: ignore
function_.__name__,
)
log_func(
inputs=datapoint.inputs,
@@ -534,12 +503,12 @@ def upload_callback(log: dict):
logger.info(f"{CYAN}Run ID: {run_id}{RESET}")

# Generate locally if a file `callable` is provided
if function_: # type: ignore
if function_:
logger.info(
f"{CYAN}\nRunning '{hl_file.name}' over the Dataset '{hl_dataset.name}' using {workers} workers{RESET} "
)
with ThreadPoolExecutor(max_workers=workers) as executor:
for datapoint in hl_dataset.datapoints: # type: ignore
for datapoint in hl_dataset.datapoints:
executor.submit(
process_datapoint,
datapoint,
@@ -572,8 +541,8 @@ def upload_callback(log: dict):

# Skip `check_evaluation_improvement` if no thresholds were provided and there is only one run.
# (Or the logs would not be helpful)
if any(evaluator.get("threshold") is not None for evaluator in evaluators) or len(stats.run_stats) > 1: # type: ignore
for evaluator in evaluators: # type: ignore
if any(evaluator.get("threshold") is not None for evaluator in evaluators) or len(stats.run_stats) > 1:
for evaluator in evaluators:
score, delta = _check_evaluation_improvement(
evaluation=evaluation,
stats=stats,
@@ -623,13 +592,13 @@ def _get_log_func(
"run_id": run_id,
}
if file_type == "flow":
return partial(client.flows.log, **log_request, trace_status="complete") # type: ignore
return partial(client.flows.log, **log_request, trace_status="complete")
elif file_type == "prompt":
return partial(client.prompts.log, **log_request) # type: ignore
return partial(client.prompts.log, **log_request)
elif file_type == "evaluator":
return partial(client.evaluators.log, **log_request) # type: ignore
return partial(client.evaluators.log, **log_request)
elif file_type == "tool":
return partial(client.tools.log, **log_request) # type: ignore
return partial(client.tools.log, **log_request)
else:
raise NotImplementedError(f"Unsupported File version: {file_type}")
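A compact illustration of why _get_log_func binds with functools.partial (fake log function and hypothetical IDs): the Run-level request fields are fixed once, and each datapoint call only adds its own fields.

from functools import partial


def fake_prompt_log(**kwargs):  # stand-in for client.prompts.log
    return kwargs


log_func = partial(
    fake_prompt_log,
    run_id="rn_123",               # bound once for the Run
    source_datapoint_id="dp_123",  # bound per datapoint
)

print(log_func(inputs={"question": "2 + 2?"}, output="4"))
# {'run_id': 'rn_123', 'source_datapoint_id': 'dp_123', 'inputs': {'question': '2 + 2?'}, 'output': '4'}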

@@ -643,10 +612,10 @@ def _get_score_from_evaluator_stat(
if stat.total_logs:
score = round(stat.num_true / stat.total_logs, 2)
elif isinstance(stat, NumericStats):
score = round(stat.mean, 2) # type: ignore
score = round(stat.mean, 2)
else:
raise ValueError(f"Unsupported Evaluator Stat type: {type(stat)}")
return score # type: ignore
return score
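A worked example of the two scoring branches above, with illustrative numbers: a boolean evaluator scores the fraction of true judgments, a numeric evaluator its rounded mean.

# Boolean evaluator: 7 true judgments over 10 logs.
boolean_score = round(7 / 10, 2)                       # 0.7

# Numeric evaluator: mean of its judgments, rounded to two decimal places.
numeric_score = round(sum([0.81, 0.93, 0.88]) / 3, 2)  # 0.87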


def _get_evaluator_stats_by_path(
@@ -660,7 +629,7 @@ def _get_evaluator_stats_by_path(
evaluators_by_id[evaluator_stat.evaluator_version_id].version.path: evaluator_stat
for evaluator_stat in stat.evaluator_stats
}
return evaluator_stats_by_path # type: ignore
return evaluator_stats_by_path


def _check_evaluation_threshold(
@@ -675,14 +644,14 @@
evaluator_stats_by_path = _get_evaluator_stats_by_path(
stat=next(
(stat for stat in stats.run_stats if stat.run_id == run_id),
None, # type: ignore
None,
),
evaluation=evaluation,
)
if evaluator_path in evaluator_stats_by_path:
evaluator_stat = evaluator_stats_by_path[evaluator_path]
score = _get_score_from_evaluator_stat(stat=evaluator_stat)
if score >= threshold: # type: ignore
if score >= threshold:
logger.info(
f"{GREEN}✅ Latest eval [{score}] above threshold [{threshold}] for evaluator {evaluator_path}.{RESET}"
)
@@ -712,7 +681,7 @@ def _check_evaluation_improvement(
latest_evaluator_stats_by_path = _get_evaluator_stats_by_path(
stat=next(
(stat for stat in stats.run_stats if stat.run_id == run_id),
None, # type: ignore
None,
),
evaluation=evaluation,
)
Expand All @@ -731,37 +700,45 @@ def _check_evaluation_improvement(
previous_score = _get_score_from_evaluator_stat(stat=previous_evaluator_stat)
if latest_score is None or previous_score is None:
raise ValueError(f"Could not find score for Evaluator {evaluator_path}.")
diff = round(latest_score - previous_score, 2) # type: ignore
diff = round(latest_score - previous_score, 2)
if diff >= 0:
logger.info(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}")
return True, latest_score, diff # type: ignore
return True, latest_score, diff
else:
logger.info(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}")
return False, latest_score, diff # type: ignore
return False, latest_score, diff
else:
raise ValueError(f"Evaluator {evaluator_path} not found in the stats.")


def _run_local_evaluators(
client: "BaseHumanloop",
log: dict,
datapoint_target: typing.Optional[typing.Dict[str, DatapointResponseTargetValue]],
log_id: str,
datapoint: Optional[Datapoint],
local_evaluators: list[Evaluator],
):
"""Run local Evaluators on the Log and send the judgments to Humanloop."""
# Need to get the full log to pass to the evaluators
log = client.logs.get(id=log_id)
if not isinstance(log, dict):
log_dict = log.dict()
else:
log_dict = log
datapoint_dict = datapoint.dict() if datapoint else None
for local_evaluator in local_evaluators:
start_time = datetime.now()
try:
eval_function = local_evaluator["callable"]
if local_evaluator["args_type"] == "target_required":
judgement = eval_function(
log,
datapoint_target,
log_dict,
datapoint_dict,
)
else:
judgement = eval_function(log)
judgement = eval_function(log_dict)

_ = client.evaluators.log(
parent_id=log["id"],
parent_id=log_id,
judgment=judgement,
id=local_evaluator.get("id"),
path=local_evaluator.get("path"),
@@ -770,7 +747,7 @@ def _run_local_evaluators(
)
except Exception as e:
_ = client.evaluators.log(
parent_id=log["id"],
parent_id=log_id,
path=local_evaluator.get("path"),
id=local_evaluator.get("id"),
error=str(e),
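Taken together, the _run_local_evaluators changes amount to the flow sketched below (condensed; timing, error handling, and the start/end timestamps are omitted, and only calls that appear in this diff are used): the callback hands over just the Log ID, the full Log is fetched, each local Evaluator callable runs on the Log dict (plus the datapoint dict when a target is required), and the judgment is logged against the parent Log.

def run_local_evaluators_sketch(client, log_id, datapoint, local_evaluators):
    # The callback now passes only the Log ID, so fetch the full Log first.
    log = client.logs.get(id=log_id)
    log_dict = log if isinstance(log, dict) else log.dict()
    datapoint_dict = datapoint.dict() if datapoint else None

    for evaluator in local_evaluators:
        if evaluator["args_type"] == "target_required":
            judgment = evaluator["callable"](log_dict, datapoint_dict)
        else:
            judgment = evaluator["callable"](log_dict)

        # Judgments are attached to the evaluated Log as Evaluator Logs.
        client.evaluators.log(
            parent_id=log_id,
            judgment=judgment,
            id=evaluator.get("id"),
            path=evaluator.get("path"),
        )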