Skip to content

Commit

Permalink
RATIS-1550. Rewrite stream client reply queue. (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao-rosicky authored Aug 2, 2023
1 parent 0b79689 commit 9039959
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,12 @@
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,15 +101,10 @@ public String toString() {
private final DataStreamClientRpc dataStreamClientRpc;

private final Semaphore requestSemaphore;
private final TimeDuration requestTimeout;
private final TimeDuration closeTimeout;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
this.requestTimeout = RaftClientConfigKeys.DataStream.requestTimeout(properties);
this.closeTimeout = requestTimeout.multiply(2);
}

CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data,
Expand Down Expand Up @@ -149,9 +140,6 @@ private void sendRequestToNetwork(DataStreamWindowRequest request,
request.getDataStreamRequest());
long seqNum = request.getSeqNum();

final boolean isClose = request.getDataStreamRequest().getWriteOptionList().contains(StandardWriteOption.CLOSE);
scheduleWithTimeout(request, isClose? closeTimeout: requestTimeout);

requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
seqNum, reply, r -> sendRequestToNetwork(r, slidingWindow));
Expand All @@ -166,13 +154,4 @@ private void sendRequestToNetwork(DataStreamWindowRequest request,
return null;
});
}

private void scheduleWithTimeout(DataStreamWindowRequest request, TimeDuration timeout) {
scheduler.onTimeout(timeout, () -> {
if (!request.getReplyFuture().isDone()) {
request.getReplyFuture().completeExceptionally(
new TimeoutIOException("Timeout " + timeout + ": Failed to send " + request));
}
}, LOG, () -> "Failed to completeExceptionally for " + request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ratis.netty.client;

import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.io.netty.util.concurrent.ScheduledFuture;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

public class NettyClientReplies {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientReplies.class);

private final ConcurrentMap<ClientInvocationId, ReplyMap> replies = new ConcurrentHashMap<>();

ReplyMap getReplyMap(ClientInvocationId clientInvocationId) {
final MemoizedSupplier<ReplyMap> q = MemoizedSupplier.valueOf(() -> new ReplyMap(clientInvocationId));
return replies.computeIfAbsent(clientInvocationId, key -> q.get());
}

class ReplyMap {
private final ClientInvocationId clientInvocationId;
private final Map<RequestEntry, ReplyEntry> map = new ConcurrentHashMap<>();

ReplyMap(ClientInvocationId clientInvocationId) {
this.clientInvocationId = clientInvocationId;
}

ReplyEntry submitRequest(RequestEntry requestEntry, boolean isClose, CompletableFuture<DataStreamReply> f) {
LOG.debug("put {} to the map for {}", requestEntry, clientInvocationId);
final MemoizedSupplier<ReplyEntry> replySupplier = MemoizedSupplier.valueOf(() -> new ReplyEntry(isClose, f));
return map.computeIfAbsent(requestEntry, r -> replySupplier.get());
}

void receiveReply(DataStreamReply reply) {
final RequestEntry requestEntry = new RequestEntry(reply);
final ReplyEntry replyEntry = map.remove(requestEntry);
LOG.debug("remove: {}; replyEntry: {}; reply: {}", requestEntry, replyEntry, reply);
if (replyEntry == null) {
LOG.debug("Request not found: {}", this);
return;
}
replyEntry.complete(reply);
if (!reply.isSuccess()) {
failAll("a request failed with " + reply);
} else if (replyEntry.isClosed()) { // stream closed clean up reply map
removeThisMap();
}
}

private void removeThisMap() {
final ReplyMap removed = replies.remove(clientInvocationId);
Preconditions.assertSame(removed, this, "removed");
}

void completeExceptionally(Throwable e) {
removeThisMap();
for (ReplyEntry entry : map.values()) {
entry.completeExceptionally(e);
}
map.clear();
}

private void failAll(String message) {
completeExceptionally(new IllegalStateException(this + ": " + message));
}

void fail(RequestEntry requestEntry) {
map.remove(requestEntry);
failAll(requestEntry + " failed ");
}

@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
for (RequestEntry requestEntry : map.keySet()) {
builder.append(requestEntry).append(", ");
}
return builder.toString();
}
}

static class RequestEntry {
private final long streamOffset;
private final Type type;

RequestEntry(DataStreamPacket packet) {
this.streamOffset = packet.getStreamOffset();
this.type = packet.getType();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final RequestEntry that = (RequestEntry) o;
return streamOffset == that.streamOffset
&& type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(type, streamOffset);
}

@Override
public String toString() {
return "Request{" +
"streamOffset=" + streamOffset +
", type=" + type +
'}';
}
}

static class ReplyEntry {
private final boolean isClosed;
private final CompletableFuture<DataStreamReply> replyFuture;
private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>();

ReplyEntry(boolean isClosed, CompletableFuture<DataStreamReply> replyFuture) {
this.isClosed = isClosed;
this.replyFuture = replyFuture;
}

boolean isClosed() {
return isClosed;
}

void complete(DataStreamReply reply) {
cancelTimeoutFuture();
replyFuture.complete(reply);
}

void completeExceptionally(Throwable t) {
cancelTimeoutFuture();
replyFuture.completeExceptionally(t);
}

private void cancelTimeoutFuture() {
Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(false));
}

void setTimeoutFuture(ScheduledFuture<?> timeoutFuture) {
this.timeoutFuture.compareAndSet(null, timeoutFuture);
}
}
}
Loading

0 comments on commit 9039959

Please sign in to comment.