From bc0ef7728f08dc22aff8001a164b8be59f02c8c1 Mon Sep 17 00:00:00 2001 From: Xevion Date: Thu, 24 Oct 2024 04:23:35 -0500 Subject: [PATCH] reformat, add docs --- backend/linkpulse/app.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/backend/linkpulse/app.py b/backend/linkpulse/app.py index 2d238d5..b8d02b1 100644 --- a/backend/linkpulse/app.py +++ b/backend/linkpulse/app.py @@ -71,10 +71,13 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: ) if is_development: + # 42 is the answer to everything random.seed(42) + # Generate a pool of random IP addresses app.state.ip_pool = [ ".".join(str(random.randint(0, 255)) for _ in range(4)) for _ in range(50) ] + app.state.buffered_updates = defaultdict(IPCounter) scheduler.start() @@ -130,6 +133,10 @@ async def health(): @app.get("/api/migration") @cache(expire=60) async def get_migration(): + """ + Returns the details of the most recent migration. + """ + # Kind of insecure, but this is just a demo thing to show that migratehistory is available. cursor = db.execute_sql( "SELECT name, migrated_at FROM migratehistory ORDER BY migrated_at DESC LIMIT 1" ) @@ -148,7 +155,12 @@ async def get_ips(request: Request, response: Response): """ now = datetime.now() - user_ip = get_ip(request) if not is_development else random.choice(app.state.ip_pool) + # Get the user's IP address + user_ip = ( + get_ip(request) if not is_development else random.choice(app.state.ip_pool) + ) + + # If the IP address is not found, return an error if user_ip is None: print("No IP found!") response.status_code = status.HTTP_403_FORBIDDEN @@ -158,11 +170,22 @@ async def get_ips(request: Request, response: Response): app.state.buffered_updates[user_ip].count += 1 app.state.buffered_updates[user_ip].last_seen = now - # Return the IP addresses + # Query the latest IPs latest_ips = ( - models.IPAddress.select(models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count) + models.IPAddress.select( + models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count + ) .order_by(models.IPAddress.last_seen.desc()) .limit(10) ) - return {"ips": [responses.SeenIP(ip=hide_ip(ip.ip), last_seen=human_readable.date_time(ip.last_seen), count=ip.count) for ip in latest_ips]} + return { + "ips": [ + responses.SeenIP( + ip=hide_ip(ip.ip), + last_seen=human_readable.date_time(ip.last_seen), + count=ip.count, + ) + for ip in latest_ips + ] + }