mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-11-04 01:37:31 +03:00 
			
		
		
		
	Address code review feedback
This commit is contained in:
		
							parent
							
								
									f946042a79
								
							
						
					
					
						commit
						ff91ad5186
					
				| 
						 | 
					@ -506,10 +506,23 @@ The individual messages in the replication stream are represented by
 | 
				
			||||||
              try:
 | 
					              try:
 | 
				
			||||||
                  sel = select([cur], [], [], max(0, timeout))
 | 
					                  sel = select([cur], [], [], max(0, timeout))
 | 
				
			||||||
                  if not any(sel):
 | 
					                  if not any(sel):
 | 
				
			||||||
                      cur.send_feedback(flush_lsn=cur.wal_end)  # timed out, send keepalive message
 | 
					                      cur.send_feedback()  # timed out, send keepalive message
 | 
				
			||||||
              except InterruptedError:
 | 
					              except InterruptedError:
 | 
				
			||||||
                  pass  # recalculate timeout and continue
 | 
					                  pass  # recalculate timeout and continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      .. warning::
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					         The ``consume({msg})`` function will only be called when
 | 
				
			||||||
 | 
					         there are new database writes on the server e.g. any DML or
 | 
				
			||||||
 | 
					         DDL statement. Depending on your Postgres cluster
 | 
				
			||||||
 | 
					         configuration this might cause the server to run out of disk
 | 
				
			||||||
 | 
					         space if the writes are far apart. To prevent this from
 | 
				
			||||||
 | 
					         happening you can use `~ReplicationCursor.wal_end` value to
 | 
				
			||||||
 | 
					         periodically send feedback to the server to notify that your
 | 
				
			||||||
 | 
					         replication client has received and processed all the
 | 
				
			||||||
 | 
					         messages.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
.. index::
 | 
					.. index::
 | 
				
			||||||
    pair: Cursor; Replication
 | 
					    pair: Cursor; Replication
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1550,6 +1550,8 @@ retry:
 | 
				
			||||||
        (*msg)->data_start = data_start;
 | 
					        (*msg)->data_start = data_start;
 | 
				
			||||||
        (*msg)->wal_end    = wal_end;
 | 
					        (*msg)->wal_end    = wal_end;
 | 
				
			||||||
        (*msg)->send_time  = send_time;
 | 
					        (*msg)->send_time  = send_time;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        repl->wal_end = wal_end;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (buffer[0] == 'k') {
 | 
					    else if (buffer[0] == 'k') {
 | 
				
			||||||
        /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
 | 
					        /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
 | 
				
			||||||
| 
						 | 
					@ -1577,7 +1579,6 @@ retry:
 | 
				
			||||||
        goto exit;
 | 
					        goto exit;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    repl->wal_end = wal_end;
 | 
					 | 
				
			||||||
    ret = 0;
 | 
					    ret = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exit:
 | 
					exit:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -44,11 +44,11 @@ typedef struct replicationCursorObject {
 | 
				
			||||||
    struct timeval last_io;       /* timestamp of the last exchange with the server */
 | 
					    struct timeval last_io;       /* timestamp of the last exchange with the server */
 | 
				
			||||||
    struct timeval keepalive_interval;   /* interval for keepalive messages in replication mode */
 | 
					    struct timeval keepalive_interval;   /* interval for keepalive messages in replication mode */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    XLogRecPtr  wal_end;          /* WAL end pointer from the last exchange with the server */
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    XLogRecPtr  write_lsn;        /* LSNs for replication feedback messages */
 | 
					    XLogRecPtr  write_lsn;        /* LSNs for replication feedback messages */
 | 
				
			||||||
    XLogRecPtr  flush_lsn;
 | 
					    XLogRecPtr  flush_lsn;
 | 
				
			||||||
    XLogRecPtr  apply_lsn;
 | 
					    XLogRecPtr  apply_lsn;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    XLogRecPtr  wal_end;          /* WAL end pointer from the last exchange with the server */
 | 
				
			||||||
} replicationCursorObject;
 | 
					} replicationCursorObject;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user