mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-22 00:46:33 +03:00
Merge branch 'replication-protocol'
This commit is contained in:
commit
1d950748af
2
NEWS
2
NEWS
|
@ -6,6 +6,8 @@ What's new in psycopg 2.7
|
|||
|
||||
New features:
|
||||
|
||||
- Added :ref:`replication-support` (:ticket:`#322`). Main authors are
|
||||
Oleksandr Shulgin and Craig Ringer, who deserve a huge thank you.
|
||||
- Added `~psycopg2.extensions.parse_dsn()` and
|
||||
`~psycopg2.extensions.make_dsn()` functions (:tickets:`#321, #363`).
|
||||
`~psycopg2.connect()` now can take both *dsn* and keyword arguments, merging
|
||||
|
|
|
@ -423,7 +423,7 @@ this will be probably implemented in a future release.
|
|||
Support for coroutine libraries
|
||||
-------------------------------
|
||||
|
||||
.. versionadded:: 2.2.0
|
||||
.. versionadded:: 2.2
|
||||
|
||||
Psycopg can be used together with coroutine_\-based libraries and participate
|
||||
in cooperative multithreading.
|
||||
|
@ -509,3 +509,90 @@ resources about the topic.
|
|||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
|
||||
.. index::
|
||||
single: Replication
|
||||
|
||||
.. _replication-support:
|
||||
|
||||
Replication protocol support
|
||||
----------------------------
|
||||
|
||||
.. versionadded:: 2.7
|
||||
|
||||
Modern PostgreSQL servers (version 9.0 and above) support replication. The
|
||||
replication protocol is built on top of the client-server protocol and can be
|
||||
operated using ``libpq``, as such it can be also operated by ``psycopg2``.
|
||||
The replication protocol can be operated on both synchronous and
|
||||
:ref:`asynchronous <async-support>` connections.
|
||||
|
||||
Server version 9.4 adds a new feature called *Logical Replication*.
|
||||
|
||||
.. seealso::
|
||||
|
||||
- PostgreSQL `Streaming Replication Protocol`__
|
||||
|
||||
.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
|
||||
|
||||
|
||||
Logical replication Quick-Start
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
You must be using PostgreSQL server version 9.4 or above to run this quick
|
||||
start.
|
||||
|
||||
Make sure that replication connections are permitted for user ``postgres`` in
|
||||
``pg_hba.conf`` and reload the server configuration. You also need to set
|
||||
``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to
|
||||
value greater than zero in ``postgresql.conf`` (these changes require a server
|
||||
restart). Create a database ``psycopg2_test``.
|
||||
|
||||
Then run the following code to quickly try the replication support out. This
|
||||
is not production code -- it has no error handling, it sends feedback too
|
||||
often, etc. -- and it's only intended as a simple demo of logical
|
||||
replication::
|
||||
|
||||
from __future__ import print_function
|
||||
import sys
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
|
||||
conn = psycopg2.connect('dbname=psycopg2_test user=postgres',
|
||||
connection_factory=psycopg2.extras.LogicalReplicationConnection)
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
# test_decoding produces textual output
|
||||
cur.start_replication(slot_name='pytest', decode=True)
|
||||
except psycopg2.ProgrammingError:
|
||||
cur.create_replication_slot('pytest', output_plugin='test_decoding')
|
||||
cur.start_replication(slot_name='pytest', decode=True)
|
||||
|
||||
class DemoConsumer(object):
|
||||
def __call__(self, msg):
|
||||
print(msg.payload)
|
||||
msg.cursor.send_feedback(flush_lsn=msg.data_start)
|
||||
|
||||
democonsumer = DemoConsumer()
|
||||
|
||||
print("Starting streaming, press Control-C to end...", file=sys.stderr)
|
||||
try:
|
||||
cur.consume_stream(democonsumer)
|
||||
except KeyboardInterrupt:
|
||||
cur.close()
|
||||
conn.close()
|
||||
print("The slot 'pytest' still exists. Drop it with "
|
||||
"SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
|
||||
file=sys.stderr)
|
||||
print("WARNING: Transaction logs will accumulate in pg_xlog "
|
||||
"until the slot is dropped.", file=sys.stderr)
|
||||
|
||||
|
||||
You can now make changes to the ``psycopg2_test`` database using a normal
|
||||
psycopg2 session, ``psql``, etc. and see the logical decoding stream printed
|
||||
by this demo client.
|
||||
|
||||
This will continue running until terminated with ``Control-C``.
|
||||
|
||||
For the details see :ref:`replication-objects`.
|
||||
|
|
|
@ -143,6 +143,374 @@ Logging cursor
|
|||
|
||||
|
||||
|
||||
.. _replication-objects:
|
||||
|
||||
Replication connection and cursor classes
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
See :ref:`replication-support` for an introduction to the topic.
|
||||
|
||||
|
||||
The following replication types are defined:
|
||||
|
||||
.. data:: REPLICATION_LOGICAL
|
||||
.. data:: REPLICATION_PHYSICAL
|
||||
|
||||
|
||||
.. index::
|
||||
pair: Connection; replication
|
||||
|
||||
.. autoclass:: LogicalReplicationConnection
|
||||
|
||||
This connection factory class can be used to open a special type of
|
||||
connection that is used for logical replication.
|
||||
|
||||
Example::
|
||||
|
||||
from psycopg2.extras import LogicalReplicationConnection
|
||||
log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
|
||||
log_cur = log_conn.cursor()
|
||||
|
||||
|
||||
.. autoclass:: PhysicalReplicationConnection
|
||||
|
||||
This connection factory class can be used to open a special type of
|
||||
connection that is used for physical replication.
|
||||
|
||||
Example::
|
||||
|
||||
from psycopg2.extras import PhysicalReplicationConnection
|
||||
phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection)
|
||||
phys_cur = phys_conn.cursor()
|
||||
|
||||
Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
|
||||
`ReplicationCursor` for actual communication with the server.
|
||||
|
||||
|
||||
.. index::
|
||||
pair: Message; replication
|
||||
|
||||
The individual messages in the replication stream are represented by
|
||||
`ReplicationMessage` objects (both logical and physical type):
|
||||
|
||||
.. autoclass:: ReplicationMessage
|
||||
|
||||
.. attribute:: payload
|
||||
|
||||
The actual data received from the server.
|
||||
|
||||
An instance of either `bytes()` or `unicode()`, depending on the value
|
||||
of `decode` option passed to `~ReplicationCursor.start_replication()`
|
||||
on the connection. See `~ReplicationCursor.read_message()` for
|
||||
details.
|
||||
|
||||
.. attribute:: data_size
|
||||
|
||||
The raw size of the message payload (before possible unicode
|
||||
conversion).
|
||||
|
||||
.. attribute:: data_start
|
||||
|
||||
LSN position of the start of the message.
|
||||
|
||||
.. attribute:: wal_end
|
||||
|
||||
LSN position of the current end of WAL on the server.
|
||||
|
||||
.. attribute:: send_time
|
||||
|
||||
A `~datetime` object representing the server timestamp at the moment
|
||||
when the message was sent.
|
||||
|
||||
.. attribute:: cursor
|
||||
|
||||
A reference to the corresponding `ReplicationCursor` object.
|
||||
|
||||
|
||||
.. index::
|
||||
pair: Cursor; replication
|
||||
|
||||
.. autoclass:: ReplicationCursor
|
||||
|
||||
.. method:: create_replication_slot(slot_name, slot_type=None, output_plugin=None)
|
||||
|
||||
Create streaming replication slot.
|
||||
|
||||
:param slot_name: name of the replication slot to be created
|
||||
:param slot_type: type of replication: should be either
|
||||
`REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
|
||||
:param output_plugin: name of the logical decoding output plugin to be
|
||||
used by the slot; required for logical
|
||||
replication connections, disallowed for physical
|
||||
|
||||
Example::
|
||||
|
||||
log_cur.create_replication_slot("logical1", "test_decoding")
|
||||
phys_cur.create_replication_slot("physical1")
|
||||
|
||||
# either logical or physical replication connection
|
||||
cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL)
|
||||
|
||||
When creating a slot on a logical replication connection, a logical
|
||||
replication slot is created by default. Logical replication requires
|
||||
name of the logical decoding output plugin to be specified.
|
||||
|
||||
When creating a slot on a physical replication connection, a physical
|
||||
replication slot is created by default. No output plugin parameter is
|
||||
required or allowed when creating a physical replication slot.
|
||||
|
||||
In either case the type of slot being created can be specified
|
||||
explicitly using *slot_type* parameter.
|
||||
|
||||
Replication slots are a feature of PostgreSQL server starting with
|
||||
version 9.4.
|
||||
|
||||
.. method:: drop_replication_slot(slot_name)
|
||||
|
||||
Drop streaming replication slot.
|
||||
|
||||
:param slot_name: name of the replication slot to drop
|
||||
|
||||
Example::
|
||||
|
||||
# either logical or physical replication connection
|
||||
cur.drop_replication_slot("slot1")
|
||||
|
||||
Replication slots are a feature of PostgreSQL server starting with
|
||||
version 9.4.
|
||||
|
||||
.. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False)
|
||||
|
||||
Start replication on the connection.
|
||||
|
||||
:param slot_name: name of the replication slot to use; required for
|
||||
logical replication, physical replication can work
|
||||
with or without a slot
|
||||
:param slot_type: type of replication: should be either
|
||||
`REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
|
||||
:param start_lsn: the optional LSN position to start replicating from,
|
||||
can be an integer or a string of hexadecimal digits
|
||||
in the form ``XXX/XXX``
|
||||
:param timeline: WAL history timeline to start streaming from (optional,
|
||||
can only be used with physical replication)
|
||||
:param options: a dictionary of options to pass to logical replication
|
||||
slot (not allowed with physical replication)
|
||||
:param decode: a flag indicating that unicode conversion should be
|
||||
performed on messages received from the server
|
||||
|
||||
If a *slot_name* is specified, the slot must exist on the server and
|
||||
its type must match the replication type used.
|
||||
|
||||
If not specified using *slot_type* parameter, the type of replication
|
||||
is defined by the type of replication connection. Logical replication
|
||||
is only allowed on logical replication connection, but physical
|
||||
replication can be used with both types of connection.
|
||||
|
||||
On the other hand, physical replication doesn't require a named
|
||||
replication slot to be used, only logical replication does. In any
|
||||
case logical replication and replication slots are a feature of
|
||||
PostgreSQL server starting with version 9.4. Physical replication can
|
||||
be used starting with 9.0.
|
||||
|
||||
If *start_lsn* is specified, the requested stream will start from that
|
||||
LSN. The default is `!None` which passes the LSN ``0/0`` causing
|
||||
replay to begin at the last point for which the server got flush
|
||||
confirmation from the client, or the oldest available point for a new
|
||||
slot.
|
||||
|
||||
The server might produce an error if a WAL file for the given LSN has
|
||||
already been recycled or it may silently start streaming from a later
|
||||
position: the client can verify the actual position using information
|
||||
provided by the `ReplicationMessage` attributes. The exact server
|
||||
behavior depends on the type of replication and use of slots.
|
||||
|
||||
The *timeline* parameter can only be specified with physical
|
||||
replication and only starting with server version 9.3.
|
||||
|
||||
A dictionary of *options* may be passed to the logical decoding plugin
|
||||
on a logical replication slot. The set of supported options depends
|
||||
on the output plugin that was used to create the slot. Must be
|
||||
`!None` for physical replication.
|
||||
|
||||
If *decode* is set to `!True` the messages received from the server
|
||||
would be converted according to the connection `~connection.encoding`.
|
||||
*This parameter should not be set with physical replication or with
|
||||
logical replication plugins that produce binary output.*
|
||||
|
||||
This function constructs a ``START_REPLICATION`` command and calls
|
||||
`start_replication_expert()` internally.
|
||||
|
||||
After starting the replication, to actually consume the incoming
|
||||
server messages use `consume_stream()` or implement a loop around
|
||||
`read_message()` in case of :ref:`asynchronous connection
|
||||
<async-support>`.
|
||||
|
||||
.. method:: start_replication_expert(command, decode=False)
|
||||
|
||||
Start replication on the connection using provided
|
||||
``START_REPLICATION`` command. See `start_replication()` for
|
||||
description of *decode* parameter.
|
||||
|
||||
.. method:: consume_stream(consume, keepalive_interval=10)
|
||||
|
||||
:param consume: a callable object with signature :samp:`consume({msg})`
|
||||
:param keepalive_interval: interval (in seconds) to send keepalive
|
||||
messages to the server
|
||||
|
||||
This method can only be used with synchronous connection. For
|
||||
asynchronous connections see `read_message()`.
|
||||
|
||||
Before using this method to consume the stream call
|
||||
`start_replication()` first.
|
||||
|
||||
This method enters an endless loop reading messages from the server
|
||||
and passing them to ``consume()`` one at a time, then waiting for more
|
||||
messages from the server. In order to make this method break out of
|
||||
the loop and return, ``consume()`` can throw a `StopReplication`
|
||||
exception. Any unhandled exception will make it break out of the loop
|
||||
as well.
|
||||
|
||||
The *msg* object passed to ``consume()`` is an instance of
|
||||
`ReplicationMessage` class. See `read_message()` for details about
|
||||
message decoding.
|
||||
|
||||
This method also sends keepalive messages to the server in case there
|
||||
were no new data from the server for the duration of
|
||||
*keepalive_interval* (in seconds). The value of this parameter must
|
||||
be set to at least 1 second, but it can have a fractional part.
|
||||
|
||||
After processing certain amount of messages the client should send a
|
||||
confirmation message to the server. This should be done by calling
|
||||
`send_feedback()` method on the corresponding replication cursor. A
|
||||
reference to the cursor is provided in the `ReplicationMessage` as an
|
||||
attribute.
|
||||
|
||||
The following example is a sketch implementation of ``consume()``
|
||||
callable for logical replication::
|
||||
|
||||
class LogicalStreamConsumer(object):
|
||||
|
||||
...
|
||||
|
||||
def __call__(self, msg):
|
||||
self.process_message(msg.payload)
|
||||
|
||||
if self.should_send_feedback(msg):
|
||||
msg.cursor.send_feedback(flush_lsn=msg.data_start)
|
||||
|
||||
consumer = LogicalStreamConsumer()
|
||||
cur.consume_stream(consumer)
|
||||
|
||||
.. warning::
|
||||
|
||||
When using replication with slots, failure to constantly consume
|
||||
*and* report success to the server appropriately can eventually
|
||||
lead to "disk full" condition on the server, because the server
|
||||
retains all the WAL segments that might be needed to stream the
|
||||
changes via all of the currently open replication slots.
|
||||
|
||||
On the other hand, it is not recommended to send confirmation
|
||||
after *every* processed message, since that will put an
|
||||
unnecessary load on network and the server. A possible strategy
|
||||
is to confirm after every COMMIT message.
|
||||
|
||||
.. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
|
||||
|
||||
:param write_lsn: a LSN position up to which the client has written the data locally
|
||||
:param flush_lsn: a LSN position up to which the client has processed the
|
||||
data reliably (the server is allowed to discard all
|
||||
and every data that predates this LSN)
|
||||
:param apply_lsn: a LSN position up to which the warm standby server
|
||||
has applied the changes (physical replication
|
||||
master-slave protocol only)
|
||||
:param reply: request the server to send back a keepalive message immediately
|
||||
|
||||
Use this method to report to the server that all messages up to a
|
||||
certain LSN position have been processed on the client and may be
|
||||
discarded on the server.
|
||||
|
||||
This method can also be called with all default parameters' values to
|
||||
just send a keepalive message to the server.
|
||||
|
||||
Low-level replication cursor methods for :ref:`asynchronous connection
|
||||
<async-support>` operation.
|
||||
|
||||
With the synchronous connection a call to `consume_stream()` handles all
|
||||
the complexity of handling the incoming messages and sending keepalive
|
||||
replies, but at times it might be beneficial to use low-level interface
|
||||
for better control, in particular to `~select` on multiple sockets. The
|
||||
following methods are provided for asynchronous operation:
|
||||
|
||||
.. method:: read_message()
|
||||
|
||||
Try to read the next message from the server without blocking and
|
||||
return an instance of `ReplicationMessage` or `!None`, in case there
|
||||
are no more data messages from the server at the moment.
|
||||
|
||||
This method should be used in a loop with asynchronous connections
|
||||
(after calling `start_replication()` once). For synchronous
|
||||
connections see `consume_stream()`.
|
||||
|
||||
The returned message's `~ReplicationMessage.payload` is an instance of
|
||||
`!unicode` decoded according to connection `~connection.encoding`
|
||||
*iff* *decode* was set to `!True` in the initial call to
|
||||
`start_replication()` on this connection, otherwise it is an instance
|
||||
of `!bytes` with no decoding.
|
||||
|
||||
It is expected that the calling code will call this method repeatedly
|
||||
in order to consume all of the messages that might have been buffered
|
||||
until `!None` is returned. After receiving `!None` from this method
|
||||
the caller should use `~select.select()` or `~select.poll()` on the
|
||||
corresponding connection to block the process until there is more data
|
||||
from the server.
|
||||
|
||||
The server can send keepalive messages to the client periodically.
|
||||
Such messages are silently consumed by this method and are never
|
||||
reported to the caller.
|
||||
|
||||
.. method:: fileno()
|
||||
|
||||
Call the corresponding connection's `~connection.fileno()` method and
|
||||
return the result.
|
||||
|
||||
This is a convenience method which allows replication cursor to be
|
||||
used directly in `~select.select()` or `~select.poll()` calls.
|
||||
|
||||
.. attribute:: io_timestamp
|
||||
|
||||
A `~datetime` object representing the timestamp at the moment of last
|
||||
communication with the server (a data or keepalive message in either
|
||||
direction).
|
||||
|
||||
An actual example of asynchronous operation might look like this::
|
||||
|
||||
from select import select
|
||||
from datetime import datetime
|
||||
|
||||
def consume(msg):
|
||||
...
|
||||
|
||||
keepalive_interval = 10.0
|
||||
while True:
|
||||
msg = cur.read_message()
|
||||
if msg:
|
||||
consume(msg)
|
||||
else:
|
||||
now = datetime.now()
|
||||
timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
|
||||
try:
|
||||
sel = select([cur], [], [], max(0, timeout))
|
||||
if not any(sel):
|
||||
cur.send_feedback() # timed out, send keepalive message
|
||||
except InterruptedError:
|
||||
pass # recalculate timeout and continue
|
||||
|
||||
.. index::
|
||||
pair: Cursor; Replication
|
||||
|
||||
.. autoclass:: StopReplication
|
||||
|
||||
|
||||
.. index::
|
||||
single: Data types; Additional
|
||||
|
||||
|
|
123
lib/extras.py
123
lib/extras.py
|
@ -39,8 +39,12 @@ import psycopg2
|
|||
from psycopg2 import extensions as _ext
|
||||
from psycopg2.extensions import cursor as _cursor
|
||||
from psycopg2.extensions import connection as _connection
|
||||
from psycopg2.extensions import adapt as _A
|
||||
from psycopg2.extensions import adapt as _A, quote_ident
|
||||
from psycopg2.extensions import b
|
||||
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
|
||||
from psycopg2._psycopg import ReplicationConnection as _replicationConnection
|
||||
from psycopg2._psycopg import ReplicationCursor as _replicationCursor
|
||||
from psycopg2._psycopg import ReplicationMessage
|
||||
|
||||
|
||||
class DictCursorBase(_cursor):
|
||||
|
@ -437,6 +441,123 @@ class MinTimeLoggingCursor(LoggingCursor):
|
|||
return LoggingCursor.callproc(self, procname, vars)
|
||||
|
||||
|
||||
class LogicalReplicationConnection(_replicationConnection):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs['replication_type'] = REPLICATION_LOGICAL
|
||||
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class PhysicalReplicationConnection(_replicationConnection):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs['replication_type'] = REPLICATION_PHYSICAL
|
||||
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class StopReplication(Exception):
|
||||
"""
|
||||
Exception used to break out of the endless loop in
|
||||
`~ReplicationCursor.consume_stream()`.
|
||||
|
||||
Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
|
||||
`~psycopg2.Error` as occurrence of this exception does not indicate an
|
||||
error.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ReplicationCursor(_replicationCursor):
|
||||
"""A cursor used for communication on replication connections."""
|
||||
|
||||
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
|
||||
"""Create streaming replication slot."""
|
||||
|
||||
command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)
|
||||
|
||||
if slot_type is None:
|
||||
slot_type = self.connection.replication_type
|
||||
|
||||
if slot_type == REPLICATION_LOGICAL:
|
||||
if output_plugin is None:
|
||||
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
|
||||
|
||||
command += "LOGICAL %s" % quote_ident(output_plugin, self)
|
||||
|
||||
elif slot_type == REPLICATION_PHYSICAL:
|
||||
if output_plugin is not None:
|
||||
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
|
||||
|
||||
command += "PHYSICAL"
|
||||
|
||||
else:
|
||||
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
|
||||
|
||||
self.execute(command)
|
||||
|
||||
def drop_replication_slot(self, slot_name):
|
||||
"""Drop streaming replication slot."""
|
||||
|
||||
command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
|
||||
self.execute(command)
|
||||
|
||||
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
|
||||
timeline=0, options=None, decode=False):
|
||||
"""Start replication stream."""
|
||||
|
||||
command = "START_REPLICATION "
|
||||
|
||||
if slot_type is None:
|
||||
slot_type = self.connection.replication_type
|
||||
|
||||
if slot_type == REPLICATION_LOGICAL:
|
||||
if slot_name:
|
||||
command += "SLOT %s " % quote_ident(slot_name, self)
|
||||
else:
|
||||
raise psycopg2.ProgrammingError("slot name is required for logical replication")
|
||||
|
||||
command += "LOGICAL "
|
||||
|
||||
elif slot_type == REPLICATION_PHYSICAL:
|
||||
if slot_name:
|
||||
command += "SLOT %s " % quote_ident(slot_name, self)
|
||||
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
|
||||
|
||||
else:
|
||||
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
|
||||
|
||||
if type(start_lsn) is str:
|
||||
lsn = start_lsn.split('/')
|
||||
lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
|
||||
else:
|
||||
lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)
|
||||
|
||||
command += lsn
|
||||
|
||||
if timeline != 0:
|
||||
if slot_type == REPLICATION_LOGICAL:
|
||||
raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
|
||||
|
||||
command += " TIMELINE %d" % timeline
|
||||
|
||||
if options:
|
||||
if slot_type == REPLICATION_PHYSICAL:
|
||||
raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")
|
||||
|
||||
command += " ("
|
||||
for k,v in options.iteritems():
|
||||
if not command.endswith('('):
|
||||
command += ", "
|
||||
command += "%s %s" % (quote_ident(k, self), _A(str(v)))
|
||||
command += ")"
|
||||
|
||||
self.start_replication_expert(command, decode=decode)
|
||||
|
||||
# allows replication cursors to be used in select.select() directly
|
||||
def fileno(self):
|
||||
return self.connection.fileno()
|
||||
|
||||
|
||||
# a dbtype and adapter for Python UUID type
|
||||
|
||||
class UUID_adapter(object):
|
||||
|
|
104
psycopg/libpq_support.c
Normal file
104
psycopg/libpq_support.c
Normal file
|
@ -0,0 +1,104 @@
|
|||
/* libpq_support.c - functions not provided by libpq, but which are
|
||||
* required for advanced communication with the server, such as
|
||||
* streaming replication
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#define PSYCOPG_MODULE
|
||||
#include "psycopg/psycopg.h"
|
||||
|
||||
#include "psycopg/libpq_support.h"
|
||||
|
||||
/* htonl(), ntohl() */
|
||||
#ifdef _WIN32
|
||||
#include <winsock2.h>
|
||||
/* gettimeofday() */
|
||||
#include "psycopg/win32_support.h"
|
||||
#else
|
||||
#include <arpa/inet.h>
|
||||
#endif
|
||||
|
||||
/* support routines taken from pg_basebackup/streamutil.c */
|
||||
|
||||
/*
|
||||
* Frontend version of GetCurrentTimestamp(), since we are not linked with
|
||||
* backend code. The protocol always uses integer timestamps, regardless of
|
||||
* server setting.
|
||||
*/
|
||||
int64_t
|
||||
feGetCurrentTimestamp(void)
|
||||
{
|
||||
int64_t result;
|
||||
struct timeval tp;
|
||||
|
||||
gettimeofday(&tp, NULL);
|
||||
|
||||
result = (int64_t) tp.tv_sec -
|
||||
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
|
||||
|
||||
result = (result * USECS_PER_SEC) + tp.tv_usec;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Converts an int64 to network byte order.
|
||||
*/
|
||||
void
|
||||
fe_sendint64(int64_t i, char *buf)
|
||||
{
|
||||
uint32_t n32;
|
||||
|
||||
/* High order half first, since we're doing MSB-first */
|
||||
n32 = (uint32_t) (i >> 32);
|
||||
n32 = htonl(n32);
|
||||
memcpy(&buf[0], &n32, 4);
|
||||
|
||||
/* Now the low order half */
|
||||
n32 = (uint32_t) i;
|
||||
n32 = htonl(n32);
|
||||
memcpy(&buf[4], &n32, 4);
|
||||
}
|
||||
|
||||
/*
|
||||
* Converts an int64 from network byte order to native format.
|
||||
*/
|
||||
int64_t
|
||||
fe_recvint64(char *buf)
|
||||
{
|
||||
int64_t result;
|
||||
uint32_t h32;
|
||||
uint32_t l32;
|
||||
|
||||
memcpy(&h32, buf, 4);
|
||||
memcpy(&l32, buf + 4, 4);
|
||||
h32 = ntohl(h32);
|
||||
l32 = ntohl(l32);
|
||||
|
||||
result = h32;
|
||||
result <<= 32;
|
||||
result |= l32;
|
||||
|
||||
return result;
|
||||
}
|
48
psycopg/libpq_support.h
Normal file
48
psycopg/libpq_support.h
Normal file
|
@ -0,0 +1,48 @@
|
|||
/* libpq_support.h - definitions for libpq_support.c
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
#ifndef PSYCOPG_LIBPQ_SUPPORT_H
|
||||
#define PSYCOPG_LIBPQ_SUPPORT_H 1
|
||||
|
||||
#include "psycopg/config.h"
|
||||
|
||||
/* type and constant definitions from internal postgres include */
|
||||
typedef unsigned PG_INT64_TYPE XLogRecPtr;
|
||||
|
||||
/* have to use lowercase %x, as PyString_FromFormat can't do %X */
|
||||
#define XLOGFMTSTR "%x/%x"
|
||||
#define XLOGFMTARGS(x) ((uint32_t)((x) >> 32)), ((uint32_t)((x) & 0xFFFFFFFF))
|
||||
|
||||
/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
|
||||
#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
|
||||
#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
|
||||
|
||||
#define SECS_PER_DAY 86400
|
||||
#define USECS_PER_SEC 1000000LL
|
||||
|
||||
HIDDEN int64_t feGetCurrentTimestamp(void);
|
||||
HIDDEN void fe_sendint64(int64_t i, char *buf);
|
||||
HIDDEN int64_t fe_recvint64(char *buf);
|
||||
|
||||
#endif /* !defined(PSYCOPG_LIBPQ_SUPPORT_H) */
|
304
psycopg/pqpath.c
304
psycopg/pqpath.c
|
@ -35,13 +35,22 @@
|
|||
#include "psycopg/pqpath.h"
|
||||
#include "psycopg/connection.h"
|
||||
#include "psycopg/cursor.h"
|
||||
#include "psycopg/replication_cursor.h"
|
||||
#include "psycopg/replication_message.h"
|
||||
#include "psycopg/green.h"
|
||||
#include "psycopg/typecast.h"
|
||||
#include "psycopg/pgtypes.h"
|
||||
#include "psycopg/error.h"
|
||||
|
||||
#include <string.h>
|
||||
#include "psycopg/libpq_support.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#ifdef _WIN32
|
||||
/* select() */
|
||||
#include <winsock2.h>
|
||||
/* gettimeofday() */
|
||||
#include "win32_support.h"
|
||||
#endif
|
||||
|
||||
extern HIDDEN PyObject *psyco_DescriptionType;
|
||||
|
||||
|
@ -1056,6 +1065,13 @@ pq_get_last_result(connectionObject *conn)
|
|||
PQclear(result);
|
||||
}
|
||||
result = res;
|
||||
|
||||
/* After entering copy both mode, libpq will make a phony
|
||||
* PGresult for us every time we query for it, so we need to
|
||||
* break out of this endless loop. */
|
||||
if (PQresultStatus(result) == PGRES_COPY_BOTH) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -1520,6 +1536,281 @@ exit:
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* Tries to read the next message from the replication stream, without
|
||||
blocking, in both sync and async connection modes. If no message
|
||||
is ready in the CopyData buffer, tries to read from the server,
|
||||
again without blocking. If that doesn't help, returns Py_None.
|
||||
The caller is then supposed to block on the socket(s) and call this
|
||||
function again.
|
||||
|
||||
Any keepalive messages from the server are silently consumed and
|
||||
are never returned to the caller.
|
||||
*/
|
||||
int
|
||||
pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg)
|
||||
{
|
||||
cursorObject *curs = &repl->cur;
|
||||
connectionObject *conn = curs->conn;
|
||||
PGconn *pgconn = conn->pgconn;
|
||||
char *buffer = NULL;
|
||||
int len, data_size, consumed, hdr, reply;
|
||||
XLogRecPtr data_start, wal_end;
|
||||
int64_t send_time;
|
||||
PyObject *str = NULL, *result = NULL;
|
||||
int ret = -1;
|
||||
|
||||
Dprintf("pq_read_replication_message");
|
||||
|
||||
*msg = NULL;
|
||||
consumed = 0;
|
||||
|
||||
retry:
|
||||
len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
|
||||
|
||||
if (len == 0) {
|
||||
/* If we've tried reading some data, but there was none, bail out. */
|
||||
if (consumed) {
|
||||
ret = 0;
|
||||
goto exit;
|
||||
}
|
||||
/* We should only try reading more data when there is nothing
|
||||
available at the moment. Otherwise, with a really highly loaded
|
||||
server we might be reading a number of messages for every single
|
||||
one we process, thus overgrowing the internal buffer until the
|
||||
client system runs out of memory. */
|
||||
if (!PQconsumeInput(pgconn)) {
|
||||
pq_raise(conn, curs, NULL);
|
||||
goto exit;
|
||||
}
|
||||
/* But PQconsumeInput() doesn't tell us if it has actually read
|
||||
anything into the internal buffer and there is no (supported) way
|
||||
to ask libpq about this directly. The way we check is setting the
|
||||
flag and re-trying PQgetCopyData(): if that returns 0 again,
|
||||
there's no more data available in the buffer, so we return None. */
|
||||
consumed = 1;
|
||||
goto retry;
|
||||
}
|
||||
|
||||
if (len == -2) {
|
||||
/* serious error */
|
||||
pq_raise(conn, curs, NULL);
|
||||
goto exit;
|
||||
}
|
||||
if (len == -1) {
|
||||
/* EOF */
|
||||
curs->pgres = PQgetResult(pgconn);
|
||||
|
||||
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
|
||||
pq_raise(conn, curs, NULL);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
CLEARPGRES(curs->pgres);
|
||||
ret = 0;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* It also makes sense to set this flag here to make us return early in
|
||||
case of retry due to keepalive message. Any pending data on the socket
|
||||
will trigger read condition in select() in the calling code anyway. */
|
||||
consumed = 1;
|
||||
|
||||
/* ok, we did really read something: update the io timestamp */
|
||||
gettimeofday(&repl->last_io, NULL);
|
||||
|
||||
Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
|
||||
if (buffer[0] == 'w') {
|
||||
/* XLogData: msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
|
||||
hdr = 1 + 8 + 8 + 8;
|
||||
if (len < hdr + 1) {
|
||||
psyco_set_error(OperationalError, curs, "data message header too small");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
data_size = len - hdr;
|
||||
data_start = fe_recvint64(buffer + 1);
|
||||
wal_end = fe_recvint64(buffer + 1 + 8);
|
||||
send_time = fe_recvint64(buffer + 1 + 8 + 8);
|
||||
|
||||
Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
|
||||
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
|
||||
|
||||
Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr);
|
||||
|
||||
if (repl->decode) {
|
||||
str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
|
||||
} else {
|
||||
str = Bytes_FromStringAndSize(buffer + hdr, data_size);
|
||||
}
|
||||
if (!str) { goto exit; }
|
||||
|
||||
result = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
|
||||
curs, str, NULL);
|
||||
Py_DECREF(str);
|
||||
if (!result) { goto exit; }
|
||||
|
||||
*msg = (replicationMessageObject *)result;
|
||||
(*msg)->data_size = data_size;
|
||||
(*msg)->data_start = data_start;
|
||||
(*msg)->wal_end = wal_end;
|
||||
(*msg)->send_time = send_time;
|
||||
}
|
||||
else if (buffer[0] == 'k') {
|
||||
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
|
||||
hdr = 1 + 8 + 8;
|
||||
if (len < hdr + 1) {
|
||||
psyco_set_error(OperationalError, curs, "keepalive message header too small");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
reply = buffer[hdr];
|
||||
if (reply && pq_send_replication_feedback(repl, 0) < 0) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
PQfreemem(buffer);
|
||||
buffer = NULL;
|
||||
goto retry;
|
||||
}
|
||||
else {
|
||||
psyco_set_error(OperationalError, curs, "unrecognized replication message type");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
exit:
|
||||
if (buffer) {
|
||||
PQfreemem(buffer);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int
|
||||
pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
|
||||
{
|
||||
cursorObject *curs = &repl->cur;
|
||||
connectionObject *conn = curs->conn;
|
||||
PGconn *pgconn = conn->pgconn;
|
||||
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
|
||||
int len = 0;
|
||||
|
||||
Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
|
||||
XLOGFMTARGS(repl->write_lsn),
|
||||
XLOGFMTARGS(repl->flush_lsn),
|
||||
XLOGFMTARGS(repl->apply_lsn));
|
||||
|
||||
replybuf[len] = 'r'; len += 1;
|
||||
fe_sendint64(repl->write_lsn, &replybuf[len]); len += 8;
|
||||
fe_sendint64(repl->flush_lsn, &replybuf[len]); len += 8;
|
||||
fe_sendint64(repl->apply_lsn, &replybuf[len]); len += 8;
|
||||
fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
|
||||
replybuf[len] = reply_requested ? 1 : 0; len += 1;
|
||||
|
||||
if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
|
||||
pq_raise(conn, curs, NULL);
|
||||
return -1;
|
||||
}
|
||||
gettimeofday(&repl->last_io, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Calls pq_read_replication_message in an endless loop, until
|
||||
stop_replication is called or a fatal error occurs. The messages
|
||||
are passed to the consumer object.
|
||||
|
||||
When no message is available, blocks on the connection socket, but
|
||||
manages to send keepalive messages to the server as needed.
|
||||
*/
|
||||
int
|
||||
pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval)
|
||||
{
|
||||
cursorObject *curs = &repl->cur;
|
||||
connectionObject *conn = curs->conn;
|
||||
PGconn *pgconn = conn->pgconn;
|
||||
replicationMessageObject *msg = NULL;
|
||||
PyObject *tmp = NULL;
|
||||
int fd, sel, ret = -1;
|
||||
fd_set fds;
|
||||
struct timeval keep_intr, curr_time, ping_time, timeout;
|
||||
|
||||
if (!PyCallable_Check(consume)) {
|
||||
Dprintf("pq_copy_both: expected callable consume object");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
CLEARPGRES(curs->pgres);
|
||||
|
||||
keep_intr.tv_sec = (int)keepalive_interval;
|
||||
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
|
||||
|
||||
while (1) {
|
||||
if (pq_read_replication_message(repl, &msg) < 0) {
|
||||
goto exit;
|
||||
}
|
||||
else if (msg == NULL) {
|
||||
fd = PQsocket(pgconn);
|
||||
if (fd < 0) {
|
||||
pq_raise(conn, curs, NULL);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(fd, &fds);
|
||||
|
||||
/* how long can we wait before we need to send a keepalive? */
|
||||
gettimeofday(&curr_time, NULL);
|
||||
|
||||
timeradd(&repl->last_io, &keep_intr, &ping_time);
|
||||
timersub(&ping_time, &curr_time, &timeout);
|
||||
|
||||
if (timeout.tv_sec >= 0) {
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
sel = select(fd + 1, &fds, NULL, NULL, &timeout);
|
||||
Py_END_ALLOW_THREADS;
|
||||
}
|
||||
else {
|
||||
sel = 0; /* we're past target time, pretend select() timed out */
|
||||
}
|
||||
|
||||
if (sel < 0) {
|
||||
if (errno != EINTR) {
|
||||
PyErr_SetFromErrno(PyExc_OSError);
|
||||
goto exit;
|
||||
}
|
||||
if (PyErr_CheckSignals()) {
|
||||
goto exit;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (sel == 0) {
|
||||
if (pq_send_replication_feedback(repl, 0) < 0) {
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
tmp = PyObject_CallFunctionObjArgs(consume, msg, NULL);
|
||||
Py_DECREF(msg);
|
||||
|
||||
if (tmp == NULL) {
|
||||
Dprintf("pq_copy_both: consume returned NULL");
|
||||
goto exit;
|
||||
}
|
||||
Py_DECREF(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
ret = 1;
|
||||
|
||||
exit:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int
|
||||
pq_fetch(cursorObject *curs, int no_result)
|
||||
{
|
||||
|
@ -1579,6 +1870,17 @@ pq_fetch(cursorObject *curs, int no_result)
|
|||
CLEARPGRES(curs->pgres);
|
||||
break;
|
||||
|
||||
case PGRES_COPY_BOTH:
|
||||
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
|
||||
curs->rowcount = -1;
|
||||
ex = 0;
|
||||
/* Nothing to do here: pq_copy_both will be called separately.
|
||||
|
||||
Also don't clear the result status: it's checked in
|
||||
consume_stream. */
|
||||
/*CLEARPGRES(curs->pgres);*/
|
||||
break;
|
||||
|
||||
case PGRES_TUPLES_OK:
|
||||
if (!no_result) {
|
||||
Dprintf("pq_fetch: got tuples");
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
|
||||
#include "psycopg/cursor.h"
|
||||
#include "psycopg/connection.h"
|
||||
#include "psycopg/replication_cursor.h"
|
||||
#include "psycopg/replication_message.h"
|
||||
|
||||
/* macro to clean the pg result */
|
||||
#define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0)
|
||||
|
@ -72,4 +74,11 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
|
|||
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
|
||||
char **error);
|
||||
|
||||
/* replication protocol support */
|
||||
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
|
||||
double keepalive_interval);
|
||||
HIDDEN int pq_read_replication_message(replicationCursorObject *repl,
|
||||
replicationMessageObject **msg);
|
||||
HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
|
||||
|
||||
#endif /* !defined(PSYCOPG_PQPATH_H) */
|
||||
|
|
|
@ -117,6 +117,7 @@ HIDDEN PyObject *psyco_GetDecimalType(void);
|
|||
/* forward declarations */
|
||||
typedef struct cursorObject cursorObject;
|
||||
typedef struct connectionObject connectionObject;
|
||||
typedef struct replicationMessageObject replicationMessageObject;
|
||||
|
||||
/* some utility functions */
|
||||
RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg);
|
||||
|
|
|
@ -28,6 +28,9 @@
|
|||
|
||||
#include "psycopg/connection.h"
|
||||
#include "psycopg/cursor.h"
|
||||
#include "psycopg/replication_connection.h"
|
||||
#include "psycopg/replication_cursor.h"
|
||||
#include "psycopg/replication_message.h"
|
||||
#include "psycopg/green.h"
|
||||
#include "psycopg/lobject.h"
|
||||
#include "psycopg/notify.h"
|
||||
|
@ -112,6 +115,7 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
|
|||
return conn;
|
||||
}
|
||||
|
||||
|
||||
#define psyco_parse_dsn_doc \
|
||||
"parse_dsn(dsn) -> dict -- parse a connection string into parameters"
|
||||
|
||||
|
@ -901,6 +905,15 @@ INIT_MODULE(_psycopg)(void)
|
|||
Py_TYPE(&cursorType) = &PyType_Type;
|
||||
if (PyType_Ready(&cursorType) == -1) goto exit;
|
||||
|
||||
Py_TYPE(&replicationConnectionType) = &PyType_Type;
|
||||
if (PyType_Ready(&replicationConnectionType) == -1) goto exit;
|
||||
|
||||
Py_TYPE(&replicationCursorType) = &PyType_Type;
|
||||
if (PyType_Ready(&replicationCursorType) == -1) goto exit;
|
||||
|
||||
Py_TYPE(&replicationMessageType) = &PyType_Type;
|
||||
if (PyType_Ready(&replicationMessageType) == -1) goto exit;
|
||||
|
||||
Py_TYPE(&typecastType) = &PyType_Type;
|
||||
if (PyType_Ready(&typecastType) == -1) goto exit;
|
||||
|
||||
|
@ -981,6 +994,8 @@ INIT_MODULE(_psycopg)(void)
|
|||
/* Initialize the PyDateTimeAPI everywhere is used */
|
||||
PyDateTime_IMPORT;
|
||||
if (psyco_adapter_datetime_init()) { goto exit; }
|
||||
if (psyco_repl_curs_datetime_init()) { goto exit; }
|
||||
if (psyco_replmsg_datetime_init()) { goto exit; }
|
||||
|
||||
Py_TYPE(&pydatetimeType) = &PyType_Type;
|
||||
if (PyType_Ready(&pydatetimeType) == -1) goto exit;
|
||||
|
@ -1016,6 +1031,8 @@ INIT_MODULE(_psycopg)(void)
|
|||
PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION);
|
||||
PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver");
|
||||
PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM);
|
||||
PyModule_AddIntMacro(module, REPLICATION_PHYSICAL);
|
||||
PyModule_AddIntMacro(module, REPLICATION_LOGICAL);
|
||||
PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL));
|
||||
PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY));
|
||||
PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE));
|
||||
|
@ -1023,6 +1040,9 @@ INIT_MODULE(_psycopg)(void)
|
|||
/* put new types in module dictionary */
|
||||
PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
|
||||
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
|
||||
PyModule_AddObject(module, "ReplicationConnection", (PyObject*)&replicationConnectionType);
|
||||
PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
|
||||
PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
|
||||
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
|
||||
PyModule_AddObject(module, "Notify", (PyObject*)¬ifyType);
|
||||
PyModule_AddObject(module, "Xid", (PyObject*)&xidType);
|
||||
|
@ -1062,6 +1082,9 @@ INIT_MODULE(_psycopg)(void)
|
|||
if (0 != psyco_errors_init()) { goto exit; }
|
||||
psyco_errors_fill(dict);
|
||||
|
||||
replicationPhysicalConst = PyDict_GetItemString(dict, "REPLICATION_PHYSICAL");
|
||||
replicationLogicalConst = PyDict_GetItemString(dict, "REPLICATION_LOGICAL");
|
||||
|
||||
Dprintf("initpsycopg: module initialization complete");
|
||||
|
||||
exit:
|
||||
|
|
55
psycopg/replication_connection.h
Normal file
55
psycopg/replication_connection.h
Normal file
|
@ -0,0 +1,55 @@
|
|||
/* replication_connection.h - definition for the psycopg replication connection type
|
||||
*
|
||||
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#ifndef PSYCOPG_REPLICATION_CONNECTION_H
|
||||
#define PSYCOPG_REPLICATION_CONNECTION_H 1
|
||||
|
||||
#include "psycopg/connection.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern HIDDEN PyTypeObject replicationConnectionType;
|
||||
|
||||
typedef struct replicationConnectionObject {
|
||||
connectionObject conn;
|
||||
|
||||
long int type;
|
||||
} replicationConnectionObject;
|
||||
|
||||
/* The funny constant values should help to avoid mixups with some
|
||||
commonly used numbers like 1 and 2. */
|
||||
#define REPLICATION_PHYSICAL 12345678
|
||||
#define REPLICATION_LOGICAL 87654321
|
||||
|
||||
extern HIDDEN PyObject *replicationPhysicalConst;
|
||||
extern HIDDEN PyObject *replicationLogicalConst;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* !defined(PSYCOPG_REPLICATION_CONNECTION_H) */
|
221
psycopg/replication_connection_type.c
Normal file
221
psycopg/replication_connection_type.c
Normal file
|
@ -0,0 +1,221 @@
|
|||
/* replication_connection_type.c - python interface to replication connection objects
|
||||
*
|
||||
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#define PSYCOPG_MODULE
|
||||
#include "psycopg/psycopg.h"
|
||||
|
||||
#include "psycopg/replication_connection.h"
|
||||
#include "psycopg/replication_message.h"
|
||||
#include "psycopg/green.h"
|
||||
#include "psycopg/pqpath.h"
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
|
||||
#define psyco_repl_conn_type_doc \
|
||||
"replication_type -- the replication connection type"
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_conn_get_type(replicationConnectionObject *self)
|
||||
{
|
||||
connectionObject *conn = &self->conn;
|
||||
PyObject *res = NULL;
|
||||
|
||||
EXC_IF_CONN_CLOSED(conn);
|
||||
|
||||
if (self->type == REPLICATION_PHYSICAL) {
|
||||
res = replicationPhysicalConst;
|
||||
} else if (self->type == REPLICATION_LOGICAL) {
|
||||
res = replicationLogicalConst;
|
||||
} else {
|
||||
PyErr_Format(PyExc_TypeError, "unknown replication type constant: %ld", self->type);
|
||||
}
|
||||
|
||||
Py_XINCREF(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
replicationConnection_init(PyObject *obj, PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
replicationConnectionObject *self = (replicationConnectionObject *)obj;
|
||||
PyObject *dsn = NULL, *replication_type = NULL,
|
||||
*item = NULL, *ext = NULL, *make_dsn = NULL,
|
||||
*extras = NULL, *cursor = NULL;
|
||||
int async = 0;
|
||||
int ret = -1;
|
||||
|
||||
/* 'replication_type' is not actually optional, but there's no
|
||||
good way to put it before 'async' in the list */
|
||||
static char *kwlist[] = {"dsn", "async", "replication_type", NULL};
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|iO", kwlist,
|
||||
&dsn, &async, &replication_type)) { return ret; }
|
||||
|
||||
/*
|
||||
We have to call make_dsn() to add replication-specific
|
||||
connection parameters, because the DSN might be an URI (if there
|
||||
were no keyword arguments to connect() it is passed unchanged).
|
||||
*/
|
||||
/* we reuse args and kwargs to call make_dsn() and parent type's tp_init() */
|
||||
if (!(kwargs = PyDict_New())) { return ret; }
|
||||
Py_INCREF(args);
|
||||
|
||||
/* we also reuse the dsn to hold the result of the make_dsn() call */
|
||||
Py_INCREF(dsn);
|
||||
|
||||
if (!(ext = PyImport_ImportModule("psycopg2.extensions"))) { goto exit; }
|
||||
if (!(make_dsn = PyObject_GetAttrString(ext, "make_dsn"))) { goto exit; }
|
||||
|
||||
/* all the nice stuff is located in python-level ReplicationCursor class */
|
||||
if (!(extras = PyImport_ImportModule("psycopg2.extras"))) { goto exit; }
|
||||
if (!(cursor = PyObject_GetAttrString(extras, "ReplicationCursor"))) { goto exit; }
|
||||
|
||||
/* checking the object reference helps to avoid recognizing
|
||||
unrelated integer constants as valid input values */
|
||||
if (replication_type == replicationPhysicalConst) {
|
||||
self->type = REPLICATION_PHYSICAL;
|
||||
|
||||
#define SET_ITEM(k, v) \
|
||||
if (!(item = Text_FromUTF8(#v))) { goto exit; } \
|
||||
if (PyDict_SetItemString(kwargs, #k, item) != 0) { goto exit; } \
|
||||
Py_DECREF(item); \
|
||||
item = NULL;
|
||||
|
||||
SET_ITEM(replication, true);
|
||||
SET_ITEM(dbname, replication); /* required for .pgpass lookup */
|
||||
} else if (replication_type == replicationLogicalConst) {
|
||||
self->type = REPLICATION_LOGICAL;
|
||||
|
||||
SET_ITEM(replication, database);
|
||||
#undef SET_ITEM
|
||||
} else {
|
||||
PyErr_SetString(PyExc_TypeError,
|
||||
"replication_type must be either REPLICATION_PHYSICAL or REPLICATION_LOGICAL");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
Py_DECREF(args);
|
||||
if (!(args = PyTuple_Pack(1, dsn))) { goto exit; }
|
||||
|
||||
Py_DECREF(dsn);
|
||||
if (!(dsn = PyObject_Call(make_dsn, args, kwargs))) { goto exit; }
|
||||
|
||||
Py_DECREF(args);
|
||||
if (!(args = Py_BuildValue("(Oi)", dsn, async))) { goto exit; }
|
||||
|
||||
/* only attempt the connection once we've handled all possible errors */
|
||||
if ((ret = connectionType.tp_init(obj, args, NULL)) < 0) { goto exit; }
|
||||
|
||||
self->conn.autocommit = 1;
|
||||
Py_INCREF(self->conn.cursor_factory = cursor);
|
||||
|
||||
exit:
|
||||
Py_XDECREF(item);
|
||||
Py_XDECREF(ext);
|
||||
Py_XDECREF(make_dsn);
|
||||
Py_XDECREF(extras);
|
||||
Py_XDECREF(cursor);
|
||||
Py_XDECREF(dsn);
|
||||
Py_XDECREF(args);
|
||||
Py_XDECREF(kwargs);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
replicationConnection_repr(replicationConnectionObject *self)
|
||||
{
|
||||
return PyString_FromFormat(
|
||||
"<ReplicationConnection object at %p; dsn: '%s', closed: %ld>",
|
||||
self, self->conn.dsn, self->conn.closed);
|
||||
}
|
||||
|
||||
|
||||
/* object calculated member list */
|
||||
|
||||
static struct PyGetSetDef replicationConnectionObject_getsets[] = {
|
||||
/* override to prevent user tweaking these: */
|
||||
{ "autocommit", NULL, NULL, NULL },
|
||||
{ "isolation_level", NULL, NULL, NULL },
|
||||
{ "set_session", NULL, NULL, NULL },
|
||||
{ "set_isolation_level", NULL, NULL, NULL },
|
||||
{ "reset", NULL, NULL, NULL },
|
||||
/* an actual getter */
|
||||
{ "replication_type",
|
||||
(getter)psyco_repl_conn_get_type, NULL,
|
||||
psyco_repl_conn_type_doc, NULL },
|
||||
{NULL}
|
||||
};
|
||||
|
||||
/* object type */
|
||||
|
||||
#define replicationConnectionType_doc \
|
||||
"A replication connection."
|
||||
|
||||
PyTypeObject replicationConnectionType = {
|
||||
PyVarObject_HEAD_INIT(NULL, 0)
|
||||
"psycopg2.extensions.ReplicationConnection",
|
||||
sizeof(replicationConnectionObject), 0,
|
||||
0, /*tp_dealloc*/
|
||||
0, /*tp_print*/
|
||||
0, /*tp_getattr*/
|
||||
0, /*tp_setattr*/
|
||||
0, /*tp_compare*/
|
||||
(reprfunc)replicationConnection_repr, /*tp_repr*/
|
||||
0, /*tp_as_number*/
|
||||
0, /*tp_as_sequence*/
|
||||
0, /*tp_as_mapping*/
|
||||
0, /*tp_hash*/
|
||||
0, /*tp_call*/
|
||||
(reprfunc)replicationConnection_repr, /*tp_str*/
|
||||
0, /*tp_getattro*/
|
||||
0, /*tp_setattro*/
|
||||
0, /*tp_as_buffer*/
|
||||
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
|
||||
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
|
||||
replicationConnectionType_doc, /*tp_doc*/
|
||||
0, /*tp_traverse*/
|
||||
0, /*tp_clear*/
|
||||
0, /*tp_richcompare*/
|
||||
0, /*tp_weaklistoffset*/
|
||||
0, /*tp_iter*/
|
||||
0, /*tp_iternext*/
|
||||
0, /*tp_methods*/
|
||||
0, /*tp_members*/
|
||||
replicationConnectionObject_getsets, /*tp_getset*/
|
||||
&connectionType, /*tp_base*/
|
||||
0, /*tp_dict*/
|
||||
0, /*tp_descr_get*/
|
||||
0, /*tp_descr_set*/
|
||||
0, /*tp_dictoffset*/
|
||||
replicationConnection_init, /*tp_init*/
|
||||
0, /*tp_alloc*/
|
||||
0, /*tp_new*/
|
||||
};
|
||||
|
||||
PyObject *replicationPhysicalConst;
|
||||
PyObject *replicationLogicalConst;
|
59
psycopg/replication_cursor.h
Normal file
59
psycopg/replication_cursor.h
Normal file
|
@ -0,0 +1,59 @@
|
|||
/* replication_cursor.h - definition for the psycopg replication cursor type
|
||||
*
|
||||
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#ifndef PSYCOPG_REPLICATION_CURSOR_H
|
||||
#define PSYCOPG_REPLICATION_CURSOR_H 1
|
||||
|
||||
#include "psycopg/cursor.h"
|
||||
#include "libpq_support.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern HIDDEN PyTypeObject replicationCursorType;
|
||||
|
||||
typedef struct replicationCursorObject {
|
||||
cursorObject cur;
|
||||
|
||||
int consuming:1; /* if running the consume loop */
|
||||
int decode:1; /* if we should use character decoding on the messages */
|
||||
|
||||
struct timeval last_io; /* timestamp of the last exchange with the server */
|
||||
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
|
||||
|
||||
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
|
||||
XLogRecPtr flush_lsn;
|
||||
XLogRecPtr apply_lsn;
|
||||
} replicationCursorObject;
|
||||
|
||||
|
||||
RAISES_NEG int psyco_repl_curs_datetime_init(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* !defined(PSYCOPG_REPLICATION_CURSOR_H) */
|
315
psycopg/replication_cursor_type.c
Normal file
315
psycopg/replication_cursor_type.c
Normal file
|
@ -0,0 +1,315 @@
|
|||
/* replication_cursor_type.c - python interface to replication cursor objects
|
||||
*
|
||||
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#define PSYCOPG_MODULE
|
||||
#include "psycopg/psycopg.h"
|
||||
|
||||
#include "psycopg/replication_cursor.h"
|
||||
#include "psycopg/replication_message.h"
|
||||
#include "psycopg/green.h"
|
||||
#include "psycopg/pqpath.h"
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
/* python */
|
||||
#include "datetime.h"
|
||||
|
||||
|
||||
#define psyco_repl_curs_start_replication_expert_doc \
|
||||
"start_replication_expert(command, decode=False) -- Start replication with a given command."
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
|
||||
PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
cursorObject *curs = &self->cur;
|
||||
connectionObject *conn = self->cur.conn;
|
||||
PyObject *res = NULL;
|
||||
char *command;
|
||||
long int decode = 0;
|
||||
static char *kwlist[] = {"command", "decode", NULL};
|
||||
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
EXC_IF_CURS_CLOSED(curs);
|
||||
EXC_IF_GREEN(start_replication_expert);
|
||||
EXC_IF_TPC_PREPARED(conn, start_replication_expert);
|
||||
|
||||
Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode);
|
||||
|
||||
if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
|
||||
res = Py_None;
|
||||
Py_INCREF(res);
|
||||
|
||||
self->decode = decode;
|
||||
gettimeofday(&self->last_io, NULL);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
#define psyco_repl_curs_consume_stream_doc \
|
||||
"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_curs_consume_stream(replicationCursorObject *self,
|
||||
PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
cursorObject *curs = &self->cur;
|
||||
PyObject *consume = NULL, *res = NULL;
|
||||
double keepalive_interval = 10;
|
||||
static char *kwlist[] = {"consume", "keepalive_interval", NULL};
|
||||
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
|
||||
&consume, &keepalive_interval)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
EXC_IF_CURS_CLOSED(curs);
|
||||
EXC_IF_CURS_ASYNC(curs, consume_stream);
|
||||
EXC_IF_GREEN(consume_stream);
|
||||
EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
|
||||
|
||||
Dprintf("psyco_repl_curs_consume_stream");
|
||||
|
||||
if (keepalive_interval < 1.0) {
|
||||
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (self->consuming) {
|
||||
PyErr_SetString(ProgrammingError,
|
||||
"consume_stream cannot be used when already in the consume loop");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (curs->pgres == NULL || PQresultStatus(curs->pgres) != PGRES_COPY_BOTH) {
|
||||
PyErr_SetString(ProgrammingError,
|
||||
"consume_stream: not replicating, call start_replication first");
|
||||
return NULL;
|
||||
}
|
||||
CLEARPGRES(curs->pgres);
|
||||
|
||||
self->consuming = 1;
|
||||
|
||||
if (pq_copy_both(self, consume, keepalive_interval) >= 0) {
|
||||
res = Py_None;
|
||||
Py_INCREF(res);
|
||||
}
|
||||
|
||||
self->consuming = 0;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
#define psyco_repl_curs_read_message_doc \
|
||||
"read_message() -- Try reading a replication message from the server (non-blocking)."
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_curs_read_message(replicationCursorObject *self)
|
||||
{
|
||||
cursorObject *curs = &self->cur;
|
||||
replicationMessageObject *msg = NULL;
|
||||
|
||||
EXC_IF_CURS_CLOSED(curs);
|
||||
EXC_IF_GREEN(read_message);
|
||||
EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
|
||||
|
||||
if (pq_read_replication_message(self, &msg) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (msg) {
|
||||
return (PyObject *)msg;
|
||||
}
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
#define psyco_repl_curs_send_feedback_doc \
|
||||
"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_curs_send_feedback(replicationCursorObject *self,
|
||||
PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
cursorObject *curs = &self->cur;
|
||||
XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0;
|
||||
int reply = 0;
|
||||
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
|
||||
|
||||
EXC_IF_CURS_CLOSED(curs);
|
||||
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
|
||||
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (write_lsn > self->write_lsn)
|
||||
self->write_lsn = write_lsn;
|
||||
|
||||
if (flush_lsn > self->flush_lsn)
|
||||
self->flush_lsn = flush_lsn;
|
||||
|
||||
if (apply_lsn > self->apply_lsn)
|
||||
self->apply_lsn = apply_lsn;
|
||||
|
||||
if (pq_send_replication_feedback(self, reply) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
|
||||
RAISES_NEG int
|
||||
psyco_repl_curs_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_repl_curs_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#define psyco_repl_curs_io_timestamp_doc \
|
||||
"io_timestamp -- the timestamp of latest IO with the server"
|
||||
|
||||
static PyObject *
|
||||
psyco_repl_curs_get_io_timestamp(replicationCursorObject *self)
|
||||
{
|
||||
cursorObject *curs = &self->cur;
|
||||
PyObject *tval, *res = NULL;
|
||||
double seconds;
|
||||
|
||||
EXC_IF_CURS_CLOSED(curs);
|
||||
|
||||
seconds = self->last_io.tv_sec + self->last_io.tv_usec / 1.0e6;
|
||||
|
||||
tval = Py_BuildValue("(d)", seconds);
|
||||
if (tval) {
|
||||
res = PyDateTime_FromTimestamp(tval);
|
||||
Py_DECREF(tval);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/* object method list */
|
||||
|
||||
static struct PyMethodDef replicationCursorObject_methods[] = {
|
||||
{"start_replication_expert", (PyCFunction)psyco_repl_curs_start_replication_expert,
|
||||
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_start_replication_expert_doc},
|
||||
{"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
|
||||
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
|
||||
{"read_message", (PyCFunction)psyco_repl_curs_read_message,
|
||||
METH_NOARGS, psyco_repl_curs_read_message_doc},
|
||||
{"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
|
||||
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
|
||||
{NULL}
|
||||
};
|
||||
|
||||
/* object calculated member list */
|
||||
|
||||
static struct PyGetSetDef replicationCursorObject_getsets[] = {
|
||||
{ "io_timestamp",
|
||||
(getter)psyco_repl_curs_get_io_timestamp, NULL,
|
||||
psyco_repl_curs_io_timestamp_doc, NULL },
|
||||
{NULL}
|
||||
};
|
||||
|
||||
static int
|
||||
replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
replicationCursorObject *self = (replicationCursorObject *)obj;
|
||||
|
||||
self->consuming = 0;
|
||||
self->decode = 0;
|
||||
|
||||
self->write_lsn = 0;
|
||||
self->flush_lsn = 0;
|
||||
self->apply_lsn = 0;
|
||||
|
||||
return cursorType.tp_init(obj, args, kwargs);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
replicationCursor_repr(replicationCursorObject *self)
|
||||
{
|
||||
return PyString_FromFormat(
|
||||
"<ReplicationCursor object at %p; closed: %d>", self, self->cur.closed);
|
||||
}
|
||||
|
||||
|
||||
/* object type */
|
||||
|
||||
#define replicationCursorType_doc \
|
||||
"A database replication cursor."
|
||||
|
||||
PyTypeObject replicationCursorType = {
|
||||
PyVarObject_HEAD_INIT(NULL, 0)
|
||||
"psycopg2.extensions.ReplicationCursor",
|
||||
sizeof(replicationCursorObject), 0,
|
||||
0, /*tp_dealloc*/
|
||||
0, /*tp_print*/
|
||||
0, /*tp_getattr*/
|
||||
0, /*tp_setattr*/
|
||||
0, /*tp_compare*/
|
||||
(reprfunc)replicationCursor_repr, /*tp_repr*/
|
||||
0, /*tp_as_number*/
|
||||
0, /*tp_as_sequence*/
|
||||
0, /*tp_as_mapping*/
|
||||
0, /*tp_hash*/
|
||||
0, /*tp_call*/
|
||||
(reprfunc)replicationCursor_repr, /*tp_str*/
|
||||
0, /*tp_getattro*/
|
||||
0, /*tp_setattro*/
|
||||
0, /*tp_as_buffer*/
|
||||
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
|
||||
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
|
||||
replicationCursorType_doc, /*tp_doc*/
|
||||
0, /*tp_traverse*/
|
||||
0, /*tp_clear*/
|
||||
0, /*tp_richcompare*/
|
||||
0, /*tp_weaklistoffset*/
|
||||
0, /*tp_iter*/
|
||||
0, /*tp_iternext*/
|
||||
replicationCursorObject_methods, /*tp_methods*/
|
||||
0, /*tp_members*/
|
||||
replicationCursorObject_getsets, /*tp_getset*/
|
||||
&cursorType, /*tp_base*/
|
||||
0, /*tp_dict*/
|
||||
0, /*tp_descr_get*/
|
||||
0, /*tp_descr_set*/
|
||||
0, /*tp_dictoffset*/
|
||||
replicationCursor_init, /*tp_init*/
|
||||
0, /*tp_alloc*/
|
||||
0, /*tp_new*/
|
||||
};
|
57
psycopg/replication_message.h
Normal file
57
psycopg/replication_message.h
Normal file
|
@ -0,0 +1,57 @@
|
|||
/* replication_message.h - definition for the psycopg ReplicationMessage type
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#ifndef PSYCOPG_REPLICATION_MESSAGE_H
|
||||
#define PSYCOPG_REPLICATION_MESSAGE_H 1
|
||||
|
||||
#include "cursor.h"
|
||||
#include "libpq_support.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern HIDDEN PyTypeObject replicationMessageType;
|
||||
|
||||
/* the typedef is forward-declared in psycopg.h */
|
||||
struct replicationMessageObject {
|
||||
PyObject_HEAD
|
||||
|
||||
cursorObject *cursor;
|
||||
PyObject *payload;
|
||||
|
||||
int data_size;
|
||||
XLogRecPtr data_start;
|
||||
XLogRecPtr wal_end;
|
||||
int64_t send_time;
|
||||
};
|
||||
|
||||
RAISES_NEG int psyco_replmsg_datetime_init(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* !defined(PSYCOPG_REPLICATION_MESSAGE_H) */
|
191
psycopg/replication_message_type.c
Normal file
191
psycopg/replication_message_type.c
Normal file
|
@ -0,0 +1,191 @@
|
|||
/* replication_message_type.c - python interface to ReplcationMessage objects
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#define PSYCOPG_MODULE
|
||||
#include "psycopg/psycopg.h"
|
||||
|
||||
#include "psycopg/replication_message.h"
|
||||
|
||||
#include "datetime.h"
|
||||
|
||||
RAISES_NEG int
|
||||
psyco_replmsg_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_replmsg_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static PyObject *
|
||||
replmsg_repr(replicationMessageObject *self)
|
||||
{
|
||||
return PyString_FromFormat(
|
||||
"<ReplicationMessage object at %p; data_size: %d; "
|
||||
"data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %ld>",
|
||||
self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end),
|
||||
(long int)self->send_time);
|
||||
}
|
||||
|
||||
static int
|
||||
replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs)
|
||||
{
|
||||
replicationMessageObject *self = (replicationMessageObject*) obj;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "O!O", &cursorType, &self->cursor, &self->payload))
|
||||
return -1;
|
||||
Py_XINCREF(self->cursor);
|
||||
Py_XINCREF(self->payload);
|
||||
|
||||
self->data_size = 0;
|
||||
self->data_start = 0;
|
||||
self->wal_end = 0;
|
||||
self->send_time = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
replmsg_traverse(replicationMessageObject *self, visitproc visit, void *arg)
|
||||
{
|
||||
Py_VISIT((PyObject* )self->cursor);
|
||||
Py_VISIT(self->payload);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
replmsg_clear(replicationMessageObject *self)
|
||||
{
|
||||
Py_CLEAR(self->cursor);
|
||||
Py_CLEAR(self->payload);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
replmsg_dealloc(PyObject* obj)
|
||||
{
|
||||
PyObject_GC_UnTrack(obj);
|
||||
|
||||
replmsg_clear((replicationMessageObject*) obj);
|
||||
|
||||
Py_TYPE(obj)->tp_free(obj);
|
||||
}
|
||||
|
||||
#define psyco_replmsg_send_time_doc \
|
||||
"send_time - Timestamp of the replication message departure from the server."
|
||||
|
||||
static PyObject *
|
||||
psyco_replmsg_get_send_time(replicationMessageObject *self)
|
||||
{
|
||||
PyObject *tval, *res = NULL;
|
||||
double t;
|
||||
|
||||
t = (double)self->send_time / USECS_PER_SEC +
|
||||
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
|
||||
|
||||
tval = Py_BuildValue("(d)", t);
|
||||
if (tval) {
|
||||
res = PyDateTime_FromTimestamp(tval);
|
||||
Py_DECREF(tval);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
#define OFFSETOF(x) offsetof(replicationMessageObject, x)
|
||||
|
||||
/* object member list */
|
||||
|
||||
static struct PyMemberDef replicationMessageObject_members[] = {
|
||||
{"cursor", T_OBJECT, OFFSETOF(cursor), READONLY,
|
||||
"Related ReplcationCursor object."},
|
||||
{"payload", T_OBJECT, OFFSETOF(payload), READONLY,
|
||||
"The actual message data."},
|
||||
{"data_size", T_INT, OFFSETOF(data_size), READONLY,
|
||||
"Raw size of the message data in bytes."},
|
||||
{"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY,
|
||||
"LSN position of the start of this message."},
|
||||
{"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
|
||||
"LSN position of the current end of WAL on the server."},
|
||||
{NULL}
|
||||
};
|
||||
|
||||
static struct PyGetSetDef replicationMessageObject_getsets[] = {
|
||||
{ "send_time", (getter)psyco_replmsg_get_send_time, NULL,
|
||||
psyco_replmsg_send_time_doc, NULL },
|
||||
{NULL}
|
||||
};
|
||||
|
||||
/* object type */
|
||||
|
||||
#define replicationMessageType_doc \
|
||||
"A replication protocol message."
|
||||
|
||||
PyTypeObject replicationMessageType = {
|
||||
PyVarObject_HEAD_INIT(NULL, 0)
|
||||
"psycopg2.extensions.ReplicationMessage",
|
||||
sizeof(replicationMessageObject), 0,
|
||||
replmsg_dealloc, /*tp_dealloc*/
|
||||
0, /*tp_print*/
|
||||
0, /*tp_getattr*/
|
||||
0, /*tp_setattr*/
|
||||
0, /*tp_compare*/
|
||||
(reprfunc)replmsg_repr, /*tp_repr*/
|
||||
0, /*tp_as_number*/
|
||||
0, /*tp_as_sequence*/
|
||||
0, /*tp_as_mapping*/
|
||||
0, /*tp_hash */
|
||||
0, /*tp_call*/
|
||||
0, /*tp_str*/
|
||||
0, /*tp_getattro*/
|
||||
0, /*tp_setattro*/
|
||||
0, /*tp_as_buffer*/
|
||||
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
|
||||
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
|
||||
replicationMessageType_doc, /*tp_doc*/
|
||||
(traverseproc)replmsg_traverse, /*tp_traverse*/
|
||||
(inquiry)replmsg_clear, /*tp_clear*/
|
||||
0, /*tp_richcompare*/
|
||||
0, /*tp_weaklistoffset*/
|
||||
0, /*tp_iter*/
|
||||
0, /*tp_iternext*/
|
||||
0, /*tp_methods*/
|
||||
replicationMessageObject_members, /*tp_members*/
|
||||
replicationMessageObject_getsets, /*tp_getset*/
|
||||
0, /*tp_base*/
|
||||
0, /*tp_dict*/
|
||||
0, /*tp_descr_get*/
|
||||
0, /*tp_descr_set*/
|
||||
0, /*tp_dictoffset*/
|
||||
replmsg_init, /*tp_init*/
|
||||
0, /*tp_alloc*/
|
||||
PyType_GenericNew, /*tp_new*/
|
||||
};
|
76
psycopg/win32_support.c
Normal file
76
psycopg/win32_support.c
Normal file
|
@ -0,0 +1,76 @@
|
|||
/* win32_support.c - emulate some functions missing on Win32
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
|
||||
#define PSYCOPG_MODULE
|
||||
#include "psycopg/psycopg.h"
|
||||
|
||||
#include "psycopg/win32_support.h"
|
||||
|
||||
#ifdef _WIN32
|
||||
|
||||
#ifndef __MINGW32__
|
||||
/* millisecond-precision port of gettimeofday for Win32, taken from
|
||||
src/port/gettimeofday.c in PostgreSQL core */
|
||||
|
||||
/* FILETIME of Jan 1 1970 00:00:00. */
|
||||
static const unsigned __int64 epoch = 116444736000000000ULL;
|
||||
|
||||
/*
|
||||
* timezone information is stored outside the kernel so tzp isn't used anymore.
|
||||
*
|
||||
* Note: this function is not for Win32 high precision timing purpose. See
|
||||
* elapsed_time().
|
||||
*/
|
||||
int
|
||||
gettimeofday(struct timeval * tp, struct timezone * tzp)
|
||||
{
|
||||
FILETIME file_time;
|
||||
SYSTEMTIME system_time;
|
||||
ULARGE_INTEGER ularge;
|
||||
|
||||
GetSystemTime(&system_time);
|
||||
SystemTimeToFileTime(&system_time, &file_time);
|
||||
ularge.LowPart = file_time.dwLowDateTime;
|
||||
ularge.HighPart = file_time.dwHighDateTime;
|
||||
|
||||
tp->tv_sec = (long) ((ularge.QuadPart - epoch) / 10000000L);
|
||||
tp->tv_usec = (long) (system_time.wMilliseconds * 1000);
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif /* !defined(__MINGW32__) */
|
||||
|
||||
/* timersub is missing on mingw */
|
||||
void
|
||||
timersub(struct timeval *a, struct timeval *b, struct timeval *c)
|
||||
{
|
||||
c->tv_sec = a->tv_sec - b->tv_sec;
|
||||
c->tv_usec = a->tv_usec - b->tv_usec;
|
||||
if (tv_usec < 0) {
|
||||
c->tv_usec += 1000000;
|
||||
c->tv_sec -= 1;
|
||||
}
|
||||
}
|
||||
#endif /* defined(_WIN32) */
|
40
psycopg/win32_support.h
Normal file
40
psycopg/win32_support.h
Normal file
|
@ -0,0 +1,40 @@
|
|||
/* win32_support.h - definitions for win32_support.c
|
||||
*
|
||||
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
|
||||
*
|
||||
* This file is part of psycopg.
|
||||
*
|
||||
* psycopg2 is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Lesser General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* In addition, as a special exception, the copyright holders give
|
||||
* permission to link this program with the OpenSSL library (or with
|
||||
* modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
* and distribute linked combinations including the two.
|
||||
*
|
||||
* You must obey the GNU Lesser General Public License in all respects for
|
||||
* all of the code used other than OpenSSL.
|
||||
*
|
||||
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
* License for more details.
|
||||
*/
|
||||
#ifndef PSYCOPG_WIN32_SUPPORT_H
|
||||
#define PSYCOPG_WIN32_SUPPORT_H 1
|
||||
|
||||
#include "psycopg/config.h"
|
||||
|
||||
#include <sys/time.h>
|
||||
|
||||
#ifdef _WIN32
|
||||
#ifndef __MINGW32__
|
||||
HIDDEN int gettimeofday(struct timeval * tp, struct timezone * tzp);
|
||||
#endif
|
||||
|
||||
HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c);
|
||||
#endif
|
||||
|
||||
#endif /* !defined(PSYCOPG_WIN32_SUPPORT_H) */
|
|
@ -85,14 +85,19 @@
|
|||
<None Include="psycopg\config.h" />
|
||||
<None Include="psycopg\connection.h" />
|
||||
<None Include="psycopg\cursor.h" />
|
||||
<None Include="psycopg\libpq_support.h" />
|
||||
<None Include="psycopg\microprotocols.h" />
|
||||
<None Include="psycopg\microprotocols_proto.h" />
|
||||
<None Include="psycopg\pgtypes.h" />
|
||||
<None Include="psycopg\pqpath.h" />
|
||||
<None Include="psycopg\psycopg.h" />
|
||||
<None Include="psycopg\python.h" />
|
||||
<None Include="psycopg\replication_connection.h" />
|
||||
<None Include="psycopg\replication_cursor.h" />
|
||||
<None Include="psycopg\replication_message.h" />
|
||||
<None Include="psycopg\typecast.h" />
|
||||
<None Include="psycopg\typecast_binary.h" />
|
||||
<None Include="psycopg\win32_support.h" />
|
||||
<None Include="scripts\buildtypes.py" />
|
||||
<None Include="scripts\maketypes.sh" />
|
||||
<None Include="ZPsycopgDA\dtml\add.dtml" />
|
||||
|
@ -124,6 +129,7 @@
|
|||
<None Include="tests\test_bugX000.py" />
|
||||
<None Include="tests\test_types_extras.py" />
|
||||
<None Include="tests\test_connection.py" />
|
||||
<None Include="tests\test_replication.py" />
|
||||
<None Include="tests\test_dates.py" />
|
||||
<None Include="tests\test_lobject.py" />
|
||||
<None Include="tests\test_quote.py" />
|
||||
|
@ -217,10 +223,14 @@
|
|||
<Compile Include="psycopg\connection_type.c" />
|
||||
<Compile Include="psycopg\cursor_int.c" />
|
||||
<Compile Include="psycopg\cursor_type.c" />
|
||||
<Compile Include="psycopg\libpq_support.c" />
|
||||
<Compile Include="psycopg\microprotocols.c" />
|
||||
<Compile Include="psycopg\microprotocols_proto.c" />
|
||||
<Compile Include="psycopg\pqpath.c" />
|
||||
<Compile Include="psycopg\psycopgmodule.c" />
|
||||
<Compile Include="psycopg\replication_connection_type.c" />
|
||||
<Compile Include="psycopg\replication_cursor_type.c" />
|
||||
<Compile Include="psycopg\replication_message_type.c" />
|
||||
<Compile Include="psycopg\typecast.c" />
|
||||
<Compile Include="psycopg\typecast_array.c" />
|
||||
<Compile Include="psycopg\typecast_basic.c" />
|
||||
|
@ -229,6 +239,7 @@
|
|||
<Compile Include="psycopg\typecast_datetime.c" />
|
||||
<Compile Include="psycopg\typecast_mxdatetime.c" />
|
||||
<Compile Include="psycopg\utils.c" />
|
||||
<Compile Include="psycopg\win32_support.c" />
|
||||
<Compile Include="psycopg\lobject_int.c" />
|
||||
<Compile Include="psycopg\lobject_type.c" />
|
||||
<Compile Include="psycopg\adapter_pfloat.c" />
|
||||
|
@ -251,4 +262,4 @@
|
|||
</Properties>
|
||||
</MonoDevelop>
|
||||
</ProjectExtensions>
|
||||
</Project>
|
||||
</Project>
|
||||
|
|
8
setup.py
8
setup.py
|
@ -471,9 +471,13 @@ data_files = []
|
|||
sources = [
|
||||
'psycopgmodule.c',
|
||||
'green.c', 'pqpath.c', 'utils.c', 'bytes_format.c',
|
||||
'libpq_support.c', 'win32_support.c',
|
||||
|
||||
'connection_int.c', 'connection_type.c',
|
||||
'cursor_int.c', 'cursor_type.c',
|
||||
'replication_connection_type.c',
|
||||
'replication_cursor_type.c',
|
||||
'replication_message_type.c',
|
||||
'diagnostics_type.c', 'error_type.c',
|
||||
'lobject_int.c', 'lobject_type.c',
|
||||
'notify_type.c', 'xid_type.c',
|
||||
|
@ -489,7 +493,11 @@ depends = [
|
|||
# headers
|
||||
'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h',
|
||||
'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h',
|
||||
'replication_connection.h',
|
||||
'replication_cursor.h',
|
||||
'replication_message.h',
|
||||
'notify.h', 'pqpath.h', 'xid.h',
|
||||
'libpq_support.h', 'win32_support.h',
|
||||
|
||||
'adapter_asis.h', 'adapter_binary.h', 'adapter_datetime.h',
|
||||
'adapter_list.h', 'adapter_pboolean.h', 'adapter_pdecimal.h',
|
||||
|
|
|
@ -31,6 +31,7 @@ import test_bugX000
|
|||
import test_bug_gc
|
||||
import test_cancel
|
||||
import test_connection
|
||||
import test_replication
|
||||
import test_copy
|
||||
import test_cursor
|
||||
import test_dates
|
||||
|
@ -69,6 +70,7 @@ def test_suite():
|
|||
suite.addTest(test_bug_gc.test_suite())
|
||||
suite.addTest(test_cancel.test_suite())
|
||||
suite.addTest(test_connection.test_suite())
|
||||
suite.addTest(test_replication.test_suite())
|
||||
suite.addTest(test_copy.test_suite())
|
||||
suite.addTest(test_cursor.test_suite())
|
||||
suite.addTest(test_dates.test_suite())
|
||||
|
|
|
@ -29,7 +29,6 @@ import psycopg2
|
|||
from psycopg2 import extensions
|
||||
|
||||
import time
|
||||
import select
|
||||
import StringIO
|
||||
|
||||
from testutils import ConnectingTestCase
|
||||
|
@ -66,21 +65,6 @@ class AsyncTests(ConnectingTestCase):
|
|||
)''')
|
||||
self.wait(curs)
|
||||
|
||||
def wait(self, cur_or_conn):
|
||||
pollable = cur_or_conn
|
||||
if not hasattr(pollable, 'poll'):
|
||||
pollable = cur_or_conn.connection
|
||||
while True:
|
||||
state = pollable.poll()
|
||||
if state == psycopg2.extensions.POLL_OK:
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
select.select([pollable], [], [], 10)
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
select.select([], [pollable], [], 10)
|
||||
else:
|
||||
raise Exception("Unexpected result from poll: %r", state)
|
||||
|
||||
def test_connection_setup(self):
|
||||
cur = self.conn.cursor()
|
||||
sync_cur = self.sync_conn.cursor()
|
||||
|
|
|
@ -1246,18 +1246,6 @@ class AutocommitTests(ConnectingTestCase):
|
|||
self.assertEqual(cur.fetchone()[0], 'on')
|
||||
|
||||
|
||||
class ReplicationTest(ConnectingTestCase):
|
||||
@skip_before_postgres(9, 0)
|
||||
def test_replication_not_supported(self):
|
||||
conn = self.repl_connect()
|
||||
if conn is None:
|
||||
return
|
||||
cur = conn.cursor()
|
||||
f = StringIO()
|
||||
self.assertRaises(psycopg2.NotSupportedError,
|
||||
cur.copy_expert, "START_REPLICATION 0/0", f)
|
||||
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
|
|
@ -116,6 +116,12 @@ class ConnectTestCase(unittest.TestCase):
|
|||
self.assertEqual(self.args[1], None)
|
||||
self.assert_(self.args[2])
|
||||
|
||||
def test_int_port_param(self):
|
||||
psycopg2.connect(database='sony', port=6543)
|
||||
dsn = " %s " % self.args[0]
|
||||
self.assertIn(" dbname=sony ", dsn)
|
||||
self.assertIn(" port=6543 ", dsn)
|
||||
|
||||
def test_empty_param(self):
|
||||
psycopg2.connect(database='sony', password='')
|
||||
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
|
||||
|
|
200
tests/test_replication.py
Normal file
200
tests/test_replication.py
Normal file
|
@ -0,0 +1,200 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# test_replication.py - unit test for replication protocol
|
||||
#
|
||||
# Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
|
||||
#
|
||||
# psycopg2 is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Lesser General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# In addition, as a special exception, the copyright holders give
|
||||
# permission to link this program with the OpenSSL library (or with
|
||||
# modified versions of OpenSSL that use the same license as OpenSSL),
|
||||
# and distribute linked combinations including the two.
|
||||
#
|
||||
# You must obey the GNU Lesser General Public License in all respects for
|
||||
# all of the code used other than OpenSSL.
|
||||
#
|
||||
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
# License for more details.
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection
|
||||
from psycopg2.extras import StopReplication
|
||||
|
||||
import testconfig
|
||||
from testutils import unittest
|
||||
from testutils import skip_before_postgres
|
||||
from testutils import ConnectingTestCase
|
||||
|
||||
|
||||
class ReplicationTestCase(ConnectingTestCase):
|
||||
def setUp(self):
|
||||
if not testconfig.repl_dsn:
|
||||
self.skipTest("replication tests disabled by default")
|
||||
|
||||
super(ReplicationTestCase, self).setUp()
|
||||
|
||||
self.slot = testconfig.repl_slot
|
||||
self._slots = []
|
||||
|
||||
def tearDown(self):
|
||||
# first close all connections, as they might keep the slot(s) active
|
||||
super(ReplicationTestCase, self).tearDown()
|
||||
|
||||
import time
|
||||
time.sleep(0.025) # sometimes the slot is still active, wait a little
|
||||
|
||||
if self._slots:
|
||||
kill_conn = self.connect()
|
||||
if kill_conn:
|
||||
kill_cur = kill_conn.cursor()
|
||||
for slot in self._slots:
|
||||
kill_cur.execute("SELECT pg_drop_replication_slot(%s)", (slot,))
|
||||
kill_conn.commit()
|
||||
kill_conn.close()
|
||||
|
||||
def create_replication_slot(self, cur, slot_name=testconfig.repl_slot, **kwargs):
|
||||
cur.create_replication_slot(slot_name, **kwargs)
|
||||
self._slots.append(slot_name)
|
||||
|
||||
def drop_replication_slot(self, cur, slot_name=testconfig.repl_slot):
|
||||
cur.drop_replication_slot(slot_name)
|
||||
self._slots.remove(slot_name)
|
||||
|
||||
# generate some events for our replication stream
|
||||
def make_replication_events(self):
|
||||
conn = self.connect()
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
|
||||
try:
|
||||
cur.execute("DROP TABLE dummy1")
|
||||
except psycopg2.ProgrammingError:
|
||||
conn.rollback()
|
||||
cur.execute("CREATE TABLE dummy1 AS SELECT * FROM generate_series(1, 5) AS id")
|
||||
conn.commit()
|
||||
|
||||
|
||||
class ReplicationTest(ReplicationTestCase):
|
||||
@skip_before_postgres(9, 0)
|
||||
def test_physical_replication_connection(self):
|
||||
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
cur.execute("IDENTIFY_SYSTEM")
|
||||
cur.fetchall()
|
||||
|
||||
@skip_before_postgres(9, 4)
|
||||
def test_logical_replication_connection(self):
|
||||
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
cur.execute("IDENTIFY_SYSTEM")
|
||||
cur.fetchall()
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
def test_create_replication_slot(self):
|
||||
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
|
||||
self.create_replication_slot(cur)
|
||||
self.assertRaises(psycopg2.ProgrammingError, self.create_replication_slot, cur)
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
def test_start_on_missing_replication_slot(self):
|
||||
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
|
||||
self.assertRaises(psycopg2.ProgrammingError, cur.start_replication, self.slot)
|
||||
|
||||
self.create_replication_slot(cur)
|
||||
cur.start_replication(self.slot)
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
def test_start_and_recover_from_error(self):
|
||||
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
|
||||
self.create_replication_slot(cur, output_plugin='test_decoding')
|
||||
|
||||
# try with invalid options
|
||||
cur.start_replication(slot_name=self.slot, options={'invalid_param': 'value'})
|
||||
def consume(msg):
|
||||
pass
|
||||
# we don't see the error from the server before we try to read the data
|
||||
self.assertRaises(psycopg2.DataError, cur.consume_stream, consume)
|
||||
|
||||
# try with correct command
|
||||
cur.start_replication(slot_name=self.slot)
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
def test_stop_replication(self):
|
||||
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
|
||||
if conn is None: return
|
||||
cur = conn.cursor()
|
||||
|
||||
self.create_replication_slot(cur, output_plugin='test_decoding')
|
||||
|
||||
self.make_replication_events()
|
||||
|
||||
cur.start_replication(self.slot)
|
||||
def consume(msg):
|
||||
raise StopReplication()
|
||||
self.assertRaises(StopReplication, cur.consume_stream, consume)
|
||||
|
||||
|
||||
class AsyncReplicationTest(ReplicationTestCase):
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
def test_async_replication(self):
|
||||
conn = self.repl_connect(connection_factory=LogicalReplicationConnection, async=1)
|
||||
if conn is None: return
|
||||
self.wait(conn)
|
||||
cur = conn.cursor()
|
||||
|
||||
self.create_replication_slot(cur, output_plugin='test_decoding')
|
||||
self.wait(cur)
|
||||
|
||||
cur.start_replication(self.slot)
|
||||
self.wait(cur)
|
||||
|
||||
self.make_replication_events()
|
||||
|
||||
self.msg_count = 0
|
||||
def consume(msg):
|
||||
# just check the methods
|
||||
log = "%s: %s" % (cur.io_timestamp, repr(msg))
|
||||
|
||||
self.msg_count += 1
|
||||
if self.msg_count > 3:
|
||||
cur.send_feedback(reply=True)
|
||||
raise StopReplication()
|
||||
|
||||
cur.send_feedback(flush_lsn=msg.data_start)
|
||||
|
||||
# cannot be used in asynchronous mode
|
||||
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
|
||||
|
||||
def process_stream():
|
||||
from select import select
|
||||
while True:
|
||||
msg = cur.read_message()
|
||||
if msg:
|
||||
consume(msg)
|
||||
else:
|
||||
select([cur], [], [])
|
||||
self.assertRaises(StopReplication, process_stream)
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -7,8 +7,6 @@ dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
|
|||
dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None)
|
||||
dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None)
|
||||
dbpass = os.environ.get('PSYCOPG2_TESTDB_PASSWORD', None)
|
||||
repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN',
|
||||
"dbname=psycopg2_test replication=1")
|
||||
|
||||
# Check if we want to test psycopg's green path.
|
||||
green = os.environ.get('PSYCOPG2_TEST_GREEN', None)
|
||||
|
@ -34,3 +32,11 @@ if dbuser is not None:
|
|||
dsn += ' user=%s' % dbuser
|
||||
if dbpass is not None:
|
||||
dsn += ' password=%s' % dbpass
|
||||
|
||||
# Don't run replication tests if REPL_DSN is not set, default to normal DSN if
|
||||
# set to empty string.
|
||||
repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN', None)
|
||||
if repl_dsn == '':
|
||||
repl_dsn = dsn
|
||||
|
||||
repl_slot = os.environ.get('PSYCOPG2_TEST_REPL_SLOT', 'psycopg2_test_slot')
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
import os
|
||||
import platform
|
||||
import sys
|
||||
import select
|
||||
from functools import wraps
|
||||
from testconfig import dsn, repl_dsn
|
||||
|
||||
|
@ -128,8 +129,6 @@ class ConnectingTestCase(unittest.TestCase):
|
|||
conn = self.connect(**kwargs)
|
||||
except psycopg2.OperationalError, e:
|
||||
return self.skipTest("replication db not configured: %s" % e)
|
||||
|
||||
conn.autocommit = True
|
||||
return conn
|
||||
|
||||
def _get_conn(self):
|
||||
|
@ -143,6 +142,23 @@ class ConnectingTestCase(unittest.TestCase):
|
|||
|
||||
conn = property(_get_conn, _set_conn)
|
||||
|
||||
# for use with async connections only
|
||||
def wait(self, cur_or_conn):
|
||||
import psycopg2.extensions
|
||||
pollable = cur_or_conn
|
||||
if not hasattr(pollable, 'poll'):
|
||||
pollable = cur_or_conn.connection
|
||||
while True:
|
||||
state = pollable.poll()
|
||||
if state == psycopg2.extensions.POLL_OK:
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
select.select([pollable], [], [], 10)
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
select.select([], [pollable], [], 10)
|
||||
else:
|
||||
raise Exception("Unexpected result from poll: %r", state)
|
||||
|
||||
|
||||
def decorate_all_tests(cls, *decorators):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue
Block a user