From 347085e20e63199c61f7427d43243c0a8054c308 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 23 Dec 2023 12:40:57 -0600 Subject: [PATCH 1/2] expose listen/notify pubsub functionality We already have a robust LISTEN/NOTIFY implementation internally. It would potentially be useful for a variety of user-space coordination activities. Expose it by wrapping our internal notifier package in a minimal public API. Ensure that we validate topic names in the process. --- client.go | 29 +++++++++++++++++++++++++++++ listen_subscription.go | 18 ++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 listen_subscription.go diff --git a/client.go b/client.go index fecc8a33..b2940ab2 100644 --- a/client.go +++ b/client.go @@ -542,6 +542,35 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client return client, nil } +// pgIdentifierRegexp matches all valid PostgreSQL identifier names: +var pgIdentifierRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") + +// Listen subscribes to a notification topic. The notifyFunc will be called for +// any notification that is received on the topic. The returned subscription must be +// closed with Unlisten when no longer needed. +// +// The provided notifyFunc should be fast and not block to avoid dropping other +// notifications and potentially causing internal issues within River. +// +// It returns an error if the specified topic is not a valid Postgres +// identifier. Panics if notifyFunc is nil. +func (c *Client[TTx]) Listen(topic string, notifyFunc NotifyFunc) (*ListenSubscription, error) { + // Validate that the topic is a valid Postgres identifier: + if valid := pgIdentifierRegexp.MatchString(topic); !valid { + return nil, fmt.Errorf("invalid topic name %q") + } + if notifyFunc == nil { + panic("notifyFunc is required") + } + + wrapperFunc := func(topic notifier.NotificationTopic, payload string) { + notifyFunc(string(topic), payload) + } + return &ListenSubscription{ + Subscription: c.notifier.Listen(notifier.NotificationTopic(topic), wrapperFunc), + }, nil +} + // Start starts the client's job fetching and working loops. Once this is called, // the client will run in a background goroutine until stopped. All jobs are // run with a context inheriting from the provided context, but with a timeout diff --git a/listen_subscription.go b/listen_subscription.go new file mode 100644 index 00000000..e7a8c1f7 --- /dev/null +++ b/listen_subscription.go @@ -0,0 +1,18 @@ +package river + +import "github.com/riverqueue/river/internal/notifier" + +// ListenSubscription is a subscription returned from the Client's Listen method. +type ListenSubscription struct { + *notifier.Subscription +} + +// Unlisten stops listening to this particular subscription. +func (sub *ListenSubscription) Unlisten() { + sub.Subscription.Unlisten() +} + +// NotifyFunc is a function that will be called any time a Postgres notification +// payload is receiverd on the specified topic. It must not block, or messages +// will be dropped (including those important to River's internal operation). +type NotifyFunc func(topic string, payload string) From 6e49be366b00bb41715dc59a48688a47e0ff1262 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 23 Dec 2023 12:54:44 -0600 Subject: [PATCH 2/2] DON'T MERGE: note for Brandur --- client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/client.go b/client.go index b2940ab2..95264159 100644 --- a/client.go +++ b/client.go @@ -647,6 +647,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { c.wg.Add(2) go func() { + // TODO(bgentry): is this the wrong context? Should the notifier actually + // use the `workCtx` so that it doesn't get shut down before existing jobs + // have finished / had their contexts cancelled? This would preserve the + // ability to cancel an individual job's context during the initial + // shutdown phase. c.notifier.Run(fetchNewWorkCtx) c.wg.Done() }()