From 1eba3e23ee7291891bc0a84780c2e1bf05ec48f6 Mon Sep 17 00:00:00 2001 From: tuannguyensn2001 Date: Thu, 27 Nov 2025 09:56:52 +0700 Subject: [PATCH] Add queue option --- dbos/dbos.go | 15 ++++++++++++++- dbos/queue.go | 21 +++++++++++++++++---- dbos/queues_test.go | 22 ++++++++++++++++++++++ 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 828d949..bc8642d 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -40,6 +40,14 @@ type Config struct { ConductorAPIKey string // DBOS conductor API key (optional) ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var) ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var) + QueueRunner QueueConfig // Queue configuration (optional) +} + +// QueueConfig configures the queue runner polling behavior. +type QueueConfig struct { + BaseInterval float64 // seconds + MinInterval float64 // seconds + MaxInterval float64 // seconds } func processConfig(inputConfig *Config) (*Config, error) { @@ -54,6 +62,10 @@ func processConfig(inputConfig *Config) (*Config, error) { inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT } + if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval { + return nil, fmt.Errorf("minInterval must be less than maxInterval") + } + dbosConfig := &Config{ DatabaseURL: inputConfig.DatabaseURL, AppName: inputConfig.AppName, @@ -66,6 +78,7 @@ func processConfig(inputConfig *Config) (*Config, error) { ApplicationVersion: inputConfig.ApplicationVersion, ExecutorID: inputConfig.ExecutorID, SystemDBPool: inputConfig.SystemDBPool, + QueueRunner: inputConfig.QueueRunner, } // Load defaults @@ -379,7 +392,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error initExecutor.logger.Debug("System database initialized") // Initialize the queue runner and register DBOS internal queue - initExecutor.queueRunner = newQueueRunner(initExecutor.logger) + initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner) // Initialize conductor if API key is provided if config.ConductorAPIKey != "" { diff --git a/dbos/queue.go b/dbos/queue.go index d9832b9..f6dee53 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -14,6 +14,9 @@ import ( const ( _DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue" _DEFAULT_MAX_TASKS_PER_ITERATION = 100 + _DEFAULT_BASE_INTERVAL = 1.0 + _DEFAULT_MAX_INTERVAL = 120.0 + _DEFAULT_MIN_INTERVAL = 1.0 ) // RateLimiter configures rate limiting for workflow queue execution. @@ -159,11 +162,21 @@ type queueRunner struct { completionChan chan struct{} } -func newQueueRunner(logger *slog.Logger) *queueRunner { +func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner { + if config.BaseInterval == 0 { + config.BaseInterval = _DEFAULT_BASE_INTERVAL + } + if config.MinInterval == 0 { + config.MinInterval = _DEFAULT_MIN_INTERVAL + } + if config.MaxInterval == 0 { + config.MaxInterval = _DEFAULT_MAX_INTERVAL + } + return &queueRunner{ - baseInterval: 1.0, - minInterval: 1.0, - maxInterval: 120.0, + baseInterval: config.BaseInterval, + minInterval: config.MinInterval, + maxInterval: config.MaxInterval, backoffFactor: 2.0, scalebackFactor: 0.9, jitterMin: 0.95, diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 53148f6..66ce3ee 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" "os" "reflect" "runtime" @@ -1579,3 +1580,24 @@ func TestPartitionedQueues(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after partitioned queue test") }) } + +func TestNewQueueRunner(t *testing.T) { + t.Run("init queue with default interval", func(t *testing.T) { + runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{}) + require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval) + require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval) + require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval) + }) + + t.Run("init queue with custom interval", func(t *testing.T) { + runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{ + BaseInterval: 1, + MinInterval: 2, + MaxInterval: 3, + }) + require.Equal(t, float64(1), runner.baseInterval) + require.Equal(t, float64(2), runner.minInterval) + require.Equal(t, float64(3), runner.maxInterval) + + }) +}