Skip to content
Merged
12 changes: 2 additions & 10 deletions src/sentry/api/endpoints/project_rule_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from sentry.workflow_engine.migration_helpers.rule_action import (
translate_rule_data_actions_to_notification_actions,
)
from sentry.workflow_engine.models import Detector, Workflow
from sentry.workflow_engine.models import Workflow
from sentry.workflow_engine.types import WorkflowEventData

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -161,14 +161,6 @@ def execute_future_on_test_event_workflow_engine(
organization=rule.project.organization,
)

detector = Detector(
id=TEST_NOTIFICATION_ID,
project=rule.project,
name=rule.label,
enabled=True,
type=ErrorGroupType.slug,
)

event_data = WorkflowEventData(
event=test_event,
group=test_event.group,
Expand All @@ -190,7 +182,7 @@ def execute_future_on_test_event_workflow_engine(
action_exceptions.append(f"An unexpected error occurred. Error ID: '{error_id}'")
continue

action_exceptions.extend(test_fire_action(action, event_data, detector))
action_exceptions.extend(test_fire_action(action, event_data))

status = None
data = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from sentry.workflow_engine.endpoints.utils.test_fire_action import test_fire_action
from sentry.workflow_engine.endpoints.validators.base.action import BaseActionValidator
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.models import Action
from sentry.workflow_engine.types import WorkflowEventData

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -121,14 +121,6 @@ def test_fire_actions(actions: list[dict[str, Any]], project: Project):
group=test_event.group,
)

detector = Detector(
id=TEST_NOTIFICATION_ID,
project=project,
name="Test Detector",
enabled=True,
type="error",
)

for action_data in actions:
# Create a temporary Action object (not saved to database)
action = Action(
Expand All @@ -143,7 +135,7 @@ def test_fire_actions(actions: list[dict[str, Any]], project: Project):
setattr(action, "workflow_id", workflow_id)

# Test fire the action and collect any exceptions
exceptions = test_fire_action(action, workflow_event_data, detector)
exceptions = test_fire_action(action, workflow_event_data)
if exceptions:
action_exceptions.extend(exceptions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@
import sentry_sdk

from sentry.shared_integrations.exceptions import IntegrationFormError
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.models import Action
from sentry.workflow_engine.types import WorkflowEventData

logger = logging.getLogger(__name__)


def test_fire_action(
action: Action, event_data: WorkflowEventData, detector: Detector
) -> list[str]:
def test_fire_action(action: Action, event_data: WorkflowEventData) -> list[str]:
"""
This function will fire an action and return a list of exceptions that occurred.
"""
action_exceptions = []
try:
action.trigger(
event_data=event_data,
detector=detector,
)
except Exception as exc:
if isinstance(exc, IntegrationFormError):
Expand Down
12 changes: 6 additions & 6 deletions src/sentry/workflow_engine/models/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import builtins
import logging
from enum import StrEnum
from typing import TYPE_CHECKING, ClassVar, TypedDict
from typing import ClassVar, TypedDict

from django.db import models
from django.db.models import Q
Expand All @@ -23,10 +23,6 @@
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler, WorkflowEventData

if TYPE_CHECKING:
from sentry.workflow_engine.models import Detector


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -127,7 +123,11 @@ def get_handler(self) -> builtins.type[ActionHandler]:
action_type = Action.Type(self.type)
return action_handler_registry.get(action_type)

def trigger(self, event_data: WorkflowEventData, detector: Detector) -> None:
def trigger(self, event_data: WorkflowEventData) -> None:
from sentry.workflow_engine.processors.detector import get_detector_from_event_data

detector = get_detector_from_event_data(event_data)

with metrics.timer(
"workflow_engine.action.trigger.execution_time",
tags={"action_type": self.type, "detector_type": detector.type},
Expand Down
44 changes: 44 additions & 0 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.locks import locks
from sentry.models.activity import Activity
from sentry.models.group import Group
from sentry.models.project import Project
from sentry.services.eventstore.models import GroupEvent
Expand Down Expand Up @@ -139,6 +140,49 @@ def get_detector_by_event(event_data: WorkflowEventData) -> Detector:
return detector


def get_detector_by_group(group: Group) -> Detector:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this merits a comment "Returns Detector associated with this group, either based on DetectorGroup, (project, type), or if those fail, returns the Issue Stream detector". or something.

try:
detector = DetectorGroup.objects.get(group=group).detector
if detector is not None:
return detector
except DetectorGroup.DoesNotExist:
logger.exception(
"DetectorGroup not found for group",
extra={"group_id": group.id},
)
pass

try:
return Detector.objects.get(project_id=group.project_id, type=group.issue_type.slug)
Copy link
Member

@ceorourke ceorourke Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other types of detectors (besides the issue stream type handled later) are there only 1 per project? Is it just the error detector type or are there others?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance detectors will probably be 1 per project(?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so there's an assumption that anything found for DetectorGroup is for metric issues, then it tries the other types and finally falls back to issue stream group types? Would an uptime or cron detector potentially hit a MultipleObjectsReturned? Maybe it'd be safer to pass a list of types

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think uptime and crons hit DetectorGroup too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it so that this is only used for activity notifications (e.g. resolve for metric issues)

We only process updates if the detector_id is attached to the StatusChangeMessage

detector_id = status_change_message.get("detector_id")
if detector_id is None:
# We should not hit this case, it's should only occur if there is a bug
# passing it from the workflow_engine to the issue platform.
metrics.incr("workflow_engine.tasks.error.no_detector_id")
return

If detector_id exists, I believe it must have existed when the occurrence to create the issue was sent to issue platform, meaning we would have already written the DetectorGroup row

if is_new and occurrence.evidence_data and "detector_id" in occurrence.evidence_data:
associate_new_group_with_detector(group, occurrence.evidence_data["detector_id"])

except (Detector.DoesNotExist, Detector.MultipleObjectsReturned):
# return issue stream detector
return Detector.objects.get(project_id=group.project_id, type=IssueStreamGroupType.slug)


def get_detector_from_event_data(event_data: WorkflowEventData) -> Detector:
try:
if isinstance(event_data.event, GroupEvent):
return get_detector_by_event(event_data)
elif isinstance(event_data.event, Activity):
return get_detector_by_group(event_data.group)
else:
raise TypeError(f"Cannot determine the detector from {type(event_data.event)}.")
except Detector.DoesNotExist:
logger.exception(
"Detector not found for event data",
extra={
"type": type(event_data.event),
"id": (
event_data.event.event_id
if isinstance(event_data.event, GroupEvent)
else event_data.event.id
),
"group_id": event_data.group.id,
},
)
raise


class _SplitEvents(NamedTuple):
events_with_occurrences: list[tuple[GroupEvent, int]]
error_events: list[GroupEvent]
Expand Down
28 changes: 16 additions & 12 deletions src/sentry/workflow_engine/tasks/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def build_trigger_action_task_params(
@retry(timeouts=True, raise_on_no_retries=False, ignore_and_capture=Action.DoesNotExist)
def trigger_action(
action_id: int,
detector_id: int,
workflow_id: int,
event_id: str | None,
activity_id: int | None,
Expand All @@ -76,8 +75,10 @@ def trigger_action(
group_state: GroupState,
has_reappeared: bool,
has_escalated: bool,
detector_id: int | None = None,
) -> None:
from sentry.notifications.notification_action.utils import should_fire_workflow_actions
from sentry.workflow_engine.processors.detector import get_detector_from_event_data

# XOR check to ensure exactly one of event_id or activity_id is provided
if (event_id is not None) == (activity_id is not None):
Expand All @@ -88,19 +89,14 @@ def trigger_action(
raise ValueError("Exactly one of event_id or activity_id must be provided")

action = Action.objects.annotate(workflow_id=Value(workflow_id)).get(id=action_id)
detector = Detector.objects.get(id=detector_id)

metrics.incr(
"workflow_engine.tasks.trigger_action_task_started",
tags={"action_type": action.type, "detector_type": detector.type},
sample_rate=1.0,
)

project_id = detector.project_id
# TODO: remove detector usage from this task
detector: Detector | None = None
if detector_id is not None:
detector = Detector.objects.get(id=detector_id)

if event_id is not None:
event_data = build_workflow_event_data_from_event(
project_id=project_id,
event_id=event_id,
group_id=group_id,
workflow_id=workflow_id,
Expand All @@ -109,7 +105,6 @@ def trigger_action(
has_reappeared=has_reappeared,
has_escalated=has_escalated,
)

elif activity_id is not None:
event_data = build_workflow_event_data_from_activity(
activity_id=activity_id, group_id=group_id
Expand All @@ -122,6 +117,15 @@ def trigger_action(
)
raise ValueError("Exactly one of event_id or activity_id must be provided")

if not detector:
detector = get_detector_from_event_data(event_data)

metrics.incr(
"workflow_engine.tasks.trigger_action_task_started",
tags={"action_type": action.type, "detector_type": detector.type},
sample_rate=1.0,
)

should_trigger_actions = should_fire_workflow_actions(
detector.project.organization, event_data.group.type
)
Expand All @@ -130,7 +134,7 @@ def trigger_action(
# Set up a timeout grouping context because we want to make sure any Sentry timeout reporting
# in this scope is grouped properly.
with timeout_grouping_context(action.type):
action.trigger(event_data, detector)
action.trigger(event_data)
else:
logger.info(
"workflow_engine.triggered_actions.dry-run",
Expand Down
5 changes: 2 additions & 3 deletions src/sentry/workflow_engine/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def __init__(self, event_id: str, project_id: int):

@scopedstats.timer()
def build_workflow_event_data_from_event(
project_id: int,
event_id: str,
group_id: int,
workflow_id: int | None = None,
Expand All @@ -78,14 +77,14 @@ def build_workflow_event_data_from_event(
This method handles all the database fetching and object construction logic.
Raises EventNotFoundError if the event is not found.
"""

group = Group.objects.get_from_cache(id=group_id)
project_id = group.project_id
event = fetch_event(event_id, project_id)
if event is None:
raise EventNotFoundError(event_id, project_id)

occurrence = IssueOccurrence.fetch(occurrence_id, project_id) if occurrence_id else None

group = Group.objects.get_from_cache(id=group_id)
group_event = GroupEvent.from_event(event, group)
group_event.occurrence = occurrence

Expand Down
3 changes: 1 addition & 2 deletions src/sentry/workflow_engine/tasks/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
on_silent=DataConditionGroup.DoesNotExist,
)
def process_workflows_event(
project_id: int,
event_id: str,
group_id: int,
occurrence_id: str | None,
group_state: GroupState,
has_reappeared: bool,
has_escalated: bool,
start_timestamp_seconds: float | None = None,
project_id: int | None = None,
**kwargs: dict[str, Any],
) -> None:
from sentry.workflow_engine.processors.workflow import process_workflows
Expand All @@ -114,7 +114,6 @@ def process_workflows_event(
with recorder.record():
try:
event_data = build_workflow_event_data_from_event(
project_id=project_id,
event_id=event_id,
group_id=group_id,
occurrence_id=occurrence_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.testutils.silo import assume_test_silo_mode
from sentry.testutils.skips import requires_snuba
from sentry.workflow_engine.models import Action
from sentry.workflow_engine.typings.grouptype import IssueStreamGroupType
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

pytestmark = [requires_snuba]
Expand All @@ -34,6 +35,10 @@ def setUp(self) -> None:
super().setUp()
self.login_as(self.user)
self.project = self.create_project(organization=self.organization)
self.detector = self.create_detector(project=self.project)
self.issue_stream_detector = self.create_detector(
project=self.project, type=IssueStreamGroupType.slug
)
self.workflow = self.create_workflow()

def setup_pd_service(self) -> PagerDutyServiceDict:
Expand Down Expand Up @@ -94,7 +99,7 @@ def test_pagerduty_action(
assert mock_send_trigger.call_count == 1
pagerduty_data = mock_send_trigger.call_args.kwargs.get("data")
assert pagerduty_data is not None
assert pagerduty_data["payload"]["summary"].startswith("[Test Detector]:")
assert pagerduty_data["payload"]["summary"].startswith(f"[{self.detector.name}]:")

@mock.patch.object(NotifyEventAction, "after")
@mock.patch(
Expand Down
Loading
Loading