From 3a2ef750868d18eef01ad8166d9afabd0ff6fd79 Mon Sep 17 00:00:00 2001
From: Xevion
Date: Fri, 1 Nov 2024 04:17:28 -0500
Subject: [PATCH] Add ASGI Request-Id correlation, add structlog
 LoggingMiddleware, overhaul all logging

- minor formatting details, type fixes
---
 backend/linkpulse/__main__.py   |  19 ++--
 backend/linkpulse/app.py        |  31 +++---
 backend/linkpulse/logging.py    | 164 ++++++++++++++++++++++----------
 backend/linkpulse/middleware.py |  51 ++++++++++
 4 files changed, 193 insertions(+), 72 deletions(-)
 create mode 100644 backend/linkpulse/middleware.py

diff --git a/backend/linkpulse/__main__.py b/backend/linkpulse/__main__.py
index 2e4afbc..6c86e14 100644
--- a/backend/linkpulse/__main__.py
+++ b/backend/linkpulse/__main__.py
@@ -1,18 +1,24 @@
 import sys
 
 import structlog
-import linkpulse.logging
+
+
 logger = structlog.get_logger()
 
+
 def main(*args):
     if args[0] == "serve":
-        import asyncio
-        from linkpulse.app import app
+        from linkpulse.logging import setup_logging
         from uvicorn import run
+
+        setup_logging()
 
-        logger.debug('Invoking uvicorn.run')
-        run('linkpulse.app:app', reload=True, host='0.0.0.0', access_log=True)
+        logger.debug("Invoking uvicorn.run")
+        run(
+            "linkpulse.app:app",
+            reload=True,
+            host="0.0.0.0",
+        )
 
     elif args[0] == "migrate":
         from linkpulse.migrate import main
@@ -27,7 +33,8 @@ def main(*args):
         from linkpulse.models import BaseModel, IPAddress
 
         # start REPL
-        from bpython import embed
+        from bpython import embed  # type: ignore
+
         embed(locals())
     else:
         print("Invalid command: {}".format(args[0]))
diff --git a/backend/linkpulse/app.py b/backend/linkpulse/app.py
index 617e3c7..dfe2fa2 100644
--- a/backend/linkpulse/app.py
+++ b/backend/linkpulse/app.py
@@ -1,13 +1,14 @@
 import logging
 import os
 import random
+
 from collections import defaultdict
 from contextlib import asynccontextmanager
 from dataclasses import dataclass, field
 from datetime import datetime
 from typing import AsyncIterator
 
-import structlog
+from asgi_correlation_id import CorrelationIdMiddleware
 import human_readable
 from apscheduler.schedulers.background import BackgroundScheduler  # type: ignore
 from apscheduler.triggers.interval import IntervalTrigger  # type: ignore
@@ -17,11 +18,13 @@ from fastapi_cache import FastAPICache
 from fastapi_cache.backends.inmemory import InMemoryBackend
 from fastapi_cache.decorator import cache
 from linkpulse.utilities import get_ip, hide_ip, pluralize
+from linkpulse.middleware import LoggingMiddleware
 from peewee import PostgresqlDatabase
 from psycopg2.extras import execute_values
 
-if not structlog.is_configured():
-    import linkpulse.logging
+from linkpulse.logging import setup_logging
+
+setup_logging(json_logs=False, log_level="DEBUG")
 
 load_dotenv(dotenv_path=".env")
 
@@ -30,8 +33,7 @@ from linkpulse import models, responses  # type: ignore
 # global variables
 is_development = os.getenv("ENVIRONMENT") == "development"
 db: PostgresqlDatabase = models.BaseModel._meta.database  # type: ignore
-
-logger = structlog.get_logger(__name__)
+logger = logging.getLogger(__name__)
 
 
 def flush_ips():
@@ -86,15 +88,15 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
 
     # Delete all randomly generated IP addresses
     with db.atomic():
