16 changes: 15 additions & 1 deletion src/sentry/backup/comparators.py
@@ -736,6 +736,17 @@ def compare(self, on: InstanceID, left: Any, right: Any) -> list[ComparatorFinding]:
return findings


class DataSourceComparator(IgnoredComparator):
"""
DataSource.source_id is a dynamic foreign key that gets remapped during import via the
normalize_before_relocation_import method. Since the remapping is handled there, we just
need to verify that both sides have a valid source_id value, without comparing the actual values.
"""

Comment on lines +741 to +743 (Member): nice

def __init__(self):
super().__init__("source_id")


def auto_assign_datetime_equality_comparators(comps: ComparatorMap) -> None:
"""Automatically assigns the DateAddedComparator to any `DateTimeField` that is not already
claimed by the `DateUpdatedComparator`."""
@@ -929,7 +940,10 @@ def get_default_comparators() -> dict[str, list[JSONScrubbingComparator]]:
"workflow_engine.dataconditiongroupaction": [
DateUpdatedComparator("date_updated", "date_added")
],
"workflow_engine.datasource": [DateUpdatedComparator("date_updated", "date_added")],
"workflow_engine.datasource": [
DateUpdatedComparator("date_updated", "date_added"),
DataSourceComparator(),
],
"workflow_engine.datasourcedetector": [
DateUpdatedComparator("date_updated", "date_added")
],
7 changes: 7 additions & 0 deletions src/sentry/backup/findings.py
@@ -95,6 +95,13 @@ class ComparatorFindingKind(FindingKind):
# or `None`.
ForeignKeyComparatorExistenceCheck = auto()

# DataSource.source_id field comparison failed (dynamic foreign key).
DataSourceComparator = auto()

# Failed to compare DataSource.source_id field because one of the fields being compared was not present
# or `None`.
DataSourceComparatorExistenceCheck = auto()

# Failed to compare an ignored field.
IgnoredComparator = auto()

7 changes: 7 additions & 0 deletions src/sentry/monitors/models.py
@@ -814,6 +814,7 @@ class Meta:

@data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "monitors.monitor"
7 changes: 7 additions & 0 deletions src/sentry/snuba/models.py
@@ -182,6 +182,7 @@ def write_relocation_import(

@data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
QuerySubscription.Status.UPDATING.value,
),
).count()

@override
@staticmethod
def get_relocation_model_name() -> str:
return "sentry.querysubscription"
11 changes: 10 additions & 1 deletion src/sentry/testutils/helpers/backups.py
@@ -107,6 +107,7 @@
from sentry.services.nodestore.django.models import Node
from sentry.silo.base import SiloMode
from sentry.silo.safety import unguarded_write
from sentry.snuba.models import QuerySubscriptionDataSourceHandler
from sentry.tempest.models import TempestCredentials
from sentry.testutils.cases import TestCase, TransactionTestCase
from sentry.testutils.factories import get_fixture_path
@@ -121,6 +122,7 @@
from sentry.utils import json
from sentry.workflow_engine.models import Action, DataConditionAlertRuleTrigger, DataConditionGroup
from sentry.workflow_engine.models.workflow_action_group_status import WorkflowActionGroupStatus
from sentry.workflow_engine.registry import data_source_type_registry

__all__ = [
"export_to_file",
@@ -696,7 +698,14 @@ def create_exhaustive_organization(
workflow=workflow, condition_group=notification_condition_group
)

data_source = self.create_data_source(organization=org)
# Use the alert_rule's QuerySubscription for the DataSource
query_subscription = alert.snuba_query.subscriptions.first()
assert query_subscription is not None
data_source = self.create_data_source(
organization=org,
source_id=str(query_subscription.id),
type=data_source_type_registry.get_key(QuerySubscriptionDataSourceHandler),
)

self.create_data_source_detector(data_source, detector)
detector_conditions = self.create_data_condition_group(
7 changes: 7 additions & 0 deletions src/sentry/uptime/models.py
@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):

@data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "uptime.uptimesubscription"


def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
"""
42 changes: 41 additions & 1 deletion src/sentry/workflow_engine/models/data_source.py
@@ -1,18 +1,23 @@
import builtins
import dataclasses
import logging
from typing import Generic, TypeVar

from django.db import models
from django.db.models.signals import pre_save
from django.dispatch import receiver

from sentry.backup.scopes import RelocationScope
from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope, RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
from sentry.utils.registry import NoRegistrationExistsError
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler

logger = logging.getLogger(__name__)

T = TypeVar("T")


@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
@region_silo_model
class DataSource(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization
# DataSource.source_id dynamically references different models based on the 'type' field.
# We declare all possible dependencies here to ensure proper import ordering.
Comment on lines +33 to +34 (Member): Does the order matter because we need the pk mappings for the other tables to exist before this table is relocated?

(Member Author): Yes.
__relocation_dependencies__ = {
"monitors.monitor", # For DATA_SOURCE_CRON_MONITOR
"sentry.querysubscription", # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
"uptime.uptimesubscription", # For DATA_SOURCE_UPTIME_SUBSCRIPTION
}
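
To illustrate the ordering guarantee discussed in the thread above, a minimal sketch using the PrimaryKeyMap API exercised elsewhere in this PR (the concrete PKs are made up):

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap

# Dependency models (e.g. monitors.monitor) import first and record their remaps...
pk_map = PrimaryKeyMap()
pk_map.insert(
    model_name=NormalizedModelName("monitors.monitor"),
    old=10,
    new=9999,
    kind=ImportKind.Inserted,
)
# ...so by the time a DataSource row is normalized, the lookup resolves.
assert pk_map.get_pk(NormalizedModelName("monitors.monitor"), 10) == 9999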

organization = FlexibleForeignKey("sentry.Organization")

@@ -49,6 +61,34 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
raise ValueError(f"Unknown data source type: {self.type}")
return handler

def normalize_before_relocation_import(
self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
) -> int | None:
old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
if old_pk is None:
return None

# Map source_id based on the data source type
try:
handler = data_source_type_registry.get(self.type)
model_name = NormalizedModelName(handler.get_relocation_model_name())
old_source_id = int(self.source_id)
new_source_id = pk_map.get_pk(model_name, old_source_id)

if new_source_id is None:
# Referenced model not in pk_map - the source was filtered out or failed to import.
return None

self.source_id = str(new_source_id)
except Exception:
logger.exception(
"DataSource.normalize_before_relocation_import failed",
extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
)
return None

return old_pk


@receiver(pre_save, sender=DataSource)
def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):
16 changes: 15 additions & 1 deletion src/sentry/workflow_engine/types.py
@@ -108,8 +108,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
raise NotImplementedError


class DataSourceTypeHandler(Generic[T]):
class DataSourceTypeHandler(ABC, Generic[T]):
@staticmethod
@abstractmethod
def bulk_get_query_object(data_sources) -> dict[int, T | None]:
"""
Bulk fetch related data-source models returning a dict of the
Expand All @@ -118,6 +119,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
raise NotImplementedError

@staticmethod
@abstractmethod
def related_model(instance) -> list[ModelRelation]:
"""
A list of deletion ModelRelations. The model relation query should map
@@ -127,6 +129,7 @@
raise NotImplementedError

@staticmethod
@abstractmethod
def get_instance_limit(org: Organization) -> int | None:
"""
Returns the maximum number of instances of this data source type for the organization.
@@ -135,13 +138,24 @@
raise NotImplementedError

@staticmethod
@abstractmethod
def get_current_instance_count(org: Organization) -> int:
"""
Returns the current number of instances of this data source type for the organization.
Only called if `get_instance_limit` returns a number >0
"""
raise NotImplementedError

@staticmethod
@abstractmethod
def get_relocation_model_name() -> str:
"""
Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
source_id references. This is used during backup/relocation to map old PKs to new PKs.
The format is "app_label.model_name" in lowercase.
"""
raise NotImplementedError
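
For reference, this is how the PR's DataSource.normalize_before_relocation_import consumes the returned name (a condensed sketch of the model code above, assuming a data_source and pk_map already in scope):

# Condensed from src/sentry/workflow_engine/models/data_source.py in this PR:
handler = data_source_type_registry.get(data_source.type)
model_name = NormalizedModelName(handler.get_relocation_model_name())
new_source_id = pk_map.get_pk(model_name, int(data_source.source_id))
# new_source_id is None when the referenced model was filtered out of the import.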


class DataConditionHandler(Generic[T]):
class Group(StrEnum):
68 changes: 68 additions & 0 deletions tests/sentry/workflow_engine/models/test_data_source.py
@@ -2,6 +2,10 @@

import pytest

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.workflow_engine.registry import data_source_type_registry
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

@@ -18,3 +22,67 @@ def test_data_source_valid_type(self) -> None:
data_source = self.create_data_source(type="test")
assert data_source is not None
assert data_source.type == "test"

def test_normalize_before_relocation_import(self) -> None:
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_monitor_pk = monitor.id
new_monitor_pk = 9999
old_data_source_id = data_source.id
old_org_id = data_source.organization_id

# Create a PrimaryKeyMap that maps the old monitor ID to a new one
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("monitors.monitor"),
old=old_monitor_pk,
new=new_monitor_pk,
kind=ImportKind.Inserted,
)
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

old_data_source_pk = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

assert (
old_data_source_pk == old_data_source_id
), f"Expected {old_data_source_id}, got {old_data_source_pk}"
assert data_source.source_id == str(new_monitor_pk)
assert data_source.pk is None

def test_normalize_before_relocation_import_missing_source(self) -> None:
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_org_id = data_source.organization_id

# Create a PrimaryKeyMap without the monitor mapping
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

result = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

# Should return None when the referenced source is not in pk_map
assert result is None