Support asynchronous connection building

After calling psycopg2.connect(dsn, async=True) you can poll the
connection that will tell you whether its file descriptor should be
waited on to become writable or readable or that the connection
attempt has succeeded.

Edited commit by Jan to not expose internal state in extensions.py.
This commit is contained in:
Federico Di Gregorio 2010-04-05 11:30:03 +02:00
parent 1f6ffbba0f
commit 34317dc4c3
5 changed files with 572 additions and 72 deletions

View File

@ -76,6 +76,11 @@ STATUS_ASYNC = 4
# This is a usefull mnemonic to check if the connection is in a transaction
STATUS_IN_TRANSACTION = STATUS_BEGIN
"""psycopg async connection polling values"""
POLL_READ = 1
POLL_WRITE = 2
POLL_OK = 3
"""Backend transaction status values."""
TRANSACTION_STATUS_IDLE = 0
TRANSACTION_STATUS_ACTIVE = 1

View File

@ -41,10 +41,33 @@ extern "C" {
#define CONN_STATUS_BEGIN 2
#define CONN_STATUS_SYNC 3
#define CONN_STATUS_ASYNC 4
/* async connection building statuses */
#define CONN_STATUS_SEND_DATESTYLE 5
#define CONN_STATUS_SENT_DATESTYLE 6
#define CONN_STATUS_GET_DATESTYLE 7
#define CONN_STATUS_SEND_CLIENT_ENCODING 8
#define CONN_STATUS_SENT_CLIENT_ENCODING 9
#define CONN_STATUS_GET_CLIENT_ENCODING 10
#define CONN_STATUS_SEND_TRANSACTION_ISOLATION 11
#define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12
#define CONN_STATUS_GET_TRANSACTION_ISOLATION 13
/* polling result, try to keep in sync with PostgresPollingStatusType from
libpq-fe.h */
#define PSYCO_POLL_READ 1
#define PSYCO_POLL_WRITE 2
#define PSYCO_POLL_OK 3
/* Hard limit on the notices stored by the Python connection */
#define CONN_NOTICES_LIMIT 50
/* we need the initial date style to be ISO, for typecasters; if the user
later change it, she must know what she's doing... these are the queries we
need to issue */
#define psyco_datestyle "SET DATESTYLE TO 'ISO'"
#define psyco_client_encoding "SHOW client_encoding"
#define psyco_transaction_isolation "SHOW default_transaction_isolation"
extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
@ -66,12 +89,14 @@ typedef struct {
long int isolation_level; /* isolation level for this connection */
long int mark; /* number of commits/rollbacks done so far */
int status; /* status of the connection */
long int async; /* 1 means the connection is async */
int protocol; /* protocol version */
int server_version; /* server version */
PGconn *pgconn; /* the postgresql connection */
PGconn *pgconn; /* the postgresql connection */
PyObject *async_cursor;
PyObject *async_cursor; /* a cursor executing an asynchronous query */
/* notice processing */
PyObject *notice_list;
@ -89,15 +114,21 @@ typedef struct {
} connectionObject;
/* C-callable functions in connection_int.c and connection_ext.c */
HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn);
HIDDEN char *conn_get_encoding(PGresult *pgres);
HIDDEN int conn_get_isolation_level(PGresult *pgres);
HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self);
HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self);
HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
HIDDEN int conn_commit(connectionObject *self);
HIDDEN int conn_rollback(connectionObject *self);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN PyObject *conn_poll_send(connectionObject *self);
HIDDEN PyObject *conn_poll_fetch(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -123,35 +123,16 @@ conn_notice_clean(connectionObject *self)
Py_END_ALLOW_THREADS;
}
/* conn_setup - setup and read basic information about the connection */
/*
* the conn_get_* family of functions makes it easier to obtain the connection
* parameters from query results or by interrogating the connection itself
*/
int
conn_setup(connectionObject *self, PGconn *pgconn)
conn_get_standard_conforming_strings(PGconn *pgconn)
{
PGresult *pgres;
const char *data, *tmp;
const char *scs; /* standard-conforming strings */
size_t i;
/* we need the initial date style to be ISO, for typecasters; if the user
later change it, she must know what she's doing... */
static const char datestyle[] = "SET DATESTYLE TO 'ISO'";
static const char encoding[] = "SHOW client_encoding";
static const char isolevel[] = "SHOW default_transaction_isolation";
static const char lvl1a[] = "read uncommitted";
static const char lvl1b[] = "read committed";
static const char lvl2a[] = "repeatable read";
static const char lvl2b[] = "serializable";
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
if (self->encoding) free(self->encoding);
self->equote = 0;
self->isolation_level = 0;
int equote;
const char *scs;
/*
* The presence of the 'standard_conforming_strings' parameter
* means that the server _accepts_ the E'' quote.
@ -173,15 +154,82 @@ conn_setup(connectionObject *self, PGconn *pgconn)
scs ? scs : "unavailable");
#ifndef PSYCOPG_OWN_QUOTING
self->equote = (scs && (0 == strcmp("off", scs)));
equote = (scs && (0 == strcmp("off", scs)));
#else
self->equote = (scs != NULL);
equote = (scs != NULL);
#endif
Dprintf("conn_connect: server requires E'' quotes: %s",
self->equote ? "YES" : "NO");
equote ? "YES" : "NO");
return equote;
}
char *
conn_get_encoding(PGresult *pgres)
{
char *tmp, *encoding;
size_t i;
tmp = PQgetvalue(pgres, 0, 0);
encoding = malloc(strlen(tmp)+1);
if (encoding == NULL) {
PyErr_NoMemory();
IFCLEARPGRES(pgres);
return NULL;
}
for (i=0 ; i < strlen(tmp) ; i++)
encoding[i] = toupper(tmp[i]);
encoding[i] = '\0';
CLEARPGRES(pgres);
return encoding;
}
int
conn_get_isolation_level(PGresult *pgres)
{
static const char lvl1a[] = "read uncommitted";
static const char lvl1b[] = "read committed";
char *isolation_level = PQgetvalue(pgres, 0, 0);
CLEARPGRES(pgres);
if ((strncmp(lvl1a, isolation_level, strlen(isolation_level)) == 0)
|| (strncmp(lvl1b, isolation_level, strlen(isolation_level)) == 0))
return 1;
else /* if it's not one of the lower ones, it's SERIALIZABLE */
return 2;
}
int
conn_get_protocol_version(PGconn *pgconn)
{
#ifdef HAVE_PQPROTOCOL3
return PQprotocolVersion(pgconn);
#else
return 2;
#endif
}
/* conn_setup - setup and read basic information about the connection */
int
conn_setup(connectionObject *self, PGconn *pgconn)
{
PGresult *pgres;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
if (self->encoding) free(self->encoding);
self->equote = 0;
self->isolation_level = 0;
self->equote = conn_get_standard_conforming_strings(pgconn);
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, datestyle);
pgres = PQexec(pgconn, psyco_datestyle);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
@ -196,7 +244,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
CLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, encoding);
pgres = PQexec(pgconn, psyco_client_encoding);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
@ -208,24 +256,19 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
tmp = PQgetvalue(pgres, 0, 0);
self->encoding = malloc(strlen(tmp)+1);
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(pgres);
if (self->encoding == NULL) {
PyErr_NoMemory();
PQfinish(pgconn);
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
for (i=0 ; i < strlen(tmp) ; i++)
self->encoding[i] = toupper(tmp[i]);
self->encoding[i] = '\0';
CLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, isolevel);
pgres = PQexec(pgconn, psyco_transaction_isolation);
Py_BLOCK_THREADS;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
@ -238,16 +281,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
data = PQgetvalue(pgres, 0, 0);
if ((strncmp(lvl1a, data, strlen(lvl1a)) == 0)
|| (strncmp(lvl1b, data, strlen(lvl1b)) == 0))
self->isolation_level = 1;
else if ((strncmp(lvl2a, data, strlen(lvl2a)) == 0)
|| (strncmp(lvl2b, data, strlen(lvl2b)) == 0))
self->isolation_level = 2;
else
self->isolation_level = 2;
CLEARPGRES(pgres);
self->isolation_level = conn_get_isolation_level(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
@ -259,7 +293,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
/* conn_connect - execute a connection to the database */
int
conn_connect(connectionObject *self)
conn_sync_connect(connectionObject *self)
{
PGconn *pgconn;
@ -295,11 +329,7 @@ conn_connect(connectionObject *self)
return -1;
}
#ifdef HAVE_PQPROTOCOL3
self->protocol = PQprotocolVersion(pgconn);
#else
self->protocol = 2;
#endif
self->protocol = conn_get_protocol_version(pgconn);
Dprintf("conn_connect: using protocol %d", self->protocol);
self->server_version = (int)PQserverVersion(pgconn);
@ -308,6 +338,246 @@ conn_connect(connectionObject *self)
return 0;
}
static int
conn_async_connect(connectionObject *self)
{
PGconn *pgconn;
pgconn = PQconnectStart(self->dsn);
Dprintf("conn_connect: new postgresql connection at %p", pgconn);
if (pgconn == NULL)
{
Dprintf("conn_connect: PQconnectStart(%s) FAILED", self->dsn);
PyErr_SetString(OperationalError, "PQconnectStart() failed");
return -1;
}
else if (PQstatus(pgconn) == CONNECTION_BAD)
{
Dprintf("conn_connect: PQconnectdb(%s) returned BAD", self->dsn);
PyErr_SetString(OperationalError, PQerrorMessage(pgconn));
PQfinish(pgconn);
return -1;
}
PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
self->status = CONN_STATUS_ASYNC;
self->pgconn = pgconn;
return 0;
}
int
conn_connect(connectionObject *self, long int async)
{
if (async == 1) {
Dprintf("con_connect: connecting in ASYNC mode");
return conn_async_connect(self);
}
else {
Dprintf("con_connect: connecting in SYNC mode");
return conn_sync_connect(self);
}
}
/* conn_poll_send - handle connection polling when flushing output */
PyObject *
conn_poll_send(connectionObject *self)
{
const char *query;
int next_status;
int ret;
Dprintf("conn_poll_send: status %d", self->status);
switch (self->status) {
case CONN_STATUS_SEND_DATESTYLE:
/* set the datestyle */
query = psyco_datestyle;
next_status = CONN_STATUS_SENT_DATESTYLE;
break;
case CONN_STATUS_SEND_CLIENT_ENCODING:
/* get the client_encoding */
query = psyco_client_encoding;
next_status = CONN_STATUS_SENT_CLIENT_ENCODING;
break;
case CONN_STATUS_SEND_TRANSACTION_ISOLATION:
/* get the default isolevel */
query = psyco_transaction_isolation;
next_status = CONN_STATUS_SENT_TRANSACTION_ISOLATION;
break;
default:
/* unexpected state, error out */
PyErr_Format(OperationalError,
"unexpected state: %d", self->status);
return NULL;
}
Dprintf("conn_poll_send: sending query %-.200s", query);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
if (PQsendQuery(self->pgconn, query) != 1) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(self->pgconn));
return NULL;
}
if (PQflush(self->pgconn) == 0) {
/* the query got fully sent to the server */
Dprintf("conn_poll_send: query got flushed immediately");
/* the return value will be POLL_READ */
ret = PSYCO_POLL_READ;
/* advance the next status, since we skip over the "waiting for the
query to be sent" status */
switch (next_status) {
case CONN_STATUS_SENT_DATESTYLE:
next_status = CONN_STATUS_GET_DATESTYLE;
break;
case CONN_STATUS_SENT_CLIENT_ENCODING:
next_status = CONN_STATUS_GET_CLIENT_ENCODING;
break;
case CONN_STATUS_SENT_TRANSACTION_ISOLATION:
next_status = CONN_STATUS_GET_TRANSACTION_ISOLATION;
break;
}
}
else {
/* query did not get sent completely, tell the client to wait for the
socket to become writable */
ret = PSYCO_POLL_WRITE;
}
self->status = next_status;
Dprintf("conn_poll_send: next status is %d, returning %d",
self->status, ret);
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
return PyInt_FromLong(ret);
}
/* curs_poll_fetch - handle connection polling when reading result */
PyObject *
conn_poll_fetch(connectionObject *self)
{
PGresult *pgres;
int is_busy;
int next_status;
int ret;
Dprintf("conn_poll_fetch: status %d", self->status);
/* consume the input */
is_busy = pq_is_busy(self);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("conn_poll_fetch: connection busy, returning %d",
PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
/* connection no longer busy, process input */
pgres = PQgetResult(self->pgconn);
/* do the rest while holding the GIL, we won't be calling into any
blocking API */
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
Dprintf("conn_poll_fetch: got result %p", pgres);
/* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */
if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK &&
PQresultStatus(pgres) != PGRES_TUPLES_OK)) {
PyErr_SetString(OperationalError, "can't issue "
"initial connection queries");
PQfinish(self->pgconn);
IFCLEARPGRES(pgres);
return NULL;
}
if (self->status == CONN_STATUS_GET_DATESTYLE) {
/* got the result from SET DATESTYLE*/
Dprintf("conn_poll_fetch: datestyle set");
next_status = CONN_STATUS_SEND_CLIENT_ENCODING;
}
else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) {
/* got the client_encoding */
self->encoding = conn_get_encoding(pgres);
if (self->encoding == NULL) {
PQfinish(self->pgconn);
return NULL;
}
Dprintf("conn_poll_fetch: got client_encoding %s", self->encoding);
next_status = CONN_STATUS_SEND_TRANSACTION_ISOLATION;
}
else if (self->status == CONN_STATUS_GET_TRANSACTION_ISOLATION) {
/* got the default isolevel */
self->isolation_level = conn_get_isolation_level(pgres);
Dprintf("conn_poll_fetch: got isolevel %ld", self->isolation_level);
/* since this is the last step, set the other instance variables now */
self->equote = conn_get_standard_conforming_strings(self->pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
self->server_version = (int) PQserverVersion(self->pgconn);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
/* set the connection to nonblocking */
if (PQsetnonblocking(self->pgconn, 1) != 0) {
Dprintf("conn_async_connect: PQsetnonblocking() FAILED");
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, "PQsetnonblocking() failed");
PQfinish(self->pgconn);
return NULL;
}
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
/* next status is going to READY */
next_status = CONN_STATUS_READY;
}
else {
/* unexpected state, error out */
PyErr_Format(OperationalError,
"unexpected state: %d", self->status);
return NULL;
}
/* clear any leftover result, there should be none, but the protocol
requires calling PQgetResult until you get a NULL */
pq_clear_async(self);
self->status = next_status;
/* if the curent status is READY it means we got the result of the
last initialization query, so we return POLL_OK, otherwise we need to
send another query, so return POLL_WRITE */
ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE;
Dprintf("conn_poll_fetch: next status is %d, returning %d",
self->status, ret);
return PyInt_FromLong(ret);
}
/* conn_close - do anything needed to shut down the connection */
void

