Skip to content

Conversation

@EgbertW
Copy link

@EgbertW EgbertW commented Dec 22, 2025

I encountered two issues with ElasticsearchIO:

  1. We have Elasticsearch running in Kubernetes behind a proxy that can reply with "504 Gateway Timeout" or similar server errors. The current implementation seems to handle this correctly, but it assumes that all exceptions are wrapped.

While the wrapping does happen here: https://github.com/elastic/elasticsearch/blob/main/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L304C31-L304C50 this only applies to exceptions like SocketException, ConnectExceptions and such.

In this particular case, the request simply succeeds and the response has a status 504 (or similar). Therefore it ends up in ConvertResponse: https://github.com/elastic/elasticsearch/blob/main/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L304C31-L304C50

ConvertResponse is the only method that ever creates a ResponseException, here: https://github.com/elastic/elasticsearch/blob/main/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L304C31-L304C50

This is thrown a few lines below, without being wrapped.

This means that the current implementation here: https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L2819

        if (t.getCause() instanceof ResponseException) {
          ResponseException ex = (ResponseException) t.getCause();

will actually never execute as intended, as the ResponseException is always the main exception and never wrapped.

The consequence of this is that isRetryableClientException never returns true for these type of errors even though it should - these proxy errors are typically temporarily and a retry should definitely be done. The fix is easy:

        if (t instanceof ResponseException) {
          ResponseException ex = (ResponseException) t;

While addressing this issue I encountered that now it is marked retryable and ElasticsearchIO does this, using handleRetry here: https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L2896

However, this is no longer executed in a try/catch construct and hence, if it fails for all attempts, the exception is always thrown, even if throwWriteErrors was set to false. This MR also addresses this issue.

The current tests did test a ResponseException scenario but this was only a scenario that triggers a HTTP 4XX Bad Request, no 500 Server Errors. Also, since this test did not specify a RetryConfiguration, from the output it did not become clear which path the exception follows: either just simply being caught and forwarded to the output tag org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write#FAILED_WRITES, or attempted to retry because it satisfies isRetryableClientException. This MR adds a test that uses a mocked webserver that always returns a specific HTTP error code so that the flow can properly be tested.

Thanks for considering this MR, and please let me know what I can do to improve it if necessary.

Question: do I need to create an issue before merging the fix?


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @EgbertW, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the error handling and retry mechanisms within Apache Beam's ElasticsearchIO connector. It addresses a critical flaw where ResponseExceptions, often indicative of transient server or proxy issues, were not correctly identified for retries. Furthermore, it rectifies an inconsistency where the throwWriteErrors configuration was bypassed after retry attempts were exhausted, ensuring that users' preferences for error propagation are consistently honored. These improvements lead to a more resilient and predictable ElasticsearchIO behavior, especially in environments prone to temporary network or service disruptions.

Highlights

  • Corrected ResponseException Handling: Modified ElasticsearchIO to correctly identify ResponseException as retryable by checking the exception directly (t instanceof ResponseException) rather than its cause (t.getCause() instanceof ResponseException). This ensures server-side errors (e.g., 504 Gateway Timeout) are properly retried.
  • Honoring throwWriteErrors with Retries: Ensured that the throwWriteErrors configuration is respected even after all retry attempts fail. Previously, exceptions were always thrown; now, if throwWriteErrors is false, the error is routed to the FAILED_WRITES output tag instead of being re-thrown.
  • Enhanced Test Coverage for Retries: Added a new test case, testWriteWithClientResponseExceptionIsRetried, to ElasticsearchIOTest (for both ES 8 and 9) that uses a mocked webserver to simulate HTTP 500 errors, thoroughly validating the retry logic for ResponseExceptions.
  • New Mock Server Utility: Introduced ElasticsearchIOTestUtils.AlwaysFailServer, a utility class that creates a simple HTTP server to return specified error codes, facilitating robust testing of error handling scenarios.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant