diff --git a/go.work b/go.work index 711343a..122c708 100644 --- a/go.work +++ b/go.work @@ -1,8 +1,9 @@ -go 1.24.2 +go 1.25.5 use ( ./datadogriver - ./otelriver ./nilerror + ./otelriver ./panictoerror + ./versionedjob ) diff --git a/versionedjob/README.md b/versionedjob/README.md new file mode 100644 index 0000000..6fb6a90 --- /dev/null +++ b/versionedjob/README.md @@ -0,0 +1,167 @@ +# versionedjob [![Build Status](https://github.com/riverqueue/rivercontrib/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/riverqueue/rivercontrib/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/riverqueue/rivercontrib.svg)](https://pkg.go.dev/github.com/riverqueue/rivercontrib/versionedjob) + +Provides a River hook with a simple job versioning framework. **Version transformers** are written for versioned jobs containing procedures for upgrading jobs that were encoded as older versions to the most modern version. This allows for workers to be implemented as if all job versions will be the most modern version only, keeping code simpler. + +```go +// VersionTransformer defines how to perform transformations between versions +// for a specific job kind. +type VersionTransformer interface { + // ApplyVersion applies version transformations to the given job. Version + // transformations are fully defined according to the user, as well as how a + // version is extracted from the job's args. + // + // Generally, this function should extract a version from the job, then + // apply versions one by one until it's fully modernized to the point where + // it can be successfully run by its worker. + ApplyVersion(job *rivertype.JobRow) error + + // Kind is the job kind that this transformer applies to. + Kind() string +} +``` + +## Example + +Below are three versions of the same job: `VersionedJobArgsV1`, `VersionedJobArgsV2`, and the current version, `VersionedJobArgs`. From V1 to V2, `name` was renamed to `title`, and a `version` field added to track version. In V3, a new `description` property was added. A real program would only keep the latest version (`VersionedJobArgs`), but this example shows all three for reference. + +```go +type VersionedJobArgsV1 struct { + Name string `json:"name"` +} + +type VersionedJobArgsV2 struct { + Title string `json:"title"` + Version int `json:"version"` +} + +type VersionedJobArgs struct { + Description string `json:"description"` + Title string `json:"title"` + Version int `json:"version"` +} +``` + +The worker for `VersionedJobArgs` is written so it only handles the latest version (`title` instead of `name` and assumes `description` is present). This is possible because a `VersionTransformer` will handle migrating jobs from old versions to new ones before they hit the worker. + +```go +type VersionedJobWorker struct { + river.WorkerDefaults[VersionedJobArgs] +} + +func (w *VersionedJobWorker) Work(ctx context.Context, job *river.Job[VersionedJobArgs]) error { + fmt.Printf("Job title: %s; description: %s\n", job.Args.Title, job.Args.Description) + return nil +} +``` + +The `VersionTransformer` implementation handles version upgrades one by one. Jobs which are multiple versions old can still be upgraded because multiple version changes can be applied in one go. This implementation uses `gjson`/`sjson` so that each change need only know a minimum about the data object in question and that unknown fields are retained. Other approaches are possible though, including using only Go's built-in `gjson` package. + +```go +type VersionedJobTransformer struct{} + +func (*VersionedJobTransformer) ApplyVersion(ctx context.Context, job *rivertype.JobRow) error { + // Extract version from job, defaulting to 1 if not present because we + // assume that was before versioning was introduced. + version := cmp.Or(gjson.GetBytes(job.EncodedArgs, "version").Int(), 1) + + var err error + + // + // Here, we walk through each successive version, applying transformations + // to bring it to its next version. If a job is multiple versions behind, + // version transformations are one-by-one applied in order until the job's + // args are fully modernized. + // + + // Version change: V1 --> V2 + if version < 2 { + version = 2 + + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "title", gjson.GetBytes(job.EncodedArgs, "name").String()) + if err != nil { + return err + } + + job.EncodedArgs, err = sjson.DeleteBytes(job.EncodedArgs, "name") + if err != nil { + return err + } + } + + // Version change: V2 --> V3 + if version < 3 { + version = 3 + + title := gjson.GetBytes(job.EncodedArgs, "title").String() + if title == "" { + return errors.New("no title found in job args") + } + + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "description", "A description of a "+title+".") + if err != nil { + return err + } + } + + // Not strictly necessary, but set version to latest. + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "version", version) + if err != nil { + return err + } + + return nil +} + +func (*VersionedJobTransformer) Kind() string { return (VersionedJobArgs{}).Kind() } +``` + +A River client is initialized with the `versiondjob` hook and transformer installed: + +```go +riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Hooks: []rivertype.Hook{ + versionedjob.NewHook(&versionedjob.HookConfig{ + Transformers: []versionedjob.VersionTransformer{ + &VersionedJobTransformer{}, + }, + }), + }, +}) +if err != nil { + panic(err) +} +``` + +With all that in place, a job of any version can be inserted and thanks to the version transformer modernizing the older ones, the worker will produce the same result regardless of input. + +```go +if _, err = riverClient.InsertMany(ctx, []river.InsertManyParams{ + { + Args: VersionedJobArgsV1{ + Name: "My Job", + }, + }, + { + Args: VersionedJobArgsV2{ + Title: "My Job", + Version: 2, + }, + }, + { + Args: VersionedJobArgs{ + Title: "My Job", + Description: "A description of a My Job.", + Version: 3, + }, + }, +}); err != nil { + panic(err) +} +``` + +```go +// Output: +// Job title: My Job; description: A description of a My Job. +// Job title: My Job; description: A description of a My Job. +// Job title: My Job; description: A description of a My Job. +``` diff --git a/versionedjob/go.mod b/versionedjob/go.mod new file mode 100644 index 0000000..defcc14 --- /dev/null +++ b/versionedjob/go.mod @@ -0,0 +1,30 @@ +module github.com/riverqueue/rivercontrib/versionedjob + +go 1.25.5 + +require ( + github.com/jackc/pgx/v5 v5.7.6 + github.com/riverqueue/river v0.29.0 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 + github.com/riverqueue/river/rivershared v0.29.0 + github.com/riverqueue/river/rivertype v0.29.0 + github.com/stretchr/testify v1.11.1 + github.com/tidwall/gjson v1.18.0 + github.com/tidwall/sjson v1.2.5 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/riverqueue/river/riverdriver v0.29.0 // indirect + github.com/tidwall/match v1.2.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + go.uber.org/goleak v1.3.0 // indirect + golang.org/x/crypto v0.45.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/text v0.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/versionedjob/go.sum b/versionedjob/go.sum new file mode 100644 index 0000000..e732e68 --- /dev/null +++ b/versionedjob/go.sum @@ -0,0 +1,63 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/riverqueue/river v0.29.0 h1:PMO4k6n7HcIjjgrbnG2UG04Exh8aLmQksOddOoYDASA= +github.com/riverqueue/river v0.29.0/go.mod h1:S8BbQbxCrJLYygmnrnraltHhWlGzZzwjqcRbY3wdq7w= +github.com/riverqueue/river/riverdriver v0.29.0 h1:o7mV07RPXrGJdwXUKxVTOyvG1/cDmJIMI3V4Le4/LBo= +github.com/riverqueue/river/riverdriver v0.29.0/go.mod h1:bmkdn74EG4Ogsv44JkC1CBxFZ3JHfYsN+e0K8Dq0otU= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 h1:l3D17JWq/00QEt0bcawyDMxZYmM1YAk11Y/nRRVk5C8= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0/go.mod h1:mpncN3m7DR7VpD78LV5CczbSpwkWcLeJ5j1kkJiOt9s= +github.com/riverqueue/river/rivershared v0.29.0 h1:Niwbmp/CQAKPZ+zT3teCgEmPhksyW0f2cx4X03FurEk= +github.com/riverqueue/river/rivershared v0.29.0/go.mod h1:74WjXTYKV4nTfLemIPloPqiA3Tjqe5BFvnALrNbS62k= +github.com/riverqueue/river/rivertype v0.29.0 h1:26hpzbd44piqJZ+1zO4RO6GRKpmZVX3Ncx+Ki+w2gtg= +github.com/riverqueue/river/rivertype v0.29.0/go.mod h1:rWpgI59doOWS6zlVocROcwc00fZ1RbzRwsRTU8CDguw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= +github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/versionedjob/hook_example_test.go b/versionedjob/hook_example_test.go new file mode 100644 index 0000000..b12b0b4 --- /dev/null +++ b/versionedjob/hook_example_test.go @@ -0,0 +1,211 @@ +package versionedjob_test + +import ( + "cmp" + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" + "github.com/riverqueue/rivercontrib/versionedjob" +) + +// VersionedJobArgsV1 is the V1 version of the versioned job args. It's present +// in this example for reference, but in real example it'd be removed in favor +// of only the latest version and version transformer. +// +// Initial version of the job. Contains only a name field. There was no version +// field because its author didn't yet know they were going to need successive +// versions. +type VersionedJobArgsV1 struct { + Name string `json:"name"` +} + +func (VersionedJobArgsV1) Kind() string { return (VersionedJobArgs{}).Kind() } + +// VersionedJobArgsV2 is the V2 version of the versioned job args. It's present +// in this example for reference, but in real example it'd be removed in favor +// of only the latest version and version transformer. +// +// In V2, the name field was renamed to title. This is a direct mapping so the +// transformer just needs to move the value from one place to the other. A +// version field is added so we can differentiate V1 and V2. +type VersionedJobArgsV2 struct { + Title string `json:"title"` + Version int `json:"version"` +} + +func (VersionedJobArgsV2) Kind() string { return (VersionedJobArgs{}).Kind() } + +// VersionedJobArgs is the V3 (current) version of the versioned job args. +// +// In V3, a description field is added. When versioning forward, a default value +// can be derived from the title. +type VersionedJobArgs struct { + Description string `json:"description"` + Title string `json:"title"` + Version int `json:"version"` +} + +func (VersionedJobArgs) Kind() string { return "versioned_job" } + +// +// Worker +// + +type VersionedJobWorker struct { + river.WorkerDefaults[VersionedJobArgs] +} + +func (w *VersionedJobWorker) Work(ctx context.Context, job *river.Job[VersionedJobArgs]) error { + fmt.Printf("Job title: %s; description: %s\n", job.Args.Title, job.Args.Description) + return nil +} + +type VersionedJobTransformer struct{} + +func (*VersionedJobTransformer) ApplyVersion(ctx context.Context, job *rivertype.JobRow) error { + // Extract version from job, defaulting to 1 if not present because we + // assume that was before versioning was introduced. + version := cmp.Or(gjson.GetBytes(job.EncodedArgs, "version").Int(), 1) + + var err error + + // + // Here, we walk through each successive version, applying transformations + // to bring it to its next version. If a job is multiple versions behind, + // version transformations are one-by-one applied in order until the job's + // args are fully modernized. + // + + // Version change: V1 --> V2 + if version < 2 { + version = 2 + + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "title", gjson.GetBytes(job.EncodedArgs, "name").String()) + if err != nil { + return err + } + + job.EncodedArgs, err = sjson.DeleteBytes(job.EncodedArgs, "name") + if err != nil { + return err + } + } + + // Version change: V2 --> V3 + if version < 3 { + version = 3 + + title := gjson.GetBytes(job.EncodedArgs, "title").String() + if title == "" { + return errors.New("no title found in job args") + } + + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "description", "A description of a "+title+".") + if err != nil { + return err + } + } + + // Not strictly necessary, but set version to latest. + job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "version", version) + if err != nil { + return err + } + + return nil +} + +func (*VersionedJobTransformer) Kind() string { return (VersionedJobArgs{}).Kind() } + +func ExampleHook() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + workers := river.NewWorkers() + river.AddWorker(workers, &VersionedJobWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Hooks: []rivertype.Hook{ + versionedjob.NewHook(&versionedjob.HookConfig{ + Transformers: []versionedjob.VersionTransformer{ + &VersionedJobTransformer{}, + }, + }), + }, + Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Out of example scope, but used to wait until a job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + // Insert all three versions of the job. In each case, thanks to the version + // transformer, the worker will receive the same args and produce the same + // output for each regardless of what version of the job went in. + if _, err = riverClient.InsertMany(ctx, []river.InsertManyParams{ + { + Args: VersionedJobArgsV1{ + Name: "My Job", + }, + }, + { + Args: VersionedJobArgsV2{ + Title: "My Job", + Version: 2, + }, + }, + { + Args: VersionedJobArgs{ + Title: "My Job", + Description: "A description of a My Job.", + Version: 3, + }, + }, + }); err != nil { + panic(err) + } + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + // Wait for jobs to complete. Only needed for purposes of the example test. + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 3) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // Job title: My Job; description: A description of a My Job. + // Job title: My Job; description: A description of a My Job. + // Job title: My Job; description: A description of a My Job. +} diff --git a/versionedjob/versioned_job.go b/versionedjob/versioned_job.go new file mode 100644 index 0000000..5bdcde4 --- /dev/null +++ b/versionedjob/versioned_job.go @@ -0,0 +1,76 @@ +package versionedjob + +import ( + "context" + + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivertype" +) + +// VersionTransformer defines how to perform transformations between versions +// for a specific job kind. +type VersionTransformer interface { + // ApplyVersion applies version transformations to the given job. Version + // transformations are fully defined according to the user, as well as how a + // version is extracted from the job's args. + // + // Generally, this function should extract a version from the job, then + // apply versions one by one until it's fully modernized to the point where + // it can be successfully run by its worker. + ApplyVersion(ctx context.Context, job *rivertype.JobRow) error + + // Kind is the job kind that this transformer applies to. + Kind() string +} + +// Verify interface compliance. +var _ rivertype.HookWorkBegin = &Hook{} + +// HookConfig is configuration for the versionedjob hook. +type HookConfig struct { + // Transformers are version transformers that the hook will apply. Only one + // version transformer should be registered for any particular job kind. + Transformers []VersionTransformer +} + +// Hook is a River hook that applies version transformations on jobs so that +// workers can be written to handle only the most modern version, keeping worker +// code simple and clean. +type Hook struct { + baseservice.BaseService + rivertype.Hook + + config *HookConfig + transformersMap map[string]VersionTransformer +} + +// NewHook initializes a new River versionedjob hook. +// +// config may be nil. +func NewHook(config *HookConfig) *Hook { + if config == nil { + config = &HookConfig{} + } + + transformersMap := make(map[string]VersionTransformer, len(config.Transformers)) + for _, transformer := range config.Transformers { + if _, ok := transformersMap[transformer.Kind()]; ok { + panic("duplicate version transformer for kind: " + transformer.Kind()) + } + + transformersMap[transformer.Kind()] = transformer + } + + return &Hook{ + config: config, + transformersMap: transformersMap, + } +} + +func (h *Hook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + if transformer, ok := h.transformersMap[job.Kind]; ok { + return transformer.ApplyVersion(ctx, job) + } + + return nil +} diff --git a/versionedjob/versioned_job_test.go b/versionedjob/versioned_job_test.go new file mode 100644 index 0000000..6dcc47b --- /dev/null +++ b/versionedjob/versioned_job_test.go @@ -0,0 +1,128 @@ +package versionedjob_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivertype" + "github.com/riverqueue/rivercontrib/versionedjob" +) + +func TestHook(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + type testBundle struct{} + + setupConfig := func(t *testing.T, config *versionedjob.HookConfig) (*versionedjob.Hook, *testBundle) { + t.Helper() + + return baseservice.Init( + riversharedtest.BaseServiceArchetype(t), + versionedjob.NewHook(config), + ), &testBundle{} + } + + setup := func(t *testing.T) (*versionedjob.Hook, *testBundle) { + t.Helper() + + return setupConfig(t, &versionedjob.HookConfig{ + Transformers: []versionedjob.VersionTransformer{ + &VersionedJobTransformer{}, + }, + }) + } + + t.Run("CurrentVersionNoOp", func(t *testing.T) { + t.Parallel() + + hook, _ := setup(t) + + job := &rivertype.JobRow{ + EncodedArgs: mustMarshalJSON(t, map[string]any{ + "title": "My Job", + "description": "A description of a My Job.", + "version": 3, + }), + Kind: (VersionedJobArgs{}).Kind(), + } + + require.NoError(t, hook.WorkBegin(ctx, job)) + + // Expect no changes. + require.Equal(t, VersionedJobArgs{ + Title: "My Job", + Description: "A description of a My Job.", + Version: 3, + }, mustUnmarshalJSON[VersionedJobArgs](t, job.EncodedArgs)) + }) + + t.Run("AppliesVersion", func(t *testing.T) { + t.Parallel() + + hook, _ := setup(t) + + job := &rivertype.JobRow{ + EncodedArgs: mustMarshalJSON(t, map[string]any{ + "title": "My Job", + "version": 2, + }), + Kind: (VersionedJobArgs{}).Kind(), + } + + require.NoError(t, hook.WorkBegin(ctx, job)) + + // Expect latest version. + require.Equal(t, VersionedJobArgs{ + Title: "My Job", + Description: "A description of a My Job.", + Version: 3, + }, mustUnmarshalJSON[VersionedJobArgs](t, job.EncodedArgs)) + }) + + t.Run("AppliesMultipleVersions", func(t *testing.T) { + t.Parallel() + + hook, _ := setup(t) + + job := &rivertype.JobRow{ + EncodedArgs: mustMarshalJSON(t, map[string]any{ + "name": "My Job", + // notably, version is absent here because job rows likely start life without one + }), + Kind: (VersionedJobArgs{}).Kind(), + } + + require.NoError(t, hook.WorkBegin(ctx, job)) + + // Expect latest version. + require.Equal(t, VersionedJobArgs{ + Title: "My Job", + Description: "A description of a My Job.", + Version: 3, + }, mustUnmarshalJSON[VersionedJobArgs](t, job.EncodedArgs)) + }) +} + +func mustMarshalJSON(t *testing.T, v any) []byte { + t.Helper() + + data, err := json.Marshal(v) + require.NoError(t, err) + + return data +} + +func mustUnmarshalJSON[T any](t *testing.T, data []byte) T { + t.Helper() + + var v T + require.NoError(t, json.Unmarshal(data, &v)) + + return v +}