diff --git a/NEWS b/NEWS index 7e6fba48..fac54783 100644 --- a/NEWS +++ b/NEWS @@ -6,6 +6,8 @@ What's new in psycopg 2.7 New features: +- Added :ref:`replication-support` (:ticket:`#322`). Main authors are + Oleksandr Shulgin and Craig Ringer, who deserve a huge thank you. - Added `~psycopg2.extensions.parse_dsn()` and `~psycopg2.extensions.make_dsn()` functions (:tickets:`#321, #363`). `~psycopg2.connect()` now can take both *dsn* and keyword arguments, merging diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index f2e279f8..5b5fb354 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -423,7 +423,7 @@ this will be probably implemented in a future release. Support for coroutine libraries ------------------------------- -.. versionadded:: 2.2.0 +.. versionadded:: 2.2 Psycopg can be used together with coroutine_\-based libraries and participate in cooperative multithreading. @@ -509,3 +509,90 @@ resources about the topic. conn.commit() cur.close() conn.close() + + + +.. index:: + single: Replication + +.. _replication-support: + +Replication protocol support +---------------------------- + +.. versionadded:: 2.7 + +Modern PostgreSQL servers (version 9.0 and above) support replication. The +replication protocol is built on top of the client-server protocol and can be +operated using ``libpq``, as such it can be also operated by ``psycopg2``. +The replication protocol can be operated on both synchronous and +:ref:`asynchronous ` connections. + +Server version 9.4 adds a new feature called *Logical Replication*. + +.. seealso:: + + - PostgreSQL `Streaming Replication Protocol`__ + + .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + + +Logical replication Quick-Start +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You must be using PostgreSQL server version 9.4 or above to run this quick +start. + +Make sure that replication connections are permitted for user ``postgres`` in +``pg_hba.conf`` and reload the server configuration. You also need to set +``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to +value greater than zero in ``postgresql.conf`` (these changes require a server +restart). Create a database ``psycopg2_test``. + +Then run the following code to quickly try the replication support out. This +is not production code -- it has no error handling, it sends feedback too +often, etc. -- and it's only intended as a simple demo of logical +replication:: + + from __future__ import print_function + import sys + import psycopg2 + import psycopg2.extras + + conn = psycopg2.connect('dbname=psycopg2_test user=postgres', + connection_factory=psycopg2.extras.LogicalReplicationConnection) + cur = conn.cursor() + try: + # test_decoding produces textual output + cur.start_replication(slot_name='pytest', decode=True) + except psycopg2.ProgrammingError: + cur.create_replication_slot('pytest', output_plugin='test_decoding') + cur.start_replication(slot_name='pytest', decode=True) + + class DemoConsumer(object): + def __call__(self, msg): + print(msg.payload) + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + democonsumer = DemoConsumer() + + print("Starting streaming, press Control-C to end...", file=sys.stderr) + try: + cur.consume_stream(democonsumer) + except KeyboardInterrupt: + cur.close() + conn.close() + print("The slot 'pytest' still exists. Drop it with " + "SELECT pg_drop_replication_slot('pytest'); if no longer needed.", + file=sys.stderr) + print("WARNING: Transaction logs will accumulate in pg_xlog " + "until the slot is dropped.", file=sys.stderr) + + +You can now make changes to the ``psycopg2_test`` database using a normal +psycopg2 session, ``psql``, etc. and see the logical decoding stream printed +by this demo client. + +This will continue running until terminated with ``Control-C``. + +For the details see :ref:`replication-objects`. diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 0e21ae58..78e96efe 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -143,6 +143,374 @@ Logging cursor +.. _replication-objects: + +Replication connection and cursor classes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +See :ref:`replication-support` for an introduction to the topic. + + +The following replication types are defined: + +.. data:: REPLICATION_LOGICAL +.. data:: REPLICATION_PHYSICAL + + +.. index:: + pair: Connection; replication + +.. autoclass:: LogicalReplicationConnection + + This connection factory class can be used to open a special type of + connection that is used for logical replication. + + Example:: + + from psycopg2.extras import LogicalReplicationConnection + log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection) + log_cur = log_conn.cursor() + + +.. autoclass:: PhysicalReplicationConnection + + This connection factory class can be used to open a special type of + connection that is used for physical replication. + + Example:: + + from psycopg2.extras import PhysicalReplicationConnection + phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) + phys_cur = phys_conn.cursor() + + Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use + `ReplicationCursor` for actual communication with the server. + + +.. index:: + pair: Message; replication + +The individual messages in the replication stream are represented by +`ReplicationMessage` objects (both logical and physical type): + +.. autoclass:: ReplicationMessage + + .. attribute:: payload + + The actual data received from the server. + + An instance of either `bytes()` or `unicode()`, depending on the value + of `decode` option passed to `~ReplicationCursor.start_replication()` + on the connection. See `~ReplicationCursor.read_message()` for + details. + + .. attribute:: data_size + + The raw size of the message payload (before possible unicode + conversion). + + .. attribute:: data_start + + LSN position of the start of the message. + + .. attribute:: wal_end + + LSN position of the current end of WAL on the server. + + .. attribute:: send_time + + A `~datetime` object representing the server timestamp at the moment + when the message was sent. + + .. attribute:: cursor + + A reference to the corresponding `ReplicationCursor` object. + + +.. index:: + pair: Cursor; replication + +.. autoclass:: ReplicationCursor + + .. method:: create_replication_slot(slot_name, slot_type=None, output_plugin=None) + + Create streaming replication slot. + + :param slot_name: name of the replication slot to be created + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param output_plugin: name of the logical decoding output plugin to be + used by the slot; required for logical + replication connections, disallowed for physical + + Example:: + + log_cur.create_replication_slot("logical1", "test_decoding") + phys_cur.create_replication_slot("physical1") + + # either logical or physical replication connection + cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL) + + When creating a slot on a logical replication connection, a logical + replication slot is created by default. Logical replication requires + name of the logical decoding output plugin to be specified. + + When creating a slot on a physical replication connection, a physical + replication slot is created by default. No output plugin parameter is + required or allowed when creating a physical replication slot. + + In either case the type of slot being created can be specified + explicitly using *slot_type* parameter. + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. + + .. method:: drop_replication_slot(slot_name) + + Drop streaming replication slot. + + :param slot_name: name of the replication slot to drop + + Example:: + + # either logical or physical replication connection + cur.drop_replication_slot("slot1") + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. + + .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False) + + Start replication on the connection. + + :param slot_name: name of the replication slot to use; required for + logical replication, physical replication can work + with or without a slot + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param start_lsn: the optional LSN position to start replicating from, + can be an integer or a string of hexadecimal digits + in the form ``XXX/XXX`` + :param timeline: WAL history timeline to start streaming from (optional, + can only be used with physical replication) + :param options: a dictionary of options to pass to logical replication + slot (not allowed with physical replication) + :param decode: a flag indicating that unicode conversion should be + performed on messages received from the server + + If a *slot_name* is specified, the slot must exist on the server and + its type must match the replication type used. + + If not specified using *slot_type* parameter, the type of replication + is defined by the type of replication connection. Logical replication + is only allowed on logical replication connection, but physical + replication can be used with both types of connection. + + On the other hand, physical replication doesn't require a named + replication slot to be used, only logical replication does. In any + case logical replication and replication slots are a feature of + PostgreSQL server starting with version 9.4. Physical replication can + be used starting with 9.0. + + If *start_lsn* is specified, the requested stream will start from that + LSN. The default is `!None` which passes the LSN ``0/0`` causing + replay to begin at the last point for which the server got flush + confirmation from the client, or the oldest available point for a new + slot. + + The server might produce an error if a WAL file for the given LSN has + already been recycled or it may silently start streaming from a later + position: the client can verify the actual position using information + provided by the `ReplicationMessage` attributes. The exact server + behavior depends on the type of replication and use of slots. + + The *timeline* parameter can only be specified with physical + replication and only starting with server version 9.3. + + A dictionary of *options* may be passed to the logical decoding plugin + on a logical replication slot. The set of supported options depends + on the output plugin that was used to create the slot. Must be + `!None` for physical replication. + + If *decode* is set to `!True` the messages received from the server + would be converted according to the connection `~connection.encoding`. + *This parameter should not be set with physical replication or with + logical replication plugins that produce binary output.* + + This function constructs a ``START_REPLICATION`` command and calls + `start_replication_expert()` internally. + + After starting the replication, to actually consume the incoming + server messages use `consume_stream()` or implement a loop around + `read_message()` in case of :ref:`asynchronous connection + `. + + .. method:: start_replication_expert(command, decode=False) + + Start replication on the connection using provided + ``START_REPLICATION`` command. See `start_replication()` for + description of *decode* parameter. + + .. method:: consume_stream(consume, keepalive_interval=10) + + :param consume: a callable object with signature :samp:`consume({msg})` + :param keepalive_interval: interval (in seconds) to send keepalive + messages to the server + + This method can only be used with synchronous connection. For + asynchronous connections see `read_message()`. + + Before using this method to consume the stream call + `start_replication()` first. + + This method enters an endless loop reading messages from the server + and passing them to ``consume()`` one at a time, then waiting for more + messages from the server. In order to make this method break out of + the loop and return, ``consume()`` can throw a `StopReplication` + exception. Any unhandled exception will make it break out of the loop + as well. + + The *msg* object passed to ``consume()`` is an instance of + `ReplicationMessage` class. See `read_message()` for details about + message decoding. + + This method also sends keepalive messages to the server in case there + were no new data from the server for the duration of + *keepalive_interval* (in seconds). The value of this parameter must + be set to at least 1 second, but it can have a fractional part. + + After processing certain amount of messages the client should send a + confirmation message to the server. This should be done by calling + `send_feedback()` method on the corresponding replication cursor. A + reference to the cursor is provided in the `ReplicationMessage` as an + attribute. + + The following example is a sketch implementation of ``consume()`` + callable for logical replication:: + + class LogicalStreamConsumer(object): + + ... + + def __call__(self, msg): + self.process_message(msg.payload) + + if self.should_send_feedback(msg): + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + consumer = LogicalStreamConsumer() + cur.consume_stream(consumer) + + .. warning:: + + When using replication with slots, failure to constantly consume + *and* report success to the server appropriately can eventually + lead to "disk full" condition on the server, because the server + retains all the WAL segments that might be needed to stream the + changes via all of the currently open replication slots. + + On the other hand, it is not recommended to send confirmation + after *every* processed message, since that will put an + unnecessary load on network and the server. A possible strategy + is to confirm after every COMMIT message. + + .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) + + :param write_lsn: a LSN position up to which the client has written the data locally + :param flush_lsn: a LSN position up to which the client has processed the + data reliably (the server is allowed to discard all + and every data that predates this LSN) + :param apply_lsn: a LSN position up to which the warm standby server + has applied the changes (physical replication + master-slave protocol only) + :param reply: request the server to send back a keepalive message immediately + + Use this method to report to the server that all messages up to a + certain LSN position have been processed on the client and may be + discarded on the server. + + This method can also be called with all default parameters' values to + just send a keepalive message to the server. + + Low-level replication cursor methods for :ref:`asynchronous connection + ` operation. + + With the synchronous connection a call to `consume_stream()` handles all + the complexity of handling the incoming messages and sending keepalive + replies, but at times it might be beneficial to use low-level interface + for better control, in particular to `~select` on multiple sockets. The + following methods are provided for asynchronous operation: + + .. method:: read_message() + + Try to read the next message from the server without blocking and + return an instance of `ReplicationMessage` or `!None`, in case there + are no more data messages from the server at the moment. + + This method should be used in a loop with asynchronous connections + (after calling `start_replication()` once). For synchronous + connections see `consume_stream()`. + + The returned message's `~ReplicationMessage.payload` is an instance of + `!unicode` decoded according to connection `~connection.encoding` + *iff* *decode* was set to `!True` in the initial call to + `start_replication()` on this connection, otherwise it is an instance + of `!bytes` with no decoding. + + It is expected that the calling code will call this method repeatedly + in order to consume all of the messages that might have been buffered + until `!None` is returned. After receiving `!None` from this method + the caller should use `~select.select()` or `~select.poll()` on the + corresponding connection to block the process until there is more data + from the server. + + The server can send keepalive messages to the client periodically. + Such messages are silently consumed by this method and are never + reported to the caller. + + .. method:: fileno() + + Call the corresponding connection's `~connection.fileno()` method and + return the result. + + This is a convenience method which allows replication cursor to be + used directly in `~select.select()` or `~select.poll()` calls. + + .. attribute:: io_timestamp + + A `~datetime` object representing the timestamp at the moment of last + communication with the server (a data or keepalive message in either + direction). + + An actual example of asynchronous operation might look like this:: + + from select import select + from datetime import datetime + + def consume(msg): + ... + + keepalive_interval = 10.0 + while True: + msg = cur.read_message() + if msg: + consume(msg) + else: + now = datetime.now() + timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() + try: + sel = select([cur], [], [], max(0, timeout)) + if not any(sel): + cur.send_feedback() # timed out, send keepalive message + except InterruptedError: + pass # recalculate timeout and continue + +.. index:: + pair: Cursor; Replication + +.. autoclass:: StopReplication + + .. index:: single: Data types; Additional diff --git a/lib/extras.py b/lib/extras.py index 2713d6fc..6ae98517 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -39,8 +39,12 @@ import psycopg2 from psycopg2 import extensions as _ext from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import connection as _connection -from psycopg2.extensions import adapt as _A +from psycopg2.extensions import adapt as _A, quote_ident from psycopg2.extensions import b +from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL +from psycopg2._psycopg import ReplicationConnection as _replicationConnection +from psycopg2._psycopg import ReplicationCursor as _replicationCursor +from psycopg2._psycopg import ReplicationMessage class DictCursorBase(_cursor): @@ -437,6 +441,123 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) +class LogicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_LOGICAL + super(LogicalReplicationConnection, self).__init__(*args, **kwargs) + + +class PhysicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_PHYSICAL + super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) + + +class StopReplication(Exception): + """ + Exception used to break out of the endless loop in + `~ReplicationCursor.consume_stream()`. + + Subclass of `~exceptions.Exception`. Intentionally *not* inherited from + `~psycopg2.Error` as occurrence of this exception does not indicate an + error. + """ + pass + + +class ReplicationCursor(_replicationCursor): + """A cursor used for communication on replication connections.""" + + def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): + """Create streaming replication slot.""" + + command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self) + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if output_plugin is None: + raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot") + + command += "LOGICAL %s" % quote_ident(output_plugin, self) + + elif slot_type == REPLICATION_PHYSICAL: + if output_plugin is not None: + raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot") + + command += "PHYSICAL" + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) + + self.execute(command) + + def drop_replication_slot(self, slot_name): + """Drop streaming replication slot.""" + + command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self) + self.execute(command) + + def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, + timeline=0, options=None, decode=False): + """Start replication stream.""" + + command = "START_REPLICATION " + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + else: + raise psycopg2.ProgrammingError("slot name is required for logical replication") + + command += "LOGICAL " + + elif slot_type == REPLICATION_PHYSICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) + + if type(start_lsn) is str: + lsn = start_lsn.split('/') + lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16)) + else: + lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF) + + command += lsn + + if timeline != 0: + if slot_type == REPLICATION_LOGICAL: + raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") + + command += " TIMELINE %d" % timeline + + if options: + if slot_type == REPLICATION_PHYSICAL: + raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication") + + command += " (" + for k,v in options.iteritems(): + if not command.endswith('('): + command += ", " + command += "%s %s" % (quote_ident(k, self), _A(str(v))) + command += ")" + + self.start_replication_expert(command, decode=decode) + + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() + + # a dbtype and adapter for Python UUID type class UUID_adapter(object): diff --git a/psycopg/libpq_support.c b/psycopg/libpq_support.c new file mode 100644 index 00000000..6c0b5f8e --- /dev/null +++ b/psycopg/libpq_support.c @@ -0,0 +1,104 @@ +/* libpq_support.c - functions not provided by libpq, but which are + * required for advanced communication with the server, such as + * streaming replication + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/libpq_support.h" + +/* htonl(), ntohl() */ +#ifdef _WIN32 +#include +/* gettimeofday() */ +#include "psycopg/win32_support.h" +#else +#include +#endif + +/* support routines taken from pg_basebackup/streamutil.c */ + +/* + * Frontend version of GetCurrentTimestamp(), since we are not linked with + * backend code. The protocol always uses integer timestamps, regardless of + * server setting. + */ +int64_t +feGetCurrentTimestamp(void) +{ + int64_t result; + struct timeval tp; + + gettimeofday(&tp, NULL); + + result = (int64_t) tp.tv_sec - + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + result = (result * USECS_PER_SEC) + tp.tv_usec; + + return result; +} + +/* + * Converts an int64 to network byte order. + */ +void +fe_sendint64(int64_t i, char *buf) +{ + uint32_t n32; + + /* High order half first, since we're doing MSB-first */ + n32 = (uint32_t) (i >> 32); + n32 = htonl(n32); + memcpy(&buf[0], &n32, 4); + + /* Now the low order half */ + n32 = (uint32_t) i; + n32 = htonl(n32); + memcpy(&buf[4], &n32, 4); +} + +/* + * Converts an int64 from network byte order to native format. + */ +int64_t +fe_recvint64(char *buf) +{ + int64_t result; + uint32_t h32; + uint32_t l32; + + memcpy(&h32, buf, 4); + memcpy(&l32, buf + 4, 4); + h32 = ntohl(h32); + l32 = ntohl(l32); + + result = h32; + result <<= 32; + result |= l32; + + return result; +} diff --git a/psycopg/libpq_support.h b/psycopg/libpq_support.h new file mode 100644 index 00000000..c8f10665 --- /dev/null +++ b/psycopg/libpq_support.h @@ -0,0 +1,48 @@ +/* libpq_support.h - definitions for libpq_support.c + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ +#ifndef PSYCOPG_LIBPQ_SUPPORT_H +#define PSYCOPG_LIBPQ_SUPPORT_H 1 + +#include "psycopg/config.h" + +/* type and constant definitions from internal postgres include */ +typedef unsigned PG_INT64_TYPE XLogRecPtr; + +/* have to use lowercase %x, as PyString_FromFormat can't do %X */ +#define XLOGFMTSTR "%x/%x" +#define XLOGFMTARGS(x) ((uint32_t)((x) >> 32)), ((uint32_t)((x) & 0xFFFFFFFF)) + +/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */ +#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */ +#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */ + +#define SECS_PER_DAY 86400 +#define USECS_PER_SEC 1000000LL + +HIDDEN int64_t feGetCurrentTimestamp(void); +HIDDEN void fe_sendint64(int64_t i, char *buf); +HIDDEN int64_t fe_recvint64(char *buf); + +#endif /* !defined(PSYCOPG_LIBPQ_SUPPORT_H) */ diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index eb862d3d..d7283d0c 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -35,13 +35,22 @@ #include "psycopg/pqpath.h" #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/replication_cursor.h" +#include "psycopg/replication_message.h" #include "psycopg/green.h" #include "psycopg/typecast.h" #include "psycopg/pgtypes.h" #include "psycopg/error.h" -#include +#include "psycopg/libpq_support.h" +#include "libpq-fe.h" +#ifdef _WIN32 +/* select() */ +#include +/* gettimeofday() */ +#include "win32_support.h" +#endif extern HIDDEN PyObject *psyco_DescriptionType; @@ -1056,6 +1065,13 @@ pq_get_last_result(connectionObject *conn) PQclear(result); } result = res; + + /* After entering copy both mode, libpq will make a phony + * PGresult for us every time we query for it, so we need to + * break out of this endless loop. */ + if (PQresultStatus(result) == PGRES_COPY_BOTH) { + break; + } } return result; @@ -1520,6 +1536,281 @@ exit: return ret; } +/* Tries to read the next message from the replication stream, without + blocking, in both sync and async connection modes. If no message + is ready in the CopyData buffer, tries to read from the server, + again without blocking. If that doesn't help, returns Py_None. + The caller is then supposed to block on the socket(s) and call this + function again. + + Any keepalive messages from the server are silently consumed and + are never returned to the caller. + */ +int +pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg) +{ + cursorObject *curs = &repl->cur; + connectionObject *conn = curs->conn; + PGconn *pgconn = conn->pgconn; + char *buffer = NULL; + int len, data_size, consumed, hdr, reply; + XLogRecPtr data_start, wal_end; + int64_t send_time; + PyObject *str = NULL, *result = NULL; + int ret = -1; + + Dprintf("pq_read_replication_message"); + + *msg = NULL; + consumed = 0; + +retry: + len = PQgetCopyData(pgconn, &buffer, 1 /* async */); + + if (len == 0) { + /* If we've tried reading some data, but there was none, bail out. */ + if (consumed) { + ret = 0; + goto exit; + } + /* We should only try reading more data when there is nothing + available at the moment. Otherwise, with a really highly loaded + server we might be reading a number of messages for every single + one we process, thus overgrowing the internal buffer until the + client system runs out of memory. */ + if (!PQconsumeInput(pgconn)) { + pq_raise(conn, curs, NULL); + goto exit; + } + /* But PQconsumeInput() doesn't tell us if it has actually read + anything into the internal buffer and there is no (supported) way + to ask libpq about this directly. The way we check is setting the + flag and re-trying PQgetCopyData(): if that returns 0 again, + there's no more data available in the buffer, so we return None. */ + consumed = 1; + goto retry; + } + + if (len == -2) { + /* serious error */ + pq_raise(conn, curs, NULL); + goto exit; + } + if (len == -1) { + /* EOF */ + curs->pgres = PQgetResult(pgconn); + + if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { + pq_raise(conn, curs, NULL); + goto exit; + } + + CLEARPGRES(curs->pgres); + ret = 0; + goto exit; + } + + /* It also makes sense to set this flag here to make us return early in + case of retry due to keepalive message. Any pending data on the socket + will trigger read condition in select() in the calling code anyway. */ + consumed = 1; + + /* ok, we did really read something: update the io timestamp */ + gettimeofday(&repl->last_io, NULL); + + Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); + if (buffer[0] == 'w') { + /* XLogData: msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ + hdr = 1 + 8 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "data message header too small"); + goto exit; + } + + data_size = len - hdr; + data_start = fe_recvint64(buffer + 1); + wal_end = fe_recvint64(buffer + 1 + 8); + send_time = fe_recvint64(buffer + 1 + 8 + 8); + + Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, + XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); + + Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr); + + if (repl->decode) { + str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL); + } else { + str = Bytes_FromStringAndSize(buffer + hdr, data_size); + } + if (!str) { goto exit; } + + result = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, + curs, str, NULL); + Py_DECREF(str); + if (!result) { goto exit; } + + *msg = (replicationMessageObject *)result; + (*msg)->data_size = data_size; + (*msg)->data_start = data_start; + (*msg)->wal_end = wal_end; + (*msg)->send_time = send_time; + } + else if (buffer[0] == 'k') { + /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ + hdr = 1 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "keepalive message header too small"); + goto exit; + } + + reply = buffer[hdr]; + if (reply && pq_send_replication_feedback(repl, 0) < 0) { + goto exit; + } + + PQfreemem(buffer); + buffer = NULL; + goto retry; + } + else { + psyco_set_error(OperationalError, curs, "unrecognized replication message type"); + goto exit; + } + + ret = 0; + +exit: + if (buffer) { + PQfreemem(buffer); + } + + return ret; +} + +int +pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) +{ + cursorObject *curs = &repl->cur; + connectionObject *conn = curs->conn; + PGconn *pgconn = conn->pgconn; + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR, + XLOGFMTARGS(repl->write_lsn), + XLOGFMTARGS(repl->flush_lsn), + XLOGFMTARGS(repl->apply_lsn)); + + replybuf[len] = 'r'; len += 1; + fe_sendint64(repl->write_lsn, &replybuf[len]); len += 8; + fe_sendint64(repl->flush_lsn, &replybuf[len]); len += 8; + fe_sendint64(repl->apply_lsn, &replybuf[len]); len += 8; + fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8; + replybuf[len] = reply_requested ? 1 : 0; len += 1; + + if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) { + pq_raise(conn, curs, NULL); + return -1; + } + gettimeofday(&repl->last_io, NULL); + + return 0; +} + +/* Calls pq_read_replication_message in an endless loop, until + stop_replication is called or a fatal error occurs. The messages + are passed to the consumer object. + + When no message is available, blocks on the connection socket, but + manages to send keepalive messages to the server as needed. +*/ +int +pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval) +{ + cursorObject *curs = &repl->cur; + connectionObject *conn = curs->conn; + PGconn *pgconn = conn->pgconn; + replicationMessageObject *msg = NULL; + PyObject *tmp = NULL; + int fd, sel, ret = -1; + fd_set fds; + struct timeval keep_intr, curr_time, ping_time, timeout; + + if (!PyCallable_Check(consume)) { + Dprintf("pq_copy_both: expected callable consume object"); + goto exit; + } + + CLEARPGRES(curs->pgres); + + keep_intr.tv_sec = (int)keepalive_interval; + keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; + + while (1) { + if (pq_read_replication_message(repl, &msg) < 0) { + goto exit; + } + else if (msg == NULL) { + fd = PQsocket(pgconn); + if (fd < 0) { + pq_raise(conn, curs, NULL); + goto exit; + } + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + /* how long can we wait before we need to send a keepalive? */ + gettimeofday(&curr_time, NULL); + + timeradd(&repl->last_io, &keep_intr, &ping_time); + timersub(&ping_time, &curr_time, &timeout); + + if (timeout.tv_sec >= 0) { + Py_BEGIN_ALLOW_THREADS; + sel = select(fd + 1, &fds, NULL, NULL, &timeout); + Py_END_ALLOW_THREADS; + } + else { + sel = 0; /* we're past target time, pretend select() timed out */ + } + + if (sel < 0) { + if (errno != EINTR) { + PyErr_SetFromErrno(PyExc_OSError); + goto exit; + } + if (PyErr_CheckSignals()) { + goto exit; + } + continue; + } + + if (sel == 0) { + if (pq_send_replication_feedback(repl, 0) < 0) { + goto exit; + } + } + continue; + } + else { + tmp = PyObject_CallFunctionObjArgs(consume, msg, NULL); + Py_DECREF(msg); + + if (tmp == NULL) { + Dprintf("pq_copy_both: consume returned NULL"); + goto exit; + } + Py_DECREF(tmp); + } + } + + ret = 1; + +exit: + return ret; +} + int pq_fetch(cursorObject *curs, int no_result) { @@ -1579,6 +1870,17 @@ pq_fetch(cursorObject *curs, int no_result) CLEARPGRES(curs->pgres); break; + case PGRES_COPY_BOTH: + Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); + curs->rowcount = -1; + ex = 0; + /* Nothing to do here: pq_copy_both will be called separately. + + Also don't clear the result status: it's checked in + consume_stream. */ + /*CLEARPGRES(curs->pgres);*/ + break; + case PGRES_TUPLES_OK: if (!no_result) { Dprintf("pq_fetch: got tuples"); diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index bd3293f8..5cf22309 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -28,6 +28,8 @@ #include "psycopg/cursor.h" #include "psycopg/connection.h" +#include "psycopg/replication_cursor.h" +#include "psycopg/replication_message.h" /* macro to clean the pg result */ #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) @@ -72,4 +74,11 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, char **error); +/* replication protocol support */ +HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, + double keepalive_interval); +HIDDEN int pq_read_replication_message(replicationCursorObject *repl, + replicationMessageObject **msg); +HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); + #endif /* !defined(PSYCOPG_PQPATH_H) */ diff --git a/psycopg/psycopg.h b/psycopg/psycopg.h index 13326ccf..3174f309 100644 --- a/psycopg/psycopg.h +++ b/psycopg/psycopg.h @@ -117,6 +117,7 @@ HIDDEN PyObject *psyco_GetDecimalType(void); /* forward declarations */ typedef struct cursorObject cursorObject; typedef struct connectionObject connectionObject; +typedef struct replicationMessageObject replicationMessageObject; /* some utility functions */ RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg); diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index dfcd5ae7..d4a4c947 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -28,6 +28,9 @@ #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/replication_connection.h" +#include "psycopg/replication_cursor.h" +#include "psycopg/replication_message.h" #include "psycopg/green.h" #include "psycopg/lobject.h" #include "psycopg/notify.h" @@ -112,6 +115,7 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) return conn; } + #define psyco_parse_dsn_doc \ "parse_dsn(dsn) -> dict -- parse a connection string into parameters" @@ -901,6 +905,15 @@ INIT_MODULE(_psycopg)(void) Py_TYPE(&cursorType) = &PyType_Type; if (PyType_Ready(&cursorType) == -1) goto exit; + Py_TYPE(&replicationConnectionType) = &PyType_Type; + if (PyType_Ready(&replicationConnectionType) == -1) goto exit; + + Py_TYPE(&replicationCursorType) = &PyType_Type; + if (PyType_Ready(&replicationCursorType) == -1) goto exit; + + Py_TYPE(&replicationMessageType) = &PyType_Type; + if (PyType_Ready(&replicationMessageType) == -1) goto exit; + Py_TYPE(&typecastType) = &PyType_Type; if (PyType_Ready(&typecastType) == -1) goto exit; @@ -981,6 +994,8 @@ INIT_MODULE(_psycopg)(void) /* Initialize the PyDateTimeAPI everywhere is used */ PyDateTime_IMPORT; if (psyco_adapter_datetime_init()) { goto exit; } + if (psyco_repl_curs_datetime_init()) { goto exit; } + if (psyco_replmsg_datetime_init()) { goto exit; } Py_TYPE(&pydatetimeType) = &PyType_Type; if (PyType_Ready(&pydatetimeType) == -1) goto exit; @@ -1016,6 +1031,8 @@ INIT_MODULE(_psycopg)(void) PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION); PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver"); PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM); + PyModule_AddIntMacro(module, REPLICATION_PHYSICAL); + PyModule_AddIntMacro(module, REPLICATION_LOGICAL); PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL)); PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY)); PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE)); @@ -1023,6 +1040,9 @@ INIT_MODULE(_psycopg)(void) /* put new types in module dictionary */ PyModule_AddObject(module, "connection", (PyObject*)&connectionType); PyModule_AddObject(module, "cursor", (PyObject*)&cursorType); + PyModule_AddObject(module, "ReplicationConnection", (PyObject*)&replicationConnectionType); + PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType); + PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType); PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType); PyModule_AddObject(module, "Notify", (PyObject*)¬ifyType); PyModule_AddObject(module, "Xid", (PyObject*)&xidType); @@ -1062,6 +1082,9 @@ INIT_MODULE(_psycopg)(void) if (0 != psyco_errors_init()) { goto exit; } psyco_errors_fill(dict); + replicationPhysicalConst = PyDict_GetItemString(dict, "REPLICATION_PHYSICAL"); + replicationLogicalConst = PyDict_GetItemString(dict, "REPLICATION_LOGICAL"); + Dprintf("initpsycopg: module initialization complete"); exit: diff --git a/psycopg/replication_connection.h b/psycopg/replication_connection.h new file mode 100644 index 00000000..e693038a --- /dev/null +++ b/psycopg/replication_connection.h @@ -0,0 +1,55 @@ +/* replication_connection.h - definition for the psycopg replication connection type + * + * Copyright (C) 2015 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_REPLICATION_CONNECTION_H +#define PSYCOPG_REPLICATION_CONNECTION_H 1 + +#include "psycopg/connection.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern HIDDEN PyTypeObject replicationConnectionType; + +typedef struct replicationConnectionObject { + connectionObject conn; + + long int type; +} replicationConnectionObject; + +/* The funny constant values should help to avoid mixups with some + commonly used numbers like 1 and 2. */ +#define REPLICATION_PHYSICAL 12345678 +#define REPLICATION_LOGICAL 87654321 + +extern HIDDEN PyObject *replicationPhysicalConst; +extern HIDDEN PyObject *replicationLogicalConst; + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(PSYCOPG_REPLICATION_CONNECTION_H) */ diff --git a/psycopg/replication_connection_type.c b/psycopg/replication_connection_type.c new file mode 100644 index 00000000..5e5d2229 --- /dev/null +++ b/psycopg/replication_connection_type.c @@ -0,0 +1,221 @@ +/* replication_connection_type.c - python interface to replication connection objects + * + * Copyright (C) 2015 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/replication_connection.h" +#include "psycopg/replication_message.h" +#include "psycopg/green.h" +#include "psycopg/pqpath.h" + +#include +#include + + +#define psyco_repl_conn_type_doc \ +"replication_type -- the replication connection type" + +static PyObject * +psyco_repl_conn_get_type(replicationConnectionObject *self) +{ + connectionObject *conn = &self->conn; + PyObject *res = NULL; + + EXC_IF_CONN_CLOSED(conn); + + if (self->type == REPLICATION_PHYSICAL) { + res = replicationPhysicalConst; + } else if (self->type == REPLICATION_LOGICAL) { + res = replicationLogicalConst; + } else { + PyErr_Format(PyExc_TypeError, "unknown replication type constant: %ld", self->type); + } + + Py_XINCREF(res); + return res; +} + + +static int +replicationConnection_init(PyObject *obj, PyObject *args, PyObject *kwargs) +{ + replicationConnectionObject *self = (replicationConnectionObject *)obj; + PyObject *dsn = NULL, *replication_type = NULL, + *item = NULL, *ext = NULL, *make_dsn = NULL, + *extras = NULL, *cursor = NULL; + int async = 0; + int ret = -1; + + /* 'replication_type' is not actually optional, but there's no + good way to put it before 'async' in the list */ + static char *kwlist[] = {"dsn", "async", "replication_type", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|iO", kwlist, + &dsn, &async, &replication_type)) { return ret; } + + /* + We have to call make_dsn() to add replication-specific + connection parameters, because the DSN might be an URI (if there + were no keyword arguments to connect() it is passed unchanged). + */ + /* we reuse args and kwargs to call make_dsn() and parent type's tp_init() */ + if (!(kwargs = PyDict_New())) { return ret; } + Py_INCREF(args); + + /* we also reuse the dsn to hold the result of the make_dsn() call */ + Py_INCREF(dsn); + + if (!(ext = PyImport_ImportModule("psycopg2.extensions"))) { goto exit; } + if (!(make_dsn = PyObject_GetAttrString(ext, "make_dsn"))) { goto exit; } + + /* all the nice stuff is located in python-level ReplicationCursor class */ + if (!(extras = PyImport_ImportModule("psycopg2.extras"))) { goto exit; } + if (!(cursor = PyObject_GetAttrString(extras, "ReplicationCursor"))) { goto exit; } + + /* checking the object reference helps to avoid recognizing + unrelated integer constants as valid input values */ + if (replication_type == replicationPhysicalConst) { + self->type = REPLICATION_PHYSICAL; + +#define SET_ITEM(k, v) \ + if (!(item = Text_FromUTF8(#v))) { goto exit; } \ + if (PyDict_SetItemString(kwargs, #k, item) != 0) { goto exit; } \ + Py_DECREF(item); \ + item = NULL; + + SET_ITEM(replication, true); + SET_ITEM(dbname, replication); /* required for .pgpass lookup */ + } else if (replication_type == replicationLogicalConst) { + self->type = REPLICATION_LOGICAL; + + SET_ITEM(replication, database); +#undef SET_ITEM + } else { + PyErr_SetString(PyExc_TypeError, + "replication_type must be either REPLICATION_PHYSICAL or REPLICATION_LOGICAL"); + goto exit; + } + + Py_DECREF(args); + if (!(args = PyTuple_Pack(1, dsn))) { goto exit; } + + Py_DECREF(dsn); + if (!(dsn = PyObject_Call(make_dsn, args, kwargs))) { goto exit; } + + Py_DECREF(args); + if (!(args = Py_BuildValue("(Oi)", dsn, async))) { goto exit; } + + /* only attempt the connection once we've handled all possible errors */ + if ((ret = connectionType.tp_init(obj, args, NULL)) < 0) { goto exit; } + + self->conn.autocommit = 1; + Py_INCREF(self->conn.cursor_factory = cursor); + +exit: + Py_XDECREF(item); + Py_XDECREF(ext); + Py_XDECREF(make_dsn); + Py_XDECREF(extras); + Py_XDECREF(cursor); + Py_XDECREF(dsn); + Py_XDECREF(args); + Py_XDECREF(kwargs); + + return ret; +} + +static PyObject * +replicationConnection_repr(replicationConnectionObject *self) +{ + return PyString_FromFormat( + "", + self, self->conn.dsn, self->conn.closed); +} + + +/* object calculated member list */ + +static struct PyGetSetDef replicationConnectionObject_getsets[] = { + /* override to prevent user tweaking these: */ + { "autocommit", NULL, NULL, NULL }, + { "isolation_level", NULL, NULL, NULL }, + { "set_session", NULL, NULL, NULL }, + { "set_isolation_level", NULL, NULL, NULL }, + { "reset", NULL, NULL, NULL }, + /* an actual getter */ + { "replication_type", + (getter)psyco_repl_conn_get_type, NULL, + psyco_repl_conn_type_doc, NULL }, + {NULL} +}; + +/* object type */ + +#define replicationConnectionType_doc \ +"A replication connection." + +PyTypeObject replicationConnectionType = { + PyVarObject_HEAD_INIT(NULL, 0) + "psycopg2.extensions.ReplicationConnection", + sizeof(replicationConnectionObject), 0, + 0, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)replicationConnection_repr, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + (reprfunc)replicationConnection_repr, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + replicationConnectionType_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + 0, /*tp_methods*/ + 0, /*tp_members*/ + replicationConnectionObject_getsets, /*tp_getset*/ + &connectionType, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + replicationConnection_init, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ +}; + +PyObject *replicationPhysicalConst; +PyObject *replicationLogicalConst; diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h new file mode 100644 index 00000000..71c6e190 --- /dev/null +++ b/psycopg/replication_cursor.h @@ -0,0 +1,59 @@ +/* replication_cursor.h - definition for the psycopg replication cursor type + * + * Copyright (C) 2015 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_REPLICATION_CURSOR_H +#define PSYCOPG_REPLICATION_CURSOR_H 1 + +#include "psycopg/cursor.h" +#include "libpq_support.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern HIDDEN PyTypeObject replicationCursorType; + +typedef struct replicationCursorObject { + cursorObject cur; + + int consuming:1; /* if running the consume loop */ + int decode:1; /* if we should use character decoding on the messages */ + + struct timeval last_io; /* timestamp of the last exchange with the server */ + struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ + + XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ + XLogRecPtr flush_lsn; + XLogRecPtr apply_lsn; +} replicationCursorObject; + + +RAISES_NEG int psyco_repl_curs_datetime_init(void); + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(PSYCOPG_REPLICATION_CURSOR_H) */ diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c new file mode 100644 index 00000000..271214f1 --- /dev/null +++ b/psycopg/replication_cursor_type.c @@ -0,0 +1,315 @@ +/* replication_cursor_type.c - python interface to replication cursor objects + * + * Copyright (C) 2015 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/replication_cursor.h" +#include "psycopg/replication_message.h" +#include "psycopg/green.h" +#include "psycopg/pqpath.h" + +#include +#include + +/* python */ +#include "datetime.h" + + +#define psyco_repl_curs_start_replication_expert_doc \ +"start_replication_expert(command, decode=False) -- Start replication with a given command." + +static PyObject * +psyco_repl_curs_start_replication_expert(replicationCursorObject *self, + PyObject *args, PyObject *kwargs) +{ + cursorObject *curs = &self->cur; + connectionObject *conn = self->cur.conn; + PyObject *res = NULL; + char *command; + long int decode = 0; + static char *kwlist[] = {"command", "decode", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) { + return NULL; + } + + EXC_IF_CURS_CLOSED(curs); + EXC_IF_GREEN(start_replication_expert); + EXC_IF_TPC_PREPARED(conn, start_replication_expert); + + Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode); + + if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) { + res = Py_None; + Py_INCREF(res); + + self->decode = decode; + gettimeofday(&self->last_io, NULL); + } + + return res; +} + +#define psyco_repl_curs_consume_stream_doc \ +"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream." + +static PyObject * +psyco_repl_curs_consume_stream(replicationCursorObject *self, + PyObject *args, PyObject *kwargs) +{ + cursorObject *curs = &self->cur; + PyObject *consume = NULL, *res = NULL; + double keepalive_interval = 10; + static char *kwlist[] = {"consume", "keepalive_interval", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist, + &consume, &keepalive_interval)) { + return NULL; + } + + EXC_IF_CURS_CLOSED(curs); + EXC_IF_CURS_ASYNC(curs, consume_stream); + EXC_IF_GREEN(consume_stream); + EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream); + + Dprintf("psyco_repl_curs_consume_stream"); + + if (keepalive_interval < 1.0) { + psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); + return NULL; + } + + if (self->consuming) { + PyErr_SetString(ProgrammingError, + "consume_stream cannot be used when already in the consume loop"); + return NULL; + } + + if (curs->pgres == NULL || PQresultStatus(curs->pgres) != PGRES_COPY_BOTH) { + PyErr_SetString(ProgrammingError, + "consume_stream: not replicating, call start_replication first"); + return NULL; + } + CLEARPGRES(curs->pgres); + + self->consuming = 1; + + if (pq_copy_both(self, consume, keepalive_interval) >= 0) { + res = Py_None; + Py_INCREF(res); + } + + self->consuming = 0; + + return res; +} + +#define psyco_repl_curs_read_message_doc \ +"read_message() -- Try reading a replication message from the server (non-blocking)." + +static PyObject * +psyco_repl_curs_read_message(replicationCursorObject *self) +{ + cursorObject *curs = &self->cur; + replicationMessageObject *msg = NULL; + + EXC_IF_CURS_CLOSED(curs); + EXC_IF_GREEN(read_message); + EXC_IF_TPC_PREPARED(self->cur.conn, read_message); + + if (pq_read_replication_message(self, &msg) < 0) { + return NULL; + } + if (msg) { + return (PyObject *)msg; + } + + Py_RETURN_NONE; +} + +#define psyco_repl_curs_send_feedback_doc \ +"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply." + +static PyObject * +psyco_repl_curs_send_feedback(replicationCursorObject *self, + PyObject *args, PyObject *kwargs) +{ + cursorObject *curs = &self->cur; + XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0; + int reply = 0; + static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; + + EXC_IF_CURS_CLOSED(curs); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, + &write_lsn, &flush_lsn, &apply_lsn, &reply)) { + return NULL; + } + + if (write_lsn > self->write_lsn) + self->write_lsn = write_lsn; + + if (flush_lsn > self->flush_lsn) + self->flush_lsn = flush_lsn; + + if (apply_lsn > self->apply_lsn) + self->apply_lsn = apply_lsn; + + if (pq_send_replication_feedback(self, reply) < 0) { + return NULL; + } + + Py_RETURN_NONE; +} + + +RAISES_NEG int +psyco_repl_curs_datetime_init(void) +{ + Dprintf("psyco_repl_curs_datetime_init: datetime init"); + + PyDateTime_IMPORT; + + if (!PyDateTimeAPI) { + PyErr_SetString(PyExc_ImportError, "datetime initialization failed"); + return -1; + } + return 0; +} + +#define psyco_repl_curs_io_timestamp_doc \ +"io_timestamp -- the timestamp of latest IO with the server" + +static PyObject * +psyco_repl_curs_get_io_timestamp(replicationCursorObject *self) +{ + cursorObject *curs = &self->cur; + PyObject *tval, *res = NULL; + double seconds; + + EXC_IF_CURS_CLOSED(curs); + + seconds = self->last_io.tv_sec + self->last_io.tv_usec / 1.0e6; + + tval = Py_BuildValue("(d)", seconds); + if (tval) { + res = PyDateTime_FromTimestamp(tval); + Py_DECREF(tval); + } + return res; +} + +/* object method list */ + +static struct PyMethodDef replicationCursorObject_methods[] = { + {"start_replication_expert", (PyCFunction)psyco_repl_curs_start_replication_expert, + METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_start_replication_expert_doc}, + {"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream, + METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc}, + {"read_message", (PyCFunction)psyco_repl_curs_read_message, + METH_NOARGS, psyco_repl_curs_read_message_doc}, + {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback, + METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc}, + {NULL} +}; + +/* object calculated member list */ + +static struct PyGetSetDef replicationCursorObject_getsets[] = { + { "io_timestamp", + (getter)psyco_repl_curs_get_io_timestamp, NULL, + psyco_repl_curs_io_timestamp_doc, NULL }, + {NULL} +}; + +static int +replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs) +{ + replicationCursorObject *self = (replicationCursorObject *)obj; + + self->consuming = 0; + self->decode = 0; + + self->write_lsn = 0; + self->flush_lsn = 0; + self->apply_lsn = 0; + + return cursorType.tp_init(obj, args, kwargs); +} + +static PyObject * +replicationCursor_repr(replicationCursorObject *self) +{ + return PyString_FromFormat( + "", self, self->cur.closed); +} + + +/* object type */ + +#define replicationCursorType_doc \ +"A database replication cursor." + +PyTypeObject replicationCursorType = { + PyVarObject_HEAD_INIT(NULL, 0) + "psycopg2.extensions.ReplicationCursor", + sizeof(replicationCursorObject), 0, + 0, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)replicationCursor_repr, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + (reprfunc)replicationCursor_repr, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + replicationCursorType_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + replicationCursorObject_methods, /*tp_methods*/ + 0, /*tp_members*/ + replicationCursorObject_getsets, /*tp_getset*/ + &cursorType, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + replicationCursor_init, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ +}; diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h new file mode 100644 index 00000000..b4d93d67 --- /dev/null +++ b/psycopg/replication_message.h @@ -0,0 +1,57 @@ +/* replication_message.h - definition for the psycopg ReplicationMessage type + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_REPLICATION_MESSAGE_H +#define PSYCOPG_REPLICATION_MESSAGE_H 1 + +#include "cursor.h" +#include "libpq_support.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern HIDDEN PyTypeObject replicationMessageType; + +/* the typedef is forward-declared in psycopg.h */ +struct replicationMessageObject { + PyObject_HEAD + + cursorObject *cursor; + PyObject *payload; + + int data_size; + XLogRecPtr data_start; + XLogRecPtr wal_end; + int64_t send_time; +}; + +RAISES_NEG int psyco_replmsg_datetime_init(void); + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(PSYCOPG_REPLICATION_MESSAGE_H) */ diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c new file mode 100644 index 00000000..b37c402e --- /dev/null +++ b/psycopg/replication_message_type.c @@ -0,0 +1,191 @@ +/* replication_message_type.c - python interface to ReplcationMessage objects + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/replication_message.h" + +#include "datetime.h" + +RAISES_NEG int +psyco_replmsg_datetime_init(void) +{ + Dprintf("psyco_replmsg_datetime_init: datetime init"); + + PyDateTime_IMPORT; + + if (!PyDateTimeAPI) { + PyErr_SetString(PyExc_ImportError, "datetime initialization failed"); + return -1; + } + return 0; +} + + +static PyObject * +replmsg_repr(replicationMessageObject *self) +{ + return PyString_FromFormat( + "", + self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), + (long int)self->send_time); +} + +static int +replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs) +{ + replicationMessageObject *self = (replicationMessageObject*) obj; + + if (!PyArg_ParseTuple(args, "O!O", &cursorType, &self->cursor, &self->payload)) + return -1; + Py_XINCREF(self->cursor); + Py_XINCREF(self->payload); + + self->data_size = 0; + self->data_start = 0; + self->wal_end = 0; + self->send_time = 0; + + return 0; +} + +static int +replmsg_traverse(replicationMessageObject *self, visitproc visit, void *arg) +{ + Py_VISIT((PyObject* )self->cursor); + Py_VISIT(self->payload); + return 0; +} + +static int +replmsg_clear(replicationMessageObject *self) +{ + Py_CLEAR(self->cursor); + Py_CLEAR(self->payload); + return 0; +} + +static void +replmsg_dealloc(PyObject* obj) +{ + PyObject_GC_UnTrack(obj); + + replmsg_clear((replicationMessageObject*) obj); + + Py_TYPE(obj)->tp_free(obj); +} + +#define psyco_replmsg_send_time_doc \ +"send_time - Timestamp of the replication message departure from the server." + +static PyObject * +psyco_replmsg_get_send_time(replicationMessageObject *self) +{ + PyObject *tval, *res = NULL; + double t; + + t = (double)self->send_time / USECS_PER_SEC + + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + tval = Py_BuildValue("(d)", t); + if (tval) { + res = PyDateTime_FromTimestamp(tval); + Py_DECREF(tval); + } + + return res; +} + +#define OFFSETOF(x) offsetof(replicationMessageObject, x) + +/* object member list */ + +static struct PyMemberDef replicationMessageObject_members[] = { + {"cursor", T_OBJECT, OFFSETOF(cursor), READONLY, + "Related ReplcationCursor object."}, + {"payload", T_OBJECT, OFFSETOF(payload), READONLY, + "The actual message data."}, + {"data_size", T_INT, OFFSETOF(data_size), READONLY, + "Raw size of the message data in bytes."}, + {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, + "LSN position of the start of this message."}, + {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY, + "LSN position of the current end of WAL on the server."}, + {NULL} +}; + +static struct PyGetSetDef replicationMessageObject_getsets[] = { + { "send_time", (getter)psyco_replmsg_get_send_time, NULL, + psyco_replmsg_send_time_doc, NULL }, + {NULL} +}; + +/* object type */ + +#define replicationMessageType_doc \ +"A replication protocol message." + +PyTypeObject replicationMessageType = { + PyVarObject_HEAD_INIT(NULL, 0) + "psycopg2.extensions.ReplicationMessage", + sizeof(replicationMessageObject), 0, + replmsg_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)replmsg_repr, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + replicationMessageType_doc, /*tp_doc*/ + (traverseproc)replmsg_traverse, /*tp_traverse*/ + (inquiry)replmsg_clear, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + 0, /*tp_methods*/ + replicationMessageObject_members, /*tp_members*/ + replicationMessageObject_getsets, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + replmsg_init, /*tp_init*/ + 0, /*tp_alloc*/ + PyType_GenericNew, /*tp_new*/ +}; diff --git a/psycopg/win32_support.c b/psycopg/win32_support.c new file mode 100644 index 00000000..d508b220 --- /dev/null +++ b/psycopg/win32_support.c @@ -0,0 +1,76 @@ +/* win32_support.c - emulate some functions missing on Win32 + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/win32_support.h" + +#ifdef _WIN32 + +#ifndef __MINGW32__ +/* millisecond-precision port of gettimeofday for Win32, taken from + src/port/gettimeofday.c in PostgreSQL core */ + +/* FILETIME of Jan 1 1970 00:00:00. */ +static const unsigned __int64 epoch = 116444736000000000ULL; + +/* + * timezone information is stored outside the kernel so tzp isn't used anymore. + * + * Note: this function is not for Win32 high precision timing purpose. See + * elapsed_time(). + */ +int +gettimeofday(struct timeval * tp, struct timezone * tzp) +{ + FILETIME file_time; + SYSTEMTIME system_time; + ULARGE_INTEGER ularge; + + GetSystemTime(&system_time); + SystemTimeToFileTime(&system_time, &file_time); + ularge.LowPart = file_time.dwLowDateTime; + ularge.HighPart = file_time.dwHighDateTime; + + tp->tv_sec = (long) ((ularge.QuadPart - epoch) / 10000000L); + tp->tv_usec = (long) (system_time.wMilliseconds * 1000); + + return 0; +} +#endif /* !defined(__MINGW32__) */ + +/* timersub is missing on mingw */ +void +timersub(struct timeval *a, struct timeval *b, struct timeval *c) +{ + c->tv_sec = a->tv_sec - b->tv_sec; + c->tv_usec = a->tv_usec - b->tv_usec; + if (tv_usec < 0) { + c->tv_usec += 1000000; + c->tv_sec -= 1; + } +} +#endif /* defined(_WIN32) */ diff --git a/psycopg/win32_support.h b/psycopg/win32_support.h new file mode 100644 index 00000000..be963df5 --- /dev/null +++ b/psycopg/win32_support.h @@ -0,0 +1,40 @@ +/* win32_support.h - definitions for win32_support.c + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ +#ifndef PSYCOPG_WIN32_SUPPORT_H +#define PSYCOPG_WIN32_SUPPORT_H 1 + +#include "psycopg/config.h" + +#include + +#ifdef _WIN32 +#ifndef __MINGW32__ +HIDDEN int gettimeofday(struct timeval * tp, struct timezone * tzp); +#endif + +HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c); +#endif + +#endif /* !defined(PSYCOPG_WIN32_SUPPORT_H) */ diff --git a/psycopg2.cproj b/psycopg2.cproj index 7755b961..f6f85c72 100644 --- a/psycopg2.cproj +++ b/psycopg2.cproj @@ -85,14 +85,19 @@ + + + + + @@ -124,6 +129,7 @@ + @@ -217,10 +223,14 @@ + + + + @@ -229,6 +239,7 @@ + @@ -251,4 +262,4 @@ - \ No newline at end of file + diff --git a/setup.py b/setup.py index 65d5fa69..6414a88f 100644 --- a/setup.py +++ b/setup.py @@ -471,9 +471,13 @@ data_files = [] sources = [ 'psycopgmodule.c', 'green.c', 'pqpath.c', 'utils.c', 'bytes_format.c', + 'libpq_support.c', 'win32_support.c', 'connection_int.c', 'connection_type.c', 'cursor_int.c', 'cursor_type.c', + 'replication_connection_type.c', + 'replication_cursor_type.c', + 'replication_message_type.c', 'diagnostics_type.c', 'error_type.c', 'lobject_int.c', 'lobject_type.c', 'notify_type.c', 'xid_type.c', @@ -489,7 +493,11 @@ depends = [ # headers 'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h', 'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h', + 'replication_connection.h', + 'replication_cursor.h', + 'replication_message.h', 'notify.h', 'pqpath.h', 'xid.h', + 'libpq_support.h', 'win32_support.h', 'adapter_asis.h', 'adapter_binary.h', 'adapter_datetime.h', 'adapter_list.h', 'adapter_pboolean.h', 'adapter_pdecimal.h', diff --git a/tests/__init__.py b/tests/__init__.py index 3e0db779..2e51cc2b 100755 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -31,6 +31,7 @@ import test_bugX000 import test_bug_gc import test_cancel import test_connection +import test_replication import test_copy import test_cursor import test_dates @@ -69,6 +70,7 @@ def test_suite(): suite.addTest(test_bug_gc.test_suite()) suite.addTest(test_cancel.test_suite()) suite.addTest(test_connection.test_suite()) + suite.addTest(test_replication.test_suite()) suite.addTest(test_copy.test_suite()) suite.addTest(test_cursor.test_suite()) suite.addTest(test_dates.test_suite()) diff --git a/tests/test_async.py b/tests/test_async.py index d40b9c3e..e0bca7d5 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -29,7 +29,6 @@ import psycopg2 from psycopg2 import extensions import time -import select import StringIO from testutils import ConnectingTestCase @@ -66,21 +65,6 @@ class AsyncTests(ConnectingTestCase): )''') self.wait(curs) - def wait(self, cur_or_conn): - pollable = cur_or_conn - if not hasattr(pollable, 'poll'): - pollable = cur_or_conn.connection - while True: - state = pollable.poll() - if state == psycopg2.extensions.POLL_OK: - break - elif state == psycopg2.extensions.POLL_READ: - select.select([pollable], [], [], 10) - elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [pollable], [], 10) - else: - raise Exception("Unexpected result from poll: %r", state) - def test_connection_setup(self): cur = self.conn.cursor() sync_cur = self.sync_conn.cursor() diff --git a/tests/test_connection.py b/tests/test_connection.py index 5e026226..8aa5a2b5 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1246,18 +1246,6 @@ class AutocommitTests(ConnectingTestCase): self.assertEqual(cur.fetchone()[0], 'on') -class ReplicationTest(ConnectingTestCase): - @skip_before_postgres(9, 0) - def test_replication_not_supported(self): - conn = self.repl_connect() - if conn is None: - return - cur = conn.cursor() - f = StringIO() - self.assertRaises(psycopg2.NotSupportedError, - cur.copy_expert, "START_REPLICATION 0/0", f) - - def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_module.py b/tests/test_module.py index a6918cb0..1a9a19d4 100755 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -116,6 +116,12 @@ class ConnectTestCase(unittest.TestCase): self.assertEqual(self.args[1], None) self.assert_(self.args[2]) + def test_int_port_param(self): + psycopg2.connect(database='sony', port=6543) + dsn = " %s " % self.args[0] + self.assertIn(" dbname=sony ", dsn) + self.assertIn(" port=6543 ", dsn) + def test_empty_param(self): psycopg2.connect(database='sony', password='') self.assertDsnEqual(self.args[0], "dbname=sony password=''") diff --git a/tests/test_replication.py b/tests/test_replication.py new file mode 100644 index 00000000..f527edd2 --- /dev/null +++ b/tests/test_replication.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python + +# test_replication.py - unit test for replication protocol +# +# Copyright (C) 2015 Daniele Varrazzo +# +# psycopg2 is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# In addition, as a special exception, the copyright holders give +# permission to link this program with the OpenSSL library (or with +# modified versions of OpenSSL that use the same license as OpenSSL), +# and distribute linked combinations including the two. +# +# You must obey the GNU Lesser General Public License in all respects for +# all of the code used other than OpenSSL. +# +# psycopg2 is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +import psycopg2 +import psycopg2.extensions +from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection +from psycopg2.extras import StopReplication + +import testconfig +from testutils import unittest +from testutils import skip_before_postgres +from testutils import ConnectingTestCase + + +class ReplicationTestCase(ConnectingTestCase): + def setUp(self): + if not testconfig.repl_dsn: + self.skipTest("replication tests disabled by default") + + super(ReplicationTestCase, self).setUp() + + self.slot = testconfig.repl_slot + self._slots = [] + + def tearDown(self): + # first close all connections, as they might keep the slot(s) active + super(ReplicationTestCase, self).tearDown() + + import time + time.sleep(0.025) # sometimes the slot is still active, wait a little + + if self._slots: + kill_conn = self.connect() + if kill_conn: + kill_cur = kill_conn.cursor() + for slot in self._slots: + kill_cur.execute("SELECT pg_drop_replication_slot(%s)", (slot,)) + kill_conn.commit() + kill_conn.close() + + def create_replication_slot(self, cur, slot_name=testconfig.repl_slot, **kwargs): + cur.create_replication_slot(slot_name, **kwargs) + self._slots.append(slot_name) + + def drop_replication_slot(self, cur, slot_name=testconfig.repl_slot): + cur.drop_replication_slot(slot_name) + self._slots.remove(slot_name) + + # generate some events for our replication stream + def make_replication_events(self): + conn = self.connect() + if conn is None: return + cur = conn.cursor() + + try: + cur.execute("DROP TABLE dummy1") + except psycopg2.ProgrammingError: + conn.rollback() + cur.execute("CREATE TABLE dummy1 AS SELECT * FROM generate_series(1, 5) AS id") + conn.commit() + + +class ReplicationTest(ReplicationTestCase): + @skip_before_postgres(9, 0) + def test_physical_replication_connection(self): + conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + cur.execute("IDENTIFY_SYSTEM") + cur.fetchall() + + @skip_before_postgres(9, 4) + def test_logical_replication_connection(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + cur.execute("IDENTIFY_SYSTEM") + cur.fetchall() + + @skip_before_postgres(9, 4) # slots require 9.4 + def test_create_replication_slot(self): + conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + self.create_replication_slot(cur) + self.assertRaises(psycopg2.ProgrammingError, self.create_replication_slot, cur) + + @skip_before_postgres(9, 4) # slots require 9.4 + def test_start_on_missing_replication_slot(self): + conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + self.assertRaises(psycopg2.ProgrammingError, cur.start_replication, self.slot) + + self.create_replication_slot(cur) + cur.start_replication(self.slot) + + @skip_before_postgres(9, 4) # slots require 9.4 + def test_start_and_recover_from_error(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + self.create_replication_slot(cur, output_plugin='test_decoding') + + # try with invalid options + cur.start_replication(slot_name=self.slot, options={'invalid_param': 'value'}) + def consume(msg): + pass + # we don't see the error from the server before we try to read the data + self.assertRaises(psycopg2.DataError, cur.consume_stream, consume) + + # try with correct command + cur.start_replication(slot_name=self.slot) + + @skip_before_postgres(9, 4) # slots require 9.4 + def test_stop_replication(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + self.create_replication_slot(cur, output_plugin='test_decoding') + + self.make_replication_events() + + cur.start_replication(self.slot) + def consume(msg): + raise StopReplication() + self.assertRaises(StopReplication, cur.consume_stream, consume) + + +class AsyncReplicationTest(ReplicationTestCase): + @skip_before_postgres(9, 4) # slots require 9.4 + def test_async_replication(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection, async=1) + if conn is None: return + self.wait(conn) + cur = conn.cursor() + + self.create_replication_slot(cur, output_plugin='test_decoding') + self.wait(cur) + + cur.start_replication(self.slot) + self.wait(cur) + + self.make_replication_events() + + self.msg_count = 0 + def consume(msg): + # just check the methods + log = "%s: %s" % (cur.io_timestamp, repr(msg)) + + self.msg_count += 1 + if self.msg_count > 3: + cur.send_feedback(reply=True) + raise StopReplication() + + cur.send_feedback(flush_lsn=msg.data_start) + + # cannot be used in asynchronous mode + self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume) + + def process_stream(): + from select import select + while True: + msg = cur.read_message() + if msg: + consume(msg) + else: + select([cur], [], []) + self.assertRaises(StopReplication, process_stream) + +def test_suite(): + return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/testconfig.py b/tests/testconfig.py index 72c533ec..82b48a39 100644 --- a/tests/testconfig.py +++ b/tests/testconfig.py @@ -7,8 +7,6 @@ dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None) dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None) dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None) dbpass = os.environ.get('PSYCOPG2_TESTDB_PASSWORD', None) -repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN', - "dbname=psycopg2_test replication=1") # Check if we want to test psycopg's green path. green = os.environ.get('PSYCOPG2_TEST_GREEN', None) @@ -34,3 +32,11 @@ if dbuser is not None: dsn += ' user=%s' % dbuser if dbpass is not None: dsn += ' password=%s' % dbpass + +# Don't run replication tests if REPL_DSN is not set, default to normal DSN if +# set to empty string. +repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN', None) +if repl_dsn == '': + repl_dsn = dsn + +repl_slot = os.environ.get('PSYCOPG2_TEST_REPL_SLOT', 'psycopg2_test_slot') diff --git a/tests/testutils.py b/tests/testutils.py index fc2b59d7..1d1ad054 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -27,6 +27,7 @@ import os import platform import sys +import select from functools import wraps from testconfig import dsn, repl_dsn @@ -128,8 +129,6 @@ class ConnectingTestCase(unittest.TestCase): conn = self.connect(**kwargs) except psycopg2.OperationalError, e: return self.skipTest("replication db not configured: %s" % e) - - conn.autocommit = True return conn def _get_conn(self): @@ -143,6 +142,23 @@ class ConnectingTestCase(unittest.TestCase): conn = property(_get_conn, _set_conn) + # for use with async connections only + def wait(self, cur_or_conn): + import psycopg2.extensions + pollable = cur_or_conn + if not hasattr(pollable, 'poll'): + pollable = cur_or_conn.connection + while True: + state = pollable.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select.select([pollable], [], [], 10) + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [pollable], [], 10) + else: + raise Exception("Unexpected result from poll: %r", state) + def decorate_all_tests(cls, *decorators): """