Merge branch 'fix-113' into devel

This commit is contained in:
Daniele Varrazzo 2012-10-11 22:25:59 +01:00
commit 5fbf3ef147
7 changed files with 139 additions and 27 deletions

3
NEWS
View File

@ -16,6 +16,9 @@ What's new in psycopg 2.4.6
- Dropped GIL release during string adaptation around a function call - Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash. invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110). Thanks to Manu Cupcic for the report (ticket #110).
- Close a green connection if there is an error in the callback.
Maybe a harsh solution but it leaves the program responsive
(ticket #113).
- 'register_hstore()', 'register_composite()', 'tpc_recover()' work with - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
RealDictConnection and Cursor (ticket #114). RealDictConnection and Cursor (ticket #114).
- connect() raises an exception instead of swallowing keyword arguments - connect() raises an exception instead of swallowing keyword arguments

View File

@ -141,6 +141,7 @@ HIDDEN void conn_notifies_process(connectionObject *self);
RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self); HIDDEN void conn_close(connectionObject *self);
HIDDEN void conn_close_locked(connectionObject *self);
RAISES_NEG HIDDEN int conn_commit(connectionObject *self); RAISES_NEG HIDDEN int conn_commit(connectionObject *self);
RAISES_NEG HIDDEN int conn_rollback(connectionObject *self); RAISES_NEG HIDDEN int conn_rollback(connectionObject *self);
RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel, RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel,

View File

@ -922,12 +922,24 @@ conn_close(connectionObject *self)
return; return;
} }
/* sets this connection as closed even for other threads; also note that /* sets this connection as closed even for other threads; */
we need to check the value of pgconn, because we get called even when
the connection fails! */
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
conn_close_locked(self);
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
}
/* conn_close_locked - shut down the connection with the lock already taken */
void conn_close_locked(connectionObject *self)
{
if (self->closed) {
return;
}
/* We used to call pq_abort_locked here, but the idea of issuing a /* We used to call pq_abort_locked here, but the idea of issuing a
* rollback on close/GC has been considered inappropriate. * rollback on close/GC has been considered inappropriate.
* *
@ -937,9 +949,10 @@ conn_close(connectionObject *self)
* transaction though: to avoid these problems the transaction should be * transaction though: to avoid these problems the transaction should be
* closed only in status CONN_STATUS_READY. * closed only in status CONN_STATUS_READY.
*/ */
self->closed = 1; self->closed = 1;
/* we need to check the value of pgconn, because we get called even when
* the connection fails! */
if (self->pgconn) { if (self->pgconn) {
PQfinish(self->pgconn); PQfinish(self->pgconn);
self->pgconn = NULL; self->pgconn = NULL;
@ -947,9 +960,6 @@ conn_close(connectionObject *self)
PQfreeCancel(self->cancel); PQfreeCancel(self->cancel);
self->cancel = NULL; self->cancel = NULL;
} }
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
} }
/* conn_commit - commit on a connection */ /* conn_commit - commit on a connection */

View File

@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL; HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void); static PyObject *have_wait_callback(void);
static void psyco_clear_result_blocking(connectionObject *conn); static void green_panic(connectionObject *conn);
/* Register a callback function to block waiting for data. /* Register a callback function to block waiting for data.
* *
@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE; conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) { if (0 != psyco_wait(conn)) {
psyco_clear_result_blocking(conn); green_panic(conn);
goto end; goto end;
} }
@ -192,22 +192,21 @@ end:
} }
/* Discard the result of the currenly executed query, blocking. /* There has been a communication error during query execution. It may have
* * happened e.g. for a network error or an error in the callback, and we
* This function doesn't honour the wait callback: it can be used in case of * cannot tell the two apart.
* emergency if the callback fails in order to put the connection back into a * Trying to PQcancel or PQgetResult to put the connection back into a working
* consistent state. * state doesn't work nice (issue #113): the program blocks and the
* * interpreter won't even respond to SIGINT. PQreset could work async, but the
* If any command was issued before clearing the result, libpq would fail with * python program would have then a connection made but not configured where
* the error "another command is already in progress". * it is probably not designed to handled. So for the moment we do the kindest
* thing we can: we close the connection. A long-running program should
* already have a way to discard broken connections; a short-lived one would
* benefit of working ctrl-c.
*/ */
static void static void
psyco_clear_result_blocking(connectionObject *conn) green_panic(connectionObject *conn)
{ {
PGresult *res; Dprintf("green_panic: closing the connection");
conn_close_locked(conn);
Dprintf("psyco_clear_result_blocking");
while (NULL != (res = PQgetResult(conn->pgconn))) {
PQclear(res);
}
} }

