From 283b9553f329d293b7798ab4872ed41db7e164c3 Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Wed, 14 Dec 2022 14:54:50 +0100 Subject: [PATCH 1/5] tests: remove unused local variable --- test/test_walreceiver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_walreceiver.py b/test/test_walreceiver.py index 807a689b..0bfecdd6 100644 --- a/test/test_walreceiver.py +++ b/test/test_walreceiver.py @@ -70,7 +70,7 @@ def test_walreceiver(self, db, pghoard_walreceiver, replication_slot): previous_wal_name = lsn.previous_walfile_start_lsn.walfile_name pghoard.start_walreceiver(pghoard.test_site, node, last_flushed_lsn) wait_for_xlog(pghoard, 4) - last_flushed_lsn = stop_walreceiver(pghoard) + stop_walreceiver(pghoard) state = get_transfer_agent_upload_xlog_state(pghoard) assert state.get("xlogs_since_basebackup") == 4 assert state.get("latest_filename") == previous_wal_name From 3391454605279c753082b9c7991ef0c1e0ff3b75 Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Wed, 14 Dec 2022 14:56:19 +0100 Subject: [PATCH 2/5] walreceiver: stop replication when stopping thread This avoids failures when starting the replication twice (mainly in tests). --- pghoard/walreceiver.py | 85 ++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/pghoard/walreceiver.py b/pghoard/walreceiver.py index 4691d870..151257c4 100644 --- a/pghoard/walreceiver.py +++ b/pghoard/walreceiver.py @@ -127,6 +127,14 @@ def start_replication(self): self.c.start_replication(start_lsn=lsn, timeline=timeline) return timeline + def stop_replication(self) -> None: + if self.c is not None: + self.c.close() + self.c = None + if self.conn is not None: + self.conn.close() + self.conn = None + def switch_wal(self): self.log.debug("Switching WAL from %r amount of data: %r", self.latest_wal, self.buffer.tell()) @@ -159,43 +167,46 @@ def switch_wal(self): def run_safe(self): self._init_cursor() - if self.replication_slot: - self.create_replication_slot() - timeline = self.start_replication() - while self.running: - wal_name = None - try: - msg = self.c.read_message() - except psycopg2.DatabaseError as ex: - self.log.exception("Unexpected exception in reading walreceiver msg") - self.metrics.unexpected_exception(ex, where="walreceiver_run") - time.sleep(1) - continue - self.log.debug("replication_msg: %r, buffer: %r/%r", msg, self.buffer.tell(), WAL_SEG_SIZE) - if msg: - self.latest_activity = datetime.datetime.utcnow() - lsn = LSN(msg.data_start, timeline_id=timeline, server_version=self.pg_version_server) - wal_name = lsn.walfile_name - - if not self.latest_wal: - self.latest_wal_start = lsn.lsn - self.latest_wal = wal_name - self.buffer.write(msg.payload) - - # TODO: Calculate end pos and transmit that? - msg.cursor.send_feedback(write_lsn=lsn.lsn) - - if wal_name and self.latest_wal != wal_name or self.buffer.tell() >= WAL_SEG_SIZE: - self.switch_wal() - self.process_completed_segments() - - if not msg: - timeout = KEEPALIVE_INTERVAL - (datetime.datetime.now() - self.c.io_timestamp).total_seconds() - with suppress(InterruptedError): - if not any(select.select([self.c], [], [], max(0, timeout))): - self.c.send_feedback() # timing out, send keepalive - # When we stop, process sent wals to update last_flush lsn. - self.process_completed_segments(block=True) + try: + if self.replication_slot: + self.create_replication_slot() + timeline = self.start_replication() + while self.running: + wal_name = None + try: + msg = self.c.read_message() + except psycopg2.DatabaseError as ex: + self.log.exception("Unexpected exception in reading walreceiver msg") + self.metrics.unexpected_exception(ex, where="walreceiver_run") + time.sleep(1) + continue + self.log.debug("replication_msg: %r, buffer: %r/%r", msg, self.buffer.tell(), WAL_SEG_SIZE) + if msg: + self.latest_activity = datetime.datetime.utcnow() + lsn = LSN(msg.data_start, timeline_id=timeline, server_version=self.pg_version_server) + wal_name = lsn.walfile_name + + if not self.latest_wal: + self.latest_wal_start = lsn.lsn + self.latest_wal = wal_name + self.buffer.write(msg.payload) + + # TODO: Calculate end pos and transmit that? + msg.cursor.send_feedback(write_lsn=lsn.lsn) + + if wal_name and self.latest_wal != wal_name or self.buffer.tell() >= WAL_SEG_SIZE: + self.switch_wal() + self.process_completed_segments() + + if not msg: + timeout = KEEPALIVE_INTERVAL - (datetime.datetime.now() - self.c.io_timestamp).total_seconds() + with suppress(InterruptedError): + if not any(select.select([self.c], [], [], max(0.0, timeout))): + self.c.send_feedback() # timing out, send keepalive + # When we stop, process sent wals to update last_flush lsn. + self.process_completed_segments(block=True) + finally: + self.stop_replication() def process_completed_segments(self, *, block=False): for wal_start, queue in self.callbacks.items(): From 2847a74a4d63081670cc95287bd3aff560d0e6ae Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Thu, 15 Dec 2022 09:09:45 +0100 Subject: [PATCH 3/5] walreceiver: don't split a message in two segments If the WAL segment buffer is almost full, it's possible for the next message to only partially fit in the buffer. In that case, we were sending only part of the message in the current WAL segment, and then failing because there was something left in the buffer. Instead, look ahead and if we know the message won't fit, split the WAL file before adding the message. We will still split the message (and fail) if a single message is larger than the WAL segment size. I don't know if that can happen and what's the correct way to handle this case. --- pghoard/walreceiver.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pghoard/walreceiver.py b/pghoard/walreceiver.py index 151257c4..7d872301 100644 --- a/pghoard/walreceiver.py +++ b/pghoard/walreceiver.py @@ -186,6 +186,13 @@ def run_safe(self): lsn = LSN(msg.data_start, timeline_id=timeline, server_version=self.pg_version_server) wal_name = lsn.walfile_name + if self.buffer.tell() > 0 and self.buffer.tell() + len(msg.payload) > WAL_SEG_SIZE: + # If adding the payload would make the wal segment too large, switch the WAL + # now instead of adding the payload and having it written partly in the current + # wal segment and partly in the next one. + self.switch_wal() + self.process_completed_segments() + if not self.latest_wal: self.latest_wal_start = lsn.lsn self.latest_wal = wal_name From 6ae9791d4b7f2577ee3f2520ab1bb0451040948a Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Thu, 15 Dec 2022 10:07:39 +0100 Subject: [PATCH 4/5] walreceiver: finish WAL segments on timeout If we received no new replication message for a long time, and some messages are still pending, flush a WAL segment. --- pghoard/walreceiver.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pghoard/walreceiver.py b/pghoard/walreceiver.py index 7d872301..c58565c0 100644 --- a/pghoard/walreceiver.py +++ b/pghoard/walreceiver.py @@ -210,6 +210,10 @@ def run_safe(self): with suppress(InterruptedError): if not any(select.select([self.c], [], [], max(0.0, timeout))): self.c.send_feedback() # timing out, send keepalive + # Don't leave unfinished segments waiting for more than the KEEPALIVE_INTERVAL + if self.buffer.tell() > 0: + self.switch_wal() + self.process_completed_segments() # When we stop, process sent wals to update last_flush lsn. self.process_completed_segments(block=True) finally: From 99b67f558593c5c93b2cf52f35c6b7a69adf7db6 Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Wed, 21 Dec 2022 01:08:31 +0100 Subject: [PATCH 5/5] tests: avoid concurrency bug in walreceiver There was a possible race condition when fetching the wal_name and *then* starting walreceiver. Fix it using a `threading.Event`. --- pghoard/walreceiver.py | 6 ++++++ test/test_walreceiver.py | 12 ++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pghoard/walreceiver.py b/pghoard/walreceiver.py index c58565c0..173e2209 100644 --- a/pghoard/walreceiver.py +++ b/pghoard/walreceiver.py @@ -8,9 +8,11 @@ import logging import os import select +import threading import time from io import BytesIO from queue import Empty, Queue +from typing import Optional import psycopg2 import psycopg2.errors @@ -50,6 +52,8 @@ def __init__( self.conn = None self.c = None self.buffer = BytesIO() + self.initial_lsn: Optional[LSN] = None + self.initial_lsn_available = threading.Event() self.latest_wal = None self.latest_wal_start = None self.latest_activity = datetime.datetime.utcnow() @@ -117,6 +121,8 @@ def start_replication(self): lsn = LSN(self.last_flushed_lsn, self.pg_version_server) else: lsn = lsn_from_sysinfo(identify_system, self.pg_version_server) + self.initial_lsn = lsn + self.initial_lsn_available.set() lsn = str(lsn.walfile_start_lsn) self.log.info("Starting replication from %r, timeline: %r with slot: %r", lsn, timeline, self.replication_slot) if self.replication_slot: diff --git a/test/test_walreceiver.py b/test/test_walreceiver.py index 0bfecdd6..5c1f4c41 100644 --- a/test/test_walreceiver.py +++ b/test/test_walreceiver.py @@ -43,13 +43,13 @@ def test_walreceiver(self, db, pghoard_walreceiver, replication_slot): else: node["slot"] = replication_slot - # The transfer agent state will be used to check what - # was uploaded - # Before starting the walreceiver, get the current wal name. - wal_name = get_current_lsn(node).walfile_name - # Start streaming, force a wal rotation, and check the wal has been - # archived + # The transfer agent state will be used to check what was uploaded + # Start streaming pghoard.start_walreceiver(pghoard.test_site, node, None) + # Get the initial wal name of the server + pghoard.walreceivers[pghoard.test_site].initial_lsn_available.wait() + wal_name = pghoard.walreceivers[pghoard.test_site].initial_lsn.walfile_name + # Force a wal rotation switch_wal(conn) # Check that we uploaded one file, and it is the right one. wait_for_xlog(pghoard, 1)