4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113).

## [0.29.0] - 2025-12-22

### Added
62 changes: 59 additions & 3 deletions client.go
@@ -9,6 +9,7 @@ import (
"log/slog"
"os"
"regexp"
"slices"
"strings"
"sync"
"time"
@@ -209,6 +210,47 @@ type Config struct {
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
Hooks []rivertype.Hook

// LeaderDomain is an optional "domain" string to use for leader election.
// Different clients sharing the same River schema can elect multiple
// leaders as long as they're using different domains, with one leader
// elected per domain.
//
// Setting this value also changes maintenance behavior: maintenance
// services operate only on the queues their client is configured with. For
// example, given client1 handling queue_a and queue_b and client2 handling
// queue_c and queue_d, by default whichever client is elected leader ends
// up running all maintenance services for all queues (queue_a, queue_b,
// queue_c, and queue_d). But if client1 uses domain "domain1" and client2
// uses domain "domain2", then client1 (elected in domain1) runs maintenance
// services only on queue_a and queue_b, while client2 (elected in domain2)
// runs maintenance services only on queue_c and queue_d.
//
// A warning though that River *does not protect against configuration
// mistakes*. If client1 on domain1 is configured for queue_a and queue_b,
// and client2 on domain2 is *also* configured for queue_a and queue_b, then
// both clients may end up running maintenance services on the same queues
// at the same time. It's the caller's responsibility to ensure that doesn't
// happen.
//
// Leaving this value empty or using the special value "default" causes the
// client to operate on all queues. When setting a non-empty, non-"default"
// value on any client, no other client should be left empty or use
// "default", because the default client(s) will infringe on the domains of
// the non-default one(s).
//
// Certain maintenance services that aren't queue-related, like the
// reindexer, will continue to run on all leaders regardless of domain. If
// using this feature, it's a good idea to set ReindexerSchedule to
// river.NeverSchedule() on all but a single leader domain.
//
// In general, most River users should not need LeaderDomain; those running
// multiple Rivers may want to consider using multiple databases or multiple
// schemas instead.
//
// Defaults to "default".
LeaderDomain string

// Logger is the structured logger to use for logging purposes. If none is
// specified, logs will be emitted to STDOUT with messages at warn level
// or higher.
@@ -415,6 +457,7 @@ func (c *Config) WithDefaults() *Config {
Hooks: c.Hooks,
JobInsertMiddleware: c.JobInsertMiddleware,
JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault),
LeaderDomain: c.LeaderDomain,
Logger: logger,
MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
Middleware: c.Middleware,
@@ -840,6 +883,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
ClientID: config.ID,
Domain: config.LeaderDomain,
Schema: config.Schema,
})
client.services = append(client.services, client.elector)
@@ -860,6 +904,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services, pluginPilot.PluginServices()...)
}

// It's important for queuesIncluded to stay `nil` when it's not in use so
// that the various driver queries work correctly.
var queuesIncluded []string
if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
queuesIncluded = maputil.Keys(config.Queues)
slices.Sort(queuesIncluded)
}

//
// Maintenance services
//
@@ -872,6 +924,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(),
QueuesIncluded: queuesIncluded,
Schema: config.Schema,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
@@ -882,6 +935,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
{
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
ClientRetryPolicy: config.RetryPolicy,
QueuesIncluded: queuesIncluded,
RescueAfter: config.RescueStuckJobsAfter,
Schema: config.Schema,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
@@ -897,9 +951,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

{
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
Interval: config.schedulerInterval,
NotifyInsert: client.maybeNotifyInsertForQueues,
Schema: config.Schema,
Interval: config.schedulerInterval,
NotifyInsert: client.maybeNotifyInsertForQueues,
QueuesIncluded: queuesIncluded,
Schema: config.Schema,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobScheduler)
client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -925,6 +980,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

{
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
QueuesIncluded: queuesIncluded,
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
Schema: config.Schema,
}, driver.GetExecutor())
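To illustrate how the new `Config.LeaderDomain` field is meant to be used, here's a minimal sketch (not part of this PR's diff) of two clients sharing one schema while electing leaders in separate domains. It assumes the standard `riverpgxv5` driver, a `river.Workers` bundle with workers already registered, and that `river.NeverSchedule()` is the schedule helper referred to in the LeaderDomain doc comment; pool setup and error handling are elided.

```go
package main

import (
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// newDomainClients constructs two clients that share a database/schema but use
// separate leader election domains, so each one runs queue-scoped maintenance
// only for its own queues.
func newDomainClients(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], *river.Client[pgx.Tx], error) {
	// client1 handles queue_a/queue_b and keeps the default reindexer
	// schedule, so only its leader runs reindexing.
	client1, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		LeaderDomain: "domain1",
		Queues: map[string]river.QueueConfig{
			"queue_a": {MaxWorkers: 50},
			"queue_b": {MaxWorkers: 50},
		},
		Workers: workers,
	})
	if err != nil {
		return nil, nil, err
	}

	// client2 handles queue_c/queue_d. Its reindexer is disabled because
	// non-queue-scoped maintenance like reindexing would otherwise run on
	// every domain's leader.
	client2, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		LeaderDomain: "domain2",
		Queues: map[string]river.QueueConfig{
			"queue_c": {MaxWorkers: 50},
			"queue_d": {MaxWorkers: 50},
		},
		ReindexerSchedule: river.NeverSchedule(), // assumed helper per the LeaderDomain doc comment
		Workers:           workers,
	})
	if err != nil {
		return nil, nil, err
	}

	return client1, client2, nil
}
```

With this configuration, each client is elected leader within its own domain and runs queue-scoped maintenance (job cleaner, rescuer, scheduler, queue cleaner) only for its own queues, while the reindexer runs only under client1's leadership.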
148 changes: 148 additions & 0 deletions client_test.go
@@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) {

startstoptest.Stress(ctx, t, clientWithStop)
})

t.Run("LeaderDomain_Alternate", func(t *testing.T) {
t.Parallel()

var client1 *Client[pgx.Tx]
{
config, bundle := setupConfig(t)
config.LeaderDomain = "domain1"
config.ReindexerSchedule = &neverSchedule{}
config.Queues = map[string]QueueConfig{
"queue_a": {MaxWorkers: 50},
"queue_b": {MaxWorkers: 50},
}

var err error
client1, err = NewClient(bundle.driver, config)
require.NoError(t, err)
client1.testSignals.Init(t)
}

var client2 *Client[pgx.Tx]
{
config, bundle := setupConfig(t)
config.LeaderDomain = "domain2"
config.Queues = map[string]QueueConfig{
"queue_c": {MaxWorkers: 50},
"queue_d": {MaxWorkers: 50},
}
config.Schema = client1.config.Schema
config.ReindexerSchedule = &neverSchedule{}

var err error
client2, err = NewClient(bundle.driver, config)
require.NoError(t, err)
client2.testSignals.Init(t)
}

startClient(ctx, t, client1)
startClient(ctx, t, client2)

// Both elected
client1.testSignals.electedLeader.WaitOrTimeout()
client2.testSignals.electedLeader.WaitOrTimeout()
})

t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.Queues = map[string]QueueConfig{
"queue_a": {MaxWorkers: 50},
"queue_b": {MaxWorkers: 50},
}

client, err := NewClient(bundle.driver, config)
require.NoError(t, err)
client.testSignals.Init(t)

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
require.Nil(t, jobCleaner.Config.QueuesIncluded)
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
require.Nil(t, jobRescuer.Config.QueuesIncluded)
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
require.Nil(t, jobScheduler.Config.QueuesIncluded)
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
require.Nil(t, queueCleaner.Config.QueuesIncluded)
})

// The domain "default" is special in that it behaves as if LeaderDomain
// had not been set.
t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.LeaderDomain = "default"
config.Queues = map[string]QueueConfig{
"queue_a": {MaxWorkers: 50},
"queue_b": {MaxWorkers: 50},
}

client, err := NewClient(bundle.driver, config)
require.NoError(t, err)
client.testSignals.Init(t)

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
require.Nil(t, jobCleaner.Config.QueuesIncluded)
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
require.Nil(t, jobRescuer.Config.QueuesIncluded)
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
require.Nil(t, jobScheduler.Config.QueuesIncluded)
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
require.Nil(t, queueCleaner.Config.QueuesIncluded)
})

// When non-default leader domains are configured, each client's maintenance
// services are limited to that client's own queues.
t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) {
t.Parallel()

var client1 *Client[pgx.Tx]
{
config, bundle := setupConfig(t)
config.LeaderDomain = "domain1"
config.ReindexerSchedule = &neverSchedule{}
config.Queues = map[string]QueueConfig{
"queue_a": {MaxWorkers: 50},
"queue_b": {MaxWorkers: 50},
}

var err error
client1, err = NewClient(bundle.driver, config)
require.NoError(t, err)
client1.testSignals.Init(t)

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer)
require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded)
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer)
require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded)
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer)
require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded)
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer)
require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded)
}

{
config, bundle := setupConfig(t)
config.LeaderDomain = "domain2"
config.Queues = map[string]QueueConfig{
"queue_c": {MaxWorkers: 50},
"queue_d": {MaxWorkers: 50},
}
config.Schema = client1.config.Schema
config.ReindexerSchedule = &neverSchedule{}

client2, err := NewClient(bundle.driver, config)
require.NoError(t, err)
client2.testSignals.Init(t)

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer)
require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded)
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer)
require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded)
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer)
require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded)
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer)
require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded)
}
})
}

type workerWithMiddleware[T JobArgs] struct {