Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.work
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
go 1.24.2
go 1.25.5

use (
./datadogriver
./otelriver
./nilerror
./otelriver
./panictoerror
./versionedjob
)
167 changes: 167 additions & 0 deletions versionedjob/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# versionedjob [![Build Status](https://github.com/riverqueue/rivercontrib/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/riverqueue/rivercontrib/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/riverqueue/rivercontrib.svg)](https://pkg.go.dev/github.com/riverqueue/rivercontrib/versionedjob)

Provides a River hook with a simple job versioning framework. **Version transformers** are written for versioned jobs containing procedures for upgrading jobs that were encoded as older versions to the most modern version. This allows for workers to be implemented as if all job versions will be the most modern version only, keeping code simpler.

```go
// VersionTransformer defines how to perform transformations between versions
// for a specific job kind.
type VersionTransformer interface {
// ApplyVersion applies version transformations to the given job. Version
// transformations are fully defined according to the user, as well as how a
// version is extracted from the job's args.
//
// Generally, this function should extract a version from the job, then
// apply versions one by one until it's fully modernized to the point where
// it can be successfully run by its worker.
ApplyVersion(job *rivertype.JobRow) error

// Kind is the job kind that this transformer applies to.
Kind() string
}
```

## Example

Below are three versions of the same job: `VersionedJobArgsV1`, `VersionedJobArgsV2`, and the current version, `VersionedJobArgs`. From V1 to V2, `name` was renamed to `title`, and a `version` field added to track version. In V3, a new `description` property was added. A real program would only keep the latest version (`VersionedJobArgs`), but this example shows all three for reference.

```go
type VersionedJobArgsV1 struct {
Name string `json:"name"`
}

type VersionedJobArgsV2 struct {
Title string `json:"title"`
Version int `json:"version"`
}

type VersionedJobArgs struct {
Description string `json:"description"`
Title string `json:"title"`
Version int `json:"version"`
}
```

The worker for `VersionedJobArgs` is written so it only handles the latest version (`title` instead of `name` and assumes `description` is present). This is possible because a `VersionTransformer` will handle migrating jobs from old versions to new ones before they hit the worker.

```go
type VersionedJobWorker struct {
river.WorkerDefaults[VersionedJobArgs]
}

func (w *VersionedJobWorker) Work(ctx context.Context, job *river.Job[VersionedJobArgs]) error {
fmt.Printf("Job title: %s; description: %s\n", job.Args.Title, job.Args.Description)
return nil
}
```

The `VersionTransformer` implementation handles version upgrades one by one. Jobs which are multiple versions old can still be upgraded because multiple version changes can be applied in one go. This implementation uses `gjson`/`sjson` so that each change need only know a minimum about the data object in question and that unknown fields are retained. Other approaches are possible though, including using only Go's built-in `gjson` package.

```go
type VersionedJobTransformer struct{}

func (*VersionedJobTransformer) ApplyVersion(ctx context.Context, job *rivertype.JobRow) error {
// Extract version from job, defaulting to 1 if not present because we
// assume that was before versioning was introduced.
version := cmp.Or(gjson.GetBytes(job.EncodedArgs, "version").Int(), 1)

var err error

//
// Here, we walk through each successive version, applying transformations
// to bring it to its next version. If a job is multiple versions behind,
// version transformations are one-by-one applied in order until the job's
// args are fully modernized.
//

// Version change: V1 --> V2
if version < 2 {
version = 2

job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "title", gjson.GetBytes(job.EncodedArgs, "name").String())
if err != nil {
return err
}

job.EncodedArgs, err = sjson.DeleteBytes(job.EncodedArgs, "name")
if err != nil {
return err
}
}

// Version change: V2 --> V3
if version < 3 {
version = 3

title := gjson.GetBytes(job.EncodedArgs, "title").String()
if title == "" {
return errors.New("no title found in job args")
}

job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "description", "A description of a "+title+".")
if err != nil {
return err
}
}

// Not strictly necessary, but set version to latest.
job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "version", version)
if err != nil {
return err
}

return nil
}

func (*VersionedJobTransformer) Kind() string { return (VersionedJobArgs{}).Kind() }
```

A River client is initialized with the `versiondjob` hook and transformer installed:

```go
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Hooks: []rivertype.Hook{
versionedjob.NewHook(&versionedjob.HookConfig{
Transformers: []versionedjob.VersionTransformer{
&VersionedJobTransformer{},
},
}),
},
})
if err != nil {
panic(err)
}
```

With all that in place, a job of any version can be inserted and thanks to the version transformer modernizing the older ones, the worker will produce the same result regardless of input.

```go
if _, err = riverClient.InsertMany(ctx, []river.InsertManyParams{
{
Args: VersionedJobArgsV1{
Name: "My Job",
},
},
{
Args: VersionedJobArgsV2{
Title: "My Job",
Version: 2,
},
},
{
Args: VersionedJobArgs{
Title: "My Job",
Description: "A description of a My Job.",
Version: 3,
},
},
}); err != nil {
panic(err)
}
```

```go
// Output:
// Job title: My Job; description: A description of a My Job.
// Job title: My Job; description: A description of a My Job.
// Job title: My Job; description: A description of a My Job.
```
30 changes: 30 additions & 0 deletions versionedjob/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module github.com/riverqueue/rivercontrib/versionedjob

go 1.25.5

require (
github.com/jackc/pgx/v5 v5.7.6
github.com/riverqueue/river v0.29.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0
github.com/riverqueue/river/rivershared v0.29.0
github.com/riverqueue/river/rivertype v0.29.0
github.com/stretchr/testify v1.11.1
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/riverdriver v0.29.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/text v0.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
63 changes: 63 additions & 0 deletions versionedjob/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.29.0 h1:PMO4k6n7HcIjjgrbnG2UG04Exh8aLmQksOddOoYDASA=
github.com/riverqueue/river v0.29.0/go.mod h1:S8BbQbxCrJLYygmnrnraltHhWlGzZzwjqcRbY3wdq7w=
github.com/riverqueue/river/riverdriver v0.29.0 h1:o7mV07RPXrGJdwXUKxVTOyvG1/cDmJIMI3V4Le4/LBo=
github.com/riverqueue/river/riverdriver v0.29.0/go.mod h1:bmkdn74EG4Ogsv44JkC1CBxFZ3JHfYsN+e0K8Dq0otU=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 h1:l3D17JWq/00QEt0bcawyDMxZYmM1YAk11Y/nRRVk5C8=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0/go.mod h1:mpncN3m7DR7VpD78LV5CczbSpwkWcLeJ5j1kkJiOt9s=
github.com/riverqueue/river/rivershared v0.29.0 h1:Niwbmp/CQAKPZ+zT3teCgEmPhksyW0f2cx4X03FurEk=
github.com/riverqueue/river/rivershared v0.29.0/go.mod h1:74WjXTYKV4nTfLemIPloPqiA3Tjqe5BFvnALrNbS62k=
github.com/riverqueue/river/rivertype v0.29.0 h1:26hpzbd44piqJZ+1zO4RO6GRKpmZVX3Ncx+Ki+w2gtg=
github.com/riverqueue/river/rivertype v0.29.0/go.mod h1:rWpgI59doOWS6zlVocROcwc00fZ1RbzRwsRTU8CDguw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading
Loading