mirror of
https://github.com/graphql-python/graphene-django.git
synced 2025-02-22 06:20:33 +03:00
Added authorization support for DjangoFilterConnectionField
This commit is contained in:
parent
6f138c8a7e
commit
4086525928
37
graphene_django/auth/fields.py
Normal file
37
graphene_django/auth/fields.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
from django.core.exceptions import PermissionDenied
|
||||
from graphene_django.filter import DjangoFilterConnectionField
|
||||
from graphene_django.fields import DjangoConnectionField
|
||||
|
||||
|
||||
class AuthDjangoFilterConnectionField(DjangoFilterConnectionField):
|
||||
_permission = ''
|
||||
|
||||
@classmethod
|
||||
def has_perm(cls, context):
|
||||
if context is None:
|
||||
return False
|
||||
if type(context) is dict:
|
||||
user = context.get('user', None)
|
||||
if user is None:
|
||||
return False
|
||||
else:
|
||||
user = context.user
|
||||
if user.is_authenticated() is False:
|
||||
return False
|
||||
|
||||
if type(cls._permission) is tuple:
|
||||
for permission in cls._permission:
|
||||
if not user.has_perm(permission):
|
||||
return False
|
||||
if type(cls._permission) is str:
|
||||
if not user.has_perm(cls._permission):
|
||||
return False
|
||||
return True
|
||||
|
||||
def connection_resolver(self, resolver, connection, default_manager, filterset_class, filtering_args,
|
||||
root, args, context, info):
|
||||
if self.has_perm(context) is not True:
|
||||
return DjangoConnectionField.connection_resolver(resolver, connection, [PermissionDenied('Permission Denied'), ], root, args, context, info)
|
||||
return super(AuthDjangoFilterConnectionField, self).connection_resolver(
|
||||
resolver, connection, default_manager, filterset_class, filtering_args,
|
||||
root, args, context, info)
|
|
@ -9,15 +9,15 @@ class AuthNodeMixin():
|
|||
|
||||
def has_perm(object_instance):
|
||||
if context is None:
|
||||
return PermissionDenied('Permission Denied')
|
||||
return False
|
||||
if type(context) is dict:
|
||||
user = context.get('user', None)
|
||||
if user is None:
|
||||
return PermissionDenied('Permission Denied')
|
||||
return False
|
||||
else:
|
||||
user = context.user
|
||||
if user.is_authenticated() is False:
|
||||
return PermissionDenied('Permission Denied')
|
||||
return False
|
||||
|
||||
if type(cls._permission) is tuple:
|
||||
for permission in cls._permission:
|
||||
|
|
|
@ -3,6 +3,8 @@ from graphene import Schema, relay, ObjectType
|
|||
from django.test import TestCase, RequestFactory
|
||||
from graphene_django import DjangoObjectType
|
||||
from graphene_django.auth.mixins import AuthNodeMixin, AuthMutationMixin
|
||||
from graphene_django.auth.fields import AuthDjangoFilterConnectionField
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from .models import Pet
|
||||
|
||||
|
||||
|
@ -42,8 +44,13 @@ class CreatePet(AuthMutationMixin, graphene.Mutation):
|
|||
return CreatePet(pet=pet)
|
||||
|
||||
|
||||
class PetFilterConnection(AuthDjangoFilterConnectionField):
|
||||
_permission = 'app.create_pet'
|
||||
|
||||
|
||||
class QueryRoot(ObjectType):
|
||||
pet = relay.Node.Field(PetNode)
|
||||
pets = PetFilterConnection(PetNode)
|
||||
|
||||
|
||||
class MutationRoot(ObjectType):
|
||||
|
@ -97,6 +104,18 @@ class AuthorizationTests(TestCase):
|
|||
}
|
||||
}
|
||||
'''
|
||||
cls.query_filter = '''
|
||||
query {
|
||||
pets{
|
||||
edges{
|
||||
node{
|
||||
id
|
||||
name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
'''
|
||||
|
||||
def setUp(self):
|
||||
self.factory = RequestFactory()
|
||||
|
@ -167,3 +186,22 @@ class AuthorizationTests(TestCase):
|
|||
"""
|
||||
result = self.schema.execute(self.query_node, context_value={'user': self.luke})
|
||||
self.assertEqual(result.errors, [])
|
||||
|
||||
def test_filter_has_permission(self):
|
||||
"""
|
||||
Making query with an user who has the permission
|
||||
"""
|
||||
result = self.schema.execute(self.query_filter, context_value={'user': self.luke})
|
||||
print(result.data)
|
||||
print(result.errors)
|
||||
self.assertEqual(result.errors, [])
|
||||
|
||||
def test_filter_non_permission(self):
|
||||
"""
|
||||
Making query with an user who has the permission
|
||||
"""
|
||||
result = self.schema.execute(self.query_filter, context_value={'user': self.storm_tropper})
|
||||
print(result.data)
|
||||
print(result.errors)
|
||||
self.assertNotEqual(result.errors, [])
|
||||
self.assertEqual(result.errors[0].message, 'Permission Denied')
|
Loading…
Reference in New Issue
Block a user