|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import re |
| 4 | +from datetime import datetime |
| 5 | +from typing import Optional |
| 6 | + |
| 7 | +from rich.text import Text |
| 8 | +from toolong import timestamps |
| 9 | +from toolong.highlighter import LogHighlighter |
| 10 | +from typing_extensions import TypeAlias |
| 11 | + |
| 12 | +ParseResult: TypeAlias = "tuple[Optional[datetime], str, Text]" |
| 13 | + |
| 14 | + |
| 15 | +class LogFormat: |
| 16 | + def parse(self, line: str) -> ParseResult | None: |
| 17 | + raise NotImplementedError() |
| 18 | + |
| 19 | + |
| 20 | +class NextflowRegexLogFormatOne(LogFormat): |
| 21 | + REGEX = re.compile(".*?") |
| 22 | + LOG_LEVELS = { |
| 23 | + "DEBUG": ["dim white on black", ""], |
| 24 | + "INFO": ["bold black on green", "on #042C07"], |
| 25 | + "WARN": ["bold black on yellow", "on #44450E"], |
| 26 | + "ERROR": ["bold black on red", "on #470005"], |
| 27 | + } |
| 28 | + |
| 29 | + highlighter = LogHighlighter() |
| 30 | + |
| 31 | + def parse(self, line: str) -> ParseResult | None: |
| 32 | + match = self.REGEX.fullmatch(line) |
| 33 | + if match is None: |
| 34 | + return None |
| 35 | + |
| 36 | + text = Text.from_ansi(line) |
| 37 | + groups = match.groupdict() |
| 38 | + if date := groups.get("date", None): |
| 39 | + _, timestamp = timestamps.parse(groups["date"]) |
| 40 | + text.highlight_words([date], "not bold magenta") |
| 41 | + if thread := groups.get("thread", None): |
| 42 | + text.highlight_words([thread], "blue") |
| 43 | + if log_level := groups.get("log_level", None): |
| 44 | + text.highlight_words([f" {log_level} "], self.LOG_LEVELS[log_level][0]) |
| 45 | + text.stylize_before(self.LOG_LEVELS[log_level][1]) |
| 46 | + if logger_name := groups.get("logger_name", None): |
| 47 | + text.highlight_words([logger_name], "cyan") |
| 48 | + if process_name := groups.get("process_name", None): |
| 49 | + text.highlight_words([process_name], "bold cyan") |
| 50 | + if message := groups.get("message", None): |
| 51 | + text.highlight_words([message], "dim" if log_level == "DEBUG" else "") |
| 52 | + |
| 53 | + return None, line, text |
| 54 | + |
| 55 | + |
| 56 | +class NextflowRegexLogFormatTwo(LogFormat): |
| 57 | + REGEX = re.compile(".*?") |
| 58 | + highlighter = LogHighlighter() |
| 59 | + |
| 60 | + def parse(self, line: str) -> ParseResult | None: |
| 61 | + match = self.REGEX.fullmatch(line) |
| 62 | + if match is None: |
| 63 | + return None |
| 64 | + |
| 65 | + text = Text.from_ansi(line) |
| 66 | + text.stylize_before("dim") |
| 67 | + groups = match.groupdict() |
| 68 | + if process := groups.get("process", None): |
| 69 | + text.highlight_words([process], "blue not dim") |
| 70 | + if process_name := groups.get("process_name", None): |
| 71 | + text.highlight_words([process_name], "bold cyan not dim") |
| 72 | + |
| 73 | + return None, line, text |
| 74 | + |
| 75 | + |
| 76 | +class NextflowRegexLogFormatThree(LogFormat): |
| 77 | + REGEX = re.compile(".*?") |
| 78 | + CHANNEL_TYPES = { |
| 79 | + "(value)": "green", |
| 80 | + "(cntrl)": "yellow", |
| 81 | + "(queue)": "magenta", |
| 82 | + } |
| 83 | + highlighter = LogHighlighter() |
| 84 | + |
| 85 | + def parse(self, line: str) -> ParseResult | None: |
| 86 | + match = self.REGEX.fullmatch(line) |
| 87 | + if match is None: |
| 88 | + return None |
| 89 | + |
| 90 | + text = Text.from_ansi(line) |
| 91 | + groups = match.groupdict() |
| 92 | + if port := groups.get("port", None): |
| 93 | + text.highlight_words([port], "blue") |
| 94 | + if channel_type := groups.get("channel_type", None): |
| 95 | + text.highlight_words([channel_type], self.CHANNEL_TYPES[channel_type]) |
| 96 | + if channel_state := groups.get("channel_state", None): |
| 97 | + text.highlight_words([channel_state], "cyan" if channel_state == "OPEN" else "yellow") |
| 98 | + text.highlight_words(["; channel:"], "dim") |
| 99 | + if channel_name := groups.get("channel_name", None): |
| 100 | + text.highlight_words([channel_name], "cyan") |
| 101 | + |
| 102 | + return None, line, text |
| 103 | + |
| 104 | + |
| 105 | +class NextflowRegexLogFormatFour(LogFormat): |
| 106 | + REGEX = re.compile(".*?") |
| 107 | + highlighter = LogHighlighter() |
| 108 | + |
| 109 | + def parse(self, line: str) -> ParseResult | None: |
| 110 | + match = self.REGEX.fullmatch(line) |
| 111 | + if match is None: |
| 112 | + return None |
| 113 | + |
| 114 | + text = Text.from_ansi(line) |
| 115 | + text.stylize_before("dim") |
| 116 | + groups = match.groupdict() |
| 117 | + text.highlight_words(["status="], "dim") |
| 118 | + if status := groups.get("status", None): |
| 119 | + text.highlight_words([status], "cyan not dim") |
| 120 | + |
| 121 | + return None, line, text |
| 122 | + |
| 123 | + |
| 124 | +class NextflowRegexLogFormatFive(LogFormat): |
| 125 | + REGEX = re.compile(".*?") |
| 126 | + highlighter = LogHighlighter() |
| 127 | + |
| 128 | + def parse(self, line: str) -> ParseResult | None: |
| 129 | + match = self.REGEX.fullmatch(line) |
| 130 | + if match is None: |
| 131 | + return None |
| 132 | + |
| 133 | + text = Text.from_ansi(line) |
| 134 | + text.stylize_before("dim") |
| 135 | + groups = match.groupdict() |
| 136 | + if script_id := groups.get("script_id", None): |
| 137 | + text.highlight_words([script_id], "blue") |
| 138 | + if script_path := groups.get("script_path", None): |
| 139 | + text.highlight_words([script_path], "magenta") |
| 140 | + |
| 141 | + return None, line, text |
| 142 | + |
| 143 | + |
| 144 | +class NextflowLogFormat(NextflowRegexLogFormatOne): |
| 145 | + REGEX = re.compile( |
| 146 | + r"(?P<date>\w+-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) (?P<thread>\[.*\]?) (?P<log_level>\w+)\s+(?P<logger_name>[\w\.]+) - (?P<message>.*?)$" |
| 147 | + ) |
| 148 | + |
| 149 | + |
| 150 | +class NextflowLogFormatActiveProcess(NextflowRegexLogFormatTwo): |
| 151 | + REGEX = re.compile(r"^(?P<marker>\[process\]) (?P<process>.*?)(?P<process_name>[^:]+?)?$") |
| 152 | + |
| 153 | + |
| 154 | +class NextflowLogFormatActiveProcessDetails(NextflowRegexLogFormatThree): |
| 155 | + REGEX = re.compile( |
| 156 | + r" (?P<port>port \d+): (?P<channel_type>\((value|queue|cntrl)\)) (?P<channel_state>\S+)\s+; channel: (?P<channel_name>.*?)$" |
| 157 | + ) |
| 158 | + |
| 159 | + |
| 160 | +class NextflowLogFormatActiveProcessStatus(NextflowRegexLogFormatFour): |
| 161 | + REGEX = re.compile(r"^ status=(?P<status>.*?)?$") |
| 162 | + |
| 163 | + |
| 164 | +class NextflowLogFormatScriptParse(NextflowRegexLogFormatFive): |
| 165 | + REGEX = re.compile(r"^ (?P<script_id>Script_\w+:) (?P<script_path>.*?)$") |
| 166 | + |
| 167 | + |
| 168 | +def nextflow_formatters(formats): |
| 169 | + return [ |
| 170 | + NextflowLogFormat(), |
| 171 | + NextflowLogFormatActiveProcess(), |
| 172 | + NextflowLogFormatActiveProcessDetails(), |
| 173 | + NextflowLogFormatActiveProcessStatus(), |
| 174 | + NextflowLogFormatScriptParse(), |
| 175 | + ] |
| 176 | + |
| 177 | + |
| 178 | +def nextflow_format_parser(format_parser): |
| 179 | + class FormatParser(format_parser): |
| 180 | + """Parses a log line.""" |
| 181 | + |
| 182 | + def __init__(self) -> None: |
| 183 | + super().__init__() |
| 184 | + self._log_status = "" |
| 185 | + |
| 186 | + def parse(self, line: str) -> ParseResult: |
| 187 | + """Parse a line.""" |
| 188 | + |
| 189 | + for logtype in ["DEBUG", "INFO", "WARN", "ERROR"]: |
| 190 | + if logtype in line: |
| 191 | + self._log_status = logtype |
| 192 | + return super().parse(line) |
| 193 | + text = Text(line) |
| 194 | + if text.plain == text.markup: |
| 195 | + if self._log_status == "DEBUG": |
| 196 | + text.stylize("dim") |
| 197 | + if self._log_status == "WARN": |
| 198 | + text.stylize("yellow") |
| 199 | + if self._log_status == "ERROR": |
| 200 | + text.stylize("red") |
| 201 | + return None, line, text |
| 202 | + |
| 203 | + return FormatParser |
0 commit comments