-        logging.info(
+        logger.info(
             "Deleting Randomized IP Addresses",
-            {"ip_pool_count": len(app.state.ip_pool)},
+            extra={"ip_pool_count": len(app.state.ip_pool)},
         )
         query = models.IPAddress.delete().where(
             models.IPAddress.ip << app.state.ip_pool
         )
-        rowcount = query.execute()
-        logger.info("Randomized IP Addresses deleted", {"rowcount": rowcount})
+        row_count = query.execute()
+        logger.info("Randomized IP Addresses deleted", extra={"row_count": row_count})
Addresses deleted", {"row_count": row_count}) FastAPICache.init( backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache" @@ -126,19 +128,20 @@ app = FastAPI(lifespan=lifespan) if is_development: from fastapi.middleware.cors import CORSMiddleware - origins = [ - "http://localhost", - "http://localhost:5173", - ] - app.add_middleware( CORSMiddleware, - allow_origins=origins, + allow_origins=[ + "http://localhost", + "http://localhost:5173", + ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) +app.add_middleware(LoggingMiddleware) +app.add_middleware(CorrelationIdMiddleware) + @app.get("/health") async def health(): diff --git a/backend/linkpulse/logging.py b/backend/linkpulse/logging.py index 98976df..e18561f 100644 --- a/backend/linkpulse/logging.py +++ b/backend/linkpulse/logging.py @@ -3,59 +3,119 @@ import sys from typing import List import structlog -from structlog.stdlib import ProcessorFormatter -from structlog.types import Processor - -shared_processors: List[Processor] = [ - structlog.stdlib.add_log_level, - structlog.processors.CallsiteParameterAdder( - { - structlog.processors.CallsiteParameter.MODULE, - structlog.processors.CallsiteParameter.FILENAME, - structlog.processors.CallsiteParameter.LINENO, - } - ), - structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f"), -] - -structlog_processors = shared_processors + [] -# Remove _record & _from_structlog. -logging_processors: List[Processor] = [ProcessorFormatter.remove_processors_meta] - -if sys.stderr.isatty(): - console_renderer = structlog.dev.ConsoleRenderer() - logging_processors.append(console_renderer) - structlog_processors.append(console_renderer) -else: - json_renderer = structlog.processors.JSONRenderer(indent=1, sort_keys=True) - structlog_processors.append(json_renderer) - logging_processors.append(json_renderer) - -structlog.configure( - processors=structlog_processors, - wrapper_class=structlog.stdlib.BoundLogger, - # logger_factory=structlog.stdlib.LoggerFactory(), - logger_factory=structlog.PrintLoggerFactory(sys.stderr), - context_class=dict, - cache_logger_on_first_use=True, -) +from structlog.types import EventDict, Processor +from itertools import chain -formatter = ProcessorFormatter( - # These run ONLY on `logging` entries that do NOT originate within - # structlog. - foreign_pre_chain=shared_processors, - # These run on ALL entries after the pre_chain is done. - processors=logging_processors, -) +def rename_event_key(_, __, event_dict: EventDict) -> EventDict: + """ + Renames the `event` key to `msg`, as Railway expects it in that form. + """ + print(event_dict) + event_dict["msg"] = event_dict.pop("event") + return event_dict -handler = logging.StreamHandler(sys.stderr) -# Use OUR `ProcessorFormatter` to format all `logging` entries. -handler.setFormatter(formatter) -logging.basicConfig(handlers=[handler], level=logging.INFO) -external_loggers = ["uvicorn.error", "uvicorn.access"] -for logger_name in external_loggers: - logger = logging.getLogger(logger_name) - logger.handlers = [handler] - logger.propagate = False +def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict: + """ + Uvicorn logs the message a second time in the extra `color_message`, but we don't + need it. This processor drops the key from the event dict if it exists. 
+ """ + event_dict.pop("color_message", None) + return event_dict + + +def setup_logging(json_logs: bool = False, log_level: str = "INFO"): + def flatten(n): + match n: + case []: return [] + case [[*hd], *tl]: return [*flatten(hd), *flatten(tl)] + case [hd, *tl]: return [hd, *flatten(tl)] + + shared_processors: List[Processor] = flatten([ + structlog.contextvars.merge_contextvars, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.stdlib.ExtraAdder(), + drop_color_message_key, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + ( + [ + rename_event_key, + # Format the exception only for JSON logs, as we want to pretty-print them when using the ConsoleRenderer + structlog.processors.format_exc_info, + ] + if json_logs + else [] + ), + ]) + + structlog.configure( + processors=[ + *shared_processors, + # Prepare event dict for `ProcessorFormatter`. + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) + + log_renderer: structlog.types.Processor + if json_logs: + log_renderer = structlog.processors.JSONRenderer() + else: + log_renderer = structlog.dev.ConsoleRenderer() + + formatter = structlog.stdlib.ProcessorFormatter( + # These run ONLY on `logging` entries that do NOT originate within structlog. + foreign_pre_chain=shared_processors, + # These run on ALL entries after the pre_chain is done. + processors=[ + # Remove _record & _from_structlog. + structlog.stdlib.ProcessorFormatter.remove_processors_meta, + log_renderer, + ], + ) + + handler = logging.StreamHandler() + # Use OUR `ProcessorFormatter` to format all `logging` entries. + handler.setFormatter(formatter) + root_logger = logging.getLogger() + root_logger.addHandler(handler) + root_logger.setLevel(log_level.upper()) + + def configure_logger(name: str, clear: bool, propagate: bool) -> None: + logger = logging.getLogger(name) + if clear: + logger.handlers.clear() + logger.propagate = propagate + + for _log in ["uvicorn", "uvicorn.error"]: + # Clear the log handlers for uvicorn loggers, and enable propagation + # so the messages are caught by our root logger and formatted correctly + # by structlog + configure_logger(_log, clear=True, propagate=False) + + # Since we re-create the access logs ourselves, to add all information + # in the structured log (see the `logging_middleware` in main.py), we clear + # the handlers and prevent the logs to propagate to a logger higher up in the + # hierarchy (effectively rendering them silent). 
+ configure_logger("uvicorn.access", clear=True, propagate=False) + + def handle_exception(exc_type, exc_value, exc_traceback): + """ + Log any uncaught exception instead of letting it be printed by Python + (but leave KeyboardInterrupt untouched to allow users to Ctrl+C to stop) + See https://stackoverflow.com/a/16993115/3641865 + """ + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + root_logger.error( + "Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback) + ) + + sys.excepthook = handle_exception diff --git a/backend/linkpulse/middleware.py b/backend/linkpulse/middleware.py new file mode 100644 index 0000000..ac1b8d7 --- /dev/null +++ b/backend/linkpulse/middleware.py @@ -0,0 +1,51 @@ +import time +from asgi_correlation_id import correlation_id +import structlog + +from fastapi import FastAPI, Request, Response +from starlette.middleware.base import BaseHTTPMiddleware + + + +class LoggingMiddleware(BaseHTTPMiddleware): + def __init__(self, app: FastAPI): + super().__init__(app) + self.access_logger = structlog.get_logger("api.access") + + async def dispatch(self, request: Request, call_next) -> Response: + structlog.contextvars.clear_contextvars() + + # These context vars will be added to all log entries emitted during the request + request_id = correlation_id.get() + structlog.contextvars.bind_contextvars(request_id=request_id) + + start_time = time.perf_counter_ns() + # If the call_next raises an error, we still want to return our own 500 response, + # so we can add headers to it (process time, request ID...) + response = Response(status_code=500) + try: + response = await call_next(request) + except Exception: + # TODO: Validate that we don't swallow exceptions (unit test?) + structlog.stdlib.get_logger("api.error").exception("Uncaught exception") + raise + finally: + process_time = time.perf_counter_ns() - start_time + self.access_logger.info( + "Request", + http={ + "url": str(request.url), + "query": dict(request.query_params), + "status_code": response.status_code, + "method": request.method, + "request_id": request_id, + "version": request.scope["http_version"], + }, + client={"ip": request.client.host, "port": request.client.port} if request.client else None, + duration=process_time, + ) + + # TODO: Is this being returned in production? We shouldn't be leaking this. + response.headers["X-Process-Time"] = str(process_time / 10 ** 9) + + return response \ No newline at end of file