Initial version debug schema in django

This commit is contained in:
Syrus Akbary 2015-12-03 22:15:09 -08:00
parent 0fa7f5c9aa
commit 14439155ee
8 changed files with 333 additions and 0 deletions

View File

@ -0,0 +1,4 @@
from .schema import DebugSchema
from .types import DjangoDebug
__all__ = ['DebugSchema', 'DjangoDebug']

View File

@ -0,0 +1,68 @@
from django.db import connections
from ....core.schema import Schema
from ....core.types import Field
from .sql.tracking import unwrap_cursor, wrap_cursor
from .sql.types import DjangoDebugSQL
from .types import DjangoDebug
class WrappedRoot(object):
def __init__(self, root):
self._recorded = []
self._root = root
def record(self, **log):
self._recorded.append(DjangoDebugSQL(**log))
def debug(self):
return DjangoDebug(sql=self._recorded)
class WrapRoot(object):
@property
def _root(self):
return self._wrapped_root.root
@_root.setter
def _root(self, value):
self._wrapped_root = value
def resolve_debug(self, args, info):
return self._wrapped_root.debug()
def debug_objecttype(objecttype):
return type('Debug{}'.format(objecttype._meta.type_name), (WrapRoot, objecttype), {'debug': Field(DjangoDebug, name='__debug')})
class DebugSchema(Schema):
@property
def query(self):
if not self._query:
return
return debug_objecttype(self._query)
@query.setter
def query(self, value):
self._query = value
def enable_instrumentation(self, wrapped_root):
# This is thread-safe because database connections are thread-local.
for connection in connections.all():
wrap_cursor(connection, wrapped_root)
def disable_instrumentation(self):
for connection in connections.all():
unwrap_cursor(connection)
def execute(self, *args, **kwargs):
root = kwargs.pop('root', object())
wrapped_root = WrappedRoot(root=root)
self.enable_instrumentation(wrapped_root)
result = super(DebugSchema, self).execute(root=wrapped_root, *args, **kwargs)
self.disable_instrumentation()
return result

View File

@ -0,0 +1,165 @@
# Code obtained from django-debug-toolbar sql panel tracking
from __future__ import absolute_import, unicode_literals
import json
from threading import local
from time import time
from django.utils import six
from django.utils.encoding import force_text
class SQLQueryTriggered(Exception):
"""Thrown when template panel triggers a query"""
class ThreadLocalState(local):
def __init__(self):
self.enabled = True
@property
def Wrapper(self):
if self.enabled:
return NormalCursorWrapper
return ExceptionCursorWrapper
def recording(self, v):
self.enabled = v
state = ThreadLocalState()
recording = state.recording # export function
def wrap_cursor(connection, panel):
if not hasattr(connection, '_djdt_cursor'):
connection._djdt_cursor = connection.cursor
def cursor():
return state.Wrapper(connection._djdt_cursor(), connection, panel)
connection.cursor = cursor
return cursor
def unwrap_cursor(connection):
if hasattr(connection, '_djdt_cursor'):
del connection._djdt_cursor
del connection.cursor
class ExceptionCursorWrapper(object):
"""
Wraps a cursor and raises an exception on any operation.
Used in Templates panel.
"""
def __init__(self, cursor, db, logger):
pass
def __getattr__(self, attr):
raise SQLQueryTriggered()
class NormalCursorWrapper(object):
"""
Wraps a cursor and logs queries.
"""
def __init__(self, cursor, db, logger):
self.cursor = cursor
# Instance of a BaseDatabaseWrapper subclass
self.db = db
# logger must implement a ``record`` method
self.logger = logger
def _quote_expr(self, element):
if isinstance(element, six.string_types):
return "'%s'" % force_text(element).replace("'", "''")
else:
return repr(element)
def _quote_params(self, params):
if not params:
return params
if isinstance(params, dict):
return dict((key, self._quote_expr(value))
for key, value in params.items())
return list(map(self._quote_expr, params))
def _decode(self, param):
try:
return force_text(param, strings_only=True)
except UnicodeDecodeError:
return '(encoded string)'
def _record(self, method, sql, params):
start_time = time()
try:
return method(sql, params)
finally:
stop_time = time()
duration = (stop_time - start_time) * 1000
_params = ''
try:
_params = json.dumps(list(map(self._decode, params)))
except Exception:
pass # object not JSON serializable
alias = getattr(self.db, 'alias', 'default')
conn = self.db.connection
vendor = getattr(conn, 'vendor', 'unknown')
params = {
'vendor': vendor,
'alias': alias,
'sql': self.db.ops.last_executed_query(
self.cursor, sql, self._quote_params(params)),
'duration': duration,
'raw_sql': sql,
'params': _params,
'start_time': start_time,
'stop_time': stop_time,
'is_slow': duration > 10,
'is_select': sql.lower().strip().startswith('select'),
}
if vendor == 'postgresql':
# If an erroneous query was ran on the connection, it might
# be in a state where checking isolation_level raises an
# exception.
try:
iso_level = conn.isolation_level
except conn.InternalError:
iso_level = 'unknown'
params.update({
'trans_id': self.logger.get_transaction_id(alias),
'trans_status': conn.get_transaction_status(),
'iso_level': iso_level,
'encoding': conn.encoding,
})
# We keep `sql` to maintain backwards compatibility
self.logger.record(**params)
def callproc(self, procname, params=()):
return self._record(self.cursor.callproc, procname, params)
def execute(self, sql, params=()):
return self._record(self.cursor.execute, sql, params)
def executemany(self, sql, param_list):
return self._record(self.cursor.executemany, sql, param_list)
def __getattr__(self, attr):
return getattr(self.cursor, attr)
def __iter__(self):
return iter(self.cursor)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()

View File

@ -0,0 +1,19 @@
from .....core import Float, ObjectType, String
class DjangoDebugSQL(ObjectType):
vendor = String()
alias = String()
sql = String()
duration = Float()
raw_sql = String()
params = String()
start_time = Float()
stop_time = Float()
is_slow = String()
is_select = String()
trans_id = String()
trans_status = String()
iso_level = String()
encoding = String()

View File

@ -0,0 +1,70 @@
import pytest
import graphene
from graphene.contrib.django import DjangoObjectType
from ...tests.models import Reporter
from ..schema import DebugSchema
# from examples.starwars_django.models import Character
pytestmark = pytest.mark.django_db
def test_should_query_well():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
all_reporters = ReporterType.List
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
def resolve_reporter(self, *args, **kwargs):
return Reporter.objects.first()
query = '''
query ReporterQuery {
reporter {
lastName
}
allReporters {
lastName
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'reporter': {
'lastName': 'ABA',
},
'allReporters': [{
'lastName': 'ABA',
}, {
'lastName': 'Griffin',
}],
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.order_by('pk')[:1].query)
}, {
'rawSql': str(Reporter.objects.all().query)
}]
}
}
schema = DebugSchema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -0,0 +1,7 @@
from ....core.classtypes.objecttype import ObjectType
from ....core.types import Field
from .sql.types import DjangoDebugSQL
class DjangoDebug(ObjectType):
sql = Field(DjangoDebugSQL.List)