-
Notifications
You must be signed in to change notification settings - Fork 131
Expose LISTEN pubsub functionality #129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -542,6 +542,35 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client | |
| return client, nil | ||
| } | ||
|
|
||
| // pgIdentifierRegexp matches all valid PostgreSQL identifier names: | ||
| var pgIdentifierRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") | ||
|
|
||
| // Listen subscribes to a notification topic. The notifyFunc will be called for | ||
| // any notification that is received on the topic. The returned subscription must be | ||
| // closed with Unlisten when no longer needed. | ||
| // | ||
| // The provided notifyFunc should be fast and not block to avoid dropping other | ||
| // notifications and potentially causing internal issues within River. | ||
| // | ||
| // It returns an error if the specified topic is not a valid Postgres | ||
| // identifier. Panics if notifyFunc is nil. | ||
| func (c *Client[TTx]) Listen(topic string, notifyFunc NotifyFunc) (*ListenSubscription, error) { | ||
| // Validate that the topic is a valid Postgres identifier: | ||
| if valid := pgIdentifierRegexp.MatchString(topic); !valid { | ||
| return nil, fmt.Errorf("invalid topic name %q") | ||
| } | ||
| if notifyFunc == nil { | ||
| panic("notifyFunc is required") | ||
| } | ||
|
|
||
| wrapperFunc := func(topic notifier.NotificationTopic, payload string) { | ||
| notifyFunc(string(topic), payload) | ||
| } | ||
| return &ListenSubscription{ | ||
| Subscription: c.notifier.Listen(notifier.NotificationTopic(topic), wrapperFunc), | ||
| }, nil | ||
| } | ||
|
|
||
| // Start starts the client's job fetching and working loops. Once this is called, | ||
| // the client will run in a background goroutine until stopped. All jobs are | ||
| // run with a context inheriting from the provided context, but with a timeout | ||
|
|
@@ -618,6 +647,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { | |
|
|
||
| c.wg.Add(2) | ||
| go func() { | ||
| // TODO(bgentry): is this the wrong context? Should the notifier actually | ||
| // use the `workCtx` so that it doesn't get shut down before existing jobs | ||
| // have finished / had their contexts cancelled? This would preserve the | ||
| // ability to cancel an individual job's context during the initial | ||
| // shutdown phase. | ||
| c.notifier.Run(fetchNewWorkCtx) | ||
| c.wg.Done() | ||
|
Comment on lines
+650
to
656
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed this while I was in here and wanted to ask about it. If we shut down the notifier at the same time as we stop fetching new work, that means the notifier is no longer available for ongoing jobs (including potentially for the individualized cancellation of ongoing jobs). That seems wrong. Perhaps we should use the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not much to add, but +1 — seems wrong as is. I'm a little scared to add another stop step since it's already somewhat difficult to understand the progression of each step as is. IMO, might not be a bad idea to try and refactor stopping at some point so that each step is easily visible in sequence (i.e. most of the logic lives in one place and you can easily scroll through each phase). |
||
| }() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package river | ||
|
|
||
| import "github.com/riverqueue/river/internal/notifier" | ||
|
|
||
| // ListenSubscription is a subscription returned from the Client's Listen method. | ||
| type ListenSubscription struct { | ||
| *notifier.Subscription | ||
| } | ||
|
|
||
| // Unlisten stops listening to this particular subscription. | ||
| func (sub *ListenSubscription) Unlisten() { | ||
| sub.Subscription.Unlisten() | ||
| } | ||
|
|
||
| // NotifyFunc is a function that will be called any time a Postgres notification | ||
| // payload is receiverd on the specified topic. It must not block, or messages | ||
| // will be dropped (including those important to River's internal operation). | ||
| type NotifyFunc func(topic string, payload string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandur should we also forbid the use of
river_prefix topics? Or do you think there's no real risk in allowing it? I've gone back and forth but atm I'm not concerned.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, doesn't hurt to start with constraints first and relax them later if it seems necessary.