psycopg2/ZPsycopgDA/db.py

209 lines
6.8 KiB
Python
Raw Normal View History

# ZPsycopgDA/db.py - query execution
#
# Copyright (C) 2004 Federico Di Gregorio <fog@initd.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# Or, at your option this program (ZPsycopgDA) can be distributed under the
# Zope Public License (ZPL) Version 1.0, as published on the Zope web site,
# http://www.zope.org/Resources/ZPL.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the LICENSE file for details.
from Shared.DC.ZRDB.TM import TM
from Shared.DC.ZRDB import dbi_db
from ZODB.POSException import ConflictError
import site
import pool
2005-08-22 05:49:18 +04:00
import psycopg2
2006-06-09 04:05:42 +04:00
from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE, TIME
from psycopg2.extensions import register_type
2005-08-22 05:49:18 +04:00
from psycopg2 import NUMBER, STRING, ROWID, DATETIME
# the DB object, managing all the real query work
class DB(TM, dbi_db.DB):
_p_oid = _p_changed = _registered = None
def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
self.dsn = dsn
self.tilevel = tilevel
self.typecasts = typecasts
self.encoding = enc
self.failures = 0
self.calls = 0
2006-06-15 16:39:56 +04:00
self.make_mappings()
def getconn(self, create=True):
conn = pool.getconn(self.dsn)
conn.set_isolation_level(int(self.tilevel))
conn.set_client_encoding(self.encoding)
for tc in self.typecasts:
register_type(tc, conn)
return conn
def putconn(self, close=False):
try:
conn = pool.getconn(self.dsn, False)
except AttributeError:
pass
pool.putconn(self.dsn, conn, close)
def getcursor(self):
conn = self.getconn()
return conn.cursor()
def _finish(self, *ignored):
try:
conn = self.getconn(False)
conn.commit()
self.putconn()
except AttributeError:
pass
def _abort(self, *ignored):
try:
conn = self.getconn(False)
conn.rollback()
self.putconn()
except AttributeError:
pass
def open(self):
# this will create a new pool for our DSN if not already existing,
# then get and immediately release a connection
self.getconn()
self.putconn()
def close(self):
# FIXME: if this connection is closed we flush all the pool associated
# with the current DSN; does this makes sense?
pool.flushpool(self.dsn)
def sortKey(self):
return 1
2006-06-15 16:39:56 +04:00
def make_mappings(self):
"""Generate the mappings used later by self.convert_description()."""
self.type_mappings = {}
for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),
(BOOLEAN,'n'), (ROWID, 'i'),
(DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
for v in t.values:
self.type_mappings[v] = (t, s)
2005-05-26 11:34:27 +04:00
def convert_description(self, desc, use_psycopg_types=False):
"""Convert DBAPI-2.0 description field to Zope format."""
items = []
for name, typ, width, ds, p, scale, null_ok in desc:
2006-06-15 16:39:56 +04:00
m = self.type_mappings.get(typ, (STRING, 's'))
2005-05-26 11:34:27 +04:00
items.append({
'name': name,
2006-06-15 16:39:56 +04:00
'type': use_psycopg_types and m[0] or m[1],
2005-05-26 11:34:27 +04:00
'width': width,
'precision': p,
'scale': scale,
'null': null_ok,
})
return items
## tables and rows ##
def tables(self, rdb=0, _care=('TABLE', 'VIEW')):
self._register()
c = self.getcursor()
c.execute(
"SELECT t.tablename AS NAME, 'TABLE' AS TYPE "
" FROM pg_tables t WHERE tableowner <> 'postgres' "
"UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE "
" FROM pg_views v WHERE viewowner <> 'postgres' "
"UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE "
" FROM pg_tables t WHERE tableowner = 'postgres' "
"UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE "
"FROM pg_views v WHERE viewowner = 'postgres'")
res = []
for name, typ in c.fetchall():
if typ in _care:
res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ})
self.putconn()
return res
def columns(self, table_name):
self._register()
c = self.getcursor()
try:
r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name)
except:
return ()
self.putconn()
2005-05-26 11:34:27 +04:00
return self.convert_description(c.description, True)
## query execution ##
def query(self, query_string, max_rows=None, query_data=None):
self._register()
self.calls = self.calls+1
desc = ()
res = []
nselects = 0
c = self.getcursor()
try:
for qs in [x for x in query_string.split('\0') if x]:
try:
if query_data:
c.execute(qs, query_data)
else:
c.execute(qs)
2005-09-12 06:23:17 +04:00
except psycopg2.OperationalError, e:
try:
self.close()
except:
pass
self.open()
try:
if query_data:
c.execute(qs, query_data)
else:
c.execute(qs)
2005-08-22 05:49:18 +04:00
except (psycopg2.ProgrammingError,
psycopg2.IntegrityError), e:
if e.args[0].find("concurrent update") > -1:
raise ConflictError
raise e
2005-08-22 05:49:18 +04:00
except (psycopg2.ProgrammingError, psycopg2.IntegrityError), e:
if e.args[0].find("concurrent update") > -1:
raise ConflictError
raise e
if c.description is not None:
nselects += 1
if c.description != desc and nselects > 1:
2005-08-22 05:49:18 +04:00
raise psycopg2.ProgrammingError(
'multiple selects in single query not allowed')
if max_rows:
res = c.fetchmany(max_rows)
else:
res = c.fetchall()
desc = c.description
self.failures = 0
except StandardError, err:
self._abort()
raise err
2005-05-26 11:34:27 +04:00
return self.convert_description(desc), res