Move replication connection to C level.

This commit is contained in:
Oleksandr Shulgin 2015-10-27 18:21:24 +01:00
parent 433fb957cb
commit fbcf99ad07
9 changed files with 296 additions and 53 deletions

View File

@ -62,7 +62,8 @@ from psycopg2._psycopg import string_types, binary_types, new_type, new_array_ty
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError
from psycopg2._psycopg import ReplicationCursor, ReplicationMessage
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2._psycopg import ReplicationConnection, ReplicationCursor, ReplicationMessage
try:
from psycopg2._psycopg import set_wait_callback, get_wait_callback

View File

@ -39,6 +39,8 @@ import psycopg2
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
from psycopg2.extensions import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2.extensions import ReplicationConnection as _replicationConnection
from psycopg2.extensions import ReplicationCursor as _replicationCursor
from psycopg2.extensions import ReplicationMessage
from psycopg2.extensions import adapt as _A, quote_ident
@ -439,65 +441,28 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
"""Replication connection types."""
REPLICATION_LOGICAL = "LOGICAL"
REPLICATION_PHYSICAL = "PHYSICAL"
class ReplicationConnectionBase(_connection):
class ReplicationConnectionBase(_replicationConnection):
"""
Base class for Logical and Physical replication connection
classes. Uses `ReplicationCursor` automatically.
"""
def __init__(self, *args, **kwargs):
"""
Initializes a replication connection by adding appropriate
parameters to the provided DSN and tweaking the connection
attributes.
"""
# replication_type is set in subclasses
if self.replication_type == REPLICATION_LOGICAL:
replication = 'database'
elif self.replication_type == REPLICATION_PHYSICAL:
replication = 'true'
else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)
items = _ext.parse_dsn(args[0])
# we add an appropriate replication keyword parameter, unless
# user has specified one explicitly in the DSN
items.setdefault('replication', replication)
dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v)))
for (k, v) in items.iteritems()])
args = [dsn] + list(args[1:]) # async is the possible 2nd arg
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
# prevent auto-issued BEGIN statements
if not self.async:
self.autocommit = True
if self.cursor_factory is None:
self.cursor_factory = ReplicationCursor
self.cursor_factory = ReplicationCursor
class LogicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_LOGICAL
kwargs['replication_type'] = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
class PhysicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_PHYSICAL
kwargs['replication_type'] = REPLICATION_PHYSICAL
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
@ -528,16 +493,16 @@ class ReplicationCursor(_replicationCursor):
if output_plugin is None:
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
command += "%s %s" % (slot_type, quote_ident(output_plugin, self))
command += "LOGICAL %s" % quote_ident(output_plugin, self)
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
command += slot_type
command += "PHYSICAL"
else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
self.execute(command)
@ -562,7 +527,7 @@ class ReplicationCursor(_replicationCursor):
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")
command += "%s " % slot_type
command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
@ -570,7 +535,7 @@ class ReplicationCursor(_replicationCursor):
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
if type(start_lsn) is str:
lsn = start_lsn.split('/')

View File

@ -120,6 +120,7 @@ typedef struct connectionObject connectionObject;
typedef struct replicationMessageObject replicationMessageObject;
/* some utility functions */
HIDDEN PyObject *parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_args(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_make_dsn(PyObject *self, PyObject *args, PyObject *kwargs);

View File

@ -28,6 +28,7 @@
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
#include "psycopg/replication_connection.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
@ -74,7 +75,7 @@ HIDDEN PyObject *psyco_DescriptionType = NULL;
/* finds a keyword or positional arg (pops it from kwargs if found there) */
static PyObject *
PyObject *
parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs)
{
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
@ -1114,6 +1115,9 @@ INIT_MODULE(_psycopg)(void)
Py_TYPE(&cursorType) = &PyType_Type;
if (PyType_Ready(&cursorType) == -1) goto exit;
Py_TYPE(&replicationConnectionType) = &PyType_Type;
if (PyType_Ready(&replicationConnectionType) == -1) goto exit;
Py_TYPE(&replicationCursorType) = &PyType_Type;
if (PyType_Ready(&replicationCursorType) == -1) goto exit;
@ -1237,6 +1241,8 @@ INIT_MODULE(_psycopg)(void)
PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION);
PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver");
PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM);
PyModule_AddIntMacro(module, REPLICATION_PHYSICAL);
PyModule_AddIntMacro(module, REPLICATION_LOGICAL);
PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL));
PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY));
PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE));
@ -1244,6 +1250,7 @@ INIT_MODULE(_psycopg)(void)
/* put new types in module dictionary */
PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
PyModule_AddObject(module, "ReplicationConnection", (PyObject*)&replicationConnectionType);
PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
@ -1285,6 +1292,9 @@ INIT_MODULE(_psycopg)(void)
if (0 != psyco_errors_init()) { goto exit; }
psyco_errors_fill(dict);
replicationPhysicalConst = PyDict_GetItemString(dict, "REPLICATION_PHYSICAL");
replicationLogicalConst = PyDict_GetItemString(dict, "REPLICATION_LOGICAL");
Dprintf("initpsycopg: module initialization complete");
exit:

View File

@ -0,0 +1,53 @@
/* replication_connection.h - definition for the psycopg replication connection type
*
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_REPLICATION_CONNECTION_H
#define PSYCOPG_REPLICATION_CONNECTION_H 1
#include "psycopg/connection.h"
#ifdef __cplusplus
extern "C" {
#endif
extern HIDDEN PyTypeObject replicationConnectionType;
typedef struct replicationConnectionObject {
connectionObject conn;
long int type;
} replicationConnectionObject;
#define REPLICATION_PHYSICAL 1
#define REPLICATION_LOGICAL 2
extern HIDDEN PyObject *replicationPhysicalConst;
extern HIDDEN PyObject *replicationLogicalConst;
#ifdef __cplusplus
}
#endif
#endif /* !defined(PSYCOPG_REPLICATION_CONNECTION_H) */

View File

@ -0,0 +1,210 @@
/* replication_connection_type.c - python interface to replication connection objects
*
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#define PSYCOPG_MODULE
#include "psycopg/psycopg.h"
#include "psycopg/replication_connection.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/pqpath.h"
#include <string.h>
#include <stdlib.h>
#define psyco_repl_conn_type_doc \
"replication_type -- the replication connection type"
static PyObject *
psyco_repl_conn_get_type(replicationConnectionObject *self)
{
connectionObject *conn = &self->conn;
PyObject *res = NULL;
EXC_IF_CONN_CLOSED(conn);
if (self->type == REPLICATION_PHYSICAL) {
res = replicationPhysicalConst;
} else if (self->type == REPLICATION_LOGICAL) {
res = replicationLogicalConst;
} else {
PyErr_Format(PyExc_TypeError, "unknown replication type constant: %ld", self->type);
}
Py_XINCREF(res);
return res;
}
static int
replicationConnection_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{
replicationConnectionObject *self = (replicationConnectionObject *)obj;
PyObject *dsn = NULL;
PyObject *async = NULL;
PyObject *tmp = NULL;
const char *repl = NULL;
int ret = -1;
Py_XINCREF(args);
Py_XINCREF(kwargs);
/* dsn, async, replication_type */
if (!(dsn = parse_arg(0, "dsn", Py_None, args, kwargs))) { goto exit; }
if (!(async = parse_arg(1, "async", Py_False, args, kwargs))) { goto exit; }
if (!(tmp = parse_arg(2, "replication_type", Py_None, args, kwargs))) { goto exit; }
if (tmp == replicationPhysicalConst) {
self->type = REPLICATION_PHYSICAL;
repl = "true";
} else if (tmp == replicationLogicalConst) {
self->type = REPLICATION_LOGICAL;
repl = "database";
} else {
PyErr_SetString(PyExc_TypeError,
"replication_type must be either REPLICATION_PHYSICAL or REPLICATION_LOGICAL");
goto exit;
}
Py_DECREF(tmp);
tmp = NULL;
if (dsn != Py_None) {
if (kwargs && PyMapping_Size(kwargs) > 0) {
PyErr_SetString(PyExc_TypeError, "both dsn and parameters given");
goto exit;
} else {
if (!(tmp = PyTuple_Pack(1, dsn))) { goto exit; }
Py_XDECREF(kwargs);
if (!(kwargs = psyco_parse_dsn(NULL, tmp, NULL))) { goto exit; }
}
} else {
if (!(kwargs && PyMapping_Size(kwargs) > 0)) {
PyErr_SetString(PyExc_TypeError, "missing dsn and no parameters");
goto exit;
}
}
if (!PyMapping_HasKeyString(kwargs, "replication")) {
PyMapping_SetItemString(kwargs, "replication", Text_FromUTF8(repl));
}
Py_DECREF(dsn);
if (!(dsn = psyco_make_dsn(NULL, NULL, kwargs))) { goto exit; }
Py_DECREF(args);
Py_DECREF(kwargs);
kwargs = NULL;
if (!(args = PyTuple_Pack(2, dsn, async))) { goto exit; }
if ((ret = connectionType.tp_init(obj, args, NULL)) < 0) { goto exit; }
self->conn.autocommit = 1;
self->conn.cursor_factory = (PyObject *)&replicationCursorType;
Py_INCREF(self->conn.cursor_factory);
exit:
Py_XDECREF(tmp);
Py_XDECREF(dsn);
Py_XDECREF(async);
Py_XDECREF(args);
Py_XDECREF(kwargs);
return ret;
}
static PyObject *
replicationConnection_repr(replicationConnectionObject *self)
{
return PyString_FromFormat(
"<ReplicationConnection object at %p; dsn: '%s', closed: %ld>",
self, self->conn.dsn, self->conn.closed);
}
/* object calculated member list */
static struct PyGetSetDef replicationConnectionObject_getsets[] = {
/* override to prevent user tweaking these: */
{ "autocommit", NULL, NULL, NULL },
{ "isolation_level", NULL, NULL, NULL },
{ "set_session", NULL, NULL, NULL },
{ "set_isolation_level", NULL, NULL, NULL },
{ "reset", NULL, NULL, NULL },
/* an actual getter */
{ "replication_type",
(getter)psyco_repl_conn_get_type, NULL,
psyco_repl_conn_type_doc, NULL },
{NULL}
};
/* object type */
#define replicationConnectionType_doc \
"A replication connection."
PyTypeObject replicationConnectionType = {
PyVarObject_HEAD_INIT(NULL, 0)
"psycopg2.extensions.ReplicationConnection",
sizeof(replicationConnectionObject), 0,
0, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
(reprfunc)replicationConnection_repr, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash*/
0, /*tp_call*/
(reprfunc)replicationConnection_repr, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
replicationConnectionType_doc, /*tp_doc*/
0, /*tp_traverse*/
0, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
0, /*tp_iter*/
0, /*tp_iternext*/
0, /*tp_methods*/
0, /*tp_members*/
replicationConnectionObject_getsets, /*tp_getset*/
&connectionType, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
replicationConnection_init, /*tp_init*/
0, /*tp_alloc*/
0, /*tp_new*/
};
PyObject *replicationPhysicalConst;
PyObject *replicationLogicalConst;

