From 1c32845b392bf6668a2f1c6e065019594fe8611c Mon Sep 17 00:00:00 2001 From: Brandur Date: Mon, 22 Dec 2025 13:48:10 -0700 Subject: [PATCH] Add leadership "domains" so multiple Rivers can operate in one schema We've gotten a couple requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, and giving them the capacity to operate independently enough to be functional. This is currently not possible because a single leader is elected given a single schema and it handles all maintenance operations including non-queue ones like periodic job enqueuing. Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client will elect its leader and allowing multiple leaders to be elected in a single schema. Each leader will run its own maintenance services. Setting `LeaderDomain` causes the additional effect of having maintenance services start to operate only on the queues that their client is configured for. The idea here is to give us backwards compatibility in that the default behavior (in case of an unset `LeaderDomain`) is the same, but providing a path for multiple leaders to be interoperable with each other. There are still a few edges: for example, reindexing is not queue specific, so multiple leaders could be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled. --- CHANGELOG.md | 4 + client.go | 62 ++++- client_test.go | 148 ++++++++++++ internal/leadership/elector.go | 33 +-- internal/leadership/elector_test.go | 43 ++++ internal/maintenance/job_cleaner.go | 12 + internal/maintenance/job_cleaner_test.go | 53 ++++- internal/maintenance/job_rescuer.go | 16 +- internal/maintenance/job_rescuer_test.go | 38 +++ internal/maintenance/job_scheduler.go | 43 ++-- internal/maintenance/job_scheduler_test.go | 63 +++-- internal/maintenance/queue_cleaner.go | 9 + internal/maintenance/queue_cleaner_test.go | 37 ++- riverdriver/river_driver_interface.go | 18 +- .../internal/dbsqlc/river_job.sql.go | 30 ++- .../internal/dbsqlc/river_leader.sql.go | 36 ++- .../internal/dbsqlc/river_queue.sql.go | 30 ++- ...007_river_leader_non_default_name.down.sql | 3 + .../007_river_leader_non_default_name.up.sql | 3 + .../river_database_sql_driver.go | 14 +- .../riverdrivertest/riverdrivertest.go | 216 +++++++++++++----- .../riverpgxv5/internal/dbsqlc/river_job.sql | 8 + .../internal/dbsqlc/river_job.sql.go | 30 ++- .../internal/dbsqlc/river_leader.sql | 25 +- .../internal/dbsqlc/river_leader.sql.go | 36 ++- .../internal/dbsqlc/river_queue.sql | 27 ++- .../internal/dbsqlc/river_queue.sql.go | 30 ++- ...007_river_leader_non_default_name.down.sql | 3 + .../007_river_leader_non_default_name.up.sql | 3 + riverdriver/riverpgxv5/river_pgx_v5_driver.go | 15 +- .../riversqlite/internal/dbsqlc/river_job.sql | 37 +-- .../internal/dbsqlc/river_job.sql.go | 61 ++--- .../internal/dbsqlc/river_leader.sql | 18 +- .../internal/dbsqlc/river_leader.sql.go | 36 ++- .../internal/dbsqlc/river_queue.sql | 1 + .../internal/dbsqlc/river_queue.sql.go | 1 + ...007_river_leader_non_default_name.down.sql | 15 ++ .../007_river_leader_non_default_name.up.sql | 16 ++ .../riversqlite/river_sqlite_driver.go | 118 +++++++++- rivershared/sqlctemplate/sqlc_template.go | 29 ++- rivershared/testfactory/test_factory.go | 2 + 41 files changed, 1124 insertions(+), 298 deletions(-) create mode 100644 
riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql create mode 100644 riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 93c3bc2b..8ce429cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113). + ## [0.29.0] - 2025-12-22 ### Added diff --git a/client.go b/client.go index 4d54b39a..70872be0 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "regexp" + "slices" "strings" "sync" "time" @@ -209,6 +210,47 @@ type Config struct { // Jobs may have their own specific hooks by implementing JobArgsWithHooks. Hooks []rivertype.Hook + // LeaderDomain is an optional "domain" string to use for leader election. + // Different clients sharing the same River schema can elect multiple + // leaders as long as they're using different domains, with one leader + // elected per domain. + // + // Setting this value also triggers the related behavior that maintenance + // services start to operate only on the queues their client is configured + // for. So for example, given client1 handling queue_a and queue_b and client2 + // handling queue_c and queue_d, by default whichever client is elected leader will end + // up running all maintenance services for all queues (queue_a, queue_b, + // queue_c, and queue_d). But if client1 is using domain "domain1" and + // client2 is using domain "domain2", then client1 (elected in domain1) will + // only run maintenance services on queue_a and queue_b, while client2 + // (elected in domain2) will run maintenance services on queue_c and + // queue_d. + // + // Be warned that River *does not protect against configuration + // mistakes*. If client1 on domain1 is configured for queue_a and queue_b, + // and client2 on domain2 is *also* configured for queue_a and queue_b, then + // both clients may end up running maintenance services on the same queues + // at the same time. It's the caller's responsibility to ensure that doesn't + // happen. + + // Leaving this empty or using the special value "default" causes the client + // to operate on all queues. When setting this value to something non-empty + // and non-"default", no other clients should be left empty or use "default" + // because the default client(s) will infringe on the domains of the + // non-default one(s). + // + // Certain maintenance services that aren't queue-related like the reindexer + // will continue to run on all leaders regardless of domain. If using this + // feature, it's a good idea to set ReindexerSchedule on all but a + // single leader domain to river.NeverSchedule(). 
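+ //
+ // As a purely illustrative sketch (not part of this patch): "driver" below
+ // stands in for a real driver such as riverpgxv5.New(dbPool), and worker
+ // registration and error handling are omitted. Two clients sharing a schema
+ // might be configured like:
+ //
+ //	clientA, err := river.NewClient(driver, &river.Config{
+ //		LeaderDomain: "domain_a",
+ //		Queues:       map[string]river.QueueConfig{"queue_a": {MaxWorkers: 100}},
+ //	})
+ //
+ //	// Disable the reindexer everywhere but one domain so that only a
+ //	// single leader runs it (using NeverSchedule as referenced above).
+ //	clientB, err := river.NewClient(driver, &river.Config{
+ //		LeaderDomain:      "domain_b",
+ //		Queues:            map[string]river.QueueConfig{"queue_b": {MaxWorkers: 100}},
+ //		ReindexerSchedule: river.NeverSchedule(),
+ //	})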
+ // + // In general, most River users should not need LeaderDomain, and when + // running multiple Rivers may want to consider using multiple databases and + // multiple schemas instead. + // + // Defaults to "default". + LeaderDomain string + // Logger is the structured logger to use for logging purposes. If none is // specified, logs will be emitted to STDOUT with messages at warn level // or higher. @@ -415,6 +457,7 @@ func (c *Config) WithDefaults() *Config { Hooks: c.Hooks, JobInsertMiddleware: c.JobInsertMiddleware, JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault), + LeaderDomain: c.LeaderDomain, Logger: logger, MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault), Middleware: c.Middleware, @@ -840,6 +883,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{ ClientID: config.ID, + Domain: config.LeaderDomain, Schema: config.Schema, }) client.services = append(client.services, client.elector) @@ -860,6 +904,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, pluginPilot.PluginServices()...) } + // It's important for queuesIncluded to be `nil` in case it's not in use + // for the various driver queries to work correctly. + var queuesIncluded []string + if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 { + queuesIncluded = maputil.Keys(config.Queues) + slices.Sort(queuesIncluded) + } + // // Maintenance services // @@ -872,6 +924,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod, DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod, QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(), + QueuesIncluded: queuesIncluded, Schema: config.Schema, Timeout: config.JobCleanerTimeout, }, driver.GetExecutor()) @@ -882,6 +935,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ ClientRetryPolicy: config.RetryPolicy, + QueuesIncluded: queuesIncluded, RescueAfter: config.RescueStuckJobsAfter, Schema: config.Schema, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { @@ -897,9 +951,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{ - Interval: config.schedulerInterval, - NotifyInsert: client.maybeNotifyInsertForQueues, - Schema: config.Schema, + Interval: config.schedulerInterval, + NotifyInsert: client.maybeNotifyInsertForQueues, + QueuesIncluded: queuesIncluded, + Schema: config.Schema, }, driver.GetExecutor()) maintenanceServices = append(maintenanceServices, jobScheduler) client.testSignals.jobScheduler = &jobScheduler.TestSignals @@ -925,6 +980,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{ + QueuesIncluded: queuesIncluded, RetentionPeriod: maintenance.QueueRetentionPeriodDefault, Schema: config.Schema, }, driver.GetExecutor()) diff --git a/client_test.go b/client_test.go index 90ccc43e..d9ca410b 100644 --- a/client_test.go +++ b/client_test.go @@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) { startstoptest.Stress(ctx, 
t, clientWithStop) }) + + t.Run("LeaderDomain_Alternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + } + + var client2 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + var err error + client2, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + } + + startClient(ctx, t, client1) + startClient(ctx, t, client2) + + // Both elected + client1.testSignals.electedLeader.WaitOrTimeout() + client2.testSignals.electedLeader.WaitOrTimeout() + }) + + t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // The domain "default" is special in that it behaves like if LeaderDomain + // was not set. + t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.LeaderDomain = "default" + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // When non-default leader domains are configured, each client's maintenance + // services are limited to only their client's queues. 
+ t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded) + } + + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + client2, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded) + } + }) } type workerWithMiddleware[T JobArgs] struct { diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 69ddd75c..878f5d98 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -22,6 +22,8 @@ import ( "github.com/riverqueue/river/rivershared/util/testutil" ) +const DomainDefault = "default" + const ( electIntervalDefault = 5 * time.Second electIntervalJitterDefault = 1 * time.Second @@ -82,6 +84,7 @@ func (ts *electorTestSignals) Init(tb testutil.TestingTB) { type Config struct { ClientID string + Domain string ElectInterval time.Duration // period on which each elector attempts elect even without having received a resignation notification ElectIntervalJitter time.Duration Schema string @@ -121,6 +124,7 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not return baseservice.Init(archetype, &Elector{ config: (&Config{ ClientID: config.ClientID, + Domain: cmp.Or(config.Domain, string(DomainDefault)), ElectInterval: cmp.Or(config.ElectInterval, electIntervalDefault), ElectIntervalJitter: cmp.Or(config.ElectIntervalJitter, electIntervalJitterDefault), Schema: config.Schema, @@ -143,9 +147,9 @@ func (e *Elector) Start(ctx context.Context) error { var sub 
*notifier.Subscription if e.notifier == nil { - e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID, "domain", e.config.Domain) } else { - e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "topic", notifier.NotificationTopicLeadership) + e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", notifier.NotificationTopicLeadership) var err error sub, err = e.notifier.Listen(ctx, notifier.NotificationTopicLeadership, func(topic notifier.NotificationTopic, payload string) { e.handleLeadershipNotification(ctx, topic, payload) @@ -180,7 +184,7 @@ func (e *Elector) Start(ctx context.Context) error { return } - e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.GainedLeadership.Signal(struct{}{}) err := e.keepLeadershipLoop(ctx) @@ -193,7 +197,7 @@ func (e *Elector) Start(ctx context.Context) error { continue // lost leadership reelection; unusual but not a problem; don't log } - e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err) + e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err) } } }() @@ -205,10 +209,11 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { var attempt int for { attempt++ - e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, + Name: e.config.Domain, Now: e.Time.NowUTCOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), @@ -229,7 +234,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { attempt = 0 - e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.DeniedLeadership.Signal(struct{}{}) select { @@ -254,17 +259,17 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifier.NotificationTopic, payload string) { if topic != notifier.NotificationTopicLeadership { // This should not happen unless the notifier is broken. 
- e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "topic", topic, "payload", payload) + e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", topic, "payload", payload) return } notification := DBNotification{} if err := json.Unmarshal([]byte(payload), &notification); err != nil { - e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "err", err) + e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err) return } - e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID, "domain", e.config.Domain) // Do an initial context check so in case context is done, it always takes // precedence over sending a leadership notification. @@ -359,7 +364,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { case <-e.requestResignChan: // Receive a notification telling current leader to resign. - e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID) + e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID, "domain", e.config.Domain) if !timer.Stop() { <-timer.C @@ -383,10 +388,11 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // Reelect timer expired; attempt reelection below. } - e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID, "domain", e.config.Domain) reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, + Name: e.config.Domain, Now: e.Time.NowUTCOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), @@ -424,7 +430,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // always surrendered in a timely manner so it can be picked up quickly by // another client, even in the event of a cancellation. func (e *Elector) attemptResignLoop(ctx context.Context) { - e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) // Make a good faith attempt to resign, even in the presence of errors, but // don't keep hammering if it doesn't work. 
In case a resignation failure, @@ -469,7 +475,7 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error { } if resigned { - e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.ResignedLeadership.Signal(struct{}{}) } @@ -484,6 +490,7 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat return []any{ slog.Int("attempt", attempt), slog.String("client_id", e.config.ClientID), + slog.String("domain", e.config.Domain), slog.String("err", err.Error()), slog.String("sleep_duration", sleepDuration.String()), } diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index 62aba114..c9b8210b 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -422,6 +422,7 @@ func TestAttemptElectOrReelect(t *testing.T) { elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: DomainDefault, TTL: leaderTTL, Schema: "", }) @@ -451,6 +452,7 @@ func TestAttemptElectOrReelect(t *testing.T) { // the transaction. elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: DomainDefault, TTL: 30 * time.Second, Schema: "", }) @@ -478,6 +480,7 @@ func TestAttemptElectOrReelect(t *testing.T) { elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", + Name: DomainDefault, TTL: leaderTTL, Schema: "", }) @@ -493,6 +496,46 @@ func TestAttemptElectOrReelect(t *testing.T) { require.NoError(t, err) require.Equal(t, leader.ExpiresAt, updatedLeader.ExpiresAt) }) + + t.Run("MultipleDomains", func(t *testing.T) { + t.Parallel() + + bundle := setup(t) + + elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: clientID, + Name: DomainDefault, + Schema: "", + }) + require.NoError(t, err) + require.True(t, elected) + + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: clientID, + Name: "domain2", + Schema: "", + }) + require.NoError(t, err) + require.True(t, elected) + + // Second election on a domain where we have a leader fails. + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: "other-client-id", + Name: DomainDefault, + Schema: "", + }) + require.NoError(t, err) + require.False(t, elected) + + // And same in the alternate domain. + elected, err = attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ + LeaderID: "other-client-id", + Name: "domain2", + Schema: "", + }) + require.NoError(t, err) + require.False(t, elected) + }) } func TestElectorHandleLeadershipNotification(t *testing.T) { diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index f3216561..7a4672c0 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -56,6 +56,10 @@ type JobCleanerConfig struct { // QueuesExcluded are queues that'll be excluded from cleaning. QueuesExcluded []string + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // Schema where River tables are located. 
Empty string omits schema, causing // Postgres to default to `search_path`. Schema string @@ -79,6 +83,12 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig { if c.Interval <= 0 { panic("JobCleanerConfig.Interval must be above zero") } + if c.QueuesExcluded != nil && len(c.QueuesExcluded) == 0 { + panic("JobCleanerConfig.QueuesExcluded should be either nil or a non-empty slice") + } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.Timeout <= 0 { panic("JobCleanerConfig.Timeout must be above zero") } @@ -117,6 +127,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault), DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault), QueuesExcluded: config.QueuesExcluded, + QueuesIncluded: config.QueuesIncluded, Interval: cmp.Or(config.Interval, riversharedmaintenance.JobCleanerIntervalDefault), Schema: config.Schema, Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault), @@ -205,6 +216,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod), Max: s.batchSize(), QueuesExcluded: s.Config.QueuesExcluded, + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, }) if err != nil { diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index 96ba0e1b..d55db8fd 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -328,7 +328,7 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("OmmittedQueues", func(t *testing.T) { + t.Run("QueuesExcluded", func(t *testing.T) { t.Parallel() cleaner, bundle := setup(t) @@ -338,24 +338,24 @@ func TestJobCleaner(t *testing.T) { completedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) discardedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))}) - omittedQueue1 = "omitted1" - omittedQueue2 = "omitted1" + excludedQueue1 = "queue1" + excludedQueue2 = "queue2" - // Not deleted because in an omitted queue. - omittedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) - omittedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + // Not deleted because in an excluded queue. 
+ excludedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + excludedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) ) - cleaner.Config.QueuesExcluded = []string{omittedQueue1, omittedQueue2} + cleaner.Config.QueuesExcluded = []string{excludedQueue1, excludedQueue2} require.NoError(t, cleaner.Start(ctx)) cleaner.TestSignals.DeletedBatch.WaitOrTimeout() var err error - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob1.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob1.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob2.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob2.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema}) @@ -366,6 +366,41 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + // Not deleted because not in an included queue. + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + includedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob1.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob2.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + }) + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { t.Parallel() diff --git 
a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index c7353b36..0547ea0a 100644 --- a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -50,6 +50,11 @@ type JobRescuerConfig struct { // Interval is the amount of time to wait between runs of the rescuer. Interval time.Duration + // QueuesIncluded are queues that'll be included when considering jobs to + // rescue. If set, only these queues will be rescued. If nil, jobs in all + // queues are rescued. + QueuesIncluded []string + // RescueAfter is the amount of time for a job to be active before it is // considered stuck and should be rescued. RescueAfter time.Duration @@ -70,6 +75,9 @@ func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig { if c.Interval <= 0 { panic("RescuerConfig.Interval must be above zero") } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobRescuerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.RescueAfter <= 0 { panic("RescuerConfig.JobDuration must be above zero") } @@ -109,6 +117,7 @@ func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec BatchSizes: batchSizes, ClientRetryPolicy: config.ClientRetryPolicy, Interval: cmp.Or(config.Interval, JobRescuerIntervalDefault), + QueuesIncluded: config.QueuesIncluded, RescueAfter: cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault), Schema: config.Schema, WorkUnitFactoryFunc: config.WorkUnitFactoryFunc, @@ -280,9 +289,10 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err stuckHorizon := time.Now().Add(-s.Config.RescueAfter) return s.exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: s.batchSize(), - Schema: s.Config.Schema, - StuckHorizon: stuckHorizon, + Max: s.batchSize(), + QueuesIncluded: s.Config.QueuesIncluded, + Schema: s.Config.Schema, + StuckHorizon: stuckHorizon, }) } diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index d1f44ffd..71300515 100644 --- a/internal/maintenance/job_rescuer_test.go +++ b/internal/maintenance/job_rescuer_test.go @@ -405,4 +405,42 @@ func TestJobRescuer(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + rescuer, bundle := setup(t) + + var ( + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue1}) + includedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue2}) + ) + + rescuer.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, 
rescuer.Start(ctx)) + + rescuer.TestSignals.FetchedBatch.WaitOrTimeout() + rescuer.TestSignals.UpdatedBatch.WaitOrTimeout() + + includedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, includedJob1After.State) + includedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, includedJob2After.State) + + notIncludedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob1.State, notIncludedJob1After.State) // not rescued + notIncludedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob2.State, notIncludedJob2After.State) // not rescued + }) } diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 00adcee4..2f00453d 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -51,6 +51,10 @@ type JobSchedulerConfig struct { // where jobs were scheduled. NotifyInsert NotifyInsertFunc + // QueuesIncluded are queues that'll be included in scheduling. If set, + // only these queues will be scheduled. If nil, all queues are scheduled. + QueuesIncluded []string + // Schema where River tables are located. Empty string omits schema, causing // Postgres to default to `search_path`. Schema string @@ -59,11 +63,14 @@ type JobSchedulerConfig struct { func (c *JobSchedulerConfig) mustValidate() *JobSchedulerConfig { c.MustValidate() + if c.Default <= 0 { + panic("SchedulerConfig.Limit must be above zero") + } if c.Interval <= 0 { panic("SchedulerConfig.Interval must be above zero") } - if c.Default <= 0 { - panic("SchedulerConfig.Limit must be above zero") + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobSchedulerConfig.QueuesIncluded should be either nil or a non-empty slice") } return c @@ -77,10 +84,10 @@ type JobScheduler struct { startstop.BaseStartStop // exported for test purposes + Config *JobSchedulerConfig TestSignals JobSchedulerTestSignals - config *JobSchedulerConfig - exec riverdriver.Executor + exec riverdriver.Executor // Circuit breaker that tracks consecutive timeout failures from the central // query. 
The query starts by using the full/default batch size, but after @@ -95,11 +102,12 @@ func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfi batchSizes := config.WithDefaults() return baseservice.Init(archetype, &JobScheduler{ - config: (&JobSchedulerConfig{ - BatchSizes: batchSizes, - Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), - NotifyInsert: config.NotifyInsert, - Schema: config.Schema, + Config: (&JobSchedulerConfig{ + BatchSizes: batchSizes, + Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), + NotifyInsert: config.NotifyInsert, + QueuesIncluded: config.QueuesIncluded, + Schema: config.Schema, }).mustValidate(), exec: exec, reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), @@ -121,7 +129,7 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) - ticker := timeutil.NewTickerWithInitialTick(ctx, s.config.Interval) + ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) for { select { case <-ctx.Done(): @@ -150,9 +158,9 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl func (s *JobScheduler) batchSize() int { if s.reducedBatchSizeBreaker.Open() { - return s.config.Reduced + return s.Config.Reduced } - return s.config.Default + return s.Config.Default } type schedulerRunOnceResult struct { @@ -175,12 +183,13 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er defer dbutil.RollbackWithoutCancel(ctx, execTx) now := s.Time.NowUTC() - nowWithLookAhead := now.Add(s.config.Interval) + nowWithLookAhead := now.Add(s.Config.Interval) scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: s.batchSize(), - Now: &nowWithLookAhead, - Schema: s.config.Schema, + Max: s.batchSize(), + Now: &nowWithLookAhead, + QueuesIncluded: s.Config.QueuesIncluded, + Schema: s.Config.Schema, }) if err != nil { return 0, fmt.Errorf("error scheduling jobs: %w", err) @@ -205,7 +214,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } if len(queues) > 0 { - if err := s.config.NotifyInsert(ctx, execTx, queues); err != nil { + if err := s.Config.NotifyInsert(ctx, execTx, queues); err != nil { return 0, fmt.Errorf("error notifying insert: %w", err) } s.TestSignals.NotifiedQueues.Signal(queues) diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index 25388042..0ff0713a 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -73,21 +73,21 @@ func TestJobScheduler(t *testing.T) { requireJobStateUnchanged := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, job.State, newJob.State) return newJob } requireJobStateAvailable := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err 
:= exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateAvailable, newJob.State) return newJob } requireJobStateDiscardedWithMeta := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateDiscarded, newJob.State) require.NotNil(t, newJob.FinalizedAt) @@ -100,9 +100,9 @@ func TestJobScheduler(t *testing.T) { scheduler := NewJobScheduler(riversharedtest.BaseServiceArchetype(t), &JobSchedulerConfig{}, nil) - require.Equal(t, JobSchedulerIntervalDefault, scheduler.config.Interval) - require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.config.Default) - require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.config.Reduced) + require.Equal(t, JobSchedulerIntervalDefault, scheduler.Config.Interval) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.Config.Default) + require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.Config.Reduced) }) t.Run("StartStopStress", func(t *testing.T) { @@ -130,7 +130,7 @@ func TestJobScheduler(t *testing.T) { scheduledJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) scheduledJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) - scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.config.Interval - time.Millisecond))}) // won't be scheduled + scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.Config.Interval - time.Millisecond))}) // won't be scheduled scheduledJob4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(30 * time.Second))}) // won't be scheduled retryableJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -205,13 +205,13 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Default = 10 // reduced size for test speed + scheduler.Config.Default = 10 // reduced size for test speed now := time.Now().UTC() // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. 
- numJobs := scheduler.config.Default + 1 + numJobs := scheduler.Config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -243,7 +243,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = 1 * time.Microsecond + scheduler.Config.Interval = 1 * time.Microsecond require.NoError(t, scheduler.Start(ctx)) @@ -258,7 +258,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run require.NoError(t, scheduler.Start(ctx)) scheduler.Stop() @@ -268,7 +268,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run ctx, cancelFunc := context.WithCancel(ctx) @@ -287,7 +287,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run now := time.Now().UTC() job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -320,8 +320,8 @@ func TestJobScheduler(t *testing.T) { notifyCh := make(chan []string, 10) scheduler, _ := setup(t, &testOpts{exec: exec, schema: schema}) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run - scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { notifyCh <- queues return nil } @@ -357,7 +357,7 @@ func TestJobScheduler(t *testing.T) { addJob("other_status_queue", time.Minute, rivertype.JobStateCancelled) // it's cancelled // This one is scheduled in the future, just barely before the next run, so it should // be scheduled but shouldn't trigger a notification: - addJob("queue5", scheduler.config.Interval-time.Millisecond, rivertype.JobStateRetryable) + addJob("queue5", scheduler.Config.Interval-time.Millisecond, rivertype.JobStateRetryable) // Run the scheduler and wait for it to execute once: require.NoError(t, scheduler.Start(ctx)) @@ -439,4 +439,35 @@ func TestJobScheduler(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + scheduler, bundle := setupTx(t) + + var ( + now = time.Now().UTC() + + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + includedJob2 = testfactory.Job(ctx, t, bundle.exec, 
&testfactory.JobOpts{Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + ) + + scheduler.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, scheduler.Start(ctx)) + + scheduler.TestSignals.ScheduledBatch.WaitOrTimeout() + + requireJobStateUnchanged(t, scheduler, bundle.exec, notIncludedJob1) + requireJobStateUnchanged(t, scheduler, bundle.exec, notIncludedJob2) + + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob1) + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob2) + }) } diff --git a/internal/maintenance/queue_cleaner.go b/internal/maintenance/queue_cleaner.go index f64e97db..2abb328c 100644 --- a/internal/maintenance/queue_cleaner.go +++ b/internal/maintenance/queue_cleaner.go @@ -41,6 +41,10 @@ type QueueCleanerConfig struct { // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // RetentionPeriod is the amount of time to keep queues around before they're // removed. RetentionPeriod time.Duration @@ -56,6 +60,9 @@ func (c *QueueCleanerConfig) mustValidate() *QueueCleanerConfig { if c.Interval <= 0 { panic("QueueCleanerConfig.Interval must be above zero") } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("QueueCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.RetentionPeriod <= 0 { panic("QueueCleanerConfig.RetentionPeriod must be above zero") } @@ -91,6 +98,7 @@ func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfi Config: (&QueueCleanerConfig{ BatchSizes: batchSizes, Interval: cmp.Or(config.Interval, queueCleanerIntervalDefault), + QueuesIncluded: config.QueuesIncluded, RetentionPeriod: cmp.Or(config.RetentionPeriod, QueueRetentionPeriodDefault), Schema: config.Schema, }).mustValidate(), @@ -163,6 +171,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ Max: s.batchSize(), + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod), }) diff --git a/internal/maintenance/queue_cleaner_test.go b/internal/maintenance/queue_cleaner_test.go index 06058e95..6e2b9f0e 100644 --- a/internal/maintenance/queue_cleaner_test.go +++ b/internal/maintenance/queue_cleaner_test.go @@ -106,12 +106,12 @@ func TestQueueCleaner(t *testing.T) { Name: queue3.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue4.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue5.Name, Schema: cleaner.Config.Schema, @@ -302,4 +302,37 @@ func TestQueueCleaner(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + now = time.Now() + + notIncludedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + notIncludedQueue2 = 
testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + + includedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included1"), UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + includedQueue2 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included2"), UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1.Name, includedQueue2.Name} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: notIncludedQueue1.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: notIncludedQueue2.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue1.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue2.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index aed92b7e..8209ed85 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -382,6 +382,7 @@ type JobDeleteBeforeParams struct { DiscardedDoDelete bool DiscardedFinalizedAtHorizon time.Time Max int + Queues []string QueuesExcluded []string QueuesIncluded []string Schema string @@ -421,9 +422,10 @@ type JobGetByKindManyParams struct { } type JobGetStuckParams struct { - Max int - Schema string - StuckHorizon time.Time + Max int + QueuesIncluded []string + Schema string + StuckHorizon time.Time } type JobInsertFastParams struct { @@ -514,9 +516,10 @@ type JobRetryParams struct { } type JobScheduleParams struct { - Max int - Now *time.Time - Schema string + Max int + Now *time.Time + QueuesIncluded []string + Schema string } type JobScheduleResult struct { @@ -688,6 +691,7 @@ type LeaderInsertParams struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -695,6 +699,7 @@ type LeaderInsertParams struct { type LeaderElectParams struct { LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -783,6 +788,7 @@ type QueueCreateOrSetUpdatedAtParams struct { type QueueDeleteExpiredParams struct { Max int + QueuesIncluded []string Schema string UpdatedAtHorizon time.Time } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 8cc6c9a2..36806dd7 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -610,17 +610,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg 
*JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } @@ -1336,12 +1341,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1388,7 +1397,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1406,8 +1415,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + Max int64 } type JobScheduleRow struct { @@ -1416,7 +1426,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.QueryContext(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.QueryContext(ctx, jobSchedule, pq.Array(arg.QueuesIncluded), arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index 0559f59e..02c3cb5c 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -15,12 +15,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -30,10 +32,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -44,11 +52,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO UPDATE SET @@ -61,10 +71,16 @@ type LeaderAttemptReelectParams struct { LeaderID 
string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -105,11 +121,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -119,6 +137,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -128,6 +147,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go index bd432f9c..76e1f8d6 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go @@ -59,24 +59,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. 
func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 4ecf3a90..83192078 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -331,8 +331,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: params.QueuesIncluded, + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -593,8 +594,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: params.QueuesIncluded, }) if err != nil { return nil, interpretError(err) @@ -714,6 +716,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -726,6 +729,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { 
numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -756,6 +760,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -882,6 +887,7 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: params.QueuesIncluded, UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 27aade13..f523c054 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -1040,25 +1040,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("QueuesIncluded", func(t *testing.T) { t.Parallel() - exec, bundle := setup(ctx, t) - - // I ran into yet another huge sqlc SQLite bug in that when mixing - // normal parameters with a `sqlc.slice` the latter must appear at - // the very end because it'll produce unnamed placeholders (?) - // instead of positional placeholders (?1) like most parameters. The - // trick of putting it at the end works, but only if you have - // exactly one `sqlc.slice` needed. If you need multiple and they - // need to be interspersed with other parameters (like in the case - // of `queues_excluded` and `queues_included`), everything stops - // working real fast. I could have worked around this by breaking - // the SQLite version of this operation into two sqlc queries, but - // since we only expect to need `queues_excluded` on SQLite (and not - // `queues_included` for the foreseeable future), I've just set - // SQLite to not support `queues_included` for the time being. 
- if bundle.driver.DatabaseName() == databaseNameSQLite { - t.Logf("Skipping JobDeleteBefore with QueuesIncluded test for SQLite") - return - } + exec, _ := setup(ctx, t) var ( //nolint:dupl cancelledJob = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCancelled)}) @@ -1557,40 +1539,76 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobGetStuck", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - var ( - horizon = time.Now().UTC() - beforeHorizon = horizon.Add(-1 * time.Minute) - afterHorizon = horizon.Add(1 * time.Minute) - ) + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now().UTC() + beforeHorizon = horizon.Add(-1 * time.Minute) + afterHorizon = horizon.Add(1 * time.Minute) + ) - stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - t.Logf("horizon = %s", horizon) - t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) - t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) + t.Logf("horizon = %s", horizon) + t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) + t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) - t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) + t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) - // Not returned because we put a maximum of two. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + // Not returned because we put a maximum of two. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Not stuck because not in running state. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + // Not stuck because not in running state. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - // Not stuck because after queried horizon. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + // Not stuck because after queried horizon. 
+ _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Max two stuck - stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: 2, - StuckHorizon: horizon, + // Max two stuck + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 2, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) + }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now().UTC() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + var ( + includedQueue1 = "included1" + includedQueue2 = "included2" + + stuckJob1 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob2 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + // Not included because not in an included queue. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: ptrutil.Ptr("excluded1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Queue: ptrutil.Ptr("excluded2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 2, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) - require.NoError(t, err) - require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, - sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) t.Run("JobInsertFastMany", func(t *testing.T) { @@ -2909,6 +2927,37 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String()) }) + + t.Run("QueuesIncluded", func(t *testing.T) { + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + var ( + includedQueue1 = "included1" + includedQueue2 = "included2" + + job1 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: &includedQueue1, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job2 = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: &includedQueue2, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + // Not scheduled because not in an included queue. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("excluded1"), ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("excluded1"), ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + ) + + // First two scheduled because of limit. 
+ result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 5, + Now: &horizon, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + }) + require.NoError(t, err) + require.Equal(t, []int64{job1.ID, job2.ID}, + sliceutil.Map(result, func(j *riverdriver.JobScheduleResult) int64 { return j.Job.ID })) + }) }) makeErrPayload := func(t *testing.T, now time.Time) []byte { @@ -3458,6 +3507,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -3481,6 +3531,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3501,6 +3552,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3525,6 +3577,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -3551,6 +3604,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // the transaction. elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: 30 * time.Second, }) require.NoError(t, err) @@ -3574,6 +3628,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client", + Name: "default", TTL: 30 * time.Second, }) require.NoError(t, err) @@ -3593,6 +3648,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3672,6 +3728,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, ElectedAt: &electedAt, ExpiresAt: &expiresAt, LeaderID: clientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -3687,6 +3744,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ LeaderID: clientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -4141,27 +4199,65 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("QueueDeleteExpired", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - now := time.Now() - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) - queue2 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) - queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) - queue4 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + exec, _ := setup(ctx, t) - horizon := now.Add(-24 * time.Hour) - deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, 
err) + var ( + now = time.Now() - // queue2 and queue3 should be deleted, with queue4 being skipped due to max of 2: - require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) + queue2 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + queue3 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + queue4 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + ) - // Try again, make sure queue4 gets deleted this time: - deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, err) + horizon := now.Add(-24 * time.Hour) + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) + require.NoError(t, err) + + // queue2 and queue3 should be deleted, with queue4 being skipped due to max of 2: + require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) + + // Try again, make sure queue4 gets deleted this time: + deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ + Max: 2, + UpdatedAtHorizon: horizon, + }) + require.NoError(t, err) + + require.Equal(t, []string{queue4.Name}, deletedQueueNames) + }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) - require.Equal(t, []string{queue4.Name}, deletedQueueNames) + var ( + horizon = time.Now().Add(-24 * time.Hour) + + includedQueue1 = "included1" + includedQueue2 = "included2" + + queue1 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: &includedQueue1, UpdatedAt: ptrutil.Ptr(horizon.Add(-1 * time.Minute))}) + queue2 = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: &includedQueue2, UpdatedAt: ptrutil.Ptr(horizon.Add(-2 * time.Minute))}) + + // Not included because not in an included queue. 
+ _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("excluded1"), UpdatedAt: ptrutil.Ptr(horizon.Add(-1 * time.Minute))}) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("excluded2"), UpdatedAt: ptrutil.Ptr(horizon.Add(-2 * time.Minute))}) + ) + + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ + Max: 5, + QueuesIncluded: []string{includedQueue1, includedQueue2}, + UpdatedAtHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []string{queue1.Name, queue2.Name}, deletedQueueNames) + }) }) t.Run("QueueGet", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 95a2493e..cbefead3 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -259,6 +259,10 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < @stuck_horizon::timestamptz + AND ( + @queues_included::text[] IS NULL + OR queue = any(@queues_included) + ) ORDER BY id LIMIT @max; @@ -548,6 +552,10 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL + AND ( + @queues_included::text[] IS NULL + OR queue = any(@queues_included) + ) AND scheduled_at <= coalesce(sqlc.narg('now')::timestamptz, now()) ORDER BY priority, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 53159d08..9a9462f6 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -592,17 +592,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } @@ -1303,12 +1308,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1355,7 +1364,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1373,8 +1382,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + 
Max int64 } type JobScheduleRow struct { @@ -1383,7 +1393,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.Query(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.Query(ctx, jobSchedule, arg.QueuesIncluded, arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index e2417d86..1955abe6 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -2,8 +2,11 @@ CREATE UNLOGGED TABLE river_leader( elected_at timestamptz NOT NULL, expires_at timestamptz NOT NULL, leader_id text NOT NULL, - name text PRIMARY KEY DEFAULT 'default' CHECK (name = 'default'), - CONSTRAINT name_length CHECK (name = 'default'), + + -- this would be more aptly called "domain", but left as is for a less + -- invasive migration change + name text PRIMARY KEY DEFAULT 'default' CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) ); @@ -11,12 +14,14 @@ CREATE UNLOGGED TABLE river_leader( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because `lib/pq` doesn't support the latter - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) + coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl), + @name ) ON CONFLICT (name) DO NOTHING; @@ -25,11 +30,13 @@ ON CONFLICT (name) INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) + coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl), + @name ) ON CONFLICT (name) DO UPDATE SET @@ -49,11 +56,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(sqlc.narg('elected_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now())), coalesce(sqlc.narg('expires_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl)), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index 156807d3..c239177b 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -16,12 +16,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -31,10 +33,16 @@ type LeaderAttemptElectParams struct { 
LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -45,11 +53,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO UPDATE SET @@ -62,10 +72,16 @@ type LeaderAttemptReelectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.Exec(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -106,11 +122,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -120,6 +138,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -129,6 +148,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql index 6a114720..f810a676 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql @@ -25,15 +25,26 @@ SET RETURNING *; -- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < @updated_at_horizon - ORDER BY name ASC - LIMIT @max::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < @updated_at_horizon + AND ( + @queues_included::text[] IS NULL + OR name = any(@queues_included) + ) + ORDER BY name ASC + LIMIT @max::bigint + ) + RETURNING * ) -RETURNING *; +-- Uses a CTE only to guarantee return order. 
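+-- In Postgres, a data-modifying CTE and the outer query share the same
+-- snapshot, so the outer SELECT still sees the rows the CTE deleted and can
+-- return them in a deterministic name order.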
+SELECT * +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC; -- name: QueueGet :one SELECT * diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go index 18525033..f208f2aa 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go @@ -57,24 +57,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index c9650d6a..96a2744a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -335,8 +335,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: 
params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: params.QueuesIncluded, + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -588,8 +589,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: params.QueuesIncluded, }) if err != nil { return nil, interpretError(err) @@ -703,6 +705,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -715,6 +718,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -745,6 +749,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -871,11 +876,13 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: params.QueuesIncluded, UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { return nil, interpretError(err) } + queueNames := make([]string, len(queues)) for i, q := range queues { queueNames[i] = q.Name diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..a3091867 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -92,34 +92,19 @@ RETURNING *; -- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(@cancelled_finalized_at_horizon AS text)) OR (state = 'completed' AND finalized_at < cast(@completed_finalized_at_horizon AS text)) OR (state = 'discarded' AND finalized_at < cast(@discarded_finalized_at_horizon AS text)) - ORDER BY id - LIMIT @max - ) - -- This is really awful, but unless the `sqlc.slice` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. 
The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with `sqlc.slice` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure `sqlc.slice` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus `sqlc.slice` like this one (the Postgres - -- driver supports a `queues_included` parameter that I couldn't support - -- here). The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(@queues_excluded_empty AS boolean) - OR river_job.queue NOT IN (sqlc.slice('queues_excluded')) - ); + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT @max +); -- name: JobDeleteMany :many DELETE FROM /* TEMPLATE: schema */river_job @@ -186,6 +171,7 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(@stuck_horizon AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT @max; @@ -411,6 +397,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 0a86bc05..c318519f 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -214,34 +214,19 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, const jobDeleteBefore = `-- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(?1 AS text)) OR (state = 'completed' AND finalized_at < cast(?2 AS text)) OR (state = 'discarded' AND finalized_at < cast(?3 AS text)) - ORDER BY id - LIMIT ?4 - ) - -- This is really awful, but unless the ` + "`" + `sqlc.slice` + "`" + ` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with ` + "`" + `sqlc.slice` + "`" + ` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure ` + "`" + `sqlc.slice` + "`" + ` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus ` + "`" + `sqlc.slice` + "`" + ` like this one (the Postgres - -- driver supports a ` + "`" + `queues_included` + "`" + ` parameter that I couldn't support - -- here). 
The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(?5 AS boolean) - OR river_job.queue NOT IN (/*SLICE:queues_excluded*/?) - ) + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT ?4 +) ` type JobDeleteBeforeParams struct { @@ -249,27 +234,15 @@ type JobDeleteBeforeParams struct { CompletedFinalizedAtHorizon string DiscardedFinalizedAtHorizon string Max int64 - QueuesExcludedEmpty bool - QueuesExcluded []string } func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBeforeParams) (sql.Result, error) { - query := jobDeleteBefore - var queryParams []interface{} - queryParams = append(queryParams, arg.CancelledFinalizedAtHorizon) - queryParams = append(queryParams, arg.CompletedFinalizedAtHorizon) - queryParams = append(queryParams, arg.DiscardedFinalizedAtHorizon) - queryParams = append(queryParams, arg.Max) - queryParams = append(queryParams, arg.QueuesExcludedEmpty) - if len(arg.QueuesExcluded) > 0 { - for _, v := range arg.QueuesExcluded { - queryParams = append(queryParams, v) - } - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", strings.Repeat(",?", len(arg.QueuesExcluded))[1:], 1) - } else { - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", "NULL", 1) - } - return db.ExecContext(ctx, query, queryParams...) + return db.ExecContext(ctx, jobDeleteBefore, + arg.CancelledFinalizedAtHorizon, + arg.CompletedFinalizedAtHorizon, + arg.DiscardedFinalizedAtHorizon, + arg.Max, + ) } const jobDeleteMany = `-- name: JobDeleteMany :many @@ -562,6 +535,7 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(?1 AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT ?2 ` @@ -1183,6 +1157,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(?1 AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql index c5da22cb..fb690ef5 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql @@ -11,11 +11,13 @@ CREATE TABLE river_leader ( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) + datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)), + @name ) ON CONFLICT (name) DO NOTHING; @@ -24,11 +26,13 @@ ON CONFLICT (name) INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), 
datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) + datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)), + @name ) ON CONFLICT (name) DO UPDATE SET @@ -48,11 +52,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(sqlc.narg('elected_at') AS text), cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), coalesce(cast(sqlc.narg('expires_at') AS text), datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text))), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go index 2c50d5f9..0cd94a6a 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go @@ -13,11 +13,13 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( ?1, coalesce(cast(?2 AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) + datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)), + ?4 ) ON CONFLICT (name) DO NOTHING @@ -27,10 +29,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *string TTL string + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -41,11 +49,13 @@ const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( ?1, coalesce(cast(?2 AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) + datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)), + ?4 ) ON CONFLICT (name) DO UPDATE SET @@ -58,10 +68,16 @@ type LeaderAttemptReelectParams struct { LeaderID string Now *string TTL string + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) + result, err := db.ExecContext(ctx, leaderAttemptReelect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) if err != nil { return 0, err } @@ -102,11 +118,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(?1 AS text), cast(?2 AS text), datetime('now', 'subsec')), coalesce(cast(?3 AS text), datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?4 as text))), - ?5 + ?5, + ?6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -116,6 +134,7 @@ type LeaderInsertParams struct { ExpiresAt *string TTL string LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx 
context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -125,6 +144,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql index 1aa7c5fe..f9ceb3ab 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql @@ -30,6 +30,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < @updated_at_horizon + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT @max ) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go index a89f2b74..25afa4ff 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go @@ -63,6 +63,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < ?1 + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT ?2 ) diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..4d99b4fa --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,15 @@ +-- +-- Alter `river_leader` to add a default value of 'default` to `name`. SQLite +-- doesn't allow schema modifications, so this redefines the table entirely. +-- + +DROP TABLE /* TEMPLATE: schema */river_leader; + +CREATE TABLE /* TEMPLATE: schema */river_leader ( + elected_at timestamp NOT NULL, + expires_at timestamp NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); \ No newline at end of file diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..a38caa24 --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,16 @@ +-- +-- Alter `river_leader` to remove check constraint that `name` must be +-- `default`. SQLite doesn't allow schema modifications, so this redefines the +-- table entirely. 
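+-- (SQLite's ALTER TABLE only supports renaming tables/columns and adding or
+-- dropping columns; it can't add or drop CHECK constraints, hence the drop and
+-- recreate. Any current leader row is discarded along the way, which should be
+-- fine since leadership is re-established on the elector's next attempt.)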
+--
+
+DROP TABLE /* TEMPLATE: schema */river_leader;
+
+CREATE TABLE /* TEMPLATE: schema */river_leader (
+    elected_at timestamp NOT NULL,
+    expires_at timestamp NOT NULL,
+    leader_id text NOT NULL,
+    name text PRIMARY KEY NOT NULL DEFAULT 'default',
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128)
+);
\ No newline at end of file
diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go
index 7e6af4bc..49ec2cc8 100644
--- a/riverdriver/riversqlite/river_sqlite_driver.go
+++ b/riverdriver/riversqlite/river_sqlite_driver.go
@@ -342,18 +342,81 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP
 	})
 }
 
+// queuesClause generates a partial SQL fragment used to check whether a row's
+// queue is included in or excluded from a given set of injected queues.
+//
+// This is really, really awful, and I wish there was a way I could find to
+// avoid doing this, but it's the only option I could come up with to work
+// around the fact that sqlc is incredibly buggy, and near non-functionally
+// buggy when it comes to SQLite.
+//
+// I tried to use a slice-like injection using `sqlc.slice(...)`, but because that
+// generates `?` placeholders without numbers, it's completely incompatible with
+// other parameters if it shows up anywhere in the query except for the very
+// end. It's sometimes possible to rearrange the query so it appears at the end,
+// but even that causes issues in other cases like if you want to use multiple
+// `sqlc.slice(...)` calls (e.g. using both a `queues_included` and
+// `queues_excluded`).
+//
+// Next up, you could work around the problem using a `json_each(@json_array)`,
+// but sqlc can't find variables used in a function like this. Not for a good
+// reason, but again, just because it's extremely buggy.
+//
+// So instead, we use a `json_each` approach, but we have to manually inject via
+// our home-grown templating system (see the `sqlctemplate` package). It's not
+// great, and possibly bad even, but it works.
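+//
+// As a rough illustration: for an included set of ["high", "low"] and a column
+// named "queue", the clause injected for `queues_included_clause` renders as
+//
+//	EXISTS (
+//	    SELECT 1
+//	    FROM json_each(@queues_included_clause_arg)
+//	    WHERE json_each.value = queue
+//	)
+//
+// with the `queues_included_clause_arg` named arg bound to the JSON text
+// `["high","low"]`. The excluded variant is the same clause prefixed with NOT.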
+func queuesClause(replacements map[string]sqlctemplate.Replacement, namedArgs map[string]any, clauseName, columnName string, queues []string, isExcluded bool) error { + if queues == nil { + replacements[clauseName] = sqlctemplate.Replacement{Value: "true"} + return nil + } + + var ( + paramName = clauseName + "_arg" + clauseSQL = ` + EXISTS ( + SELECT 1 + FROM json_each(@` + paramName + `) + WHERE json_each.value = ` + columnName + ` + ) + ` + ) + + if isExcluded { + clauseSQL = "NOT " + clauseSQL + } + + data, err := json.Marshal(queues) + if err != nil { + return fmt.Errorf("error marshaling queues: %w", err) + } + + replacements[clauseName] = sqlctemplate.Replacement{Value: clauseSQL} + namedArgs[paramName] = data + + return nil +} + func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) { - if len(params.QueuesIncluded) > 0 { - return 0, riverdriver.ErrNotImplemented + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_excluded_clause", "queue", params.QueuesExcluded, true); err != nil { + return 0, err + } + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return 0, err } + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{ CancelledFinalizedAtHorizon: timeString(params.CancelledFinalizedAtHorizon), CompletedFinalizedAtHorizon: timeString(params.CompletedFinalizedAtHorizon), DiscardedFinalizedAtHorizon: timeString(params.DiscardedFinalizedAtHorizon), Max: int64(params.Max), - QueuesExcluded: params.QueuesExcluded, - QueuesExcludedEmpty: len(params.QueuesExcluded) < 1, // not in the Postgres version, but I couldn't find a way around it }) if err != nil { return 0, interpretError(err) @@ -473,6 +536,17 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ Max: int64(params.Max), StuckHorizon: timeString(params.StuckHorizon), @@ -837,10 +911,24 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched ctx = schemaTemplateParam(ctx, params.Schema) dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer} - eligibleJobs, err := dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ - Max: int64(params.Max), - Now: timeStringNullable(params.Now), - }) + eligibleJobs, err := func() ([]*dbsqlc.RiverJob, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "queue", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx := 
sqlctemplate.WithReplacementsDup(ctx) // dupe so these new replacements don't leak into queries below (WithReplacements mutates an existing context container rather copies-on-write it) + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + + return dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ + Max: int64(params.Max), + Now: timeStringNullable(params.Now), + }) + }() if err != nil { return nil, interpretError(err) } @@ -1070,6 +1158,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1082,6 +1171,7 @@ func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.L func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1111,6 +1201,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI leader, err := dbsqlc.New().LeaderInsert(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderInsertParams{ ElectedAt: timeStringNullable(params.ElectedAt), ExpiresAt: timeStringNullable(params.ExpiresAt), + Name: params.Name, Now: timeStringNullable(params.Now), LeaderID: params.LeaderID, TTL: durationAsString(params.TTL), @@ -1256,6 +1347,17 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd } func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := queuesClause(replacements, namedArgs, "queues_included_clause", "name", params.QueuesIncluded, false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), UpdatedAtHorizon: params.UpdatedAtHorizon.UTC(), diff --git a/rivershared/sqlctemplate/sqlc_template.go b/rivershared/sqlctemplate/sqlc_template.go index 88c02edb..cd2657d9 100644 --- a/rivershared/sqlctemplate/sqlc_template.go +++ b/rivershared/sqlctemplate/sqlc_template.go @@ -188,8 +188,8 @@ func (r *Replacer) RunSafely(ctx context.Context, argPlaceholder, sql string, ar } updatedSQL := sql - updatedSQL = replaceTemplate(updatedSQL, templateBeginEndRE) updatedSQL = replaceTemplate(updatedSQL, templateRE) + updatedSQL = replaceTemplate(updatedSQL, templateBeginEndRE) if len(templatesExpected) > 0 { return "", nil, errors.New("sqlctemplate params present in context but missing in SQL: " + strings.Join(templatesExpected, ", ")) @@ -265,6 +265,33 @@ func WithReplacements(ctx context.Context, replacements map[string]Replacement, }) } +// WithReplacementsDup duplicates the template replacements container in 
context. +// +// Normally, adding replacements modifies an existing container in context. Call +// this function before WithReplacements to create a dup instead. +// +// TODO(brandur): This API isn't great. Reconsider it. Maybe WithReplacements +// should always dup? +func WithReplacementsDup(ctx context.Context) context.Context { + var ( + namedArgs map[string]any + replacements map[string]Replacement + ) + + if container, ok := ctx.Value(contextKey{}).(*contextContainer); ok { + namedArgs = maps.Clone(container.NamedArgs) + replacements = maps.Clone(container.Replacements) + } else { + namedArgs = make(map[string]any) + replacements = make(map[string]Replacement) + } + + return context.WithValue(ctx, contextKey{}, &contextContainer{ + NamedArgs: namedArgs, + Replacements: replacements, + }) +} + // Comparable struct that's used as a key for template caching. type replacerCacheKey struct { namedArgs string // all arg names concatenated together diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index 26cf4b88..84f51d85 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -111,6 +111,7 @@ type LeaderOpts struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID *string + Name *string Now *time.Time Schema string } @@ -122,6 +123,7 @@ func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts ElectedAt: opts.ElectedAt, ExpiresAt: opts.ExpiresAt, LeaderID: ptrutil.ValOrDefault(opts.LeaderID, "test-client-id"), + Name: ptrutil.ValOrDefault(opts.Name, "default"), Now: opts.Now, Schema: opts.Schema, TTL: 10 * time.Second,
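A minimal usage sketch of the new configuration, assuming standard client setup
(the connection string, queue names, and worker registration below are
placeholders, and error handling is abbreviated): two clients share one schema
but use different LeaderDomain values, so each elects its own leader and runs
maintenance only for its configured queues.

	package main

	import (
		"context"
		"log"

		"github.com/jackc/pgx/v5/pgxpool"
		"github.com/riverqueue/river"
		"github.com/riverqueue/river/riverdriver/riverpgxv5"
	)

	func main() {
		ctx := context.Background()

		dbPool, err := pgxpool.New(ctx, "postgres:///river_example")
		if err != nil {
			log.Fatal(err)
		}

		workersA := river.NewWorkers() // river.AddWorker calls elided
		workersB := river.NewWorkers()

		// Client A: elected in "domain_a", runs maintenance for queue_a/queue_b.
		clientA, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
			LeaderDomain: "domain_a",
			Queues: map[string]river.QueueConfig{
				"queue_a": {MaxWorkers: 10},
				"queue_b": {MaxWorkers: 10},
			},
			Workers: workersA,
		})
		if err != nil {
			log.Fatal(err)
		}

		// Client B: elected independently in "domain_b", only touches queue_c.
		clientB, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
			LeaderDomain: "domain_b",
			Queues: map[string]river.QueueConfig{
				"queue_c": {MaxWorkers: 10},
			},
			Workers: workersB,
		})
		if err != nil {
			log.Fatal(err)
		}

		_ = clientA.Start(ctx)
		_ = clientB.Start(ctx)
	}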