Source code for pgtoolkit.log.parser

from __future__ import annotations

import re
from collections.abc import Iterable, Iterator, Mapping, MutableMapping, Sequence
from datetime import datetime, timedelta, timezone
from re import Pattern
from typing import Any, Callable


[docs] class LogParser: """Log parsing manager This object gather parsing parameters and trigger parsing logic. When parsing multiple files with the same parameters or when parsing multiple sets of lines, :class:`LogParser` object ease the initialization and preservation of parsing parameters. When parsing a single set of lines, one can use :func:`parse` helper instead. :param prefix_parser: An instance of :class:`PrefixParser`. :param filters: An instance of :class:`NoopFilters` """ def __init__( self, prefix_parser: PrefixParser, filters: NoopFilters | None = None ) -> None: self.prefix_parser = prefix_parser self.filters = filters or NoopFilters() def parse(self, fo: Iterable[str]) -> Iterator[Record | UnknownData]: """Yield records and unparsed data from file-like object ``fo`` :param fo: A line iterator such as a file object. :rtype: Iterator[:class:`Record` | :class:`UnknownData`] :returns: Yields either :class:`Record` or :class:`UnknownData` object. """ # Fast access variables to avoid attribute access overhead on each # line. parse_prefix = self.prefix_parser.parse stage1 = Record.parse_stage1 filter_stage1 = self.filters.stage1 filter_stage2 = self.filters.stage2 filter_stage3 = self.filters.stage3 for group in group_lines(fo): try: record = stage1(group) if filter_stage1(record): continue record.parse_stage2(parse_prefix) if filter_stage2(record): continue record.parse_stage3() if filter_stage3(record): continue except UnknownData as e: yield e else: yield record
[docs] def parse( fo: Iterable[str], prefix_fmt: str, filters: NoopFilters | None = None ) -> Iterator[Record | UnknownData]: """Parses log lines and yield :class:`Record` or :class:`UnknownData` objects. This is a helper around :class:`LogParser` and :`PrefixParser`. :param fo: A line iterator such as a file-like object. :param prefix_fmt: is exactly the value of ``log_line_prefix`` Postgresql settings. :param filters: is an object like :class:`NoopFilters` instance. See Example_ section for usage. """ parser = LogParser( PrefixParser.from_configuration(prefix_fmt), filters=filters, ) yield from parser.parse(fo)
def group_lines(lines: Iterable[str], cont: str = "\t") -> Iterator[list[str]]: # Group continuation lines according to continuation prefix. Yield a list # on lines supposed to belong to the same log record. group: list[str] = [] for line in lines: if not line.startswith(cont) and group: yield group group = [] group.append(line) if group: yield group def parse_isodatetime(raw: str) -> datetime: try: infos = ( int(raw[:4]), int(raw[5:7]), int(raw[8:10]), int(raw[11:13]), int(raw[14:16]), int(raw[17:19]), int(raw[20:23]) if raw[19] == "." else 0, ) except ValueError: raise ValueError("%s is not a known date" % raw) if raw[-3:] != "UTC": # We need tzdata for that. raise ValueError("%s not in UTC." % raw) return datetime(*infos) def parse_epoch(raw: str) -> datetime: epoch, ms = raw.split(".") return datetime.fromtimestamp(int(epoch), timezone.utc) + timedelta( microseconds=int(ms) )
[docs] class UnknownData(Exception): """Represents unparsable data. :class:`UnknownData` is throwable, you can raise it. .. attribute:: lines The list of unparsable strings. """ # UnknownData object is an exception to be throwable. def __init__(self, lines: Sequence[str]) -> None: self.lines = lines def __repr__(self) -> str: summary = str(self)[:32].replace("\n", "") return f"<{self.__class__.__name__} {summary}...>" def __str__(self) -> str: return "".join(self.lines)
[docs] class NoopFilters: """Basic filter doing nothing. Filters are grouped in an object to simplify the definition of a filtering policy. By subclassing :class:`NoopFilters`, you can implement simple filtering or heavy parameterized filtering policy from this API. If a filter method returns True, the record processing stops and the record is dropped. .. automethod:: stage1 .. automethod:: stage2 .. automethod:: stage3 """
[docs] def stage1(self, record: Record) -> None: """First stage filter. :param Record record: A new record. :returns: ``True`` if record must be dropped. ``record`` has only `prefix`, `severity` and `message_type` attributes. """
[docs] def stage2(self, record: Record) -> None: """Second stage filter. :param Record record: A new record. :returns: ``True`` if record must be dropped. ``record`` has attributes from stage 1 plus attributes from prefix analysis. See :class:`Record` for details. """
[docs] def stage3(self, record: Record) -> None: """Third stage filter. :param Record record: A new record. :returns: ``True`` if record must be dropped. ``record`` has attributes from stage 2 plus attributes from message analysis, depending on message type. """
[docs] class PrefixParser: """Extract record metadata from PostgreSQL log line prefix. .. automethod:: from_configuration """ # cf. # https://www.postgresql.org/docs/current/static/runtime-config-logging.html#GUC-LOG-LINE-PREFIX _datetime_pat = r"\d{4}-[01]\d-[0-3]\d [012]\d:[0-6]\d:[0-6]\d" # Pattern map of Status information. _status_pat = dict( # Application name a=r"(?P<application>\[unknown\]|\w+)?", # Session ID c=r"(?P<session>\[unknown\]|[0-9a-f.]+)", # Database name d=r"(?P<database>\[unknown\]|\w+)?", # SQLSTATE error code e=r"(?P<error>\d+)", # Remote host name or IP address h=r"(?P<remote_host>\[local\]|\[unknown\]|[a-z0-9_-]+|[0-9.:]+)?", # Command tag: type of session's current command i=r"(?P<command_tag>\w+)", # Number of the log line for each session or process, starting at 1. l=r"(?P<line_num>\d+)", # noqa # Time stamp with milliseconds m=r"(?P<timestamp_ms>" + _datetime_pat + r".\d{3} [A-Z]{2,5})", # Time stamp with milliseconds (as a Unix epoch) n=r"(?P<epoch>\d+\.\d+)", # Process ID p=r"(?P<pid>\d+)", # Remote host name or IP address, and remote port r=r"(?P<remote_host_r>\[local\]|\[unknown\]|[a-z0-9_-]+|[0-9.:]+\((?P<remote_port>\d+)\))?", # noqa # Process start time stamp s=r"(?P<start>" + _datetime_pat + " [A-Z]{2,5})", # Time stamp without milliseconds t=r"(?P<timestamp>" + _datetime_pat + " [A-Z]{2,5})", # User name u=r"(?P<user>\[unknown\]|\w+)?", # Virtual transaction ID (backendID/localXID) v=r"(?P<virtual_xid>\d+/\d+)", # Transaction ID (0 if none is assigned) x=r"(?P<xid>\d+)", ) # re to search for %… in log_line_prefix. _format_re = re.compile(r"%([" + "".join(_status_pat.keys()) + "])") # re to find %q separator in log_line_prefix. _q_re = re.compile(r"(?<!%)%q") _casts: dict[str, Callable[[str], int | datetime]] = { "epoch": parse_epoch, "line_num": int, "pid": int, "remote_port": int, "start": parse_isodatetime, "timestamp": parse_isodatetime, "timestamp_ms": parse_isodatetime, "xid": int, } @classmethod def mkpattern(cls, prefix: str) -> str: # Builds a pattern from each known fields. segments = cls._format_re.split(prefix) for i, segment in enumerate(segments): if i % 2: segments[i] = cls._status_pat[segment] else: segments[i] = re.escape(segment) return "".join(segments)
[docs] @classmethod def from_configuration(cls, log_line_prefix: str) -> PrefixParser: """Factory from log_line_prefix Parses log_line_prefix and build a prefix parser from this. :param log_line_prefix: ``log_line_prefix`` PostgreSQL setting. :return: A :class:`PrefixParser` instance. """ optional: str | None try: fixed, optional = cls._q_re.split(log_line_prefix) except ValueError: fixed, optional = log_line_prefix, None pattern = cls.mkpattern(fixed) if optional: pattern += r"(?:" + cls.mkpattern(optional) + ")?" return cls(re.compile(pattern), log_line_prefix)
def __init__(self, re_: Pattern[str], prefix_fmt: str | None = None) -> None: self.re_ = re_ self.prefix_fmt = prefix_fmt def __repr__(self) -> str: return f"<{self.__class__.__name__} '{self.prefix_fmt}'>" def parse(self, prefix: str) -> MutableMapping[str, Any]: # Parses the prefix line according to the inner regular expression. If # prefix does not match, raises an UnknownData. match = self.re_.search(prefix) if not match: raise UnknownData([prefix]) fields = match.groupdict() self.cast_fields(fields) # Ensure remote_host is fed either by %h or %r. remote_host = fields.pop("remote_host_r", None) if remote_host: fields.setdefault("remote_host", remote_host) # Ensure timestamp field is fed either by %m or %t. timestamp_ms = fields.pop("timestamp_ms", None) if timestamp_ms: fields.setdefault("timestamp", timestamp_ms) return fields @classmethod def cast_fields(cls, fields: MutableMapping[str, Any]) -> None: # In-place cast of values in fields dictionary. for k in fields: v = fields[k] if v is None: continue cast = cls._casts.get(k) if cast: fields[k] = cast(v)
[docs] class Record: """Log record object. Record object stores record fields and implements the different parse stages. A record is primarily composed by a prefix, a severity and a message. Actually, severity is mixed with message type. For example, a HINT: message has the same severity as ``LOG:`` and is actually a continuation message (see csvlog output to compare). Thus we can determine easily message type as this stage. :mod:`pgtoolkit.log` does not rewrite message severity. Once prefix, severity and message are split, the parser analyze prefix according to ``log_line_prefix`` parameter. Prefix can give a lot of information for filtering, but costs some CPU cycles to process. Finally, the parser analyze the message to extract information such as statement, hint, duration, execution plan, etc. depending on the message type. These stages are separated so that marshalling can apply filter between each stage. .. automethod:: as_dict Each record field is accessible as an attribute : .. attribute:: prefix Raw prefix line. .. attribute:: severity One of ``DEBUG1`` to ``DEBUG5``, ``CONTEXT``, ``DETAIL``, ``ERROR``, etc. .. attribute:: message_type A string identifying message type. One of ``unknown``, ``duration``, ``connection``, ``analyze``, ``checkpoint``. .. attribute:: raw_lines A record can span multiple lines. This attribute keep a reference on raw record lines of the record. .. attribute:: message_lines Just like :attr:`raw_lines`, but the first line only include message, without prefix nor severity. The following attributes correspond to prefix fields. See `log_line_prefix documentation <https://www.postgresql.org/docs/current/static/runtime-config-logging.html#GIC-LOG-LINE-PREFIX>`_ for details. .. attribute:: application_name .. attribute:: command_tag .. attribute:: database .. attribute:: epoch :type: :class:`datetime.datetime` .. attribute:: error .. attribute:: line_num :type: :class:`int` .. attribute:: pid :type: :class:`int` .. attribute:: remote_host .. attribute:: remote_port :type: :class:`int` .. attribute:: session .. attribute:: start :type: :class:`datetime.datetime` .. attribute:: timestamp :type: :class:`datetime.datetime` .. attribute:: user .. attribute:: virtual_xid .. attribute:: xid :type: :class:`int` If the log lines miss a field, the record won't have the attribute. Use :func:`hasattr` to check whether a record have a specific attribute. """ __slots__ = ( "__dict__", "message_lines", "prefix", "raw_lines", ) # This actually mix severities and message types since they are in the same # field. _severities = [ "CONTEXT", "DETAIL", "ERROR", "FATAL", "HINT", "INFO", "LOG", "NOTICE", "PANIC", "QUERY", "STATEMENT", "WARNING", ] _stage1_re = re.compile("(DEBUG[1-5]|" + "|".join(_severities) + "): ") _types_prefixes = { "duration: ": "duration", "connection ": "connection", "disconnection": "connection", "automatic analyze": "analyze", "checkpoint ": "checkpoint", } @classmethod def guess_type(cls, severity: str, message_start: str) -> str: # Guess message type from severity and the first line of the message. if severity in ("HINT", "STATEMENT"): return severity.lower() for prefix in cls._types_prefixes: if message_start.startswith(prefix): return cls._types_prefixes[prefix] return "unknown" @classmethod def parse_stage1(cls, lines: list[str]) -> Record: # Stage1: split prefix, severity and message. try: prefix, severity, message0 = cls._stage1_re.split(lines[0], maxsplit=1) except ValueError: raise UnknownData(lines) return cls( prefix=prefix, severity=severity, message_type=cls.guess_type(severity, message0), message_lines=[message0] + lines[1:], raw_lines=lines, ) def __init__( self, prefix: str, severity: str, message_type: str = "unknown", message_lines: list[str] | None = None, raw_lines: list[str] | None = None, **fields: str, ) -> None: self.prefix = prefix self.severity = severity self.message_type = message_type self.message_lines = message_lines or [] self.raw_lines = raw_lines or [] self.__dict__.update(fields) def __repr__(self) -> str: return "<{} {}: {:.32}...>".format( self.__class__.__name__, self.severity, self.message_lines[0].replace("\n", ""), ) def parse_stage2(self, parse_prefix: Callable[[str], Mapping[str, Any]]) -> None: # Stage 2. Analyze prefix fields self.__dict__.update(parse_prefix(self.prefix)) def parse_stage3(self) -> None: # Stage 3. Analyze message lines. self.message = "".join( [line.lstrip("\t").rstrip("\n") for line in self.message_lines] )
[docs] def as_dict(self) -> dict[str, str | object | datetime]: """Returns record fields as a :class:`dict`.""" return {k: v for k, v in self.__dict__.items()}