mirror of
https://github.com/Xevion/linkpulse.git
synced 2025-12-12 09:10:28 -06:00
reformat, add docs
This commit is contained in:
@@ -71,10 +71,13 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if is_development:
|
if is_development:
|
||||||
|
# 42 is the answer to everything
|
||||||
random.seed(42)
|
random.seed(42)
|
||||||
|
# Generate a pool of random IP addresses
|
||||||
app.state.ip_pool = [
|
app.state.ip_pool = [
|
||||||
".".join(str(random.randint(0, 255)) for _ in range(4)) for _ in range(50)
|
".".join(str(random.randint(0, 255)) for _ in range(4)) for _ in range(50)
|
||||||
]
|
]
|
||||||
|
|
||||||
app.state.buffered_updates = defaultdict(IPCounter)
|
app.state.buffered_updates = defaultdict(IPCounter)
|
||||||
|
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
@@ -130,6 +133,10 @@ async def health():
|
|||||||
@app.get("/api/migration")
|
@app.get("/api/migration")
|
||||||
@cache(expire=60)
|
@cache(expire=60)
|
||||||
async def get_migration():
|
async def get_migration():
|
||||||
|
"""
|
||||||
|
Returns the details of the most recent migration.
|
||||||
|
"""
|
||||||
|
# Kind of insecure, but this is just a demo thing to show that migratehistory is available.
|
||||||
cursor = db.execute_sql(
|
cursor = db.execute_sql(
|
||||||
"SELECT name, migrated_at FROM migratehistory ORDER BY migrated_at DESC LIMIT 1"
|
"SELECT name, migrated_at FROM migratehistory ORDER BY migrated_at DESC LIMIT 1"
|
||||||
)
|
)
|
||||||
@@ -148,7 +155,12 @@ async def get_ips(request: Request, response: Response):
|
|||||||
"""
|
"""
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
|
|
||||||
user_ip = get_ip(request) if not is_development else random.choice(app.state.ip_pool)
|
# Get the user's IP address
|
||||||
|
user_ip = (
|
||||||
|
get_ip(request) if not is_development else random.choice(app.state.ip_pool)
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the IP address is not found, return an error
|
||||||
if user_ip is None:
|
if user_ip is None:
|
||||||
print("No IP found!")
|
print("No IP found!")
|
||||||
response.status_code = status.HTTP_403_FORBIDDEN
|
response.status_code = status.HTTP_403_FORBIDDEN
|
||||||
@@ -158,11 +170,22 @@ async def get_ips(request: Request, response: Response):
|
|||||||
app.state.buffered_updates[user_ip].count += 1
|
app.state.buffered_updates[user_ip].count += 1
|
||||||
app.state.buffered_updates[user_ip].last_seen = now
|
app.state.buffered_updates[user_ip].last_seen = now
|
||||||
|
|
||||||
# Return the IP addresses
|
# Query the latest IPs
|
||||||
latest_ips = (
|
latest_ips = (
|
||||||
models.IPAddress.select(models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count)
|
models.IPAddress.select(
|
||||||
|
models.IPAddress.ip, models.IPAddress.last_seen, models.IPAddress.count
|
||||||
|
)
|
||||||
.order_by(models.IPAddress.last_seen.desc())
|
.order_by(models.IPAddress.last_seen.desc())
|
||||||
.limit(10)
|
.limit(10)
|
||||||
)
|
)
|
||||||
|
|
||||||
return {"ips": [responses.SeenIP(ip=hide_ip(ip.ip), last_seen=human_readable.date_time(ip.last_seen), count=ip.count) for ip in latest_ips]}
|
return {
|
||||||
|
"ips": [
|
||||||
|
responses.SeenIP(
|
||||||
|
ip=hide_ip(ip.ip),
|
||||||
|
last_seen=human_readable.date_time(ip.last_seen),
|
||||||
|
count=ip.count,
|
||||||
|
)
|
||||||
|
for ip in latest_ips
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user