Use 1.4's CSRFMiddleware, so that PUT and DELETE get CSRF validation if session authentication is being used

This commit is contained in:
Tom Christie 2011-12-14 20:10:06 +00:00
parent 050f07e7b1
commit d53f7f45b4
7 changed files with 229 additions and 12 deletions

View File

@ -8,7 +8,7 @@ The set of authentication methods which are used is then specified by setting th
"""
from django.contrib.auth import authenticate
from django.middleware.csrf import CsrfViewMiddleware
from djangorestframework.compat import CsrfViewMiddleware
from djangorestframework.utils import as_tuple
import base64

View File

@ -1,24 +1,25 @@
"""
The :mod:`compat` module provides support for backwards compatibility with older versions of django/python.
"""
import django
# cStringIO only if it's available
# cStringIO only if it's available, otherwise StringIO
try:
import cStringIO as StringIO
except ImportError:
import StringIO
# parse_qs
# parse_qs from 'urlparse' module unless python 2.5, in which case from 'cgi'
try:
# python >= ?
# python >= 2.6
from urlparse import parse_qs
except ImportError:
# python <= ?
# python < 2.6
from cgi import parse_qs
# django.test.client.RequestFactory (Django >= 1.3)
# django.test.client.RequestFactory (Required for Django < 1.3)
try:
from django.test.client import RequestFactory
except ImportError:
@ -156,6 +157,220 @@ except ImportError:
def head(self, request, *args, **kwargs):
return self.get(request, *args, **kwargs)
# PUT, DELETE do not require CSRF until 1.4. They should. Make it better.
if django.VERSION >= (1, 4):
from django.middleware.csrf import CsrfViewMiddleware
else:
import hashlib
import re
import random
import logging
import urlparse
from django.conf import settings
from django.core.urlresolvers import get_callable
try:
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('django.request')
if not logger.handlers:
logger.addHandler(NullHandler())
def same_origin(url1, url2):
"""
Checks if two URLs are 'same-origin'
"""
p1, p2 = urlparse.urlparse(url1), urlparse.urlparse(url2)
return p1[0:2] == p2[0:2]
def constant_time_compare(val1, val2):
"""
Returns True if the two strings are equal, False otherwise.
The time taken is independent of the number of characters that match.
"""
if len(val1) != len(val2):
return False
result = 0
for x, y in zip(val1, val2):
result |= ord(x) ^ ord(y)
return result == 0
# Use the system (hardware-based) random number generator if it exists.
if hasattr(random, 'SystemRandom'):
randrange = random.SystemRandom().randrange
else:
randrange = random.randrange
_MAX_CSRF_KEY = 18446744073709551616L # 2 << 63
REASON_NO_REFERER = "Referer checking failed - no Referer."
REASON_BAD_REFERER = "Referer checking failed - %s does not match %s."
REASON_NO_CSRF_COOKIE = "CSRF cookie not set."
REASON_BAD_TOKEN = "CSRF token missing or incorrect."
def _get_failure_view():
"""
Returns the view to be used for CSRF rejections
"""
return get_callable(settings.CSRF_FAILURE_VIEW)
def _get_new_csrf_key():
return hashlib.md5("%s%s" % (randrange(0, _MAX_CSRF_KEY), settings.SECRET_KEY)).hexdigest()
def get_token(request):
"""
Returns the the CSRF token required for a POST form. The token is an
alphanumeric value.
A side effect of calling this function is to make the the csrf_protect
decorator and the CsrfViewMiddleware add a CSRF cookie and a 'Vary: Cookie'
header to the outgoing response. For this reason, you may need to use this
function lazily, as is done by the csrf context processor.
"""
request.META["CSRF_COOKIE_USED"] = True
return request.META.get("CSRF_COOKIE", None)
def _sanitize_token(token):
# Allow only alphanum, and ensure we return a 'str' for the sake of the post
# processing middleware.
token = re.sub('[^a-zA-Z0-9]', '', str(token.decode('ascii', 'ignore')))
if token == "":
# In case the cookie has been truncated to nothing at some point.
return _get_new_csrf_key()
else:
return token
class CsrfViewMiddleware(object):
"""
Middleware that requires a present and correct csrfmiddlewaretoken
for POST requests that have a CSRF cookie, and sets an outgoing
CSRF cookie.
This middleware should be used in conjunction with the csrf_token template
tag.
"""
# The _accept and _reject methods currently only exist for the sake of the
# requires_csrf_token decorator.
def _accept(self, request):
# Avoid checking the request twice by adding a custom attribute to
# request. This will be relevant when both decorator and middleware
# are used.
request.csrf_processing_done = True
return None
def _reject(self, request, reason):
return _get_failure_view()(request, reason=reason)
def process_view(self, request, callback, callback_args, callback_kwargs):
if getattr(request, 'csrf_processing_done', False):
return None
try:
csrf_token = _sanitize_token(request.COOKIES[settings.CSRF_COOKIE_NAME])
# Use same token next time
request.META['CSRF_COOKIE'] = csrf_token
except KeyError:
csrf_token = None
# Generate token and store it in the request, so it's available to the view.
request.META["CSRF_COOKIE"] = _get_new_csrf_key()
# Wait until request.META["CSRF_COOKIE"] has been manipulated before
# bailing out, so that get_token still works
if getattr(callback, 'csrf_exempt', False):
return None
# Assume that anything not defined as 'safe' by RC2616 needs protection.
if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
if getattr(request, '_dont_enforce_csrf_checks', False):
# Mechanism to turn off CSRF checks for test suite. It comes after
# the creation of CSRF cookies, so that everything else continues to
# work exactly the same (e.g. cookies are sent etc), but before the
# any branches that call reject()
return self._accept(request)
if request.is_secure():
# Suppose user visits http://example.com/
# An active network attacker,(man-in-the-middle, MITM) sends a
# POST form which targets https://example.com/detonate-bomb/ and
# submits it via javascript.
#
# The attacker will need to provide a CSRF cookie and token, but
# that is no problem for a MITM and the session independent
# nonce we are using. So the MITM can circumvent the CSRF
# protection. This is true for any HTTP connection, but anyone
# using HTTPS expects better! For this reason, for
# https://example.com/ we need additional protection that treats
# http://example.com/ as completely untrusted. Under HTTPS,
# Barth et al. found that the Referer header is missing for
# same-domain requests in only about 0.2% of cases or less, so
# we can use strict Referer checking.
referer = request.META.get('HTTP_REFERER')
if referer is None:
logger.warning('Forbidden (%s): %s' % (REASON_NO_REFERER, request.path),
extra={
'status_code': 403,
'request': request,
}
)
return self._reject(request, REASON_NO_REFERER)
# Note that request.get_host() includes the port
good_referer = 'https://%s/' % request.get_host()
if not same_origin(referer, good_referer):
reason = REASON_BAD_REFERER % (referer, good_referer)
logger.warning('Forbidden (%s): %s' % (reason, request.path),
extra={
'status_code': 403,
'request': request,
}
)
return self._reject(request, reason)
if csrf_token is None:
# No CSRF cookie. For POST requests, we insist on a CSRF cookie,
# and in this way we can avoid all CSRF attacks, including login
# CSRF.
logger.warning('Forbidden (%s): %s' % (REASON_NO_CSRF_COOKIE, request.path),
extra={
'status_code': 403,
'request': request,
}
)
return self._reject(request, REASON_NO_CSRF_COOKIE)
# check non-cookie token for match
request_csrf_token = ""
if request.method == "POST":
request_csrf_token = request.POST.get('csrfmiddlewaretoken', '')
if request_csrf_token == "":
# Fall back to X-CSRFToken, to make things easier for AJAX,
# and possible for PUT/DELETE
request_csrf_token = request.META.get('HTTP_X_CSRFTOKEN', '')
if not constant_time_compare(request_csrf_token, csrf_token):
logger.warning('Forbidden (%s): %s' % (REASON_BAD_TOKEN, request.path),
extra={
'status_code': 403,
'request': request,
}
)
return self._reject(request, REASON_BAD_TOKEN)
return self._accept(request)
# Markdown is optional
try:
import markdown

View File

@ -95,7 +95,6 @@ INSTALLED_APPS = (
# Uncomment the next line to enable admin documentation:
# 'django.contrib.admindocs',
'djangorestframework',
'djangorestframework.tests',
)
# OAuth support is optional, so we only test oauth if it's installed.

View File

@ -8,13 +8,15 @@ from djangorestframework.mixins import CreateModelMixin, PaginatorMixin
from djangorestframework.resources import ModelResource
from djangorestframework.response import Response
from djangorestframework.tests.models import CustomUser
from djangorestframework.tests.testcases import TestModelsTestCase
from djangorestframework.views import View
class TestModelCreation(TestCase):
class TestModelCreation(TestModelsTestCase):
"""Tests on CreateModelMixin"""
def setUp(self):
super(TestModelsTestCase, self).setUp()
self.req = RequestFactory()
def test_creation(self):

View File

@ -25,4 +25,4 @@ class UserGroupMap(models.Model):
def get_absolute_url(self):
return ('user_group_map', (), {
'pk': self.id
})
})

View File

@ -5,6 +5,7 @@ from django.contrib.auth.models import Group, User
from djangorestframework.resources import ModelResource
from djangorestframework.views import ListOrCreateModelView, InstanceModelView
from djangorestframework.tests.models import CustomUser
from djangorestframework.tests.testcases import TestModelsTestCase
class GroupResource(ModelResource):
model = Group
@ -31,7 +32,7 @@ urlpatterns = patterns('',
)
class ModelViewTests(TestCase):
class ModelViewTests(TestModelsTestCase):
"""Test the model views djangorestframework provides"""
urls = 'djangorestframework.tests.modelviews'

View File

@ -56,8 +56,8 @@ class TestFieldNesting(TestCase):
self.serialize = self.serializer.serialize
class M1(models.Model):
field1 = models.CharField()
field2 = models.CharField()
field1 = models.CharField(max_length=256)
field2 = models.CharField(max_length=256)
class M2(models.Model):
field = models.OneToOneField(M1)