Skip to content

Commit

Permalink
feat: endpoint for imapd to authenticate against (#5295)
Browse files Browse the repository at this point in the history
* feat: endpoint for imapd to authenticate against

* chore: remove unintended whitespace

* fix: be stricter in matching User
  • Loading branch information
rjsparks committed Mar 14, 2023
1 parent f810eda commit 2fe4647
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 0 deletions.
15 changes: 15 additions & 0 deletions ietf/api/ietf_utils.py
@@ -0,0 +1,15 @@
# Copyright The IETF Trust 2023, All Rights Reserved

# This is not utils.py because Tastypie implicitly consumes ietf.api.utils.
# See ietf.api.__init__.py for details.

from django.conf import settings

def is_valid_token(endpoint, token):
# This is where we would consider integration with vault
# Settings implementation for now.
if hasattr(settings, "APP_API_TOKENS"):
token_store = settings.APP_API_TOKENS
if endpoint in token_store and token in token_store[endpoint]:
return True
return False
96 changes: 96 additions & 0 deletions ietf/api/tests.py
Expand Up @@ -13,6 +13,7 @@
from django.apps import apps
from django.conf import settings
from django.test import Client
from django.test.utils import override_settings
from django.urls import reverse as urlreverse
from django.utils import timezone

Expand Down Expand Up @@ -530,6 +531,101 @@ def test_api_appauth(self):
jsondata = r.json()
self.assertEqual(jsondata['success'], True)

class DirectAuthApiTests(TestCase):

def setUp(self):
super().setUp()
self.valid_token = "nSZJDerbau6WZwbEAYuQ"
self.invalid_token = self.valid_token
while self.invalid_token == self.valid_token:
self.invalid_token = User.objects.make_random_password(20)
self.url = urlreverse("ietf.api.views.directauth")
self.valid_person = PersonFactory()
self.valid_password = self.valid_person.user.username+"+password"
self.invalid_password = self.valid_password
while self.invalid_password == self.valid_password:
self.invalid_password = User.objects.make_random_password(20)

self.valid_body_with_good_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
self.valid_body_with_bad_password = self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.invalid_password)
self.valid_body_with_unknown_user = self.post_dict(authtoken=self.valid_token, username="notauser@nowhere.nada", password=self.valid_password)

def post_dict(self, authtoken, username, password):
data = dict()
if authtoken is not None:
data["authtoken"] = authtoken
if username is not None:
data["username"] = username
if password is not None:
data["password"] = password
return dict(data = json.dumps(data))

def response_data(self, response):
try:
data = json.loads(response.content)
except json.decoder.JSONDecodeError:
data = None
self.assertIsNotNone(data)
return data

def test_bad_methods(self):
for method in (self.client.get, self.client.put, self.client.head, self.client.delete, self.client.patch):
r = method(self.url)
self.assertEqual(r.status_code, 405)

def test_bad_post(self):
for bad in [
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=self.valid_password),
self.post_dict(authtoken=self.valid_token, username=None, password=self.valid_password),
self.post_dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=None),
self.post_dict(authtoken=None, username=None, password=self.valid_password),
self.post_dict(authtoken=self.valid_token, username=None, password=None),
self.post_dict(authtoken=None, username=self.valid_person.user.username, password=None),
self.post_dict(authtoken=None, username=None, password=None),
]:
r = self.client.post(self.url, bad)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "failure")
self.assertEqual(data["reason"], "invalid post")

bad = dict(authtoken=self.valid_token, username=self.valid_person.user.username, password=self.valid_password)
r = self.client.post(self.url, bad)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "failure")
self.assertEqual(data["reason"], "invalid post")

def test_notokenstore(self):
self.assertFalse(hasattr(settings, "APP_API_TOKENS"))
r = self.client.post(self.url,self.valid_body_with_good_password)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "failure")
self.assertEqual(data["reason"], "invalid authtoken")

@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
def test_bad_username(self):
r = self.client.post(self.url, self.valid_body_with_unknown_user)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "failure")
self.assertEqual(data["reason"], "authentication failed")

@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
def test_bad_password(self):
r = self.client.post(self.url, self.valid_body_with_bad_password)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "failure")
self.assertEqual(data["reason"], "authentication failed")

@override_settings(APP_API_TOKENS={"ietf.api.views.directauth":"nSZJDerbau6WZwbEAYuQ"})
def test_good_password(self):
r = self.client.post(self.url, self.valid_body_with_good_password)
self.assertEqual(r.status_code, 200)
data = self.response_data(r)
self.assertEqual(data["result"], "success")

class TastypieApiTestCase(ResourceTestCaseMixin, TestCase):
def __init__(self, *args, **kwargs):
Expand Down
2 changes: 2 additions & 0 deletions ietf/api/urls.py
Expand Up @@ -58,6 +58,8 @@
# latest versions
url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json),
url(r'^rfcdiff-latest-json/(?P<name>[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json),
# direct authentication
url(r'^directauth/?$', api_views.directauth),
]

# Additional (standard) Tastypie endpoints
Expand Down
41 changes: 41 additions & 0 deletions ietf/api/views.py
Expand Up @@ -9,6 +9,7 @@
from jwcrypto.jwk import JWK

from django.conf import settings
from django.contrib.auth import authenticate
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
Expand All @@ -32,6 +33,7 @@
from ietf.person.models import Person, Email
from ietf.api import _api_list
from ietf.api.serializer import JsonExportMixin
from ietf.api.ietf_utils import is_valid_token
from ietf.doc.utils import fuzzy_find_documents
from ietf.ietfauth.views import send_account_creation_email
from ietf.ietfauth.utils import role_required
Expand Down Expand Up @@ -388,3 +390,42 @@ def rfcdiff_latest_json(request, name, rev=None):
if not response:
raise Http404
return HttpResponse(json.dumps(response), content_type='application/json')

@csrf_exempt
def directauth(request):
if request.method == "POST":
raw_data = request.POST.get("data", None)
if raw_data:
try:
data = json.loads(raw_data)
except json.decoder.JSONDecodeError:
data = None

if raw_data is None or data is None:
return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json')

authtoken = data.get('authtoken', None)
username = data.get('username', None)
password = data.get('password', None)

if any([item is None for item in (authtoken, username, password)]):
return HttpResponse(json.dumps(dict(result="failure",reason="invalid post")), content_type='application/json')

if not is_valid_token("ietf.api.views.directauth", authtoken):
return HttpResponse(json.dumps(dict(result="failure",reason="invalid authtoken")), content_type='application/json')

user_query = User.objects.filter(username__iexact=username)

# Matching email would be consistent with auth everywhere else in the app, but until we can map users well
# in the imap server, people's annotations are associated with a very specific login.
# If we get a second user of this API, add an "allow_any_email" argument.


# Note well that we are using user.username, not what was passed to the API.
if user_query.count() == 1 and authenticate(username = user_query.first().username, password = password):
return HttpResponse(json.dumps(dict(result="success")), content_type='application/json')

return HttpResponse(json.dumps(dict(result="failure", reason="authentication failed")), content_type='application/json')

else:
return HttpResponse(status=405)

0 comments on commit 2fe4647

Please sign in to comment.