Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ type Config struct {
ConductorAPIKey string // DBOS conductor API key (optional)
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
QueueRunner QueueConfig // Queue configuration (optional)
}

// QueueConfig configures the queue runner polling behavior.
type QueueConfig struct {
BaseInterval float64 // seconds
MinInterval float64 // seconds
MaxInterval float64 // seconds
}

func processConfig(inputConfig *Config) (*Config, error) {
Expand All @@ -54,6 +62,10 @@ func processConfig(inputConfig *Config) (*Config, error) {
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
}

if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval {
return nil, fmt.Errorf("minInterval must be less than maxInterval")
}

dbosConfig := &Config{
DatabaseURL: inputConfig.DatabaseURL,
AppName: inputConfig.AppName,
Expand All @@ -66,6 +78,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
ApplicationVersion: inputConfig.ApplicationVersion,
ExecutorID: inputConfig.ExecutorID,
SystemDBPool: inputConfig.SystemDBPool,
QueueRunner: inputConfig.QueueRunner,
}

// Load defaults
Expand Down Expand Up @@ -379,7 +392,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
initExecutor.logger.Debug("System database initialized")

// Initialize the queue runner and register DBOS internal queue
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner)

// Initialize conductor if API key is provided
if config.ConductorAPIKey != "" {
Expand Down
21 changes: 17 additions & 4 deletions dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
const (
_DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
_DEFAULT_BASE_INTERVAL = 1.0
_DEFAULT_MAX_INTERVAL = 120.0
_DEFAULT_MIN_INTERVAL = 1.0
)

// RateLimiter configures rate limiting for workflow queue execution.
Expand Down Expand Up @@ -159,11 +162,21 @@ type queueRunner struct {
completionChan chan struct{}
}

func newQueueRunner(logger *slog.Logger) *queueRunner {
func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner {
if config.BaseInterval == 0 {
config.BaseInterval = _DEFAULT_BASE_INTERVAL
}
if config.MinInterval == 0 {
config.MinInterval = _DEFAULT_MIN_INTERVAL
}
if config.MaxInterval == 0 {
config.MaxInterval = _DEFAULT_MAX_INTERVAL
}

return &queueRunner{
baseInterval: 1.0,
minInterval: 1.0,
maxInterval: 120.0,
baseInterval: config.BaseInterval,
minInterval: config.MinInterval,
maxInterval: config.MaxInterval,
backoffFactor: 2.0,
scalebackFactor: 0.9,
jitterMin: 0.95,
Expand Down
22 changes: 22 additions & 0 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"reflect"
"runtime"
Expand Down Expand Up @@ -1579,3 +1580,24 @@ func TestPartitionedQueues(t *testing.T) {
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after partitioned queue test")
})
}

func TestNewQueueRunner(t *testing.T) {
t.Run("init queue with default interval", func(t *testing.T) {
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{})
require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval)
require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval)
require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval)
})

t.Run("init queue with custom interval", func(t *testing.T) {
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{
BaseInterval: 1,
MinInterval: 2,
MaxInterval: 3,
})
require.Equal(t, float64(1), runner.baseInterval)
require.Equal(t, float64(2), runner.minInterval)
require.Equal(t, float64(3), runner.maxInterval)

})
}
Loading