Skip to content

Commit

Permalink
Add FLUSH.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Dec 3, 2023
1 parent cb99e96 commit d82ea56
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOptio


/**
* The same as writeAsync(src, 0, src.length(), sync_default).
* where sync_default depends on the underlying implementation.
* The same as writeAsync(src, 0, src.length(), options).
*/
default CompletableFuture<DataStreamReply> writeAsync(File src) {
return writeAsync(src, 0, src.length());
default CompletableFuture<DataStreamReply> writeAsync(File src, WriteOption... options) {
return writeAsync(src, 0, src.length(), options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public final class DataStreamOutputImpl implements DataStreamOutputRpc {
@Override
public int write(ByteBuffer src) throws IOException {
final int remaining = src.remaining();
final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src),
// flush each call; otherwise the future will not be completed.
final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src, StandardWriteOption.FLUSH),
() -> "write(" + remaining + " bytes for " + ClientInvocationId.valueOf(header) + ")");
return Math.toIntExact(reply.getBytesWritten());
}
Expand All @@ -134,7 +135,9 @@ private DataStreamOutputImpl(RaftClientRequest request) {
this.header = request;
this.slidingWindow = new SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, header.getCallId()));
final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(), Collections.emptyList());
// TODO: RATIS-1938: In order not to auto-flush the header, remove the FLUSH below.
this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining(),
Collections.singleton(StandardWriteOption.FLUSH));
}
private CompletableFuture<DataStreamReply> send(Type type, Object data, long length,
Iterable<WriteOption> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.ratis.io;

public enum StandardWriteOption implements WriteOption {
/** Sync the data to the underlying storage. */
/**
* Sync the data to the underlying storage.
* Note that SYNC does not imply {@link #FLUSH}.
*/
SYNC,
/** Close the data to the underlying storage. */
CLOSE,
/** Flush the data out to the network. */
/** Flush the data out from the buffer. */
FLUSH,
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
import org.apache.ratis.util.CollectionUtils;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -41,7 +41,7 @@ public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long
public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength,
Iterable<WriteOption> options) {
super(clientId, type, streamId, streamOffset, dataLength);
this.options = Collections.unmodifiableList(Lists.newArrayList(options));
this.options = Collections.unmodifiableList(CollectionUtils.distinct(options));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,9 @@ static <V> boolean equalsIgnoreOrder(List<V> left, List<V> right, Comparator<V>
right.sort(comparator);
return left.equals(right);
}

/** @return a list the distinct elements. */
static <V> List<V> distinct(Iterable<V> elements) {
return StreamSupport.stream(elements.spliterator(), false).distinct().collect(Collectors.toList());
}
}
12 changes: 10 additions & 2 deletions ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.CheckedFunction;
import org.apache.ratis.util.function.CheckedRunnable;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -208,13 +209,20 @@ static <RETURN, THROWABLE extends Throwable> RETURN attempt(
CheckedSupplier<RETURN, THROWABLE> supplier,
int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
throws THROWABLE, InterruptedException {
Objects.requireNonNull(supplier, "supplier == null");
return attempt(i -> supplier.get(), numAttempts, sleepTime, name, log);
}

static <RETURN, THROWABLE extends Throwable> RETURN attempt(
CheckedFunction<Integer, RETURN, THROWABLE> attemptMethod,
int numAttempts, TimeDuration sleepTime, Supplier<?> name, Logger log)
throws THROWABLE, InterruptedException {
Objects.requireNonNull(attemptMethod, "attemptMethod == null");
Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0");
Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime + " < 0");

for(int i = 1; i <= numAttempts; i++) {
try {
return supplier.get();
return attemptMethod.apply(i);
} catch (Throwable t) {
if (i == numAttempts) {
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,37 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

public interface NettyDataStreamUtils {
Logger LOG = LoggerFactory.getLogger(NettyDataStreamUtils.class);

static DataStreamPacketHeaderProto.Option getOption(WriteOption option) {
if (option == StandardWriteOption.FLUSH) {
// FLUSH is a local option which should not be included in the header.
return null;
} else if (option instanceof StandardWriteOption) {
return DataStreamPacketHeaderProto.Option.forNumber(((StandardWriteOption) option).ordinal());
}
throw new IllegalArgumentException("Unexpected WriteOption " + option);
}

static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
.newBuilder()
final DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto.newBuilder()
.setClientId(request.getClientId().toByteString())
.setStreamId(request.getStreamId())
.setStreamOffset(request.getStreamOffset())
.setType(request.getType())
.setDataLength(request.getDataLength());
for (WriteOption option : request.getWriteOptionList()) {
b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
((StandardWriteOption) option).ordinal()));
}

request.getWriteOptionList().stream()
.map(NettyDataStreamUtils::getOption)
.filter(Objects::nonNull)
.forEach(b::addOptions);

return DataStreamRequestHeaderProto
.newBuilder()
.setPacketHeader(b)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
Expand Down Expand Up @@ -236,6 +237,8 @@ void close() {
if (previous != null && previous.isInitialized()) {
// wait channel closed, do shutdown workerGroup
previous.get().channel().close().addListener(future -> workerGroup.shutdownGracefully());
} else {
workerGroup.shutdownGracefully();
}
}

Expand Down Expand Up @@ -280,8 +283,8 @@ synchronized boolean shouldFlush(int countMin, SizeInBytes bytesMin, DataStreamR
}

final boolean flush = shouldFlush(options, countMin, bytesMin);
LOG.debug("flush? {}, (count, bytes)=({}, {}), min=({}, {}), request={}",
flush, count, bytes, countMin, bytesMin, request);
LOG.debug("flush? {}, (count, bytes)=({}, {}), min=({}, {}), request={}, options={}",
flush, count, bytes, countMin, bytesMin, request, options);
if (flush) {
count = 0;
bytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataStreamManagement {
public static final Logger LOG = LoggerFactory.getLogger(DataStreamManagement.class);
Expand Down Expand Up @@ -120,10 +121,18 @@ static class RemoteStream {
this.out = out;
}

static Iterable<WriteOption> addFlush(List<WriteOption> original) {
if (original.contains(StandardWriteOption.FLUSH)) {
return original;
}
return Stream.concat(Stream.of(StandardWriteOption.FLUSH), original.stream())
.collect(Collectors.toList());
}

CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptionList())
n -> out.writeAsync(request.slice().nioBuffer(), addFlush(request.getWriteOptionList()))
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
Expand Down
17 changes: 11 additions & 6 deletions ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,18 @@ static RaftServer.Division waitForLeader(MiniRaftCluster cluster, RaftGroupId gr
exception.set(ise);
};

final RaftServer.Division leader = JavaUtils.attemptRepeatedly(() -> {
final RaftServer.Division l = cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders);
if (l != null && !l.getInfo().isLeaderReady()) {
throw new IllegalStateException("Leader: "+ l.getMemberId() + " not ready");
final RaftServer.Division leader = JavaUtils.attempt(i -> {
try {
final RaftServer.Division l = cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders);
if (l != null && !l.getInfo().isLeaderReady()) {
throw new IllegalStateException("Leader: " + l.getMemberId() + " not ready");
}
return l;
} catch (Exception e) {
LOG.warn("Attempt #{} failed: " + e, i);
throw e;
}
return l;
}, numAttempts, sleepTime, name, LOG);
}, numAttempts, sleepTime, () -> name, null);

LOG.info(cluster.printServers(groupId));
if (expectLeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.event.Level;
Expand All @@ -50,15 +51,23 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
extends DataStreamClusterTests<CLUSTER> {
final Executor executor = Executors.newFixedThreadPool(16);

{
Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.TRACE);
}

@Override
public int getGlobalTimeoutSeconds() {
return 300;
}

@Test
public void testSingleStreamsMultipleServers() throws Exception {
Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.TRACE);
try {
runWithNewCluster(3,
cluster -> runTestDataStream(cluster, false,
(c, stepDownLeader) -> runTestDataStream(c, 1, 1, 1_000, 3, stepDownLeader)));
} finally {
Slf4jUtils.setLogLevel(NettyClientStreamRpc.LOG, Level.INFO);
}
}

@Test
public void testMultipleStreamsSingleServer() throws Exception {
runWithNewCluster(1, this::runTestDataStream);
Expand All @@ -85,34 +94,37 @@ public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception
}

void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
runTestDataStream(cluster, true);
runMultipleStreams(cluster, true);
}

void runTestDataStream(CLUSTER cluster) throws Exception {
runTestDataStream(cluster, false);
runTestDataStream(cluster, false, this::runMultipleStreams);
}

void runTestDataStream(CLUSTER cluster, boolean stepDownLeader) throws Exception {
RaftTestUtil.waitForLeader(cluster);

long runMultipleStreams(CLUSTER cluster, boolean stepDownLeader) {
final List<CompletableFuture<Long>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 100_000, 10, stepDownLeader), executor));
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 5_000, stepDownLeader), executor));
final long maxIndex = futures.stream()
return futures.stream()
.map(CompletableFuture::join)
.max(Long::compareTo)
.orElseThrow(IllegalStateException::new);
}

