From 2fe4647832a3ffa55d195cb5c9ff7832f76cc29f Mon Sep 17 00:00:00 2001 From: Robert Sparks Date: Tue, 14 Mar 2023 12:19:21 -0500 Subject: [PATCH] feat: endpoint for imapd to authenticate against (#5295) * feat: endpoint for imapd to authenticate against * chore: remove unintended whitespace * fix: be stricter in matching User --- ietf/api/ietf_utils.py | 15 +++++++ ietf/api/tests.py | 96 ++++++++++++++++++++++++++++++++++++++++++ ietf/api/urls.py | 2 + ietf/api/views.py | 41 ++++++++++++++++++ 4 files changed, 154 insertions(+) create mode 100644 ietf/api/ietf_utils.py diff --git a/ietf/api/ietf_utils.py b/ietf/api/ietf_utils.py new file mode 100644 index 0000000000..06b9d76aff --- /dev/null +++ b/ietf/api/ietf_utils.py @@ -0,0 +1,15 @@ +# Copyright The IETF Trust 2023, All Rights Reserved + +# This is not utils.py because Tastypie implicitly consumes ietf.api.utils. +# See ietf.api.__init__.py for details. + +from django.conf import settings + +def is_valid_token(endpoint, token): + # This is where we would consider integration with vault + # Settings implementation for now. + if hasattr(settings, "APP_API_TOKENS"): + token_store = settings.APP_API_TOKENS + if endpoint in token_store and token in token_store[endpoint]: + return True + return False diff --git a/ietf/api/tests.py b/ietf/api/tests.py index e11e7eae61..a488199502 100644 --- a/ietf/api/tests.py +++ b/ietf/api/tests.py @@ -13,6 +13,7 @@ from django.apps import apps from django.conf import settings from django.test import Client +from django.test.utils import override_settings from django.urls import reverse as urlreverse from django.utils import timezone @@ -530,6 +531,101 @@ def test_api_appauth(self): jsondata = r.json() self.assertEqual(jsondata['success'], True) +class DirectAuthApiTests(TestCase): + + def setUp(self): + super().setUp() + self.valid_token = "nSZJDerbau6WZwbEAYuQ" + self.invalid_token = self.valid_token + while self.invalid_token == self.valid_token: + self.invalid_token = User.objects.make_random_password(20) + self.url = urlreverse("ietf.api.views.directauth") + self.valid_person = PersonFactory() + self.valid_password = self.valid_person.user.username+"+password" + self.invalid_password = self.valid_password + while self.invalid_password == self.valid_password: + self.invalid_password = User.objects.make_random_password(20) + + self.valid_body_with_good_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password) + self.valid_body_with_bad_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.invalid_password) + self.valid_body_with_unknown_user = self.post_dict(authtoken=self.valid_token, username="notauser@nowhere.nada", password=self.valid_password) + + def post_dict(self, authtoken, username, password): + data = dict() + if authtoken is not None: + data["authtoken"] = authtoken + if username is not None: + data["username"] = username + if password is not None: + data["password"] = password + return dict(data = json.dumps(data)) + + def response_data(self, response): + try: + data = json.loads(response.content) + except json.decoder.JSONDecodeError: + data = None + self.assertIsNotNone(data) + return data + + def test_bad_methods(self): + for method in (self.client.get, self.client.put, self.client.head, self.client.delete, self.client.patch): + r = method(self.url) + self.assertEqual(r.status_code, 405) + + def test_bad_post(self): + for bad in [ + self.post_dict(authtoken=None, username=self.valid_person.user.username, password=self.valid_password), + self.post_dict(authtoken=self.valid_token, username=None, password=self.valid_password), + self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=None), + self.post_dict(authtoken=None, username=None, password=self.valid_password), + self.post_dict(authtoken=self.valid_token, username=None, password=None), + self.post_dict(authtoken=None, username=self.valid_person.user.username, password=None), + self.post_dict(authtoken=None, username=None, password=None), + ]: + r = self.client.post(self.url, bad) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "failure") + self.assertEqual(data["reason"], "invalid post") + + bad = dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password) + r = self.client.post(self.url, bad) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "failure") + self.assertEqual(data["reason"], "invalid post") + + def test_notokenstore(self): + self.assertFalse(hasattr(settings, "APP_API_TOKENS")) + r = self.client.post(self.url,self.valid_body_with_good_password) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "failure") + self.assertEqual(data["reason"], "invalid authtoken") + + @override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"}) + def test_bad_username(self): + r = self.client.post(self.url, self.valid_body_with_unknown_user) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "failure") + self.assertEqual(data["reason"], "authentication failed") + + @override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"}) + def test_bad_password(self): + r = self.client.post(self.url, self.valid_body_with_bad_password) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "failure") + self.assertEqual(data["reason"], "authentication failed") + + @override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"}) + def test_good_password(self): + r = self.client.post(self.url, self.valid_body_with_good_password) + self.assertEqual(r.status_code, 200) + data = self.response_data(r) + self.assertEqual(data["result"], "success") class TastypieApiTestCase(ResourceTestCaseMixin, TestCase): def __init__(self, *args, **kwargs): diff --git a/ietf/api/urls.py b/ietf/api/urls.py index 53b69c05e7..5185b9f888 100644 --- a/ietf/api/urls.py +++ b/ietf/api/urls.py @@ -58,6 +58,8 @@ # latest versions url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json), url(r'^rfcdiff-latest-json/(?P[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json), + # direct authentication + url(r'^directauth/?$', api_views.directauth), ] # Additional (standard) Tastypie endpoints diff --git a/ietf/api/views.py b/ietf/api/views.py index 3a2d4a31d5..0ea2eb79b8 100644 --- a/ietf/api/views.py +++ b/ietf/api/views.py @@ -9,6 +9,7 @@ from jwcrypto.jwk import JWK from django.conf import settings +from django.contrib.auth import authenticate from django.contrib.auth.decorators import login_required from django.contrib.auth.models import User from django.core.exceptions import ValidationError @@ -32,6 +33,7 @@ from ietf.person.models import Person, Email from ietf.api import _api_list from ietf.api.serializer import JsonExportMixin +from ietf.api.ietf_utils import is_valid_token from ietf.doc.utils import fuzzy_find_documents from ietf.ietfauth.views import send_account_creation_email from ietf.ietfauth.utils import role_required @@ -388,3 +390,42 @@ def rfcdiff_latest_json(request, name, rev=None): if not response: raise Http404 return HttpResponse(json.dumps(response), content_type='application/json') + +@csrf_exempt +def directauth(request): + if request.method == "POST": + raw_data = request.POST.get("data", None) + if raw_data: + try: + data = json.loads(raw_data) + except json.decoder.JSONDecodeError: + data = None + + if raw_data is None or data is None: + return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json') + + authtoken = data.get('authtoken', None) + username = data.get('username', None) + password = data.get('password', None) + + if any([item is None for item in (authtoken, username, password)]): + return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json') + + if not is_valid_token("ietf.api.views.directauth", authtoken): + return HttpResponse(json.dumps(dict(result="failure",reason="invalid authtoken")), content_type='application/json') + + user_query = User.objects.filter(username__iexact=username) + + # Matching email would be consistent with auth everywhere else in the app, but until we can map users well + # in the imap server, people's annotations are associated with a very specific login. + # If we get a second user of this API, add an "allow_any_email" argument. + + + # Note well that we are using user.username, not what was passed to the API. + if user_query.count() == 1 and authenticate(username = user_query.first().username, password = password): + return HttpResponse(json.dumps(dict(result="success")), content_type='application/json') + + return HttpResponse(json.dumps(dict(result="failure", reason="authentication failed")), content_type='application/json') + + else: + return HttpResponse(status=405)