From 0e4f39c569e7db99cf116651f7d63e41c87d73d6 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Wed, 21 Aug 2024 18:41:09 -0400 Subject: [PATCH] Run loops check the interrupted flag (#1215) * Run loops check the interrupted flag * Run loops check the interrupted flag --- .../io/nats/client/impl/MessageManager.java | 2 +- .../io/nats/client/impl/NatsConnection.java | 18 ++++++------------ .../nats/client/impl/NatsConnectionReader.java | 2 +- .../nats/client/impl/NatsConnectionWriter.java | 2 +- .../java/io/nats/client/impl/NatsConsumer.java | 12 +++++------- .../io/nats/client/impl/NatsDispatcher.java | 4 +--- .../impl/NatsDispatcherWithExecutor.java | 4 +--- 7 files changed, 16 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/nats/client/impl/MessageManager.java b/src/main/java/io/nats/client/impl/MessageManager.java index f6bac5081..6997907d9 100644 --- a/src/main/java/io/nats/client/impl/MessageManager.java +++ b/src/main/java/io/nats/client/impl/MessageManager.java @@ -149,7 +149,7 @@ public void shutdown() { @Override public void run() { - if (alive.get()) { + if (alive.get() && !Thread.interrupted()) { long sinceLast = System.currentTimeMillis() - lastMsgReceived.get(); if (alive.get() && sinceLast > alarmPeriodSetting) { handleHeartbeatError(); diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index f318e59d1..a5d8a1b10 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -2178,20 +2178,17 @@ public CompletableFuture drain(Duration timeout) throws TimeoutExceptio // Wait for the timeout or the pending count to go to 0 executor.submit(() -> { try { - Instant now = Instant.now(); - - while (timeout == null || timeout.equals(Duration.ZERO) - || Duration.between(start, now).compareTo(timeout) < 0) { + long stop = (timeout == null || timeout.equals(Duration.ZERO)) + ? Long.MAX_VALUE + : System.nanoTime() + timeout.toNanos(); + while (System.nanoTime() < stop && !Thread.interrupted()) + { consumers.removeIf(NatsConsumer::isDrained); - if (consumers.isEmpty()) { break; } - //noinspection BusyWait Thread.sleep(1); // Sleep 1 milli - - now = Instant.now(); } // Stop publishing @@ -2201,16 +2198,13 @@ public CompletableFuture drain(Duration timeout) throws TimeoutExceptio if (timeout == null || timeout.equals(Duration.ZERO)) { this.flush(Duration.ZERO); } else { - now = Instant.now(); - + Instant now = Instant.now(); Duration passed = Duration.between(start, now); Duration newTimeout = timeout.minus(passed); - if (newTimeout.toNanos() > 0) { this.flush(newTimeout); } } - this.close(false, false); // close the connection after the last flush tracker.complete(consumers.isEmpty()); } catch (TimeoutException e) { diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index 235346135..7278a0754 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -128,7 +128,7 @@ public void run() { this.gotCR = false; this.opPos = 0; - while (this.running.get()) { + while (running.get() && !Thread.interrupted()) { this.bufferPosition = 0; int bytesRead = dataPort.read(this.buffer, 0, this.buffer.length); diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index 554fb6730..aff60d61c 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -193,7 +193,7 @@ public void run() { dataPort = this.dataPortFuture.get(); // Will wait for the future to complete StatisticsCollector stats = this.connection.getNatsStatistics(); - while (this.running.get()) { + while (running.get() && !Thread.interrupted()) { NatsMessage msg; if (this.reconnectMode.get()) { msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout); diff --git a/src/main/java/io/nats/client/impl/NatsConsumer.java b/src/main/java/io/nats/client/impl/NatsConsumer.java index 0eb5949ce..6b665276e 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsConsumer.java @@ -205,17 +205,15 @@ public CompletableFuture drain(Duration timeout) throws InterruptedExce // draining connection.getExecutor().submit(() -> { try { - Instant now = Instant.now(); - - while (timeout == null || timeout.equals(Duration.ZERO) - || Duration.between(start, now).compareTo(timeout) < 0) { + long stop = (timeout == null || timeout.equals(Duration.ZERO)) + ? Long.MAX_VALUE + : System.nanoTime() + timeout.toNanos(); + while (System.nanoTime() < stop && !Thread.interrupted()) { if (this.isDrained()) { break; } - + //noinspection BusyWait Thread.sleep(1); // Sleep 1 milli - - now = Instant.now(); } this.cleanUpAfterDrain(); diff --git a/src/main/java/io/nats/client/impl/NatsDispatcher.java b/src/main/java/io/nats/client/impl/NatsDispatcher.java index 5dce5d0a6..d13d6e6f3 100644 --- a/src/main/java/io/nats/client/impl/NatsDispatcher.java +++ b/src/main/java/io/nats/client/impl/NatsDispatcher.java @@ -84,10 +84,8 @@ boolean breakRunLoop() { public void run() { try { - while (this.running.get()) { // start - + while (running.get() && !Thread.interrupted()) { NatsMessage msg = this.incoming.pop(this.waitForMessage); - if (msg != null) { NatsSubscription sub = msg.getNatsSubscription(); if (sub != null && sub.isActive()) { diff --git a/src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java b/src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java index 8dfb04740..c9cd4cae2 100644 --- a/src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java +++ b/src/main/java/io/nats/client/impl/NatsDispatcherWithExecutor.java @@ -24,10 +24,8 @@ class NatsDispatcherWithExecutor extends NatsDispatcher { @Override public void run() { try { - while (this.running.get()) { // start - + while (running.get() && !Thread.interrupted()) { NatsMessage msg = this.incoming.pop(this.waitForMessage); - if (msg != null) { NatsSubscription sub = msg.getNatsSubscription(); if (sub != null && sub.isActive()) {