From bebf48e350f03d827e9a10d043ff538d1600b9c5 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 3 Dec 2025 12:27:22 +0100
Subject: [PATCH 1/6] Implement asynchronous line-based processing of a
 stream, with handling of arbitrarily long lines.

---
 pyproject.toml                          |   3 +
 src/databricks/labs/blueprint/logger.py | 107 ++++++++++++++
 tests/unit/test_logger.py               | 187 +++++++++++++++++++++++-
 3 files changed, 295 insertions(+), 2 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 368d8de..6c09e73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
     "pylint-pytest==2.0.0a0",
     "databricks-labs-pylint~=0.3.0",
     "pytest~=8.1.0",
+    "pytest-asyncio~=1.3.0",
     "pytest-cov~=4.1.0",
     "pytest-mock~=3.14.0",
     "pytest-timeout~=2.3.1",
@@ -79,6 +80,8 @@ profile = "black"
 [tool.pytest.ini_options]
 addopts = "--no-header"
 cache_dir = ".venv/pytest-cache"
+asyncio_mode = "auto"
+asyncio_default_fixture_loop_scope = "function"
 
 [tool.black]
 target-version = ["py310"]
diff --git a/src/databricks/labs/blueprint/logger.py b/src/databricks/labs/blueprint/logger.py
index b686dfe..ac0deea 100644
--- a/src/databricks/labs/blueprint/logger.py
+++ b/src/databricks/labs/blueprint/logger.py
@@ -1,7 +1,10 @@
 """A nice formatter for logging. It uses colors and bold text if the console supports it."""
 
+import asyncio
 import logging
 import sys
+from collections.abc import AsyncGenerator
+from dataclasses import dataclass
 from typing import TextIO
 
 
@@ -118,3 +121,107 @@ def install_logger(
     console_handler.setLevel(level)
     root.addHandler(console_handler)
     return console_handler
+
+
+@dataclass(frozen=True, kw_only=True)
+class Line:
+    """Represent a single line of (potentially truncated) log output."""
+
+    text: str
+    """The text of the line."""
+
+    is_truncated: bool = False
+    """Whether the line was truncated, with the remainder pending."""
+
+    is_final: bool = False
+    """Whether this is the final (incomplete) line in the stream."""
+
+    def __str__(self) -> str:
+        """Return the text of the line, appending an ellipsis if it was truncated."""
+        # This is for display purposes only.
+        suffix = ""
+        if self.is_truncated:
+            suffix += "[\u2026]"
+        if self.is_final:
+            suffix += "[no eol]"
+        return f"{self.text}{suffix}"
+
+
+async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerator[Line, None]:
+    """Read lines from the given stream, yielding them as they arrive.
+
+    The lines will be yielded in real-time as they arrive, once the newline character is seen. Semi-universal
+    newlines are supported: "\n" and "\r\n" both terminate lines (but not "\r" alone).
+
+    On EOF any pending line will be yielded, even if it is incomplete (i.e. does not end with a newline).
+
+    The stream being read is treated as UTF-8, with invalid byte sequences replaced with the Unicode replacement
+    character.
+
+    Long lines will be split into chunks with a maximum length. If the split falls in the middle of a multibyte UTF-8
+    character, the bytes on either side of the boundary will likely be invalid and logged as such.
+
+    Args:
+        stream: The stream to mirror as logger output.
+        limit: The maximum number of bytes for a line before it is yielded anyway even though a newline has not been
+            encountered. Longer lines will therefore be split into chunks (as they arrive) no larger than this limit.
+    """
+    if limit < 2:
+        msg = f"Limit must be at least 2 to allow for meaningful line reading, but got {limit}."
+        raise ValueError(msg)
+    # Maximum size of pending buffer is the limit argument.
+    pending_buffer = bytearray()
+
+    # Implementation note: the buffer management here is a bit intricate because we want to ensure that:
+    # - We don't copy data around more than necessary. (Where possible we use memoryview() to avoid copies.)
+    # - We never want to have more than 'limit' bytes pending at any time; this is to avoid unbounded memory usage.
+    # - Temporary memory usage is kept to a minimum, again to avoid excessive memory usage. The various dels are
+    #   present to ensure that potentially large data chunks are released as soon as possible.
+
+    # TODO: Use an incremental UTF-8 decoder to avoid splitting multibyte characters across reads.
+
+    # Loop reading whatever data is available as it arrives, being careful to never have more than `limit` bytes pending.
+    while chunk := await stream.read(limit - len(pending_buffer)):
+        # Process the chunk we've read, which may contain multiple lines, line by line.
+        line_from = 0
+        while -1 != (idx := chunk.find(b"\n", line_from)):
+            # Step 1: Figure out the slice corresponding to this line, handling any pending data from the last read.
+            line_chunk = memoryview(chunk)[line_from:idx]
+            line_bytes: bytearray | bytes
+            if pending_buffer:
+                pending_buffer.extend(line_chunk)
+                line_bytes = pending_buffer
+            else:
+                line_bytes = bytes(line_chunk)
+            del line_chunk
+
+            # Step 2: Decode the line and yield it.
+            line = Line(text=line_bytes.decode("utf-8", errors="replace").rstrip("\r"))
+            del line_bytes
+            yield line
+            del line
+
+            # Step 3: Set up for handling the next line of this chunk.
+            pending_buffer.clear()
+            line_from = idx + 1
+
+        # Anything remaining in this chunk is pending data for the next read, but some corner cases need to be handled:
+        # - This chunk may not have any newlines, and we may already have pending data from the previous chunk we read.
+        # - We may be at the limit (including any pending data from earlier reads) and need to yield an incomplete
+        #   line.
+        if remaining := memoryview(chunk)[line_from:]:
+            pending_buffer.extend(remaining)
+            if len(pending_buffer) >= limit:
+                # Line too long, yield what we have and reset.
+                # (As a special case, postpone handling a trailing \r: it could be part of a \r\n newline sequence.)
+                yield_through = (limit - 1) if pending_buffer.endswith(b"\r") else limit
+                yield_now = pending_buffer[:yield_through]
+                line = Line(text=yield_now.decode("utf-8", errors="replace"), is_truncated=True)
+                del yield_now
+                yield line
+                del line, pending_buffer[:yield_through]
+        del remaining
+    if pending_buffer:
+        # Here we've hit EOF but have an incomplete line pending. We need to yield it.
+        line = Line(text=pending_buffer.decode("utf-8", errors="replace"), is_final=True)
+        yield line
diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py
index ecfcf84..8e32d8f 100644
--- a/tests/unit/test_logger.py
+++ b/tests/unit/test_logger.py
@@ -1,17 +1,23 @@
 from __future__ import annotations
 
+import asyncio
 import datetime as dt
 import inspect
 import io
 import logging
 import re
-from collections.abc import Generator
+from collections.abc import Generator, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from contextlib import contextmanager
 
 import pytest
 
-from databricks.labs.blueprint.logger import NiceFormatter, install_logger
+from databricks.labs.blueprint.logger import (
+    Line,
+    NiceFormatter,
+    install_logger,
+    readlines,
+)
 
 
 class LogCaptureHandler(logging.Handler):
@@ -348,3 +354,180 @@ def test_formatter_format_exception(use_colors: bool) -> None:
         "    raise RuntimeError(exception_message)",
     ]
     assert exception == "RuntimeError: Test exception."
+
+
+class MockStreamReader(asyncio.StreamReader):
+    """Mock asyncio.StreamReader that returns pre-configured chunks."""
+
+    _remaining_data: Sequence[bytes]
+
+    def __init__(self, data_chunks: Sequence[bytes]) -> None:
+        super().__init__()
+        # Chunks represent data that could be returned on successive reads, mimicking the nature of non-blocking I/O
+        # where partial data may be returned. The chunk boundaries represent the splits where partial data is returned.
+        self._remaining_data = data_chunks
+
+    async def read(self, n: int = -1) -> bytes:
+        match n:
+            case -1:
+                # Read all remaining data.
+                data = b"".join(self._remaining_data)
+                self._remaining_data = []
+            case 0:
+                # Empty read.
+                data = b""
+            case max_read if max_read > 0:
+                # Read up to n, but only from the first chunk.
+                match self._remaining_data:
+                    case []:
+                        data = b""
+                    case [head, *tail]:
+                        if len(head) <= max_read:
+                            data = head
+                            self._remaining_data = tail
+                        else:
+                            data = head[:max_read]
+                            self._remaining_data = [head[max_read:], *tail]
+            case _:
+                raise ValueError(f"Unsupported read size: {n}")
+        return data
+
+    async def readline(self) -> bytes:
+        raise NotImplementedError("This is a mock; not implemented.")
+
+    async def readexactly(self, n: int) -> bytes:
+        raise NotImplementedError("This is a mock; not implemented.")
+
+    async def readuntil(self, separator: bytes = b"\n") -> bytes:
+        raise NotImplementedError("This is a mock; not implemented.")
+
+    def at_eof(self) -> bool:
+        return not self._remaining_data
+
+    @classmethod
+    def line(cls, text_dsl: str) -> Line:
+        """Create a Line instance from the given text, setting the flags based on trailing markers.
+
+        The markers are:
+         - If the text ends with '+' it indicates truncation.
+         - If the text ends with '!' it indicates this is the final line.
+
+        The markers are mutually exclusive and not part of the actual text.
+
+        Args:
+            text_dsl: The text with optional trailing markers.
+        Returns:
+            The Line instance representing the supplied text and flags.
+        """
+        if text_dsl.endswith("+"):
+            return Line(text=text_dsl[:-1], is_truncated=True)
+        elif text_dsl.endswith("!"):
+            return Line(text=text_dsl[:-1], is_final=True)
+        else:
+            return Line(text=text_dsl, is_truncated=False, is_final=False)
+
+    @classmethod
+    async def assert_readlines_with_chunks_yields_lines(
+        cls,
+        data_chunks: Sequence[bytes],
+        expected_lines: Sequence[str | Line],
+        *,
+        limit: int = 128,
+    ) -> None:
+        stream = cls(data_chunks)
+        lines = [line async for line in readlines(stream=stream, limit=limit)]
+        normalized_lines = [x if isinstance(x, Line) else cls.line(x) for x in expected_lines]
+        assert normalized_lines == lines
+
+
+async def test_readlines_normal_lines() -> None:
+    """Verify the simple case of each line fitting within the limit: one line per read."""
+    data_chunks = (b"first line\n", b"second line\n", b"third line\n")
+    expected_lines = ("first line", "second line", "third line")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)
+
+
+async def test_readlines_whitespace_handling() -> None:
+    """Verify that whitespace (excluding newlines) and empty lines are preserved."""
+    data_chunks = (b" first line \n", b"\tsecond\rline\t\r\n", b" \t \r\n", b"\n", b"last\tproper\tline\n", b" \t \r")
+    expected_lines = (" first line ", "\tsecond\rline\t", " \t ", "", "last\tproper\tline", " \t \r!")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)
+
+
+async def test_readlines_small_reads() -> None:
+    """Verify that lines split over multiple small sub-limit reads are correctly reassembled."""
+    data_chunks = (b"first ", b"line\nsecond", b" line\r", b"\nlas", b"t line")
+    expected_lines = ("first line", "second line", "last line!")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)
+
+
+@pytest.mark.parametrize(
+    ("data_chunks", "expected_messages"),
+    (
+        # Note: limit for all examples is 10.
+        # Single line split over 2 reads.
+        ((b"1234567", b"89\n"), ("123456789",)),
+        # Single read, exactly on the limit.
+        ((b"123456789\n",), ("123456789",)),
+        # Single read, exactly on the minimum limit to trigger premature flush.
+        ((b"1234567890",), ("1234567890+",)),
+        # Maximum line length.
+        ((b"123456789", b"123456789\n"), ("1234567891+", "23456789")),
+        # Multiple lines in one read, with existing data from the previous read.
+        ((b"1", b"12\n45\n78\n0", b"12\n"), ("112", "45", "78", "012")),
+        # A very long line, with some existing data in the buffer, and leaving some remainder.
+        (
+            (b"12", b"3456789012" b"3456789012" b"3456789012" b"34567890\n1234"),
+            ("1234567890+", "1234567890+", "1234567890+", "1234567890+", "", "1234!"),
+        ),
+        # A \r\n newline split across reads.
+        ((b"1234\r", b"\nabcd\n"), ("1234", "abcd")),
+        # A \r\n split exactly on the limit.
+        ((b"123456789\r" b"\nabcd\n",), ("123456789+", "", "abcd")),
+    ),
+)
+async def test_readlines_line_exceeds_limit(data_chunks: Sequence[bytes], expected_messages: Sequence[str]) -> None:
+    """Verify that line buffering and splitting is handled, including if a line is (much!) longer than the limit."""
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=10)
+
+
+async def test_readlines_incomplete_line_at_eof() -> None:
+    """Verify that an incomplete line at EOF is logged."""
+    data_chunks = (b"normal_line\n", b"incomplete_line\r")
+    expected_messages = ("normal_line", "incomplete_line\r!")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages)
+
+
+async def test_readlines_invalid_utf8() -> None:
+    """Test invalid UTF-8 sequences are replaced with replacement character."""
+    data_chunks = (
+        # A line with invalid UTF-8 bytes in it.
+        b"bad[\xc0\xc0]utf8\n",
+        # A long line, that will be split across the utf-8 sequence.
+        "123456789abcd\U0001f596efgh\n".encode("utf-8"),
+    )
+    expected_messages = ("bad[\ufffd\ufffd]utf8", "123456789abcd\ufffd+", "\ufffdefgh")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)
+
+
+async def test_readlines_empty_stream() -> None:
+    """Verify that an empty stream yields no lines."""
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks=(), expected_lines=())
+
+
+async def test_readlines_invalid_limit() -> None:
+    """Verify that an invalid limit raises ValueError."""
+    stream = MockStreamReader(data_chunks=())
+    with pytest.raises(
+        ValueError, match=re.escape("Limit must be at least 2 to allow for meaningful line reading, but got 1.")
+    ):
+        async for _ in readlines(stream=stream, limit=1):
+            pass
+
+
+async def test_default_line_representation() -> None:
+    """Verify that the default Line representation is as expected."""
+    # Note: just for convenience/display purposes.
+    assert str(Line(text="Here is a line")) == "Here is a line"
+    assert str(Line(text="Truncated line", is_truncated=True)) == "Truncated line[\u2026]"
+    assert str(Line(text="Last incomplete line", is_final=True)) == "Last incomplete line[no eol]"

From aa9f2ab2ff392dbc78617b78e786afaab7b32385 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 3 Dec 2025 13:43:09 +0100
Subject: [PATCH 2/6] Update splitting to be UTF-8-aware.

---
 src/databricks/labs/blueprint/logger.py | 19 +++++++++----------
 tests/unit/test_logger.py               | 14 +++++++++++---
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/src/databricks/labs/blueprint/logger.py b/src/databricks/labs/blueprint/logger.py
index ac0deea..ef6c4bf 100644
--- a/src/databricks/labs/blueprint/logger.py
+++ b/src/databricks/labs/blueprint/logger.py
@@ -1,6 +1,7 @@
 """A nice formatter for logging. It uses colors and bold text if the console supports it."""
 
 import asyncio
+import codecs
 import logging
 import sys
 from collections.abc import AsyncGenerator
@@ -158,8 +159,7 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
     The stream being read is treated as UTF-8, with invalid byte sequences replaced with the Unicode replacement
     character.
 
-    Long lines will be split into chunks with a maximum length. If the split falls in the middle of a multibyte UTF-8
-    character, the bytes on either side of the boundary will likely be invalid and logged as such.
+    Long lines will be split into chunks with a maximum length.
 
     Args:
         stream: The stream to mirror as logger output.
@@ -171,6 +171,7 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
         raise ValueError(msg)
     # Maximum size of pending buffer is the limit argument.
     pending_buffer = bytearray()
+    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
 
     # Implementation note: the buffer management here is a bit intricate because we want to ensure that:
     # - We don't copy data around more than necessary. (Where possible we use memoryview() to avoid copies.)
@@ -178,15 +179,13 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
     # - Temporary memory usage is kept to a minimum, again to avoid excessive memory usage. The various dels are
     #   present to ensure that potentially large data chunks are released as soon as possible.
 
-    # TODO: Use an incremental UTF-8 decoder to avoid splitting multibyte characters across reads.
-
     # Loop reading whatever data is available as it arrives, being careful to never have more than `limit` bytes pending.
     while chunk := await stream.read(limit - len(pending_buffer)):
         # Process the chunk we've read, which may contain multiple lines, line by line.
         line_from = 0
-        while -1 != (idx := chunk.find(b"\n", line_from)):
+        while -1 != (eol := chunk.find(b"\n", line_from)):
             # Step 1: Figure out the slice corresponding to this line, handling any pending data from the last read.
-            line_chunk = memoryview(chunk)[line_from:idx]
+            line_chunk = memoryview(chunk)[line_from:eol]
             line_bytes: bytearray | bytes
             if pending_buffer:
                 pending_buffer.extend(line_chunk)
@@ -196,14 +195,14 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
             del line_chunk
 
             # Step 2: Decode the line and yield it.
-            line = Line(text=line_bytes.decode("utf-8", errors="replace").rstrip("\r"))
+            line = Line(text=decoder.decode(line_bytes).rstrip("\r"))
             del line_bytes
             yield line
             del line
 
             # Step 3: Set up for handling the next line of this chunk.
             pending_buffer.clear()
-            line_from = idx + 1
+            line_from = eol + 1
 
         # Anything remaining in this chunk is pending data for the next read, but some corner cases need to be handled:
         # - This chunk may not have any newlines, and we may already have pending data from the previous chunk we read.
@@ -216,12 +215,12 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
                 # (As a special case, postpone handling a trailing \r: it could be part of a \r\n newline sequence.)
                 yield_through = (limit - 1) if pending_buffer.endswith(b"\r") else limit
                 yield_now = pending_buffer[:yield_through]
-                line = Line(text=yield_now.decode("utf-8", errors="replace"), is_truncated=True)
+                line = Line(text=decoder.decode(yield_now), is_truncated=True)
                 del yield_now
                 yield line
                 del line, pending_buffer[:yield_through]
         del remaining
     if pending_buffer:
         # Here we've hit EOF but have an incomplete line pending. We need to yield it.
-        line = Line(text=pending_buffer.decode("utf-8", errors="replace"), is_final=True)
+        line = Line(text=decoder.decode(pending_buffer, final=True), is_final=True)
         yield line
diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py
index 8e32d8f..99e7d4e 100644
--- a/tests/unit/test_logger.py
+++ b/tests/unit/test_logger.py
@@ -503,10 +503,18 @@ async def test_readlines_invalid_utf8() -> None:
     data_chunks = (
         # A line with invalid UTF-8 bytes in it.
         b"bad[\xc0\xc0]utf8\n",
-        # A long line, that will be split across the utf-8 sequence.
-        "123456789abcd\U0001f596efgh\n".encode("utf-8"),
+        # An unterminated UTF-8 sequence at the end of the file.
+        b"incomplete\xc3"
     )
-    expected_messages = ("bad[\ufffd\ufffd]utf8", "123456789abcd\ufffd+", "\ufffdefgh")
+    expected_messages = ("bad[\ufffd\ufffd]utf8", "incomplete\ufffd!")
+    await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)
+
+
+async def test_readlines_split_utf8() -> None:
+    """Test that UTF-8 sequence split across limit-based chunks is handled properly."""
+    # A long line, that will be split across the utf-8 sequence: the character will be deferred until the line.
+    data_chunks = ("123456789abcd\U0001f596efgh\n".encode("utf-8"),)
+    expected_messages = ("123456789abcd+", "\U0001f596efgh")
     await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)
 
 

From ac9110f422b059fde0ef7ac66d83c8159e3c4702 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 3 Dec 2025 13:45:08 +0100
Subject: [PATCH 3/6] Fix comment.

---
 tests/unit/test_logger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py
index 99e7d4e..8c7e88a 100644
--- a/tests/unit/test_logger.py
+++ b/tests/unit/test_logger.py
@@ -512,7 +512,7 @@ async def test_readlines_split_utf8() -> None:
     """Test that UTF-8 sequence split across limit-based chunks is handled properly."""
-    # A long line, that will be split across the utf-8 sequence: the character will be deferred until the line.
+    # A long line, that will be split across the utf-8 sequence: that character will be deferred until the next line.
     data_chunks = ("123456789abcd\U0001f596efgh\n".encode("utf-8"),)
     expected_messages = ("123456789abcd+", "\U0001f596efgh")
     await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)
 
 

From e86fe0fd84b150c5521b6ebedc923639eb331b47 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 3 Dec 2025 15:05:45 +0100
Subject: [PATCH 4/6] Fix formatting.

---
 tests/unit/test_logger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py
index 8c7e88a..02356d2 100644
--- a/tests/unit/test_logger.py
+++ b/tests/unit/test_logger.py
@@ -504,7 +504,7 @@ async def test_readlines_invalid_utf8() -> None:
         # A line with invalid UTF-8 bytes in it.
         b"bad[\xc0\xc0]utf8\n",
         # An unterminated UTF-8 sequence at the end of the file.
-        b"incomplete\xc3"
+        b"incomplete\xc3",
     )
     expected_messages = ("bad[\ufffd\ufffd]utf8", "incomplete\ufffd!")
     await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)

From 6bf537bbb7140dffd06e160c89a9a76f65d9e6d3 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 3 Dec 2025 15:07:44 +0100
Subject: [PATCH 5/6] Use an older version of pytest-asyncio to avoid
 dependency conflicts.
---
 pyproject.toml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6c09e73..c245c70 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,7 +46,7 @@ dependencies = [
     "pylint-pytest==2.0.0a0",
     "databricks-labs-pylint~=0.3.0",
     "pytest~=8.1.0",
-    "pytest-asyncio~=1.3.0",
+    "pytest-asyncio~=0.23.0",
     "pytest-cov~=4.1.0",
     "pytest-mock~=3.14.0",
     "pytest-timeout~=2.3.1",
@@ -81,7 +81,6 @@ profile = "black"
 addopts = "--no-header"
 cache_dir = ".venv/pytest-cache"
 asyncio_mode = "auto"
-asyncio_default_fixture_loop_scope = "function"
 
 [tool.black]
 target-version = ["py310"]

From e79b8dd1f8647dcc529e2d81ddc3ba7bd987b9c3 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Fri, 5 Dec 2025 13:14:43 +0100
Subject: [PATCH 6/6] Specify a default maximum line length before we
 prematurely flush.

---
 src/databricks/labs/blueprint/logger.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/databricks/labs/blueprint/logger.py b/src/databricks/labs/blueprint/logger.py
index ef6c4bf..6a6fb65 100644
--- a/src/databricks/labs/blueprint/logger.py
+++ b/src/databricks/labs/blueprint/logger.py
@@ -148,7 +148,7 @@ def __str__(self) -> str:
         return f"{self.text}{suffix}"
 
 
-async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerator[Line, None]:
+async def readlines(*, stream: asyncio.StreamReader, limit: int = 8192) -> AsyncGenerator[Line, None]:
     """Read lines from the given stream, yielding them as they arrive.
 
     The lines will be yielded in real-time as they arrive, once the newline character is seen. Semi-universal
@@ -165,6 +165,7 @@ async def readlines(*, stream: asyncio.StreamReader, limit: int) -> AsyncGenerat
         stream: The stream to mirror as logger output.
         limit: The maximum number of bytes for a line before it is yielded anyway even though a newline has not been
            encountered. Longer lines will therefore be split into chunks (as they arrive) no larger than this limit.
+            Default: 8192.
     """
     if limit < 2:
         msg = f"Limit must be at least 2 to allow for meaningful line reading, but got {limit}."
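
Usage sketch (illustrative only, not part of the patch series): with these patches applied, `readlines` can mirror
a subprocess's stdout to a logger in real time. The `mirror_stdout` helper, the demo shell command, and the
"stream-mirror" logger name below are hypothetical placeholders; only `install_logger`, `readlines`, and the
`Line` flags come from the patches themselves.

    import asyncio
    import logging

    from databricks.labs.blueprint.logger import install_logger, readlines

    logger = logging.getLogger("stream-mirror")  # placeholder logger name


    async def mirror_stdout() -> None:
        # Spawn a child process whose stdout is exposed as an asyncio.StreamReader,
        # the stream type that readlines() consumes.
        process = await asyncio.create_subprocess_exec(
            "sh",
            "-c",
            "echo hello; printf 'trailing without newline'",  # arbitrary demo command
            stdout=asyncio.subprocess.PIPE,
        )
        assert process.stdout is not None
        # Complete lines are yielded as soon as their newline is seen; a line longer
        # than `limit` bytes is yielded early with is_truncated=True, and a trailing
        # line without a newline is yielded once at EOF with is_final=True.
        async for line in readlines(stream=process.stdout, limit=8192):
            logger.info("%s", line)  # str(line) appends the "[…]"/"[no eol]" markers
        await process.wait()


    if __name__ == "__main__":
        install_logger()
        asyncio.run(mirror_stdout())

Because `readlines` never holds more than `limit` bytes of pending data, mirroring an untrusted stream this way
cannot consume unbounded memory even if the stream never emits a newline.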