mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-10-30 23:37:29 +03:00 
			
		
		
		
	Merge branch 'execute-locks' into maint_2_7
This commit is contained in:
		
						commit
						ba1d9d7dc6
					
				
							
								
								
									
										7
									
								
								NEWS
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								NEWS
									
									
									
									
									
								
							|  | @ -1,6 +1,13 @@ | |||
| Current release | ||||
| --------------- | ||||
| 
 | ||||
| What's new in psycopg 2.7.7 | ||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||||
| 
 | ||||
| - Cleanup of the cursor results assignment code, which might have solved | ||||
|   double free and inconsistencies in concurrent usage (:tickets:`#346, #384`). | ||||
| 
 | ||||
| 
 | ||||
| What's new in psycopg 2.7.6.1 | ||||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										246
									
								
								psycopg/pqpath.c
									
									
									
									
									
								
							
							
						
						
									
										246
									
								
								psycopg/pqpath.c
									
									
									
									
									
								
							|  | @ -951,12 +951,145 @@ pq_flush(connectionObject *conn) | |||
| */ | ||||
| 
 | ||||
| RAISES_NEG int | ||||
| pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) | ||||
| _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin) | ||||
| { | ||||
|     PGresult *pgres = NULL; | ||||
|     char *error = NULL; | ||||
| 
 | ||||
|     CLEARPGRES(curs->pgres); | ||||
| 
 | ||||
|     Py_BEGIN_ALLOW_THREADS; | ||||
|     pthread_mutex_lock(&(curs->conn->lock)); | ||||
| 
 | ||||
|     if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         pq_complete_error(curs->conn, &pgres, &error); | ||||
|         return -1; | ||||
|     } | ||||
| 
 | ||||
|     Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); | ||||
|     Dprintf("    %-.200s", query); | ||||
|     if (!psyco_green()) { | ||||
|         pgres = PQexec(curs->conn->pgconn, query); | ||||
|     } | ||||
|     else { | ||||
|         Py_BLOCK_THREADS; | ||||
|         pgres = psyco_exec_green(curs->conn, query); | ||||
|         Py_UNBLOCK_THREADS; | ||||
|     } | ||||
| 
 | ||||
|     /* don't let pgres = NULL go to pq_fetch() */ | ||||
|     if (pgres == NULL) { | ||||
|         if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { | ||||
|             curs->conn->closed = 2; | ||||
|         } | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         if (!PyErr_Occurred()) { | ||||
|             PyErr_SetString(OperationalError, | ||||
|                             PQerrorMessage(curs->conn->pgconn)); | ||||
|         } | ||||
|         return -1; | ||||
|     } | ||||
| 
 | ||||
|     Py_BLOCK_THREADS; | ||||
| 
 | ||||
|     /* assign the result back to the cursor now that we have the GIL */ | ||||
|     curs->pgres = pgres; | ||||
|     pgres = NULL; | ||||
| 
 | ||||
|     /* Process notifies here instead of when fetching the tuple as we are
 | ||||
|      * into the same critical section that received the data. Without this | ||||
|      * care, reading notifies may disrupt other thread communications. | ||||
|      * (as in ticket #55). */ | ||||
|     conn_notifies_process(curs->conn); | ||||
|     conn_notice_process(curs->conn); | ||||
|     Py_UNBLOCK_THREADS; | ||||
| 
 | ||||
|     pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|     Py_END_ALLOW_THREADS; | ||||
| 
 | ||||
|     /* if the execute was sync, we call pq_fetch() immediately,
 | ||||
|        to respect the old DBAPI-2.0 compatible behaviour */ | ||||
|     Dprintf("pq_execute: entering synchronous DBAPI compatibility mode"); | ||||
|     if (pq_fetch(curs, no_result) < 0) return -1; | ||||
| 
 | ||||
|     return 1; | ||||
| } | ||||
| 
 | ||||
