mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-11-04 09:47:30 +03:00 
			
		
		
		
	Store WAL end pointer in the replication cursor
This commit is contained in:
		
							parent
							
								
									3eecf34bea
								
							
						
					
					
						commit
						f946042a79
					
				| 
						 | 
					@ -481,6 +481,12 @@ The individual messages in the replication stream are represented by
 | 
				
			||||||
        communication with the server (a data or keepalive message in either
 | 
					        communication with the server (a data or keepalive message in either
 | 
				
			||||||
        direction).
 | 
					        direction).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    .. attribute:: wal_end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					       LSN position of the current end of WAL on the server at the
 | 
				
			||||||
 | 
					       moment of last data or keepalive message received from the
 | 
				
			||||||
 | 
					       server.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    An actual example of asynchronous operation might look like this::
 | 
					    An actual example of asynchronous operation might look like this::
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      from select import select
 | 
					      from select import select
 | 
				
			||||||
| 
						 | 
					@ -500,7 +506,7 @@ The individual messages in the replication stream are represented by
 | 
				
			||||||
              try:
 | 
					              try:
 | 
				
			||||||
                  sel = select([cur], [], [], max(0, timeout))
 | 
					                  sel = select([cur], [], [], max(0, timeout))
 | 
				
			||||||
                  if not any(sel):
 | 
					                  if not any(sel):
 | 
				
			||||||
                      cur.send_feedback()  # timed out, send keepalive message
 | 
					                      cur.send_feedback(flush_lsn=cur.wal_end)  # timed out, send keepalive message
 | 
				
			||||||
              except InterruptedError:
 | 
					              except InterruptedError:
 | 
				
			||||||
                  pass  # recalculate timeout and continue
 | 
					                  pass  # recalculate timeout and continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1559,6 +1559,10 @@ retry:
 | 
				
			||||||
            goto exit;
 | 
					            goto exit;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        wal_end = fe_recvint64(buffer + 1);
 | 
				
			||||||
 | 
					        Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end));
 | 
				
			||||||
 | 
					        repl->wal_end = wal_end;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        reply = buffer[hdr];
 | 
					        reply = buffer[hdr];
 | 
				
			||||||
        if (reply && pq_send_replication_feedback(repl, 0) < 0) {
 | 
					        if (reply && pq_send_replication_feedback(repl, 0) < 0) {
 | 
				
			||||||
            goto exit;
 | 
					            goto exit;
 | 
				
			||||||
| 
						 | 
					@ -1573,6 +1577,7 @@ retry:
 | 
				
			||||||
        goto exit;
 | 
					        goto exit;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    repl->wal_end = wal_end;
 | 
				
			||||||
    ret = 0;
 | 
					    ret = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exit:
 | 
					exit:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -44,6 +44,8 @@ typedef struct replicationCursorObject {
 | 
				
			||||||
    struct timeval last_io;       /* timestamp of the last exchange with the server */
 | 
					    struct timeval last_io;       /* timestamp of the last exchange with the server */
 | 
				
			||||||
    struct timeval keepalive_interval;   /* interval for keepalive messages in replication mode */
 | 
					    struct timeval keepalive_interval;   /* interval for keepalive messages in replication mode */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    XLogRecPtr  wal_end;          /* WAL end pointer from the last exchange with the server */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    XLogRecPtr  write_lsn;        /* LSNs for replication feedback messages */
 | 
					    XLogRecPtr  write_lsn;        /* LSNs for replication feedback messages */
 | 
				
			||||||
    XLogRecPtr  flush_lsn;
 | 
					    XLogRecPtr  flush_lsn;
 | 
				
			||||||
    XLogRecPtr  apply_lsn;
 | 
					    XLogRecPtr  apply_lsn;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -228,6 +228,17 @@ repl_curs_get_io_timestamp(replicationCursorObject *self)
 | 
				
			||||||
    return res;
 | 
					    return res;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/* object member list */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define OFFSETOF(x) offsetof(replicationCursorObject, x)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static struct PyMemberDef replicationCursorObject_members[] = {
 | 
				
			||||||
 | 
					    {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
 | 
				
			||||||
 | 
					        "LSN position of the current end of WAL on the server."},
 | 
				
			||||||
 | 
					    {NULL}
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* object method list */
 | 
					/* object method list */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static struct PyMethodDef replicationCursorObject_methods[] = {
 | 
					static struct PyMethodDef replicationCursorObject_methods[] = {
 | 
				
			||||||
| 
						 | 
					@ -259,6 +270,8 @@ replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
 | 
				
			||||||
    self->consuming = 0;
 | 
					    self->consuming = 0;
 | 
				
			||||||
    self->decode = 0;
 | 
					    self->decode = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    self->wal_end = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    self->write_lsn = 0;
 | 
					    self->write_lsn = 0;
 | 
				
			||||||
    self->flush_lsn = 0;
 | 
					    self->flush_lsn = 0;
 | 
				
			||||||
    self->apply_lsn = 0;
 | 
					    self->apply_lsn = 0;
 | 
				
			||||||
| 
						 | 
					@ -308,7 +321,7 @@ PyTypeObject replicationCursorType = {
 | 
				
			||||||
    0,          /*tp_iter*/
 | 
					    0,          /*tp_iter*/
 | 
				
			||||||
    0,          /*tp_iternext*/
 | 
					    0,          /*tp_iternext*/
 | 
				
			||||||
    replicationCursorObject_methods, /*tp_methods*/
 | 
					    replicationCursorObject_methods, /*tp_methods*/
 | 
				
			||||||
    0,          /*tp_members*/
 | 
					    replicationCursorObject_members, /*tp_members*/
 | 
				
			||||||
    replicationCursorObject_getsets, /*tp_getset*/
 | 
					    replicationCursorObject_getsets, /*tp_getset*/
 | 
				
			||||||
    &cursorType, /*tp_base*/
 | 
					    &cursorType, /*tp_base*/
 | 
				
			||||||
    0,          /*tp_dict*/
 | 
					    0,          /*tp_dict*/
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -242,6 +242,7 @@ class AsyncReplicationTest(ReplicationTestCase):
 | 
				
			||||||
        def consume(msg):
 | 
					        def consume(msg):
 | 
				
			||||||
            # just check the methods
 | 
					            # just check the methods
 | 
				
			||||||
            "%s: %s" % (cur.io_timestamp, repr(msg))
 | 
					            "%s: %s" % (cur.io_timestamp, repr(msg))
 | 
				
			||||||
 | 
					            "%s: %s" % (cur.wal_end, repr(msg))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            self.msg_count += 1
 | 
					            self.msg_count += 1
 | 
				
			||||||
            if self.msg_count > 3:
 | 
					            if self.msg_count > 3:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user