View File

@ -35,6 +35,11 @@
# error "psycopg requires Python >= 2.4" # error "psycopg requires Python >= 2.4"
#endif #endif
#if PY_VERSION_HEX < 0x02050000
/* Function missing in Py 2.4 */
#define PyErr_WarnEx(cat,msg,lvl) PyErr_Warn(cat,msg)
#endif
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) #if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t; typedef int Py_ssize_t;
#define PY_SSIZE_T_MIN INT_MIN #define PY_SSIZE_T_MIN INT_MIN

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
"""Test for issue #113: test with error during green processing
"""
DSN = 'dbname=test'
import eventlet.patcher
eventlet.patcher.monkey_patch()
import os
import signal
from time import sleep
import psycopg2
from psycopg2 import extensions
from eventlet.hubs import trampoline
# register a test wait callback that fails if SIGHUP is received
panic = []
def wait_cb(conn):
"""A wait callback useful to allow eventlet to work with Psycopg."""
while 1:
if panic:
raise Exception('whatever')
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
trampoline(conn.fileno(), read=True)
elif state == extensions.POLL_WRITE:
trampoline(conn.fileno(), write=True)
else:
raise psycopg2.OperationalError(
"Bad result from poll: %r" % state)
extensions.set_wait_callback(wait_cb)
# SIGHUP handler to inject a fail in the callback
def handler(signum, frame):
panic.append(True)
signal.signal(signal.SIGHUP, handler)
# Simulate another green thread working
def worker():
while 1:
print "I'm working"
sleep(1)
eventlet.spawn(worker)
# You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback.
print "PID", os.getpid()
conn = psycopg2.connect(DSN)
curs = conn.cursor()
try:
for i in range(1000):
curs.execute("select %s, pg_sleep(1)", (i,))
r = curs.fetchone()
print "selected", r
except BaseException, e:
print "got exception:", e.__class__.__name__, e
if conn.closed:
print "the connection is closed"
else:
conn.rollback()
curs.execute("select 1")
print curs.fetchone()

View File

@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase):
warnings.warn("sending a large query didn't trigger block on write.") warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self): def test_error_in_callback(self):
# behaviour changed after issue #113: if there is an error in the
# callback for the moment we don't have a way to reset the connection
# without blocking (ticket #113) so just close it.
conn = self.conn conn = self.conn
curs = conn.cursor() curs = conn.cursor()
curs.execute("select 1") # have a BEGIN curs.execute("select 1") # have a BEGIN
@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase):
psycopg2.extensions.set_wait_callback(lambda conn: 1//0) psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2") self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
self.assert_(conn.closed)
def test_dont_freak_out(self):
# if there is an error in a green query, don't freak out and close
# the connection
conn = self.conn
curs = conn.cursor()
self.assertRaises(psycopg2.ProgrammingError,
curs.execute, "select the unselectable")
# check that the connection is left in an usable state # check that the connection is left in an usable state
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) self.assert_(not conn.closed)
conn.rollback() conn.rollback()
curs.execute("select 2") curs.execute("select 1")
self.assertEqual(2, curs.fetchone()[0]) self.assertEqual(curs.fetchone()[0], 1)
def test_suite(): def test_suite():