Skip to content

Commit

Permalink
RATIS-1936. Make the raft request configurable in Streaming (apache#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao-rosicky authored Nov 20, 2023
1 parent 4cae595 commit 4692f3d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;

import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
Expand Down Expand Up @@ -46,6 +47,8 @@
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -61,16 +64,20 @@
* allows client to create streams and send asynchronously.
*/
public class DataStreamClientImpl implements DataStreamClient {
public static final Logger LOG = LoggerFactory.getLogger(DataStreamClientImpl.class);

private final RaftClient client;
private final ClientId clientId;
private final RaftGroupId groupId;

private final RaftPeer dataStreamServer;
private final DataStreamClientRpc dataStreamClientRpc;
private final OrderedStreamAsync orderedStreamAsync;
private final boolean skipSendForward;

DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
this.client = null;
this.clientId = clientId;
this.groupId = groupId;
Expand All @@ -81,6 +88,7 @@ public class DataStreamClientImpl implements DataStreamClient {

DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(properties, LOG::info);
this.client = client;
this.clientId = client.getId();
this.groupId = client.getGroupId();
Expand Down Expand Up @@ -146,7 +154,11 @@ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long leng
}
final CompletableFuture<DataStreamReply> f = combineHeader(send(Type.STREAM_DATA, data, length, options));
if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
closeFuture = client != null? f.thenCompose(this::sendForward): f;
if (skipSendForward) {
closeFuture = f;
} else {
closeFuture = client != null? f.thenCompose(this::sendForward): f;
}
closeFuture.thenApply(ClientProtoUtils::getRaftClientReply)
.whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
}
Expand Down
13 changes: 13 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.ratis;

import static org.apache.ratis.conf.ConfUtils.get;
import static org.apache.ratis.conf.ConfUtils.getBoolean;
import static org.apache.ratis.conf.ConfUtils.printAll;
import static org.apache.ratis.conf.ConfUtils.set;
import static org.apache.ratis.conf.ConfUtils.setBoolean;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
Expand Down Expand Up @@ -61,6 +63,17 @@ static SupportedDataStreamType type(RaftProperties properties, Consumer<String>
static void setType(RaftProperties properties, SupportedDataStreamType type) {
set(properties::set, TYPE_KEY, type.name());
}

String SKIP_SEND_FORWARD_KEY = PREFIX + ".skip.send-forward";
boolean SKIP_SEND_FORWARD_DEFAULT = false;

static boolean skipSendForward(RaftProperties properties, Consumer<String> logger) {
return getBoolean(properties::getBoolean, SKIP_SEND_FORWARD_KEY, SKIP_SEND_FORWARD_DEFAULT, logger);
}

static void setSkipSendForward(RaftProperties properties, boolean skipSendForward) {
setBoolean(properties::setBoolean, SKIP_SEND_FORWARD_KEY, skipSendForward);
}
}

static void main(String[] args) {
Expand Down

0 comments on commit 4692f3d

Please sign in to comment.