diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index a69776f1..8763c27c 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -34,6 +34,7 @@ #include "psycopg/cursor.h" #include "psycopg/pqpath.h" #include "psycopg/green.h" +#include "psycopg/notify.h" /* conn_notice_callback - process notices */ @@ -133,21 +134,47 @@ conn_notice_clean(connectionObject *self) void conn_notifies_process(connectionObject *self) { - PGnotify *pgn; + PGnotify *pgn = NULL; + PyObject *notify = NULL; + PyObject *pid = NULL, *channel = NULL, *payload = NULL; + + /* TODO: we are called without the lock! */ while ((pgn = PQnotifies(self->pgconn)) != NULL) { - PyObject *notify; Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s", (int) pgn->be_pid, pgn->relname); - notify = PyTuple_New(2); - PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid)); - PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname)); - PyList_Append(self->notifies, notify); - Py_DECREF(notify); - PQfreemem(pgn); + if (!(pid = PyInt_FromLong((long)pgn->be_pid))) { goto error; } + if (!(channel = PyString_FromString(pgn->relname))) { goto error; } + if (!(payload = PyString_FromString(pgn->extra))) { goto error; } + + if (!(notify = PyObject_CallFunctionObjArgs((PyObject *)&NotifyType, + pid, channel, payload, NULL))) { + goto error; + } + + Py_DECREF(pid); pid = NULL; + Py_DECREF(channel); channel = NULL; + Py_DECREF(payload); payload = NULL; + + PyList_Append(self->notifies, (PyObject *)notify); + + Py_DECREF(notify); notify = NULL; + PQfreemem(pgn); pgn = NULL; } + return; /* no error */ + +error: + if (pgn) { PQfreemem(pgn); } + Py_XDECREF(notify); + Py_XDECREF(pid); + Py_XDECREF(channel); + Py_XDECREF(payload); + + /* TODO: callers currently don't expect an error from us */ + PyErr_Clear(); + } diff --git a/psycopg/notify.h b/psycopg/notify.h new file mode 100644 index 00000000..1490fb41 --- /dev/null +++ b/psycopg/notify.h @@ -0,0 +1,44 @@ +/* notify.h - definition for the psycopg Notify type + * + * Copyright (C) 2010 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_NOTIFY_H +#define PSYCOPG_NOTIFY_H 1 + +#include + +#include "psycopg/config.h" + +extern HIDDEN PyTypeObject NotifyType; + +typedef struct { + PyObject_HEAD + + PyObject *pid; + PyObject *channel; + PyObject *payload; + +} NotifyObject; + +#endif /* PSYCOPG_NOTIFY_H */ diff --git a/psycopg/notify_type.c b/psycopg/notify_type.c new file mode 100644 index 00000000..38288228 --- /dev/null +++ b/psycopg/notify_type.c @@ -0,0 +1,213 @@ +/* notify_type.c - python interface to Notify objects + * + * Copyright (C) 2010 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PY_SSIZE_T_CLEAN +#include +#include + +#define PSYCOPG_MODULE +#include "psycopg/config.h" +#include "psycopg/python.h" +#include "psycopg/psycopg.h" +#include "psycopg/notify.h" + +static PyMemberDef notify_members[] = { + { "pid", T_OBJECT, offsetof(NotifyObject, pid), RO }, + { "channel", T_OBJECT, offsetof(NotifyObject, channel), RO }, + { "payload", T_OBJECT, offsetof(NotifyObject, payload), RO }, + { NULL } +}; + +static PyObject * +notify_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + NotifyObject *self = (NotifyObject *)type->tp_alloc(type, 0); + + return (PyObject *)self; +} + +static int +notify_init(NotifyObject *self, PyObject *args, PyObject *kwargs) +{ + static char *kwlist[] = {"pid", "channel", "payload", NULL}; + PyObject *pid = NULL, *channel = NULL, *payload = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|O", kwlist, + &pid, &channel, &payload)) { + return -1; + } + + if (!payload) { + payload = Py_None; + } + + Py_CLEAR(self->pid); + Py_INCREF(pid); + self->pid = pid; + + Py_CLEAR(self->channel); + Py_INCREF(channel); + self->channel = channel; + + Py_CLEAR(self->payload); + Py_INCREF(payload); + self->payload = payload; + + return 0; +} + +static int +notify_traverse(NotifyObject *self, visitproc visit, void *arg) +{ + Py_VISIT(self->pid); + Py_VISIT(self->channel); + Py_VISIT(self->payload); + return 0; +} + +static void +notify_dealloc(NotifyObject *self) +{ + Py_CLEAR(self->pid); + Py_CLEAR(self->channel); + Py_CLEAR(self->payload); + + self->ob_type->tp_free((PyObject *)self); +} + +static void +notify_del(PyObject *self) +{ + PyObject_GC_Del(self); +} + +/* Notify can be accessed as a 2 items tuple for backward compatibility */ + +static Py_ssize_t +notify_len(NotifyObject *self) +{ + return 2; +} + +static PyObject * +notify_getitem(NotifyObject *self, Py_ssize_t item) +{ + if (item < 0) + item += 2; + + switch (item) { + case 0: + Py_INCREF(self->pid); + return self->pid; + case 1: + Py_INCREF(self->channel); + return self->channel; + default: + PyErr_SetString(PyExc_IndexError, "index out of range"); + return NULL; + } +} + +static PySequenceMethods notify_sequence = { + (lenfunc)notify_len, /* sq_length */ + 0, /* sq_concat */ + 0, /* sq_repeat */ + (ssizeargfunc)notify_getitem, /* sq_item */ + 0, /* sq_slice */ + 0, /* sq_ass_item */ + 0, /* sq_ass_slice */ + 0, /* sq_contains */ + 0, /* sq_inplace_concat */ + 0, /* sq_inplace_repeat */ +}; + + +static const char notify_doc[] = + "A notification received from the backend."; + +PyTypeObject NotifyType = { + PyObject_HEAD_INIT(NULL) + 0, + "psycopg2.extensions.Notify", + sizeof(NotifyObject), + 0, + (destructor)notify_dealloc, /* tp_dealloc */ + 0, /*tp_print*/ + + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + + 0, /*tp_compare*/ + + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + ¬ify_sequence, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + + 0, /*tp_call*/ + 0, /*tp_str*/ + + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + + Py_TPFLAGS_DEFAULT|Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + notify_doc, /*tp_doc*/ + + (traverseproc)notify_traverse, /*tp_traverse*/ + 0, /*tp_clear*/ + + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + + /* Attribute descriptor and subclassing stuff */ + + 0, /*tp_methods*/ + notify_members, /*tp_members*/ + 0, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + + (initproc)notify_init, /*tp_init*/ + 0, /*tp_alloc will be set to PyType_GenericAlloc in module init*/ + notify_new, /*tp_new*/ + (freefunc)notify_del, /*tp_free Low-level free-memory routine */ + 0, /*tp_is_gc For PyObject_IS_GC */ + 0, /*tp_bases*/ + 0, /*tp_mro method resolution order */ + 0, /*tp_cache*/ + 0, /*tp_subclasses*/ + 0 /*tp_weaklist*/ +}; + + diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 8b47944d..783bf159 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -34,6 +34,7 @@ #include "psycopg/cursor.h" #include "psycopg/green.h" #include "psycopg/lobject.h" +#include "psycopg/notify.h" #include "psycopg/typecast.h" #include "psycopg/microprotocols.h" #include "psycopg/microprotocols_proto.h" @@ -732,6 +733,7 @@ init_psycopg(void) asisType.ob_type = &PyType_Type; listType.ob_type = &PyType_Type; chunkType.ob_type = &PyType_Type; + NotifyType.ob_type = &PyType_Type; if (PyType_Ready(&connectionType) == -1) return; if (PyType_Ready(&cursorType) == -1) return; @@ -745,6 +747,7 @@ init_psycopg(void) if (PyType_Ready(&asisType) == -1) return; if (PyType_Ready(&listType) == -1) return; if (PyType_Ready(&chunkType) == -1) return; + if (PyType_Ready(&NotifyType) == -1) return; #ifdef PSYCOPG_EXTENSIONS lobjectType.ob_type = &PyType_Type; @@ -850,6 +853,7 @@ init_psycopg(void) listType.tp_alloc = PyType_GenericAlloc; chunkType.tp_alloc = PyType_GenericAlloc; pydatetimeType.tp_alloc = PyType_GenericAlloc; + NotifyType.tp_alloc = PyType_GenericAlloc; #ifdef PSYCOPG_EXTENSIONS lobjectType.tp_alloc = PyType_GenericAlloc; diff --git a/setup.py b/setup.py index 4866c7e9..3119f1a0 100644 --- a/setup.py +++ b/setup.py @@ -346,7 +346,7 @@ sources = [ 'psycopgmodule.c', 'pqpath.c', 'typecast.c', 'microprotocols.c', 'microprotocols_proto.c', 'connection_type.c', 'connection_int.c', 'cursor_type.c', 'cursor_int.c', - 'lobject_type.c', 'lobject_int.c', + 'lobject_type.c', 'lobject_int.c', 'notify_type.c', 'adapter_qstring.c', 'adapter_pboolean.c', 'adapter_binary.c', 'adapter_asis.c', 'adapter_list.c', 'adapter_datetime.c', 'adapter_pfloat.c', 'adapter_pdecimal.c', diff --git a/tests/test_notify.py b/tests/test_notify.py index 8793aa21..acad27cd 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import unittest +import warnings import psycopg2 from psycopg2 import extensions @@ -34,8 +35,13 @@ class NotifiesTests(unittest.TestCase): curs.execute("LISTEN " + name) curs.close() - def notify(self, name, sec=0): + def notify(self, name, sec=0, payload=None): """Send a notification to the database, eventually after some time.""" + if payload is None: + payload = '' + else: + payload = ", %r" % payload + script = ("""\ import time time.sleep(%(sec)s) @@ -45,11 +51,11 @@ conn = psycopg2.connect(%(dsn)r) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) print conn.get_backend_pid() curs = conn.cursor() -curs.execute("NOTIFY " %(name)r) +curs.execute("NOTIFY " %(name)r %(payload)r) curs.close() conn.close() """ - % { 'dsn': tests.dsn, 'sec': sec, 'name': name}) + % { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload}) return Popen([sys.executable, '-c', script], stdout=PIPE) @@ -99,6 +105,33 @@ conn.close() self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) + def test_notify_attributes(self): + self.autocommit(self.conn) + self.listen('foo') + pid = int(self.notify('foo').communicate()[0]) + self.conn.poll() + self.assertEqual(1, len(self.conn.notifies)) + notify = self.conn.notifies[0] + self.assertEqual(pid, notify.pid) + self.assertEqual('foo', notify.channel) + self.assertEqual('', notify.payload) + + def test_notify_payload(self): + if self.conn.server_version < 90000: + warnings.warn("server version %s doesn't support notify payload: skipping test" + % self.conn.server_version) + return + self.autocommit(self.conn) + self.listen('foo') + pid = int(self.notify('foo', payload="Hello, world!").communicate()[0]) + self.conn.poll() + self.assertEqual(1, len(self.conn.notifies)) + notify = self.conn.notifies[0] + self.assertEqual(pid, notify.pid) + self.assertEqual('foo', notify.channel) + self.assertEqual('Hello, world!', notify.payload) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)