void runTestDataStream(CLUSTER cluster, boolean stepDownLeader, CheckedBiFunction<CLUSTER, Boolean, Long, Exception> runMethod) throws Exception {
RaftTestUtil.waitForLeader(cluster);

final long maxIndex = runMethod.apply(cluster, stepDownLeader);

if (stepDownLeader) {
final RaftPeerId oldLeader = cluster.getLeader().getId();
final CompletableFuture<RaftPeerId> changeLeader = futures.get(0).thenApplyAsync(dummy -> {
try {
return RaftTestUtil.changeLeader(cluster, oldLeader);
} catch (Exception e) {
throw new CompletionException("Failed to change leader from " + oldLeader, e);
}
});
LOG.info("Changed leader from {} to {}", oldLeader, changeLeader.join());
final RaftPeerId changed;
try {
changed = RaftTestUtil.changeLeader(cluster, oldLeader);
} catch (Exception e) {
throw new CompletionException("Failed to change leader from " + oldLeader, e);
}
LOG.info("Changed leader from {} to {}", oldLeader, changed);
}

// wait for all servers to catch up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ratis.datastream;

import org.apache.ratis.BaseTest;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.server.impl.MiniRaftCluster;
Expand Down Expand Up @@ -170,7 +171,7 @@ static CheckedConsumer<DataStreamOutputImpl, Exception> writeAsyncDefaultFileReg
return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
@Override
public void accept(DataStreamOutputImpl out) {
final DataStreamReply dataStreamReply = out.writeAsync(f).join();
final DataStreamReply dataStreamReply = out.writeAsync(f, StandardWriteOption.FLUSH).join();
DataStreamTestUtils.assertSuccessReply(Type.STREAM_DATA, size, dataStreamReply);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ static int writeAndAssertReplies(DataStreamOutputImpl out, int bufferSize, int b
sizes.add(size);

final ByteBuffer bf = initBuffer(dataSize, size);
futures.add(i == bufferNum - 1 ? out.writeAsync(bf, StandardWriteOption.SYNC) : out.writeAsync(bf));
futures.add(i == bufferNum - 1 ? out.writeAsync(bf, StandardWriteOption.FLUSH, StandardWriteOption.SYNC)
: out.writeAsync(bf));
dataSize += size;
}

Expand Down

0 comments on commit d82ea56

Please sign in to comment.