377 lines
10 KiB
Python
377 lines
10 KiB
Python
"""Configure logging."""
|
|
|
|
import base64
|
|
from enum import Enum
|
|
from enum import IntEnum
|
|
from enum import auto
|
|
import logging
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from typing import Self
|
|
|
|
from PIL.ImageColor import getrgb
|
|
import pendulum
|
|
|
|
|
|
class ANSIColor(IntEnum):
    """The eight basic SGR colours, numbered by their offset within a block.

    Each value is added to a base (30 for foreground, 40 for background) to
    obtain the SGR parameter, so the values must be exactly 0 through 7.
    """

    BLACK = 0
    RED = 1
    GREEN = 2
    YELLOW = 3
    BLUE = 4
    MAGENTA = 5
    CYAN = 6
    # Was 8, which produced SGR 38/48 (the extended-colour introducers)
    # instead of 37/47 once the foreground/background base was added.
    WHITE = 7
|
|
|
|
|
|
class Mode(Enum):
    """Encoding used for control-sequence introducers.

    ``C1`` selects the single-code-point forms; ``C0`` selects the
    ``ESC``-prefixed two-character forms.
    """

    # Explicit values; these are the same numbers ``auto()`` assigns.
    C0 = 1
    C1 = 2
|
|
|
|
|
|
class ANSI:
    """Build terminal control sequences as plain strings.

    Nothing is written to a terminal here; every property and method simply
    returns the sequence for the caller to emit.  ``mode`` selects how the
    introducers (DCS, CSI, ST, OSC) are encoded: ``Mode.C1`` uses the single
    8-bit C1 code point, anything else uses the 7-bit ``ESC``-prefixed form.

    Attribute names that are not otherwise defined are treated as colour
    names (see ``__getattr__``), so ``ansi.cyan`` or ``ansi.background_red``
    work without a dedicated property.
    """

    mode: Mode

    # C0 control characters.
    SOH = "\u0001"
    STX = "\u0002"
    BEL = "\u0007"
    BS = "\u0008"
    HT = "\u0009"
    LF = "\u000a"
    VT = "\u000b"
    FF = "\u000c"
    CR = "\u000d"
    ESC = "\u001b"
    DEL = "\u007f"

    def __init__(self, mode: Mode = Mode.C0):
        """Store the introducer encoding to use (7-bit ``Mode.C0`` by default)."""
        self.mode = mode

    def _introducer(self: Self, eight_bit: str, final: str) -> str:
        """Return *eight_bit* in C1 mode, otherwise ``ESC`` followed by *final*."""
        if self.mode == Mode.C1:
            return eight_bit
        return f"{self.ESC}{final}"

    @property
    def DCS(self: Self) -> str:
        """Device Control String"""
        # U+0090 is DCS; the previous value U+008D is RI (Reverse Index).
        return self._introducer("\u0090", "P")

    @property
    def CSI(self: Self) -> str:
        """Control Sequence Introducer"""
        return self._introducer("\u009b", "[")

    @property
    def ST(self: Self) -> str:
        """String Terminator"""
        return self._introducer("\u009c", "\\")

    @property
    def OSC(self: Self) -> str:
        """Operating System Command"""
        return self._introducer("\u009d", "]")

    def __getattr__(self: Self, item: str) -> str:
        """Resolve an otherwise-unknown attribute as a colour name.

        Raises:
            AttributeError: if *item* is private/dunder or is not a colour
                ``color`` can resolve.  Raising ``AttributeError`` (rather
                than leaking ``ValueError``) keeps ``hasattr``, ``getattr``
                with a default, and ``copy.deepcopy`` working.
        """
        if item.startswith("_"):
            raise AttributeError(item)
        try:
            return self.color(item)
        except ValueError as err:
            raise AttributeError(item) from err

    def color(self: Self, item: str) -> str:
        """Return the SGR sequence selecting the colour named *item*.

        The name is case-insensitive.  A ``BACKGROUND_`` prefix selects the
        background instead of the foreground, and a ``BRIGHT_`` prefix
        appends the bold attribute.  Names that are not one of the eight
        ``ANSIColor`` members are handed to ``getrgb`` and emitted as a
        24-bit colour.

        Raises:
            ValueError: propagated from ``getrgb`` when the name is unknown.
        """
        item = item.upper()
        extra = ""
        offset = 30
        background = item.startswith("BACKGROUND_")

        if background:
            offset = 40
            item = item.removeprefix("BACKGROUND_")

        if item.startswith("BRIGHT_"):
            extra = ";1"
            item = item.removeprefix("BRIGHT_")

        try:
            color = ANSIColor[item]
        except KeyError:
            red, green, blue = getrgb(item)[:3]
            # Honour the BACKGROUND_ prefix for 24-bit colours too; this
            # path previously always produced a foreground sequence.
            if background:
                return self.bg24bitrgb(red, green, blue)
            return self.fg24bitrgb(red, green, blue)

        # NOTE(review): the leading "1;" makes every basic colour bold even
        # without BRIGHT_; kept as-is because callers may rely on it.
        return f"{self.CSI}1;{color + offset:d}{extra}m"

    @property
    def yellow(self: Self) -> str:
        """SGR sequence for yellow foreground (see ``color``)."""
        return self.color("yellow")

    @property
    def white(self: Self) -> str:
        """SGR sequence for white foreground (see ``color``)."""
        return self.color("white")

    @property
    def green(self: Self) -> str:
        """SGR sequence for green foreground (see ``color``)."""
        return self.color("green")

    @property
    def blue(self: Self) -> str:
        """SGR sequence for blue foreground (see ``color``)."""
        return self.color("blue")

    @property
    def red(self: Self) -> str:
        """SGR sequence for red foreground (see ``color``)."""
        return self.color("red")

    @property
    def reset(self: Self) -> str:
        """SGR 0."""
        return f"{self.CSI}0m"

    @property
    def bold(self: Self) -> str:
        """SGR 1."""
        return f"{self.CSI}1m"

    @property
    def faint(self: Self) -> str:
        """SGR 2."""
        return f"{self.CSI}2m"

    @property
    def italic(self: Self) -> str:
        """SGR 3."""
        return f"{self.CSI}3m"

    @property
    def underline(self: Self) -> str:
        """SGR 4."""
        return f"{self.CSI}4m"

    @property
    def reversed(self: Self) -> str:
        """SGR 7."""
        return f"{self.CSI}7m"

    def icon_name_and_window_title(self: Self, text: str) -> str:
        """Return the OSC 0 sequence carrying *text*."""
        return f"{self.OSC}0;{text}{self.ST}"

    def icon_name(self: Self, text: str) -> str:
        """Return the OSC 1 sequence carrying *text*."""
        return f"{self.OSC}1;{text}{self.ST}"

    def window_title(self: Self, text: str) -> str:
        """Return the OSC 2 sequence carrying *text*."""
        return f"{self.OSC}2;{text}{self.ST}"

    def hyperlink(self: Self, link: str, text: str) -> str:
        """Return *text* wrapped in an OSC 8 open/close pair pointing at *link*."""
        return f"{self.OSC}8;;{link}{self.ST}{text}{self.OSC}8;;{self.ST}"

    def post_notification(self: Self, message: str) -> str:
        """Return the OSC 9 sequence carrying *message*."""
        return f"{self.OSC}9;{message}{self.ST}"

    def write_to_pastboard(self: Self, pasteboard: str, data: str | bytes) -> str:
        """Return the OSC 52 sequence that stores *data* in *pasteboard*.

        *data* is base64-encoded into the sequence.  ``str`` input is encoded
        as UTF-8 first (``base64.b64encode`` only accepts bytes-like objects,
        so passing the annotated ``str`` used to raise ``TypeError``);
        ``bytes`` input is still accepted unchanged.
        """
        raw = data.encode("utf-8") if isinstance(data, str) else data
        return f"{self.OSC}52;{pasteboard};{base64.b64encode(raw).decode()}{self.ST}"

    # Correctly spelled alias; the original (misspelled) name is kept for
    # existing callers.
    write_to_pasteboard = write_to_pastboard

    def query_pasteboard(self: Self, pasteboard: str) -> str:
        """Return the OSC 52 sequence that queries *pasteboard*."""
        return f"{self.OSC}52;{pasteboard};?{self.ST}"

    def copy_to_clipboard(self: Self, clipboard_name: str) -> str:
        """Return the OSC 1337 ``CopyToClipboard`` sequence."""
        return f"{self.OSC}1337;CopyToClipboard={clipboard_name}{self.ST}"

    def end_copy(self: Self) -> str:
        """Return the OSC 1337 ``EndCopy`` sequence."""
        return f"{self.OSC}1337;EndCopy{self.ST}"

    def clear_scrollback_history(self: Self) -> str:
        """Return the OSC 1337 ``ClearScrollback`` sequence."""
        return f"{self.OSC}1337;ClearScrollback{self.ST}"

    def set_current_directory(self: Self, dir: str) -> str:
        """Return the OSC 1337 ``CurrentDir`` sequence for *dir*."""
        return f"{self.OSC}1337;CurrentDir={dir}{self.ST}"

    def change_profile(self: Self, new_profile_name: str) -> str:
        """Return the OSC 1337 ``SetProfile`` sequence."""
        return f"{self.OSC}1337;SetProfile={new_profile_name}{self.ST}"

    def steal_focus(self: Self) -> str:
        """Return the OSC 1337 ``StealFocus`` sequence."""
        return f"{self.OSC}1337;StealFocus{self.ST}"

    def fg8bit(self: Self, code: int) -> str:
        """Return the SGR sequence selecting 8-bit foreground colour *code* (0-255)."""
        assert code >= 0
        assert code < 256
        # Previously the sequence was assigned to ``code`` and never
        # returned, so callers received ``None``.
        return f"{self.CSI}38;5;{code}m"

    def bg8bit(self: Self, code: int) -> str:
        """Return the SGR sequence selecting 8-bit background colour *code* (0-255)."""
        assert code >= 0
        assert code < 256
        return f"{self.CSI}48;5;{code}m"

    def fg24bitrgb(self: Self, red: int, green: int, blue: int) -> str:
        """Return the SGR sequence selecting a 24-bit foreground colour.

        Each component must be in the range 0-255.
        """
        assert red >= 0
        assert red < 256
        assert green >= 0
        assert green < 256
        assert blue >= 0
        assert blue < 256
        return f"{self.CSI}38;2;{red};{green};{blue}m"

    def bg24bitrgb(self: Self, red: int, green: int, blue: int) -> str:
        """Return the SGR sequence selecting a 24-bit background colour.

        Each component must be in the range 0-255.
        """
        assert red >= 0
        assert red < 256
        assert green >= 0
        assert green < 256
        assert blue >= 0
        assert blue < 256
        return f"{self.CSI}48;2;{red};{green};{blue}m"
