Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async snapshot by working thread of RAFT #309

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,21 +469,13 @@ public CompletableFuture<byte[]> removeServer(String name) throws Exception {

/** Creates a snapshot and truncates the log. See https://github.com/belaban/jgroups-raft/issues/7 for details */
@ManagedOperation(description="Creates a new snapshot and truncates the log")
public synchronized void snapshot() throws Exception {
if(state_machine == null)
throw new IllegalStateException("state machine is null");
public void snapshot() throws Exception {
    // Blocking variant: enqueue the snapshot request and wait until the future returned by
    // snapshotAsync() is completed. A failure reported through the future surfaces here as the
    // exception thrown by CompletableFuture.get().
    CompletableFuture<Void> done=snapshotAsync();
    done.get();
}

ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128, true);
internal_state.writeTo(out);
state_machine.writeContentTo(out);
ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position());
log_impl.setSnapshot(buf);
log_impl.truncate(commitIndex());
num_snapshots++;
// curr_log_size=logSizeInBytes();
// this is faster than calling logSizeInBytes(), but may not be accurate: if commit-index is way
// behind last-appended, then this may perform the next truncation later than it should
curr_log_size=0;
/**
 * Requests a snapshot without blocking the caller: wraps a new future in a {@link SnapshotRequest}
 * and hands it to {@link #offer(Request)}.
 * @return the future carried by the request; it is completed exceptionally right away if the
 *         request cannot be enqueued (see {@link #offer(Request)})
 */
public CompletableFuture<Void> snapshotAsync() {
    final CompletableFuture<Void> result=new CompletableFuture<>();
    SnapshotRequest req=new SnapshotRequest(result);
    offer(req);
    return result;
}

/** Loads the log entries from [first .. commit_index] into the state machine */
Expand Down Expand Up @@ -690,7 +682,7 @@ public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, bo
if(synchronous) // set only for testing purposes
handleDownRequest(retval, buf, offset, length, internal, options);
else {
add(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest()
offer(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest()
}
return retval; // 4. Return CompletableFuture
}
Expand All @@ -710,6 +702,12 @@ protected void add(Request r) {
}
}

/**
 * Tries to enqueue a request in {@code processing_queue} via its {@code offer()} method. If the
 * queue rejects the request, the request is failed immediately with an
 * {@link IllegalStateException} rather than being left pending.
 * @param r the request to enqueue
 */
protected void offer(Request r) {
    boolean accepted=processing_queue.offer(r);
    if(accepted)
        return;
    r.failed(new IllegalStateException("processing queue is full"));
}

/** This method is always called by a single thread only, and does therefore not need to be reentrant */
protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length,
boolean internal, Options opts) {
Expand Down Expand Up @@ -858,6 +856,15 @@ else if(r instanceof DownRequest) {
reqtab.create(index++, raft_id, dr.f, this::majority, dr.options);
length+=dr.length;
}
else if (r instanceof SnapshotRequest) {
SnapshotRequest sr = (SnapshotRequest) r;
try {
takeSnapshot(); sr.f.complete(null);
} catch (Exception e) {
sr.f.completeExceptionally(e);
throw e;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to rethrow this exception. It will run from a single thread and would discard the batch of messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's inside a try/catch block that catches Throwable, so it can't affect the other messages of the batch, and I think the exception indicates an internal error of the state machine, so it's worth printing out.

}
}
}
catch(Throwable ex) {
log.error("%s: failed handling request %s: %s", local_addr, r, ex);
Expand Down Expand Up @@ -1048,7 +1055,7 @@ protected void resend(Address target, long from, long to) {
protected void sendSnapshotTo(Address dest) throws Exception {
LogEntry last_committed_entry=log_impl.get(commitIndex());
long last_index=commit_index, last_term=last_committed_entry.term;
snapshot();
takeSnapshot();
ByteBuffer data=log_impl.getSnapshot();
log.debug("%s: sending snapshot (%s) to %s", local_addr, Util.printBytes(data.position()), dest);
Message msg=new BytesMessage(dest, data)
Expand Down Expand Up @@ -1094,14 +1101,31 @@ protected void snapshotIfNeeded(int bytes_added) {
try {
this.log.debug("%s: current log size is %d, exceeding max_log_size of %d: creating snapshot",
local_addr, curr_log_size, max_log_size);
snapshot();
takeSnapshot();
}
catch(Exception ex) {
log.error("%s: failed snapshotting log: %s", local_addr, ex);
}
}
}

/**
 * Writes the internal state followed by the state machine's contents into a buffer, stores that
 * buffer as the log's snapshot and truncates the log at the current commit index.
 * <p>
 * This method is not synchronized. NOTE(review): presumably all callers run on the single
 * request-processing thread - confirm before calling it from anywhere else.
 * @throws IllegalStateException if no state machine has been set
 * @throws Exception if serializing the state or updating the log fails
 */
protected void takeSnapshot() throws Exception {
    if(state_machine == null)
        throw new IllegalStateException("state machine is null");

    // Serialize internal state first, then the state machine contents, into one stream
    ByteArrayDataOutputStream snapshot_out=new ByteArrayDataOutputStream(128, true);
    internal_state.writeTo(snapshot_out);
    state_machine.writeContentTo(snapshot_out);

    // Only the bytes actually written [0 .. position) are part of the snapshot
    log_impl.setSnapshot(ByteBuffer.wrap(snapshot_out.buffer(), 0, snapshot_out.position()));
    log_impl.truncate(commitIndex());
    num_snapshots++;

    // Resetting to 0 is faster than recomputing the size with logSizeInBytes(), but may not be
    // accurate: if commit-index is way behind last-appended, the next truncation may be
    // performed later than it should
    curr_log_size=0;
}

/**
* Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
* @param to_inclusive The end index (inclusive) of the log entries to apply
Expand Down Expand Up @@ -1319,7 +1343,7 @@ public String toString() {
}
}

/** Generated by {@link org.jgroups.protocols.raft.RAFT#setAsync(byte[], int, int)} */
/** Generated by {@link RAFT#setAsync(byte[], int, int)} */
protected static class DownRequest extends Request {
final CompletableFuture<byte[]> f;
final byte[] buf;
Expand All @@ -1346,4 +1370,22 @@ public String toString() {
return String.format("%s %d bytes", DownRequest.class.getSimpleName(), length);
}
}

/**
 * Request asking for a snapshot to be taken. It carries the future through which the outcome is
 * reported back to the party that created the request.
 */
protected static class SnapshotRequest extends Request {
    /** Future to complete once the snapshot has been taken or has failed */
    final CompletableFuture<Void> f;

    public SnapshotRequest(CompletableFuture<Void> f) {
        this.f=f;
    }

    @Override
    public String toString() {
        return SnapshotRequest.class.getSimpleName();
    }

    /** Propagates the failure to the future held by this request */
    @Override
    protected final void failed(Throwable t) {
        this.f.completeExceptionally(t);
    }
}
}
Loading