Skip to content

Commit

Permalink
Fixing stopAsync issues
Browse files Browse the repository at this point in the history
Signed-off-by: Abhin Balur <[email protected]>
abhinb committed Feb 29, 2024
1 parent dea286f commit 35a1487
Showing 3 changed files with 47 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -43,19 +43,20 @@ public PravegaClient(String scope, String streamName) {

private EventStreamWriter<String> initializeWriter() {
log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString());
ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build();
StreamManager streamManager = StreamManager.create(clientConfig);
final boolean scopeIsNew = streamManager.createScope(scope);

StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
clientConfig);
EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
new UTF8StringSerializer(),
EventWriterConfig.builder().build());
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
final boolean scopeIsNew = streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
}
EventStreamWriter<String> writer;
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
ClientConfig.builder().controllerURI(controllerURI).build())) {
writer = clientFactory.createEventWriter(streamName,
new UTF8StringSerializer(),
EventWriterConfig.builder().build());
}
return writer;
}

Original file line number Diff line number Diff line change
@@ -80,8 +80,14 @@ public void doStart() {
@Override
public void doStop() {
log.info("Stopping MetricStreamWriter.");
executor.shutdown();
executor.shutdownNow();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Error stopping MetricStreamWriter {}", e);
}
this.client.close();
log.info("Stopped MetricStreamWriter.");
notifyStopped();
}
}
Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ public void testPSCDataIntegration() {
Service startService = deviceDriverManager.startAsync();
try {
startService.awaitRunning(Duration.ofSeconds(30));
Thread.sleep(12000);
Thread.sleep(15000);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
@@ -116,37 +116,34 @@ public void testPSCDataIntegration() {
}

private static void validateStreamData(URI controllerURI, String scope, String streamName, String content) {
StreamManager streamManager = StreamManager.create(controllerURI);

final String readerGroup = UUID.randomUUID().toString().replace("-", "");
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, streamName))
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}
try (StreamManager streamManager = StreamManager.create(controllerURI)) {

final String readerGroup = UUID.randomUUID().toString().replace("-", "");
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, streamName))
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}

try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
ClientConfig.builder().controllerURI(controllerURI).build());
EventStreamReader<String> reader = clientFactory.createReader("reader",
readerGroup,
new UTF8StringSerializer(),
ReaderConfig.builder().build())) {
System.out.format("Reading all the events from %s/%s%n", scope, streamName);
EventRead<String> eventRead = null;
try {
while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) {
String event = eventRead.getEvent();
System.out.format("Read event: %s", event);
Assertions.assertNotNull(event);
Assertions.assertFalse(event.isEmpty());
Assertions.assertEquals(content, event);
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
ClientConfig.builder().controllerURI(controllerURI).build());
EventStreamReader<String> reader = clientFactory.createReader("reader",
readerGroup,
new UTF8StringSerializer(),
ReaderConfig.builder().build())) {
EventRead<String> eventRead;
try {
while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) {
String event = eventRead.getEvent();
Assertions.assertNotNull(event);
Assertions.assertFalse(event.isEmpty());
Assertions.assertEquals(content, event);
}
} catch (ReinitializationRequiredException e) {
//There are certain circumstances where the reader needs to be reinitialized
}
} catch (ReinitializationRequiredException e) {
//There are certain circumstances where the reader needs to be reinitialized
e.printStackTrace();
}
System.out.format("No more events from %s/%s%n", scope, streamName);
}
}

0 comments on commit 35a1487

Please sign in to comment.