| RAISES_NEG int | ||||
| _pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin) | ||||
| { | ||||
|     PGresult *pgres = NULL; | ||||
|     char *error = NULL; | ||||
|     int async_status = ASYNC_WRITE; | ||||
|     int ret; | ||||
| 
 | ||||
|     CLEARPGRES(curs->pgres); | ||||
| 
 | ||||
|     Py_BEGIN_ALLOW_THREADS; | ||||
|     pthread_mutex_lock(&(curs->conn->lock)); | ||||
| 
 | ||||
|     /* TODO: is this needed here? */ | ||||
|     if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         pq_complete_error(curs->conn, &pgres, &error); | ||||
|         return -1; | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|     Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); | ||||
|     Dprintf("    %-.200s", query); | ||||
| 
 | ||||
|     if (PQsendQuery(curs->conn->pgconn, query) == 0) { | ||||
|         if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { | ||||
|             curs->conn->closed = 2; | ||||
|         } | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         PyErr_SetString(OperationalError, | ||||
|                         PQerrorMessage(curs->conn->pgconn)); | ||||
|         return -1; | ||||
|     } | ||||
|     Dprintf("pq_execute: async query sent to backend"); | ||||
| 
 | ||||
|     ret = PQflush(curs->conn->pgconn); | ||||
|     if (ret == 0) { | ||||
|         /* the query got fully sent to the server */ | ||||
|         Dprintf("pq_execute: query got flushed immediately"); | ||||
|         /* the async status will be ASYNC_READ */ | ||||
|         async_status = ASYNC_READ; | ||||
|     } | ||||
|     else if (ret == 1) { | ||||
|         /* not all of the query got sent to the server */ | ||||
|         async_status = ASYNC_WRITE; | ||||
|     } | ||||
|     else { | ||||
|         /* there was an error */ | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         PyErr_SetString(OperationalError, | ||||
|                         PQerrorMessage(curs->conn->pgconn)); | ||||
|         return -1; | ||||
|     } | ||||
| 
 | ||||
|     pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|     Py_END_ALLOW_THREADS; | ||||
| 
 | ||||
|     curs->conn->async_status = async_status; | ||||
|     if (!(curs->conn->async_cursor | ||||
|             = PyWeakref_NewRef((PyObject *)curs, NULL))) { | ||||
|         return -1; | ||||
|     } | ||||
| 
 | ||||
|     return 0; | ||||
| } | ||||
| 
 | ||||
