Add ASGI Request-Id correlation, add structlog LoggingMiddleware, overhaul all logging

- Minor formatting details and type fixes.
2024-11-01 04:17:28 -05:00
parent a96631e81e
commit 3a2ef75086
4 changed files with 193 additions and 72 deletions

View File

@@ -1,18 +1,24 @@
import sys

import structlog

logger = structlog.get_logger()


def main(*args):
    if args[0] == "serve":
        import asyncio

        from linkpulse.app import app
        from linkpulse.logging import setup_logging
        from uvicorn import run

        setup_logging()

        logger.debug("Invoking uvicorn.run")
        run(
            "linkpulse.app:app",
            reload=True,
            host="0.0.0.0",
        )
    elif args[0] == "migrate":
        from linkpulse.migrate import main
@@ -27,7 +33,8 @@ def main(*args):
        from linkpulse.models import BaseModel, IPAddress

        # start REPL
        from bpython import embed  # type: ignore

        embed(locals())
    else:
        print("Invalid command: {}".format(args[0]))

View File

@@ -1,13 +1,14 @@
import logging
import os
import random
from collections import defaultdict
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterator
import structlog
from asgi_correlation_id import CorrelationIdMiddleware
import human_readable
from apscheduler.schedulers.background import BackgroundScheduler # type: ignore
from apscheduler.triggers.interval import IntervalTrigger # type: ignore
@@ -17,11 +18,13 @@ from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
from linkpulse.utilities import get_ip, hide_ip, pluralize
from linkpulse.middleware import LoggingMiddleware
from peewee import PostgresqlDatabase
from psycopg2.extras import execute_values
if not structlog.is_configured():
    from linkpulse.logging import setup_logging

    setup_logging(json_logs=False, log_level="DEBUG")

load_dotenv(dotenv_path=".env")
@@ -30,8 +33,7 @@ from linkpulse import models, responses # type: ignore
# global variables
is_development = os.getenv("ENVIRONMENT") == "development"
db: PostgresqlDatabase = models.BaseModel._meta.database # type: ignore
logger = logging.getLogger()
def flush_ips():
@@ -86,15 +88,15 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    # Delete all randomly generated IP addresses
    with db.atomic():
        logger.info(
            "Deleting Randomized IP Addresses",
            {"ip_pool_count": len(app.state.ip_pool)},
        )
        query = models.IPAddress.delete().where(
            models.IPAddress.ip << app.state.ip_pool
        )
        row_count = query.execute()
        logger.info("Randomized IP Addresses deleted", {"row_count": row_count})

    FastAPICache.init(
        backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache"
@@ -126,19 +128,20 @@ app = FastAPI(lifespan=lifespan)
if is_development:
    from fastapi.middleware.cors import CORSMiddleware

    app.add_middleware(
        CORSMiddleware,
        allow_origins=[
            "http://localhost",
            "http://localhost:5173",
        ],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

app.add_middleware(LoggingMiddleware)
app.add_middleware(CorrelationIdMiddleware)
@app.get("/health")
async def health():
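
A note on the two `add_middleware` calls above: Starlette prepends each middleware to the stack, so the middleware added last runs first. Registering `CorrelationIdMiddleware` after `LoggingMiddleware` therefore makes it the outer layer, guaranteeing `correlation_id.get()` is already populated by the time `LoggingMiddleware.dispatch` binds it. A minimal sketch of the same wiring in isolation (`demo_app` is illustrative, not from this repo):

from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import FastAPI

from linkpulse.middleware import LoggingMiddleware

demo_app = FastAPI()
# Added last -> outermost -> runs first: the request ID is assigned
# before LoggingMiddleware reads it from the contextvar.
demo_app.add_middleware(LoggingMiddleware)
demo_app.add_middleware(CorrelationIdMiddleware)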

View File

@@ -3,59 +3,119 @@ import sys
from typing import List
import structlog
from structlog.stdlib import ProcessorFormatter
from structlog.types import EventDict, Processor
def rename_event_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Renames the `event` key to `msg`, as Railway expects it in that form.
    """
    event_dict["msg"] = event_dict.pop("event")
    return event_dict

def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Uvicorn logs the message a second time in the extra `color_message`, but we
    don't need it. This processor drops the key from the event dict if it exists.
    """
    event_dict.pop("color_message", None)
    return event_dict

def setup_logging(json_logs: bool = False, log_level: str = "INFO"):
    def flatten(n):
        # Recursively splice nested lists so the conditional processor group
        # below can be folded into one flat processor list.
        match n:
            case []:
                return []
            case [[*hd], *tl]:
                return [*flatten(hd), *flatten(tl)]
            case [hd, *tl]:
                return [hd, *flatten(tl)]

    shared_processors: List[Processor] = flatten(
        [
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.stdlib.ExtraAdder(),
            drop_color_message_key,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            (
                [
                    rename_event_key,
                    # Format the exception only for JSON logs, as we want to
                    # pretty-print them when using the ConsoleRenderer.
                    structlog.processors.format_exc_info,
                ]
                if json_logs
                else []
            ),
        ]
    )

    structlog.configure(
        processors=[
            *shared_processors,
            # Prepare event dict for `ProcessorFormatter`.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    log_renderer: structlog.types.Processor
    if json_logs:
        log_renderer = structlog.processors.JSONRenderer()
    else:
        log_renderer = structlog.dev.ConsoleRenderer()

    formatter = structlog.stdlib.ProcessorFormatter(
        # These run ONLY on `logging` entries that do NOT originate within structlog.
        foreign_pre_chain=shared_processors,
        # These run on ALL entries after the pre_chain is done.
        processors=[
            # Remove _record & _from_structlog.
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            log_renderer,
        ],
    )

    handler = logging.StreamHandler()
    # Use OUR `ProcessorFormatter` to format all `logging` entries.
    handler.setFormatter(formatter)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(log_level.upper())

    def configure_logger(name: str, clear: bool, propagate: bool) -> None:
        logger = logging.getLogger(name)
        if clear:
            logger.handlers.clear()
        logger.propagate = propagate

    for _log in ["uvicorn", "uvicorn.error"]:
        # Clear the log handlers for uvicorn loggers, and enable propagation
        # so the messages are caught by our root logger and formatted correctly
        # by structlog.
        configure_logger(_log, clear=True, propagate=True)

    # Since we re-create the access logs ourselves in `LoggingMiddleware`, to add
    # all information to the structured log, we clear the handlers and prevent
    # the logs from propagating to a logger higher up in the hierarchy
    # (effectively rendering them silent).
    configure_logger("uvicorn.access", clear=True, propagate=False)
    def handle_exception(exc_type, exc_value, exc_traceback):
        """
        Log any uncaught exception instead of letting it be printed by Python
        (but leave KeyboardInterrupt untouched to allow users to Ctrl+C to stop).

        See https://stackoverflow.com/a/16993115/3641865
        """
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return

        root_logger.error(
            "Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)
        )

    sys.excepthook = handle_exception
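
The payoff of routing everything through `ProcessorFormatter` is that stdlib and structlog entries share one render path: structlog events are wrapped for the formatter, while foreign `logging` records pass through `foreign_pre_chain` first. A minimal sketch, assuming the package is importable as `linkpulse.logging` (the "demo" logger name is illustrative):

import logging

import structlog

from linkpulse.logging import setup_logging

setup_logging(json_logs=True, log_level="INFO")

# Both records are rendered by the same JSONRenderer, with timestamp, level,
# and logger name attached; the structlog kwargs become JSON keys.
structlog.get_logger("demo").info("cache warmed", entries=128)
logging.getLogger("demo").info("plain stdlib record")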

View File

@@ -0,0 +1,51 @@
import time

import structlog
from asgi_correlation_id import correlation_id
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware


class LoggingMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: FastAPI):
        super().__init__(app)
        self.access_logger = structlog.get_logger("api.access")

    async def dispatch(self, request: Request, call_next) -> Response:
        structlog.contextvars.clear_contextvars()
        # These context vars will be added to all log entries emitted during the request
        request_id = correlation_id.get()
        structlog.contextvars.bind_contextvars(request_id=request_id)

        start_time = time.perf_counter_ns()
        # If call_next raises an error, we still want to return our own 500 response,
        # so we can add headers to it (process time, request ID, ...)
        response = Response(status_code=500)
        try:
            response = await call_next(request)
        except Exception:
            # TODO: Validate that we don't swallow exceptions (unit test?)
            structlog.stdlib.get_logger("api.error").exception("Uncaught exception")
            raise
        finally:
            process_time = time.perf_counter_ns() - start_time
            self.access_logger.info(
                "Request",
                http={
                    "url": str(request.url),
                    "query": dict(request.query_params),
                    "status_code": response.status_code,
                    "method": request.method,
                    "request_id": request_id,
                    "version": request.scope["http_version"],
                },
                client=(
                    {"ip": request.client.host, "port": request.client.port}
                    if request.client
                    else None
                ),
                duration=process_time,
            )
            # TODO: Is this being returned in production? We shouldn't be leaking this.
            response.headers["X-Process-Time"] = str(process_time / 10 ** 9)
            return response
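
An end-to-end check of the middleware pair, sketched as a throwaway test (illustrative, not part of the commit; `X-Request-ID` is asgi-correlation-id's default response header):

from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import FastAPI
from fastapi.testclient import TestClient

from linkpulse.middleware import LoggingMiddleware

demo_app = FastAPI()  # stand-in app, not linkpulse.app


@demo_app.get("/ping")
async def ping():
    return {"ok": True}


demo_app.add_middleware(LoggingMiddleware)
demo_app.add_middleware(CorrelationIdMiddleware)

client = TestClient(demo_app)
response = client.get("/ping")
assert response.status_code == 200
assert "X-Request-ID" in response.headers  # set by CorrelationIdMiddleware
assert "X-Process-Time" in response.headers  # set by LoggingMiddleware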