Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,35 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

// pgIdentifierRegexp matches all valid PostgreSQL identifier names:
var pgIdentifierRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")

// Listen subscribes to a notification topic. The notifyFunc will be called for
// any notification that is received on the topic. The returned subscription must be
// closed with Unlisten when no longer needed.
//
// The provided notifyFunc should be fast and not block to avoid dropping other
// notifications and potentially causing internal issues within River.
//
// It returns an error if the specified topic is not a valid Postgres
// identifier. Panics if notifyFunc is nil.
func (c *Client[TTx]) Listen(topic string, notifyFunc NotifyFunc) (*ListenSubscription, error) {
// Validate that the topic is a valid Postgres identifier:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandur should we also forbid the use of river_ prefix topics? Or do you think there's no real risk in allowing it? I've gone back and forth but atm I'm not concerned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, doesn't hurt to start with constraints first and relax them later if it seems necessary.

if valid := pgIdentifierRegexp.MatchString(topic); !valid {
return nil, fmt.Errorf("invalid topic name %q")
}
if notifyFunc == nil {
panic("notifyFunc is required")
}

wrapperFunc := func(topic notifier.NotificationTopic, payload string) {
notifyFunc(string(topic), payload)
}
return &ListenSubscription{
Subscription: c.notifier.Listen(notifier.NotificationTopic(topic), wrapperFunc),
}, nil
}

// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
Expand Down Expand Up @@ -618,6 +647,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {

c.wg.Add(2)
go func() {
// TODO(bgentry): is this the wrong context? Should the notifier actually
// use the `workCtx` so that it doesn't get shut down before existing jobs
// have finished / had their contexts cancelled? This would preserve the
// ability to cancel an individual job's context during the initial
// shutdown phase.
c.notifier.Run(fetchNewWorkCtx)
c.wg.Done()
Comment on lines +650 to 656
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this while I was in here and wanted to ask about it. If we shut down the notifier at the same time as we stop fetching new work, that means the notifier is no longer available for ongoing jobs (including potentially for the individualized cancellation of ongoing jobs). That seems wrong. Perhaps we should use the workCtx instead? That will require a bit of refactoring for the Stop flow because we'd want to call workCancel as soon as the producers finish but before waiting on the notifier. Currently, the notifier is bundled together into a waitgroup with producers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not much to add, but +1 — seems wrong as is. I'm a little scared to add another stop step since it's already somewhat difficult to understand the progression of each step as is. IMO, might not be a bad idea to try and refactor stopping at some point so that each step is easily visible in sequence (i.e. most of the logic lives in one place and you can easily scroll through each phase).

}()
Expand Down
18 changes: 18 additions & 0 deletions listen_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package river

import "github.com/riverqueue/river/internal/notifier"

// ListenSubscription is a subscription returned from the Client's Listen method.
type ListenSubscription struct {
*notifier.Subscription
}

// Unlisten stops listening to this particular subscription.
func (sub *ListenSubscription) Unlisten() {
sub.Subscription.Unlisten()
}

// NotifyFunc is a function that will be called any time a Postgres notification
// payload is receiverd on the specified topic. It must not block, or messages
// will be dropped (including those important to River's internal operation).
type NotifyFunc func(topic string, payload string)