From dda5dda5c08bb0cf88942eea6b2fd396e7e50fac Mon Sep 17 00:00:00 2001
From: Brandur
Date: Thu, 15 May 2025 20:14:04 -0700
Subject: [PATCH] Optimize SQLite's completion happy path when setting `completed` without metadata

This one follows up #870 to add an optimization for job completion: we
separate out the most common case, jobs being set to `completed` with no
metadata updates required, update all of those in one simplified batch query,
then do the rest of the completions afterwards.

In any non-degenerate queue, most completions will be setting success states,
so this should help real-world use, and it also significantly improves
SQLite's benchmarking numbers. Here's a new benchmark run where throughput is
~4x what it was before and roughly on par with Postgres:

    $ go run ./cmd/river bench --database-url "sqlite://:memory:" --num-total-jobs 1_000_000
    bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s]
    bench: jobs worked [ 88218 ], inserted [ 0 ], job/sec [ 44109.0 ] [2s]
    bench: jobs worked [ 91217 ], inserted [ 0 ], job/sec [ 45608.5 ] [2s]
    bench: jobs worked [ 88858 ], inserted [ 0 ], job/sec [ 44429.0 ] [2s]
    bench: jobs worked [ 77219 ], inserted [ 0 ], job/sec [ 38609.5 ] [2s]
    bench: jobs worked [ 82045 ], inserted [ 0 ], job/sec [ 41022.5 ] [2s]
    bench: jobs worked [ 84052 ], inserted [ 0 ], job/sec [ 42026.0 ] [2s]
    bench: jobs worked [ 72028 ], inserted [ 0 ], job/sec [ 36014.0 ] [2s]
    bench: jobs worked [ 90047 ], inserted [ 0 ], job/sec [ 45023.5 ] [2s]
    bench: jobs worked [ 88875 ], inserted [ 0 ], job/sec [ 44437.5 ] [2s]
    bench: jobs worked [ 89240 ], inserted [ 0 ], job/sec [ 44620.0 ] [2s]
    bench: jobs worked [ 88842 ], inserted [ 0 ], job/sec [ 44421.0 ] [2s]
    bench: jobs worked [ 59359 ], inserted [ 0 ], job/sec [ 29679.5 ] [2s]
    bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 42822.8 ], running 23.35203575s

Here's the same run against a normal file-based (non-memory) database:

    $ go run ./cmd/river bench --database-url "sqlite://./sqlite/bench.sqlite3" --num-total-jobs 1_000_000
    bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s]
    bench: jobs worked [ 83657 ], inserted [ 0 ], job/sec [ 41828.5 ] [2s]
    bench: jobs worked [ 76648 ], inserted [ 0 ], job/sec [ 38324.0 ] [2s]
    bench: jobs worked [ 88036 ], inserted [ 0 ], job/sec [ 44018.0 ] [2s]
    bench: jobs worked [ 75473 ], inserted [ 0 ], job/sec [ 37736.5 ] [2s]
    bench: jobs worked [ 82604 ], inserted [ 0 ], job/sec [ 41302.0 ] [2s]
    bench: jobs worked [ 84048 ], inserted [ 0 ], job/sec [ 42024.0 ] [2s]
    bench: jobs worked [ 85508 ], inserted [ 0 ], job/sec [ 42754.0 ] [2s]
    bench: jobs worked [ 90580 ], inserted [ 0 ], job/sec [ 45290.0 ] [2s]
    bench: jobs worked [ 83568 ], inserted [ 0 ], job/sec [ 41784.0 ] [2s]
    bench: jobs worked [ 86062 ], inserted [ 0 ], job/sec [ 43031.0 ] [2s]
    bench: jobs worked [ 88508 ], inserted [ 0 ], job/sec [ 44254.0 ] [2s]
    bench: jobs worked [ 75308 ], inserted [ 0 ], job/sec [ 37654.0 ] [2s]
    bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 42331.9 ], running 23.622860125s

The improved benchmarks only apply in fixed job burndown mode (the
`--num-total-jobs` option) because inserting jobs is still quite slow, being
done one row at a time. Once again, I'm pretty sure I'll be able to land some
SQLite fixes that'll make batch operations possible using `json_each`, and
then we should be able to make all normal operations batch-wise.
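To sketch the rough shape of that idea (hypothetical query only, with the
column list abbreviated; the real version would need every required
`river_job` column), a whole batch of rows would be bound as a single JSON
array parameter and unpacked in SQL:

    -- hypothetical sketch, not the final query
    INSERT INTO river_job (kind, args, metadata)
    SELECT json_extract(value, '$.kind'),
           json_extract(value, '$.args'),
           json_extract(value, '$.metadata')
    FROM json_each(?1);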
That'll take some time though, and this optimization is something we can get
out in time for the initial SQLite release.
---
 .../riverdrivertest/riverdrivertest.go        |  4 +-
 .../riversqlite/internal/dbsqlc/river_job.sql | 10 +++
 .../internal/dbsqlc/river_job.sql.go          | 69 +++++++++++++++++++
 .../riversqlite/river_sqlite_driver.go        | 65 +++++++++++++++--
 4 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
index 2657b4a2..c25fc12d 100644
--- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
+++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
@@ -2261,7 +2261,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 	t.Run("CompletesARunningJob", func(t *testing.T) {
 		t.Parallel()
 
-		exec, _ := setup(ctx, t)
+		exec, bundle := setup(ctx, t)
 
 		now := time.Now().UTC()
 
@@ -2274,7 +2274,7 @@
 		require.NoError(t, err)
 		jobAfter := jobsAfter[0]
 		require.Equal(t, rivertype.JobStateCompleted, jobAfter.State)
-		require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+		require.WithinDuration(t, now, *jobAfter.FinalizedAt, bundle.driver.TimePrecision())
 
 		jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""})
 		require.NoError(t, err)
diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
index 5a4b0e74..3586719a 100644
--- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
+++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
@@ -380,6 +380,16 @@
 WHERE id = @id
     AND state != 'running'
 RETURNING *;
 
+-- This doesn't exist under the Postgres driver, but is used for an optimized
+-- happy path for setting jobs to `completed` where metadata isn't required.
+-- name: JobSetCompletedIfRunning :many
+UPDATE /* TEMPLATE: schema */river_job
+SET finalized_at = coalesce(cast(sqlc.narg('finalized_at') AS text), datetime('now', 'subsec')),
+    state = 'completed'
+WHERE id IN (sqlc.slice('id'))
+    AND state = 'running'
+RETURNING *;
+
 -- Differs significantly from the Postgres version in that it can't do a bulk
 -- update, and since sqlc doesn't support `UPDATE` in CTEs, we need separate
 -- queries like JobSetMetadataIfNotRunning to do the fallback work.
diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
index 5c308b4b..5a567257 100644
--- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
@@ -1121,6 +1121,75 @@ func (q *Queries) JobScheduleSetDiscarded(ctx context.Context, db DBTX, arg *Job
 	return items, nil
 }
 
+const jobSetCompletedIfRunning = `-- name: JobSetCompletedIfRunning :many
+UPDATE /* TEMPLATE: schema */river_job
+SET finalized_at = coalesce(cast(?1 AS text), datetime('now', 'subsec')),
+    state = 'completed'
+WHERE id IN (/*SLICE:id*/?)
+    AND state = 'running'
+RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+`
+
+type JobSetCompletedIfRunningParams struct {
+	FinalizedAt *string
+	ID          []int64
+}
+
+// This doesn't exist under the Postgres driver, but is used for an optimized
+// happy path for setting jobs to `completed` where metadata isn't required.
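+//
+// Note that the `/*SLICE:id*/?` placeholder in the query is expanded at
+// runtime into one bind parameter per ID (or `NULL` when the slice is empty),
+// since SQLite has no array bind type.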
+func (q *Queries) JobSetCompletedIfRunning(ctx context.Context, db DBTX, arg *JobSetCompletedIfRunningParams) ([]*RiverJob, error) {
+	query := jobSetCompletedIfRunning
+	var queryParams []interface{}
+	queryParams = append(queryParams, arg.FinalizedAt)
+	if len(arg.ID) > 0 {
+		for _, v := range arg.ID {
+			queryParams = append(queryParams, v)
+		}
+		query = strings.Replace(query, "/*SLICE:id*/?", strings.Repeat(",?", len(arg.ID))[1:], 1)
+	} else {
+		query = strings.Replace(query, "/*SLICE:id*/?", "NULL", 1)
+	}
+	rows, err := db.QueryContext(ctx, query, queryParams...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			&i.AttemptedBy,
+			&i.CreatedAt,
+			&i.Errors,
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			&i.Tags,
+			&i.UniqueKey,
+			&i.UniqueStates,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobSetMetadataIfNotRunning = `-- name: JobSetMetadataIfNotRunning :one
 UPDATE /* TEMPLATE: schema */river_job
 SET metadata = json_patch(metadata, json(cast(?1 AS blob)))
diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go
index 1504fd79..364731d4 100644
--- a/riverdriver/riversqlite/river_sqlite_driver.go
+++ b/riverdriver/riversqlite/river_sqlite_driver.go
@@ -11,9 +11,9 @@
 // `dbPool.SetMaxOpenConns(1)`.
 //
 // A known deficiency in this driver compared to Postgres is that due to
-// limitations in sqlc, it performs operations like completion and `InsertMany`
-// one row at a time instead of in batches. This means that it's slower than the
-// Postgres driver, especially when benchmarking.
+// limitations in sqlc, it performs bulk operations like non-standard
+// completions and `InsertMany` one row at a time instead of in batches. This
+// means that it processes batches more slowly than the Postgres driver.
 package riversqlite
 
 import (
@@ -40,6 +40,7 @@ import (
 	"github.com/riverqueue/river/riverdriver/riversqlite/internal/dbsqlc"
 	"github.com/riverqueue/river/rivershared/sqlctemplate"
 	"github.com/riverqueue/river/rivershared/uniquestates"
+	"github.com/riverqueue/river/rivershared/util/maputil"
 	"github.com/riverqueue/river/rivershared/util/ptrutil"
 	"github.com/riverqueue/river/rivershared/util/randutil"
 	"github.com/riverqueue/river/rivershared/util/sliceutil"
@@ -683,8 +684,63 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
 	ctx = schemaTemplateParam(ctx, params.Schema)
 	dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer}
 
+	// Because it's by far the most common path, put in an optimization for
+	// jobs that we're setting to `completed` that don't have any metadata
+	// updates needed. Split those jobs out and complete them all in one
+	// query, then continue on and do all the other updates.
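+	//
+	// Jobs in this group that turn out to no longer be `running` aren't
+	// touched by the batch update, but still need result rows, so they're
+	// fetched separately afterwards.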
+	var (
+		completedIDs     = make([]int64, 0, len(params.ID))
+		completedIndexes = make(map[int64]int, len(params.ID)) // job ID -> params index (for setting result)
+	)
+	for i, id := range params.ID {
+		if params.State[i] == rivertype.JobStateCompleted && !params.MetadataDoMerge[i] {
+			completedIDs = append(completedIDs, id)
+			completedIndexes[id] = i
+		}
+	}
+
+	if len(completedIDs) > 0 {
+		jobs, err := dbsqlc.New().JobSetCompletedIfRunning(ctx, dbtx, &dbsqlc.JobSetCompletedIfRunningParams{
+			ID:          completedIDs,
+			FinalizedAt: timeStringNullable(params.Now),
+		})
+		if err != nil {
+			return fmt.Errorf("error setting completed state on jobs: %w", err)
+		}
+
+		// Start with every job in the group marked as not updated, then
+		// remove each one the batch query returns. Anything left over
+		// wasn't `running` when the update ran.
+		notUpdatedIndexes := make(map[int64]int, len(completedIndexes))
+		for id, index := range completedIndexes {
+			notUpdatedIndexes[id] = index
+		}
+
+		for _, job := range jobs {
+			setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
+			if err != nil {
+				return err
+			}
+			delete(notUpdatedIndexes, job.ID)
+		}
+
+		// Fetch any jobs that weren't set by the query above because they
+		// weren't `running`. In practice this should be quite rare, but we
+		// check for it in the test suite.
+		if len(notUpdatedIndexes) > 0 {
+			jobs, err := dbsqlc.New().JobGetByIDMany(ctx, dbtx, maputil.Keys(notUpdatedIndexes))
+			if err != nil {
+				return fmt.Errorf("error getting non-running jobs: %w", err)
+			}
+
+			for _, job := range jobs {
+				setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+
 	// Should be a batch update, but that's currently impossible with SQLite/sqlc. https://github.com/sqlc-dev/sqlc/issues/3802
-	for i := range params.ID {
+	for i, id := range params.ID {
+		// Skip the job if it was handled by the happy path optimization
+		// above (whether or not it was actually still `running`).
+		if _, ok := completedIndexes[id]; ok {
+			continue
+		}
+
 		setStateParams := &dbsqlc.JobSetStateIfRunningParams{
 			ID:    params.ID[i],
 			Error: []byte("{}"), // even if not used, must be valid JSON because it's fed into the `json` function
@@ -723,6 +779,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
 			MetadataUpdates: sliceutil.FirstNonEmpty(params.MetadataUpdates[i], []byte("{}")),
 		})
 		if err != nil {
+			// Allow a job to have been deleted in the interim.
 			if errors.Is(err, sql.ErrNoRows) {
 				return nil
 			}