revamp ip address demo, buffered background IP submit with upsert, parameterized SQL with psycopg2 cursor & execute_values

This commit is contained in:
2024-10-24 04:04:09 -05:00
parent 25177a3346
commit c0d135d8a8

View File

@@ -1,26 +1,94 @@
import logging
import random
from collections import defaultdict
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterator
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from fastapi import FastAPI, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
from fastapi_utils.tasks import repeat_every
import human_readable
from linkpulse.utilities import get_ip, hide_ip, pluralize
from peewee import PostgresqlDatabase
from psycopg2.extras import execute_values
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
load_dotenv(dotenv_path=".env")
from linkpulse import models # type: ignore
from linkpulse import models, responses # type: ignore
db: PostgresqlDatabase = models.BaseModel._meta.database
def flush_ips():
if len(app.state.buffered_updates) == 0:
return
try:
with db.atomic():
sql = """
WITH updates (ip, last_seen, increment) AS (VALUES %s)
INSERT INTO ipaddress (ip, last_seen, count)
SELECT ip, last_seen, increment
FROM updates
ON CONFLICT (ip)
DO UPDATE
SET count = ipaddress.count + EXCLUDED.count, last_seen = EXCLUDED.last_seen;
"""
rows = [
(ip, data.last_seen, data.count)
for ip, data in app.state.buffered_updates.items()
]
cur = db.cursor()
execute_values(cur, sql, rows)
except:
print("Failed to flush IPs to the database.")
i = len(app.state.buffered_updates)
print("Flushed {} IP{} to the database.".format(i, pluralize(i)))
# Finish up
app.state.buffered_updates.clear()
scheduler = BackgroundScheduler()
scheduler.add_job(flush_ips, IntervalTrigger(seconds=5))
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
FastAPICache.init(backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache")
FastAPICache.init(
backend=InMemoryBackend(), prefix="fastapi-cache", cache_status_header="X-Cache"
)
random.seed(42)
app.state.ip_pool = [
".".join(str(random.randint(0, 255)) for _ in range(4)) for _ in range(50)
]
app.state.buffered_updates = defaultdict(IPCounter)
scheduler.start()
yield
scheduler.shutdown()
flush_ips()
@dataclass
class IPCounter:
# Note: This is not the true 'seen' count, but the count of how many times the IP has been seen since the last flush.
count: int = 0
last_seen: datetime = field(default_factory=datetime.now)
app = FastAPI(lifespan=lifespan)
@@ -65,32 +133,32 @@ async def get_migration():
return {"name": name, "migrated_at": migrated_at}
@app.get("/api/test")
async def get_current_time(request: Request):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
user_ip = request.headers.get("X-Forwarded-For")
if not user_ip:
# Fallback, probably not on a proxy
user_ip = request.client.host
response = {"time": current_time, "ip": user_ip}
@app.get("/api/ips")
async def get_ips(request: Request, response: Response):
"""
Returns a list of partially redacted IP addresses, as well as submitting the user's IP address to the database (buffered).
"""
now = datetime.now()
# user_ip = get_ip(request)
user_ip = random.choice(app.state.ip_pool)
if user_ip is None:
print("No IP found!")
response.status_code = status.HTTP_403_FORBIDDEN
return {"error": "Unable to handle request."}
# Create one record
new_ip, created = models.IPAddress.get_or_create(
ip=user_ip, defaults={"lastSeen": datetime.now()}
# Update the buffered updates
app.state.buffered_updates[user_ip].count += 1
app.state.buffered_updates[user_ip].last_seen = now
# Return the IP addresses
latest_ips = (
models.IPAddress.select(models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count)
.order_by(models.IPAddress.last_seen.desc())
.limit(10)
)
if not created:
new_ip.lastSeen = datetime.now()
result = new_ip.save()
print(result, new_ip)
# Query all records
for ip in models.IPAddress.select():
print(ip.ip, ip.lastSeen)
message = request.query_params.get("message")
if message:
response["message"] = message
return response
return {"ips": [responses.SeenIP(ip=hide_ip(ip.ip), last_seen=human_readable.date_time(ip.last_seen), count=ip.count) for ip in latest_ips]}