| RAISES_NEG int | ||||
| pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) | ||||
| { | ||||
|     /* if the status of the connection is critical raise an exception and
 | ||||
|        definitely close the connection */ | ||||
|     if (curs->conn->critical) { | ||||
|  | @ -971,115 +1104,14 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int | |||
|     } | ||||
|     Dprintf("pq_execute: pg connection at %p OK", curs->conn->pgconn); | ||||
| 
 | ||||
|     Py_BEGIN_ALLOW_THREADS; | ||||
|     pthread_mutex_lock(&(curs->conn->lock)); | ||||
| 
 | ||||
|     if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { | ||||
|         pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|         Py_BLOCK_THREADS; | ||||
|         pq_complete_error(curs->conn, &pgres, &error); | ||||
|         return -1; | ||||
|     if (!async) { | ||||
|         return _pq_execute_sync(curs, query, no_result, no_begin); | ||||
|     } else { | ||||
|         return _pq_execute_async(curs, query, no_result, no_begin); | ||||
|     } | ||||
| 
 | ||||
|     if (async == 0) { | ||||
|         CLEARPGRES(curs->pgres); | ||||
|         Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); | ||||
|         Dprintf("    %-.200s", query); | ||||
|         if (!psyco_green()) { | ||||
|             curs->pgres = PQexec(curs->conn->pgconn, query); | ||||
|         } | ||||
|         else { | ||||
|             Py_BLOCK_THREADS; | ||||
|             curs->pgres = psyco_exec_green(curs->conn, query); | ||||
|             Py_UNBLOCK_THREADS; | ||||
|         } | ||||
| 
 | ||||
|         /* don't let pgres = NULL go to pq_fetch() */ | ||||
|         if (curs->pgres == NULL) { | ||||
|             if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { | ||||
|                 curs->conn->closed = 2; | ||||
|             } | ||||
|             pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|             Py_BLOCK_THREADS; | ||||
|             if (!PyErr_Occurred()) { | ||||
|                 PyErr_SetString(OperationalError, | ||||
|                                 PQerrorMessage(curs->conn->pgconn)); | ||||
|             } | ||||
|             return -1; | ||||
|         } | ||||
| 
 | ||||
|         /* Process notifies here instead of when fetching the tuple as we are
 | ||||
|          * into the same critical section that received the data. Without this | ||||
|          * care, reading notifies may disrupt other thread communications. | ||||
|          * (as in ticket #55). */ | ||||
|         Py_BLOCK_THREADS; | ||||
|         conn_notifies_process(curs->conn); | ||||
|         conn_notice_process(curs->conn); | ||||
|         Py_UNBLOCK_THREADS; | ||||
|     } | ||||
| 
 | ||||
|     else if (async == 1) { | ||||
|         int ret; | ||||
| 
 | ||||
|         Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); | ||||
|         Dprintf("    %-.200s", query); | ||||
| 
 | ||||
|         CLEARPGRES(curs->pgres); | ||||
|         if (PQsendQuery(curs->conn->pgconn, query) == 0) { | ||||
|             if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { | ||||
|                 curs->conn->closed = 2; | ||||
|             } | ||||
|             pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|             Py_BLOCK_THREADS; | ||||
|             PyErr_SetString(OperationalError, | ||||
|                             PQerrorMessage(curs->conn->pgconn)); | ||||
|             return -1; | ||||
|         } | ||||
|         Dprintf("pq_execute: async query sent to backend"); | ||||
| 
 | ||||
|         ret = PQflush(curs->conn->pgconn); | ||||
|         if (ret == 0) { | ||||
|             /* the query got fully sent to the server */ | ||||
|             Dprintf("pq_execute: query got flushed immediately"); | ||||
|             /* the async status will be ASYNC_READ */ | ||||
|             async_status = ASYNC_READ; | ||||
|         } | ||||
|         else if (ret == 1) { | ||||
|             /* not all of the query got sent to the server */ | ||||
|             async_status = ASYNC_WRITE; | ||||
|         } | ||||
|         else { | ||||
|             /* there was an error */ | ||||
|             pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|             Py_BLOCK_THREADS; | ||||
|             PyErr_SetString(OperationalError, | ||||
|                             PQerrorMessage(curs->conn->pgconn)); | ||||
|             return -1; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pthread_mutex_unlock(&(curs->conn->lock)); | ||||
|     Py_END_ALLOW_THREADS; | ||||
| 
 | ||||
|     /* if the execute was sync, we call pq_fetch() immediately,
 | ||||
|        to respect the old DBAPI-2.0 compatible behaviour */ | ||||
|     if (async == 0) { | ||||
|         Dprintf("pq_execute: entering synchronous DBAPI compatibility mode"); | ||||
|         if (pq_fetch(curs, no_result) < 0) return -1; | ||||
|     } | ||||
|     else { | ||||
|         PyObject *tmp; | ||||
|         curs->conn->async_status = async_status; | ||||
|         curs->conn->async_cursor = tmp = PyWeakref_NewRef((PyObject *)curs, NULL); | ||||
|         if (!tmp) { | ||||
|             /* weakref creation failed */ | ||||
|             return -1; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     return 1-async; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| /* send an async query to the backend.
 | ||||
|  * | ||||
|  * Return 1 if command succeeded, else 0. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	Block a user