mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-11-04 09:47:30 +03:00 
			
		
		
		
	Close the connection on error in callback
Unfortunately PQcancel blocks, so it's not better than PQgetResult. It has been suggested to use PQreset in non-blocking way but this would give the Python program the burden of handling a connection done but not configured in an unexpected place.
This commit is contained in:
		
							parent
							
								
									7632e1ae46
								
							
						
					
					
						commit
						58d048198f
					
				
							
								
								
									
										3
									
								
								NEWS
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								NEWS
									
									
									
									
									
								
							| 
						 | 
				
			
			@ -7,6 +7,9 @@ What's new in psycopg 2.4.6
 | 
			
		|||
  - Dropped GIL release during string adaptation around a function call
 | 
			
		||||
    invoking a Python API function, which could cause interpreter crash.
 | 
			
		||||
    Thanks to Manu Cupcic for the report (ticket #110).
 | 
			
		||||
  - Close a green connection if there is an error in the callback.
 | 
			
		||||
    Maybe a harsh solution but it leaves the program responsive
 | 
			
		||||
    (ticket #113).
 | 
			
		||||
  - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
 | 
			
		||||
    RealDictConnection and Cursor (ticket #114).
 | 
			
		||||
  - connect() raises an exception instead of swallowing keyword arguments
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,7 +34,7 @@
 | 
			
		|||
HIDDEN PyObject *wait_callback = NULL;
 | 
			
		||||
 | 
			
		||||
static PyObject *have_wait_callback(void);
 | 
			
		||||
static void psyco_panic_cancel(connectionObject *conn);
 | 
			
		||||
static void green_panic(connectionObject *conn);
 | 
			
		||||
 | 
			
		||||
/* Register a callback function to block waiting for data.
 | 
			
		||||
 *
 | 
			
		||||
| 
						 | 
				
			
			@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
 | 
			
		|||
    conn->async_status = ASYNC_WRITE;
 | 
			
		||||
 | 
			
		||||
    if (0 != psyco_wait(conn)) {
 | 
			
		||||
        psyco_panic_cancel(conn);
 | 
			
		||||
        green_panic(conn);
 | 
			
		||||
        goto end;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -194,52 +194,19 @@ end:
 | 
			
		|||
 | 
			
		||||
/* There has been a communication error during query execution. It may have
 | 
			
		||||
 * happened e.g. for a network error or an error in the callback, and we
 | 
			
		||||
 * cannot tell the two apart. The strategy here to avoid blocking (issue #113)
 | 
			
		||||
 * is to try and cancel the query, waiting for the result in non-blocking way.
 | 
			
		||||
 * If again we receive an error, we raise an error and close the connection.
 | 
			
		||||
 * Discard the result of the currenly executed query, blocking.
 | 
			
		||||
 * cannot tell the two apart.
 | 
			
		||||
 * Trying to PQcancel or PQgetResult to put the connection back into a working
 | 
			
		||||
 * state doesn't work nice (issue #113): the program blocks and the
 | 
			
		||||
 * interpreter won't even respond to SIGINT. PQreset could work async, but the
 | 
			
		||||
 * python program would have then a connection made but not configured where
 | 
			
		||||
 * it is probably not designed to handled. So for the moment we do the kindest
 | 
			
		||||
 * thing we can: we close the connection. A long-running program should
 | 
			
		||||
 * already have a way to discard broken connections; a short-lived one would
 | 
			
		||||
 * benefit of working ctrl-c.
 | 
			
		||||
 */
 | 
			
		||||
static void
 | 
			
		||||
psyco_panic_cancel(connectionObject *conn)
 | 
			
		||||
green_panic(connectionObject *conn)
 | 
			
		||||
{
 | 
			
		||||
    PGresult *res;
 | 
			
		||||
    PyObject *etype, *evalue, *etb;
 | 
			
		||||
    char errbuf[256];
 | 
			
		||||
 | 
			
		||||
    /* we should have an exception set. */
 | 
			
		||||
    PyErr_Fetch(&etype, &evalue, &etb);
 | 
			
		||||
    if (NULL == etype) {
 | 
			
		||||
        Dprintf("panic_cancel: called without exception set");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* Try sending the cancel signal */
 | 
			
		||||
    Dprintf("panic_cancel: sending cancel request");
 | 
			
		||||
    if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) {
 | 
			
		||||
        Dprintf("panic_cancel: canceling failed: %s", errbuf);
 | 
			
		||||
        /* raise a warning: we'll keep the previous error */
 | 
			
		||||
        PyErr_WarnEx(NULL, errbuf, 1);
 | 
			
		||||
        goto exit;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* go back in the loop for another attempt at async processing */
 | 
			
		||||
    /* TODO: should we start on ASYNC_WRITE instead? */
 | 
			
		||||
    if (0 != psyco_wait(conn)) {
 | 
			
		||||
        Dprintf("panic_cancel: error after cancel: closing the connection");
 | 
			
		||||
        PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1);
 | 
			
		||||
        conn_close_locked(conn);
 | 
			
		||||
        goto exit;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* we must clear the result or we get "another command is already in
 | 
			
		||||
     * progress" */
 | 
			
		||||
    if (NULL != (res = pq_get_last_result(conn))) {
 | 
			
		||||
        PQclear(res);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
exit:
 | 
			
		||||
    /* restore the exception. If no exception was set at function begin, don't
 | 
			
		||||
     * clobber one that may have been set here. */
 | 
			
		||||
    if (etype) {
 | 
			
		||||
        PyErr_Restore(etype, evalue, etb);
 | 
			
		||||
    }
 | 
			
		||||
    Dprintf("green_panic: closing the connection");
 | 
			
		||||
    conn_close_locked(conn);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,15 +4,20 @@
 | 
			
		|||
 | 
			
		||||
DSN = 'dbname=test'
 | 
			
		||||
 | 
			
		||||
# import eventlet.patcher
 | 
			
		||||
# eventlet.patcher.monkey_patch()
 | 
			
		||||
import eventlet.patcher
 | 
			
		||||
eventlet.patcher.monkey_patch()
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import signal
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
import psycopg2
 | 
			
		||||
from psycopg2 import extensions
 | 
			
		||||
from eventlet.hubs import trampoline
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# register a test wait callback that fails if SIGHUP is received
 | 
			
		||||
 | 
			
		||||
panic = []
 | 
			
		||||
 | 
			
		||||
def wait_cb(conn):
 | 
			
		||||
| 
						 | 
				
			
			@ -34,23 +39,43 @@ def wait_cb(conn):
 | 
			
		|||
 | 
			
		||||
extensions.set_wait_callback(wait_cb)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# SIGHUP handler to inject a fail in the callback
 | 
			
		||||
 | 
			
		||||
def handler(signum, frame):
 | 
			
		||||
    panic.append(True)
 | 
			
		||||
 | 
			
		||||
signal.signal(signal.SIGHUP, handler)
 | 
			
		||||
 | 
			
		||||
conn = psycopg2.connect(DSN)
 | 
			
		||||
curs = conn.cursor()
 | 
			
		||||
print "PID", os.getpid()
 | 
			
		||||
try:
 | 
			
		||||
    curs.execute("select pg_sleep(1000)")
 | 
			
		||||
except BaseException, e:
 | 
			
		||||
    print "got exception:", e.__class__.__name__, e
 | 
			
		||||
 | 
			
		||||
conn.rollback()
 | 
			
		||||
curs.execute("select 1")
 | 
			
		||||
print curs.fetchone()
 | 
			
		||||
# Simulate another green thread working
 | 
			
		||||
 | 
			
		||||
def worker():
 | 
			
		||||
    while 1:
 | 
			
		||||
        print "I'm working"
 | 
			
		||||
        sleep(1)
 | 
			
		||||
 | 
			
		||||
eventlet.spawn(worker)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# You can unplug the network cable etc. here.
 | 
			
		||||
# Kill -HUP will raise an exception in the callback.
 | 
			
		||||
 | 
			
		||||
print "PID", os.getpid()
 | 
			
		||||
conn = psycopg2.connect(DSN)
 | 
			
		||||
curs = conn.cursor()
 | 
			
		||||
try:
 | 
			
		||||
    for i in range(1000):
 | 
			
		||||
        curs.execute("select %s, pg_sleep(1)", (i,))
 | 
			
		||||
        r = curs.fetchone()
 | 
			
		||||
        print "selected", r
 | 
			
		||||
 | 
			
		||||
except BaseException, e:
 | 
			
		||||
    print "got exception:", e.__class__.__name__, e
 | 
			
		||||
 | 
			
		||||
if conn.closed:
 | 
			
		||||
    print "the connection is closed"
 | 
			
		||||
else:
 | 
			
		||||
    conn.rollback()
 | 
			
		||||
    curs.execute("select 1")
 | 
			
		||||
    print curs.fetchone()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase):
 | 
			
		|||
        warnings.warn("sending a large query didn't trigger block on write.")
 | 
			
		||||
 | 
			
		||||
    def test_error_in_callback(self):
 | 
			
		||||
        # behaviour changed after issue #113: if there is an error in the
 | 
			
		||||
        # callback for the moment we don't have a way to reset the connection
 | 
			
		||||
        # without blocking (ticket #113) so just close it.
 | 
			
		||||
        conn = self.conn
 | 
			
		||||
        curs = conn.cursor()
 | 
			
		||||
        curs.execute("select 1")  # have a BEGIN
 | 
			
		||||
| 
						 | 
				
			
			@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase):
 | 
			
		|||
        psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
 | 
			
		||||
        self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
 | 
			
		||||
 | 
			
		||||
        self.assert_(conn.closed)
 | 
			
		||||
 | 
			
		||||
    def test_dont_freak_out(self):
 | 
			
		||||
        # if there is an error in a green query, don't freak out and close
 | 
			
		||||
        # the connection
 | 
			
		||||
        conn = self.conn
 | 
			
		||||
        curs = conn.cursor()
 | 
			
		||||
        self.assertRaises(psycopg2.ProgrammingError,
 | 
			
		||||
            curs.execute, "select the unselectable")
 | 
			
		||||
 | 
			
		||||
        # check that the connection is left in an usable state
 | 
			
		||||
        psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
 | 
			
		||||
        self.assert_(not conn.closed)
 | 
			
		||||
        conn.rollback()
 | 
			
		||||
        curs.execute("select 2")
 | 
			
		||||
        self.assertEqual(2, curs.fetchone()[0])
 | 
			
		||||
        curs.execute("select 1")
 | 
			
		||||
        self.assertEqual(curs.fetchone()[0], 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_suite():
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue
	
	Block a user