Skip to content

Commit

Permalink
Async process snapshot by working thread of RAFT
Browse files Browse the repository at this point in the history
  • Loading branch information
yfei-z authored and jabolina committed Oct 2, 2024
1 parent f7459b1 commit 4490c3c
Showing 1 changed file with 60 additions and 18 deletions.
78 changes: 60 additions & 18 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,21 +469,13 @@ public CompletableFuture<byte[]> removeServer(String name) throws Exception {

/** Creates a snapshot and truncates the log. See https://github.com/belaban/jgroups-raft/issues/7 for details */
@ManagedOperation(description="Creates a new snapshot and truncates the log")
public synchronized void snapshot() throws Exception {
if(state_machine == null)
throw new IllegalStateException("state machine is null");
public void snapshot() throws Exception {
snapshotAsync().get();
}

ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128, true);
internal_state.writeTo(out);
state_machine.writeContentTo(out);
ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position());
log_impl.setSnapshot(buf);
log_impl.truncate(commitIndex());
num_snapshots++;
// curr_log_size=logSizeInBytes();
// this is faster than calling logSizeInBytes(), but may not be accurate: if commit-index is way
// behind last-appended, then this may perform the next truncation later than it should
curr_log_size=0;
public CompletableFuture<Void> snapshotAsync() {
CompletableFuture<Void> f = new CompletableFuture<>();
offer(new SnapshotRequest(f)); return f;
}

/** Loads the log entries from [first .. commit_index] into the state machine */
Expand Down Expand Up @@ -690,7 +682,7 @@ public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, bo
if(synchronous) // set only for testing purposes
handleDownRequest(retval, buf, offset, length, internal, options);
else {
add(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest()
offer(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest()
}
return retval; // 4. Return CompletableFuture
}
Expand All @@ -710,6 +702,12 @@ protected void add(Request r) {
}
}

protected void offer(Request r) {
if (!processing_queue.offer(r)) {
r.failed(new IllegalStateException("processing queue is full"));
}
}

/** This method is always called by a single thread only, and does therefore not need to be reentrant */
protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length,
boolean internal, Options opts) {
Expand Down Expand Up @@ -858,6 +856,15 @@ else if(r instanceof DownRequest) {
reqtab.create(index++, raft_id, dr.f, this::majority, dr.options);
length+=dr.length;
}
else if (r instanceof SnapshotRequest) {
SnapshotRequest sr = (SnapshotRequest) r;
try {
takeSnapshot(); sr.f.complete(null);
} catch (Exception e) {
sr.f.completeExceptionally(e);
throw e;
}
}
}
catch(Throwable ex) {
log.error("%s: failed handling request %s: %s", local_addr, r, ex);
Expand Down Expand Up @@ -1048,7 +1055,7 @@ protected void resend(Address target, long from, long to) {
protected void sendSnapshotTo(Address dest) throws Exception {
LogEntry last_committed_entry=log_impl.get(commitIndex());
long last_index=commit_index, last_term=last_committed_entry.term;
snapshot();
takeSnapshot();
ByteBuffer data=log_impl.getSnapshot();
log.debug("%s: sending snapshot (%s) to %s", local_addr, Util.printBytes(data.position()), dest);
Message msg=new BytesMessage(dest, data)
Expand Down Expand Up @@ -1094,14 +1101,31 @@ protected void snapshotIfNeeded(int bytes_added) {
try {
this.log.debug("%s: current log size is %d, exceeding max_log_size of %d: creating snapshot",
local_addr, curr_log_size, max_log_size);
snapshot();
takeSnapshot();
}
catch(Exception ex) {
log.error("%s: failed snapshotting log: %s", local_addr, ex);
}
}
}

protected void takeSnapshot() throws Exception {
if(state_machine == null)
throw new IllegalStateException("state machine is null");

ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128, true);
internal_state.writeTo(out);
state_machine.writeContentTo(out);
ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position());
log_impl.setSnapshot(buf);
log_impl.truncate(commitIndex());
num_snapshots++;
// curr_log_size=logSizeInBytes();
// this is faster than calling logSizeInBytes(), but may not be accurate: if commit-index is way
// behind last-appended, then this may perform the next truncation later than it should
curr_log_size=0;
}

/**
* Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
* @param to_inclusive The end index (inclusive) of the log entries to apply
Expand Down Expand Up @@ -1319,7 +1343,7 @@ public String toString() {
}
}

/** Generated by {@link org.jgroups.protocols.raft.RAFT#setAsync(byte[], int, int)} */
/** Generated by {@link RAFT#setAsync(byte[], int, int)} */
protected static class DownRequest extends Request {
final CompletableFuture<byte[]> f;
final byte[] buf;
Expand All @@ -1346,4 +1370,22 @@ public String toString() {
return String.format("%s %d bytes", DownRequest.class.getSimpleName(), length);
}
}

protected static class SnapshotRequest extends Request {
final CompletableFuture<Void> f;

public SnapshotRequest(CompletableFuture<Void> f) {
this.f = f;
}

@Override
protected final void failed(Throwable t) {
f.completeExceptionally(t);
}

@Override
public String toString() {
return SnapshotRequest.class.getSimpleName();
}
}
}

0 comments on commit 4490c3c

Please sign in to comment.