From 4086525928993c7b39df5fc97dad3e63791d6928 Mon Sep 17 00:00:00 2001 From: Carlos Martinez Date: Mon, 6 Mar 2017 22:11:49 -0500 Subject: [PATCH] Added authorization support for DjangoFilterConnectionField --- graphene_django/auth/fields.py | 37 ++++++++++++++++++ graphene_django/auth/mixins.py | 6 +-- .../{test_auth_mixins.py => test_auth.py} | 38 +++++++++++++++++++ 3 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 graphene_django/auth/fields.py rename graphene_django/tests/{test_auth_mixins.py => test_auth.py} (80%) diff --git a/graphene_django/auth/fields.py b/graphene_django/auth/fields.py new file mode 100644 index 0000000..a5867b9 --- /dev/null +++ b/graphene_django/auth/fields.py @@ -0,0 +1,37 @@ +from django.core.exceptions import PermissionDenied +from graphene_django.filter import DjangoFilterConnectionField +from graphene_django.fields import DjangoConnectionField + + +class AuthDjangoFilterConnectionField(DjangoFilterConnectionField): + _permission = '' + + @classmethod + def has_perm(cls, context): + if context is None: + return False + if type(context) is dict: + user = context.get('user', None) + if user is None: + return False + else: + user = context.user + if user.is_authenticated() is False: + return False + + if type(cls._permission) is tuple: + for permission in cls._permission: + if not user.has_perm(permission): + return False + if type(cls._permission) is str: + if not user.has_perm(cls._permission): + return False + return True + + def connection_resolver(self, resolver, connection, default_manager, filterset_class, filtering_args, + root, args, context, info): + if self.has_perm(context) is not True: + return DjangoConnectionField.connection_resolver(resolver, connection, [PermissionDenied('Permission Denied'), ], root, args, context, info) + return super(AuthDjangoFilterConnectionField, self).connection_resolver( + resolver, connection, default_manager, filterset_class, filtering_args, + root, args, context, info) diff --git a/graphene_django/auth/mixins.py b/graphene_django/auth/mixins.py index 6371835..7833116 100644 --- a/graphene_django/auth/mixins.py +++ b/graphene_django/auth/mixins.py @@ -9,15 +9,15 @@ class AuthNodeMixin(): def has_perm(object_instance): if context is None: - return PermissionDenied('Permission Denied') + return False if type(context) is dict: user = context.get('user', None) if user is None: - return PermissionDenied('Permission Denied') + return False else: user = context.user if user.is_authenticated() is False: - return PermissionDenied('Permission Denied') + return False if type(cls._permission) is tuple: for permission in cls._permission: diff --git a/graphene_django/tests/test_auth_mixins.py b/graphene_django/tests/test_auth.py similarity index 80% rename from graphene_django/tests/test_auth_mixins.py rename to graphene_django/tests/test_auth.py index fb05449..a3678eb 100644 --- a/graphene_django/tests/test_auth_mixins.py +++ b/graphene_django/tests/test_auth.py @@ -3,6 +3,8 @@ from graphene import Schema, relay, ObjectType from django.test import TestCase, RequestFactory from graphene_django import DjangoObjectType from graphene_django.auth.mixins import AuthNodeMixin, AuthMutationMixin +from graphene_django.auth.fields import AuthDjangoFilterConnectionField +from django.core.exceptions import PermissionDenied from .models import Pet @@ -42,8 +44,13 @@ class CreatePet(AuthMutationMixin, graphene.Mutation): return CreatePet(pet=pet) +class PetFilterConnection(AuthDjangoFilterConnectionField): + _permission = 'app.create_pet' + + class QueryRoot(ObjectType): pet = relay.Node.Field(PetNode) + pets = PetFilterConnection(PetNode) class MutationRoot(ObjectType): @@ -97,6 +104,18 @@ class AuthorizationTests(TestCase): } } ''' + cls.query_filter = ''' + query { + pets{ + edges{ + node{ + id + name + } + } + } + } + ''' def setUp(self): self.factory = RequestFactory() @@ -167,3 +186,22 @@ class AuthorizationTests(TestCase): """ result = self.schema.execute(self.query_node, context_value={'user': self.luke}) self.assertEqual(result.errors, []) + + def test_filter_has_permission(self): + """ + Making query with an user who has the permission + """ + result = self.schema.execute(self.query_filter, context_value={'user': self.luke}) + print(result.data) + print(result.errors) + self.assertEqual(result.errors, []) + + def test_filter_non_permission(self): + """ + Making query with an user who has the permission + """ + result = self.schema.execute(self.query_filter, context_value={'user': self.storm_tropper}) + print(result.data) + print(result.errors) + self.assertNotEqual(result.errors, []) + self.assertEqual(result.errors[0].message, 'Permission Denied')