Merge commit '2_4_6'

This commit is contained in:
Daniele Varrazzo 2013-01-21 11:18:27 +00:00
commit 7a1d1791d3
37 changed files with 790 additions and 257 deletions

25
NEWS
View File

@ -1,3 +1,28 @@
What's new in psycopg 2.4.6
---------------------------
- Fixed 'cursor()' arguments propagation in connection subclasses
and overriding of the 'cursor_factory' argument. Thanks to
Corry Haines for the report and the initial patch (ticket #105).
- Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110).
- Close a green connection if there is an error in the callback.
Maybe a harsh solution but it leaves the program responsive
(ticket #113).
- 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
RealDictConnection and Cursor (ticket #114).
- Fixed broken pool for Zope and connections re-init across ZSQL methods
in the same request (tickets #123, #125, #142).
- connect() raises an exception instead of swallowing keyword arguments
when a connection string is specified as well (ticket #131).
- Discard any result produced by 'executemany()' (ticket #133).
- Fixed pickling of FixedOffsetTimezone objects (ticket #135).
- Release the GIL around PQgetResult calls after COPY (ticket #140).
- Fixed empty strings handling in composite caster (ticket #141).
- Fixed pickling of DictRow and RealDictRow objects.
What's new in psycopg 2.4.5
---------------------------

View File

@ -16,7 +16,7 @@
# their work without bothering about the module dependencies.
ALLOWED_PSYCOPG_VERSIONS = ('2.4-beta1', '2.4-beta2', '2.4', '2.4.1', '2.4.2', '2.4.3', '2.4.4', '2.4.5')
ALLOWED_PSYCOPG_VERSIONS = ('2.4', '2.4.1', '2.4.4', '2.4.5', '2.4.6')
import sys
import time

View File

@ -32,7 +32,7 @@ from psycopg2 import NUMBER, STRING, ROWID, DATETIME
# the DB object, managing all the real query work
class DB(TM, dbi_db.DB):
_p_oid = _p_changed = _registered = None
def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
@ -46,13 +46,18 @@ class DB(TM, dbi_db.DB):
self.failures = 0
self.calls = 0
self.make_mappings()
def getconn(self, init=True):
# if init is False we are trying to get hold on an already existing
# connection, so we avoid to (re)initialize it risking errors.
conn = pool.getconn(self.dsn)
if init:
conn.set_isolation_level(int(self.tilevel))
# use set_session where available as in these versions
# set_isolation_level generates an extra query.
if psycopg2.__version__ >= '2.4.2':
conn.set_session(isolation_level=int(self.tilevel))
else:
conn.set_isolation_level(int(self.tilevel))
conn.set_client_encoding(self.encoding)
for tc in self.typecasts:
register_type(tc, conn)
@ -64,9 +69,9 @@ class DB(TM, dbi_db.DB):
except AttributeError:
pass
pool.putconn(self.dsn, conn, close)
def getcursor(self):
conn = self.getconn()
conn = self.getconn(False)
return conn.cursor()
def _finish(self, *ignored):
@ -76,7 +81,7 @@ class DB(TM, dbi_db.DB):
self.putconn()
except AttributeError:
pass
def _abort(self, *ignored):
try:
conn = self.getconn(False)
@ -90,7 +95,7 @@ class DB(TM, dbi_db.DB):
# then get and immediately release a connection
self.getconn()
self.putconn()
def close(self):
# FIXME: if this connection is closed we flush all the pool associated
# with the current DSN; does this makes sense?

View File

@ -78,8 +78,10 @@ or DSN for short) is a string... (TODO: finish docs)
</td>
<td align="left" valign="top">
<select name="tilevel:int">
<option value="4">Read uncommitted</option>
<option value="1">Read committed</option>
<option value="2" selected="YES">Serializable</option>
<option value="2" selected="YES">Repeatable read</option>
<option value="3">Serializable</option>
</select>
</td>
</tr>

View File

@ -44,11 +44,17 @@
</td>
<td align="left" valign="top">
<select name="tilevel:int">
<option value="4"
<dtml-if expr="tilevel==4">selected="YES"</dtml-if>>
Read uncommitted</option>
<option value="1"
<dtml-if expr="tilevel==1">selected="YES"</dtml-if>>
Read committed</option>
<option value="2"
<dtml-if expr="tilevel==2">selected="YES"</dtml-if>>
Repeatable read</option>
<option value="3"
<dtml-if expr="tilevel==3">selected="YES"</dtml-if>>
Serializable</option>
</select>
</td>

View File

@ -19,7 +19,151 @@
# ZPsycopgDA code in db.py.
import threading
import psycopg2.pool
import psycopg2
from psycopg2.pool import PoolError
class AbstractConnectionPool(object):
"""Generic key-based pooling code."""
def __init__(self, minconn, maxconn, *args, **kwargs):
"""Initialize the connection pool.
New 'minconn' connections are created immediately calling 'connfunc'
with given parameters. The connection pool will support a maximum of
about 'maxconn' connections.
"""
self.minconn = minconn
self.maxconn = maxconn
self.closed = False
self._args = args
self._kwargs = kwargs
self._pool = []
self._used = {}
self._rused = {} # id(conn) -> key map
self._keys = 0
for i in range(self.minconn):
self._connect()
def _connect(self, key=None):
"""Create a new connection and assign it to 'key' if not None."""
conn = psycopg2.connect(*self._args, **self._kwargs)
if key is not None:
self._used[key] = conn
self._rused[id(conn)] = key
else:
self._pool.append(conn)
return conn
def _getkey(self):
"""Return a new unique key."""
self._keys += 1
return self._keys
def _getconn(self, key=None):
"""Get a free connection and assign it to 'key' if not None."""
if self.closed: raise PoolError("connection pool is closed")
if key is None: key = self._getkey()
if key in self._used:
return self._used[key]
if self._pool:
self._used[key] = conn = self._pool.pop()
self._rused[id(conn)] = key
return conn
else:
if len(self._used) == self.maxconn:
raise PoolError("connection pool exausted")
return self._connect(key)
def _putconn(self, conn, key=None, close=False):
"""Put away a connection."""
if self.closed: raise PoolError("connection pool is closed")
if key is None: key = self._rused[id(conn)]
if not key:
raise PoolError("trying to put unkeyed connection")
if len(self._pool) < self.minconn and not close:
self._pool.append(conn)
else:
conn.close()
# here we check for the presence of key because it can happen that a
# thread tries to put back a connection after a call to close
if not self.closed or key in self._used:
del self._used[key]
del self._rused[id(conn)]
def _closeall(self):
"""Close all connections.
Note that this can lead to some code fail badly when trying to use
an already closed connection. If you call .closeall() make sure
your code can deal with it.
"""
if self.closed: raise PoolError("connection pool is closed")
for conn in self._pool + list(self._used.values()):
try:
conn.close()
except:
pass
self.closed = True
class PersistentConnectionPool(AbstractConnectionPool):
"""A pool that assigns persistent connections to different threads.
Note that this connection pool generates by itself the required keys
using the current thread id. This means that until a thread puts away
a connection it will always get the same connection object by successive
`!getconn()` calls. This also means that a thread can't use more than one
single connection from the pool.
"""
def __init__(self, minconn, maxconn, *args, **kwargs):
"""Initialize the threading lock."""
import threading
AbstractConnectionPool.__init__(
self, minconn, maxconn, *args, **kwargs)
self._lock = threading.Lock()
# we we'll need the thread module, to determine thread ids, so we
# import it here and copy it in an instance variable
import thread
self.__thread = thread
def getconn(self):
"""Generate thread id and return a connection."""
key = self.__thread.get_ident()
self._lock.acquire()
try:
return self._getconn(key)
finally:
self._lock.release()
def putconn(self, conn=None, close=False):
"""Put away an unused connection."""
key = self.__thread.get_ident()
self._lock.acquire()
try:
if not conn: conn = self._used[key]
self._putconn(conn, key, close)
finally:
self._lock.release()
def closeall(self):
"""Close all connections (even the one currently in use.)"""
self._lock.acquire()
try:
self._closeall()
finally:
self._lock.release()
_connections_pool = {}
_connections_lock = threading.Lock()
@ -29,7 +173,7 @@ def getpool(dsn, create=True):
try:
if not _connections_pool.has_key(dsn) and create:
_connections_pool[dsn] = \
psycopg2.pool.PersistentConnectionPool(4, 200, dsn)
PersistentConnectionPool(4, 200, dsn)
finally:
_connections_lock.release()
return _connections_pool[dsn]
@ -41,7 +185,7 @@ def flushpool(dsn):
del _connections_pool[dsn]
finally:
_connections_lock.release()
def getconn(dsn, create=True):
return getpool(dsn, create=create).getconn()

View File

@ -60,11 +60,17 @@ The ``connection`` class
pair: Transaction; Commit
.. method:: commit()
Commit any pending transaction to the database. Psycopg can be set to
perform automatic commits at each operation, see
`~connection.set_isolation_level()`.
Commit any pending transaction to the database.
By default, Psycopg opens a transaction before executing the first
command: if `!commit()` is not called, the effect of any data
manipulation will be lost.
The connection can be also set in "autocommit" mode: no transaction is
automatically open, commands have immediate effect. See
:ref:`transactions-control` for details.
.. index::
pair: Transaction; Rollback
@ -77,7 +83,7 @@ The ``connection`` class
.. method:: close()
Close the connection now (rather than whenever `del` is executed).
The connection will be unusable from this point forward; an
`~psycopg2.InterfaceError` will be raised if any operation is

View File

@ -154,6 +154,15 @@ can be enabled using the `register_hstore()` function.
.. autofunction:: register_hstore
.. versionchanged:: 2.4
added the *oid* parameter. If not specified, the typecaster is
installed also if |hstore| is not installed in the :sql:`public`
schema.
.. versionchanged:: 2.4.3
added support for |hstore| array.
.. |hstore| replace:: :sql:`hstore`
.. _hstore: http://www.postgresql.org/docs/current/static/hstore.html

View File

@ -7,7 +7,7 @@ import sys
def main():
if len(sys.argv) != 3:
print >>sys.stderr, "usage: %s index.rst text-dir"
sys.stderr.write("usage: %s index.rst text-dir\n")
return 2
_, index, txt_dir = sys.argv
@ -18,7 +18,11 @@ def main():
return 0
def iter_file_base(fn):
have_line = iter(open(fn)).next
f = open(fn)
if sys.version_info[0] >= 3:
have_line = iter(f).__next__
else:
have_line = iter(f).next
while not have_line().startswith('.. toctree'):
pass
@ -28,7 +32,7 @@ def iter_file_base(fn):
yield os.path.splitext(os.path.basename(fn))[0]
n = 0
while 1:
while True:
line = have_line()
if line.isspace():
continue
@ -37,18 +41,21 @@ def iter_file_base(fn):
n += 1
yield line.strip()
f.close()
if n < 5:
# maybe format changed?
raise Exception("Not enough files found. Format change in index.rst?")
def emit(basename, txt_dir):
for line in open(os.path.join(txt_dir, basename + ".txt")):
f = open(os.path.join(txt_dir, basename + ".txt"))
for line in f:
line = line.replace("``", "'")
sys.stdout.write(line)
f.close()
# some space between sections
print
print
sys.stdout.write("\n\n")
if __name__ == '__main__':

View File

@ -97,6 +97,9 @@ many placeholders can use the same values::
... VALUES (%(int)s, %(date)s, %(date)s, %(str)s);""",
... {'int': 10, 'str': "O'Reilly", 'date': datetime.date(2005, 11, 18)})
When parameters are used, in order to include a literal ``%`` in the query you
can use the ``%%`` string.
While the mechanism resembles regular Python strings manipulation, there are a
few subtle differences you should care about when passing parameters to a
query:
@ -196,7 +199,7 @@ argument of the `~cursor.execute()` method::
Adaptation of Python values to SQL types
----------------------------------------
Many standards Python types are adapted into SQL and returned as Python
Many standard Python types are adapted into SQL and returned as Python
objects when a query is executed.
If you need to convert other Python types to and from PostgreSQL data types,
@ -513,12 +516,12 @@ issued by all the cursors created by the same connection. Should any command
fail, the transaction will be aborted and no further command will be executed
until a call to the `~connection.rollback()` method.
The connection is responsible to terminate its transaction, calling either the
`~connection.commit()` or `~connection.rollback()` method. Committed
The connection is responsible for terminating its transaction, calling either
the `~connection.commit()` or `~connection.rollback()` method. Committed
changes are immediately made persistent into the database. Closing the
connection using the `~connection.close()` method or destroying the
connection object (using `!del` or letting it fall out of scope)
will result in an implicit `!rollback()` call.
will result in an implicit rollback.
It is possible to set the connection in *autocommit* mode: this way all the
commands executed will be immediately committed and no rollback is possible. A

View File

@ -146,37 +146,36 @@ def connect(dsn=None,
Using *async*=True an asynchronous connection will be created.
Any other keyword parameter will be passed to the underlying client
library: the list of supported parameter depends on the library version.
library: the list of supported parameters depends on the library version.
"""
items = []
if database is not None:
items.append(('dbname', database))
if user is not None:
items.append(('user', user))
if password is not None:
items.append(('password', password))
if host is not None:
items.append(('host', host))
if port is not None:
items.append(('port', port))
items.extend([(k, v) for (k, v) in kwargs.iteritems() if v is not None])
if dsn is not None and items:
raise TypeError(
"'%s' is an invalid keyword argument when the dsn is specified"
% items[0][0])
if dsn is None:
# Note: reproducing the behaviour of the previous C implementation:
# keyword are silently swallowed if a DSN is specified. I would have
# raised an exception. File under "histerical raisins".
items = []
if database is not None:
items.append(('dbname', database))
if user is not None:
items.append(('user', user))
if password is not None:
items.append(('password', password))
if host is not None:
items.append(('host', host))
# Reproducing the previous C implementation behaviour: swallow a
# negative port. The libpq would raise an exception for it.
if port is not None and int(port) > 0:
items.append(('port', port))
if not items:
raise TypeError('missing dsn and no parameters')
else:
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
for (k, v) in items])
items.extend(
[(k, v) for (k, v) in kwargs.iteritems() if v is not None])
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
for (k, v) in items])
if not dsn:
raise InterfaceError('missing dsn and no parameters')
return _connect(dsn,
connection_factory=connection_factory, async=async)
return _connect(dsn, connection_factory=connection_factory, async=async)
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())

View File

@ -116,20 +116,21 @@ def register_adapter(typ, callable):
# The SQL_IN class is the official adapter for tuples starting from 2.0.6.
class SQL_IN(object):
"""Adapt any iterable to an SQL quotable object."""
def __init__(self, seq):
self._seq = seq
self._conn = None
def prepare(self, conn):
self._conn = conn
def getquoted(self):
# this is the important line: note how every object in the
# list is adapted and then how getquoted() is called on it
pobjs = [adapt(o) for o in self._seq]
for obj in pobjs:
if hasattr(obj, 'prepare'):
obj.prepare(self._conn)
if self._conn is not None:
for obj in pobjs:
if hasattr(obj, 'prepare'):
obj.prepare(self._conn)
qobjs = [o.getquoted() for o in pobjs]
return b('(') + b(', ').join(qobjs) + b(')')

View File

@ -54,46 +54,46 @@ class DictCursorBase(_cursor):
else:
raise NotImplementedError(
"DictCursorBase can't be instantiated without a row factory.")
_cursor.__init__(self, *args, **kwargs)
super(DictCursorBase, self).__init__(*args, **kwargs)
self._query_executed = 0
self._prefetch = 0
self.row_factory = row_factory
def fetchone(self):
if self._prefetch:
res = _cursor.fetchone(self)
res = super(DictCursorBase, self).fetchone()
if self._query_executed:
self._build_index()
if not self._prefetch:
res = _cursor.fetchone(self)
res = super(DictCursorBase, self).fetchone()
return res
def fetchmany(self, size=None):
if self._prefetch:
res = _cursor.fetchmany(self, size)
res = super(DictCursorBase, self).fetchmany(size)
if self._query_executed:
self._build_index()
if not self._prefetch:
res = _cursor.fetchmany(self, size)
res = super(DictCursorBase, self).fetchmany(size)
return res
def fetchall(self):
if self._prefetch:
res = _cursor.fetchall(self)
res = super(DictCursorBase, self).fetchall()
if self._query_executed:
self._build_index()
if not self._prefetch:
res = _cursor.fetchall(self)
res = super(DictCursorBase, self).fetchall()
return res
def __iter__(self):
if self._prefetch:
res = _cursor.__iter__(self)
res = super(DictCursorBase, self).__iter__()
first = res.next()
if self._query_executed:
self._build_index()
if not self._prefetch:
res = _cursor.__iter__(self)
res = super(DictCursorBase, self).__iter__()
first = res.next()
yield first
@ -103,29 +103,27 @@ class DictCursorBase(_cursor):
class DictConnection(_connection):
"""A connection that uses `DictCursor` automatically."""
def cursor(self, name=None):
if name is None:
return _connection.cursor(self, cursor_factory=DictCursor)
else:
return _connection.cursor(self, name, cursor_factory=DictCursor)
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', DictCursor)
return super(DictConnection, self).cursor(*args, **kwargs)
class DictCursor(DictCursorBase):
"""A cursor that keeps a list of column name -> index mappings."""
def __init__(self, *args, **kwargs):
kwargs['row_factory'] = DictRow
DictCursorBase.__init__(self, *args, **kwargs)
super(DictCursor, self).__init__(*args, **kwargs)
self._prefetch = 1
def execute(self, query, vars=None):
self.index = {}
self._query_executed = 1
return _cursor.execute(self, query, vars)
return super(DictCursor, self).execute(query, vars)
def callproc(self, procname, vars=None):
self.index = {}
self._query_executed = 1
return _cursor.callproc(self, procname, vars)
return super(DictCursor, self).callproc(procname, vars)
def _build_index(self):
if self._query_executed == 1 and self.description:
@ -186,7 +184,14 @@ class DictRow(list):
def __contains__(self, x):
return x in self._index
# grop the crusty Py2 methods
def __getstate__(self):
return self[:], self._index.copy()
def __setstate__(self, data):
self[:] = data[0]
self._index = data[1]
# drop the crusty Py2 methods
if sys.version_info[0] > 2:
items = iteritems; del iteritems
keys = iterkeys; del iterkeys
@ -196,11 +201,9 @@ class DictRow(list):
class RealDictConnection(_connection):
"""A connection that uses `RealDictCursor` automatically."""
def cursor(self, name=None):
if name is None:
return _connection.cursor(self, cursor_factory=RealDictCursor)
else:
return _connection.cursor(self, name, cursor_factory=RealDictCursor)
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', RealDictCursor)
return super(RealDictConnection, self).cursor(*args, **kwargs)
class RealDictCursor(DictCursorBase):
"""A cursor that uses a real dict as the base type for rows.
@ -210,21 +213,20 @@ class RealDictCursor(DictCursorBase):
to access database rows both as a dictionary and a list, then use
the generic `DictCursor` instead of `!RealDictCursor`.
"""
def __init__(self, *args, **kwargs):
kwargs['row_factory'] = RealDictRow
DictCursorBase.__init__(self, *args, **kwargs)
super(RealDictCursor, self).__init__(*args, **kwargs)
self._prefetch = 0
def execute(self, query, vars=None):
self.column_mapping = []
self._query_executed = 1
return _cursor.execute(self, query, vars)
return super(RealDictCursor, self).execute(query, vars)
def callproc(self, procname, vars=None):
self.column_mapping = []
self._query_executed = 1
return _cursor.callproc(self, procname, vars)
return super(RealDictCursor, self).callproc(procname, vars)
def _build_index(self):
if self._query_executed == 1 and self.description:
@ -250,12 +252,19 @@ class RealDictRow(dict):
name = self._column_mapping[name]
return dict.__setitem__(self, name, value)
def __getstate__(self):
return (self.copy(), self._column_mapping[:])
def __setstate__(self, data):
self.update(data[0])
self._column_mapping = data[1]
class NamedTupleConnection(_connection):
"""A connection that uses `NamedTupleCursor` automatically."""
def cursor(self, *args, **kwargs):
kwargs['cursor_factory'] = NamedTupleCursor
return _connection.cursor(self, *args, **kwargs)
kwargs.setdefault('cursor_factory', NamedTupleCursor)
return super(NamedTupleConnection, self).cursor(*args, **kwargs)
class NamedTupleCursor(_cursor):
"""A cursor that generates results as `~collections.namedtuple`.
@ -277,18 +286,18 @@ class NamedTupleCursor(_cursor):
def execute(self, query, vars=None):
self.Record = None
return _cursor.execute(self, query, vars)
return super(NamedTupleCursor, self).execute(query, vars)
def executemany(self, query, vars):
self.Record = None
return _cursor.executemany(self, query, vars)
return super(NamedTupleCursor, self).executemany(query, vars)
def callproc(self, procname, vars=None):
self.Record = None
return _cursor.callproc(self, procname, vars)
return super(NamedTupleCursor, self).callproc(procname, vars)
def fetchone(self):
t = _cursor.fetchone(self)
t = super(NamedTupleCursor, self).fetchone()
if t is not None:
nt = self.Record
if nt is None:
@ -296,21 +305,21 @@ class NamedTupleCursor(_cursor):
return nt(*t)
def fetchmany(self, size=None):
ts = _cursor.fetchmany(self, size)
ts = super(NamedTupleCursor, self).fetchmany(size)
nt = self.Record
if nt is None:
nt = self.Record = self._make_nt()
return [nt(*t) for t in ts]
def fetchall(self):
ts = _cursor.fetchall(self)
ts = super(NamedTupleCursor, self).fetchall()
nt = self.Record
if nt is None:
nt = self.Record = self._make_nt()
return [nt(*t) for t in ts]
def __iter__(self):
it = _cursor.__iter__(self)
it = super(NamedTupleCursor, self).__iter__()
t = it.next()
nt = self.Record
@ -349,7 +358,7 @@ class LoggingConnection(_connection):
self.log = self._logtologger
else:
self.log = self._logtofile
def filter(self, msg, curs):
"""Filter the query before logging it.
@ -358,51 +367,49 @@ class LoggingConnection(_connection):
just does nothing.
"""
return msg
def _logtofile(self, msg, curs):
msg = self.filter(msg, curs)
if msg: self._logobj.write(msg + os.linesep)
def _logtologger(self, msg, curs):
msg = self.filter(msg, curs)
if msg: self._logobj.debug(msg)
def _check(self):
if not hasattr(self, '_logobj'):
raise self.ProgrammingError(
"LoggingConnection object has not been initialize()d")
def cursor(self, name=None):
def cursor(self, *args, **kwargs):
self._check()
if name is None:
return _connection.cursor(self, cursor_factory=LoggingCursor)
else:
return _connection.cursor(self, name, cursor_factory=LoggingCursor)
kwargs.setdefault('cursor_factory', LoggingCursor)
return super(LoggingConnection, self).cursor(*args, **kwargs)
class LoggingCursor(_cursor):
"""A cursor that logs queries using its connection logging facilities."""
def execute(self, query, vars=None):
try:
return _cursor.execute(self, query, vars)
return super(LoggingCursor, self).execute(query, vars)
finally:
self.connection.log(self.query, self)
def callproc(self, procname, vars=None):
try:
return _cursor.callproc(self, procname, vars)
return super(LoggingCursor, self).callproc(procname, vars)
finally:
self.connection.log(self.query, self)
class MinTimeLoggingConnection(LoggingConnection):
"""A connection that logs queries based on execution time.
This is just an example of how to sub-class `LoggingConnection` to
provide some extra filtering for the logged queries. Both the
`inizialize()` and `filter()` methods are overwritten to make sure
that only queries executing for more than ``mintime`` ms are logged.
Note that this connection uses the specialized cursor
`MinTimeLoggingCursor`.
"""
@ -415,20 +422,17 @@ class MinTimeLoggingConnection(LoggingConnection):
if t > self._mintime:
return msg + os.linesep + " (execution time: %d ms)" % t
def cursor(self, name=None):
self._check()
if name is None:
return _connection.cursor(self, cursor_factory=MinTimeLoggingCursor)
else:
return _connection.cursor(self, name, cursor_factory=MinTimeLoggingCursor)
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', MinTimeLoggingCursor)
return LoggingConnection.cursor(self, *args, **kwargs)
class MinTimeLoggingCursor(LoggingCursor):
"""The cursor sub-class companion to `MinTimeLoggingConnection`."""
def execute(self, query, vars=None):
self.timestamp = time.time()
return LoggingCursor.execute(self, query, vars)
def callproc(self, procname, vars=None):
self.timestamp = time.time()
return LoggingCursor.execute(self, procname, vars)
@ -580,6 +584,18 @@ def wait_select(conn):
raise OperationalError("bad state from poll: %s" % state)
def _solve_conn_curs(conn_or_curs):
"""Return the connection and a DBAPI cursor from a connection or cursor."""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn.cursor(cursor_factory=_cursor)
else:
conn = conn_or_curs
curs = conn.cursor(cursor_factory=_cursor)
return conn, curs
class HstoreAdapter(object):
"""Adapt a Python dict to the hstore syntax."""
def __init__(self, wrapped):
@ -688,12 +704,7 @@ class HstoreAdapter(object):
def get_oids(self, conn_or_curs):
"""Return the lists of OID of the hstore and hstore[] types.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
conn, curs = _solve_conn_curs(conn_or_curs)
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
@ -744,7 +755,6 @@ def register_hstore(conn_or_curs, globally=False, unicode=False,
'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.
Note that, when passing a dictionary from Python to the database, both
strings and unicode keys and values are supported. Dictionaries returned
from the database have keys/values according to the *unicode* parameter.
@ -752,15 +762,6 @@ def register_hstore(conn_or_curs, globally=False, unicode=False,
The |hstore| contrib module must be already installed in the database
(executing the ``hstore.sql`` script in your ``contrib`` directory).
Raise `~psycopg2.ProgrammingError` if the type is not found.
.. versionchanged:: 2.4
added the *oid* parameter. If not specified, the typecaster is
installed also if |hstore| is not installed in the :sql:`public`
schema.
.. versionchanged:: 2.4.3
added support for |hstore| array.
"""
if oid is None:
oid = HstoreAdapter.get_oids(conn_or_curs)
@ -874,9 +875,9 @@ class CompositeCaster(object):
for m in self._re_tokenize.finditer(s):
if m is None:
raise psycopg2.InterfaceError("can't parse type: %r" % s)
if m.group(1):
if m.group(1) is not None:
rv.append(None)
elif m.group(2):
elif m.group(2) is not None:
rv.append(self._re_undouble.sub(r"\1", m.group(2)))
else:
rv.append(m.group(3))
@ -899,12 +900,7 @@ class CompositeCaster(object):
Raise `ProgrammingError` if the type is not found.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
conn, curs = _solve_conn_curs(conn_or_curs)
# Store the transaction status of the connection to revert it after use
conn_status = conn.status

View File

@ -217,6 +217,10 @@ class PersistentConnectionPool(AbstractConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
"""Initialize the threading lock."""
import warnings
warnings.warn("deprecated: use ZPsycopgDA.pool implementation",
DeprecationWarning)
import threading
AbstractConnectionPool.__init__(
self, minconn, maxconn, *args, **kwargs)

View File

@ -63,8 +63,7 @@ class FixedOffsetTimezone(datetime.tzinfo):
try:
return cls._cache[key]
except KeyError:
tz = datetime.tzinfo.__new__(cls, offset, name)
tz.__init__(offset, name)
tz = super(FixedOffsetTimezone, cls).__new__(cls, offset, name)
cls._cache[key] = tz
return tz
@ -73,6 +72,10 @@ class FixedOffsetTimezone(datetime.tzinfo):
return "psycopg2.tz.FixedOffsetTimezone(offset=%r, name=%r)" \
% (offset_mins, self._name)
def __getinitargs__(self):
offset_mins = self._offset.seconds // 60 + self._offset.days * 24 * 60
return (offset_mins, self._name)
def utcoffset(self, dt):
return self._offset
@ -87,7 +90,7 @@ class FixedOffsetTimezone(datetime.tzinfo):
return "%+03d:%d" % (hours, minutes)
else:
return "%+03d" % hours
def dst(self, dt):
return ZERO
@ -104,7 +107,6 @@ class LocalTimezone(datetime.tzinfo):
This is the exact implementation from the Python 2.3 documentation.
"""
def utcoffset(self, dt):
if self._isdst(dt):
return DSTOFFSET

View File

@ -73,16 +73,7 @@ qstring_quote(qstringObject *self)
/* encode the string into buffer */
Bytes_AsStringAndSize(str, &s, &len);
/* Call qstring_escape with the GIL released, then reacquire the GIL
before verifying that the results can fit into a Python string; raise
an exception if not. */
Py_BEGIN_ALLOW_THREADS
buffer = psycopg_escape_string(self->conn, s, len, NULL, &qlen);
Py_END_ALLOW_THREADS
if (buffer == NULL) {
if (!(buffer = psycopg_escape_string(self->conn, s, len, NULL, &qlen))) {
Py_DECREF(str);
PyErr_NoMemory();
return NULL;

View File

@ -141,6 +141,7 @@ HIDDEN void conn_notifies_process(connectionObject *self);
RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
HIDDEN void conn_close_locked(connectionObject *self);
RAISES_NEG HIDDEN int conn_commit(connectionObject *self);
RAISES_NEG HIDDEN int conn_rollback(connectionObject *self);
RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel,

View File

@ -896,7 +896,7 @@ conn_poll(connectionObject *self)
/* fetch the tuples (if there are any) and build the result. We
* don't care if pq_fetch return 0 or 1, but if there was an error,
* we want to signal it to the caller. */
if (pq_fetch(curs) == -1) {
if (pq_fetch(curs, 0) == -1) {
res = PSYCO_POLL_ERROR;
}
@ -922,12 +922,24 @@ conn_close(connectionObject *self)
return;
}
/* sets this connection as closed even for other threads; also note that
we need to check the value of pgconn, because we get called even when
the connection fails! */
/* sets this connection as closed even for other threads; */
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
conn_close_locked(self);
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
}
/* conn_close_locked - shut down the connection with the lock already taken */
void conn_close_locked(connectionObject *self)
{
if (self->closed) {
return;
}
/* We used to call pq_abort_locked here, but the idea of issuing a
* rollback on close/GC has been considered inappropriate.
*
@ -937,9 +949,10 @@ conn_close(connectionObject *self)
* transaction though: to avoid these problems the transaction should be
* closed only in status CONN_STATUS_READY.
*/
self->closed = 1;
/* we need to check the value of pgconn, because we get called even when
* the connection fails! */
if (self->pgconn) {
PQfinish(self->pgconn);
self->pgconn = NULL;
@ -947,9 +960,6 @@ conn_close(connectionObject *self)
PQfreeCancel(self->cancel);
self->cancel = NULL;
}
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
}
/* conn_commit - commit on a connection */

View File

@ -42,7 +42,7 @@
/* cursor method - allocate a new cursor */
#define psyco_conn_cursor_doc \
"cursor(name=None, cursor_factory=extensions.cursor, withhold=None) -- new cursor\n\n" \
"cursor(name=None, cursor_factory=extensions.cursor, withhold=False) -- new cursor\n\n" \
"Return a new cursor.\n\nThe ``cursor_factory`` argument can be used to\n" \
"create non-standard cursors by passing a class different from the\n" \
"default. Note that the new class *should* be a sub-class of\n" \
@ -50,26 +50,26 @@
":rtype: `extensions.cursor`"
static PyObject *
psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *kwargs)
{
const char *name = NULL;
PyObject *obj, *factory = NULL, *withhold = NULL;
PyObject *obj;
PyObject *name = Py_None;
PyObject *factory = (PyObject *)&cursorType;
PyObject *withhold = Py_False;
static char *kwlist[] = {"name", "cursor_factory", "withhold", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sOO", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kwlist,
&name, &factory, &withhold)) {
return NULL;
}
if (withhold != NULL) {
if (PyObject_IsTrue(withhold) && name == NULL) {
PyErr_SetString(ProgrammingError,
"'withhold=True can be specified only for named cursors");
return NULL;
}
if (PyObject_IsTrue(withhold) && (name == Py_None)) {
PyErr_SetString(ProgrammingError,
"'withhold=True can be specified only for named cursors");
return NULL;
}
EXC_IF_CONN_CLOSED(self);
if (self->status != CONN_STATUS_READY &&
@ -80,31 +80,28 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
return NULL;
}
if (name != NULL && self->async == 1) {
if (name != Py_None && self->async == 1) {
PyErr_SetString(ProgrammingError,
"asynchronous connections "
"cannot produce named cursors");
return NULL;
}
Dprintf("psyco_conn_cursor: new cursor for connection at %p", self);
Dprintf("psyco_conn_cursor: parameters: name = %s", name);
Dprintf("psyco_conn_cursor: new %s cursor for connection at %p",
(name == Py_None ? "unnamed" : "named"), self);
if (factory == NULL) factory = (PyObject *)&cursorType;
if (name)
obj = PyObject_CallFunction(factory, "Os", self, name);
else
obj = PyObject_CallFunctionObjArgs(factory, self, NULL);
if (!(obj = PyObject_CallFunctionObjArgs(factory, self, name, NULL))) {
return NULL;
}
if (obj == NULL) return NULL;
if (PyObject_IsInstance(obj, (PyObject *)&cursorType) == 0) {
PyErr_SetString(PyExc_TypeError,
"cursor factory must be subclass of psycopg2._psycopg.cursor");
Py_DECREF(obj);
return NULL;
}
if (withhold != NULL && PyObject_IsTrue(withhold))
if (PyObject_IsTrue(withhold))
((cursorObject*)obj)->withhold = 1;
Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = "

View File

@ -63,7 +63,7 @@ psyco_curs_close(cursorObject *self, PyObject *args)
EXC_IF_NO_MARK(self);
PyOS_snprintf(buffer, 127, "CLOSE \"%s\"", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (pq_execute(self, buffer, 0, 0) == -1) return NULL;
}
self->closed = 1;
@ -365,7 +365,8 @@ _psyco_curs_merge_query_args(cursorObject *self,
RAISES_NEG static int
_psyco_curs_execute(cursorObject *self,
PyObject *operation, PyObject *vars, long int async)
PyObject *operation, PyObject *vars,
long int async, int no_result)
{
int res = -1;
int tmp;
@ -432,7 +433,7 @@ _psyco_curs_execute(cursorObject *self,
/* At this point, the SQL statement must be str, not unicode */
tmp = pq_execute(self, Bytes_AS_STRING(self->query), async);
tmp = pq_execute(self, Bytes_AS_STRING(self->query), async, no_result);
Dprintf("psyco_curs_execute: res = %d, pgres = %p", tmp, self->pgres);
if (tmp < 0) { goto exit; }
@ -479,7 +480,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_ASYNC_IN_PROGRESS(self, execute);
EXC_IF_TPC_PREPARED(self->conn, execute);
if (0 > _psyco_curs_execute(self, operation, vars, self->conn->async)) {
if (0 > _psyco_curs_execute(self, operation, vars, self->conn->async, 0)) {
return NULL;
}
@ -524,7 +525,7 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs)
}
while ((v = PyIter_Next(vars)) != NULL) {
if (0 > _psyco_curs_execute(self, operation, v, 0)) {
if (0 > _psyco_curs_execute(self, operation, v, 0, 1)) {
Py_DECREF(v);
Py_XDECREF(iter);
return NULL;
@ -655,7 +656,7 @@ _psyco_curs_prefetch(cursorObject *self)
if (self->pgres == NULL) {
Dprintf("_psyco_curs_prefetch: trying to fetch data");
do {
i = pq_fetch(self);
i = pq_fetch(self, 0);
Dprintf("_psycopg_curs_prefetch: result = %d", i);
} while(i == 1);
}
@ -757,7 +758,7 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
EXC_IF_ASYNC_IN_PROGRESS(self, fetchone);
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM \"%s\"", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (pq_execute(self, buffer, 0, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
@ -808,7 +809,7 @@ psyco_curs_next_named(cursorObject *self)
PyOS_snprintf(buffer, 127, "FETCH FORWARD %ld FROM \"%s\"",
self->itersize, self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (pq_execute(self, buffer, 0, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
@ -877,7 +878,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM \"%s\"",
(int)size, self->name);
if (pq_execute(self, buffer, 0) == -1) { goto exit; }
if (pq_execute(self, buffer, 0, 0) == -1) { goto exit; }
if (_psyco_curs_prefetch(self) < 0) { goto exit; }
}
@ -952,7 +953,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
EXC_IF_ASYNC_IN_PROGRESS(self, fetchall);
EXC_IF_TPC_PREPARED(self->conn, fetchall);
PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM \"%s\"", self->name);
if (pq_execute(self, buffer, 0) == -1) { goto exit; }
if (pq_execute(self, buffer, 0, 0) == -1) { goto exit; }
if (_psyco_curs_prefetch(self) < 0) { goto exit; }
}
@ -1045,7 +1046,8 @@ psyco_curs_callproc(cursorObject *self, PyObject *args)
if (!(operation = Bytes_FromString(sql))) { goto exit; }
if (0 <= _psyco_curs_execute(self, operation, parameters, self->conn->async)) {
if (0 <= _psyco_curs_execute(self, operation, parameters,
self->conn->async, 0)) {
Py_INCREF(parameters);
res = parameters;
}
@ -1172,7 +1174,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
else {
PyOS_snprintf(buffer, 127, "MOVE %d FROM \"%s\"", value, self->name);
}
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (pq_execute(self, buffer, 0, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
@ -1352,7 +1354,7 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs)
Py_INCREF(file);
self->copyfile = file;
if (pq_execute(self, query, 0) >= 0) {
if (pq_execute(self, query, 0, 0) >= 0) {
res = Py_None;
Py_INCREF(Py_None);
}
@ -1448,7 +1450,7 @@ psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs)
Py_INCREF(file);
self->copyfile = file;
if (pq_execute(self, query, 0) >= 0) {
if (pq_execute(self, query, 0, 0) >= 0) {
res = Py_None;
Py_INCREF(Py_None);
}
@ -1522,7 +1524,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
self->copyfile = file;
/* At this point, the SQL statement must be str, not unicode */
if (pq_execute(self, Bytes_AS_STRING(sql), 0) >= 0) {
if (pq_execute(self, Bytes_AS_STRING(sql), 0, 0) >= 0) {
res = Py_None;
Py_INCREF(res);
}
@ -1724,7 +1726,7 @@ cursor_setup(cursorObject *self, connectionObject *conn, const char *name)
if (name) {
if (!(self->name = psycopg_escape_identifier_easy(name, 0))) {
return 1;
return -1;
}
}
@ -1733,7 +1735,7 @@ cursor_setup(cursorObject *self, connectionObject *conn, const char *name)
(PyObject *)&connectionType) == 0) {
PyErr_SetString(PyExc_TypeError,
"argument 1 must be subclass of psycopg2._psycopg.connection");
return 1;
return -1;
} */
Py_INCREF(conn);
self->conn = conn;
@ -1808,15 +1810,35 @@ cursor_dealloc(PyObject* obj)
}
static int
cursor_init(PyObject *obj, PyObject *args, PyObject *kwds)
cursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{
const char *name = NULL;
PyObject *conn;
PyObject *name = Py_None;
const char *cname;
if (!PyArg_ParseTuple(args, "O|s", &conn, &name))
static char *kwlist[] = {"conn", "name", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!|O", kwlist,
&connectionType, &conn, &name)) {
return -1;
}
return cursor_setup((cursorObject *)obj, (connectionObject *)conn, name);
if (name == Py_None) {
cname = NULL;
} else {
Py_INCREF(name); /* for ensure_bytes */
if (!(name = psycopg_ensure_bytes(name))) {
/* name has had a ref stolen */
return -1;
}
Py_DECREF(name);
if (!(cname = Bytes_AsString(name))) {
return -1;
}
}
return cursor_setup((cursorObject *)obj, (connectionObject *)conn, cname);
}
static PyObject *

View File

@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void);
static void psyco_clear_result_blocking(connectionObject *conn);
static void green_panic(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) {
psyco_clear_result_blocking(conn);
green_panic(conn);
goto end;
}
@ -192,22 +192,21 @@ end:
}
/* Discard the result of the currenly executed query, blocking.
*
* This function doesn't honour the wait callback: it can be used in case of
* emergency if the callback fails in order to put the connection back into a
* consistent state.
*
* If any command was issued before clearing the result, libpq would fail with
* the error "another command is already in progress".
/* There has been a communication error during query execution. It may have
* happened e.g. for a network error or an error in the callback, and we
* cannot tell the two apart.
* Trying to PQcancel or PQgetResult to put the connection back into a working
* state doesn't work nice (issue #113): the program blocks and the
* interpreter won't even respond to SIGINT. PQreset could work async, but the
* python program would have then a connection made but not configured where
* it is probably not designed to handled. So for the moment we do the kindest
* thing we can: we close the connection. A long-running program should
* already have a way to discard broken connections; a short-lived one would
* benefit of working ctrl-c.
*/
static void
psyco_clear_result_blocking(connectionObject *conn)
green_panic(connectionObject *conn)
{
PGresult *res;
Dprintf("psyco_clear_result_blocking");
while (NULL != (res = PQgetResult(conn->pgconn))) {
PQclear(res);
}
Dprintf("green_panic: closing the connection");
conn_close_locked(conn);
}

View File

@ -829,12 +829,16 @@ pq_flush(connectionObject *conn)
}
/* pq_execute - execute a query, possibly asynchronously
this fucntion locks the connection object
this function call Py_*_ALLOW_THREADS macros */
*
* With no_result an eventual query result is discarded.
* Currently only used to implement cursor.executemany().
*
* This function locks the connection object
* This function call Py_*_ALLOW_THREADS macros
*/
RAISES_NEG int
pq_execute(cursorObject *curs, const char *query, int async)
pq_execute(cursorObject *curs, const char *query, int async, int no_result)
{
PGresult *pgres = NULL;
char *error = NULL;
@ -938,7 +942,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
to respect the old DBAPI-2.0 compatible behaviour */
if (async == 0) {
Dprintf("pq_execute: entering syncronous DBAPI compatibility mode");
if (pq_fetch(curs) < 0) return -1;
if (pq_fetch(curs, no_result) < 0) return -1;
}
else {
PyObject *tmp;
@ -976,7 +980,7 @@ pq_send_query(connectionObject *conn, const char *query)
/* Return the last result available on the connection.
*
* The function will block will block only if a command is active and the
* The function will block only if a command is active and the
* necessary response data has not yet been read by PQconsumeInput.
*
* The result should be disposed using PQclear()
@ -1302,9 +1306,9 @@ _pq_copy_in_v3(cursorObject *curs)
res = PQputCopyEnd(curs->conn->pgconn, "error in .read() call");
IFCLEARPGRES(curs->pgres);
Dprintf("_pq_copy_in_v3: copy ended; res = %d", res);
/* if the result is -1 we should not even try to get a result from the
bacause that will lock the current thread forever */
if (res == -1) {
@ -1316,7 +1320,13 @@ _pq_copy_in_v3(cursorObject *curs)
}
else {
/* and finally we grab the operation result from the backend */
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
for (;;) {
Py_BEGIN_ALLOW_THREADS;
curs->pgres = PQgetResult(curs->conn->pgconn);
Py_END_ALLOW_THREADS;
if (NULL == curs->pgres)
break;
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
IFCLEARPGRES(curs->pgres);
@ -1386,7 +1396,13 @@ _pq_copy_out_v3(cursorObject *curs)
/* and finally we grab the operation result from the backend */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
for (;;) {
Py_BEGIN_ALLOW_THREADS;
curs->pgres = PQgetResult(curs->conn->pgconn);
Py_END_ALLOW_THREADS;
if (NULL == curs->pgres)
break;
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
IFCLEARPGRES(curs->pgres);
@ -1399,7 +1415,7 @@ exit:
}
int
pq_fetch(cursorObject *curs)
pq_fetch(cursorObject *curs, int no_result)
{
int pgstatus, ex = -1;
const char *rowcount;
@ -1463,10 +1479,18 @@ pq_fetch(cursorObject *curs)
break;
case PGRES_TUPLES_OK:
Dprintf("pq_fetch: data from a SELECT (got tuples)");
curs->rowcount = PQntuples(curs->pgres);
if (0 == _pq_fetch_tuples(curs)) { ex = 0; }
/* don't clear curs->pgres, because it contains the results! */
if (!no_result) {
Dprintf("pq_fetch: got tuples");
curs->rowcount = PQntuples(curs->pgres);
if (0 == _pq_fetch_tuples(curs)) { ex = 0; }
/* don't clear curs->pgres, because it contains the results! */
}
else {
Dprintf("pq_fetch: got tuples, discarding them");
IFCLEARPGRES(curs->pgres);
curs->rowcount = -1;
ex = 0;
}
break;
case PGRES_EMPTY_QUERY:

View File

@ -35,8 +35,9 @@
/* exported functions */
HIDDEN PGresult *pq_get_last_result(connectionObject *conn);
RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs);
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, int async);
RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result);
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query,
int async, int no_result);
HIDDEN int pq_send_query(connectionObject *conn, const char *query);
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres,
char **error, PyThreadState **tstate);

View File

@ -433,6 +433,8 @@ static struct {
};
#if PY_VERSION_HEX >= 0x02050000
/* Error.__reduce_ex__
*
* The method is required to make exceptions picklable: set the cursor
@ -484,6 +486,8 @@ error:
return rv;
}
#endif /* PY_VERSION_HEX >= 0x02050000 */
static int
psyco_errors_init(void)
{
@ -498,8 +502,10 @@ psyco_errors_init(void)
PyObject *descr = NULL;
int rv = -1;
#if PY_VERSION_HEX >= 0x02050000
static PyMethodDef psyco_error_reduce_ex_def =
{"__reduce_ex__", psyco_error_reduce_ex, METH_VARARGS, "pickle helper"};
#endif
for (i=0; exctable[i].name; i++) {
if (!(dict = PyDict_New())) { goto exit; }

View File

@ -35,6 +35,11 @@
# error "psycopg requires Python >= 2.4"
#endif
#if PY_VERSION_HEX < 0x02050000
/* Function missing in Py 2.4 */
#define PyErr_WarnEx(cat,msg,lvl) PyErr_Warn(cat,msg)
#endif
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t;
#define PY_SSIZE_T_MIN INT_MIN

View File

@ -212,8 +212,10 @@ typecast_array_scan(const char *str, Py_ssize_t strlength,
PyList_Append(array, sub);
Py_DECREF(sub);
if (stack_index == MAX_DIMENSIONS)
if (stack_index == MAX_DIMENSIONS) {
PyErr_SetString(DataError, "excessive array dimensions");
return -1;
}
stack[stack_index++] = array;
array = sub;
@ -224,9 +226,11 @@ typecast_array_scan(const char *str, Py_ssize_t strlength,
}
else if (state == ASCAN_END) {
if (--stack_index < 0)
if (stack_index == 0) {
PyErr_SetString(DataError, "unbalanced braces in array");
return -1;
array = stack[stack_index];
}
array = stack[--stack_index];
}
else if (state == ASCAN_EOF)
@ -253,7 +257,11 @@ typecast_GENERIC_ARRAY_cast(const char *str, Py_ssize_t len, PyObject *curs)
if (str[0] == '[')
typecast_array_cleanup(&str, &len);
if (str[0] != '{') {
PyErr_SetString(Error, "array does not start with '{'");
PyErr_SetString(DataError, "array does not start with '{'");
return NULL;
}
if (str[1] == '\0') {
PyErr_SetString(DataError, "malformed array: '{'");
return NULL;
}

View File

@ -28,6 +28,7 @@
#include "psycopg/psycopg.h"
#include "psycopg/xid.h"
#include "psycopg/cursor.h"
static const char xid_doc[] =
@ -660,8 +661,11 @@ xid_recover(PyObject *conn)
PyObject *tmp;
Py_ssize_t len, i;
/* curs = conn.cursor() */
if (!(curs = PyObject_CallMethod(conn, "cursor", NULL))) { goto exit; }
/* curs = conn.cursor()
* (sort of. Use the real cursor in case the connection returns
* somenthing non-dbapi -- see ticket #114) */
if (!(curs = PyObject_CallFunctionObjArgs(
(PyObject *)&cursorType, conn, NULL))) { goto exit; }
/* curs.execute(...) */
if (!(tmp = PyObject_CallMethod(curs, "execute", "s",

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
"""Test for issue #113: test with error during green processing
"""
DSN = 'dbname=test'
import eventlet.patcher
eventlet.patcher.monkey_patch()
import os
import signal
from time import sleep
import psycopg2
from psycopg2 import extensions
from eventlet.hubs import trampoline
# register a test wait callback that fails if SIGHUP is received
panic = []
def wait_cb(conn):
"""A wait callback useful to allow eventlet to work with Psycopg."""
while 1:
if panic:
raise Exception('whatever')
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
trampoline(conn.fileno(), read=True)
elif state == extensions.POLL_WRITE:
trampoline(conn.fileno(), write=True)
else:
raise psycopg2.OperationalError(
"Bad result from poll: %r" % state)
extensions.set_wait_callback(wait_cb)
# SIGHUP handler to inject a fail in the callback
def handler(signum, frame):
panic.append(True)
signal.signal(signal.SIGHUP, handler)
# Simulate another green thread working
def worker():
while 1:
print "I'm working"
sleep(1)
eventlet.spawn(worker)
# You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback.
print "PID", os.getpid()
conn = psycopg2.connect(DSN)
curs = conn.cursor()
try:
for i in range(1000):
curs.execute("select %s, pg_sleep(1)", (i,))
r = curs.fetchone()
print "selected", r
except BaseException, e:
print "got exception:", e.__class__.__name__, e
if conn.closed:
print "the connection is closed"
else:
conn.rollback()
curs.execute("select 1")
print curs.fetchone()

View File

@ -73,7 +73,7 @@ except ImportError:
# Take a look at http://www.python.org/dev/peps/pep-0386/
# for a consistent versioning pattern.
PSYCOPG_VERSION = '2.4.5'
PSYCOPG_VERSION = '2.4.6'
version_flags = ['dt', 'dec']

View File

@ -458,8 +458,8 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.close()
return rv
def connect(self):
conn = psycopg2.connect(dsn)
def connect(self, **kwargs):
conn = psycopg2.connect(dsn, **kwargs)
self._conns.append(conn)
return conn
@ -760,6 +760,20 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.tpc_prepare()
self.assertRaises(psycopg2.ProgrammingError, cnn.cancel)
def test_tpc_recover_non_dbapi_connection(self):
from psycopg2.extras import RealDictConnection
cnn = self.connect(connection_factory=RealDictConnection)
cnn.tpc_begin('dict-connection')
cnn.tpc_prepare()
cnn.reset()
xids = cnn.tpc_recover()
xid = [ xid for xid in xids if xid.database == dbname ][0]
self.assertEqual(None, xid.format_id)
self.assertEqual('dict-connection', xid.gtrid)
self.assertEqual(None, xid.bqual)
from testutils import skip_if_tpc_disabled
decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)

View File

@ -165,6 +165,10 @@ class CursorTests(unittest.TestCase):
del curs
self.assert_(w() is None)
def test_null_name(self):
curs = self.conn.cursor(None)
self.assertEqual(curs.name, None)
def test_invalid_name(self):
curs = self.conn.cursor()
curs.execute("create temp table invname (data int);")

View File

@ -539,6 +539,24 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
self.assert_(FixedOffsetTimezone(9 * 60) is not FixedOffsetTimezone(9 * 60, 'FOO'))
self.assert_(FixedOffsetTimezone(name='FOO') is not FixedOffsetTimezone(9 * 60, 'FOO'))
def test_pickle(self):
# ticket #135
import pickle
tz11 = FixedOffsetTimezone(60)
tz12 = FixedOffsetTimezone(120)
for proto in [-1, 0, 1, 2]:
tz21, tz22 = pickle.loads(pickle.dumps([tz11, tz12], proto))
self.assertEqual(tz11, tz21)
self.assertEqual(tz12, tz22)
tz11 = FixedOffsetTimezone(60, name='foo')
tz12 = FixedOffsetTimezone(120, name='bar')
for proto in [-1, 0, 1, 2]:
tz21, tz22 = pickle.loads(pickle.dumps([tz11, tz12], proto))
self.assertEqual(tz11, tz21)
self.assertEqual(tz12, tz22)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -35,6 +35,16 @@ class ExtrasDictCursorTests(unittest.TestCase):
def tearDown(self):
self.conn.close()
def testDictConnCursorArgs(self):
self.conn.close()
self.conn = psycopg2.connect(dsn, connection_factory=psycopg2.extras.DictConnection)
cur = self.conn.cursor()
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
self.assertEqual(cur.name, None)
# overridable
cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.NamedTupleCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.NamedTupleCursor))
def testDictCursorWithPlainCursorFetchOne(self):
self._testWithPlainCursor(lambda curs: curs.fetchone())
@ -195,6 +205,32 @@ class ExtrasDictCursorTests(unittest.TestCase):
for i, r in enumerate(curs):
self.assertEqual(i + 1, curs.rownumber)
def testPickleDictRow(self):
import pickle
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("select 10 as a, 20 as b")
r = curs.fetchone()
d = pickle.dumps(r)
r1 = pickle.loads(d)
self.assertEqual(r, r1)
self.assertEqual(r[0], r1[0])
self.assertEqual(r[1], r1[1])
self.assertEqual(r['a'], r1['a'])
self.assertEqual(r['b'], r1['b'])
self.assertEqual(r._index, r1._index)
def testPickleRealDictRow(self):
import pickle
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("select 10 as a, 20 as b")
r = curs.fetchone()
d = pickle.dumps(r)
r1 = pickle.loads(d)
self.assertEqual(r, r1)
self.assertEqual(r['a'], r1['a'])
self.assertEqual(r['b'], r1['b'])
self.assertEqual(r._column_mapping, r1._column_mapping)
class NamedTupleCursorTest(unittest.TestCase):
def setUp(self):
@ -219,6 +255,12 @@ class NamedTupleCursorTest(unittest.TestCase):
if self.conn is not None:
self.conn.close()
@skip_if_no_namedtuple
def test_cursor_args(self):
cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
@skip_if_no_namedtuple
def test_fetchone(self):
curs = self.conn.cursor()

View File

@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase):
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
# behaviour changed after issue #113: if there is an error in the
# callback for the moment we don't have a way to reset the connection
# without blocking (ticket #113) so just close it.
conn = self.conn
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase):
psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
self.assert_(conn.closed)
def test_dont_freak_out(self):
# if there is an error in a green query, don't freak out and close
# the connection
conn = self.conn
curs = conn.cursor()
self.assertRaises(psycopg2.ProgrammingError,
curs.execute, "select the unselectable")
# check that the connection is left in an usable state
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
self.assert_(not conn.closed)
conn.rollback()
curs.execute("select 2")
self.assertEqual(2, curs.fetchone()[0])
curs.execute("select 1")
self.assertEqual(curs.fetchone()[0], 1)
def test_suite():

View File

@ -40,10 +40,10 @@ class ConnectTestCase(unittest.TestCase):
psycopg2._connect = self._connect_orig
def test_there_has_to_be_something(self):
self.assertRaises(psycopg2.InterfaceError, psycopg2.connect)
self.assertRaises(psycopg2.InterfaceError, psycopg2.connect,
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
self.assertRaises(psycopg2.InterfaceError, psycopg2.connect,
self.assertRaises(TypeError, psycopg2.connect,
async=True)
def test_no_keywords(self):
@ -127,6 +127,14 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect(database=r"\every thing'")
self.assertEqual(self.args[0], r"dbname='\\every thing\''")
def test_no_kwargs_swallow(self):
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', database='foo')
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', user='postgres')
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', no_such_param='meh')
class ExceptionsTestCase(unittest.TestCase):
def setUp(self):

View File

@ -200,6 +200,13 @@ class TypesBasicTests(unittest.TestCase):
r = self.execute("SELECT %s AS foo", (ss,))
self.failUnlessEqual(ss, r)
def testArrayMalformed(self):
curs = self.conn.cursor()
ss = ['', '{', '{}}', '{' * 20 + '}' * 20]
for s in ss:
self.assertRaises(psycopg2.DataError,
psycopg2.extensions.STRINGARRAY, b(s), curs)
@testutils.skip_from_python(3)
def testTypeRoundtripBuffer(self):
o1 = buffer("".join(map(chr, range(256))))

View File

@ -424,6 +424,30 @@ class HstoreTestCase(unittest.TestCase):
psycopg2.extensions.string_types.pop(oid)
psycopg2.extensions.string_types.pop(aoid)
@skip_if_no_hstore
def test_non_dbapi_connection(self):
from psycopg2.extras import RealDictConnection
from psycopg2.extras import register_hstore
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
register_hstore(conn)
curs = conn.cursor()
curs.execute("select ''::hstore as x")
self.assertEqual(curs.fetchone()['x'], {})
finally:
conn.close()
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
curs = conn.cursor()
register_hstore(curs)
curs.execute("select ''::hstore as x")
self.assertEqual(curs.fetchone()['x'], {})
finally:
conn.close()
def skip_if_no_composite(f):
def skip_if_no_composite_(self):
if self.conn.server_version < 80000:
@ -479,6 +503,7 @@ class AdaptTypeTestCase(unittest.TestCase):
self.assertEqual(CompositeCaster.tokenize(s), v)
ok("(,)", [None, None])
ok('(,"")', [None, ''])
ok('(hello,,10.234,2010-11-11)', ['hello', None, '10.234', '2010-11-11'])
ok('(10,"""")', ['10', '"'])
ok('(10,",")', ['10', ','])
@ -532,6 +557,26 @@ class AdaptTypeTestCase(unittest.TestCase):
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
@skip_if_no_composite
def test_empty_string(self):
# issue #141
self._create_type("type_ss", [('s1', 'text'), ('s2', 'text')])
curs = self.conn.cursor()
psycopg2.extras.register_composite("type_ss", curs)
def ok(t):
curs.execute("select %s::type_ss", (t,))
rv = curs.fetchone()[0]
self.assertEqual(t, rv)
ok(('a', 'b'))
ok(('a', ''))
ok(('', 'b'))
ok(('a', None))
ok((None, 'b'))
ok(('', ''))
ok((None, None))
@skip_if_no_composite
def test_cast_nested(self):
self._create_type("type_is",
@ -712,6 +757,30 @@ class AdaptTypeTestCase(unittest.TestCase):
self.assertEqual(r[0], (2, 'test2'))
self.assertEqual(r[1], [(3, 'testc', 2), (4, 'testd', 2)])
@skip_if_no_hstore
def test_non_dbapi_connection(self):
from psycopg2.extras import RealDictConnection
from psycopg2.extras import register_composite
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
register_composite('type_ii', conn)
curs = conn.cursor()
curs.execute("select '(1,2)'::type_ii as x")
self.assertEqual(curs.fetchone()['x'], (1,2))
finally:
conn.close()
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
curs = conn.cursor()
register_composite('type_ii', conn)
curs.execute("select '(1,2)'::type_ii as x")
self.assertEqual(curs.fetchone()['x'], (1,2))
finally:
conn.close()
def _create_type(self, name, fields):
curs = self.conn.cursor()
try: