From 2fb93a87d3648e2dece5698cb64b059da5134a57 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Thu, 22 Feb 2024 15:26:56 -0800
Subject: [PATCH 1/2] fix(core): Revert "Fix possible endless wait in stop() after AUTH_FAILED error (#688)"

This reverts commit 5225b3e2fab6fec3b12b789e3cc6f3218429d32d.

The commit being reverted here caused kazoo not to empty the send
queue before disconnecting. This means that if a client submitted
asynchronous requests and then called client.stop(), the connection
would be closed immediately, usually after only one (but possibly
more) of the submitted requests had been sent. Prior to this, Kazoo
would empty the queue of submitted requests all the way up to and
including the Close request when client.stop() was called.

Another area where this caused problems is a busy multi-threaded
system. One thread might decide to gracefully close the connection,
but if there is any traffic generated by another thread, then the
connection would end up terminating without ever sending the Close
request.

Failure to gracefully shut down a ZooKeeper connection can mean that
other system components need to wait for ephemeral node timeouts to
detect that a component has shut down.
---
 kazoo/protocol/connection.py | 2 +-
 kazoo/tests/test_client.py   | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index cee425a2..ad4f3b1f 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -619,7 +619,7 @@ def _connect_attempt(self, host, hostip, port, retry):
             self.ping_outstanding.clear()
             last_send = time.monotonic()
             with self._socket_error_handling():
-                while not self.client._stopped.is_set():
+                while True:
                     # Watch for something to read or send
                     jitter_time = random.randint(1, 40) / 100.0
                     deadline = last_send + read_timeout / 2.0 - jitter_time
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index 178a4452..e376baaf 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -258,8 +258,6 @@ def test_async_auth_failure(self):
         with pytest.raises(AuthFailedError):
             client.add_auth("unknown-scheme", digest_auth)
 
-        client.stop()
-
     def test_add_auth_on_reconnect(self):
         client = self._get_client()
         client.start()
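The revert above restores the pre-#688 shutdown path in which stop() drains
submitted requests up to and including the Close request. Below is a minimal
sketch of the usage pattern that behavior protects; the 127.0.0.1:2181 host
string and the /demo paths are placeholders for illustration, not part of the
patch.

    from kazoo.client import KazooClient

    client = KazooClient(hosts="127.0.0.1:2181")
    client.start()

    # Fire off several requests without waiting for their results.
    for i in range(10):
        client.create_async("/demo/node-%d" % i, b"payload", makepath=True)

    # With the send queue drained on stop(), these requests and the final
    # Close request go out before the socket is closed, so the server sees a
    # clean session close instead of waiting for an ephemeral node timeout.
    client.stop()
    client.close()
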
From 6540c932505a988ccae3c77c023113d4c4c01947 Mon Sep 17 00:00:00 2001
From: Stephen Sorriaux
Date: Sun, 3 Mar 2024 12:10:01 -0500
Subject: [PATCH 2/2] fix(test): avoid racy reader vs writer contender in `test_rw_lock`

`test_rw_lock` creates two threads (`reader` and `writer`) and, after
they are started, expects `reader` to become a contender before
`writer`. On busy systems (like the CI... it's always the CI!) this
may not hold, and the test fails because `writer` can become a
contender before `reader`.

This commit makes sure that `reader` is always a contender before
`writer`.
---
 kazoo/tests/test_lock.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py
index 397e971f..7e5ecb77 100644
--- a/kazoo/tests/test_lock.py
+++ b/kazoo/tests/test_lock.py
@@ -487,27 +487,29 @@ def test_rw_lock(self):
         lock = self.client.WriteLock(self.lockpath, "test")
         lock.acquire()
 
+        wait = self.make_wait()
         reader_thread.start()
+        # make sure reader_thread is a contender before writer_thread
+        wait(lambda: len(lock.contenders()) == 2)
         writer_thread.start()
 
         # wait for everyone to line up on the lock
-        wait = self.make_wait()
         wait(lambda: len(lock.contenders()) == 3)
         contenders = lock.contenders()
 
         assert contenders[0] == "test"
         remaining = contenders[1:]
 
-        # release the lock and contenders should claim it in order
-        lock.release()
-
         contender_bits = {
             "reader": (reader_thread, reader_event),
             "writer": (writer_thread, writer_event),
         }
 
-        for contender in ("reader", "writer"):
-            thread, event = contender_bits[contender]
+        # release the lock and contenders should claim it in order
+        lock.release()
+
+        for contender, contender_bits in contender_bits.items():
+            _, event = contender_bits
             with self.condition:
                 while not self.active_thread:
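The test fix above makes the contender order deterministic by waiting on an
observable condition between the two start() calls. The same idea in a
standalone sketch (plain threading, illustrative names only, no ZooKeeper
required):

    import threading

    arrival_order = []
    order_lock = threading.Lock()
    reader_arrived = threading.Event()

    def contend(name, arrived_event=None):
        # Record the order in which contenders actually show up.
        with order_lock:
            arrival_order.append(name)
        if arrived_event is not None:
            arrived_event.set()

    reader = threading.Thread(target=contend, args=("reader", reader_arrived))
    writer = threading.Thread(target=contend, args=("writer",))

    reader.start()
    # Starting reader first does not guarantee it runs first; waiting on the
    # event before starting writer pins the ordering, just as the test now
    # waits on lock.contenders() before starting writer_thread.
    reader_arrived.wait()
    writer.start()

    reader.join()
    writer.join()
    assert arrival_order == ["reader", "writer"]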