mirror of
https://github.com/Xevion/linkpulse.git
synced 2025-12-07 13:15:39 -06:00
Add validate_session() with constraint tests
This commit is contained in:
@@ -4,3 +4,27 @@ from fastapi import APIRouter
|
|||||||
from linkpulse.models import User, Session
|
from linkpulse.models import User, Session
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
def validate_session(
|
||||||
|
token: str, user: bool = True
|
||||||
|
) -> Tuple[bool, bool, Optional[User]]:
|
||||||
|
"""
|
||||||
|
Given a token, validate that the session exists and is not expired.
|
||||||
|
|
||||||
|
This function has side effects:
|
||||||
|
- This function updates last_used if `user` is True.
|
||||||
|
- This function will invalidate the session if it is expired.
|
||||||
|
"""
|
||||||
|
# Check if session exists
|
||||||
|
session = Session.get_or_none(Session.token == token)
|
||||||
|
if session is None:
|
||||||
|
return False, False, None
|
||||||
|
|
||||||
|
# Check if session is expired
|
||||||
|
if session.is_expired(revoke=True):
|
||||||
|
return True, False, None
|
||||||
|
|
||||||
|
if user:
|
||||||
|
session.use()
|
||||||
|
return True, True, session.user
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ from linkpulse.models import Session
|
|||||||
from linkpulse.tests.random import random_string
|
from linkpulse.tests.random import random_string
|
||||||
from linkpulse.tests.test_user import user
|
from linkpulse.tests.test_user import user
|
||||||
from linkpulse.utilities import utc_now
|
from linkpulse.utilities import utc_now
|
||||||
|
from linkpulse.routers.authentication import validate_session
|
||||||
|
|
||||||
|
from peewee import IntegrityError
|
||||||
|
|
||||||
logger = structlog.get_logger()
|
logger = structlog.get_logger()
|
||||||
|
|
||||||
@@ -49,3 +52,28 @@ def test_expiry_valid(session):
|
|||||||
|
|
||||||
def test_expiry_invalid(expired_session):
|
def test_expiry_invalid(expired_session):
|
||||||
assert expired_session.is_expired() is True
|
assert expired_session.is_expired() is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_constraint_token_length(user):
|
||||||
|
with pytest.raises(IntegrityError):
|
||||||
|
Session.create(
|
||||||
|
user=user, token=random_string(31), expiry=utc_now() + timedelta(hours=1)
|
||||||
|
)
|
||||||
|
Session.create(
|
||||||
|
user=user, token=random_string(32), expiry=utc_now() + timedelta(hours=1)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_constraint_expiry(user):
|
||||||
|
with pytest.raises(IntegrityError):
|
||||||
|
Session.create(user=user, token=random_string(31), expiry=utc_now())
|
||||||
|
Session.create(
|
||||||
|
user=user, token=random_string(32), expiry=utc_now() + timedelta(minutes=1)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_session(db, session):
|
||||||
|
assert session.last_used is None
|
||||||
|
assert validate_session(session.token, user=True) == (True, True, session.user)
|
||||||
|
session = Session.get(Session.token == session.token)
|
||||||
|
assert session.last_used is not None
|
||||||
|
|||||||
Reference in New Issue
Block a user