diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java index 353f532c6f..d184eb2ab1 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.client.impl; +import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.AsyncRpcApi; import org.apache.ratis.client.DataStreamClient; import org.apache.ratis.client.DataStreamClientRpc; @@ -46,6 +47,8 @@ import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SlidingWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -61,6 +64,8 @@ * allows client to create streams and send asynchronously. */ public class DataStreamClientImpl implements DataStreamClient { + public static final Logger LOG = LoggerFactory.getLogger(DataStreamClientImpl.class); + private final RaftClient client; private final ClientId clientId; private final RaftGroupId groupId; @@ -68,9 +73,11 @@ public class DataStreamClientImpl implements DataStreamClient { private final RaftPeer dataStreamServer; private final DataStreamClientRpc dataStreamClientRpc; private final OrderedStreamAsync orderedStreamAsync; + private final boolean skipSendForward; DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { + this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info); this.client = null; this.clientId = clientId; this.groupId = groupId; @@ -81,6 +88,7 @@ public class DataStreamClientImpl implements DataStreamClient { DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { + this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info); this.client = client; this.clientId = client.getId(); this.groupId = client.getGroupId(); @@ -146,7 +154,11 @@ private CompletableFuture writeAsyncImpl(Object data, long leng } final CompletableFuture f = combineHeader(send(Type.STREAM_DATA, data, length, options)); if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) { - closeFuture = client != null? f.thenCompose(this::sendForward): f; + if (skipSendForward) { + closeFuture = f; + } else { + closeFuture = client != null? f.thenCompose(this::sendForward): f; + } closeFuture.thenApply(ClientProtoUtils::getRaftClientReply) .whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture)); } diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java index 7c14a479f2..a3f40ce916 100644 --- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java +++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java @@ -18,8 +18,10 @@ package org.apache.ratis; import static org.apache.ratis.conf.ConfUtils.get; +import static org.apache.ratis.conf.ConfUtils.getBoolean; import static org.apache.ratis.conf.ConfUtils.printAll; import static org.apache.ratis.conf.ConfUtils.set; +import static org.apache.ratis.conf.ConfUtils.setBoolean; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.datastream.SupportedDataStreamType; @@ -61,6 +63,17 @@ static SupportedDataStreamType type(RaftProperties properties, Consumer static void setType(RaftProperties properties, SupportedDataStreamType type) { set(properties::set, TYPE_KEY, type.name()); } + + String SKIP_SEND_FORWARD_KEY = PREFIX + ".skip.send-forward"; + boolean SKIP_SEND_FORWARD_DEFAULT = false; + + static boolean skipSendForward(RaftProperties properties, Consumer logger) { + return getBoolean(properties::getBoolean, SKIP_SEND_FORWARD_KEY, SKIP_SEND_FORWARD_DEFAULT, logger); + } + + static void setSkipSendForward(RaftProperties properties, boolean skipSendForward) { + setBoolean(properties::setBoolean, SKIP_SEND_FORWARD_KEY, skipSendForward); + } } static void main(String[] args) {