Merged
Changes from 1 commit
7 changes: 7 additions & 0 deletions src/sentry/monitors/models.py
@@ -814,6 +814,7 @@ class Meta:

@data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "monitors.monitor"
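For context, this return value is consumed together with the registry lookup in DataSource.type_handler (see src/sentry/workflow_engine/models/data_source.py below). A minimal sketch of that resolution, using only imports and calls that appear elsewhere in this PR:

from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.workflow_engine.registry import data_source_type_registry

# Resolve the handler for a cron-monitor data source, then ask it which
# model its source_id column references.
handler = data_source_type_registry.get(DATA_SOURCE_CRON_MONITOR)
assert handler.get_relocation_model_name() == "monitors.monitor"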
7 changes: 7 additions & 0 deletions src/sentry/snuba/models.py
@@ -182,6 +182,7 @@ def write_relocation_import(

@data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
QuerySubscription.Status.UPDATING.value,
),
).count()

@override
@staticmethod
def get_relocation_model_name() -> str:
return "sentry.querysubscription"
7 changes: 7 additions & 0 deletions src/sentry/uptime/models.py
@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):

@data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
@override
@staticmethod
def bulk_get_query_object(
data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
}
return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

@override
@staticmethod
def related_model(instance) -> list[ModelRelation]:
return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
# We don't have a limit at the moment, so no need to count.
raise NotImplementedError

@override
@staticmethod
def get_relocation_model_name() -> str:
return "uptime.uptimesubscription"


def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
"""
51 changes: 50 additions & 1 deletion src/sentry/workflow_engine/models/data_source.py
@@ -1,18 +1,23 @@
import builtins
import dataclasses
import logging
from typing import Generic, TypeVar

from django.db import models
from django.db.models.signals import pre_save
from django.dispatch import receiver

from sentry.backup.scopes import RelocationScope
from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope, RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
from sentry.utils.registry import NoRegistrationExistsError
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler

logger = logging.getLogger(__name__)

T = TypeVar("T")


@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
@region_silo_model
class DataSource(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization
# DataSource.source_id dynamically references different models based on the 'type' field.
# We declare all possible dependencies here to ensure proper import ordering.
Comment on lines +33 to +34

Reviewer (Member): Does the order matter because we need the pk mappings for the other tables to exist before this table is relocated?

Author (Member): Yes.

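To illustrate the point (a sketch, not part of this diff): the models declared below are imported before DataSource, so their old-to-new pk mappings already exist in the PrimaryKeyMap by the time each DataSource row is normalized. Using the same API the new tests exercise:

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap

pk_map = PrimaryKeyMap()

# The monitor is imported first, recording its old -> new pk mapping
# (pks here are illustrative).
pk_map.insert(
    model_name=NormalizedModelName("monitors.monitor"),
    old=123,
    new=456,
    kind=ImportKind.Inserted,
)

# When a DataSource row is normalized afterwards, the lookup succeeds.
# Imported in the opposite order, get_pk would find nothing and the
# source_id could not be remapped.
assert pk_map.get_pk(NormalizedModelName("monitors.monitor"), 123) == 456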
__relocation_dependencies__ = {
"monitors.monitor", # For DATA_SOURCE_CRON_MONITOR
"sentry.querysubscription", # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
"uptime.uptimesubscription", # For DATA_SOURCE_UPTIME_SUBSCRIPTION
}

organization = FlexibleForeignKey("sentry.Organization")

@@ -49,6 +61,43 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
raise ValueError(f"Unknown data source type: {self.type}")
return handler

def normalize_before_relocation_import(
self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
) -> int | None:
old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
if old_pk is None:
return None

# Map source_id based on the data source type
try:
handler = data_source_type_registry.get(self.type)
model_name = NormalizedModelName(handler.get_relocation_model_name())
old_source_id = int(self.source_id)
new_source_id = pk_map.get_pk(model_name, old_source_id)

if new_source_id is not None:
self.source_id = str(new_source_id)
else:
# Referenced model not in pk_map. This may be correct (reset_pks=False) or broken
# (reset_pks=True but referenced model was filtered out or failed to import).
Reviewer (Member): Should we return None if we can't map the source_id? I think with this logic it will keep this object mapped to the old source_id, which could be inaccurate?

Author (Member): We should, and now we do. I got confused by test logic, but now know better.

logger.warning(
"DataSource source_id not remapped - referenced model not in pk_map",
extra={
"data_source_id": old_pk,
"type": self.type,
"source_id": old_source_id,
"model": str(model_name),
},
)
except Exception:
logger.exception(
"DataSource.normalize_before_relocation_import failed",
extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
)
return None

return old_pk


@receiver(pre_save, sender=DataSource)
def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):
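On the review thread above: in the commit shown here, a failed lookup logs a warning and the method still returns old_pk, leaving source_id unchanged (the second test below relies on this). The behavior the author says was later adopted, returning None so the row is not imported with a stale source_id, would look roughly like this sketch (not the code in this commit):

if new_source_id is not None:
    self.source_id = str(new_source_id)
else:
    logger.warning(
        "DataSource source_id not remapped - referenced model not in pk_map",
        extra={"data_source_id": old_pk, "type": self.type, "source_id": old_source_id},
    )
    # Return None instead of keeping the stale mapping, as the
    # reviewer suggests.
    return None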
16 changes: 15 additions & 1 deletion src/sentry/workflow_engine/types.py
@@ -108,8 +108,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
raise NotImplementedError


class DataSourceTypeHandler(Generic[T]):
class DataSourceTypeHandler(ABC, Generic[T]):
@staticmethod
@abstractmethod
def bulk_get_query_object(data_sources) -> dict[int, T | None]:
"""
Bulk fetch related data-source models returning a dict of the
@@ -118,6 +119,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
raise NotImplementedError

@staticmethod
@abstractmethod
def related_model(instance) -> list[ModelRelation]:
"""
A list of deletion ModelRelations. The model relation query should map
@@ -127,6 +129,7 @@ def related_model(instance) -> list[ModelRelation]:
raise NotImplementedError

@staticmethod
@abstractmethod
def get_instance_limit(org: Organization) -> int | None:
"""
Returns the maximum number of instances of this data source type for the organization.
@@ -135,13 +138,24 @@ def get_current_instance_count(org: Organization) -> int:
raise NotImplementedError

@staticmethod
@abstractmethod
def get_current_instance_count(org: Organization) -> int:
"""
Returns the current number of instances of this data source type for the organization.
Only called if `get_instance_limit` returns a number greater than 0.
"""
raise NotImplementedError

@staticmethod
@abstractmethod
def get_relocation_model_name() -> str:
"""
Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
source_id references. This is used during backup/relocation to map old PKs to new PKs.
The format is "app_label.model_name" in lowercase.
"""
raise NotImplementedError


class DataConditionHandler(Generic[T]):
class Group(StrEnum):
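With DataSourceTypeHandler now an ABC, a handler must implement all five abstract methods, including the new get_relocation_model_name. A sketch of a hypothetical handler following the same pattern as the concrete ones in this PR (the type key "my_source" and model MyModel are invented for illustration):

@data_source_type_registry.register("my_source")
class MySourceDataSourceHandler(DataSourceTypeHandler[MyModel]):
    @override
    @staticmethod
    def bulk_get_query_object(data_sources) -> dict[int, MyModel | None]:
        # Bulk-fetch the referenced rows, keyed by stringified pk to match
        # the str-typed DataSource.source_id column.
        qs_lookup = {
            str(obj.id): obj
            for obj in MyModel.objects.filter(
                id__in=[ds.source_id for ds in data_sources]
            )
        }
        return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}

    @override
    @staticmethod
    def related_model(instance) -> list[ModelRelation]:
        return [ModelRelation(MyModel, {"id": instance.source_id})]

    @override
    @staticmethod
    def get_instance_limit(org: Organization) -> int | None:
        return None  # no per-organization limit for this type

    @override
    @staticmethod
    def get_current_instance_count(org: Organization) -> int:
        raise NotImplementedError  # only called when a limit is set

    @override
    @staticmethod
    def get_relocation_model_name() -> str:
        return "myapp.mymodel"  # "app_label.model_name", lowercase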
74 changes: 74 additions & 0 deletions tests/sentry/workflow_engine/models/test_data_source.py
@@ -2,6 +2,10 @@

import pytest

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.workflow_engine.registry import data_source_type_registry
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

@@ -18,3 +22,73 @@ def test_data_source_valid_type(self) -> None:
data_source = self.create_data_source(type="test")
assert data_source is not None
assert data_source.type == "test"

def test_normalize_before_relocation_import(self) -> None:
"""Test that normalize_before_relocation_import correctly maps source_id"""
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_monitor_pk = monitor.id
new_monitor_pk = 9999
old_data_source_id = data_source.id
old_org_id = data_source.organization_id

# Create a PrimaryKeyMap that maps the old monitor ID to a new one
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("monitors.monitor"),
old=old_monitor_pk,
new=new_monitor_pk,
kind=ImportKind.Inserted,
)
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

old_data_source_pk = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

assert (
old_data_source_pk == old_data_source_id
), f"Expected {old_data_source_id}, got {old_data_source_pk}"
assert data_source.source_id == str(new_monitor_pk)
assert data_source.pk is None

def test_normalize_before_relocation_import_missing_source(self) -> None:
"""Test that normalize_before_relocation_import succeeds but doesn't update source_id if mapping not found"""
monitor = self.create_monitor(project=self.project)
data_source = self.create_data_source(
type=DATA_SOURCE_CRON_MONITOR,
source_id=str(monitor.id),
organization_id=self.organization.id,
)

old_source_id = data_source.source_id
old_data_source_id = data_source.id
old_org_id = data_source.organization_id

# Create a PrimaryKeyMap without the monitor mapping
pk_map = PrimaryKeyMap()
pk_map.insert(
model_name=NormalizedModelName("sentry.organization"),
old=old_org_id,
new=old_org_id,
kind=ImportKind.Inserted,
)

result = data_source.normalize_before_relocation_import(
pk_map, ImportScope.Organization, ImportFlags()
)

# Should succeed but leave source_id unchanged
assert result == old_data_source_id
assert data_source.source_id == old_source_id
assert data_source.pk is None