Support for line-based streaming of subprocess output #320
base: main
Conversation
…ng of arbitrarily long lines.
✅ 40/40 passed, 2 skipped, 1m44s total (Running from acceptance #376)
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   80.05%   80.48%   +0.43%
==========================================
  Files          17       17
  Lines        2396     2449      +53
  Branches      516      526      +10
==========================================
+ Hits         1918     1971      +53
  Misses        354      354
  Partials      124      124
```
m-abulazm left a comment:
Nice work. I just have a couple of questions to make sure I understand everything.
## Changes

LSP servers use stdin/stdout for the LSP calls, and stderr as a general logging facility. Currently we mirror the stderr output as logs, but due to implementation details this fails when an individual line received from stderr is longer than 64KiB. Although #2160 contains a hot fix for this problem, this PR improves the handling in a more robust manner:

- Long lines from the LSP server are now detected and broken up into chunks, emitted as several log entries. (This is preferable to a larger limit because it means we have an upper bound on memory usage rather than potentially buffering forever and running out of memory.)
- The cause of the error when a long line arrives has been addressed. However, if we hit some other error during stderr processing we now log the (critical!) error so that it's at least clear something has gone wrong. (The nature of this is that the CLI might hang, but it will be noisy about it.)

### Relevant implementation details

The primary change here is to handle line-breaking and decoding ourselves rather than relying on the `StreamReader.readline()` implementation. (The behaviour of the latter cannot be reliably controlled when a line exceeds the configured `limit`, which defaults to 64K.)

### Caveats/things to watch out for when reviewing

Further changes are needed to the test LSP server (`lsp_server.py`) that we use for testing; they are out of scope for this PR.

### Linked issues

Supersedes #2160, closes #2164.

This PR is the basis for databrickslabs/blueprint#320: once that's available, the changes in here can be simplified/refactored.

### Functionality

- modified existing command: `databricks labs lakebridge transpile`

### Tests

- manually tested
- added unit tests
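The chunking behaviour described above (splitting an oversized line into several bounded-size log entries) can be sketched as a small helper. This is an illustrative sketch, not the PR's actual code; the names `split_line` and `MAX_CHUNK` are assumptions:

```python
MAX_CHUNK = 64 * 1024  # mirrors the 64KiB default limit mentioned above


def split_line(line: bytes, max_chunk: int = MAX_CHUNK) -> list[bytes]:
    """Break one (possibly oversized) line into chunks of at most max_chunk
    bytes, so each chunk can be emitted as a separate log entry. An empty
    line still produces one (empty) entry."""
    return [line[i:i + max_chunk] for i in range(0, len(line), max_chunk)] or [b""]
```

Because each chunk is bounded by `max_chunk`, memory usage has a fixed upper bound regardless of how long a line the subprocess emits.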
```python
# present to ensure that potentially large data chunks are released as soon as possible.

# Loop reading whatever data is available as it arrives, being careful to
# never have more than `limit` bytes pending.
while chunk := await stream.read(limit - len(pending_buffer)):
```
What if the stream never returns, or the stream is smaller than `limit`? Should we add a timeout and a retry mechanism?
The semantics of `stream.read()` are that it's non-blocking and returns data as soon as it becomes available. This can absolutely be less than `limit`, and we want that: we get data as soon as it arrives rather than waiting for a full buffer.
(The special case of 0 bytes, or empty byte array, indicates the stream has closed, and nothing more is available. This will terminate the loop.)
With respect to your question:
- It will always return when the stream closes: this is the 0-byte read case above. The semantics for a pipe are that by default it remains open while the subprocess is running, and is closed automatically when the subprocess exits.
A subprocess can close its end of the pipe: that's allowed and still fine.
So the circumstances under which it never returns are that the subprocess: a) never writes to the pipe; and b) never completes. In general… that's fine? Subprocesses are allowed to run for as long as they like, and might not ever write to the pipe. (I have processes running on systems in my house that have been running for several years without exiting.)
- Smaller reads are expected and fine. The `limit` is simply the maximum to return from one call.
This leads to:
- We should not add a timeout: it's expected to wait as long as necessary for either data or EOF.
- We already retry, until EOF: that's what the loop does.
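The EOF behaviour described above (the pipe closing when the subprocess exits, so `read()` returns `b""` and the loop terminates) can be demonstrated with a short-lived subprocess. This is a standalone illustration, not code from the PR:

```python
import asyncio
import sys


async def drain_stderr() -> list[bytes]:
    # Spawn a subprocess that writes one line to stderr and exits. Once it
    # exits, its end of the pipe closes, read() returns b"", and the loop ends.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import sys; sys.stderr.write('hello\\n')",
        stderr=asyncio.subprocess.PIPE,
    )
    chunks = []
    while chunk := await proc.stderr.read(1024):
        chunks.append(chunk)
    await proc.wait()
    return chunks
```

No timeout is needed: the loop waits as long as necessary for either data or EOF, which is exactly the behaviour we want for mirroring a long-running server's logs.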
There is a related issue: what happens if there is an error during a read? We deliberately allow that exception to propagate: everything inside this generator will be cleaned up. The example usage in the PR description shows roughly how error-handling should be done.
This PR factors out some common code (from databrickslabs/lakebridge#2163) needed for processing a line-based stream in real time, such as when working with the output of subprocesses. This is currently needed by the Lakebridge CLI as well as the BB LSP Server.
Although moderately complex, this is needed because the builtin Python support for line-based processing[^1] falls short in several ways:
In contrast, this processing:
Expected usage is something like:
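The original usage snippet is not preserved in this page; the following is a hedged reconstruction of its likely shape. `stream_lines` stands in for the helper this PR adds (the real helper also handles arbitrarily long lines, which this minimal stand-in does not):

```python
import asyncio
import sys
from typing import AsyncIterator


async def stream_lines(stream: asyncio.StreamReader) -> AsyncIterator[bytes]:
    # Minimal stand-in for the helper being factored out: yield one line at
    # a time until the stream reports EOF.
    while line := await stream.readline():
        yield line.rstrip(b"\n")


async def mirror_stderr() -> list[bytes]:
    # Spawn a subprocess and mirror its stderr line by line, roughly as the
    # CLI does for LSP server logs.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import sys; sys.stderr.write('a\\nb\\n')",
        stderr=asyncio.subprocess.PIPE,
    )
    lines = [line async for line in stream_lines(proc.stderr)]
    await proc.wait()
    return lines
```

Any exception raised while reading propagates out of the `async for`, so the caller decides how to log or surface the failure.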
Footnotes

[^1]: `asyncio.StreamReader.readline()` ↩