diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93c3bc2b..8ce429cd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and to run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113).
+
 ## [0.29.0] - 2025-12-22
 
 ### Added
diff --git a/client.go b/client.go
index 4d54b39a..70872be0 100644
--- a/client.go
+++ b/client.go
@@ -9,6 +9,7 @@ import (
 	"log/slog"
 	"os"
 	"regexp"
+	"slices"
 	"strings"
 	"sync"
 	"time"
@@ -209,6 +210,46 @@ type Config struct {
 	// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
 	Hooks []rivertype.Hook
 
+	// LeaderDomain is an optional "domain" string to use for leader election.
+	// Different clients sharing the same River schema can elect multiple
+	// leaders as long as they're using different domains, with one leader
+	// elected per domain.
+	//
+	// Setting this value also changes maintenance behavior: maintenance
+	// services operate only on the queues their client is configured with. For
+	// example, given client1 handling queue_a and queue_b and client2 handling
+	// queue_c and queue_d, whichever client is elected leader would normally
+	// run all maintenance services for all queues (queue_a, queue_b, queue_c,
+	// and queue_d). But if client1 uses domain "domain1" and client2 uses
+	// domain "domain2", then client1 (elected in domain1) only runs
+	// maintenance services on queue_a and queue_b, while client2 (elected in
+	// domain2) only runs maintenance services on queue_c and queue_d.
+	//
+	// Be warned that River *does not protect against configuration mistakes*.
+	// If client1 on domain1 is configured for queue_a and queue_b, and client2
+	// on domain2 is *also* configured for queue_a and queue_b, then both
+	// clients may end up running maintenance services on the same queues at
+	// the same time. It's the caller's responsibility to ensure that doesn't
+	// happen.
+	//
+	// Leaving this value empty or using the special value "default" causes the
+	// client to operate on all queues. When setting a non-empty, non-"default"
+	// value, no other client should be left empty or use "default", because
+	// the default client(s) will infringe on the queues of the non-default
+	// one(s).
+	//
+	// Certain maintenance services that aren't queue-related, like the
+	// reindexer, will continue to run on all leaders regardless of domain. If
+	// using this feature, it's a good idea to set ReindexerSchedule to
+	// river.NeverSchedule() on all but a single leader domain.
+	//
+	// In general, most River users should not need LeaderDomain, and when
+	// running multiple Rivers may want to consider using multiple databases or
+	// multiple schemas instead.
+	//
+	// Defaults to "default".
+	LeaderDomain string
+
 	// Logger is the structured logger to use for logging purposes. If none is
 	// specified, logs will be emitted to STDOUT with messages at warn level
 	// or higher.
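To make the LeaderDomain doc comment above concrete, here is a minimal, hypothetical sketch of the two-client setup it describes. It assumes a River version that includes this PR's LeaderDomain field; only the Config values are shown, and a real program would pass each config to river.NewClient along with a driver (for example riverpgxv5) and registered workers, much like the tests later in this diff. The variable names are illustrative and not part of the PR.

```go
package main

import (
	"fmt"

	"github.com/riverqueue/river"
)

func main() {
	// Client 1 leads "domain1" and therefore runs maintenance only for
	// queue_a and queue_b.
	config1 := &river.Config{
		LeaderDomain: "domain1",
		Queues: map[string]river.QueueConfig{
			"queue_a": {MaxWorkers: 50},
			"queue_b": {MaxWorkers: 50},
		},
	}

	// Client 2 leads "domain2" and therefore runs maintenance only for
	// queue_c and queue_d.
	config2 := &river.Config{
		LeaderDomain: "domain2",
		Queues: map[string]river.QueueConfig{
			"queue_c": {MaxWorkers: 50},
			"queue_d": {MaxWorkers: 50},
		},
	}

	fmt.Println(config1.LeaderDomain, config2.LeaderDomain)
}
```

The important invariant is that the two domains partition the queue set; if the same queue appeared under both domains, both leaders could run maintenance on it concurrently, which is exactly the misconfiguration the doc comment warns about.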
@@ -415,6 +457,7 @@ func (c *Config) WithDefaults() *Config { Hooks: c.Hooks, JobInsertMiddleware: c.JobInsertMiddleware, JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault), + LeaderDomain: c.LeaderDomain, Logger: logger, MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault), Middleware: c.Middleware, @@ -840,6 +883,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{ ClientID: config.ID, + Domain: config.LeaderDomain, Schema: config.Schema, }) client.services = append(client.services, client.elector) @@ -860,6 +904,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, pluginPilot.PluginServices()...) } + // It's important for queuesIncluded to be `nil` in case it's not in use + // for the various driver queries to work correctly. + var queuesIncluded []string + if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 { + queuesIncluded = maputil.Keys(config.Queues) + slices.Sort(queuesIncluded) + } + // // Maintenance services // @@ -872,6 +924,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod, DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod, QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(), + QueuesIncluded: queuesIncluded, Schema: config.Schema, Timeout: config.JobCleanerTimeout, }, driver.GetExecutor()) @@ -882,6 +935,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ ClientRetryPolicy: config.RetryPolicy, + QueuesIncluded: queuesIncluded, RescueAfter: config.RescueStuckJobsAfter, Schema: config.Schema, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { @@ -897,9 +951,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{ - Interval: config.schedulerInterval, - NotifyInsert: client.maybeNotifyInsertForQueues, - Schema: config.Schema, + Interval: config.schedulerInterval, + NotifyInsert: client.maybeNotifyInsertForQueues, + QueuesIncluded: queuesIncluded, + Schema: config.Schema, }, driver.GetExecutor()) maintenanceServices = append(maintenanceServices, jobScheduler) client.testSignals.jobScheduler = &jobScheduler.TestSignals @@ -925,6 +980,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{ + QueuesIncluded: queuesIncluded, RetentionPeriod: maintenance.QueueRetentionPeriodDefault, Schema: config.Schema, }, driver.GetExecutor()) diff --git a/client_test.go b/client_test.go index 90ccc43e..d9ca410b 100644 --- a/client_test.go +++ b/client_test.go @@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) { startstoptest.Stress(ctx, t, clientWithStop) }) + + t.Run("LeaderDomain_Alternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = 
NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + } + + var client2 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + var err error + client2, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + } + + startClient(ctx, t, client1) + startClient(ctx, t, client2) + + // Both elected + client1.testSignals.electedLeader.WaitOrTimeout() + client2.testSignals.electedLeader.WaitOrTimeout() + }) + + t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // The domain "default" is special in that it behaves like if LeaderDomain + // was not set. + t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.LeaderDomain = "default" + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // When non-default leader domains are configured, each client's maintenance + // services are limited to only their client's queues. 
+ t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded) + } + + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + client2, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded) + } + }) } type workerWithMiddleware[T JobArgs] struct { diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 69ddd75c..878f5d98 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -22,6 +22,8 @@ import ( "github.com/riverqueue/river/rivershared/util/testutil" ) +const DomainDefault = "default" + const ( electIntervalDefault = 5 * time.Second electIntervalJitterDefault = 1 * time.Second @@ -82,6 +84,7 @@ func (ts *electorTestSignals) Init(tb testutil.TestingTB) { type Config struct { ClientID string + Domain string ElectInterval time.Duration // period on which each elector attempts elect even without having received a resignation notification ElectIntervalJitter time.Duration Schema string @@ -121,6 +124,7 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not return baseservice.Init(archetype, &Elector{ config: (&Config{ ClientID: config.ClientID, + Domain: cmp.Or(config.Domain, string(DomainDefault)), ElectInterval: cmp.Or(config.ElectInterval, electIntervalDefault), ElectIntervalJitter: cmp.Or(config.ElectIntervalJitter, electIntervalJitterDefault), Schema: config.Schema, @@ -143,9 +147,9 @@ func (e *Elector) Start(ctx context.Context) error { var sub 
*notifier.Subscription
 	if e.notifier == nil {
-		e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID)
+		e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID, "domain", e.config.Domain)
 	} else {
-		e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "topic", notifier.NotificationTopicLeadership)
+		e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", notifier.NotificationTopicLeadership)
 
 		var err error
 		sub, err = e.notifier.Listen(ctx, notifier.NotificationTopicLeadership, func(topic notifier.NotificationTopic, payload string) {
 			e.handleLeadershipNotification(ctx, topic, payload)
@@ -180,7 +184,7 @@ func (e *Elector) Start(ctx context.Context) error {
 				return
 			}
 
-			e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID)
+			e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
 			e.testSignals.GainedLeadership.Signal(struct{}{})
 
 			err := e.keepLeadershipLoop(ctx)
@@ -193,7 +197,7 @@ func (e *Elector) Start(ctx context.Context) error {
 					continue // lost leadership reelection; unusual but not a problem; don't log
 				}
 
-				e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err)
+				e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err)
 			}
 		}
 	}()
@@ -205,10 +209,11 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
 	var attempt int
 	for {
 		attempt++
-		e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID)
+		e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
 
 		elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
 			LeaderID: e.config.ClientID,
+			Name:     e.config.Domain,
 			Now:      e.Time.NowUTCOrNil(),
 			Schema:   e.config.Schema,
 			TTL:      e.leaderTTL(),
@@ -229,7 +234,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
 
 		attempt = 0
 
-		e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID)
+		e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID, "domain", e.config.Domain)
 		e.testSignals.DeniedLeadership.Signal(struct{}{})
 
 		select {
@@ -254,17 +259,17 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
 func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifier.NotificationTopic, payload string) {
 	if topic != notifier.NotificationTopicLeadership {
 		// This should not happen unless the notifier is broken.
-		e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "topic", topic, "payload", payload)
+		e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", topic, "payload", payload)
 		return
 	}
 
 	notification := DBNotification{}
 	if err := json.Unmarshal([]byte(payload), &notification); err != nil {
-		e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "err", err)
+		e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err)
 		return
 	}
 
-	e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID)
+	e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID, "domain", e.config.Domain)
 
 	// Do an initial context check so in case context is done, it always takes
 	// precedence over sending a leadership notification.
@@ -359,7 +364,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
 
 		case <-e.requestResignChan:
 			// Receive a notification telling current leader to resign.
-			e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID)
+			e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID, "domain", e.config.Domain)
 
 			if !timer.Stop() {
 				<-timer.C
@@ -383,10 +388,11 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
 			// Reelect timer expired; attempt reelection below.
 		}
 
-		e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID)
+		e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID, "domain", e.config.Domain)
 
 		reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
 			LeaderID: e.config.ClientID,
+			Name:     e.config.Domain,
 			Now:      e.Time.NowUTCOrNil(),
 			Schema:   e.config.Schema,
 			TTL:      e.leaderTTL(),
@@ -424,7 +430,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
 // always surrendered in a timely manner so it can be picked up quickly by
 // another client, even in the event of a cancellation.
 func (e *Elector) attemptResignLoop(ctx context.Context) {
-	e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID)
+	e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID, "domain", e.config.Domain)
 
 	// Make a good faith attempt to resign, even in the presence of errors, but
 	// don't keep hammering if it doesn't work.
In case a resignation failure, @@ -469,7 +475,7 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error { } if resigned { - e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.ResignedLeadership.Signal(struct{}{}) } @@ -484,6 +490,7 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat return []any{ slog.Int("attempt", attempt), slog.String("client_id", e.config.ClientID), + slog.String("domain", e.config.Domain), slog.String("err", err.Error()), slog.String("sleep_duration", sleepDuration.String()), } diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index 62aba114..c9b8210b 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -422,6 +422,7 @@ func TestAttemptElectOrReelect(t *testing.T) { elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: DomainDefault, TTL: leaderTTL, Schema: "", }) @@ -451,6 +452,7 @@ func TestAttemptElectOrReelect(t *testing.T) { // the transaction. elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: DomainDefault, TTL: 30 * time.Second, Schema: "", }) @@ -478,6 +480,7 @@ func TestAttemptElectOrReelect(t *testing.T) { elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", + Name: DomainDefault, TTL: leaderTTL, Schema: "", }) @@ -493,6 +496,46 @@ func TestAttemptElectOrReelect(t *testing.T) { require.NoError(t, err) require.Equal(t, leader.ExpiresAt, updatedLeader.ExpiresAt) }) + + t.Run("MultipleDomains", func(t *testing.T) { + t.Parallel() + + bundle := setup(t) + + elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: clientID, + Name: DomainDefault, + Schema: "", + }) + require.NoError(t, err) + require.True(t, elected) + + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: clientID, + Name: "domain2", + Schema: "", + }) + require.NoError(t, err) + require.True(t, elected) + + // Second election on a domain where we have a leader fails. + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: "other-client-id", + Name: DomainDefault, + Schema: "", + }) + require.NoError(t, err) + require.False(t, elected) + + // And same in the alternate domain. + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: "other-client-id", + Name: "domain2", + Schema: "", + }) + require.NoError(t, err) + require.False(t, elected) + }) } func TestElectorHandleLeadershipNotification(t *testing.T) { diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index f3216561..7a4672c0 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -56,6 +56,10 @@ type JobCleanerConfig struct { // QueuesExcluded are queues that'll be excluded from cleaning. QueuesExcluded []string + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // Schema where River tables are located. 
Empty string omits schema, causing // Postgres to default to `search_path`. Schema string @@ -79,6 +83,12 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig { if c.Interval <= 0 { panic("JobCleanerConfig.Interval must be above zero") } + if c.QueuesExcluded != nil && len(c.QueuesExcluded) == 0 { + panic("JobCleanerConfig.QueuesExcluded should be either nil or a non-empty slice") + } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.Timeout <= 0 { panic("JobCleanerConfig.Timeout must be above zero") } @@ -117,6 +127,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault), DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault), QueuesExcluded: config.QueuesExcluded, + QueuesIncluded: config.QueuesIncluded, Interval: cmp.Or(config.Interval, riversharedmaintenance.JobCleanerIntervalDefault), Schema: config.Schema, Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault), @@ -205,6 +216,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod), Max: s.batchSize(), QueuesExcluded: s.Config.QueuesExcluded, + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, }) if err != nil { diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index 96ba0e1b..d55db8fd 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -328,7 +328,7 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("OmmittedQueues", func(t *testing.T) { + t.Run("QueuesExcluded", func(t *testing.T) { t.Parallel() cleaner, bundle := setup(t) @@ -338,24 +338,24 @@ func TestJobCleaner(t *testing.T) { completedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) discardedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))}) - omittedQueue1 = "omitted1" - omittedQueue2 = "omitted1" + excludedQueue1 = "queue1" + excludedQueue2 = "queue2" - // Not deleted because in an omitted queue. - omittedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) - omittedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + // Not deleted because in an excluded queue. 
+ excludedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + excludedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) ) - cleaner.Config.QueuesExcluded = []string{omittedQueue1, omittedQueue2} + cleaner.Config.QueuesExcluded = []string{excludedQueue1, excludedQueue2} require.NoError(t, cleaner.Start(ctx)) cleaner.TestSignals.DeletedBatch.WaitOrTimeout() var err error - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob1.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob1.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob2.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob2.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema}) @@ -366,6 +366,41 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + // Not deleted because not in an included queue. + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + includedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob1.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob2.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + }) + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { t.Parallel() diff --git 
a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go
index c7353b36..0547ea0a 100644
--- a/internal/maintenance/job_rescuer.go
+++ b/internal/maintenance/job_rescuer.go
@@ -50,6 +50,11 @@ type JobRescuerConfig struct {
 	// Interval is the amount of time to wait between runs of the rescuer.
 	Interval time.Duration
 
+	// QueuesIncluded are queues that'll be included when considering jobs to
+	// rescue. If set, only these queues will be rescued. If nil, jobs in all
+	// queues are rescued.
+	QueuesIncluded []string
+
 	// RescueAfter is the amount of time for a job to be active before it is
 	// considered stuck and should be rescued.
 	RescueAfter time.Duration
@@ -70,6 +75,9 @@ func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig {
 	if c.Interval <= 0 {
 		panic("RescuerConfig.Interval must be above zero")
 	}
+	if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 {
+		panic("JobRescuerConfig.QueuesIncluded should be either nil or a non-empty slice")
+	}
 	if c.RescueAfter <= 0 {
 		panic("RescuerConfig.JobDuration must be above zero")
 	}
@@ -109,6 +117,7 @@ func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec
 			BatchSizes:          batchSizes,
 			ClientRetryPolicy:   config.ClientRetryPolicy,
 			Interval:            cmp.Or(config.Interval, JobRescuerIntervalDefault),
+			QueuesIncluded:      config.QueuesIncluded,
 			RescueAfter:         cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault),
 			Schema:              config.Schema,
 			WorkUnitFactoryFunc: config.WorkUnitFactoryFunc,
@@ -280,9 +289,10 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
 	stuckHorizon := time.Now().Add(-s.Config.RescueAfter)
 
 	return s.exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{
-		Max:          s.batchSize(),
-		Schema:       s.Config.Schema,
-		StuckHorizon: stuckHorizon,
+		Max:            s.batchSize(),
+		QueuesIncluded: s.Config.QueuesIncluded,
+		Schema:         s.Config.Schema,
+		StuckHorizon:   stuckHorizon,
 	})
 }
 
diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go
index d1f44ffd..71300515 100644
--- a/internal/maintenance/job_rescuer_test.go
+++ b/internal/maintenance/job_rescuer_test.go
@@ -405,4 +405,42 @@ func TestJobRescuer(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("QueuesIncluded", func(t *testing.T) {
+		t.Parallel()
+
+		rescuer, bundle := setup(t)
+
+		var (
+			notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
+			notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+
+			includedQueue1 = "queue1"
+			includedQueue2 = "queue2"
+
+			includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue1})
+			includedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue2})
+		)
+
+		rescuer.Config.QueuesIncluded = []string{includedQueue1, includedQueue2}
+
+		require.NoError(t, 
rescuer.Start(ctx)) + + rescuer.TestSignals.FetchedBatch.WaitOrTimeout() + rescuer.TestSignals.UpdatedBatch.WaitOrTimeout() + + includedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, includedJob1After.State) + includedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, includedJob2After.State) + + notIncludedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob1.State, notIncludedJob1After.State) // not rescued + notIncludedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob2.State, notIncludedJob2After.State) // not rescued + }) } diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 00adcee4..2f00453d 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -51,6 +51,10 @@ type JobSchedulerConfig struct { // where jobs were scheduled. NotifyInsert NotifyInsertFunc + // QueuesIncluded are queues that'll be included in scheduling. If set, + // only these queues will be scheduled. If nil, all queues are scheduled. + QueuesIncluded []string + // Schema where River tables are located. Empty string omits schema, causing // Postgres to default to `search_path`. Schema string @@ -59,11 +63,14 @@ type JobSchedulerConfig struct { func (c *JobSchedulerConfig) mustValidate() *JobSchedulerConfig { c.MustValidate() + if c.Default <= 0 { + panic("SchedulerConfig.Limit must be above zero") + } if c.Interval <= 0 { panic("SchedulerConfig.Interval must be above zero") } - if c.Default <= 0 { - panic("SchedulerConfig.Limit must be above zero") + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobSchedulerConfig.QueuesIncluded should be either nil or a non-empty slice") } return c @@ -77,10 +84,10 @@ type JobScheduler struct { startstop.BaseStartStop // exported for test purposes + Config *JobSchedulerConfig TestSignals JobSchedulerTestSignals - config *JobSchedulerConfig - exec riverdriver.Executor + exec riverdriver.Executor // Circuit breaker that tracks consecutive timeout failures from the central // query. 
The query starts by using the full/default batch size, but after @@ -95,11 +102,12 @@ func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfi batchSizes := config.WithDefaults() return baseservice.Init(archetype, &JobScheduler{ - config: (&JobSchedulerConfig{ - BatchSizes: batchSizes, - Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), - NotifyInsert: config.NotifyInsert, - Schema: config.Schema, + Config: (&JobSchedulerConfig{ + BatchSizes: batchSizes, + Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), + NotifyInsert: config.NotifyInsert, + QueuesIncluded: config.QueuesIncluded, + Schema: config.Schema, }).mustValidate(), exec: exec, reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), @@ -121,7 +129,7 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) - ticker := timeutil.NewTickerWithInitialTick(ctx, s.config.Interval) + ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) for { select { case <-ctx.Done(): @@ -150,9 +158,9 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl func (s *JobScheduler) batchSize() int { if s.reducedBatchSizeBreaker.Open() { - return s.config.Reduced + return s.Config.Reduced } - return s.config.Default + return s.Config.Default } type schedulerRunOnceResult struct { @@ -175,12 +183,13 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er defer dbutil.RollbackWithoutCancel(ctx, execTx) now := s.Time.NowUTC() - nowWithLookAhead := now.Add(s.config.Interval) + nowWithLookAhead := now.Add(s.Config.Interval) scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: s.batchSize(), - Now: &nowWithLookAhead, - Schema: s.config.Schema, + Max: s.batchSize(), + Now: &nowWithLookAhead, + QueuesIncluded: s.Config.QueuesIncluded, + Schema: s.Config.Schema, }) if err != nil { return 0, fmt.Errorf("error scheduling jobs: %w", err) @@ -205,7 +214,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } if len(queues) > 0 { - if err := s.config.NotifyInsert(ctx, execTx, queues); err != nil { + if err := s.Config.NotifyInsert(ctx, execTx, queues); err != nil { return 0, fmt.Errorf("error notifying insert: %w", err) } s.TestSignals.NotifiedQueues.Signal(queues) diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index 25388042..0ff0713a 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -73,21 +73,21 @@ func TestJobScheduler(t *testing.T) { requireJobStateUnchanged := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, job.State, newJob.State) return newJob } requireJobStateAvailable := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err 
:= exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateAvailable, newJob.State) return newJob } requireJobStateDiscardedWithMeta := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateDiscarded, newJob.State) require.NotNil(t, newJob.FinalizedAt) @@ -100,9 +100,9 @@ func TestJobScheduler(t *testing.T) { scheduler := NewJobScheduler(riversharedtest.BaseServiceArchetype(t), &JobSchedulerConfig{}, nil) - require.Equal(t, JobSchedulerIntervalDefault, scheduler.config.Interval) - require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.config.Default) - require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.config.Reduced) + require.Equal(t, JobSchedulerIntervalDefault, scheduler.Config.Interval) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.Config.Default) + require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.Config.Reduced) }) t.Run("StartStopStress", func(t *testing.T) { @@ -130,7 +130,7 @@ func TestJobScheduler(t *testing.T) { scheduledJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) scheduledJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) - scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.config.Interval - time.Millisecond))}) // won't be scheduled + scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.Config.Interval - time.Millisecond))}) // won't be scheduled scheduledJob4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(30 * time.Second))}) // won't be scheduled retryableJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -205,13 +205,13 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Default = 10 // reduced size for test speed + scheduler.Config.Default = 10 // reduced size for test speed now := time.Now().UTC() // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. 
- numJobs := scheduler.config.Default + 1 + numJobs := scheduler.Config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -243,7 +243,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = 1 * time.Microsecond + scheduler.Config.Interval = 1 * time.Microsecond require.NoError(t, scheduler.Start(ctx)) @@ -258,7 +258,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run require.NoError(t, scheduler.Start(ctx)) scheduler.Stop() @@ -268,7 +268,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run ctx, cancelFunc := context.WithCancel(ctx) @@ -287,7 +287,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run now := time.Now().UTC() job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -320,8 +320,8 @@ func TestJobScheduler(t *testing.T) { notifyCh := make(chan []string, 10) scheduler, _ := setup(t, &testOpts{exec: exec, schema: schema}) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run - scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { notifyCh <- queues return nil } @@ -357,7 +357,7 @@ func TestJobScheduler(t *testing.T) { addJob("other_status_queue", time.Minute, rivertype.JobStateCancelled) // it's cancelled // This one is scheduled in the future, just barely before the next run, so it should // be scheduled but shouldn't trigger a notification: - addJob("queue5", scheduler.config.Interval-time.Millisecond, rivertype.JobStateRetryable) + addJob("queue5", scheduler.Config.Interval-time.Millisecond, rivertype.JobStateRetryable) // Run the scheduler and wait for it to execute once: require.NoError(t, scheduler.Start(ctx)) @@ -439,4 +439,35 @@ func TestJobScheduler(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + scheduler, bundle := setupTx(t) + + var ( + now = time.Now().UTC() + + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + includedJob2 = testfactory.Job(ctx, t, bundle.exec, 
&testfactory.JobOpts{Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + ) + + scheduler.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, scheduler.Start(ctx)) + + scheduler.TestSignals.ScheduledBatch.WaitOrTimeout() + + requireJobStateUnchanged(t, scheduler, bundle.exec, notIncludedJob1) + requireJobStateUnchanged(t, scheduler, bundle.exec, notIncludedJob2) + + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob1) + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob2) + }) } diff --git a/internal/maintenance/queue_cleaner.go b/internal/maintenance/queue_cleaner.go index f64e97db..2abb328c 100644 --- a/internal/maintenance/queue_cleaner.go +++ b/internal/maintenance/queue_cleaner.go @@ -41,6 +41,10 @@ type QueueCleanerConfig struct { // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // RetentionPeriod is the amount of time to keep queues around before they're // removed. RetentionPeriod time.Duration @@ -56,6 +60,9 @@ func (c *QueueCleanerConfig) mustValidate() *QueueCleanerConfig { if c.Interval <= 0 { panic("QueueCleanerConfig.Interval must be above zero") } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("QueueCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.RetentionPeriod <= 0 { panic("QueueCleanerConfig.RetentionPeriod must be above zero") } @@ -91,6 +98,7 @@ func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfi Config: (&QueueCleanerConfig{ BatchSizes: batchSizes, Interval: cmp.Or(config.Interval, queueCleanerIntervalDefault), + QueuesIncluded: config.QueuesIncluded, RetentionPeriod: cmp.Or(config.RetentionPeriod, QueueRetentionPeriodDefault), Schema: config.Schema, }).mustValidate(), @@ -163,6 +171,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ Max: s.batchSize(), + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod), }) diff --git a/internal/maintenance/queue_cleaner_test.go b/internal/maintenance/queue_cleaner_test.go index 06058e95..6e2b9f0e 100644 --- a/internal/maintenance/queue_cleaner_test.go +++ b/internal/maintenance/queue_cleaner_test.go @@ -106,12 +106,12 @@ func TestQueueCleaner(t *testing.T) { Name: queue3.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue4.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue5.Name, Schema: cleaner.Config.Schema, @@ -302,4 +302,37 @@ func TestQueueCleaner(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + now = time.Now() + + notIncludedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + notIncludedQueue2 = 
testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + + includedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included1"), UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + includedQueue2 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included2"), UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1.Name, includedQueue2.Name} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: notIncludedQueue1.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: notIncludedQueue2.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue1.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue2.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index aed92b7e..8209ed85 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -382,6 +382,7 @@ type JobDeleteBeforeParams struct { DiscardedDoDelete bool DiscardedFinalizedAtHorizon time.Time Max int + Queues []string QueuesExcluded []string QueuesIncluded []string Schema string @@ -421,9 +422,10 @@ type JobGetByKindManyParams struct { } type JobGetStuckParams struct { - Max int - Schema string - StuckHorizon time.Time + Max int + QueuesIncluded []string + Schema string + StuckHorizon time.Time } type JobInsertFastParams struct { @@ -514,9 +516,10 @@ type JobRetryParams struct { } type JobScheduleParams struct { - Max int - Now *time.Time - Schema string + Max int + Now *time.Time + QueuesIncluded []string + Schema string } type JobScheduleResult struct { @@ -688,6 +691,7 @@ type LeaderInsertParams struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -695,6 +699,7 @@ type LeaderInsertParams struct { type LeaderElectParams struct { LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -783,6 +788,7 @@ type QueueCreateOrSetUpdatedAtParams struct { type QueueDeleteExpiredParams struct { Max int + QueuesIncluded []string Schema string UpdatedAtHorizon time.Time } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 8cc6c9a2..36806dd7 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -610,17 +610,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg 
*JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } @@ -1336,12 +1341,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1388,7 +1397,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1406,8 +1415,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + Max int64 } type JobScheduleRow struct { @@ -1416,7 +1426,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.QueryContext(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.QueryContext(ctx, jobSchedule, pq.Array(arg.QueuesIncluded), arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index 0559f59e..02c3cb5c 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -15,12 +15,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -30,10 +32,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -44,11 +52,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO UPDATE SET @@ -61,10 +71,16 @@ type LeaderAttemptReelectParams struct { LeaderID 
string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -105,11 +121,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -119,6 +137,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -128,6 +147,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go index bd432f9c..76e1f8d6 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go @@ -59,24 +59,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. 
func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 4ecf3a90..83192078 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -331,8 +331,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: params.QueuesIncluded, + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -593,8 +594,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: params.QueuesIncluded, }) if err != nil { return nil, interpretError(err) @@ -714,6 +716,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -726,6 +729,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { 
numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -756,6 +760,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -882,6 +887,7 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: params.QueuesIncluded, UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 27aade13..f523c054 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -1040,25 +1040,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("QueuesIncluded", func(t *testing.T) { t.Parallel() - exec, bundle := setup(ctx, t) - - // I ran into yet another huge sqlc SQLite bug in that when mixing - // normal parameters with a `sqlc.slice` the latter must appear at - // the very end because it'll produce unnamed placeholders (?) - // instead of positional placeholders (?1) like most parameters. The - // trick of putting it at the end works, but only if you have - // exactly one `sqlc.slice` needed. If you need multiple and they - // need to be interspersed with other parameters (like in the case - // of `queues_excluded` and `queues_included`), everything stops - // working real fast. I could have worked around this by breaking - // the SQLite version of this operation into two sqlc queries, but - // since we only expect to need `queues_excluded` on SQLite (and not - // `queues_included` for the foreseeable future), I've just set - // SQLite to not support `queues_included` for the time being. 
- if bundle.driver.DatabaseName() == databaseNameSQLite { - t.Logf("Skipping JobDeleteBefore with QueuesIncluded test for SQLite") - return - } + exec, _ := setup(ctx, t) var ( //nolint:dupl cancelledJob = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCancelled)}) @@ -1557,40 +1539,76 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobGetStuck", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - var ( - horizon = time.Now().UTC() - beforeHorizon = horizon.Add(-1 * time.Minute) - afterHorizon = horizon.Add(1 * time.Minute) - ) + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now().UTC() + beforeHorizon = horizon.Add(-1 * time.Minute) + afterHorizon = horizon.Add(1 * time.Minute) + ) - stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - t.Logf("horizon = %s", horizon) - t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) - t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) + t.Logf("horizon = %s", horizon) + t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) + t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) - t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) + t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) - // Not returned because we put a maximum of two. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + // Not returned because we put a maximum of two. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Not stuck because not in running state. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + // Not stuck because not in running state. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - // Not stuck because after queried horizon. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + // Not stuck because after queried horizon. 
+ _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Max two stuck - stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: 2, - StuckHorizon: horizon, + // Max two stuck + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 2, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) + }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now().UTC() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + var ( + includedQueue1 = "included1" + includedQueue2 = "included2" + + stuckJob1 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob2 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + // Not included because not in an included queue. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: ptrutil.Ptr("excluded1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: ptrutil.Ptr("excluded2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 2, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) - require.NoError(t, err) - require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, - sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) t.Run("JobInsertFastMany", func(t *testing.T) { @@ -2909,6 +2927,37 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String()) }) + + t.Run("QueuesIncluded", func(t *testing.T) { + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + var ( + includedQueue1 = "included1" + includedQueue2 = "included2" + + job1 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: &includedQueue1, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job2 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: &includedQueue2, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + // Not scheduled because not in an included queue. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("excluded1"), ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("excluded1"), ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + ) + + // First two scheduled because of limit. 
+ result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 5, + Now: &horizon, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + }) + require.NoError(t, err) + require.Equal(t, []int64{job1.ID, job2.ID}, + sliceutil.Map(result, func(j *riverdriver.JobScheduleResult) int64 { return j.Job.ID })) + }) }) makeErrPayload := func(t *testing.T, now time.Time) []byte { @@ -3458,6 +3507,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -3481,6 +3531,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3501,6 +3552,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3525,6 +3577,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -3551,6 +3604,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // the transaction. elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: 30 * time.Second, }) require.NoError(t, err) @@ -3574,6 +3628,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client", + Name: "default", TTL: 30 * time.Second, }) require.NoError(t, err) @@ -3593,6 +3648,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3672,6 +3728,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, ElectedAt: &electedAt, ExpiresAt: &expiresAt, LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3687,6 +3744,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -4141,27 +4199,65 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("QueueDeleteExpired", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - now := time.Now() - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) - queue2 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) - queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) - queue4 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + exec, _ := setup(ctx, t) - horizon := now.Add(-24 * time.Hour) - deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, 
err) + var ( + now = time.Now() - // queue2 and queue3 should be deleted, with queue4 being skipped due to max of 2: - require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) + queue2 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + queue3 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + queue4 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + ) - // Try again, make sure queue4 gets deleted this time: - deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, err) + horizon := now.Add(-24 * time.Hour) + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) + require.NoError(t, err) + + // queue2 and queue3 should be deleted, with queue4 being skipped due to max of 2: + require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) + + // Try again, make sure queue4 gets deleted this time: + deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ + Max: 2, + UpdatedAtHorizon: horizon, + }) + require.NoError(t, err) + + require.Equal(t, []string{queue4.Name}, deletedQueueNames) + }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) - require.Equal(t, []string{queue4.Name}, deletedQueueNames) + var ( + horizon = time.Now().Add(-24 * time.Hour) + + includedQueue1 = "included1" + includedQueue2 = "included2" + + queue1 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: &includedQueue1, UpdatedAt: ptrutil.Ptr(horizon.Add(-1 * time.Minute))}) + queue2 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: &includedQueue2, UpdatedAt: ptrutil.Ptr(horizon.Add(-2 * time.Minute))}) + + // Not included because not in an included queue. 
+ _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("excluded1"), UpdatedAt: ptrutil.Ptr(horizon.Add(-1 * time.Minute))}) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("excluded2"), UpdatedAt: ptrutil.Ptr(horizon.Add(-2 * time.Minute))}) + ) + + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ + Max: 5, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + UpdatedAtHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []string{queue1.Name, queue2.Name}, deletedQueueNames) + }) }) t.Run("QueueGet", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 95a2493e..cbefead3 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -259,6 +259,10 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < @stuck_horizon::timestamptz + AND ( + @queues_included::text[] IS NULL + OR queue = any(@queues_included) + ) ORDER BY id LIMIT @max; @@ -548,6 +552,10 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL + AND ( + @queues_included::text[] IS NULL + OR queue = any(@queues_included) + ) AND scheduled_at <= coalesce(sqlc.narg('now')::timestamptz, now()) ORDER BY priority, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 53159d08..9a9462f6 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -592,17 +592,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } @@ -1303,12 +1308,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1355,7 +1364,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1373,8 +1382,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + 
Max int64 } type JobScheduleRow struct { @@ -1383,7 +1393,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.Query(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.Query(ctx, jobSchedule, arg.QueuesIncluded, arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index e2417d86..1955abe6 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -2,8 +2,11 @@ CREATE UNLOGGED TABLE river_leader( elected_at timestamptz NOT NULL, expires_at timestamptz NOT NULL, leader_id text NOT NULL, - name text PRIMARY KEY DEFAULT 'default' CHECK (name = 'default'), - CONSTRAINT name_length CHECK (name = 'default'), + + -- this would be more aptly called "domain", but left as is for a less + -- invasive migration change + name text PRIMARY KEY DEFAULT 'default' CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) ); @@ -11,12 +14,14 @@ CREATE UNLOGGED TABLE river_leader( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because `lib/pq` doesn't support the latter - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) + coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl), + @name ) ON CONFLICT (name) DO NOTHING; @@ -25,11 +30,13 @@ ON CONFLICT (name) INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) + coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl), + @name ) ON CONFLICT (name) DO UPDATE SET @@ -49,11 +56,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(sqlc.narg('elected_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now())), coalesce(sqlc.narg('expires_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl)), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index 156807d3..c239177b 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -16,12 +16,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -31,10 +33,16 @@ type LeaderAttemptElectParams struct { 
LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -45,11 +53,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO UPDATE SET @@ -62,10 +72,16 @@ type LeaderAttemptReelectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -106,11 +122,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -120,6 +138,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -129,6 +148,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql index 6a114720..f810a676 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql @@ -25,15 +25,26 @@ SET RETURNING *; -- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < @updated_at_horizon - ORDER BY name ASC - LIMIT @max::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < @updated_at_horizon + AND ( + @queues_included::text[] IS NULL + OR name = any(@queues_included) + ) + ORDER BY name ASC + LIMIT @max::bigint + ) + RETURNING * ) -RETURNING *; +-- Uses a CTE only to guarantee return order. 
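+-- Note (editorial addition): the outer SELECT can still see the rows removed
+-- inside the CTE because a data-modifying CTE and the main statement run
+-- against the same pre-modification snapshot in Postgres.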
+SELECT * +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC; -- name: QueueGet :one SELECT * diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go index 18525033..f208f2aa 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go @@ -57,24 +57,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index c9650d6a..96a2744a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -335,8 +335,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: 
params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: params.QueuesIncluded, + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -588,8 +589,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: params.QueuesIncluded, }) if err != nil { return nil, interpretError(err) @@ -703,6 +705,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -715,6 +718,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -745,6 +749,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -871,11 +876,13 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: params.QueuesIncluded, UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { return nil, interpretError(err) } + queueNames := make([]string, len(queues)) for i, q := range queues { queueNames[i] = q.Name diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..a3091867 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -92,34 +92,19 @@ RETURNING *; -- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(@cancelled_finalized_at_horizon AS text)) OR (state = 'completed' AND finalized_at < cast(@completed_finalized_at_horizon AS text)) OR (state = 'discarded' AND finalized_at < cast(@discarded_finalized_at_horizon AS text)) - ORDER BY id - LIMIT @max - ) - -- This is really awful, but unless the `sqlc.slice` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. 
The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with `sqlc.slice` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure `sqlc.slice` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus `sqlc.slice` like this one (the Postgres - -- driver supports a `queues_included` parameter that I couldn't support - -- here). The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(@queues_excluded_empty AS boolean) - OR river_job.queue NOT IN (sqlc.slice('queues_excluded')) - ); + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT @max +); -- name: JobDeleteMany :many DELETE FROM /* TEMPLATE: schema */river_job @@ -186,6 +171,7 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(@stuck_horizon AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT @max; @@ -411,6 +397,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 0a86bc05..c318519f 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -214,34 +214,19 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, const jobDeleteBefore = `-- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(?1 AS text)) OR (state = 'completed' AND finalized_at < cast(?2 AS text)) OR (state = 'discarded' AND finalized_at < cast(?3 AS text)) - ORDER BY id - LIMIT ?4 - ) - -- This is really awful, but unless the ` + "`" + `sqlc.slice` + "`" + ` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with ` + "`" + `sqlc.slice` + "`" + ` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure ` + "`" + `sqlc.slice` + "`" + ` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus ` + "`" + `sqlc.slice` + "`" + ` like this one (the Postgres - -- driver supports a ` + "`" + `queues_included` + "`" + ` parameter that I couldn't support - -- here). 
The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(?5 AS boolean) - OR river_job.queue NOT IN (/*SLICE:queues_excluded*/?) - ) + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT ?4 +) ` type JobDeleteBeforeParams struct { @@ -249,27 +234,15 @@ type JobDeleteBeforeParams struct { CompletedFinalizedAtHorizon string DiscardedFinalizedAtHorizon string Max int64 - QueuesExcludedEmpty bool - QueuesExcluded []string } func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBeforeParams) (sql.Result, error) { - query := jobDeleteBefore - var queryParams []interface{} - queryParams = append(queryParams, arg.CancelledFinalizedAtHorizon) - queryParams = append(queryParams, arg.CompletedFinalizedAtHorizon) - queryParams = append(queryParams, arg.DiscardedFinalizedAtHorizon) - queryParams = append(queryParams, arg.Max) - queryParams = append(queryParams, arg.QueuesExcludedEmpty) - if len(arg.QueuesExcluded) > 0 { - for _, v := range arg.QueuesExcluded { - queryParams = append(queryParams, v) - } - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", strings.Repeat(",?", len(arg.QueuesExcluded))[1:], 1) - } else { - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", "NULL", 1) - } - return db.ExecContext(ctx, query, queryParams...) + return db.ExecContext(ctx, jobDeleteBefore, + arg.CancelledFinalizedAtHorizon, + arg.CompletedFinalizedAtHorizon, + arg.DiscardedFinalizedAtHorizon, + arg.Max, + ) } const jobDeleteMany = `-- name: JobDeleteMany :many @@ -562,6 +535,7 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(?1 AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT ?2 ` @@ -1183,6 +1157,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(?1 AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql index c5da22cb..fb690ef5 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql @@ -11,11 +11,13 @@ CREATE TABLE river_leader ( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) + datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)), + @name ) ON CONFLICT (name) DO NOTHING; @@ -24,11 +26,13 @@ ON CONFLICT (name) INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), 
datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) + datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)), + @name ) ON CONFLICT (name) DO UPDATE SET @@ -48,11 +52,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(sqlc.narg('elected_at') AS text), cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), coalesce(cast(sqlc.narg('expires_at') AS text), datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text))), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go index 2c50d5f9..0cd94a6a 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go @@ -13,11 +13,13 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( ?1, coalesce(cast(?2 AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) + datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)), + ?4 ) ON CONFLICT (name) DO NOTHING @@ -27,10 +29,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *string TTL string + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -41,11 +49,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( ?1, coalesce(cast(?2 AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) + datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)), + ?4 ) ON CONFLICT (name) DO UPDATE SET @@ -58,10 +68,16 @@ type LeaderAttemptReelectParams struct { LeaderID string Now *string TTL string + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -102,11 +118,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(?1 AS text), cast(?2 AS text), datetime('now', 'subsec')), coalesce(cast(?3 AS text), datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?4 as text))), - ?5 + ?5, + ?6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -116,6 +134,7 @@ type LeaderInsertParams struct { ExpiresAt *string TTL string LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx 
context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -125,6 +144,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql index 1aa7c5fe..f9ceb3ab 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql @@ -30,6 +30,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < @updated_at_horizon + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT @max ) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go index a89f2b74..25afa4ff 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go @@ -63,6 +63,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < ?1 + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT ?2 ) diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..4d99b4fa --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,15 @@ +-- +-- Alter `river_leader` to restore the check constraint that `name` must be 'default'. +-- SQLite doesn't allow this kind of schema modification, so this redefines the table entirely. +-- + +DROP TABLE /* TEMPLATE: schema */river_leader; + +CREATE TABLE /* TEMPLATE: schema */river_leader ( + elected_at timestamp NOT NULL, + expires_at timestamp NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); \ No newline at end of file diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..a38caa24 --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,16 @@ +-- +-- Alter `river_leader` to remove the check constraint that `name` must be +-- `default`. SQLite doesn't allow this kind of schema modification, so this +-- redefines the table entirely.
+-- + +DROP TABLE /* TEMPLATE: schema */river_leader; + +CREATE TABLE /* TEMPLATE: schema */river_leader ( + elected_at timestamp NOT NULL, + expires_at timestamp NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY NOT NULL DEFAULT 'default', + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); \ No newline at end of file diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 7e6af4bc..49ec2cc8 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -342,18 +342,81 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP }) } +// queuesClause generates a partial SQL fragment used to check whether a +// query's queue is included or excluded from a given set of injected queues. +// +// This is really, really awful, and I wish there was a way I could find to +// avoid doing this, but it's the only option I could come up with to work +// around the fact that sqlc is incredibly buggy, and near non-functionally +// buggy when it comes to SQLite. +// +// I tried to use a slice-like injection using `sqlc.slice(...)`, but because that +// generates `?` placeholders without numbers, it's completely incompatible with +// other parameters if it shows up anywhere in the query except for the very +// end. It's sometimes possible to rearrange the query so it appears at the end, +// but even that causes issues in other cases like if you want to use multiple +// `sqlc.slice(...)` calls (e.g. using both a `queues_included` and +// `queues_excluded`). +// +// Next up, you could work around the problem using a `json_each(@json_array)`, +// but sqlc can't find variables used in a function like this. Not for a good +// reason, but again, just because it's extremely buggy. +// +// So instead, we use a `json_each` approach, but we have to manually inject via +// our home-grown templating system (see the `sqlctemplate` package). It's not +// great, and possibly bad even, but it works.
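+//
+// As a rough illustration (editorial sketch, not original source): a call like
+// queuesClause(replacements, namedArgs, "queues_included_clause", "queue",
+// []string{"queue_a", "queue_b"}, false) replaces the template clause with
+// something along the lines of:
+//
+//	EXISTS (
+//	    SELECT 1
+//	    FROM json_each(@queues_included_clause_arg)
+//	    WHERE json_each.value = queue
+//	)
+//
+// with @queues_included_clause_arg bound to the JSON-encoded queue list, a
+// leading NOT prepended when isExcluded is true, and a plain `true` substituted
+// when the queue slice is nil (no filtering).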
+func queuesClause(replacements map[string]sqlctemplate.Replacement, namedArgs map[string]any, clauseName, columnName string, queues []string, isExcluded bool) error { + if queues == nil { + replacements[clauseName] = sqlctemplate.Replacement{Value: "true"} + return nil + } + + var ( + paramName = clauseName + "_arg" + clauseSQL = ` + EXISTS ( + SELECT 1 + FROM json_each(@` + paramName + `) + WHERE json_each.value = ` + columnName + ` + ) + ` + ) + + if isExcluded { + clauseSQL = "NOT " + clauseSQL + } + + data, err := json.Marshal(queues) + if err != nil { + return fmt.Errorf("error marshaling queues: %w", err) + } + + replacements[clauseName] = sqlctemplate.Replacement{Value: clauseSQL} + namedArgs[paramName] = data + + return nil +} + func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) { - if len(params.QueuesIncluded) > 0 { - return 0, riverdriver.ErrNotImplemented + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_excluded_clause", "queue", params.QueuesExcluded, true); err != nil { + return 0, err + } + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return 0, err } + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{ CancelledFinalizedAtHorizon: timeString(params.CancelledFinalizedAtHorizon), CompletedFinalizedAtHorizon: timeString(params.CompletedFinalizedAtHorizon), DiscardedFinalizedAtHorizon: timeString(params.DiscardedFinalizedAtHorizon), Max: int64(params.Max), - QueuesExcluded: params.QueuesExcluded, - QueuesExcludedEmpty: len(params.QueuesExcluded) < 1, // not in the Postgres version, but I couldn't find a way around it }) if err != nil { return 0, interpretError(err) @@ -473,6 +536,17 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ Max: int64(params.Max), StuckHorizon: timeString(params.StuckHorizon), @@ -837,10 +911,24 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched ctx = schemaTemplateParam(ctx, params.Schema) dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer} - eligibleJobs, err := dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ - Max: int64(params.Max), - Now: timeStringNullable(params.Now), - }) + eligibleJobs, err := func() ([]*dbsqlc.RiverJob, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx := 
sqlctemplate.WithReplacementsDup(ctx) // dupe so these new replacements don't leak into queries below (WithReplacements mutates an existing context container rather than copying it on write) + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + + return dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ + Max: int64(params.Max), + Now: timeStringNullable(params.Now), + }) + }() if err != nil { return nil, interpretError(err) } @@ -1070,6 +1158,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1082,6 +1171,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1111,6 +1201,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI leader, err := dbsqlc.New().LeaderInsert(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderInsertParams{ ElectedAt: timeStringNullable(params.ElectedAt), ExpiresAt: timeStringNullable(params.ExpiresAt), + Name: params.Name, Now: timeStringNullable(params.Now), LeaderID: params.LeaderID, TTL: durationAsString(params.TTL), @@ -1256,6 +1347,17 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd } func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "name", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), UpdatedAtHorizon: params.UpdatedAtHorizon.UTC(), diff --git a/rivershared/sqlctemplate/sqlc_template.go b/rivershared/sqlctemplate/sqlc_template.go index 88c02edb..cd2657d9 100644 --- a/rivershared/sqlctemplate/sqlc_template.go +++ b/rivershared/sqlctemplate/sqlc_template.go @@ -188,8 +188,8 @@ func (r *Replacer) RunSafely(ctx context.Context, argPlaceholder, sql string, ar } updatedSQL := sql - updatedSQL = replaceTemplate(updatedSQL, templateBeginEndRE) updatedSQL = replaceTemplate(updatedSQL, templateRE) + updatedSQL = replaceTemplate(updatedSQL, templateBeginEndRE) if len(templatesExpected) > 0 { return "", nil, errors.New("sqlctemplate params present in context but missing in SQL: " + strings.Join(templatesExpected, ", ")) @@ -265,6 +265,33 @@ func WithReplacements(ctx context.Context, replacements map[string]Replacement, }) } +// WithReplacementsDup duplicates the template replacements container in 
context. +// +// Normally, adding replacements modifies an existing container in context. Call +// this function before WithReplacements to create a dup instead. +// +// TODO(brandur): This API isn't great. Reconsider it. Maybe WithReplacements +// should always dup? +func WithReplacementsDup(ctx context.Context) context.Context { + var ( + namedArgs map[string]any + replacements map[string]Replacement + ) + + if container, ok := ctx.Value(contextKey{}).(*contextContainer); ok { + namedArgs = maps.Clone(container.NamedArgs) + replacements = maps.Clone(container.Replacements) + } else { + namedArgs = make(map[string]any) + replacements = make(map[string]Replacement) + } + + return context.WithValue(ctx, contextKey{}, &contextContainer{ + NamedArgs: namedArgs, + Replacements: replacements, + }) +} + // Comparable struct that's used as a key for template caching. type replacerCacheKey struct { namedArgs string // all arg names concatenated together diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index 26cf4b88..84f51d85 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -111,6 +111,7 @@ type LeaderOpts struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID *string + Name *string Now *time.Time Schema string } @@ -122,6 +123,7 @@ func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts ElectedAt: opts.ElectedAt, ExpiresAt: opts.ExpiresAt, LeaderID: ptrutil.ValOrDefault(opts.LeaderID, "test-client-id"), + Name: ptrutil.ValOrDefault(opts.Name, "default"), Now: opts.Now, Schema: opts.Schema, TTL: 10 * time.Second,