FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit bb7bfa08 authored by E. Evstafiev's avatar E. Evstafiev :bulb:
Browse files

feat: add account lockout mechanism

parent 74b2b3ad
No related branches found
No related tags found
1 merge request!95Resolve "Implement exponential lockout"
Pipeline #694198 passed
...@@ -3,7 +3,7 @@ import random ...@@ -3,7 +3,7 @@ import random
import factory import factory
from django.utils import timezone from django.utils import timezone
from .models import Account, AccountDetails, AccountIdentifier from .models import Account, AccountDetails, AccountIdentifier, Lockout
from .reset_tokens import PasswordAppTokenResponse from .reset_tokens import PasswordAppTokenResponse
...@@ -91,3 +91,21 @@ class PasswordAppTokenResponseFactory: ...@@ -91,3 +91,21 @@ class PasswordAppTokenResponseFactory:
return PasswordAppTokenResponse( return PasswordAppTokenResponse(
token=faker.hexify(text="^^^^-^^^^-^^^^-^^^^", upper=True), token=faker.hexify(text="^^^^-^^^^-^^^^-^^^^", upper=True),
) )
class LockoutFactory(factory.django.DjangoModelFactory):
class Meta:
model = Lockout
identity_key = factory.Sequence(lambda n: f"user{n}")
date_of_birth = factory.Faker("date_of_birth")
attempts = 0
lockout_until = None
class Params:
locked_out = factory.Trait(
attempts=5,
lockout_until=factory.LazyFunction(
lambda: timezone.now() + timezone.timedelta(minutes=10)
),
)
# Generated by Django 4.2.19 on 2025-02-11 11:13
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("activate_account", "0006_accountidentifier_remove_account_unique_account_and_more"),
]
operations = [
migrations.CreateModel(
name="Lockout",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
("identity_key", models.CharField(max_length=100)),
("date_of_birth", models.DateField()),
("attempts", models.PositiveIntegerField(default=0)),
("lockout_until", models.DateTimeField(blank=True, null=True)),
],
),
migrations.AddConstraint(
model_name="lockout",
constraint=models.UniqueConstraint(
fields=("identity_key", "date_of_birth"), name="unique_lockout"
),
),
]
...@@ -66,3 +66,17 @@ class AccountIdentifier(models.Model): ...@@ -66,3 +66,17 @@ class AccountIdentifier(models.Model):
name="unique_account_identifier", name="unique_account_identifier",
), ),
] ]
class Lockout(models.Model):
identity_key = models.CharField(max_length=100) # CRSId or Last Name
date_of_birth = models.DateField()
attempts = models.PositiveIntegerField(default=0)
lockout_until = models.DateTimeField(null=True, blank=True)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["identity_key", "date_of_birth"], name="unique_lockout"
)
]
import pytest
from django.db import IntegrityError, transaction
from django.utils import timezone
from activate_account.factories import LockoutFactory
from activate_account.models import Lockout
pytestmark = pytest.mark.django_db
def test_lockout_create():
lockout = LockoutFactory()
assert Lockout.objects.count() == 1
assert lockout.attempts == 0
assert lockout.lockout_until is None
def test_lockout_create_locked_out():
lockout = LockoutFactory(locked_out=True)
assert Lockout.objects.count() == 1
assert lockout.attempts > 0
assert lockout.lockout_until is not None
assert lockout.lockout_until > timezone.now()
def test_lockout_unique_constraint():
LockoutFactory(identity_key="user1", date_of_birth="2000-01-01")
with pytest.raises(IntegrityError):
with transaction.atomic():
LockoutFactory(identity_key="user1", date_of_birth="2000-01-01")
LockoutFactory(identity_key="user2", date_of_birth="2000-01-01")
LockoutFactory(identity_key="user1", date_of_birth="2001-02-02")
SESSION_GRANT_TYPE = "urn:devops.uis.cam.ac.uk:params:oauth:grant-type:new-user-credentials" SESSION_GRANT_TYPE = "urn:devops.uis.cam.ac.uk:params:oauth:grant-type:new-user-credentials"
LOCKOUT_ATTEMPTS = 3
LOCKOUT_INITIAL_SECONDS = 10
from datetime import timedelta
import structlog import structlog
from django.utils import timezone
from rest_framework import serializers from rest_framework import serializers
from activate_account.models import Account from activate_account.models import Account, Lockout
from authentication.constants import SESSION_GRANT_TYPE from authentication.constants import (
LOCKOUT_ATTEMPTS,
LOCKOUT_INITIAL_SECONDS,
SESSION_GRANT_TYPE,
)
from authentication.errors import InvalidGrantError, UnsupportedGrantTypeError from authentication.errors import InvalidGrantError, UnsupportedGrantTypeError
logger = structlog.get_logger(__name__) logger = structlog.get_logger(__name__)
...@@ -79,6 +86,21 @@ class TokenRequestSerializer(serializers.Serializer): ...@@ -79,6 +86,21 @@ class TokenRequestSerializer(serializers.Serializer):
raise serializers.ValidationError( raise serializers.ValidationError(
"At least one of crsid and last name must be provided" "At least one of crsid and last name must be provided"
) )
identity_key = data.get("crsid") or data.get("last_name")
date_of_birth = data["date_of_birth"]
lockout = Lockout.objects.filter(
identity_key=identity_key, date_of_birth=date_of_birth
).first()
if lockout and lockout.lockout_until and timezone.now() < lockout.lockout_until:
logger.warn(
"Account locked out",
identity_key=identity_key,
date_of_birth=date_of_birth,
lockout_until=lockout.lockout_until,
)
raise InvalidGrantError("Account locked out")
try: try:
if "crsid" in data: if "crsid" in data:
...@@ -94,6 +116,20 @@ class TokenRequestSerializer(serializers.Serializer): ...@@ -94,6 +116,20 @@ class TokenRequestSerializer(serializers.Serializer):
account_identifier__code=data["code"], account_identifier__code=data["code"],
) )
except Account.DoesNotExist: except Account.DoesNotExist:
if lockout:
lockout.attempts += 1
else:
lockout = Lockout(
identity_key=identity_key, date_of_birth=date_of_birth, attempts=1
)
if lockout.attempts >= LOCKOUT_ATTEMPTS:
lockout_duration = LOCKOUT_INITIAL_SECONDS * (
2 ** (lockout.attempts - LOCKOUT_ATTEMPTS)
)
lockout.lockout_until = timezone.now() + timedelta(seconds=lockout_duration)
lockout.save()
logger.warn( logger.warn(
"No matching user in token request.", "No matching user in token request.",
crsid=data.get("crsid"), crsid=data.get("crsid"),
...@@ -102,6 +138,9 @@ class TokenRequestSerializer(serializers.Serializer): ...@@ -102,6 +138,9 @@ class TokenRequestSerializer(serializers.Serializer):
) )
raise InvalidGrantError("No matching account") raise InvalidGrantError("No matching account")
if lockout:
lockout.delete()
# Set account in validated data # Set account in validated data
data["account"] = account data["account"] = account
......
from datetime import timedelta
import pytest
from django.utils import timezone
from freezegun import freeze_time
from activate_account.factories import AccountFactory, LockoutFactory
from activate_account.models import Lockout
from authentication.constants import (
LOCKOUT_ATTEMPTS,
LOCKOUT_INITIAL_SECONDS,
SESSION_GRANT_TYPE,
)
from authentication.errors import InvalidGrantError
from authentication.serializers import TokenRequestSerializer
def _build_data(identity_field, identity_value, identifier):
data = {
"grant_type": SESSION_GRANT_TYPE,
"date_of_birth": identifier.date_of_birth.strftime("%Y-%m-%d"),
"code": identifier.code,
}
data[identity_field] = identity_value
return data
@pytest.mark.parametrize("identity_field", ["crsid", "last_name"])
@pytest.mark.django_db
def test_lockout_existing_active(identity_field):
account = AccountFactory(account_identifier=True)
identifier = account.account_identifier.first()
identity_value = account.crsid if identity_field == "crsid" else identifier.last_name
future_time = timezone.now() + timedelta(minutes=5)
LockoutFactory(
identity_key=identity_value,
date_of_birth=identifier.date_of_birth,
attempts=LOCKOUT_ATTEMPTS,
lockout_until=future_time,
)
data = _build_data(identity_field, identity_value, identifier)
serializer = TokenRequestSerializer(data=data)
with pytest.raises(InvalidGrantError, match="Account locked out"):
serializer.is_valid(raise_exception=True)
@pytest.mark.parametrize("identity_field", ["crsid", "last_name"])
@pytest.mark.django_db
def test_lockout_created_on_invalid_credentials(identity_field):
account = AccountFactory(account_identifier=True)
identifier = account.account_identifier.first()
identity_value = account.crsid if identity_field == "crsid" else identifier.last_name
data = _build_data(identity_field, identity_value, identifier)
data["code"] = "wrong-code"
serializer = TokenRequestSerializer(data=data)
with pytest.raises(InvalidGrantError, match="No matching account"):
serializer.is_valid(raise_exception=True)
lockout = Lockout.objects.filter(
identity_key=identity_value, date_of_birth=identifier.date_of_birth
).first()
assert lockout is not None
assert lockout.attempts == 1
assert lockout.lockout_until is None
@pytest.mark.parametrize("identity_field", ["crsid", "last_name"])
@pytest.mark.django_db
def test_successful_login_clears_lockout(identity_field):
account = AccountFactory(account_identifier=True)
identifier = account.account_identifier.first()
identity_value = account.crsid if identity_field == "crsid" else identifier.last_name
past_time = timezone.now() - timedelta(minutes=1)
LockoutFactory(
identity_key=identity_value,
date_of_birth=identifier.date_of_birth,
attempts=3,
lockout_until=past_time,
)
data = _build_data(identity_field, identity_value, identifier)
serializer = TokenRequestSerializer(data=data)
serializer.is_valid(raise_exception=True)
assert "account" in serializer.validated_data
assert (
Lockout.objects.filter(
identity_key=identity_value, date_of_birth=identifier.date_of_birth
).first()
is None
)
@pytest.mark.parametrize("identity_field", ["crsid", "last_name"])
@pytest.mark.django_db
def test_exponential_backoff_lockout(identity_field):
with freeze_time("2025-01-01 00:00:00") as frozen_datetime:
account = AccountFactory(account_identifier=True)
identifier = account.account_identifier.first()
identity_value = account.crsid if identity_field == "crsid" else identifier.last_name
data = _build_data(identity_field, identity_value, identifier)
data["code"] = "wrong-code"
for i in range(LOCKOUT_ATTEMPTS):
serializer = TokenRequestSerializer(data=data)
with pytest.raises(InvalidGrantError, match="No matching account"):
serializer.is_valid(raise_exception=True)
lockout = Lockout.objects.filter(
identity_key=identity_value, date_of_birth=identifier.date_of_birth
).first()
if i < LOCKOUT_ATTEMPTS - 1:
assert lockout.lockout_until is None
else:
expected_duration = LOCKOUT_INITIAL_SECONDS * (2 ** (i + 1 - LOCKOUT_ATTEMPTS))
expected_lockout_time = timezone.now() + timedelta(seconds=expected_duration)
assert (lockout.lockout_until - expected_lockout_time).total_seconds() == 0
frozen_datetime.move_to(lockout.lockout_until + timedelta(seconds=1))
serializer = TokenRequestSerializer(data=data)
with pytest.raises(InvalidGrantError, match="No matching account"):
serializer.is_valid(raise_exception=True)
lockout = Lockout.objects.filter(
identity_key=identity_value, date_of_birth=identifier.date_of_birth
).first()
expected_duration = LOCKOUT_INITIAL_SECONDS * (2 ** (lockout.attempts - LOCKOUT_ATTEMPTS))
expected_lockout_time = timezone.now() + timedelta(seconds=expected_duration)
assert (lockout.lockout_until - expected_lockout_time).total_seconds() == 0
This diff is collapsed.
...@@ -53,6 +53,7 @@ pytest-env = "^0.8.2" ...@@ -53,6 +53,7 @@ pytest-env = "^0.8.2"
tox = "4.14.1" tox = "4.14.1"
openapi-spec-validator = "^0.7.1" openapi-spec-validator = "^0.7.1"
pyyaml = "^6.0.2" pyyaml = "^6.0.2"
freezegun = "^1.5.0"
[tool.poetry.group.dev.dependencies.coverage] [tool.poetry.group.dev.dependencies.coverage]
extras = [ "toml" ] extras = [ "toml" ]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment