
Conversation

@asnare
Contributor

@asnare asnare commented Dec 3, 2025

This PR factors out some common code (from databrickslabs/lakebridge#2163) needed for processing a line-based stream in real time, such as when working with the output of subprocesses. This is currently needed by the Lakebridge CLI as well as the BB LSP Server.

Although moderately complex, this is needed because the built-in Python support for line-based processing¹ falls short in several ways:

  • When a line exceeds the limit it raises an exception. Exceptions in asynchronous contexts are hard to handle correctly, making this error-prone: if one goes unhandled (e.g. in a task) everything stalls.
  • When an error occurs there is no reasonable recovery path: the internal buffer and stream state are effectively unknown.
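To make the first point concrete, here is a small, self-contained demonstration (the limit and data are illustrative) of how `asyncio.StreamReader.readline()` behaves when a line exceeds the configured limit:

```python
import asyncio


async def main() -> None:
    # A reader with a deliberately small limit, fed a line longer than that limit.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b"x" * 64 + b"\n")
    reader.feed_eof()
    try:
        await reader.readline()
    except ValueError as e:
        # readline() raises once the buffered line exceeds the limit, and the
        # stream is left in a partially-consumed state that is hard to recover.
        print(f"readline() failed: {e}")


asyncio.run(main())
```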

In contrast, this processing:

  • Handles lines of arbitrary length, splitting up long lines if necessary.
  • Uses bounded memory irrespective of how long the lines are; no unnecessarily large buffers are required.

Expected usage is something like:

import asyncio
import logging

logger = logging.getLogger(__name__)

# `readlines` is the helper introduced by this PR.

async def run_something() -> int | None:
    process = await asyncio.create_subprocess_exec(
        ...,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    assert process.stdout is not None, "stdout should be a pipe"
    assert process.stderr is not None, "stderr should be a pipe"

    async def log_stream(name: str, stream: asyncio.StreamReader, log_level: int, limit: int = 4096) -> None:
        try:
            async for line in readlines(stream=stream, limit=limit):
                logger.log(log_level, str(line))
        except Exception as e:
            logger.error(f"Error reading {name}: {e}", exc_info=True)
            # Drain to prevent the subprocess from blocking on a full pipe.
            try:
                while await stream.read(limit):
                    pass
            except Exception:
                pass  # Exception while draining; situation seems unrecoverable.

    await asyncio.gather(
        log_stream("stdout", process.stdout, logging.DEBUG),
        log_stream("stderr", process.stderr, logging.INFO),
        process.wait(),
    )

    return process.returncode

Footnotes

  1. asyncio.StreamReader.readline()

@asnare asnare self-assigned this Dec 3, 2025
@asnare asnare requested a review from nfx as a code owner December 3, 2025 13:46
@asnare asnare added the enhancement New feature or request label Dec 3, 2025
@github-actions
Copy link

github-actions bot commented Dec 3, 2025

✅ 40/40 passed, 2 skipped, 1m44s total

Running from acceptance #376

@codecov
Copy link

codecov bot commented Dec 3, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 80.48%. Comparing base (7ebe9a3) to head (e79b8dd).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   80.05%   80.48%   +0.43%     
==========================================
  Files          17       17              
  Lines        2396     2449      +53     
  Branches      516      526      +10     
==========================================
+ Hits         1918     1971      +53     
  Misses        354      354              
  Partials      124      124              

☔ View full report in Codecov by Sentry.

@m-abulazm m-abulazm left a comment


Nice work. I just have a couple of questions to make sure I understand everything.

@asnare asnare requested a review from m-abulazm December 5, 2025 13:56
github-merge-queue bot pushed a commit to databrickslabs/lakebridge that referenced this pull request Dec 5, 2025
## Changes

LSP servers use stdin/stdout for the LSP calls, and stderr as a general
logging facility. Currently we mirror the stderr output as logs, but due
to implementation details this fails when an individual line received
from stderr is longer than 64 KiB.

Although #2160 contains a hot fix for this problem, this PR improves the
handling in a more robust manner:

- Long lines from the LSP server are now detected and broken up into
chunks, as several log entries. (This is preferable to a large limit
because it means we have an upper bound on memory usage rather than
potentially buffering forever and running out of memory.)
- The cause of the error when a long line arrives has been addressed;
however, if we hit some other error during stderr processing we now log
the (critical!) error so that it is at least clear something has gone
wrong. (The consequence is that the CLI might hang, but it will be
noisy about it.)

### Relevant implementation details

The primary change here is to handle line-breaking and decoding
ourselves rather than relying on the `StreamReader.readline()`
implementation. (The behaviour of the latter cannot be reliably
controlled when a line exceeds the configured `limit`, which defaults to
64K.)
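As an illustration of the approach, the line-breaking can be sketched roughly as follows. This is a simplified, hypothetical version rather than the actual implementation: the real code also has to handle decoding multi-byte characters across chunk boundaries and release buffers eagerly.

```python
import asyncio
from collections.abc import AsyncIterator


async def readlines(stream: asyncio.StreamReader, limit: int = 4096) -> AsyncIterator[str]:
    """Yield lines from `stream`, splitting any line longer than `limit` into chunks."""
    pending = b""
    # Read whatever data is available as it arrives, never holding more
    # than `limit` bytes in the pending buffer; read() returning b"" means EOF.
    while chunk := await stream.read(limit - len(pending)):
        pending += chunk
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield line.decode("utf-8", errors="replace")
        if len(pending) >= limit:
            # A long line has filled the buffer: emit it as a chunk and reset,
            # so memory usage stays bounded regardless of line length.
            yield pending.decode("utf-8", errors="replace")
            pending = b""
    if pending:
        # Data after the final newline (a stream need not end with one).
        yield pending.decode("utf-8", errors="replace")
```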

### Caveats/things to watch out for when reviewing:

Further changes are needed to the test LSP server (`lsp_server.py`) that
we use for testing; those changes are out of scope for this PR.

### Linked issues

Supersedes #2160, closes #2164.

This PR is the basis for databrickslabs/blueprint#320: once that's
available the changes in here can be simplified/refactored.

### Functionality

- modified existing command: `databricks labs lakebridge transpile`

### Tests

- manually tested
- added unit tests
Quoted code under review (from the new `readlines` implementation):

    # present to ensure that potentially large data chunks are released as soon as possible.

    # Loop reading whatever data is available as it arrives, being careful to never have more than `limit` bytes pending.
    while chunk := await stream.read(limit - len(pending_buffer)):


what if the stream never returns or stream is smaller than limit? should we add a timeout and a retry mechanism?

Contributor Author

@asnare asnare Dec 5, 2025


The semantics of stream.read() are that it waits only until some data is available and then returns it immediately. This can absolutely be less than limit, and we want that: a single call can return $1 \le n \le \textit{limit}$ bytes.

(The special case of 0 bytes, or empty byte array, indicates the stream has closed, and nothing more is available. This will terminate the loop.)

With respect to your question:

  • It will always return when the stream closes: this is the 0-byte read case above. The semantics for a pipe are that by default it remains open while the subprocess is running, and is closed automatically when the subprocess exits.

    A subprocess can close its end of the pipe: that's allowed and still fine.

    So the circumstances under which it never returns are that the subprocess: a) never writes to the pipe; b) never completes. In general… that's fine? Subprocesses are allowed to run for as long as they like, and might never write to the pipe. (I have processes running on systems in my house that have been running for several years without exiting.)

  • Smaller reads are expected and fine. The limit is simply the maximum to return from one call.

This leads to:

  • We should not add a timeout: it's expected to wait as long as necessary for either data or EOF.
  • We already retry, until EOF: that's what the loop does.

There is a related issue: what happens if there is an error during a read? We deliberately allow that exception to propagate: everything inside this generator will be cleaned up. The example usage on the PR description shows roughly how error-handling should be done.
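For what it's worth, the EOF behaviour described above is easy to demonstrate with a short-lived subprocess (an illustrative sketch; `python -c` is used here just to produce some output and exit):

```python
import asyncio
import sys


async def main() -> None:
    # A short-lived subprocess; its stdout pipe closes automatically when it exits.
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello')",
        stdout=asyncio.subprocess.PIPE,
    )
    assert process.stdout is not None, "stdout should be a pipe"
    total = 0
    # read() waits for data or EOF; an empty (0-byte) result means the pipe has closed.
    while chunk := await process.stdout.read(4096):
        total += len(chunk)
    await process.wait()
    print(f"read {total} bytes before EOF")


asyncio.run(main())
```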
