From c0d135d8a8b23c9b7b71a5c89d79ad52484cf318 Mon Sep 17 00:00:00 2001
From: Xevion
Date: Thu, 24 Oct 2024 04:04:09 -0500
Subject: [PATCH] revamp ip address demo, buffered background IP submit with
 upsert, parameterized SQL with psycopg2 cursor & execute_values

---
 backend/linkpulse/app.py | 128 ++++++++++++++++++++++++++++++---------
 1 file changed, 100 insertions(+), 28 deletions(-)

diff --git a/backend/linkpulse/app.py b/backend/linkpulse/app.py
index 042c0ad..69b9bf2 100644
--- a/backend/linkpulse/app.py
+++ b/backend/linkpulse/app.py
@@ -1,25 +1,96 @@
+import logging
+import random
+from collections import defaultdict
 from contextlib import asynccontextmanager
+from dataclasses import dataclass, field
 from datetime import datetime
 from typing import AsyncIterator
-from fastapi import FastAPI, Request
-from fastapi.middleware.cors import CORSMiddleware
+
 from dotenv import load_dotenv
+from fastapi import FastAPI, Request, Response, status
+from fastapi.middleware.cors import CORSMiddleware
 from fastapi_cache import FastAPICache
 from fastapi_cache.backends.inmemory import InMemoryBackend
 from fastapi_cache.decorator import cache
+from fastapi_utils.tasks import repeat_every
+import human_readable
+from linkpulse.utilities import get_ip, hide_ip, pluralize
 from peewee import PostgresqlDatabase
+from psycopg2.extras import execute_values
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
 
 load_dotenv(dotenv_path=".env")
 
-from linkpulse import models  # type: ignore
+from linkpulse import models, responses  # type: ignore
 
 db: PostgresqlDatabase = models.BaseModel._meta.database
 
 
+def flush_ips():
+    if len(app.state.buffered_updates) == 0:
+        return
+
+    try:
+        with db.atomic():
+            sql = """
+            WITH updates (ip, last_seen, increment) AS (VALUES %s)
+            INSERT INTO ipaddress (ip, last_seen, count)
+            SELECT ip, last_seen, increment
+            FROM updates
+            ON CONFLICT (ip)
+            DO UPDATE
+            SET count = ipaddress.count + EXCLUDED.count, last_seen = EXCLUDED.last_seen;
+            """
+            rows = [
+                (ip, data.last_seen, data.count)
+                for ip, data in app.state.buffered_updates.items()
+            ]
+
+            cur = db.cursor()
+            execute_values(cur, sql, rows)
+    except Exception:
+        # Keep the buffer intact on failure so the next scheduled run retries the upsert.
+        print("Failed to flush IPs to the database.")
+        return
+
+    i = len(app.state.buffered_updates)
+    print("Flushed {} IP{} to the database.".format(i, pluralize(i)))
+
+    # Finish up
+    app.state.buffered_updates.clear()
+
+
+scheduler = BackgroundScheduler()
+scheduler.add_job(flush_ips, IntervalTrigger(seconds=5))
+
+
 @asynccontextmanager
 async def lifespan(_: FastAPI) -> AsyncIterator[None]:
-    FastAPICache.init(backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache")
+    FastAPICache.init(
+        backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache"
+    )
+
+    random.seed(42)
+    app.state.ip_pool = [
+        ".".join(str(random.randint(0, 255)) for _ in range(4)) for _ in range(50)
+    ]
+    app.state.buffered_updates = defaultdict(IPCounter)
+
+    scheduler.start()
+
     yield
+
+    scheduler.shutdown()
+    flush_ips()
+
+
+@dataclass
+class IPCounter:
+    # Note: This is not the true 'seen' count, but the count of how many times the IP has been seen since the last flush.
+    count: int = 0
+    last_seen: datetime = field(default_factory=datetime.now)
+
 
 app = FastAPI(lifespan=lifespan)
 
@@ -65,31 +136,32 @@ async def get_migration():
     return {"name": name, "migrated_at": migrated_at}
 
 
-@app.get("/api/test")
-async def get_current_time(request: Request):
-    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
 
-    user_ip = request.headers.get("X-Forwarded-For")
-    if not user_ip:
-        # Fallback, probably not on a proxy
-        user_ip = request.client.host
-    response = {"time": current_time, "ip": user_ip}
+
+@app.get("/api/ips")
+async def get_ips(request: Request, response: Response):
+    """
+    Returns a list of partially redacted IP addresses, as well as submitting the user's IP address to the database (buffered).
+    """
+    now = datetime.now()
+    # user_ip = get_ip(request)
+    user_ip = random.choice(app.state.ip_pool)
+    if user_ip is None:
+        print("No IP found!")
+        response.status_code = status.HTTP_403_FORBIDDEN
+        return {"error": "Unable to handle request."}
 
-    # Create one record
-    new_ip, created = models.IPAddress.get_or_create(
-        ip=user_ip, defaults={"lastSeen": datetime.now()}
+    # Update the buffered updates
+    app.state.buffered_updates[user_ip].count += 1
+    app.state.buffered_updates[user_ip].last_seen = now
+
+    # Return the IP addresses
+    latest_ips = (
+        models.IPAddress.select(models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count)
+        .order_by(models.IPAddress.last_seen.desc())
+        .limit(10)
     )
-    if not created:
-        new_ip.lastSeen = datetime.now()
-        result = new_ip.save()
-        print(result, new_ip)
 
-    # Query all records
-    for ip in models.IPAddress.select():
-        print(ip.ip, ip.lastSeen)
-
-    message = request.query_params.get("message")
-    if message:
-        response["message"] = message
-
-    return response
+    return {"ips": [responses.SeenIP(ip=hide_ip(ip.ip), last_seen=human_readable.date_time(ip.last_seen), count=ip.count) for ip in latest_ips]}