|
|
|
|
# def fg24bitcmy(self, cyan: int, magenta: int, yellow: int) -> str:
|
|
# assert cyan >= 0
|
|
# assert cyan < 256
|
|
# assert magenta >= 0
|
|
# assert magenta < 256
|
|
# assert yellow >= 0
|
|
# assert yellow < 256
|
|
# return f"{self.OSC}38:2::{cyan};{magenta};{yellow}:::m"
|
|
|
|
|
|
# Shared module-level ANSI instance.  It is built with the default mode
# (Mode.C0), so its introducers use the ESC-prefixed forms.
ansi = ANSI()
|
|
|
|
|
|
class BashPromptBuilder:
    """Helpers for composing a Bash prompt string.

    Escape sequences are produced by the ``ANSI`` instance held in ``ansi``
    and bracketed with SOH/STX by ``wrap_nonprinting``.
    """

    ansi: ANSI

    # Backslash escapes stored as literal two-character strings
    # (presumably for Bash to expand when it renders the prompt).
    ESC = "\\e"
    BEL = "\\a"
    prompt = "\\$"
    current_working_directory = "\\w"
    username = "\\u"
    hostname_short = "\\h"
    hostname_long = "\\H"

    def __init__(self: Self, ansi: ANSI = ansi) -> None:
        """Remember which ``ANSI`` instance generates the escape sequences."""
        self.ansi = ansi

    @property
    def RL_PROMPT_START_IGNORE(self: Self) -> str:
        """Marker emitted before non-printing text (the SOH character)."""
        return self.ansi.SOH

    @property
    def RL_PROMPT_END_IGNORE(self: Self) -> str:
        """Marker emitted after non-printing text (the STX character)."""
        return self.ansi.STX

    def wrap_nonprinting(self: Self, text: str) -> str:
        """Return *text* enclosed in the start/end ignore markers."""
        opening = self.RL_PROMPT_START_IGNORE
        closing = self.RL_PROMPT_END_IGNORE
        return f"{opening}{text}{closing}"

    def title_bar(self: Self, text: str) -> str:
        """Return the wrapped sequence that sets icon name and window title to *text*."""
        title_sequence = self.ansi.icon_name_and_window_title(text)
        return self.wrap_nonprinting(title_sequence)