View File

@ -92,6 +92,7 @@
<None Include="psycopg\pqpath.h" />
<None Include="psycopg\psycopg.h" />
<None Include="psycopg\python.h" />
<None Include="psycopg\replication_connection.h" />
<None Include="psycopg\replication_cursor.h" />
<None Include="psycopg\replication_message.h" />
<None Include="psycopg\typecast.h" />
@ -227,6 +228,7 @@
<Compile Include="psycopg\microprotocols_proto.c" />
<Compile Include="psycopg\pqpath.c" />
<Compile Include="psycopg\psycopgmodule.c" />
<Compile Include="psycopg\replication_connection_type.c" />
<Compile Include="psycopg\replication_cursor_type.c" />
<Compile Include="psycopg\replication_message_type.c" />
<Compile Include="psycopg\typecast.c" />

View File

@ -466,7 +466,9 @@ sources = [
'connection_int.c', 'connection_type.c',
'cursor_int.c', 'cursor_type.c',
'replication_cursor_type.c', 'replication_message_type.c',
'replication_connection_type.c',
'replication_cursor_type.c',
'replication_message_type.c',
'diagnostics_type.c', 'error_type.c',
'lobject_int.c', 'lobject_type.c',
'notify_type.c', 'xid_type.c',
@ -482,7 +484,9 @@ depends = [
# headers
'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h',
'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h',
'replication_cursor.h', 'replication_message.h',
'replication_connection.h',
'replication_cursor.h',
'replication_message.h',
'notify.h', 'pqpath.h', 'xid.h',
'libpq_support.h', 'win32_support.h',

View File

@ -129,9 +129,6 @@ class ConnectingTestCase(unittest.TestCase):
conn = self.connect(**kwargs)
except psycopg2.OperationalError, e:
return self.skipTest("replication db not configured: %s" % e)
if not conn.async:
conn.autocommit = True
return conn
def _get_conn(self):