diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 7e49d6d805..f0987c5c92 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -19,6 +19,10 @@
+
+
+
+
@@ -50,6 +54,7 @@
+
@@ -62,6 +67,7 @@
+
@@ -78,6 +84,9 @@
+
+
+
diff --git a/src/ProjectReferences.Persisters.Primary.props b/src/ProjectReferences.Persisters.Primary.props
index 255b45ed5c..c0dfc95209 100644
--- a/src/ProjectReferences.Persisters.Primary.props
+++ b/src/ProjectReferences.Persisters.Primary.props
@@ -2,6 +2,9 @@
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.Sql.Core/.editorconfig b/src/ServiceControl.Persistence.Sql.Core/.editorconfig
new file mode 100644
index 0000000000..bedef15fb6
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/.editorconfig
@@ -0,0 +1,9 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
+
+# Disable style rules for auto-generated EF migrations
+[**/Migrations/*.cs]
+dotnet_diagnostic.IDE0065.severity = none
+generated_code = true
diff --git a/src/ServiceControl.Persistence.Sql.Core/Abstractions/BasePersistence.cs b/src/ServiceControl.Persistence.Sql.Core/Abstractions/BasePersistence.cs
new file mode 100644
index 0000000000..3329c92b36
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Abstractions/BasePersistence.cs
@@ -0,0 +1,43 @@
+namespace ServiceControl.Persistence.Sql.Core.Abstractions;
+
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Persistence;
+using ServiceControl.Persistence.MessageRedirects;
+using ServiceControl.Persistence.UnitOfWork;
+using Implementation;
+using Implementation.UnitOfWork;
+using Particular.LicensingComponent.Persistence;
+
+public abstract class BasePersistence
+{
+ protected static void RegisterDataStores(IServiceCollection services, bool maintenanceMode)
+ {
+ if (maintenanceMode)
+ {
+ return;
+ }
+
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Abstractions/IDatabaseMigrator.cs b/src/ServiceControl.Persistence.Sql.Core/Abstractions/IDatabaseMigrator.cs
new file mode 100644
index 0000000000..39de35cfe4
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Abstractions/IDatabaseMigrator.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Persistence.Sql.Core.Abstractions;
+
+public interface IDatabaseMigrator
+{
+ Task ApplyMigrations(CancellationToken cancellationToken = default);
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Abstractions/SqlPersisterSettings.cs b/src/ServiceControl.Persistence.Sql.Core/Abstractions/SqlPersisterSettings.cs
new file mode 100644
index 0000000000..25de65b710
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Abstractions/SqlPersisterSettings.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Abstractions;
+
+using ServiceControl.Persistence;
+
+public abstract class SqlPersisterSettings : PersistenceSettings
+{
+ public required string ConnectionString { get; set; }
+ public int CommandTimeout { get; set; } = 30;
+ public bool EnableSensitiveDataLogging { get; set; } = false;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/DbContexts/ServiceControlDbContextBase.cs b/src/ServiceControl.Persistence.Sql.Core/DbContexts/ServiceControlDbContextBase.cs
new file mode 100644
index 0000000000..c37f8a9976
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/DbContexts/ServiceControlDbContextBase.cs
@@ -0,0 +1,69 @@
+namespace ServiceControl.Persistence.Sql.Core.DbContexts;
+
+using Entities;
+using EntityConfigurations;
+using Microsoft.EntityFrameworkCore;
+
+public abstract class ServiceControlDbContextBase : DbContext
+{
+ protected ServiceControlDbContextBase(DbContextOptions options) : base(options)
+ {
+ }
+
+ public DbSet TrialLicenses { get; set; }
+ public DbSet EndpointSettings { get; set; }
+ public DbSet EventLogItems { get; set; }
+ public DbSet MessageRedirects { get; set; }
+ public DbSet Subscriptions { get; set; }
+ public DbSet QueueAddresses { get; set; }
+ public DbSet KnownEndpoints { get; set; }
+ public DbSet CustomChecks { get; set; }
+ public DbSet MessageBodies { get; set; }
+ public DbSet RetryHistory { get; set; }
+ public DbSet FailedErrorImports { get; set; }
+ public DbSet ExternalIntegrationDispatchRequests { get; set; }
+ public DbSet ArchiveOperations { get; set; }
+ public DbSet FailedMessages { get; set; }
+ public DbSet RetryBatches { get; set; }
+ public DbSet FailedMessageRetries { get; set; }
+ public DbSet GroupComments { get; set; }
+ public DbSet RetryBatchNowForwarding { get; set; }
+ public DbSet NotificationsSettings { get; set; }
+ public DbSet LicensingMetadata { get; set; }
+ public DbSet Endpoints { get; set; }
+ public DbSet Throughput { get; set; }
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ base.OnModelCreating(modelBuilder);
+
+ modelBuilder.ApplyConfiguration(new TrialLicenseConfiguration());
+ modelBuilder.ApplyConfiguration(new EndpointSettingsConfiguration());
+ modelBuilder.ApplyConfiguration(new EventLogItemConfiguration());
+ modelBuilder.ApplyConfiguration(new MessageRedirectsConfiguration());
+ modelBuilder.ApplyConfiguration(new SubscriptionConfiguration());
+ modelBuilder.ApplyConfiguration(new QueueAddressConfiguration());
+ modelBuilder.ApplyConfiguration(new KnownEndpointConfiguration());
+ modelBuilder.ApplyConfiguration(new CustomCheckConfiguration());
+ modelBuilder.ApplyConfiguration(new MessageBodyConfiguration());
+ modelBuilder.ApplyConfiguration(new RetryHistoryConfiguration());
+ modelBuilder.ApplyConfiguration(new FailedErrorImportConfiguration());
+ modelBuilder.ApplyConfiguration(new ExternalIntegrationDispatchRequestConfiguration());
+ modelBuilder.ApplyConfiguration(new ArchiveOperationConfiguration());
+ modelBuilder.ApplyConfiguration(new FailedMessageConfiguration());
+ modelBuilder.ApplyConfiguration(new RetryBatchConfiguration());
+ modelBuilder.ApplyConfiguration(new FailedMessageRetryConfiguration());
+ modelBuilder.ApplyConfiguration(new GroupCommentConfiguration());
+ modelBuilder.ApplyConfiguration(new RetryBatchNowForwardingConfiguration());
+ modelBuilder.ApplyConfiguration(new NotificationsSettingsConfiguration());
+ modelBuilder.ApplyConfiguration(new LicensingMetadataEntityConfiguration());
+ modelBuilder.ApplyConfiguration(new ThroughputEndpointConfiguration());
+ modelBuilder.ApplyConfiguration(new DailyThroughputConfiguration());
+
+ OnModelCreatingProvider(modelBuilder);
+ }
+
+ protected virtual void OnModelCreatingProvider(ModelBuilder modelBuilder)
+ {
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/ArchiveOperationEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/ArchiveOperationEntity.cs
new file mode 100644
index 0000000000..0ac92d6d5f
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/ArchiveOperationEntity.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class ArchiveOperationEntity
+{
+ public Guid Id { get; set; }
+ public string RequestId { get; set; } = null!;
+ public string GroupName { get; set; } = null!;
+ public int ArchiveType { get; set; } // ArchiveType enum as int
+ public int ArchiveState { get; set; } // ArchiveState enum as int
+ public int TotalNumberOfMessages { get; set; }
+ public int NumberOfMessagesArchived { get; set; }
+ public int NumberOfBatches { get; set; }
+ public int CurrentBatch { get; set; }
+ public DateTime Started { get; set; }
+ public DateTime? Last { get; set; }
+ public DateTime? CompletionTime { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/CustomCheckEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/CustomCheckEntity.cs
new file mode 100644
index 0000000000..0598b97e58
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/CustomCheckEntity.cs
@@ -0,0 +1,14 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class CustomCheckEntity
+{
+ public Guid Id { get; set; }
+ public string CustomCheckId { get; set; } = null!;
+ public string? Category { get; set; }
+ public int Status { get; set; } // 0 = Pass, 1 = Fail
+ public DateTime ReportedAt { get; set; }
+ public string? FailureReason { get; set; }
+ public string EndpointName { get; set; } = null!;
+ public Guid HostId { get; set; }
+ public string Host { get; set; } = null!;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/DailyThroughputEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/DailyThroughputEntity.cs
new file mode 100644
index 0000000000..4da7565214
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/DailyThroughputEntity.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class DailyThroughputEntity
+{
+ public int Id { get; set; }
+ public required string EndpointName { get; set; }
+ public required string ThroughputSource { get; set; }
+ public required DateOnly Date { get; set; }
+ public required long MessageCount { get; set; }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/EndpointSettingsEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/EndpointSettingsEntity.cs
new file mode 100644
index 0000000000..ea88866b63
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/EndpointSettingsEntity.cs
@@ -0,0 +1,7 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class EndpointSettingsEntity
+{
+ public required string Name { get; set; }
+ public bool TrackInstances { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/EventLogItemEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/EventLogItemEntity.cs
new file mode 100644
index 0000000000..fcc815159f
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/EventLogItemEntity.cs
@@ -0,0 +1,14 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class EventLogItemEntity
+{
+ public Guid Id { get; set; }
+ public required string Description { get; set; }
+ public int Severity { get; set; }
+ public DateTime RaisedAt { get; set; }
+ public string? RelatedToJson { get; set; } // Stored as JSON array
+ public string? Category { get; set; }
+ public string? EventType { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/ExternalIntegrationDispatchRequestEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/ExternalIntegrationDispatchRequestEntity.cs
new file mode 100644
index 0000000000..6a0c50450a
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/ExternalIntegrationDispatchRequestEntity.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class ExternalIntegrationDispatchRequestEntity
+{
+ public long Id { get; set; }
+ public string DispatchContextJson { get; set; } = null!;
+ public DateTime CreatedAt { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/FailedErrorImportEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedErrorImportEntity.cs
new file mode 100644
index 0000000000..a88d4632c3
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedErrorImportEntity.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class FailedErrorImportEntity
+{
+ public Guid Id { get; set; }
+ public string MessageJson { get; set; } = null!; // FailedTransportMessage as JSON
+ public string? ExceptionInfo { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageEntity.cs
new file mode 100644
index 0000000000..4e161dad1c
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageEntity.cs
@@ -0,0 +1,33 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+using ServiceControl.MessageFailures;
+
+public class FailedMessageEntity
+{
+ public Guid Id { get; set; }
+ public string UniqueMessageId { get; set; } = null!;
+ public FailedMessageStatus Status { get; set; }
+
+ // JSON columns for complex nested data
+ public string ProcessingAttemptsJson { get; set; } = null!;
+ public string FailureGroupsJson { get; set; } = null!;
+ public string HeadersJson { get; set; } = null!;
+
+ // Denormalized fields from FailureGroups for efficient filtering
+ // PrimaryFailureGroupId is the first group ID from FailureGroupsJson array
+ public string? PrimaryFailureGroupId { get; set; }
+
+ // Denormalized fields from the last processing attempt for efficient querying
+ public string? MessageId { get; set; }
+ public string? MessageType { get; set; }
+ public DateTime? TimeSent { get; set; }
+ public string? SendingEndpointName { get; set; }
+ public string? ReceivingEndpointName { get; set; }
+ public string? ExceptionType { get; set; }
+ public string? ExceptionMessage { get; set; }
+ public string? QueueAddress { get; set; }
+ public int? NumberOfProcessingAttempts { get; set; }
+ public DateTime? LastProcessedAt { get; set; }
+ public string? ConversationId { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageRetryEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageRetryEntity.cs
new file mode 100644
index 0000000000..8daab62466
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/FailedMessageRetryEntity.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class FailedMessageRetryEntity
+{
+ public Guid Id { get; set; }
+ public string FailedMessageId { get; set; } = null!;
+ public string? RetryBatchId { get; set; }
+ public int StageAttempts { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/GroupCommentEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/GroupCommentEntity.cs
new file mode 100644
index 0000000000..a94c4c049b
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/GroupCommentEntity.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class GroupCommentEntity
+{
+ public Guid Id { get; set; }
+ public string GroupId { get; set; } = null!;
+ public string Comment { get; set; } = null!;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
new file mode 100644
index 0000000000..40ca68bdee
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class KnownEndpointEntity
+{
+ public Guid Id { get; set; }
+ public string EndpointName { get; set; } = null!;
+ public Guid HostId { get; set; }
+ public string Host { get; set; } = null!;
+ public string HostDisplayName { get; set; } = null!;
+ public bool Monitored { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/LicensingMetadataEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/LicensingMetadataEntity.cs
new file mode 100644
index 0000000000..6b7a97bb83
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/LicensingMetadataEntity.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class LicensingMetadataEntity
+{
+ public int Id { get; set; }
+ public required string Key { get; set; }
+ public required string Data { get; set; }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/MessageBodyEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/MessageBodyEntity.cs
new file mode 100644
index 0000000000..d5d1531acc
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/MessageBodyEntity.cs
@@ -0,0 +1,12 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class MessageBodyEntity
+{
+ public Guid Id { get; set; }
+ public byte[] Body { get; set; } = null!;
+ public string ContentType { get; set; } = null!;
+ public int BodySize { get; set; }
+ public string? Etag { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/MessageRedirectsEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/MessageRedirectsEntity.cs
new file mode 100644
index 0000000000..d1c9b77f38
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/MessageRedirectsEntity.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class MessageRedirectsEntity
+{
+ public Guid Id { get; set; }
+ public required string ETag { get; set; }
+ public DateTime LastModified { get; set; }
+ public required string RedirectsJson { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/NotificationsSettingsEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/NotificationsSettingsEntity.cs
new file mode 100644
index 0000000000..d6ede51cc1
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/NotificationsSettingsEntity.cs
@@ -0,0 +1,9 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+
+public class NotificationsSettingsEntity
+{
+ public Guid Id { get; set; }
+ public string EmailSettingsJson { get; set; } = string.Empty;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/QueueAddressEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/QueueAddressEntity.cs
new file mode 100644
index 0000000000..fe1302b60f
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/QueueAddressEntity.cs
@@ -0,0 +1,7 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class QueueAddressEntity
+{
+ public string PhysicalAddress { get; set; } = null!;
+ public int FailedMessageCount { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchEntity.cs
new file mode 100644
index 0000000000..b8cff440c6
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchEntity.cs
@@ -0,0 +1,23 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+using System;
+using ServiceControl.Persistence;
+
+public class RetryBatchEntity
+{
+ public Guid Id { get; set; }
+ public string? Context { get; set; }
+ public string RetrySessionId { get; set; } = null!;
+ public string? StagingId { get; set; }
+ public string? Originator { get; set; }
+ public string? Classifier { get; set; }
+ public DateTime StartTime { get; set; }
+ public DateTime? Last { get; set; }
+ public string RequestId { get; set; } = null!;
+ public int InitialBatchSize { get; set; }
+ public RetryType RetryType { get; set; }
+ public RetryBatchStatus Status { get; set; }
+
+ // JSON column for list of retry IDs
+ public string FailureRetriesJson { get; set; } = "[]";
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchNowForwardingEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchNowForwardingEntity.cs
new file mode 100644
index 0000000000..d93800728d
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryBatchNowForwardingEntity.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class RetryBatchNowForwardingEntity
+{
+ public int Id { get; set; }
+ public string RetryBatchId { get; set; } = null!;
+
+ // This is a singleton entity - only one forwarding batch at a time
+ public const int SingletonId = 1;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/RetryHistoryEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryHistoryEntity.cs
new file mode 100644
index 0000000000..7056f97f1a
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/RetryHistoryEntity.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class RetryHistoryEntity
+{
+ public int Id { get; set; } = 1; // Singleton pattern
+ public string? HistoricOperationsJson { get; set; }
+ public string? UnacknowledgedOperationsJson { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/SubscriptionEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/SubscriptionEntity.cs
new file mode 100644
index 0000000000..d09e7ed7d8
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/SubscriptionEntity.cs
@@ -0,0 +1,9 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class SubscriptionEntity
+{
+ public string Id { get; set; } = null!;
+ public string MessageTypeTypeName { get; set; } = null!;
+ public int MessageTypeVersion { get; set; }
+ public string SubscribersJson { get; set; } = null!;
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/ThroughputEndpointEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/ThroughputEndpointEntity.cs
new file mode 100644
index 0000000000..26b875aaf9
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/ThroughputEndpointEntity.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class ThroughputEndpointEntity
+{
+ public int Id { get; set; }
+ public required string EndpointName { get; set; }
+ public required string ThroughputSource { get; set; }
+ public string? SanitizedEndpointName { get; set; }
+ public string? EndpointIndicators { get; set; }
+ public string? UserIndicator { get; set; }
+ public string? Scope { get; set; }
+ public DateOnly LastCollectedData { get; set; }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.Sql.Core/Entities/TrialLicenseEntity.cs b/src/ServiceControl.Persistence.Sql.Core/Entities/TrialLicenseEntity.cs
new file mode 100644
index 0000000000..36659760c0
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Entities/TrialLicenseEntity.cs
@@ -0,0 +1,7 @@
+namespace ServiceControl.Persistence.Sql.Core.Entities;
+
+public class TrialLicenseEntity
+{
+ public int Id { get; set; }
+ public DateOnly TrialEndDate { get; set; }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ArchiveOperationConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ArchiveOperationConfiguration.cs
new file mode 100644
index 0000000000..109c5e8e32
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ArchiveOperationConfiguration.cs
@@ -0,0 +1,30 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class ArchiveOperationConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("ArchiveOperations");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.RequestId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.GroupName).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.ArchiveType).IsRequired();
+ builder.Property(e => e.ArchiveState).IsRequired();
+ builder.Property(e => e.TotalNumberOfMessages).IsRequired();
+ builder.Property(e => e.NumberOfMessagesArchived).IsRequired();
+ builder.Property(e => e.NumberOfBatches).IsRequired();
+ builder.Property(e => e.CurrentBatch).IsRequired();
+ builder.Property(e => e.Started).IsRequired();
+ builder.Property(e => e.Last);
+ builder.Property(e => e.CompletionTime);
+
+ builder.HasIndex(e => e.RequestId);
+ builder.HasIndex(e => e.ArchiveState);
+ builder.HasIndex(e => new { e.ArchiveType, e.RequestId }).IsUnique();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/CustomCheckConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/CustomCheckConfiguration.cs
new file mode 100644
index 0000000000..ce5861d2e2
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/CustomCheckConfiguration.cs
@@ -0,0 +1,25 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class CustomCheckConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("CustomChecks");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.CustomCheckId).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.Category).HasMaxLength(500);
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.ReportedAt).IsRequired();
+ builder.Property(e => e.FailureReason);
+ builder.Property(e => e.EndpointName).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired().HasMaxLength(500);
+
+ // Index for filtering by status
+ builder.HasIndex(e => e.Status);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/DailyThroughputConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/DailyThroughputConfiguration.cs
new file mode 100644
index 0000000000..dd5394f438
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/DailyThroughputConfiguration.cs
@@ -0,0 +1,31 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class DailyThroughputConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("DailyThroughput")
+ .HasIndex(e => new
+ {
+ e.EndpointName,
+ e.ThroughputSource,
+ e.Date
+ }, "UC_DailyThroughput_EndpointName_ThroughputSource_Date")
+ .IsUnique();
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.EndpointName)
+ .IsRequired()
+ .HasMaxLength(200);
+ builder.Property(e => e.ThroughputSource)
+ .IsRequired()
+ .HasMaxLength(50);
+ builder.Property(e => e.Date)
+ .IsRequired();
+ builder.Property(e => e.MessageCount)
+ .IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EndpointSettingsConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EndpointSettingsConfiguration.cs
new file mode 100644
index 0000000000..365fcad7f9
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EndpointSettingsConfiguration.cs
@@ -0,0 +1,22 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class EndpointSettingsConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("EndpointSettings");
+
+ builder.HasKey(e => e.Name);
+
+ builder.Property(e => e.Name)
+ .IsRequired()
+ .HasMaxLength(500);
+
+ builder.Property(e => e.TrackInstances)
+ .IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EventLogItemConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EventLogItemConfiguration.cs
new file mode 100644
index 0000000000..a5ddc6575b
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/EventLogItemConfiguration.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class EventLogItemConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("EventLogItems");
+
+ builder.HasKey(e => e.Id);
+
+ builder.Property(e => e.Id)
+ .IsRequired();
+
+ builder.Property(e => e.Description)
+ .IsRequired();
+
+ builder.Property(e => e.Severity)
+ .IsRequired();
+
+ builder.Property(e => e.RaisedAt)
+ .IsRequired();
+
+ builder.Property(e => e.Category)
+ .HasMaxLength(200);
+
+ builder.Property(e => e.EventType)
+ .HasMaxLength(200);
+
+ builder.Property(e => e.RelatedToJson)
+ .HasColumnType("jsonb")
+ .HasMaxLength(4000);
+
+ // Index for querying by RaisedAt
+ builder.HasIndex(e => e.RaisedAt);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ExternalIntegrationDispatchRequestConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ExternalIntegrationDispatchRequestConfiguration.cs
new file mode 100644
index 0000000000..af17a802b1
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ExternalIntegrationDispatchRequestConfiguration.cs
@@ -0,0 +1,23 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class ExternalIntegrationDispatchRequestConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("ExternalIntegrationDispatchRequests");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id)
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .IsRequired();
+
+ builder.Property(e => e.DispatchContextJson).HasColumnType("jsonb").IsRequired();
+ builder.Property(e => e.CreatedAt).IsRequired();
+
+ builder.HasIndex(e => e.CreatedAt);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedErrorImportConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedErrorImportConfiguration.cs
new file mode 100644
index 0000000000..e969222072
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedErrorImportConfiguration.cs
@@ -0,0 +1,17 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class FailedErrorImportConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("FailedErrorImports");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.MessageJson).HasColumnType("jsonb").IsRequired();
+ builder.Property(e => e.ExceptionInfo);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageConfiguration.cs
new file mode 100644
index 0000000000..f0a6ef63fe
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageConfiguration.cs
@@ -0,0 +1,66 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class FailedMessageConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("FailedMessages");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.UniqueMessageId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.ProcessingAttemptsJson).HasColumnType("jsonb").IsRequired();
+ builder.Property(e => e.FailureGroupsJson).HasColumnType("jsonb").IsRequired();
+ builder.Property(e => e.HeadersJson).HasColumnType("jsonb").IsRequired();
+
+ // Denormalized query fields from FailureGroups
+ builder.Property(e => e.PrimaryFailureGroupId).HasMaxLength(200);
+
+ // Denormalized query fields from processing attempts
+ builder.Property(e => e.MessageId).HasMaxLength(200);
+ builder.Property(e => e.MessageType).HasMaxLength(500);
+ builder.Property(e => e.SendingEndpointName).HasMaxLength(500);
+ builder.Property(e => e.ReceivingEndpointName).HasMaxLength(500);
+ builder.Property(e => e.ExceptionType).HasMaxLength(500);
+ builder.Property(e => e.QueueAddress).HasMaxLength(500);
+ builder.Property(e => e.ConversationId).HasMaxLength(200);
+
+ // PRIMARY: Critical for uniqueness and upserts
+ builder.HasIndex(e => e.UniqueMessageId).IsUnique();
+
+ // COMPOSITE INDEXES: Hot paths - Status is involved in most queries
+ // Most common pattern: Status + LastProcessedAt (15+ queries)
+ builder.HasIndex(e => new { e.Status, e.LastProcessedAt });
+
+ // Endpoint-specific queries (8+ queries)
+ builder.HasIndex(e => new { e.ReceivingEndpointName, e.Status, e.LastProcessedAt });
+
+ // Queue-specific retry operations (6+ queries)
+ builder.HasIndex(e => new { e.QueueAddress, e.Status, e.LastProcessedAt });
+
+ // Retry operations by queue (3+ queries)
+ builder.HasIndex(e => new { e.Status, e.QueueAddress });
+
+ // TIME-BASED QUERIES
+ // Endpoint + time range queries (for GetAllMessagesForEndpoint)
+ builder.HasIndex(e => new { e.ReceivingEndpointName, e.TimeSent });
+
+ // Conversation tracking queries
+ builder.HasIndex(e => new { e.ConversationId, e.LastProcessedAt });
+
+ // SEARCH QUERIES
+ // Message type + time filtering
+ builder.HasIndex(e => new { e.MessageType, e.TimeSent });
+
+ // FAILURE GROUP QUERIES
+ // Critical for group-based filtering (avoids loading all messages)
+ builder.HasIndex(e => new { e.PrimaryFailureGroupId, e.Status, e.LastProcessedAt });
+
+ // SINGLE-COLUMN INDEXES: Keep for specific lookup cases
+ builder.HasIndex(e => e.MessageId);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageRetryConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageRetryConfiguration.cs
new file mode 100644
index 0000000000..3b9d3bfa63
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/FailedMessageRetryConfiguration.cs
@@ -0,0 +1,22 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class FailedMessageRetryConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("FailedMessageRetries");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.FailedMessageId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.RetryBatchId).HasMaxLength(200);
+ builder.Property(e => e.StageAttempts).IsRequired();
+
+ // Indexes
+ builder.HasIndex(e => e.FailedMessageId);
+ builder.HasIndex(e => e.RetryBatchId);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/GroupCommentConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/GroupCommentConfiguration.cs
new file mode 100644
index 0000000000..8d21c9ebc5
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/GroupCommentConfiguration.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class GroupCommentConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("GroupComments");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.GroupId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.Comment).IsRequired();
+
+ builder.HasIndex(e => e.GroupId);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
new file mode 100644
index 0000000000..7b4d1bf7ed
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class KnownEndpointConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("KnownEndpoints");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.EndpointName).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.HostDisplayName).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.Monitored).IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/LicensingMetadataEntityConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/LicensingMetadataEntityConfiguration.cs
new file mode 100644
index 0000000000..06e2318cbd
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/LicensingMetadataEntityConfiguration.cs
@@ -0,0 +1,22 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class LicensingMetadataEntityConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("LicensingMetadata")
+ .HasIndex(e => e.Key)
+ .IsUnique();
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Key)
+ .IsRequired()
+ .HasMaxLength(200);
+ builder.Property(e => e.Data)
+ .IsRequired()
+ .HasMaxLength(2000);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageBodyConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageBodyConfiguration.cs
new file mode 100644
index 0000000000..3b918cbfe4
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageBodyConfiguration.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class MessageBodyConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("MessageBodies");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.Body).IsRequired();
+ builder.Property(e => e.ContentType).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.BodySize).IsRequired();
+ builder.Property(e => e.Etag).HasMaxLength(100);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageRedirectsConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageRedirectsConfiguration.cs
new file mode 100644
index 0000000000..bff5840465
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/MessageRedirectsConfiguration.cs
@@ -0,0 +1,29 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class MessageRedirectsConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("MessageRedirects");
+
+ builder.HasKey(e => e.Id);
+
+ builder.Property(e => e.Id)
+ .IsRequired();
+
+ builder.Property(e => e.ETag)
+ .IsRequired()
+ .HasMaxLength(200);
+
+ builder.Property(e => e.LastModified)
+ .IsRequired();
+
+ builder.Property(e => e.RedirectsJson)
+ .HasColumnType("jsonb")
+ .IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/NotificationsSettingsConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/NotificationsSettingsConfiguration.cs
new file mode 100644
index 0000000000..bf4f6a24fb
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/NotificationsSettingsConfiguration.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class NotificationsSettingsConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("NotificationsSettings");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.EmailSettingsJson).HasColumnType("jsonb").IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/QueueAddressConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/QueueAddressConfiguration.cs
new file mode 100644
index 0000000000..b9d5776c3a
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/QueueAddressConfiguration.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class QueueAddressConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("QueueAddresses");
+ builder.HasKey(e => e.PhysicalAddress);
+ builder.Property(e => e.PhysicalAddress).HasMaxLength(500);
+ builder.Property(e => e.FailedMessageCount).IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchConfiguration.cs
new file mode 100644
index 0000000000..8c7ee6185d
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchConfiguration.cs
@@ -0,0 +1,29 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class RetryBatchConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("RetryBatches");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.RetrySessionId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.RequestId).HasMaxLength(200).IsRequired();
+ builder.Property(e => e.StagingId).HasMaxLength(200);
+ builder.Property(e => e.Originator).HasMaxLength(500);
+ builder.Property(e => e.Classifier).HasMaxLength(500);
+ builder.Property(e => e.StartTime).IsRequired();
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.RetryType).IsRequired();
+ builder.Property(e => e.FailureRetriesJson).HasColumnType("jsonb").IsRequired();
+
+ // Indexes
+ builder.HasIndex(e => e.RetrySessionId);
+ builder.HasIndex(e => e.Status);
+ builder.HasIndex(e => e.StagingId);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchNowForwardingConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchNowForwardingConfiguration.cs
new file mode 100644
index 0000000000..dfd5a3c455
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryBatchNowForwardingConfiguration.cs
@@ -0,0 +1,18 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class RetryBatchNowForwardingConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("RetryBatchNowForwarding");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.RetryBatchId).HasMaxLength(200).IsRequired();
+
+ builder.HasIndex(e => e.RetryBatchId);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryHistoryConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryHistoryConfiguration.cs
new file mode 100644
index 0000000000..e7104f8e8b
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/RetryHistoryConfiguration.cs
@@ -0,0 +1,17 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class RetryHistoryConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("RetryHistory");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).HasDefaultValue(1).ValueGeneratedNever();
+ builder.Property(e => e.HistoricOperationsJson).HasColumnType("jsonb");
+ builder.Property(e => e.UnacknowledgedOperationsJson).HasColumnType("jsonb");
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/SubscriptionConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/SubscriptionConfiguration.cs
new file mode 100644
index 0000000000..349c30a5b7
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/SubscriptionConfiguration.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class SubscriptionConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("Subscriptions");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).HasMaxLength(100);
+ builder.Property(e => e.MessageTypeTypeName).IsRequired().HasMaxLength(500);
+ builder.Property(e => e.MessageTypeVersion).IsRequired();
+ builder.Property(e => e.SubscribersJson).HasColumnType("jsonb").IsRequired();
+
+ // Unique composite index to enforce one subscription per message type/version
+ builder.HasIndex(e => new { e.MessageTypeTypeName, e.MessageTypeVersion }).IsUnique();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ThroughputEndpointConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ThroughputEndpointConfiguration.cs
new file mode 100644
index 0000000000..dbd1e630ee
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/ThroughputEndpointConfiguration.cs
@@ -0,0 +1,32 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class ThroughputEndpointConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("ThroughputEndpoint")
+ .HasIndex(e => new
+ {
+ e.EndpointName,
+ e.ThroughputSource
+ }, "UC_ThroughputEndpoint_EndpointName_ThroughputSource")
+ .IsUnique();
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.EndpointName)
+ .IsRequired()
+ .HasMaxLength(200);
+ builder.Property(e => e.ThroughputSource)
+ .IsRequired()
+ .HasMaxLength(50);
+
+ builder.Property(e => e.SanitizedEndpointName);
+ builder.Property(e => e.EndpointIndicators);
+ builder.Property(e => e.UserIndicator);
+ builder.Property(e => e.Scope);
+ builder.Property(e => e.LastCollectedData);
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/TrialLicenseConfiguration.cs b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/TrialLicenseConfiguration.cs
new file mode 100644
index 0000000000..903892ba1b
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/EntityConfigurations/TrialLicenseConfiguration.cs
@@ -0,0 +1,22 @@
+namespace ServiceControl.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class TrialLicenseConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("TrialLicense");
+
+ builder.HasKey(e => e.Id);
+
+ // Ensure only one row exists by using a fixed primary key
+ builder.Property(e => e.Id)
+ .ValueGeneratedNever();
+
+ builder.Property(e => e.TrialEndDate)
+ .IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ArchiveMessages.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ArchiveMessages.cs
new file mode 100644
index 0000000000..3db1a5d183
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ArchiveMessages.cs
@@ -0,0 +1,160 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using DbContexts;
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using ServiceControl.Infrastructure.DomainEvents;
+using ServiceControl.Persistence.Recoverability;
+using ServiceControl.Recoverability;
+
+public class ArchiveMessages : DataStoreBase, IArchiveMessages
+{
+ readonly IDomainEvents domainEvents;
+ readonly ILogger logger;
+
+ public ArchiveMessages(
+ IServiceProvider serviceProvider,
+ IDomainEvents domainEvents,
+ ILogger logger) : base(serviceProvider)
+ {
+ this.domainEvents = domainEvents;
+ this.logger = logger;
+ }
+
+ public async Task ArchiveAllInGroup(string groupId)
+ {
+ // This would update all failed messages in the group to archived status
+ // For now, this is a placeholder that would need the failed message infrastructure
+ logger.LogInformation("Archiving all messages in group {GroupId}", groupId);
+ await Task.CompletedTask;
+ }
+
+ public async Task UnarchiveAllInGroup(string groupId)
+ {
+ logger.LogInformation("Unarchiving all messages in group {GroupId}", groupId);
+ await Task.CompletedTask;
+ }
+
+ public bool IsOperationInProgressFor(string groupId, ArchiveType archiveType)
+ {
+ return ExecuteWithDbContext(dbContext =>
+ {
+ var operationId = MakeOperationId(groupId, archiveType);
+ var operation = dbContext.ArchiveOperations
+ .AsNoTracking()
+ .FirstOrDefault(a => a.Id == Guid.Parse(operationId));
+
+ if (operation == null)
+ {
+ return Task.FromResult(false);
+ }
+
+ return Task.FromResult(operation.ArchiveState != (int)ArchiveState.ArchiveCompleted);
+ }).Result;
+ }
+
+ public bool IsArchiveInProgressFor(string groupId)
+ {
+ return IsOperationInProgressFor(groupId, ArchiveType.FailureGroup) ||
+ IsOperationInProgressFor(groupId, ArchiveType.All);
+ }
+
+ public void DismissArchiveOperation(string groupId, ArchiveType archiveType)
+ {
+ ExecuteWithDbContext(dbContext =>
+ {
+ var operationId = Guid.Parse(MakeOperationId(groupId, archiveType));
+
+ dbContext.ArchiveOperations.Where(a => a.Id == operationId).ExecuteDelete();
+ return Task.CompletedTask;
+ }).Wait();
+ }
+
+ public Task StartArchiving(string groupId, ArchiveType archiveType)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var operation = new ArchiveOperationEntity
+ {
+ Id = Guid.Parse(MakeOperationId(groupId, archiveType)),
+ RequestId = groupId,
+ GroupName = groupId,
+ ArchiveType = (int)archiveType,
+ ArchiveState = (int)ArchiveState.ArchiveStarted,
+ TotalNumberOfMessages = 0,
+ NumberOfMessagesArchived = 0,
+ NumberOfBatches = 0,
+ CurrentBatch = 0,
+ Started = DateTime.UtcNow
+ };
+
+ await dbContext.ArchiveOperations.AddAsync(operation);
+ await dbContext.SaveChangesAsync();
+
+ logger.LogInformation("Started archiving for group {GroupId}", groupId);
+ });
+ }
+
+ public Task StartUnarchiving(string groupId, ArchiveType archiveType)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var operation = new ArchiveOperationEntity
+ {
+ Id = Guid.Parse(MakeOperationId(groupId, archiveType)),
+ RequestId = groupId,
+ GroupName = groupId,
+ ArchiveType = (int)archiveType,
+ ArchiveState = (int)ArchiveState.ArchiveStarted,
+ TotalNumberOfMessages = 0,
+ NumberOfMessagesArchived = 0,
+ NumberOfBatches = 0,
+ CurrentBatch = 0,
+ Started = DateTime.UtcNow
+ };
+
+ await dbContext.ArchiveOperations.AddAsync(operation);
+ await dbContext.SaveChangesAsync();
+
+ logger.LogInformation("Started unarchiving for group {GroupId}", groupId);
+ });
+ }
+
+ public IEnumerable GetArchivalOperations()
+ {
+ // Note: IEnumerable methods need direct scope management as they yield results
+ using var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ var operations = dbContext.ArchiveOperations
+ .AsNoTracking()
+ .AsEnumerable();
+
+ foreach (var op in operations)
+ {
+ yield return new InMemoryArchive(op.RequestId, (ArchiveType)op.ArchiveType, domainEvents)
+ {
+ GroupName = op.GroupName,
+ ArchiveState = (ArchiveState)op.ArchiveState,
+ TotalNumberOfMessages = op.TotalNumberOfMessages,
+ NumberOfMessagesArchived = op.NumberOfMessagesArchived,
+ NumberOfBatches = op.NumberOfBatches,
+ CurrentBatch = op.CurrentBatch,
+ Started = op.Started,
+ Last = op.Last,
+ CompletionTime = op.CompletionTime
+ };
+ }
+ }
+
+ static string MakeOperationId(string groupId, ArchiveType archiveType)
+ {
+ return $"{archiveType}/{groupId}";
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/BodyStorage.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/BodyStorage.cs
new file mode 100644
index 0000000000..e4151988e9
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/BodyStorage.cs
@@ -0,0 +1,39 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.Operations.BodyStorage;
+
+public class BodyStorage : DataStoreBase, IBodyStorage
+{
+ public BodyStorage(IServiceProvider serviceProvider) : base(serviceProvider)
+ {
+ }
+
+ public Task TryFetch(string bodyId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Try to fetch the body directly by ID
+ var messageBody = await dbContext.MessageBodies
+ .AsNoTracking()
+ .FirstOrDefaultAsync(mb => mb.Id == Guid.Parse(bodyId));
+
+ if (messageBody == null)
+ {
+ return new MessageBodyStreamResult { HasResult = false };
+ }
+
+ return new MessageBodyStreamResult
+ {
+ HasResult = true,
+ Stream = new MemoryStream(messageBody.Body),
+ ContentType = messageBody.ContentType,
+ BodySize = messageBody.BodySize,
+ Etag = messageBody.Etag
+ };
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/CustomChecksDataStore.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/CustomChecksDataStore.cs
new file mode 100644
index 0000000000..ff02d9f381
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/CustomChecksDataStore.cs
@@ -0,0 +1,118 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Contracts.CustomChecks;
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Operations;
+using ServiceControl.Persistence;
+using ServiceControl.Persistence.Infrastructure;
+
+public class CustomChecksDataStore : DataStoreBase, ICustomChecksDataStore
+{
+ public CustomChecksDataStore(IServiceProvider serviceProvider) : base(serviceProvider)
+ {
+ }
+
+ public Task UpdateCustomCheckStatus(CustomCheckDetail detail)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var status = CheckStateChange.Unchanged;
+ var id = detail.GetDeterministicId();
+
+ var customCheck = await dbContext.CustomChecks.FirstOrDefaultAsync(c => c.Id == id);
+
+ if (customCheck == null ||
+ (customCheck.Status == (int)Status.Fail && !detail.HasFailed) ||
+ (customCheck.Status == (int)Status.Pass && detail.HasFailed))
+ {
+ if (customCheck == null)
+ {
+ customCheck = new CustomCheckEntity { Id = id };
+ await dbContext.CustomChecks.AddAsync(customCheck);
+ }
+
+ status = CheckStateChange.Changed;
+ }
+
+ customCheck.CustomCheckId = detail.CustomCheckId;
+ customCheck.Category = detail.Category;
+ customCheck.Status = detail.HasFailed ? (int)Status.Fail : (int)Status.Pass;
+ customCheck.ReportedAt = detail.ReportedAt;
+ customCheck.FailureReason = detail.FailureReason;
+ customCheck.EndpointName = detail.OriginatingEndpoint.Name;
+ customCheck.HostId = detail.OriginatingEndpoint.HostId;
+ customCheck.Host = detail.OriginatingEndpoint.Host;
+
+ await dbContext.SaveChangesAsync();
+
+ return status;
+ });
+ }
+
+ public Task>> GetStats(PagingInfo paging, string? status = null)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.CustomChecks.AsQueryable();
+
+ // Add status filter
+ if (status == "fail")
+ {
+ query = query.Where(c => c.Status == (int)Status.Fail);
+ }
+ if (status == "pass")
+ {
+ query = query.Where(c => c.Status == (int)Status.Pass);
+ }
+
+ var totalCount = await query.CountAsync();
+
+ var results = await query
+ .OrderByDescending(c => c.ReportedAt)
+ .Skip(paging.Offset)
+ .Take(paging.Next)
+ .AsNoTracking()
+ .ToListAsync();
+
+ var customChecks = results.Select(e => new CustomCheck
+ {
+ Id = $"{e.Id}",
+ CustomCheckId = e.CustomCheckId,
+ Category = e.Category,
+ Status = (Status)e.Status,
+ ReportedAt = e.ReportedAt,
+ FailureReason = e.FailureReason,
+ OriginatingEndpoint = new EndpointDetails
+ {
+ Name = e.EndpointName,
+ HostId = e.HostId,
+ Host = e.Host
+ }
+ }).ToList();
+
+ var queryStats = new QueryStatsInfo(string.Empty, totalCount, false);
+ return new QueryResult>(customChecks, queryStats);
+ });
+ }
+
+ public Task DeleteCustomCheck(Guid id)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ await dbContext.CustomChecks.Where(c => c.Id == id).ExecuteDeleteAsync();
+ });
+ }
+
+ public Task GetNumberOfFailedChecks()
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ return await dbContext.CustomChecks.CountAsync(c => c.Status == (int)Status.Fail);
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/DataStoreBase.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/DataStoreBase.cs
new file mode 100644
index 0000000000..38ea61b2b9
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/DataStoreBase.cs
@@ -0,0 +1,44 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Threading.Tasks;
+using DbContexts;
+using Microsoft.Extensions.DependencyInjection;
+
+///
+/// Base class for data stores that provides helper methods to simplify scope and DbContext management
+///
+public abstract class DataStoreBase
+{
+ protected readonly IServiceProvider serviceProvider;
+
+ protected DataStoreBase(IServiceProvider serviceProvider)
+ {
+ this.serviceProvider = serviceProvider;
+ }
+
+ ///
+ /// Executes an operation with a scoped DbContext, returning a result
+ ///
+ protected async Task ExecuteWithDbContext(Func> operation)
+ {
+ using var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+ return await operation(dbContext);
+ }
+
+ ///
+ /// Executes an operation with a scoped DbContext, without returning a result
+ ///
+ protected async Task ExecuteWithDbContext(Func operation)
+ {
+ using var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+ await operation(dbContext);
+ }
+
+ ///
+ /// Creates a scope for operations that need to manage their own scope lifecycle (e.g., managers)
+ ///
+ protected IServiceScope CreateScope() => serviceProvider.CreateScope();
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/EditFailedMessagesManager.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/EditFailedMessagesManager.cs
new file mode 100644
index 0000000000..106b01f40a
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/EditFailedMessagesManager.cs
@@ -0,0 +1,126 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading.Tasks;
+using DbContexts;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.MessageFailures;
+using ServiceControl.Operations;
+using ServiceControl.Persistence;
+
+class EditFailedMessagesManager(
+ IServiceScope scope) : IEditFailedMessagesManager
+{
+ readonly ServiceControlDbContextBase dbContext = scope.ServiceProvider.GetRequiredService();
+ string? currentEditingRequestId;
+ FailedMessage? currentMessage;
+
+ public async Task GetFailedMessage(string uniqueMessageId)
+ {
+ var entity = await dbContext.FailedMessages
+ .FirstOrDefaultAsync(m => m.UniqueMessageId == uniqueMessageId);
+
+ if (entity == null)
+ {
+ return null;
+ }
+
+ var processingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [];
+ var failureGroups = JsonSerializer.Deserialize>(entity.FailureGroupsJson, JsonSerializationOptions.Default) ?? [];
+
+ currentMessage = new FailedMessage
+ {
+ Id = entity.Id.ToString(),
+ UniqueMessageId = entity.UniqueMessageId,
+ Status = entity.Status,
+ ProcessingAttempts = processingAttempts,
+ FailureGroups = failureGroups
+ };
+
+ return currentMessage;
+ }
+
+ public async Task UpdateFailedMessage(FailedMessage failedMessage)
+ {
+ T? GetMetadata(FailedMessage.ProcessingAttempt lastAttempt, string key)
+ {
+ if (lastAttempt.MessageMetadata.TryGetValue(key, out var value))
+ {
+ return (T?)value;
+ }
+ else
+ {
+ return default;
+ }
+ }
+
+ var entity = await dbContext.FailedMessages
+ .FirstOrDefaultAsync(m => m.Id == Guid.Parse(failedMessage.Id));
+
+ if (entity != null)
+ {
+ entity.Status = failedMessage.Status;
+ entity.ProcessingAttemptsJson = JsonSerializer.Serialize(failedMessage.ProcessingAttempts, JsonSerializationOptions.Default);
+ entity.FailureGroupsJson = JsonSerializer.Serialize(failedMessage.FailureGroups, JsonSerializationOptions.Default);
+ entity.PrimaryFailureGroupId = failedMessage.FailureGroups.Count > 0 ? failedMessage.FailureGroups[0].Id : null;
+
+ // Update denormalized fields from last attempt
+ var lastAttempt = failedMessage.ProcessingAttempts.LastOrDefault();
+ if (lastAttempt != null)
+ {
+ entity.HeadersJson = JsonSerializer.Serialize(lastAttempt.Headers, JsonSerializationOptions.Default);
+ var messageType = GetMetadata(lastAttempt, "MessageType");
+ var sendingEndpoint = GetMetadata(lastAttempt, "SendingEndpoint");
+ var receivingEndpoint = GetMetadata(lastAttempt, "ReceivingEndpoint");
+
+ entity.MessageId = lastAttempt.MessageId;
+ entity.MessageType = messageType;
+ entity.TimeSent = lastAttempt.AttemptedAt;
+ entity.SendingEndpointName = sendingEndpoint?.Name;
+ entity.ReceivingEndpointName = receivingEndpoint?.Name;
+ entity.ExceptionType = lastAttempt.FailureDetails?.Exception?.ExceptionType;
+ entity.ExceptionMessage = lastAttempt.FailureDetails?.Exception?.Message;
+ entity.QueueAddress = lastAttempt.Headers?.GetValueOrDefault("NServiceBus.FailedQ");
+ entity.LastProcessedAt = lastAttempt.AttemptedAt;
+ }
+
+ entity.NumberOfProcessingAttempts = failedMessage.ProcessingAttempts.Count;
+ }
+ }
+
+ public Task GetCurrentEditingRequestId(string failedMessageId)
+ {
+ // Simple in-memory tracking for the editing request
+ return Task.FromResult(currentMessage?.Id == failedMessageId ? currentEditingRequestId : null);
+ }
+
+ public Task SetCurrentEditingRequestId(string editingMessageId)
+ {
+ currentEditingRequestId = editingMessageId;
+ return Task.CompletedTask;
+ }
+
+ public async Task SetFailedMessageAsResolved()
+ {
+ if (currentMessage != null)
+ {
+ currentMessage.Status = FailedMessageStatus.Resolved;
+ await UpdateFailedMessage(currentMessage);
+ }
+ }
+
+ public async Task SaveChanges()
+ {
+ await dbContext.SaveChangesAsync();
+ }
+
+ public void Dispose()
+ {
+ scope.Dispose();
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/EndpointSettingsStore.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/EndpointSettingsStore.cs
new file mode 100644
index 0000000000..4f9ef498be
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/EndpointSettingsStore.cs
@@ -0,0 +1,69 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using DbContexts;
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Persistence;
+
+public class EndpointSettingsStore : DataStoreBase, IEndpointSettingsStore
+{
+ public EndpointSettingsStore(IServiceProvider serviceProvider) : base(serviceProvider)
+ {
+ }
+
+ public async IAsyncEnumerable GetAllEndpointSettings()
+ {
+ // Note: IAsyncEnumerable methods need direct scope management as they yield results
+ using var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ var entities = dbContext.EndpointSettings.AsNoTracking().AsAsyncEnumerable();
+
+ await foreach (var entity in entities)
+ {
+ yield return new EndpointSettings
+ {
+ Name = entity.Name,
+ TrackInstances = entity.TrackInstances
+ };
+ }
+ }
+
+ public Task UpdateEndpointSettings(EndpointSettings settings, CancellationToken cancellationToken)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Use EF's change tracking for upsert
+ var existing = await dbContext.EndpointSettings.FindAsync([settings.Name], cancellationToken);
+ if (existing == null)
+ {
+ var entity = new EndpointSettingsEntity
+ {
+ Name = settings.Name,
+ TrackInstances = settings.TrackInstances
+ };
+ dbContext.EndpointSettings.Add(entity);
+ }
+ else
+ {
+ existing.TrackInstances = settings.TrackInstances;
+ }
+
+ await dbContext.SaveChangesAsync(cancellationToken);
+ });
+ }
+
+ public Task Delete(string name, CancellationToken cancellationToken)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ await dbContext.EndpointSettings
+ .Where(e => e.Name == name)
+ .ExecuteDeleteAsync(cancellationToken);
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.FailureGroups.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.FailureGroups.cs
new file mode 100644
index 0000000000..3593409e8c
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.FailureGroups.cs
@@ -0,0 +1,240 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Entities;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.MessageFailures;
+using ServiceControl.MessageFailures.Api;
+using ServiceControl.Persistence.Infrastructure;
+using ServiceControl.Recoverability;
+
+partial class ErrorMessageDataStore
+{
+ public Task> GetFailureGroupView(string groupId, string status, string modified)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Query failed messages filtered by PrimaryFailureGroupId at database level
+ var messages = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.PrimaryFailureGroupId == groupId)
+ .ToListAsync();
+
+ // Deserialize failure groups to get the primary group details
+ var allGroups = messages
+ .Select(fm =>
+ {
+ var groups = JsonSerializer.Deserialize>(fm.FailureGroupsJson, JsonSerializationOptions.Default) ?? [];
+ // Take the first group (which matches PrimaryFailureGroupId == groupId)
+ var primaryGroup = groups.FirstOrDefault();
+ return new
+ {
+ Group = primaryGroup,
+ MessageId = fm.Id,
+ LastProcessedAt = fm.LastProcessedAt ?? DateTime.MinValue
+ };
+ })
+ .Where(x => x.Group != null)
+ .ToList();
+
+ if (!allGroups.Any())
+ {
+ return new QueryResult(null!, new QueryStatsInfo("0", 0, false));
+ }
+
+ // Aggregate the group data
+ var firstGroup = allGroups.First().Group!; // Safe: allGroups is filtered to non-null Groups
+
+ // Retrieve comment if exists
+ var commentEntity = await dbContext.GroupComments
+ .AsNoTracking()
+ .FirstOrDefaultAsync(gc => gc.GroupId == groupId);
+
+ var view = new FailureGroupView
+ {
+ Id = groupId,
+ Title = firstGroup.Title,
+ Type = firstGroup.Type,
+ Count = allGroups.Count,
+ Comment = commentEntity?.Comment ?? string.Empty,
+ First = allGroups.Min(x => x.LastProcessedAt),
+ Last = allGroups.Max(x => x.LastProcessedAt)
+ };
+
+ return new QueryResult(view, new QueryStatsInfo("1", 1, false));
+ });
+ }
+
+ public Task> GetFailureGroupsByClassifier(string classifier)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Query all failed messages - optimize by selecting only required columns
+ // Note: Cannot filter by PrimaryFailureGroupId since we're filtering by classifier (Type)
+ var messages = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Select(fm => new { fm.FailureGroupsJson, fm.LastProcessedAt })
+ .ToListAsync();
+
+ // Deserialize and group by failure group ID
+ var groupedData = messages
+ .SelectMany(fm =>
+ {
+ var groups = JsonSerializer.Deserialize>(fm.FailureGroupsJson, JsonSerializationOptions.Default) ?? [];
+ return groups.Select(g => new
+ {
+ Group = g,
+ LastProcessedAt = fm.LastProcessedAt ?? DateTime.MinValue
+ });
+ })
+ .Where(x => x.Group.Type == classifier)
+ .GroupBy(x => x.Group.Id)
+ .Select(g => new FailureGroupView
+ {
+ Id = g.Key,
+ Title = g.First().Group.Title,
+ Type = g.First().Group.Type,
+ Count = g.Count(),
+ Comment = string.Empty,
+ First = g.Min(x => x.LastProcessedAt),
+ Last = g.Max(x => x.LastProcessedAt)
+ })
+ .OrderByDescending(g => g.Last)
+ .ToList();
+
+ return (IList)groupedData;
+ });
+ }
+
+ public Task>> GetGroupErrors(string groupId, string status, string modified, SortInfo sortInfo, PagingInfo pagingInfo)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Get messages filtered by PrimaryFailureGroupId at database level
+ var allMessages = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.PrimaryFailureGroupId == groupId)
+ .ToListAsync();
+
+ var matchingMessages = allMessages
+ .Where(fm =>
+ {
+ var groups = JsonSerializer.Deserialize>(fm.FailureGroupsJson, JsonSerializationOptions.Default) ?? [];
+ return groups.Any(g => g.Id == groupId);
+ })
+ .ToList();
+
+ // Apply status filter if provided
+ if (!string.IsNullOrEmpty(status))
+ {
+ var statusEnum = Enum.Parse(status, true);
+ matchingMessages = [.. matchingMessages.Where(fm => fm.Status == statusEnum)];
+ }
+
+ var totalCount = matchingMessages.Count;
+
+ // Apply sorting (simplified - would need full sorting implementation)
+ matchingMessages = [.. matchingMessages
+ .OrderByDescending(fm => fm.LastProcessedAt)
+ .Skip(pagingInfo.Offset)
+ .Take(pagingInfo.Next)];
+
+ var results = matchingMessages.Select(CreateFailedMessageView).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task GetGroupErrorsCount(string groupId, string status, string modified)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var allMessages = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.PrimaryFailureGroupId == groupId)
+ .ToListAsync();
+
+ var count = allMessages
+ .Count(fm =>
+ {
+ var groups = JsonSerializer.Deserialize>(fm.FailureGroupsJson, JsonSerializationOptions.Default) ?? [];
+ var hasGroup = groups.Any(g => g.Id == groupId);
+
+ if (!hasGroup)
+ {
+ return false;
+ }
+
+ if (!string.IsNullOrEmpty(status))
+ {
+ var statusEnum = Enum.Parse(status, true);
+ return fm.Status == statusEnum;
+ }
+
+ return true;
+ });
+
+ return new QueryStatsInfo(count.ToString(), count, false);
+ });
+ }
+
+ public async Task>> GetGroup(string groupId, string status, string modified)
+ {
+ // This appears to be similar to GetFailureGroupView but returns a list
+ var singleResult = await GetFailureGroupView(groupId, status, modified);
+
+ if (singleResult.Results == null)
+ {
+ return new QueryResult>([], new QueryStatsInfo("0", 0, false));
+ }
+
+ return new QueryResult>([singleResult.Results], singleResult.QueryStats);
+ }
+
+ public Task EditComment(string groupId, string comment)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var id = Guid.Parse(groupId);
+
+ // Use EF's change tracking for upsert
+ var existing = await dbContext.GroupComments.FindAsync(id);
+ if (existing == null)
+ {
+ var commentEntity = new GroupCommentEntity
+ {
+ Id = id,
+ GroupId = groupId,
+ Comment = comment
+ };
+ dbContext.GroupComments.Add(commentEntity);
+ }
+ else
+ {
+ existing.Comment = comment;
+ }
+
+ await dbContext.SaveChangesAsync();
+ });
+ }
+
+ public Task DeleteComment(string groupId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var comment = await dbContext.GroupComments
+ .FirstOrDefaultAsync(gc => gc.GroupId == groupId);
+
+ if (comment != null)
+ {
+ dbContext.GroupComments.Remove(comment);
+ await dbContext.SaveChangesAsync();
+ }
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.MessageQueries.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.MessageQueries.cs
new file mode 100644
index 0000000000..620cb49a2f
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.MessageQueries.cs
@@ -0,0 +1,423 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading.Tasks;
+using CompositeViews.Messages;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.MessageFailures;
+using ServiceControl.MessageFailures.Api;
+using ServiceControl.Persistence.Infrastructure;
+
+partial class ErrorMessageDataStore
+{
+ public Task>> GetAllMessages(PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages, DateTimeRange? timeSentRange = null)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages.AsQueryable();
+
+ // Apply time range filter
+ if (timeSentRange != null)
+ {
+ if (timeSentRange.From.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent >= timeSentRange.From);
+ }
+ if (timeSentRange.To.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent <= timeSentRange.To);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateMessagesView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task>> GetAllMessagesForEndpoint(string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages, DateTimeRange? timeSentRange = null)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages
+ .Where(fm => fm.ReceivingEndpointName == endpointName);
+
+ // Apply time range filter
+ if (timeSentRange != null)
+ {
+ if (timeSentRange.From.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent >= timeSentRange.From);
+ }
+ if (timeSentRange.To.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent <= timeSentRange.To);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateMessagesView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task>> GetAllMessagesByConversation(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, bool includeSystemMessages)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages
+ .Where(fm => fm.ConversationId == conversationId);
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+ var results = entities.Select(CreateMessagesView).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task>> GetAllMessagesForSearch(string searchTerms, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages.AsQueryable();
+
+ // Apply search filter
+ if (!string.IsNullOrWhiteSpace(searchTerms))
+ {
+ query = query.Where(fm =>
+ fm.MessageType!.Contains(searchTerms) ||
+ fm.ExceptionMessage!.Contains(searchTerms) ||
+ fm.UniqueMessageId.Contains(searchTerms));
+ }
+
+ // Apply time range filter
+ if (timeSentRange != null)
+ {
+ if (timeSentRange.From.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent >= timeSentRange.From);
+ }
+ if (timeSentRange.To.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent <= timeSentRange.To);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateMessagesView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task>> SearchEndpointMessages(string endpointName, string searchKeyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages
+ .Where(fm => fm.ReceivingEndpointName == endpointName);
+
+ // Apply search filter
+ if (!string.IsNullOrWhiteSpace(searchKeyword))
+ {
+ query = query.Where(fm =>
+ fm.MessageType!.Contains(searchKeyword) ||
+ fm.ExceptionMessage!.Contains(searchKeyword) ||
+ fm.UniqueMessageId.Contains(searchKeyword));
+ }
+
+ // Apply time range filter
+ if (timeSentRange != null)
+ {
+ if (timeSentRange.From.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent >= timeSentRange.From);
+ }
+ if (timeSentRange.To.HasValue)
+ {
+ query = query.Where(fm => fm.TimeSent <= timeSentRange.To);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateMessagesView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task>> ErrorGet(string status, string modified, string queueAddress, PagingInfo pagingInfo, SortInfo sortInfo)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages.AsQueryable();
+
+ // Apply status filter
+ if (!string.IsNullOrWhiteSpace(status))
+ {
+ if (Enum.TryParse(status, true, out var statusEnum))
+ {
+ query = query.Where(fm => fm.Status == statusEnum);
+ }
+ }
+
+ // Apply queue address filter
+ if (!string.IsNullOrWhiteSpace(queueAddress))
+ {
+ query = query.Where(fm => fm.QueueAddress == queueAddress);
+ }
+
+ // Apply modified date filter
+ if (!string.IsNullOrWhiteSpace(modified))
+ {
+ if (DateTime.TryParse(modified, out var modifiedDate))
+ {
+ query = query.Where(fm => fm.LastProcessedAt >= modifiedDate);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateFailedMessageView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task ErrorsHead(string status, string modified, string queueAddress)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages.AsQueryable();
+
+ // Apply status filter
+ if (!string.IsNullOrWhiteSpace(status))
+ {
+ if (Enum.TryParse(status, true, out var statusEnum))
+ {
+ query = query.Where(fm => fm.Status == statusEnum);
+ }
+ }
+
+ // Apply queue address filter
+ if (!string.IsNullOrWhiteSpace(queueAddress))
+ {
+ query = query.Where(fm => fm.QueueAddress == queueAddress);
+ }
+
+ // Apply modified date filter
+ if (!string.IsNullOrWhiteSpace(modified))
+ {
+ if (DateTime.TryParse(modified, out var modifiedDate))
+ {
+ query = query.Where(fm => fm.LastProcessedAt >= modifiedDate);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ return new QueryStatsInfo(totalCount.ToString(), totalCount, false);
+ });
+ }
+
+ public Task>> ErrorsByEndpointName(string status, string endpointName, string modified, PagingInfo pagingInfo, SortInfo sortInfo)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages.AsQueryable();
+
+ // Apply endpoint filter
+ query = query.Where(fm => fm.ReceivingEndpointName == endpointName);
+
+ // Apply status filter
+ if (!string.IsNullOrWhiteSpace(status))
+ {
+ if (Enum.TryParse(status, true, out var statusEnum))
+ {
+ query = query.Where(fm => fm.Status == statusEnum);
+ }
+ }
+
+ // Apply modified date filter
+ if (!string.IsNullOrWhiteSpace(modified))
+ {
+ if (DateTime.TryParse(modified, out var modifiedDate))
+ {
+ query = query.Where(fm => fm.LastProcessedAt >= modifiedDate);
+ }
+ }
+
+ var totalCount = await query.CountAsync();
+
+ // Apply sorting
+ query = ApplySorting(query, sortInfo);
+
+ // Apply paging
+ query = query.Skip(pagingInfo.Offset).Take(pagingInfo.Next);
+
+ var entities = await query.AsNoTracking().ToListAsync();
+
+ var results = entities.Select(entity => CreateFailedMessageView(entity)).ToList();
+
+ return new QueryResult>(results, new QueryStatsInfo(totalCount.ToString(), totalCount, false));
+ });
+ }
+
+ public Task> ErrorsSummary()
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var endpointStats = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => !string.IsNullOrEmpty(fm.ReceivingEndpointName))
+ .GroupBy(fm => fm.ReceivingEndpointName)
+ .Select(g => new { Endpoint = g.Key, Count = g.Count() })
+ .ToDictionaryAsync(x => x.Endpoint!, x => (object)x.Count);
+
+ var messageTypeStats = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => !string.IsNullOrEmpty(fm.MessageType))
+ .GroupBy(fm => fm.MessageType)
+ .Select(g => new { MessageType = g.Key, Count = g.Count() })
+ .ToDictionaryAsync(x => x.MessageType!, x => (object)x.Count);
+
+ var hostStats = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => !string.IsNullOrEmpty(fm.QueueAddress))
+ .GroupBy(fm => fm.QueueAddress)
+ .Select(g => new { Host = g.Key, Count = g.Count() })
+ .ToDictionaryAsync(x => x.Host!, x => (object)x.Count);
+
+ return (IDictionary)new Dictionary
+ {
+ ["Endpoints"] = endpointStats,
+ ["Message types"] = messageTypeStats,
+ ["Hosts"] = hostStats
+ };
+ });
+ }
+
+ public Task ErrorLastBy(string failedMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var entity = await dbContext.FailedMessages
+ .AsNoTracking()
+ .FirstOrDefaultAsync(fm => fm.Id == Guid.Parse(failedMessageId));
+
+ if (entity == null)
+ {
+ return null!;
+ }
+
+ var processingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [];
+ var lastAttempt = processingAttempts.LastOrDefault();
+
+ if (lastAttempt == null)
+ {
+ return null!;
+ }
+
+ return new FailedMessageView
+ {
+ Id = entity.UniqueMessageId,
+ MessageType = entity.MessageType,
+ TimeSent = entity.TimeSent,
+ IsSystemMessage = false, // Not stored in entity
+ Exception = lastAttempt.FailureDetails?.Exception,
+ MessageId = entity.MessageId,
+ NumberOfProcessingAttempts = entity.NumberOfProcessingAttempts ?? 0,
+ Status = entity.Status,
+ SendingEndpoint = null, // Would need to deserialize from JSON
+ ReceivingEndpoint = null, // Would need to deserialize from JSON
+ QueueAddress = entity.QueueAddress,
+ TimeOfFailure = lastAttempt.FailureDetails?.TimeOfFailure ?? DateTime.MinValue,
+ LastModified = entity.LastProcessedAt ?? DateTime.MinValue,
+ Edited = false, // Not implemented
+ EditOf = null
+ };
+ });
+ }
+
+ public Task ErrorBy(string failedMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var entity = await dbContext.FailedMessages
+ .AsNoTracking()
+ .FirstOrDefaultAsync(fm => fm.Id == Guid.Parse(failedMessageId));
+
+ if (entity == null)
+ {
+ return null!;
+ }
+
+ return new FailedMessage
+ {
+ Id = entity.Id.ToString(),
+ UniqueMessageId = entity.UniqueMessageId,
+ Status = entity.Status,
+ ProcessingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [],
+ FailureGroups = JsonSerializer.Deserialize>(entity.FailureGroupsJson, JsonSerializationOptions.Default) ?? []
+ };
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.Recoverability.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.Recoverability.cs
new file mode 100644
index 0000000000..90c423687f
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.Recoverability.cs
@@ -0,0 +1,172 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.MessageFailures;
+
+partial class ErrorMessageDataStore
+{
+ public Task FailedMessageMarkAsArchived(string failedMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var failedMessage = await dbContext.FailedMessages
+ .FirstOrDefaultAsync(fm => fm.Id == Guid.Parse(failedMessageId));
+
+ if (failedMessage != null)
+ {
+ failedMessage.Status = FailedMessageStatus.Archived;
+ await dbContext.SaveChangesAsync();
+ }
+ });
+ }
+
+ public Task MarkMessageAsResolved(string failedMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var failedMessage = await dbContext.FailedMessages
+ .FirstOrDefaultAsync(fm => fm.Id == Guid.Parse(failedMessageId));
+
+ if (failedMessage == null)
+ {
+ return false;
+ }
+
+ failedMessage.Status = FailedMessageStatus.Resolved;
+ await dbContext.SaveChangesAsync();
+ return true;
+ });
+ }
+
+ public Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func processCallback)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.Status == FailedMessageStatus.RetryIssued &&
+ fm.LastProcessedAt >= periodFrom &&
+ fm.LastProcessedAt < periodTo);
+
+ if (!string.IsNullOrWhiteSpace(queueAddress))
+ {
+ query = query.Where(fm => fm.QueueAddress == queueAddress);
+ }
+
+ var failedMessageIds = await query
+ .Select(fm => fm.Id)
+ .ToListAsync();
+
+ foreach (var failedMessageId in failedMessageIds)
+ {
+ await processCallback(failedMessageId.ToString());
+ }
+ });
+ }
+
+ public Task UnArchiveMessagesByRange(DateTime from, DateTime to)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // First, get the unique message IDs that will be affected
+ var uniqueMessageIds = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.Status == FailedMessageStatus.Archived &&
+ fm.LastProcessedAt >= from &&
+ fm.LastProcessedAt < to)
+ .Select(fm => fm.UniqueMessageId)
+ .ToListAsync();
+
+ // Then update all matching messages in a single operation
+ await dbContext.FailedMessages
+ .Where(fm => fm.Status == FailedMessageStatus.Archived &&
+ fm.LastProcessedAt >= from &&
+ fm.LastProcessedAt < to)
+ .ExecuteUpdateAsync(setters => setters.SetProperty(fm => fm.Status, FailedMessageStatus.Unresolved));
+
+ return uniqueMessageIds.ToArray();
+ });
+ }
+
+ public Task UnArchiveMessages(IEnumerable failedMessageIds)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Convert string IDs to Guids for querying
+ var messageGuids = failedMessageIds.Select(Guid.Parse).ToList();
+
+ // First, get the unique message IDs that will be affected
+ var uniqueMessageIds = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => messageGuids.Contains(fm.Id) && fm.Status == FailedMessageStatus.Archived)
+ .Select(fm => fm.UniqueMessageId)
+ .ToListAsync();
+
+ // Then update all matching messages in a single operation
+ await dbContext.FailedMessages
+ .Where(fm => messageGuids.Contains(fm.Id) && fm.Status == FailedMessageStatus.Archived)
+ .ExecuteUpdateAsync(setters => setters.SetProperty(fm => fm.Status, FailedMessageStatus.Unresolved));
+
+ return uniqueMessageIds.ToArray();
+ });
+ }
+
+ public Task RevertRetry(string messageUniqueId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ // Change status back to Unresolved
+ var failedMessage = await dbContext.FailedMessages
+ .FirstOrDefaultAsync(fm => fm.UniqueMessageId == messageUniqueId);
+
+ if (failedMessage != null)
+ {
+ failedMessage.Status = FailedMessageStatus.Unresolved;
+ await dbContext.SaveChangesAsync();
+ }
+ });
+ }
+
+ public Task RemoveFailedMessageRetryDocument(string uniqueMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var retryDocumentId = $"FailedMessages/{uniqueMessageId}";
+ var retryDocument = await dbContext.FailedMessageRetries
+ .FirstOrDefaultAsync(r => r.FailedMessageId == retryDocumentId);
+
+ if (retryDocument != null)
+ {
+ dbContext.FailedMessageRetries.Remove(retryDocument);
+ await dbContext.SaveChangesAsync();
+ }
+ });
+ }
+
+ public Task GetRetryPendingMessages(DateTime from, DateTime to, string queueAddress)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => fm.Status == FailedMessageStatus.RetryIssued &&
+ fm.LastProcessedAt >= from &&
+ fm.LastProcessedAt < to);
+
+ if (!string.IsNullOrWhiteSpace(queueAddress))
+ {
+ query = query.Where(fm => fm.QueueAddress == queueAddress);
+ }
+
+ var messageIds = await query
+ .Select(fm => fm.UniqueMessageId)
+ .ToListAsync();
+
+ return messageIds.ToArray();
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.ViewMapping.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.ViewMapping.cs
new file mode 100644
index 0000000000..ff0d298b5a
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.ViewMapping.cs
@@ -0,0 +1,165 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using CompositeViews.Messages;
+using Entities;
+using Infrastructure;
+using MessageFailures.Api;
+using NServiceBus;
+using ServiceControl.MessageFailures;
+using ServiceControl.Operations;
+using ServiceControl.Persistence;
+using ServiceControl.Persistence.Infrastructure;
+using ServiceControl.SagaAudit;
+
+partial class ErrorMessageDataStore
+{
+ internal static IQueryable ApplySorting(IQueryable query, SortInfo sortInfo)
+ {
+ if (sortInfo == null || string.IsNullOrWhiteSpace(sortInfo.Sort))
+ {
+ return query.OrderByDescending(fm => fm.TimeSent);
+ }
+
+ var isDescending = sortInfo.Direction == "desc";
+
+ return sortInfo.Sort.ToLower() switch
+ {
+ "id" or "message_id" => isDescending
+ ? query.OrderByDescending(fm => fm.MessageId)
+ : query.OrderBy(fm => fm.MessageId),
+ "message_type" => isDescending
+ ? query.OrderByDescending(fm => fm.MessageType)
+ : query.OrderBy(fm => fm.MessageType),
+ "processed_at" => isDescending
+ ? query.OrderByDescending(fm => fm.LastProcessedAt)
+ : query.OrderBy(fm => fm.LastProcessedAt),
+ "status" => isDescending
+ ? query.OrderByDescending(fm => fm.Status)
+ : query.OrderBy(fm => fm.Status),
+ "time_sent" or _ => isDescending
+ ? query.OrderByDescending(fm => fm.TimeSent)
+ : query.OrderBy(fm => fm.TimeSent)
+ };
+ }
+
+ internal static FailedMessageView CreateFailedMessageView(FailedMessageEntity entity)
+ {
+ var processingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [];
+ var lastAttempt = processingAttempts.LastOrDefault();
+
+ // Extract endpoint details from metadata (stored during ingestion)
+ EndpointDetails? sendingEndpoint = null;
+ EndpointDetails? receivingEndpoint = null;
+
+ if (lastAttempt?.MessageMetadata != null)
+ {
+ if (lastAttempt.MessageMetadata.TryGetValue("SendingEndpoint", out var sendingObj) && sendingObj is JsonElement sendingJson)
+ {
+ sendingEndpoint = JsonSerializer.Deserialize(sendingJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+
+ if (lastAttempt.MessageMetadata.TryGetValue("ReceivingEndpoint", out var receivingObj) && receivingObj is JsonElement receivingJson)
+ {
+ receivingEndpoint = JsonSerializer.Deserialize(receivingJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+ }
+
+ return new FailedMessageView
+ {
+ Id = entity.UniqueMessageId,
+ MessageType = entity.MessageType,
+ TimeSent = entity.TimeSent,
+ IsSystemMessage = false, // Not stored in entity
+ Exception = lastAttempt?.FailureDetails?.Exception,
+ MessageId = entity.MessageId,
+ NumberOfProcessingAttempts = entity.NumberOfProcessingAttempts ?? 0,
+ Status = entity.Status,
+ SendingEndpoint = sendingEndpoint,
+ ReceivingEndpoint = receivingEndpoint,
+ QueueAddress = entity.QueueAddress,
+ TimeOfFailure = lastAttempt?.FailureDetails?.TimeOfFailure ?? DateTime.MinValue,
+ LastModified = entity.LastProcessedAt ?? DateTime.MinValue,
+ Edited = false, // Not implemented
+ EditOf = null
+ };
+ }
+
+ internal static MessagesView CreateMessagesView(FailedMessageEntity entity)
+ {
+ var processingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [];
+ var lastAttempt = processingAttempts.LastOrDefault();
+ var headers = JsonSerializer.Deserialize>(entity.HeadersJson, JsonSerializationOptions.Default) ?? [];
+
+ // Extract metadata from the last processing attempt (matching RavenDB implementation)
+ var metadata = lastAttempt?.MessageMetadata;
+
+ var isSystemMessage = metadata?.TryGetValue("IsSystemMessage", out var isSystem) == true && isSystem is bool b && b;
+ var bodySize = metadata?.TryGetValue("ContentLength", out var size) == true && size is int contentLength ? contentLength : 0;
+ var messageIntent = metadata?.TryGetValue("MessageIntent", out var mi) == true && mi is JsonElement miJson && Enum.TryParse(miJson.GetString(), out var parsedMi) ? parsedMi : MessageIntent.Send;
+
+ // Extract endpoint details from metadata (stored during ingestion)
+ EndpointDetails? sendingEndpoint = null;
+ EndpointDetails? receivingEndpoint = null;
+ SagaInfo? originatesFromSaga = null;
+ List? invokedSagas = null;
+
+ if (metadata != null)
+ {
+ if (metadata.TryGetValue("SendingEndpoint", out var sendingObj) && sendingObj is JsonElement sendingJson)
+ {
+ sendingEndpoint = JsonSerializer.Deserialize(sendingJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+
+ if (metadata.TryGetValue("ReceivingEndpoint", out var receivingObj) && receivingObj is JsonElement receivingJson)
+ {
+ receivingEndpoint = JsonSerializer.Deserialize(receivingJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+
+ if (metadata.TryGetValue("OriginatesFromSaga", out var sagaObj) && sagaObj is JsonElement sagaJson)
+ {
+ originatesFromSaga = JsonSerializer.Deserialize(sagaJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+
+ if (metadata.TryGetValue("InvokedSagas", out var sagasObj) && sagasObj is JsonElement sagasJson)
+ {
+ invokedSagas = JsonSerializer.Deserialize>(sagasJson.GetRawText(), JsonSerializationOptions.Default);
+ }
+ }
+
+ // Calculate status matching RavenDB logic
+ var status = entity.Status == FailedMessageStatus.Resolved
+ ? MessageStatus.ResolvedSuccessfully
+ : entity.Status == FailedMessageStatus.RetryIssued
+ ? MessageStatus.RetryIssued
+ : entity.Status == FailedMessageStatus.Archived
+ ? MessageStatus.ArchivedFailure
+ : entity.NumberOfProcessingAttempts == 1
+ ? MessageStatus.Failed
+ : MessageStatus.RepeatedFailure;
+
+ return new MessagesView
+ {
+ Id = entity.UniqueMessageId,
+ MessageId = entity.MessageId,
+ MessageType = entity.MessageType,
+ SendingEndpoint = sendingEndpoint,
+ ReceivingEndpoint = receivingEndpoint,
+ TimeSent = entity.TimeSent,
+ ProcessedAt = entity.LastProcessedAt ?? DateTime.MinValue,
+ IsSystemMessage = isSystemMessage,
+ ConversationId = entity.ConversationId,
+ Headers = headers.Select(h => new KeyValuePair(h.Key, h.Value)),
+ Status = status,
+ MessageIntent = messageIntent,
+ BodyUrl = $"/api/errors/{entity.UniqueMessageId}/body",
+ BodySize = bodySize,
+ InvokedSagas = invokedSagas ?? [],
+ OriginatesFromSaga = originatesFromSaga,
+ InstanceId = null // Not available for failed messages
+ };
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.cs
new file mode 100644
index 0000000000..2c0544db61
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ErrorMessageDataStore.cs
@@ -0,0 +1,129 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Entities;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.EventLog;
+using ServiceControl.MessageFailures;
+using ServiceControl.Operations;
+using ServiceControl.Persistence;
+
+partial class ErrorMessageDataStore : DataStoreBase, IErrorMessageDataStore
+{
+ public ErrorMessageDataStore(IServiceProvider serviceProvider) : base(serviceProvider)
+ {
+ }
+
+ public Task FailedMessagesFetch(Guid[] ids)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var entities = await dbContext.FailedMessages
+ .AsNoTracking()
+ .Where(fm => ids.Contains(fm.Id))
+ .ToListAsync();
+
+ return entities.Select(entity => new FailedMessage
+ {
+ Id = entity.Id.ToString(),
+ UniqueMessageId = entity.UniqueMessageId,
+ Status = entity.Status,
+ ProcessingAttempts = JsonSerializer.Deserialize>(entity.ProcessingAttemptsJson, JsonSerializationOptions.Default) ?? [],
+ FailureGroups = JsonSerializer.Deserialize>(entity.FailureGroupsJson, JsonSerializationOptions.Default) ?? []
+ }).ToArray();
+ });
+ }
+
+ public Task StoreFailedErrorImport(FailedErrorImport failure)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var entity = new FailedErrorImportEntity
+ {
+ Id = Guid.Parse(failure.Id),
+ MessageJson = JsonSerializer.Serialize(failure.Message, JsonSerializationOptions.Default),
+ ExceptionInfo = failure.ExceptionInfo
+ };
+
+ dbContext.FailedErrorImports.Add(entity);
+ await dbContext.SaveChangesAsync();
+ });
+ }
+
+ public Task CreateEditFailedMessageManager()
+ {
+ var scope = serviceProvider.CreateScope();
+ var manager = new EditFailedMessagesManager(scope);
+ return Task.FromResult(manager);
+ }
+
+ public Task CreateNotificationsManager()
+ {
+ var notificationsManager = serviceProvider.GetRequiredService();
+ return Task.FromResult(notificationsManager);
+ }
+
+ public async Task StoreEventLogItem(EventLogItem logItem)
+ {
+ using var scope = serviceProvider.CreateScope();
+ var eventLogDataStore = scope.ServiceProvider.GetRequiredService();
+ await eventLogDataStore.Add(logItem);
+ }
+
+ public Task StoreFailedMessagesForTestsOnly(params FailedMessage[] failedMessages)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ foreach (var failedMessage in failedMessages)
+ {
+ var lastAttempt = failedMessage.ProcessingAttempts.LastOrDefault();
+
+ var entity = new FailedMessageEntity
+ {
+ Id = Guid.Parse(failedMessage.Id),
+ UniqueMessageId = failedMessage.UniqueMessageId,
+ Status = failedMessage.Status,
+ ProcessingAttemptsJson = JsonSerializer.Serialize(failedMessage.ProcessingAttempts, JsonSerializationOptions.Default),
+ FailureGroupsJson = JsonSerializer.Serialize(failedMessage.FailureGroups, JsonSerializationOptions.Default),
+ HeadersJson = JsonSerializer.Serialize(lastAttempt?.Headers ?? [], JsonSerializationOptions.Default),
+ PrimaryFailureGroupId = failedMessage.FailureGroups.Count > 0 ? failedMessage.FailureGroups[0].Id : null,
+
+ // Extract denormalized fields from last processing attempt if available
+ MessageId = lastAttempt?.MessageId,
+ MessageType = lastAttempt?.Headers?.GetValueOrDefault("NServiceBus.EnclosedMessageTypes"),
+ TimeSent = lastAttempt?.Headers != null && lastAttempt.Headers.TryGetValue("NServiceBus.TimeSent", out var ts) && DateTimeOffset.TryParse(ts, out var parsedTime) ? parsedTime.UtcDateTime : null,
+ SendingEndpointName = lastAttempt?.Headers?.GetValueOrDefault("NServiceBus.OriginatingEndpoint"),
+ ReceivingEndpointName = lastAttempt?.Headers?.GetValueOrDefault("NServiceBus.ProcessingEndpoint"),
+ ExceptionType = lastAttempt?.FailureDetails?.Exception?.ExceptionType,
+ ExceptionMessage = lastAttempt?.FailureDetails?.Exception?.Message,
+ QueueAddress = lastAttempt?.FailureDetails?.AddressOfFailingEndpoint,
+ NumberOfProcessingAttempts = failedMessage.ProcessingAttempts.Count,
+ LastProcessedAt = lastAttempt?.AttemptedAt,
+ ConversationId = lastAttempt?.Headers?.GetValueOrDefault("NServiceBus.ConversationId"),
+ };
+
+ dbContext.FailedMessages.Add(entity);
+ }
+
+ await dbContext.SaveChangesAsync();
+ });
+ }
+
+ public Task FetchFromFailedMessage(string uniqueMessageId)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var messageBody = await dbContext.MessageBodies
+ .AsNoTracking()
+ .FirstOrDefaultAsync(mb => mb.Id == Guid.Parse(uniqueMessageId));
+
+ return messageBody?.Body!;
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/EventLogDataStore.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/EventLogDataStore.cs
new file mode 100644
index 0000000000..49dc6448f3
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/EventLogDataStore.cs
@@ -0,0 +1,73 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Entities;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.EventLog;
+using ServiceControl.Persistence;
+using ServiceControl.Persistence.Infrastructure;
+
+public class EventLogDataStore : DataStoreBase, IEventLogDataStore
+{
+ public EventLogDataStore(IServiceProvider serviceProvider) : base(serviceProvider)
+ {
+ }
+
+ public Task Add(EventLogItem logItem)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var entity = new EventLogItemEntity
+ {
+ Id = SequentialGuidGenerator.NewSequentialGuid(),
+ Description = logItem.Description,
+ Severity = (int)logItem.Severity,
+ RaisedAt = logItem.RaisedAt,
+ Category = logItem.Category,
+ EventType = logItem.EventType,
+ RelatedToJson = logItem.RelatedTo != null ? JsonSerializer.Serialize(logItem.RelatedTo, JsonSerializationOptions.Default) : null
+ };
+
+ await dbContext.EventLogItems.AddAsync(entity);
+ await dbContext.SaveChangesAsync();
+ });
+ }
+
+ public Task<(IList items, long total, string version)> GetEventLogItems(PagingInfo pagingInfo)
+ {
+ return ExecuteWithDbContext(async dbContext =>
+ {
+ var query = dbContext.EventLogItems
+ .AsNoTracking()
+ .OrderByDescending(e => e.RaisedAt);
+
+ var total = await query.CountAsync();
+
+ var entities = await query
+ .Skip(pagingInfo.Offset)
+ .Take(pagingInfo.PageSize)
+ .ToListAsync();
+
+ var items = entities.Select(entity => new EventLogItem
+ {
+ Id = entity.Id.ToString(),
+ Description = entity.Description,
+ Severity = (Severity)entity.Severity,
+ RaisedAt = entity.RaisedAt,
+ Category = entity.Category,
+ EventType = entity.EventType,
+ RelatedTo = entity.RelatedToJson != null ? JsonSerializer.Deserialize>(entity.RelatedToJson, JsonSerializationOptions.Default) : null
+ }).ToList();
+
+ // Version could be based on the latest RaisedAt timestamp but the paging can affect this result, given that the latest may not be retrieved
+ var version = entities.Any() ? entities.Max(e => e.RaisedAt).Ticks.ToString() : "0";
+
+ return ((IList)items, (long)total, version);
+ });
+ }
+}
diff --git a/src/ServiceControl.Persistence.Sql.Core/Implementation/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.Sql.Core/Implementation/ExternalIntegrationRequestsDataStore.cs
new file mode 100644
index 0000000000..7a243cc929
--- /dev/null
+++ b/src/ServiceControl.Persistence.Sql.Core/Implementation/ExternalIntegrationRequestsDataStore.cs
@@ -0,0 +1,150 @@
+namespace ServiceControl.Persistence.Sql.Core.Implementation;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Entities;
+using Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Logging;
+using ServiceControl.ExternalIntegrations;
+using ServiceControl.Persistence;
+
+public class ExternalIntegrationRequestsDataStore : DataStoreBase, IExternalIntegrationRequestsDataStore, IAsyncDisposable
+{
+ readonly ILogger logger;
+ readonly CancellationTokenSource tokenSource = new();
+
+ Func