View File

@ -37,6 +37,7 @@
#include "psycopg/psycopg.h"
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
#include "psycopg/pqpath.h"
#include "psycopg/lobject.h"
/** DBAPI methods **/
@ -66,6 +67,20 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
EXC_IF_CONN_CLOSED(self);
if (self->status != CONN_STATUS_READY &&
self->status != CONN_STATUS_BEGIN) {
PyErr_SetString(OperationalError,
"asynchronous connection attempt underway");
return NULL;
}
if (name != NULL && self->async) {
PyErr_SetString(OperationalError,
"asynchronous connections "
"cannot produce named cursors");
return NULL;
}
Dprintf("psyco_conn_cursor: new cursor for connection at %p", self);
Dprintf("psyco_conn_cursor: parameters: name = %s", name);
@ -389,6 +404,172 @@ psyco_conn_get_exception(PyObject *self, void *closure)
return exception;
}
#define psyco_conn_poll_doc \
"poll() -- return POLL_OK if the connection has been estabilished, " \
"POLL_READ if the application should be waiting " \
"for the socket to be readable or POLL_WRITE " \
"if the socket should be writable."
static PyObject *
psyco_conn_poll(connectionObject *self)
{
PostgresPollingStatusType poll_status;
Dprintf("conn_poll: polling with status %d", self->status);
switch (self->status) {
case CONN_STATUS_SEND_DATESTYLE:
case CONN_STATUS_SEND_CLIENT_ENCODING:
case CONN_STATUS_SEND_TRANSACTION_ISOLATION:
/* these mean that we need to wait for the socket to become writable
to send the rest of our query */
return conn_poll_send(self);
case CONN_STATUS_GET_DATESTYLE:
case CONN_STATUS_GET_CLIENT_ENCODING:
case CONN_STATUS_GET_TRANSACTION_ISOLATION:
/* these mean that we are waiting for the results of the queries */
return conn_poll_fetch(self);
case CONN_STATUS_ASYNC:
/* this means we are in the middle of a PQconnectPoll loop */
break;
case CONN_STATUS_READY:
/* we have completed the connection setup */
return PyInt_FromLong(PSYCO_POLL_OK);
default:
/* everything else is an error */
PyErr_SetString(OperationalError,
"not in asynchronous connection attempt");
return NULL;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
poll_status = PQconnectPoll(self->pgconn);
if (poll_status == PGRES_POLLING_READING) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
Dprintf("conn_poll: returing POLL_READ");
return PyInt_FromLong(PSYCO_POLL_READ);
}
if (poll_status == PGRES_POLLING_WRITING) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
Dprintf("conn_poll: returing POLL_WRITE");
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
if (poll_status == PGRES_POLLING_FAILED) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
return NULL;
}
/* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK,
but make sure */
if (poll_status != PGRES_POLLING_OK) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_Format(OperationalError,
"unexpected result from PQconnectPoll: %d", poll_status);
return NULL;
}
Dprintf("conn_poll: got POLL_OK");
/* the connection is built, but we want to do a few other things before we
let the user use it */
self->equote = conn_get_standard_conforming_strings(self->pgconn);
Dprintf("conn_poll: got standard_conforming_strings");
/*
* Here is the tricky part, we need to figure the datestyle,
* client_encoding and isolevel, all using nonblocking calls. To do that
* we will keep telling the user to poll, while we are waiting for our
* asynchronous queries to complete.
*/
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
/* the next operation the client will do is send a query, so ask him to
wait for a writable condition */
self->status = CONN_STATUS_SEND_DATESTYLE;
Dprintf("conn_poll: connection is built, retrning %d",
PSYCO_POLL_WRITE);
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
/* extension: fileno - return the file descriptor of the connection */
#define psyco_conn_fileno_doc \
"fileno() -> int -- Return file descriptor associated to database connection."
static PyObject *
psyco_conn_fileno(connectionObject *self)
{
long int socket;
EXC_IF_CONN_CLOSED(self);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
socket = (long int)PQsocket(self->pgconn);
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
return PyInt_FromLong(socket);
}
/* extension: issync - tell if the connection is synchronous */
#define psyco_conn_issync_doc \
"issync() -> bool -- Return True if the connection is synchronous."
static PyObject *
psyco_conn_issync(connectionObject *self)
{
if (self->async) {
Py_INCREF(Py_False);
return Py_False;
}
else {
Py_INCREF(Py_True);
return Py_True;
}
}
/* extension: executing - check for asynchronous operations */
#define psyco_conn_executing_doc \
"executing() -> bool -- Return True if the connection is " \
"executing an asynchronous operation."
static PyObject *
psyco_conn_executing(connectionObject *self)
{
if (self->async_cursor == NULL) {
Py_INCREF(Py_False);
return Py_False;
}
else {
Py_INCREF(Py_True);
return Py_True;
}
}
/** the connection object **/
@ -418,6 +599,14 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_conn_lobject_doc},
{"reset", (PyCFunction)psyco_conn_reset,
METH_NOARGS, psyco_conn_reset_doc},
{"poll", (PyCFunction)psyco_conn_poll,
METH_NOARGS, psyco_conn_lobject_doc},
{"fileno", (PyCFunction)psyco_conn_fileno,
METH_NOARGS, psyco_conn_fileno_doc},
{"issync", (PyCFunction)psyco_conn_issync,
METH_NOARGS, psyco_conn_issync_doc},
{"executing", (PyCFunction)psyco_conn_executing,
METH_NOARGS, psyco_conn_executing_doc},
#endif
{NULL}
};
@ -476,21 +665,22 @@ static struct PyGetSetDef connectionObject_getsets[] = {
/* initialization and finalization methods */
static int
connection_setup(connectionObject *self, const char *dsn)
connection_setup(connectionObject *self, const char *dsn, long int async)
{
char *pos;
int res;
Dprintf("connection_setup: init connection object at %p, refcnt = "
FORMAT_CODE_PY_SSIZE_T,
self, ((PyObject *)self)->ob_refcnt
Dprintf("connection_setup: init connection object at %p, "
"async %ld, refcnt = " FORMAT_CODE_PY_SSIZE_T,
self, async, ((PyObject *)self)->ob_refcnt
);
self->dsn = strdup(dsn);
self->notice_list = PyList_New(0);
self->notifies = PyList_New(0);
self->closed = 0;
self->status = CONN_STATUS_READY;
self->async = async;
self->status = async ? CONN_STATUS_ASYNC : CONN_STATUS_READY;
self->critical = NULL;
self->async_cursor = NULL;
self->pgconn = NULL;
@ -502,7 +692,7 @@ connection_setup(connectionObject *self, const char *dsn)
pthread_mutex_init(&(self->lock), NULL);
if (conn_connect(self) != 0) {
if (conn_connect(self, async) != 0) {
Dprintf("connection_init: FAILED");
res = -1;
}
@ -560,11 +750,12 @@ static int
connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
long int async = 0;
if (!PyArg_ParseTuple(args, "s", &dsn))
if (!PyArg_ParseTuple(args, "s|l", &dsn, &async))
return -1;
return connection_setup((connectionObject *)obj, dsn);
return connection_setup((connectionObject *)obj, dsn, async);
}
static PyObject *

View File

@ -90,6 +90,7 @@ HIDDEN int psycopg_debug_enabled = 0;
"- ``user`` -- user name used to authenticate\n" \
"- ``password`` -- password used to authenticate\n" \
"- ``sslmode`` -- SSL mode (see PostgreSQL documentation)\n\n" \
"- ``async`` -- if the connection should provide asynchronous API\n\n" \
"If the ``connection_factory`` keyword argument is not provided this\n" \
"function always return an instance of the `connection` class.\n" \
"Else the given sub-class of `extensions.connection` will be used to\n" \
@ -118,14 +119,16 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
const char *database=NULL, *user=NULL, *password=NULL;
const char *host=NULL, *sslmode=NULL;
char port[16];
int async = 0;
static char *kwlist[] = {"dsn", "database", "host", "port",
"user", "password", "sslmode",
"connection_factory", NULL};
"connection_factory", "async", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssO", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, keywds, "|sssOsssOi", kwlist,
&dsn_static, &database, &host, &pyport,
&user, &password, &sslmode, &factory)) {
&user, &password, &sslmode,
&factory, &async)) {
return NULL;
}
@ -190,11 +193,11 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
{
const char *dsn = (dsn_static != NULL ? dsn_static : dsn_dynamic);
Dprintf("psyco_connect: dsn = '%s'", dsn);
Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
/* allocate connection, fill with errors and return it */
if (factory == NULL) factory = (PyObject *)&connectionType;
conn = PyObject_CallFunction(factory, "s", dsn);
conn = PyObject_CallFunction(factory, "si", dsn, async);
}
goto cleanup;