subscriber) {
+ return new SubscriberReference<>(subscriber);
+ }
+
+ void releaseReference() {
+ this.subscriber = Optional.empty();
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ subscriber.ifPresent(s -> s.onSubscribe(subscription));
+ }
+
+ @Override
+ public void onNext(T item) {
+ subscriber.ifPresent(s -> s.onNext(item));
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ subscriber.ifPresent(s -> s.onError(throwable));
+ }
+
+ @Override
+ public void onComplete() {
+ subscriber.ifPresent(Flow.Subscriber::onComplete);
+ }
+}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/CloseableSupport.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/CloseableSupport.java
deleted file mode 100644
index 8ba3d91415f..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/CloseableSupport.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-/**
- * Can be closed and has information about the state.
- *
- * The goal is to provide just enough synchronisation.
- */
-class CloseableSupport implements AutoCloseable {
-
- private boolean closed = false;
- private volatile boolean closedVolatile = false;
-
- @Override
- public void close() {
- closed = true;
- closedVolatile = true;
- }
-
- /**
- * Returns {@code true} if it is closed.
- *
- * @return {@code true} if it is closed
- */
- boolean closed() {
- return closed || closedVolatile;
- }
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/DetachedValve.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/DetachedValve.java
deleted file mode 100644
index 2ca5f6bd7e8..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/DetachedValve.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-class DetachedValve implements Valve {
-
- private static final int INTERNAL_INDEX = 0;
- private static final int EXTERNAL_INDEX = 1;
-
- private final boolean[] paused = new boolean[] {false, false};
-
- private final Lock lock = new ReentrantLock();
- private final Valve delegate;
- private final ExecutorService executorService;
-
- DetachedValve(Valve delegate, ExecutorService executorService) {
- this.delegate = delegate;
- this.executorService = executorService;
- }
-
- @Override
- public void handle(BiConsumer onData, Consumer onError, Runnable onComplete) {
- delegate.handle((t, p) -> {
- pause(INTERNAL_INDEX);
- CompletableFuture.runAsync(() -> onData.accept(t, this), executorService)
- .whenComplete((vd, thr) -> {
- if (thr == null) {
- resume(INTERNAL_INDEX);
- } else {
- executorService.submit(() -> onError.accept(thr));
- }
- });
- },
- t -> executorService.submit(() -> onError.accept(t)),
- () -> executorService.submit(onComplete));
- }
-
- private void pause(int index) {
- lock.lock();
- try {
- boolean callIt = !paused[0] && !paused[1];
- paused[index] = true;
- if (callIt) {
- delegate.pause();
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void resume(int index) {
- lock.lock();
- try {
- boolean callIt = paused[index] && !paused[index == 0 ? 1 : 0];
- paused[index] = false;
- if (callIt) {
- delegate.resume();
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void pause() {
- pause(EXTERNAL_INDEX);
- }
-
- @Override
- public void resume() {
- resume(EXTERNAL_INDEX);
- }
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/EmptyValve.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/EmptyValve.java
deleted file mode 100644
index 113549c99df..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/EmptyValve.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-/**
- * Represents a Valve which is empty.
- *
- * For the performance sake the Valve accepts unlimited number of handlers.
- * Each complete handler is called as soon as registered.
- */
-class EmptyValve implements Valve {
-
- @Override
- public void handle(BiConsumer onData, Consumer onError, Runnable onComplete) {
- if (onComplete != null) {
- onComplete.run();
- }
- }
-
- @Override
- public void pause() {
- // No-op
- }
-
- @Override
- public void resume() {
- // No-op
- }
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/InputStreamValve.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/InputStreamValve.java
deleted file mode 100644
index 3802fe4c4bd..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/InputStreamValve.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-
-/**
- * The InputStreamValve is a {@link ByteBuffer} based {@link Valve} that transforms
- * a possibly blocking {@link InputStream} into the Valve.
- */
-public class InputStreamValve extends RetryingPausableRegistry implements Valve {
- private final InputStream stream;
- private final int bufferSize;
-
- InputStreamValve(InputStream stream, int bufferSize) {
- this.stream = stream;
- this.bufferSize = bufferSize;
- }
-
- @Override
- protected ByteBuffer moreData() throws Throwable {
- byte[] bytes = new byte[bufferSize];
-
- int len = stream.read(bytes);
- return len != -1 ? ByteBuffer.wrap(bytes, 0, len) : null;
- }
-
- static class InputStreamExecutorValve extends InputStreamValve {
-
- private final ExecutorService executorService;
-
- InputStreamExecutorValve(InputStream stream, int bufferSize, ExecutorService executorService) {
- super(stream, bufferSize);
- this.executorService = executorService;
- }
-
- @Override
- protected void tryProcess() {
- executorService.submit(() -> {
- super.tryProcess();
- });
- }
- }
-
- /**
- * A collector of {@link ByteBuffer} instances into a {@link String} of the provided
- * charset.
- *
- * @param charset the desired charset of the returned string
- * @return a string representation of the collected byte buffers
- */
- public static Collector byteBufferStringCollector(Charset charset) {
- return Collectors.collectingAndThen(byteBufferByteArrayCollector(), bytes -> new String(bytes, charset));
- }
-
- /**
- * A collector of {@link ByteBuffer} instances into a single {@link ByteBuffer} instance.
- *
- * @return a single byte buffer from the collected byte buffers
- */
- public static Collector byteBuffer2Collector() {
- return Collectors.collectingAndThen(byteBufferByteArrayCollector(), ByteBuffer::wrap);
- }
-
- /**
- * A collector of {@link ByteBuffer} instances into a single byte array.
- *
- * @return a single byte array from the collected byte buffers
- */
- public static Collector byteBufferByteArrayCollector() {
-
- return Collector.of(ByteArrayOutputStream::new,
- (stream, byteBuffer) -> {
- try {
- synchronized (stream) {
- WritableByteChannel channel = Channels.newChannel(stream);
- channel.write(byteBuffer);
- }
- } catch (IOException e) {
- // not expected to be thrown because we're operating in memory only
- throw new IllegalStateException("This exception is never expected.", e);
- }
- },
- (stream, stream2) -> {
- try {
- synchronized (stream) {
- stream2.writeTo(stream);
- }
- return stream;
- } catch (IOException e) {
- // not expected to be thrown because we're operating in memory only
- throw new IllegalStateException("This exception is never expected.", e);
- }
- },
- ByteArrayOutputStream::toByteArray);
- }
-
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/PausableRegistry.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/PausableRegistry.java
deleted file mode 100644
index 3da2cbcf17e..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/PausableRegistry.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.Objects;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Supports {@link Valve} implementation by providing single handler registry
- * and pause / resume functionality with facility tryProcess reservation.
- */
-abstract class PausableRegistry implements Pausable {
-
- private static final Logger LOGGER = Logger.getLogger(PausableRegistry.class.getName());
-
- private final ReentrantLock lock = new ReentrantLock();
-
- private volatile BiConsumer onData;
- private volatile Consumer onError;
- private volatile Runnable onComplete;
-
- private volatile boolean paused = false;
- private boolean processing = false;
-
- @Override
- public void pause() {
- paused = true;
- }
-
- @Override
- public void resume() {
- paused = false;
- tryProcess();
- }
-
- /**
- * Implements item handling / processing. Implementation can use {@link #canProcess()} and {@link #canContinueProcessing()}
- * method to ensure, that processing is done by a single thread at a time.
- */
- protected abstract void tryProcess();
-
- public void handle(BiConsumer onData, Consumer onError, Runnable onComplete) {
- Objects.requireNonNull(onData, "Parameter onData is null!");
- synchronized (this) {
- if (this.onData != null) {
- throw new IllegalStateException("Handler is already registered!");
- }
- this.onData = onData;
- this.onError = onError;
- this.onComplete = onComplete;
- }
- resume();
- }
-
- /**
- * Implementation of {@link #tryProcess()} method should call this to reserve initial handle processing (if possible).
- * The same method should call {@link #canContinueProcessing()} before every iteration to be sure, that handle processing
- * should continue.
- *
- * @return {@code true} only if method can process (handle) item
- */
- protected boolean canProcess() {
- if (onData == null) {
- return false;
- }
- lock.lock();
- try {
- if (paused || processing) {
- return false;
- } else {
- processing = true;
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Implementation of {@link #tryProcess()} which initially was accepted by {@link #canProcess()} should call this method
- * before every iteration to be sure, that processing can continue (is not paused).
- *
- * @return {@code true} only if method can continue with handle processing
- */
- protected boolean canContinueProcessing() {
- if (paused) {
- lock.lock();
- try {
- processing = false;
- } finally {
- lock.unlock();
- }
- return false;
- } else {
- return true;
- }
- }
-
- protected boolean paused() {
- return paused;
- }
-
- protected void releaseProcessing() {
- lock.lock();
- try {
- processing = false;
- } finally {
- lock.unlock();
- }
- }
-
- protected void handleError(Throwable thr) {
- if (onError != null) {
- onError.accept(thr);
- } else {
- LOGGER.log(Level.WARNING, "Unhandled throwable!", thr);
- }
- }
-
- protected BiConsumer getOnData() {
- return onData;
- }
-
- protected Consumer getOnError() {
- return onError;
- }
-
- protected Runnable getOnComplete() {
- return onComplete;
- }
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/PublisherValve.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/PublisherValve.java
deleted file mode 100644
index dd108681d51..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/PublisherValve.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.Objects;
-import java.util.concurrent.Flow;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The {@link Valve} implementation on top of {@link java.util.concurrent.Flow.Publisher}.
- *
- * @param Type of {@code Valve} and {@code Publisher} items
- */
-class PublisherValve implements Valve {
-
- private static final Logger LOGGER = Logger.getLogger(PublisherValve.class.getName());
-
- private final ReentrantLock lock = new ReentrantLock();
- private final Flow.Publisher publisher;
-
- private volatile Subscriber subscriber;
- private volatile boolean paused = false;
-
- private boolean recordedDemand = false;
-
- /**
- * Creates new instance.
- *
- * @param publisher a publisher as a base of this {@code Valve}
- */
- PublisherValve(Flow.Publisher publisher) {
- Objects.requireNonNull(publisher, "Parameter 'publisher' is null!");
- this.publisher = publisher;
- }
-
- @Override
- public void handle(BiConsumer onData, Consumer onError, Runnable onComplete) {
- synchronized (this) {
- if (this.subscriber != null) {
- throw new IllegalStateException("Handler is already registered!");
- }
- this.subscriber = new Subscriber(onData, onError, onComplete);
- }
- this.paused = false;
- publisher.subscribe(this.subscriber);
- }
-
- @Override
- public void pause() {
- lock.lock();
- try {
- this.paused = true;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void resume() {
- boolean processDemand = false;
- lock.lock();
- try {
- if (paused && subscriber != null) {
- paused = false;
- if (recordedDemand) {
- processDemand = true;
- recordedDemand = false;
- }
- }
- } finally {
- lock.unlock();
- if (processDemand) {
- subscriber.subscription.request(1);
- }
- }
- }
-
- private boolean recordDemand() {
- lock.lock();
- try {
- if (paused) {
- this.recordedDemand = true;
- return true;
- } else {
- return false;
- }
- } finally {
- lock.unlock();
- }
- }
-
- private class Subscriber implements Flow.Subscriber {
-
- private final BiConsumer onData;
- private final Consumer onError;
- private final Runnable onComplete;
-
- private volatile Flow.Subscription subscription;
-
- Subscriber(BiConsumer onData, Consumer onError, Runnable onComplete) {
- Objects.requireNonNull(onData, "Parameter 'onData' is null!");
- this.onData = onData;
- this.onError = onError;
- this.onComplete = onComplete;
- }
-
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- subscription.request(1);
- }
-
- @Override
- public void onNext(T item) {
- onData.accept(item, PublisherValve.this);
- if (!paused || !recordDemand()) {
- subscription.request(1);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- if (onError != null) {
- onError.accept(throwable);
- } else {
- LOGGER.log(Level.WARNING, "Unhandled throwable!", throwable);
- }
- }
-
- @Override
- public void onComplete() {
- if (onComplete != null) {
- onComplete.run();
- }
- }
- }
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/RetryingPausableRegistry.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/RetryingPausableRegistry.java
deleted file mode 100644
index f6064557485..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/RetryingPausableRegistry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.function.BiConsumer;
-
-/**
- * The RetryingPausableRegistry.
- */
-abstract class RetryingPausableRegistry extends PausableRegistry {
- @Override
- protected void tryProcess() {
- if (canProcess()) {
- try {
- BiConsumer onData = getOnData();
- boolean breakByPause = false;
- T data;
- while ((data = moreData()) != null) {
- onData.accept(data, this);
- if (!canContinueProcessing()) {
- breakByPause = true;
- break;
- }
- }
- if (!breakByPause && getOnComplete() != null) {
- getOnComplete().run();
- }
- } catch (Throwable thr) {
- handleError(thr);
- releaseProcessing();
- }
- }
- }
-
- protected abstract T moreData() throws Throwable;
-}
diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/valve/Tank.java b/common/reactive/src/main/java/io/helidon/common/reactive/valve/Tank.java
deleted file mode 100644
index 0ea84e6431a..00000000000
--- a/common/reactive/src/main/java/io/helidon/common/reactive/valve/Tank.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.common.reactive.valve;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.Spliterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-/**
- * Tank of events is a closeable FIFO queue with a limited size implementing {@link Valve} reactive API.
- *
- * @param a type of items produced by {@code Valve} API
- */
-public class Tank implements Valve, BlockingQueue, AutoCloseable {
-
- private final int capacity;
- private final CloseableSupport closeableSupport = new CloseableSupport();
- private final Queue drainHandlers = new LinkedBlockingDeque<>();
- private final PausableRegistry registry = new PausableRegistry() {
- @Override
- protected void tryProcess() {
- Tank.this.tryProcess();
- }
- };
- private final ThreadLocal inDrainHandler = ThreadLocal.withInitial(() -> Boolean.FALSE);
- private final ArrayBlockingQueue queue;
-
- /**
- * Creates new instance.
- *
- * @param capacity the capacity of this queue
- */
- public Tank(int capacity) {
- this.capacity = capacity;
- queue = new ArrayBlockingQueue<>(capacity, true);
- }
-
- /**
- * Provided handler is called a single time when internal capacity is maximally half full and instance is not closed.
- *
- * @param drainHandler an handler of drain event
- * @throws NullPointerException if {@code drainHandler} is {@code null}
- */
- public void whenDrain(Runnable drainHandler) {
- Objects.requireNonNull(drainHandler, "Parameter 'drainHandler' is null!");
- checkClosed();
- if (!inDrainHandler.get() && remainingCapacity() >= (capacity / 2)) {
- inDrainHandler.set(true);
- try {
- drainHandler.run();
- } finally {
- inDrainHandler.set(false);
- }
- } else {
- drainHandlers.add(drainHandler);
- }
- }
-
- // ----- Valve implementation
-
- @Override
- public void pause() {
- registry.pause();
- }
-
- @Override
- public void resume() {
- registry.resume();
- }
-
- @Override
- public void handle(BiConsumer onData, Consumer onError, Runnable onComplete) {
- registry.handle(onData, onError, onComplete);
- }
-
- private void tryProcess() {
- if (registry.canProcess()) {
- boolean breakByPause = false;
- try {
- BiConsumer onData = registry.getOnData();
- T t;
- while ((t = poll()) != null) {
- onData.accept(t, this);
- if (registry.paused()) {
- breakByPause = true;
- break;
- }
- }
- } catch (Exception e) {
- registry.handleError(e);
- } finally {
- if (!breakByPause && closeableSupport.closed()) {
- // Handle close
- Runnable onComplete = registry.getOnComplete();
- if (onComplete != null) {
- onComplete.run();
- }
- }
- registry.releaseProcessing();
- }
- processDrainHandlers();
- }
- }
-
- private void processDrainHandlers() {
- while (!inDrainHandler.get() && !closeableSupport.closed() && remainingCapacity() >= (capacity / 2)) {
- Runnable hndlr = drainHandlers.poll();
- if (hndlr != null) {
- inDrainHandler.set(true);
- try {
- hndlr.run();
- } finally {
- inDrainHandler.set(false);
- }
- } else {
- break;
- }
- }
- }
-
- // ----- AutoCloseable
-
- @Override
- public void close() {
- closeableSupport.close();
- tryProcess();
- }
-
- private void checkClosed() {
- if (closeableSupport.closed()) {
- throw new IllegalStateException("Tank instance is closed!");
- }
- }
-
- // ----- Insert methods
-
- @Override
- public boolean add(T t) {
- checkClosed();
- boolean result = queue.add(t);
- tryProcess();
- return result;
- }
-
- @Override
- public boolean addAll(Collection extends T> c) {
- checkClosed();
- boolean result = queue.addAll(c);
- tryProcess();
- return result;
- }
-
- @Override
- public boolean offer(T t) {
- if (closeableSupport.closed()) {
- return false;
- }
- boolean result = queue.offer(t);
- if (result) {
- tryProcess();
- }
- return result;
- }
-
- /**
- * Inserts the specified element at the tail of this queue, waiting
- * for space to become available if the queue is full.
- *
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException if Tank is closed
- */
- @Override
- public void put(T t) throws InterruptedException {
- checkClosed();
- queue.put(t);
- tryProcess();
- }
-
- @Override
- public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
- if (closeableSupport.closed()) {
- return false;
- }
- boolean result = queue.offer(t, timeout, unit);
- if (result) {
- tryProcess();
- }
- return result;
- }
-
- // ----- Remove methods
-
- @Override
- public void clear() {
- queue.clear();
- }
-
- @Override
- public T poll() {
- T t = queue.poll();
- if (t != null) {
- processDrainHandlers();
- }
- return t;
- }
-
- @Override
- public T take() throws InterruptedException {
- T t = queue.take();
- processDrainHandlers();
- return t;
- }
-
- @Override
- public T poll(long timeout, TimeUnit unit) throws InterruptedException {
- T t = queue.poll(timeout, unit);
- if (t != null) {
- processDrainHandlers();
- }
- return t;
- }
-
- @Override
- public boolean remove(Object o) {
- boolean result = queue.remove(o);
- if (result) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public int drainTo(Collection super T> c) {
- int result = queue.drainTo(c);
- if (result > 0) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public int drainTo(Collection super T> c, int maxElements) {
- int result = queue.drainTo(c, maxElements);
- if (result > 0) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public boolean removeIf(Predicate super T> filter) {
- boolean result = queue.removeIf(filter);
- if (result) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- boolean result = queue.removeAll(c);
- if (result) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- boolean result = queue.retainAll(c);
- if (result) {
- processDrainHandlers();
- }
- return result;
- }
-
- @Override
- public T remove() {
- T t = queue.remove();
- if (t != null) {
- processDrainHandlers();
- }
- return t;
- }
-
- // ----- Query methods (delegated only)
-
- @Override
- public T element() {
- return queue.element();
- }
-
- @Override
- public T peek() {
- return queue.peek();
- }
-
- @Override
- public int size() {
- return queue.size();
- }
-
- @Override
- public boolean isEmpty() {
- return queue.isEmpty();
- }
-
- @Override
- public int remainingCapacity() {
- return queue.remainingCapacity();
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- return false;
- }
-
- @Override
- public boolean contains(Object o) {
- return queue.contains(o);
- }
-
- @Override
- public Object[] toArray() {
- return queue.toArray();
- }
-
- @Override
- public