Mirror of https://github.com/psycopg/psycopg2.git
Synced 2025-11-04 01:37:31 +03:00

Merge branch 'execute-locks'

Commit 1e6d5fb32d

NEWS (7 lines changed)

@@ -44,6 +44,13 @@ Other changes:
   install``.
 
 
+What's new in psycopg 2.7.7
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Cleanup of the cursor results assignment code, which might have solved
+  double free and inconsistencies in concurrent usage (:tickets:`#346, #384`).
+
+
 What's new in psycopg 2.7.6.1
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
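
The NEWS entry above corresponds to the result-assignment rework visible in the pqpath.c diff below: the synchronous path now builds the libpq result in a local PGresult pointer and only assigns it to curs->pgres once the GIL is held again. What follows is a rough, hypothetical model of that ownership hand-off, with a plain pthread mutex standing in for the GIL and connection lock; none of the names below are psycopg2's own.

/* Hypothetical, simplified model of the pattern used in the diff below:
 * the worker builds its result in a local pointer and publishes it to the
 * shared cursor only inside the critical section, so a concurrent reset
 * can never observe (or free) a half-assigned result.  Plain pthreads
 * stand in for the GIL and connection lock. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    pthread_mutex_t lock;   /* stands in for the GIL + connection lock */
    char *result;           /* stands in for curs->pgres */
} fake_cursor;

static void cursor_clear(fake_cursor *curs)
{
    pthread_mutex_lock(&curs->lock);
    free(curs->result);     /* only ever sees fully published results */
    curs->result = NULL;
    pthread_mutex_unlock(&curs->lock);
}

static void *execute_query(void *arg)
{
    fake_cursor *curs = (fake_cursor *)arg;
    /* the "network work" happens on a local pointer, outside the lock */
    char *local = strdup("42 rows");
    /* publish atomically: the shared slot never holds a partial result */
    pthread_mutex_lock(&curs->lock);
    free(curs->result);
    curs->result = local;
    pthread_mutex_unlock(&curs->lock);
    return NULL;
}

int main(void)
{
    fake_cursor curs = { PTHREAD_MUTEX_INITIALIZER, NULL };
    pthread_t t;
    pthread_create(&t, NULL, execute_query, &curs);
    cursor_clear(&curs);            /* may run before or after the publish */
    pthread_join(t, NULL);
    printf("final result: %s\n", curs.result ? curs.result : "(none)");
    cursor_clear(&curs);
    return 0;
}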

psycopg/pqpath.c (246 lines changed)

@@ -996,12 +996,145 @@ pq_flush(connectionObject *conn)
 */
 
 RAISES_NEG int
-pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin)
+_pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin)
+{
+    PGresult *pgres = NULL;
+    char *error = NULL;
+
+    CLEARPGRES(curs->pgres);
+
+    Py_BEGIN_ALLOW_THREADS;
+    pthread_mutex_lock(&(curs->conn->lock));
+
+    if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
+        pthread_mutex_unlock(&(curs->conn->lock));
+        Py_BLOCK_THREADS;
+        pq_complete_error(curs->conn, &pgres, &error);
+        return -1;
+    }
+
+    Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn);
+    Dprintf("    %-.200s", query);
+    if (!psyco_green()) {
+        pgres = PQexec(curs->conn->pgconn, query);
+    }
+    else {
+        Py_BLOCK_THREADS;
+        pgres = psyco_exec_green(curs->conn, query);
+        Py_UNBLOCK_THREADS;
+    }
+
+    /* don't let pgres = NULL go to pq_fetch() */
+    if (pgres == NULL) {
+        if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
+            curs->conn->closed = 2;
+        }
+        pthread_mutex_unlock(&(curs->conn->lock));
+        Py_BLOCK_THREADS;
+        if (!PyErr_Occurred()) {
+            PyErr_SetString(OperationalError,
+                            PQerrorMessage(curs->conn->pgconn));
+        }
+        return -1;
+    }
+
+    Py_BLOCK_THREADS;
+
+    /* assign the result back to the cursor now that we have the GIL */
+    curs->pgres = pgres;
+    pgres = NULL;
+
+    /* Process notifies here instead of when fetching the tuple as we are
+     * into the same critical section that received the data. Without this
+     * care, reading notifies may disrupt other thread communications.
+     * (as in ticket #55). */
+    conn_notifies_process(curs->conn);
+    conn_notice_process(curs->conn);
+    Py_UNBLOCK_THREADS;
+
+    pthread_mutex_unlock(&(curs->conn->lock));
+    Py_END_ALLOW_THREADS;
+
+    /* if the execute was sync, we call pq_fetch() immediately,
+       to respect the old DBAPI-2.0 compatible behaviour */
+    Dprintf("pq_execute: entering synchronous DBAPI compatibility mode");
+    if (pq_fetch(curs, no_result) < 0) return -1;
+
+    return 1;
+}
+
+RAISES_NEG int
+_pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin)
 {
     PGresult *pgres = NULL;
     char *error = NULL;
     int async_status = ASYNC_WRITE;
+    int ret;
 
+    CLEARPGRES(curs->pgres);
+
+    Py_BEGIN_ALLOW_THREADS;
+    pthread_mutex_lock(&(curs->conn->lock));
+
+    /* TODO: is this needed here? */
+    if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
+        pthread_mutex_unlock(&(curs->conn->lock));
+        Py_BLOCK_THREADS;
+        pq_complete_error(curs->conn, &pgres, &error);
+        return -1;
+    }
+
+
+    Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn);
+    Dprintf("    %-.200s", query);
+
+    if (PQsendQuery(curs->conn->pgconn, query) == 0) {
+        if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
+            curs->conn->closed = 2;
+        }
+        pthread_mutex_unlock(&(curs->conn->lock));
+        Py_BLOCK_THREADS;
+        PyErr_SetString(OperationalError,
+                        PQerrorMessage(curs->conn->pgconn));
+        return -1;
+    }
+    Dprintf("pq_execute: async query sent to backend");
+
+    ret = PQflush(curs->conn->pgconn);
+    if (ret == 0) {
+        /* the query got fully sent to the server */
+        Dprintf("pq_execute: query got flushed immediately");
+        /* the async status will be ASYNC_READ */
+        async_status = ASYNC_READ;
+    }
+    else if (ret == 1) {
+        /* not all of the query got sent to the server */
+        async_status = ASYNC_WRITE;
+    }
+    else {
+        /* there was an error */
+        pthread_mutex_unlock(&(curs->conn->lock));
+        Py_BLOCK_THREADS;
+        PyErr_SetString(OperationalError,
+                        PQerrorMessage(curs->conn->pgconn));
+        return -1;
+    }
+
+    pthread_mutex_unlock(&(curs->conn->lock));
+    Py_END_ALLOW_THREADS;
+
+    curs->conn->async_status = async_status;
+    if (!(curs->conn->async_cursor
+            = PyWeakref_NewRef((PyObject *)curs, NULL))) {
+        return -1;
+    }
+
+    return 0;
+}
+
+RAISES_NEG int
+pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin)
+{
     /* if the status of the connection is critical raise an exception and
        definitely close the connection */
     if (curs->conn->critical) {
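
_pq_execute_async() above keys the connection's next state off the PQflush() return value: 0 means the query was fully sent (ASYNC_READ), 1 means libpq still has data buffered (stay in ASYNC_WRITE), anything else is an error. Below is a minimal standalone sketch of that libpq contract, assuming a server reachable through the usual PG* environment variables; it is plain libpq, not psycopg2 code, and a real loop would poll the socket instead of spinning.

/* Hedged sketch of the PQflush() contract the async helper relies on.
 * Assumes libpq headers and a reachable server (PGHOST/PGDATABASE/...). */
#include <libpq-fe.h>
#include <stdio.h>

int main(void)
{
    PGconn *conn = PQconnectdb("");          /* settings from environment */
    if (PQstatus(conn) != CONNECTION_OK) {
        fprintf(stderr, "connect: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }
    PQsetnonblocking(conn, 1);

    if (!PQsendQuery(conn, "SELECT 42")) {
        fprintf(stderr, "send: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    int flush;
    while ((flush = PQflush(conn)) == 1) {
        /* ASYNC_WRITE in the diff: the query is not fully sent yet.
         * A real loop would select()/poll() on PQsocket(conn) for
         * writability before retrying. */
    }
    if (flush == -1) {
        fprintf(stderr, "flush: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* ASYNC_READ in the diff: wait for and consume the results. */
    PGresult *res;
    while ((res = PQgetResult(conn)) != NULL) {
        printf("result status: %s\n", PQresStatus(PQresultStatus(res)));
        PQclear(res);
    }
    PQfinish(conn);
    return 0;
}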
@@ -1016,115 +1149,14 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin)
     }
     Dprintf("pq_execute: pg connection at %p OK", curs->conn->pgconn);
 
-    CLEARPGRES(curs->pgres);
-    Py_BEGIN_ALLOW_THREADS;
-    pthread_mutex_lock(&(curs->conn->lock));
-
-    if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
-        pthread_mutex_unlock(&(curs->conn->lock));
-        Py_BLOCK_THREADS;
-        pq_complete_error(curs->conn, &pgres, &error);
-        return -1;
+    if (!async) {
+        return _pq_execute_sync(curs, query, no_result, no_begin);
+    } else {
+        return _pq_execute_async(curs, query, no_result, no_begin);
     }
-
-    if (async == 0) {
-        Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn);
-        Dprintf("    %-.200s", query);
-        if (!psyco_green()) {
-            curs->pgres = PQexec(curs->conn->pgconn, query);
-        }
-        else {
-            Py_BLOCK_THREADS;
-            curs->pgres = psyco_exec_green(curs->conn, query);
-            Py_UNBLOCK_THREADS;
-        }
-
-        /* don't let pgres = NULL go to pq_fetch() */
-        if (curs->pgres == NULL) {
-            if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
-                curs->conn->closed = 2;
-            }
-            pthread_mutex_unlock(&(curs->conn->lock));
-            Py_BLOCK_THREADS;
-            if (!PyErr_Occurred()) {
-                PyErr_SetString(OperationalError,
-                                PQerrorMessage(curs->conn->pgconn));
-            }
-            return -1;
-        }
-
-        /* Process notifies here instead of when fetching the tuple as we are
-         * into the same critical section that received the data. Without this
-         * care, reading notifies may disrupt other thread communications.
-         * (as in ticket #55). */
-        Py_BLOCK_THREADS;
-        conn_notifies_process(curs->conn);
-        conn_notice_process(curs->conn);
-        Py_UNBLOCK_THREADS;
-    }
-
-    else if (async == 1) {
-        int ret;
-
-        Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn);
-        Dprintf("    %-.200s", query);
-
-        if (PQsendQuery(curs->conn->pgconn, query) == 0) {
-            if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
-                curs->conn->closed = 2;
-            }
-            pthread_mutex_unlock(&(curs->conn->lock));
-            Py_BLOCK_THREADS;
-            PyErr_SetString(OperationalError,
-                            PQerrorMessage(curs->conn->pgconn));
-            return -1;
-        }
-        Dprintf("pq_execute: async query sent to backend");
-
-        ret = PQflush(curs->conn->pgconn);
-        if (ret == 0) {
-            /* the query got fully sent to the server */
-            Dprintf("pq_execute: query got flushed immediately");
-            /* the async status will be ASYNC_READ */
-            async_status = ASYNC_READ;
-        }
-        else if (ret == 1) {
-            /* not all of the query got sent to the server */
-            async_status = ASYNC_WRITE;
-        }
-        else {
-            /* there was an error */
-            pthread_mutex_unlock(&(curs->conn->lock));
-            Py_BLOCK_THREADS;
-            PyErr_SetString(OperationalError,
-                            PQerrorMessage(curs->conn->pgconn));
-            return -1;
-        }
-    }
-
-    pthread_mutex_unlock(&(curs->conn->lock));
-    Py_END_ALLOW_THREADS;
-
-    /* if the execute was sync, we call pq_fetch() immediately,
-       to respect the old DBAPI-2.0 compatible behaviour */
-    if (async == 0) {
-        Dprintf("pq_execute: entering synchronous DBAPI compatibility mode");
-        if (pq_fetch(curs, no_result) < 0) return -1;
-    }
-    else {
-        PyObject *tmp;
-        curs->conn->async_status = async_status;
-        curs->conn->async_cursor = tmp = PyWeakref_NewRef((PyObject *)curs, NULL);
-        if (!tmp) {
-            /* weakref creation failed */
-            return -1;
-        }
-    }
-
-    return 1-async;
 }
 
 
 /* send an async query to the backend.
  *
  * Return 1 if command succeeded, else 0.
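
After the split, pq_execute() is just a dispatcher and keeps its old return convention: the former `return 1-async` becomes an explicit `return 1` in _pq_execute_sync() (results already fetched by pq_fetch()) and `return 0` in _pq_execute_async() (query merely dispatched), with negative values still signalling an error in the usual RAISES_NEG style. Here is a self-contained toy sketch of that dispatcher shape, using stand-in names rather than the real psycopg2 internals:

/* Self-contained toy illustrating the dispatcher shape pq_execute() takes
 * after this commit: one public entry point that keeps the old return
 * convention (1 = sync work done, 0 = async work dispatched, negative =
 * error) while the sync and async bodies live in separate helpers.
 * Everything here is a stand-in; none of it is psycopg2 code. */
#include <stdio.h>

static int execute_sync(const char *query)
{
    printf("SYNC:  %s (result fetched immediately)\n", query);
    return 1;   /* mirrors _pq_execute_sync() returning 1 after pq_fetch() */
}

static int execute_async(const char *query)
{
    printf("ASYNC: %s (dispatched, fetch later)\n", query);
    return 0;   /* mirrors _pq_execute_async() returning 0 */
}

/* the dispatcher replaces the old single body that ended in `return 1-async` */
static int execute(const char *query, int async)
{
    if (!async) {
        return execute_sync(query);
    } else {
        return execute_async(query);
    }
}

int main(void)
{
    printf("returned %d\n", execute("SELECT 1", 0));
    printf("returned %d\n", execute("SELECT 2", 1));
    return 0;
}

Splitting the two paths also lets each helper take and release the connection lock and the GIL over its whole body instead of threading one lock/unlock sequence through the old if/else, which is presumably what the 'execute-locks' branch name refers to.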