2 changes: 2 additions & 0 deletions pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
"pylint-pytest==2.0.0a0",
"databricks-labs-pylint~=0.3.0",
"pytest~=8.1.0",
"pytest-asyncio~=0.23.0",
"pytest-cov~=4.1.0",
"pytest-mock~=3.14.0",
"pytest-timeout~=2.3.1",
@@ -79,6 +80,7 @@ profile = "black"
[tool.pytest.ini_options]
addopts = "--no-header"
cache_dir = ".venv/pytest-cache"
asyncio_mode = "auto"

[tool.black]
target-version = ["py310"]
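For context on the `asyncio_mode = "auto"` setting above: pytest-asyncio in auto mode collects bare `async def` test functions without the `@pytest.mark.asyncio` decorator, which is why the new tests below are plain coroutines. A minimal sketch of a test that relies on this (hypothetical, not part of this change):

import asyncio


async def test_event_loop_is_available() -> None:
    # Runs under pytest-asyncio because asyncio_mode is "auto";
    # no @pytest.mark.asyncio marker is required on the coroutine.
    await asyncio.sleep(0)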
107 changes: 107 additions & 0 deletions src/databricks/labs/blueprint/logger.py
@@ -1,7 +1,11 @@
"""A nice formatter for logging. It uses colors and bold text if the console supports it."""

import asyncio
import codecs
import logging
import sys
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import TextIO


@@ -118,3 +122,106 @@ def install_logger(
console_handler.setLevel(level)
root.addHandler(console_handler)
return console_handler


@dataclass(frozen=True, kw_only=True)
class Line:
"""Represent a single line of (potentially truncated) log output."""

text: str
"""The text of the line."""

is_truncated: bool = False
"""Whether the line was truncated, with the remainder pending."""

is_final: bool = False
"""Whether this is the final (incomplete) line in the stream."""

def __str__(self) -> str:
"""Return the text of the line, appending an ellipsis if it was truncated."""
# This is for display purposes only.
suffix = ""
if self.is_truncated:
suffix += "[\u2026]"
if self.is_final:
suffix += "[no eol]"
return f"{self.text}{suffix}"


async def readlines(*, stream: asyncio.StreamReader, limit: int = 8192) -> AsyncGenerator[Line, None]:
"""Read lines from the given stream, yielding them as they arrive.

Lines are yielded in real time as they arrive, as soon as each terminating newline is seen. Semi-universal
newlines are supported: "\n" and "\r\n" both terminate lines (but not "\r" alone).

On EOF any pending line will be yielded, even if it is incomplete (i.e. does not end with a newline).

The stream being read is treated as UTF-8, with invalid byte sequences replaced with the Unicode replacement
character.

Long lines will be split into chunks with a maximum length.

Args:
stream: The stream to read lines from.
limit: The maximum number of bytes for a line before it is yielded anyway even though a newline has not been
encountered. Longer lines will therefore be split into chunks (as they arrive) no larger than this limit.
Default: 8192.
"""
if limit < 2:
msg = f"Limit must be at least 2 to allow for meaningful line reading, but got {limit}."
raise ValueError(msg)
# Maximum size of pending buffer is the limit argument.
pending_buffer = bytearray()
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

# Implementation note: the buffer management here is a bit intricate because we want to ensure that:
# - We don't copy data around more than necessary. (Where possible we use memoryview() to avoid copies.)
# - We never hold more than 'limit' bytes pending at any time; this avoids unbounded memory usage.
# - Temporary memory usage is kept to a minimum, again to avoid excessive memory usage. The various dels are
# present to ensure that potentially large data chunks are released as soon as possible.

# Loop reading whatever data is available as it arrives, being careful to never have more than `limit` bytes pending.
while chunk := await stream.read(limit - len(pending_buffer)):
# Process the chunk we've read, which may contain multiple lines, line by line.
line_from = 0
while -1 != (eol := chunk.find(b"\n", line_from)):
# Step 1: Figure out the slice corresponding to this line, handling any pending data from the last read.
line_chunk = memoryview(chunk)[line_from:eol]
line_bytes: bytearray | bytes
if pending_buffer:
pending_buffer.extend(line_chunk)
line_bytes = pending_buffer
else:
line_bytes = bytes(line_chunk)
del line_chunk

# Step 2: Decode the line and yield it.
line = Line(text=decoder.decode(line_bytes).rstrip("\r"))
del line_bytes
yield line
del line

# Step 3: Set up for handling the next line of this chunk.
pending_buffer.clear()
line_from = eol + 1

# Anything remaining in this chunk is pending data for the next read, but some corner cases need to be handled:
# - This chunk may not have any newlines, and we may already have pending data from the previous chunk we read.
# - We may be at the limit (including any pending data from earlier reads) and need to yield an incomplete
# line.
if remaining := memoryview(chunk)[line_from:]:
pending_buffer.extend(remaining)
if len(pending_buffer) >= limit:
# Line too long, yield what we have and reset.
# (As a special case, postpone handling a trailing \r: it could be part of a \r\n newline sequence.)
yield_through = (limit - 1) if pending_buffer.endswith(b"\r") else limit
yield_now = pending_buffer[:yield_through]
line = Line(text=decoder.decode(yield_now), is_truncated=True)
del yield_now
yield line
del line, pending_buffer[:yield_through]
del remaining
if pending_buffer:
# Here we've hit EOF but have an incomplete line pending. We need to yield it.
line = Line(text=decoder.decode(pending_buffer, final=True), is_final=True)
yield line
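
For context, `readlines` is designed to be driven from an `asyncio.StreamReader`, typically a subprocess pipe. A minimal usage sketch (the `mirror_stdout` helper and the logging policy are illustrative assumptions, not part of this change):

import asyncio
import logging

from databricks.labs.blueprint.logger import readlines

logger = logging.getLogger(__name__)


async def mirror_stdout(*cmd: str) -> int:
    # Spawn the child with a pipe so its stdout arrives as an asyncio.StreamReader.
    process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
    assert process.stdout is not None
    # Log each (possibly truncated) line as it arrives, rather than waiting for EOF.
    async for line in readlines(stream=process.stdout):
        logger.info(str(line))
    return await process.wait()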
195 changes: 193 additions & 2 deletions tests/unit/test_logger.py
@@ -1,17 +1,23 @@
from __future__ import annotations

import asyncio
import datetime as dt
import inspect
import io
import logging
import re
from collections.abc import Generator
from collections.abc import Generator, Sequence
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

import pytest

from databricks.labs.blueprint.logger import NiceFormatter, install_logger
from databricks.labs.blueprint.logger import (
Line,
NiceFormatter,
install_logger,
readlines,
)


class LogCaptureHandler(logging.Handler):
@@ -348,3 +354,188 @@ def test_formatter_format_exception(use_colors: bool) -> None:
" raise RuntimeError(exception_message)",
]
assert exception == "RuntimeError: Test exception."


class MockStreamReader(asyncio.StreamReader):
"""Mock asyncio.StreamReader that returns pre-configured chunks."""

_remaining_data: Sequence[bytes]

def __init__(self, data_chunks: Sequence[bytes]) -> None:
super().__init__()
# Chunks represent data that could be returned on successive reads, mimicking the nature of non-blocking I/O
# where partial data may be returned. The chunk boundaries represent the splits where partial data is returned.
self._remaining_data = data_chunks

async def read(self, n: int = -1) -> bytes:
match n:
case -1:
# Read all remaining data.
data = b"".join(self._remaining_data)
self._remaining_data = []
case 0:
# Empty read.
data = b""
case max_read if max_read > 0:
# Read up to n, but only from the first chunk.
match self._remaining_data:
case []:
data = b""
case [head, *tail]:
if len(head) <= max_read:
data = head
self._remaining_data = tail
else:
data = head[:max_read]
self._remaining_data = [head[max_read:], *tail]
case _:
raise ValueError(f"Unsupported read size: {n}")
return data

async def readline(self) -> bytes:
raise NotImplementedError("This is a mock; not implemented.")

async def readexactly(self, n: int) -> bytes:
raise NotImplementedError("This is a mock; not implemented.")

async def readuntil(self, separator: bytes = b"\n") -> bytes:
raise NotImplementedError("This is a mock; not implemented.")

def at_eof(self) -> bool:
return not self._remaining_data

@classmethod
def line(cls, text_dsl: str) -> Line:
"""Create a Line instance from the given text, setting the flags based on trailing markers.

The markers are:
- If the text ends with '+' it indicates truncation.
- If the text ends with '!' it indicates this is the final line.

The markers are mutually exclusive and not part of the actual text.

Args:
text_dsl: The text with optional trailing markers.
Returns:
The Line instance representing the supplied text and flags.
"""
if text_dsl.endswith("+"):
return Line(text=text_dsl[:-1], is_truncated=True)
elif text_dsl.endswith("!"):
return Line(text=text_dsl[:-1], is_final=True)
else:
return Line(text=text_dsl, is_truncated=False, is_final=False)

@classmethod
async def assert_readlines_with_chunks_yields_lines(
cls,
data_chunks: Sequence[bytes],
expected_lines: Sequence[str | Line],
*,
limit: int = 128,
) -> None:
stream = cls(data_chunks)
lines = [line async for line in readlines(stream=stream, limit=limit)]
normalized_lines = [x if isinstance(x, Line) else cls.line(x) for x in expected_lines]
assert normalized_lines == lines


async def test_readlines_normal_lines() -> None:
"""Verify the simple case of each line fitting within the limit: one line per read."""
data_chunks = (b"first line\n", b"second line\n", b"third line\n")
expected_lines = ("first line", "second line", "third line")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)


async def test_readlines_whitespace_handling() -> None:
"""Verify that whitespace (excluding newlines) and empty lines are preserved."""
data_chunks = (b" first line \n", b"\tsecond\rline\t\r\n", b" \t \r\n", b"\n", b"last\tproper\tline\n", b" \t \r")
expected_lines = (" first line ", "\tsecond\rline\t", " \t ", "", "last\tproper\tline", " \t \r!")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)


async def test_readlines_small_reads() -> None:
"""Verify that lines split over multiple small sub-limit reads are correctly reassembled."""
data_chunks = (b"first ", b"line\nsecond", b" line\r", b"\nlas", b"t line")
expected_lines = ("first line", "second line", "last line!")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_lines)


@pytest.mark.parametrize(
("data_chunks", "expected_messages"),
(
# Note: limit for all examples is 10.
# Single line split over 2 reads.
((b"1234567", b"89\n"), ("123456789",)),
# Single read, exactly on the limit.
((b"123456789\n",), ("123456789",)),
# Single read that exactly reaches the limit, triggering a premature (truncated) flush.
((b"1234567890",), ("1234567890+",)),
# Line longer than the limit, accumulated across two reads.
((b"123456789", b"123456789\n"), ("1234567891+", "23456789")),
# Multiple lines in one read, with existing data from the previous read.
((b"1", b"12\n45\n78\n0", b"12\n"), ("112", "45", "78", "012")),
# A very long line, with some existing data in the buffer, and leaving some remainder.
(
(b"12", b"3456789012" b"3456789012" b"3456789012" b"34567890\n1234"),
("1234567890+", "1234567890+", "1234567890+", "1234567890+", "", "1234!"),
),
# A \r\n newline split across reads.
((b"1234\r", b"\nabcd\n"), ("1234", "abcd")),
# A \r\n split exactly on the limit.
((b"123456789\r" b"\nabcd\n",), ("123456789+", "", "abcd")),
),
)
async def test_readlines_line_exceeds_limit(data_chunks: Sequence[bytes], expected_messages: Sequence[str]) -> None:
"""Verify that line buffering and splitting is handled, including if a line is (much!) longer than the limit."""
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=10)


async def test_readlines_incomplete_line_at_eof() -> None:
"""Verify that an incomplete line at EOF is logged."""
data_chunks = (b"normal_line\n", b"incomplete_line\r")
expected_messages = ("normal_line", "incomplete_line\r!")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages)


async def test_readlines_invalid_utf8() -> None:
"""Test invalid UTF-8 sequences are replaced with replacement character."""
data_chunks = (
# A line with invalid UTF-8 bytes in it.
b"bad[\xc0\xc0]utf8\n",
# An unterminated UTF-8 sequence at the end of the file.
b"incomplete\xc3",
)
expected_messages = ("bad[\ufffd\ufffd]utf8", "incomplete\ufffd!")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)


async def test_readlines_split_utf8() -> None:
"""Test that UTF-8 sequence split across limit-based chunks is handled properly."""
# A long line, that will be split across the utf-8 sequence: that character will be deferred until the next line.
data_chunks = ("123456789abcd\U0001f596efgh\n".encode("utf-8"),)
expected_messages = ("123456789abcd+", "\U0001f596efgh")
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks, expected_messages, limit=16)
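
The truncated flush above hands the decoder only part of the multi-byte sequence; the incremental decoder buffers it across `decode()` calls. A standalone illustration of that behaviour (not part of this change):

import codecs

decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
# The first three bytes of U+1F596 are buffered, producing no output yet.
assert decoder.decode(b"\xf0\x9f\x96") == ""
# The final byte completes the sequence, so the character is emitted.
assert decoder.decode(b"\x96") == "\U0001f596"
# At EOF (final=True) a dangling partial sequence becomes U+FFFD.
assert decoder.decode(b"\xc3", final=True) == "\ufffd"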


async def test_readlines_empty_stream() -> None:
"""Verify that an empty stream yields no lines."""
await MockStreamReader.assert_readlines_with_chunks_yields_lines(data_chunks=(), expected_lines=())


async def test_readlines_invalid_limit() -> None:
"""Verify that an invalid limit raises ValueError."""
stream = MockStreamReader(data_chunks=())
with pytest.raises(
ValueError, match=re.escape("Limit must be at least 2 to allow for meaningful line reading, but got 1.")
):
async for _ in readlines(stream=stream, limit=1):
pass


async def test_default_line_representation() -> None:
"""Verify that the default Line representation is as expected."""
# Note: just for convenience/display purposes.
assert str(Line(text="Here is a line")) == "Here is a line"
assert str(Line(text="Truncated line", is_truncated=True)) == "Truncated line[\u2026]"
assert str(Line(text="Last incomplete line", is_final=True)) == "Last incomplete line[no eol]"