From 35a1487184238044facaafc6b0e0fb98102ec8c4 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 29 Feb 2024 07:19:08 -0800 Subject: [PATCH] Fixing stopAsync issues Signed-off-by: Abhin Balur --- .../collector/metrics/PravegaClient.java | 27 ++++----- .../metrics/writers/MetricStreamWriter.java | 8 ++- ...ravegaSensorCollectorIntegrationTests.java | 55 +++++++++---------- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index 0eef13ab..2306beea 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -43,19 +43,20 @@ public PravegaClient(String scope, String streamName) { private EventStreamWriter initializeWriter() { log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString()); - ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build(); - StreamManager streamManager = StreamManager.create(clientConfig); - final boolean scopeIsNew = streamManager.createScope(scope); - - StreamConfiguration streamConfig = StreamConfiguration.builder() - .scalingPolicy(ScalingPolicy.fixed(1)) - .build(); - final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); - EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - clientConfig); - EventStreamWriter writer = clientFactory.createEventWriter(streamName, - new UTF8StringSerializer(), - EventWriterConfig.builder().build()); + try (StreamManager streamManager = StreamManager.create(controllerURI)) { + final boolean scopeIsNew = streamManager.createScope(scope); + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.fixed(1)) + .build(); + final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); + } + EventStreamWriter writer; + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build())) { + writer = clientFactory.createEventWriter(streamName, + new UTF8StringSerializer(), + EventWriterConfig.builder().build()); + } return writer; } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index 8fc25d3b..d6905fc3 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -80,8 +80,14 @@ public void doStart() { @Override public void doStop() { log.info("Stopping MetricStreamWriter."); - executor.shutdown(); + executor.shutdownNow(); + try { + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Error stopping MetricStreamWriter {}", e); + } this.client.close(); + log.info("Stopped MetricStreamWriter."); notifyStopped(); } } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java index 8add6ae6..ab560297 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java @@ -88,7 +88,7 @@ public void testPSCDataIntegration() { Service startService = deviceDriverManager.startAsync(); try { startService.awaitRunning(Duration.ofSeconds(30)); - Thread.sleep(12000); + Thread.sleep(15000); } catch (InterruptedException | TimeoutException e) { throw new RuntimeException(e); } @@ -116,37 +116,34 @@ public void testPSCDataIntegration() { } private static void validateStreamData(URI controllerURI, String scope, String streamName, String content) { - StreamManager streamManager = StreamManager.create(controllerURI); - - final String readerGroup = UUID.randomUUID().toString().replace("-", ""); - final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() - .stream(Stream.of(scope, streamName)) - .build(); - try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) { - readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); - } + try (StreamManager streamManager = StreamManager.create(controllerURI)) { + + final String readerGroup = UUID.randomUUID().toString().replace("-", ""); + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(scope, streamName)) + .build(); + try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) { + readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + } - try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - ClientConfig.builder().controllerURI(controllerURI).build()); - EventStreamReader reader = clientFactory.createReader("reader", - readerGroup, - new UTF8StringSerializer(), - ReaderConfig.builder().build())) { - System.out.format("Reading all the events from %s/%s%n", scope, streamName); - EventRead eventRead = null; - try { - while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) { - String event = eventRead.getEvent(); - System.out.format("Read event: %s", event); - Assertions.assertNotNull(event); - Assertions.assertFalse(event.isEmpty()); - Assertions.assertEquals(content, event); + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build()); + EventStreamReader reader = clientFactory.createReader("reader", + readerGroup, + new UTF8StringSerializer(), + ReaderConfig.builder().build())) { + EventRead eventRead; + try { + while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) { + String event = eventRead.getEvent(); + Assertions.assertNotNull(event); + Assertions.assertFalse(event.isEmpty()); + Assertions.assertEquals(content, event); + } + } catch (ReinitializationRequiredException e) { + //There are certain circumstances where the reader needs to be reinitialized } - } catch (ReinitializationRequiredException e) { - //There are certain circumstances where the reader needs to be reinitialized - e.printStackTrace(); } - System.out.format("No more events from %s/%s%n", scope, streamName); } }