SolaceIO - add ack deadline #37164
…e as it may lead to data loss during work rebalancing or retry."
This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7.
Summary of Changes
Hello @stankiewicz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the robustness of the SolaceIO connector by addressing potential data loss scenarios. It refactors the message acknowledgment process, moving away from immediate acknowledgments in certain lifecycle methods. Instead, it implements a more resilient system where messages that are not successfully processed and checkpointed within a defined …
R: @scwhittle
R: @sjvanrossum
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control.
55edab3 to dc3b007
dc3b007 to 201026c
Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
safeToAckMessages.addAll(receivedMessages);
We're not modifying safeToAckMessages outside the scope of this function, right? This doesn't have to be a concurrent container if that's the case.
I can't recall if receivedMessages can safely be replaced by the time this function is called. If so, maybe this can be cleaned up a bit more by reassigning receivedMessages to a new instance and passing the old instance off to nackMessages.
safeToAckMessages is used async by cleanUpThread, see below
I see, nvm. It seems like we're doing the same work that's expected of the bundle finalizer though.
I'm trying to think if we could capture the outstanding checkpoint finalization calls instead with an ExecutorService for example. When the reader is closed you'd then call ExecutorService#shutdownNow() which returns a collection of all Runnable items that will not be executed and those items could be executed then instead.
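The `ExecutorService#shutdownNow()` idea above could look roughly like the sketch below. It is an illustration only, not code from this PR: the class name, the `log` list, and the string labels are hypothetical, and it uses a plain single-thread executor. A scheduled executor such as `cleanUpThread` may treat returned delayed tasks differently once it is shut down, so that case would need checking separately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a reader whose finalization work runs on an executor. */
class DrainPendingOnClose {

  /** Queues work behind a busy worker, then "closes" and runs whatever never started. */
  static List<String> run() {
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    ExecutorService finalizers = Executors.newSingleThreadExecutor();
    CountDownLatch workerBusy = new CountDownLatch(1);
    try {
      // Occupy the only worker so the next two tasks stay queued.
      finalizers.execute(() -> {
        workerBusy.countDown();
        try {
          new CountDownLatch(1).await(); // parks until shutdownNow() interrupts it
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        log.add("in-flight");
      });
      workerBusy.await();
      finalizers.execute(() -> log.add("pending-1"));
      finalizers.execute(() -> log.add("pending-2"));

      // On close: stop the executor and run the never-started tasks inline.
      List<Runnable> neverStarted = finalizers.shutdownNow();
      for (Runnable task : neverStarted) {
        task.run();
      }
      finalizers.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The queued tasks run on the closing thread in submission order, while the interrupted in-flight task finishes on the worker, so the relative order of the in-flight entry is not fixed.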
try {
  if (nackCallback != null) {
    // wait only for last one to finish, it will mean all the previous one are also done.
    nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS);
  }
} catch (InterruptedException | ExecutionException | TimeoutException e) {
  LOG.error("SolaceIO.Read: Failed to wait till nack background thread is finished");
}
Seems like this attempts to await the same conditions as when calling ExecutorService#shutdown() and ExecutorService#awaitTermination(long, TimeUnit)?
not really, there are other background tasks that are scheduled on executor service that I don't want to wait for.
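A small sketch of the distinction discussed here: waiting on only the most recently scheduled future, while leaving unrelated tasks on the same executor alone. It assumes a single-threaded scheduled executor and equal delays, so the last task completing implies the earlier ones have completed; that assumption, the class name, and the counters are mine and are not taken from the PR.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration: await the last delayed task instead of executor termination. */
class AwaitLastNack {

  /** Returns how many delayed tasks had completed once the last one finished. */
  static int awaitLast(long delayMillis) {
    ScheduledExecutorService cleanUp = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger completed = new AtomicInteger();
    try {
      // Unrelated background task that close() should not wait for.
      cleanUp.schedule(() -> { }, 1, TimeUnit.HOURS);

      ScheduledFuture<?> last = null;
      for (int i = 0; i < 3; i++) {
        // Same delay for each task, so the single worker runs them in scheduling order.
        last = cleanUp.schedule(
            () -> { completed.incrementAndGet(); }, delayMillis, TimeUnit.MILLISECONDS);
      }
      // Bounded wait on the last task only; the hour-long task is ignored.
      last.get(delayMillis * 2 + 1000, TimeUnit.MILLISECONDS);
      return completed.get();
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException("failed waiting for last delayed task", e);
    } finally {
      cleanUp.shutdownNow(); // discards the unrelated task still in the queue
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitLast(100));
  }
}
```

With `shutdown()` followed by `awaitTermination(...)`, the wait would by default also cover the hour-long task, which is the behaviour the reply above wants to avoid.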
return new SolaceCheckpointMark(bytesXMLMessages);
nackCallback =
    cleanUpThread.schedule(
        () -> nackMessages(safeToAckMessages), ackDeadlineSeconds, TimeUnit.SECONDS);
see here @sjvanrossum
Ah yes, and then there's another possible use of the container in SolaceCheckpointMark#finalizeCheckpoint(), nvm.
201026c to cd60511
- remove message ack from close and advance as it may lead to data loss during work rebalancing or retry.
- add async NACK with configurable deadline so any not finalized messages are rejected and retried.
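
The ack-deadline idea in the commit message can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the SolaceIO implementation: messages are plain strings instead of `BytesXMLMessage`, "ack" and "nack" just record into lists, and the class, method, and field names are hypothetical. It shows why a shared concurrent queue lets checkpoint finalization and the delayed NACK task each take a message at most once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of "ack on finalize, nack whatever is left at the deadline". */
class AckDeadlineSketch {
  final List<String> acked = Collections.synchronizedList(new ArrayList<>());
  final List<String> nacked = Collections.synchronizedList(new ArrayList<>());

  /** Polls every remaining message; each message is removed by exactly one caller. */
  private static void drain(Queue<String> from, List<String> to) {
    String message;
    while ((message = from.poll()) != null) {
      to.add(message);
    }
  }

  static AckDeadlineSketch simulate(boolean finalizedBeforeDeadline) {
    AckDeadlineSketch outcome = new AckDeadlineSketch();
    ScheduledExecutorService cleanUp = Executors.newSingleThreadScheduledExecutor();
    try {
      Queue<String> safeToAck = new ConcurrentLinkedQueue<>(Arrays.asList("m1", "m2"));
      long deadlineMillis = 200;

      // Scheduled when the checkpoint is created: reject whatever is still queued later.
      ScheduledFuture<?> nack = cleanUp.schedule(
          () -> drain(safeToAck, outcome.nacked), deadlineMillis, TimeUnit.MILLISECONDS);

      if (finalizedBeforeDeadline) {
        // Checkpoint finalization arrived in time: acknowledge and empty the queue.
        drain(safeToAck, outcome.acked);
      }
      nack.get(deadlineMillis * 10, TimeUnit.MILLISECONDS);
      return outcome;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException("nack task did not complete", e);
    } finally {
      cleanUp.shutdownNow();
    }
  }

  public static void main(String[] args) {
    AckDeadlineSketch onTime = simulate(true);
    AckDeadlineSketch late = simulate(false);
    System.out.println("finalized in time: acked=" + onTime.acked + " nacked=" + onTime.nacked);
    System.out.println("never finalized:   acked=" + late.acked + " nacked=" + late.nacked);
  }
}
```

In this model a message that is never finalized is rejected at the deadline rather than acknowledged, which matches the intent stated in the commit message that unfinalized messages are retried instead of lost.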
cd60511 to 3149f96
Stacked on top of #37162
SolaceIO data loss: