Adding Notify object with payload.

This commit is contained in:
Daniele Varrazzo 2010-10-15 09:42:44 +01:00
parent e651308287
commit f435d15c95
6 changed files with 333 additions and 12 deletions

View File

@ -34,6 +34,7 @@
#include "psycopg/cursor.h"
#include "psycopg/pqpath.h"
#include "psycopg/green.h"
#include "psycopg/notify.h"
/* conn_notice_callback - process notices */
@ -133,21 +134,47 @@ conn_notice_clean(connectionObject *self)
void
conn_notifies_process(connectionObject *self)
{
PGnotify *pgn;
PGnotify *pgn = NULL;
PyObject *notify = NULL;
PyObject *pid = NULL, *channel = NULL, *payload = NULL;
/* TODO: we are called without the lock! */
while ((pgn = PQnotifies(self->pgconn)) != NULL) {
PyObject *notify;
Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
(int) pgn->be_pid, pgn->relname);
notify = PyTuple_New(2);
PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
PyList_Append(self->notifies, notify);
Py_DECREF(notify);
PQfreemem(pgn);
if (!(pid = PyInt_FromLong((long)pgn->be_pid))) { goto error; }
if (!(channel = PyString_FromString(pgn->relname))) { goto error; }
if (!(payload = PyString_FromString(pgn->extra))) { goto error; }
if (!(notify = PyObject_CallFunctionObjArgs((PyObject *)&NotifyType,
pid, channel, payload, NULL))) {
goto error;
}
Py_DECREF(pid); pid = NULL;
Py_DECREF(channel); channel = NULL;
Py_DECREF(payload); payload = NULL;
PyList_Append(self->notifies, (PyObject *)notify);
Py_DECREF(notify); notify = NULL;
PQfreemem(pgn); pgn = NULL;
}
return; /* no error */
error:
if (pgn) { PQfreemem(pgn); }
Py_XDECREF(notify);
Py_XDECREF(pid);
Py_XDECREF(channel);
Py_XDECREF(payload);
/* TODO: callers currently don't expect an error from us */
PyErr_Clear();
}

44
psycopg/notify.h Normal file
View File

@ -0,0 +1,44 @@
/* notify.h - definition for the psycopg Notify type
*
* Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_NOTIFY_H
#define PSYCOPG_NOTIFY_H 1
#include <Python.h>
#include "psycopg/config.h"
extern HIDDEN PyTypeObject NotifyType;
typedef struct {
PyObject_HEAD
PyObject *pid;
PyObject *channel;
PyObject *payload;
} NotifyObject;
#endif /* PSYCOPG_NOTIFY_H */

213
psycopg/notify_type.c Normal file
View File

@ -0,0 +1,213 @@
/* notify_type.c - python interface to Notify objects
*
* Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <structmember.h>
#define PSYCOPG_MODULE
#include "psycopg/config.h"
#include "psycopg/python.h"
#include "psycopg/psycopg.h"
#include "psycopg/notify.h"
static PyMemberDef notify_members[] = {
{ "pid", T_OBJECT, offsetof(NotifyObject, pid), RO },
{ "channel", T_OBJECT, offsetof(NotifyObject, channel), RO },
{ "payload", T_OBJECT, offsetof(NotifyObject, payload), RO },
{ NULL }
};
static PyObject *
notify_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
{
NotifyObject *self = (NotifyObject *)type->tp_alloc(type, 0);
return (PyObject *)self;
}
static int
notify_init(NotifyObject *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"pid", "channel", "payload", NULL};
PyObject *pid = NULL, *channel = NULL, *payload = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|O", kwlist,
&pid, &channel, &payload)) {
return -1;
}
if (!payload) {
payload = Py_None;
}
Py_CLEAR(self->pid);
Py_INCREF(pid);
self->pid = pid;
Py_CLEAR(self->channel);
Py_INCREF(channel);
self->channel = channel;
Py_CLEAR(self->payload);
Py_INCREF(payload);
self->payload = payload;
return 0;
}
static int
notify_traverse(NotifyObject *self, visitproc visit, void *arg)
{
Py_VISIT(self->pid);
Py_VISIT(self->channel);
Py_VISIT(self->payload);
return 0;
}
static void
notify_dealloc(NotifyObject *self)
{
Py_CLEAR(self->pid);
Py_CLEAR(self->channel);
Py_CLEAR(self->payload);
self->ob_type->tp_free((PyObject *)self);
}
static void
notify_del(PyObject *self)
{
PyObject_GC_Del(self);
}
/* Notify can be accessed as a 2 items tuple for backward compatibility */
static Py_ssize_t
notify_len(NotifyObject *self)
{
return 2;
}
static PyObject *
notify_getitem(NotifyObject *self, Py_ssize_t item)
{
if (item < 0)
item += 2;
switch (item) {
case 0:
Py_INCREF(self->pid);
return self->pid;
case 1:
Py_INCREF(self->channel);
return self->channel;
default:
PyErr_SetString(PyExc_IndexError, "index out of range");
return NULL;
}
}
static PySequenceMethods notify_sequence = {
(lenfunc)notify_len, /* sq_length */
0, /* sq_concat */
0, /* sq_repeat */
(ssizeargfunc)notify_getitem, /* sq_item */
0, /* sq_slice */
0, /* sq_ass_item */
0, /* sq_ass_slice */
0, /* sq_contains */
0, /* sq_inplace_concat */
0, /* sq_inplace_repeat */
};
static const char notify_doc[] =
"A notification received from the backend.";
PyTypeObject NotifyType = {
PyObject_HEAD_INIT(NULL)
0,
"psycopg2.extensions.Notify",
sizeof(NotifyObject),
0,
(destructor)notify_dealloc, /* tp_dealloc */
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
&notify_sequence, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT|Py_TPFLAGS_HAVE_GC, /*tp_flags*/
notify_doc, /*tp_doc*/
(traverseproc)notify_traverse, /*tp_traverse*/
0, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
0, /*tp_iter*/
0, /*tp_iternext*/
/* Attribute descriptor and subclassing stuff */
0, /*tp_methods*/
notify_members, /*tp_members*/
0, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
(initproc)notify_init, /*tp_init*/
0, /*tp_alloc will be set to PyType_GenericAlloc in module init*/
notify_new, /*tp_new*/
(freefunc)notify_del, /*tp_free Low-level free-memory routine */
0, /*tp_is_gc For PyObject_IS_GC */
0, /*tp_bases*/
0, /*tp_mro method resolution order */
0, /*tp_cache*/
0, /*tp_subclasses*/
0 /*tp_weaklist*/
};

View File

@ -34,6 +34,7 @@
#include "psycopg/cursor.h"
#include "psycopg/green.h"
#include "psycopg/lobject.h"
#include "psycopg/notify.h"
#include "psycopg/typecast.h"
#include "psycopg/microprotocols.h"
#include "psycopg/microprotocols_proto.h"
@ -732,6 +733,7 @@ init_psycopg(void)
asisType.ob_type = &PyType_Type;
listType.ob_type = &PyType_Type;
chunkType.ob_type = &PyType_Type;
NotifyType.ob_type = &PyType_Type;
if (PyType_Ready(&connectionType) == -1) return;
if (PyType_Ready(&cursorType) == -1) return;
@ -745,6 +747,7 @@ init_psycopg(void)
if (PyType_Ready(&asisType) == -1) return;
if (PyType_Ready(&listType) == -1) return;
if (PyType_Ready(&chunkType) == -1) return;
if (PyType_Ready(&NotifyType) == -1) return;
#ifdef PSYCOPG_EXTENSIONS
lobjectType.ob_type = &PyType_Type;
@ -850,6 +853,7 @@ init_psycopg(void)
listType.tp_alloc = PyType_GenericAlloc;
chunkType.tp_alloc = PyType_GenericAlloc;
pydatetimeType.tp_alloc = PyType_GenericAlloc;
NotifyType.tp_alloc = PyType_GenericAlloc;
#ifdef PSYCOPG_EXTENSIONS
lobjectType.tp_alloc = PyType_GenericAlloc;

View File

@ -346,7 +346,7 @@ sources = [
'psycopgmodule.c', 'pqpath.c', 'typecast.c',
'microprotocols.c', 'microprotocols_proto.c',
'connection_type.c', 'connection_int.c', 'cursor_type.c', 'cursor_int.c',
'lobject_type.c', 'lobject_int.c',
'lobject_type.c', 'lobject_int.c', 'notify_type.c',
'adapter_qstring.c', 'adapter_pboolean.c', 'adapter_binary.c',
'adapter_asis.c', 'adapter_list.c', 'adapter_datetime.c',
'adapter_pfloat.c', 'adapter_pdecimal.c',

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
import unittest
import warnings
import psycopg2
from psycopg2 import extensions
@ -34,8 +35,13 @@ class NotifiesTests(unittest.TestCase):
curs.execute("LISTEN " + name)
curs.close()
def notify(self, name, sec=0):
def notify(self, name, sec=0, payload=None):
"""Send a notification to the database, eventually after some time."""
if payload is None:
payload = ''
else:
payload = ", %r" % payload
script = ("""\
import time
time.sleep(%(sec)s)
@ -45,11 +51,11 @@ conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r)
curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
"""
% { 'dsn': tests.dsn, 'sec': sec, 'name': name})
% { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload})
return Popen([sys.executable, '-c', script], stdout=PIPE)
@ -99,6 +105,33 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
def test_notify_attributes(self):
self.autocommit(self.conn)
self.listen('foo')
pid = int(self.notify('foo').communicate()[0])
self.conn.poll()
self.assertEqual(1, len(self.conn.notifies))
notify = self.conn.notifies[0]
self.assertEqual(pid, notify.pid)
self.assertEqual('foo', notify.channel)
self.assertEqual('', notify.payload)
def test_notify_payload(self):
if self.conn.server_version < 90000:
warnings.warn("server version %s doesn't support notify payload: skipping test"
% self.conn.server_version)
return
self.autocommit(self.conn)
self.listen('foo')
pid = int(self.notify('foo', payload="Hello, world!").communicate()[0])
self.conn.poll()
self.assertEqual(1, len(self.conn.notifies))
notify = self.conn.notifies[0]
self.assertEqual(pid, notify.pid)
self.assertEqual('foo', notify.channel)
self.assertEqual('Hello, world!', notify.payload)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)