|
|
|
|
|
|
class JLogFormatter(logging.Formatter):
|
|
"""Format time the way it SHOULD be done."""
|
|
|
|
COLORS = {
|
|
"WARNING": ansi.yellow,
|
|
"INFO": ansi.white,
|
|
"DEBUG": ansi.blue,
|
|
"CRITICAL": ansi.yellow,
|
|
"ERROR": ansi.red,
|
|
}
|
|
|
|
color: bool
|
|
ascii_only: bool
|
|
|
|
def __init__(self: Self, color: bool = True, ascii_only: bool = False):
|
|
super().__init__("")
|
|
self.color = color
|
|
self.ascii_only = ascii_only
|
|
|
|
def format(self: Self, record: logging.LogRecord) -> str:
|
|
message = record.msg
|
|
|
|
if len(record.args) > 0:
|
|
message = record.msg % record.args
|
|
|
|
created = pendulum.from_timestamp(record.created).in_timezone("America/Chicago")
|
|
|
|
levelname_color = ""
|
|
if record.levelname in self.COLORS:
|
|
levelname_color = self.COLORS[record.levelname]
|
|
|
|
name = record.name
|
|
if len(name) > 25:
|
|
if not self.ascii_only:
|
|
name = name[:24] + "\u2026"
|
|
else:
|
|
name = name[:22] + "..."
|
|
|
|
pathname = record.pathname
|
|
if len(pathname) > 15:
|
|
if not self.ascii_only:
|
|
pathname = "\u2026" + pathname[-14:]
|
|
else:
|
|
pathname = "..." + pathname[:12]
|
|
|
|
prefix = (
|
|
f"{ansi.faint:s}{created:YYYY-MM-DD HH:mm:ss.SSSSSS Z}{ansi.reset:s} "
|
|
f"[{ansi.bold:s}{name:^25s}{ansi.reset:s}] {pathname:s}:{record.lineno:<4d} "
|
|
f"{levelname_color:s}{record.levelname:8s}{ansi.reset:s}"
|
|
)
|
|
|
|
msg = ""
|
|
for line in message.splitlines():
|
|
msg += f"\n{prefix} {line.rstrip():s}"
|
|
|
|
msg = msg.strip()
|
|
|
|
if record.exc_info is not None:
|
|
msg += f"\n{prefix:s} exception: {type(record.exc_info)}"
|
|
|
|
match record.exc_info:
|
|
case tuple():
|
|
formatted = traceback.format_exception(*record.exc_info)
|
|
# case logging._ExcInfoType():
|
|
# formatted = traceback.format_exception(*record.exc_info[2])
|
|
# case logging._SysExcInfoType():
|
|
# formatted = traceback.format_exception(*record.exc_info)
|
|
case _:
|
|
formatted = []
|
|
|
|
for chunk in formatted:
|
|
for line in chunk.splitlines():
|
|
msg += f"\n{prefix:s} {line.rstrip():s}"
|
|
|
|
if "\u001b" in msg:
|
|
msg += ansi.reset
|
|
|
|
if not self.color:
|
|
msg = re.sub(r"\u001b\[\d+[^m]*m", "", msg)
|
|
|
|
if self.ascii_only:
|
|
msg = msg.encode("ascii", errors="replace").decode("ascii")
|
|
|
|
return msg
|
|
|
|
|
|
def setup_logging(
    filename: pathlib.Path | None = None, color: bool = True, ascii_only: bool = False
) -> None:
    """Set up logging.

    Replaces every handler on the root logger with a DEBUG-level stderr
    handler (and, when *filename* is given, a UTF-8 file handler), all using
    one shared ``JLogFormatter``.

    Args:
        filename: optional path of a log file to write in addition to stderr.
        color: passed to ``JLogFormatter``.
        ascii_only: passed to ``JLogFormatter``.
    """
    formatter = JLogFormatter(color=color, ascii_only=ascii_only)

    root = logging.getLogger()
    # Iterate over a snapshot: removeHandler() mutates root.handlers, and
    # looping over the live list skipped every other handler.
    for handler in list(root.handlers):
        root.removeHandler(handler)
        handler.close()

    root.setLevel(logging.DEBUG)

    console = logging.StreamHandler(stream=sys.stderr)
    console.setFormatter(formatter)
    console.setLevel(logging.DEBUG)

    root.addHandler(console)

    if filename is not None:
        file = logging.FileHandler(filename=filename, encoding="utf-8")
        file.setFormatter(formatter)
        file.setLevel(logging.DEBUG)
        root.addHandler(file)

    # Cap these two loggers at INFO so their DEBUG records are dropped.
    logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
    logging.getLogger("aiohttp_retry").setLevel(logging.INFO)
    # Route warnings.warn() output through the logging system.
    logging.captureWarnings(True)
|