FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects

verify incoming id token for API backends

Merged Dr Rich Wareham requested to merge issue-4-validate-gateway-token into main
Files
8
from django.test import TestCase
import secrets
from unittest import mock
from django.test import TestCase, override_settings
from google.auth.exceptions import GoogleAuthError
from identitylib.identifiers import Identifier, IdentifierSchemes
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.test import APIRequestFactory
@@ -9,6 +13,7 @@ from apigatewayauth.authentication import (
)
@override_settings(API_GATEWAY_ENFORCE_ID_TOKEN_VERIFICATION=True)
class APIGatewayAuthTestCase(TestCase):
def setUp(self):
super().setUp()
@@ -16,10 +21,30 @@ class APIGatewayAuthTestCase(TestCase):
self.request_factory = APIRequestFactory()
self.auth = APIGatewayAuthentication()
def request_with_headers(self, headers={}):
# Patch the token verification function. The default side effect is to check that the token
# matches self.expected_token.
self.expected_token = secrets.token_urlsafe()
def verify_token(token: str, *args, **kwargs):
if token != self.expected_token:
raise GoogleAuthError("Token did not match mock token")
return {
"iss": "https://accounts.google.com",
"azp": "api-gateway@api-meta-2555105a.iam.gserviceaccount.com",
}
verify_token_patcher = mock.patch(
"google.oauth2.id_token.verify_token", side_effect=verify_token
)
self.mock_verify_token = verify_token_patcher.start()
self.addCleanup(verify_token_patcher.stop)
def request_with_headers(self, headers={}, *, include_authorization=True):
parsed_headers = {
f'HTTP_{key.upper().replace("-", "_")}': value for key, value in headers.items()
}
if include_authorization:
parsed_headers["HTTP_AUTHORIZATION"] = f"Bearer {self.expected_token}"
return self.request_factory.get("/", **parsed_headers)
def test_bails_early_without_api_org(self):
@@ -174,3 +199,94 @@ class APIGatewayAuthTestCase(TestCase):
"client-id-uuid-mock",
),
)
def test_fails_authentication_if_no_header_present(self):
with self.assertRaisesMessage(AuthenticationFailed, "Bearer token not present"):
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
},
include_authorization=False,
)
)
def test_fails_with_invalid_token(self):
self.mock_verify_token.side_effect = GoogleAuthError("token-failed-verification")
with self.assertRaisesMessage(AuthenticationFailed, "Invalid API Gateway token"):
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
}
)
)
@override_settings(
API_GATEWAY_JWT_EXPECTED_AUDIENCE="https://audience.invalid/",
API_GATEWAY_JWT_ISSUER_CERTS_URL="https://issuer.invalid/certs",
)
def test_verify_token_passed_expected_audience_and_certs_url(self):
self.assertIsNotNone(
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
}
)
)
)
self.mock_verify_token.assert_called_once()
self.assertEqual(self.mock_verify_token.call_args[0][0], self.expected_token)
self.assertEqual(
self.mock_verify_token.call_args[1]["audience"], "https://audience.invalid/"
)
self.assertEqual(
self.mock_verify_token.call_args[1]["certs_url"], "https://issuer.invalid/certs"
)
def test_bad_id_token(self):
with self.assertRaisesMessage(AuthenticationFailed, "Invalid API Gateway token"):
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
"Authorization": f"Bearer {secrets.token_urlsafe()}",
},
include_authorization=False,
)
)
@override_settings(API_GATEWAY_JWT_TRUSTED_ISSUERS=["https://issuer.invalid/"])
def test_bad_id_token_issuer(self):
with self.assertRaisesMessage(AuthenticationFailed, "Invalid API Gateway token"):
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
}
)
)
@override_settings(API_GATEWAY_JWT_EXPECTED_AUTHORISED_PARTIES=["gateway@gateway.invalid"])
def test_bad_azp_claim(self):
with self.assertRaisesMessage(AuthenticationFailed, "Invalid API Gateway token"):
self.auth.authenticate(
self.request_with_headers(
{
"x-api-org-name": "test",
"x-api-developer-app-class": "public",
"x-api-oauth2-user": str(Identifier("a123", IdentifierSchemes.CRSID)),
}
)
)
Loading