diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index f2e279f8..0f1882cc 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -423,7 +423,7 @@ this will be probably implemented in a future release. Support for coroutine libraries ------------------------------- -.. versionadded:: 2.2.0 +.. versionadded:: 2.2 Psycopg can be used together with coroutine_\-based libraries and participate in cooperative multithreading. @@ -509,3 +509,88 @@ resources about the topic. conn.commit() cur.close() conn.close() + + + +.. index:: + single: Replication + +Replication protocol support +---------------------------- + +.. versionadded:: 2.7 + +Modern PostgreSQL servers (version 9.0 and above) support replication. The +replication protocol is built on top of the client-server protocol and can be +operated using ``libpq``, as such it can be also operated by ``psycopg2``. +The replication protocol can be operated on both synchronous and +:ref:`asynchronous ` connections. + +Server version 9.4 adds a new feature called *Logical Replication*. + +.. seealso:: + + - PostgreSQL `Streaming Replication Protocol`__ + + .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + + +Logical replication Quick-Start +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You must be using PostgreSQL server version 9.4 or above to run this quick +start. + +Make sure that replication connections are permitted for user ``postgres`` in +``pg_hba.conf`` and reload the server configuration. You also need to set +``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to +value greater than zero in ``postgresql.conf`` (these changes require a server +restart). Create a database ``psycopg2test``. + +Then run the following code to quickly try the replication support out. This +is not production code -- it has no error handling, it sends feedback too +often, etc. -- and it's only intended as a simple demo of logical +replication:: + + from __future__ import print_function + import sys + import psycopg2 + import psycopg2.extras + + conn = psycopg2.connect('dbname=psycopg2test user=postgres', + connection_factory=psycopg2.extras.LogicalReplicationConnection) + cur = conn.cursor() + try: + # test_decoding produces textual output + cur.start_replication(slot_name='pytest', decode=True) + except psycopg2.ProgrammingError: + cur.create_replication_slot('pytest', output_plugin='test_decoding') + cur.start_replication(slot_name='pytest', decode=True) + + class DemoConsumer(object): + def __call__(self, msg): + print(msg.payload) + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + democonsumer = DemoConsumer() + + print("Starting streaming, press Control-C to end...", file=sys.stderr) + try: + cur.consume_stream(democonsumer) + except KeyboardInterrupt: + cur.close() + conn.close() + print("The slot 'pytest' still exists. Drop it with " + "SELECT pg_drop_replication_slot('pytest'); if no longer needed.", + file=sys.stderr) + print("WARNING: Transaction logs will accumulate in pg_xlog " + "until the slot is dropped.", file=sys.stderr) + + +You can now make changes to the ``psycopg2test`` database using a normal +psycopg2 session, ``psql``, etc. and see the logical decoding stream printed +by this demo client. + +This will continue running until terminated with ``Control-C``. + +For the details see :ref:`replication-objects`. diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 58b0dc07..b46afdb3 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -141,81 +141,15 @@ Logging cursor .. autoclass:: MinTimeLoggingCursor -Replication protocol support -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Modern PostgreSQL servers (version 9.0 and above) support replication. The -replication protocol is built on top of the client-server protocol and can be -operated using ``libpq``, as such it can be also operated by ``psycopg2``. -The replication protocol can be operated on both synchronous and -:ref:`asynchronous ` connections. - -Server version 9.4 adds a new feature called *Logical Replication*. - -.. seealso:: - - - PostgreSQL `Streaming Replication Protocol`__ - - .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html -Logical replication Quick-Start -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -You must be using PostgreSQL server version 9.4 or above to run this quick -start. - -Make sure that replication connections are permitted for user ``postgres`` in -``pg_hba.conf`` and reload the server configuration. You also need to set -``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to -value greater than zero in ``postgresql.conf`` (these changes require a server -restart). Create a database ``psycopg2test``. - -Then run the following code to quickly try the replication support out. This -is not production code -- it has no error handling, it sends feedback too -often, etc. -- and it's only intended as a simple demo of logical -replication:: - - from __future__ import print_function - import sys - import psycopg2 - import psycopg2.extras - - conn = psycopg2.connect('dbname=psycopg2test user=postgres', - connection_factory=psycopg2.extras.LogicalReplicationConnection) - cur = conn.cursor() - try: - cur.start_replication(slot_name='pytest', decode=True) # test_decoding produces textual output - except psycopg2.ProgrammingError: - cur.create_replication_slot('pytest', output_plugin='test_decoding') - cur.start_replication(slot_name='pytest', decode=True) - - class DemoConsumer(object): - def __call__(self, msg): - print(msg.payload) - msg.cursor.send_feedback(flush_lsn=msg.data_start) - - democonsumer = DemoConsumer() - - print("Starting streaming, press Control-C to end...", file=sys.stderr) - try: - cur.consume_stream(democonsumer) - except KeyboardInterrupt: - cur.close() - conn.close() - print("The slot 'pytest' still exists. Drop it with SELECT pg_drop_replication_slot('pytest'); if no longer needed.", file=sys.stderr) - print("WARNING: Transaction logs will accumulate in pg_xlog until the slot is dropped.", file=sys.stderr) - - -You can now make changes to the ``psycopg2test`` database using a normal -psycopg2 session, ``psql``, etc. and see the logical decoding stream printed -by this demo client. - -This will continue running until terminated with ``Control-C``. - +.. _replication-objects: Replication connection and cursor classes -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. index:: + pair: Connection; replication .. autoclass:: ReplicationConnectionBase @@ -253,6 +187,9 @@ The following replication types are defined: `ReplicationCursor` for actual communication with the server. +.. index:: + pair: Message; replication + The individual messages in the replication stream are represented by `ReplicationMessage` objects (both logical and physical type): @@ -290,6 +227,9 @@ The individual messages in the replication stream are represented by A reference to the corresponding `ReplicationCursor` object. +.. index:: + pair: Cursor; replication + .. autoclass:: ReplicationCursor .. method:: create_replication_slot(slot_name, slot_type=None, output_plugin=None)