From 4490c3c325ea8e7238d2795157e0d4c02fec6e17 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Wed, 25 Sep 2024 14:54:33 +0800 Subject: [PATCH] Process snapshots asynchronously on the RAFT worker thread --- src/org/jgroups/protocols/raft/RAFT.java | 78 ++++++++++++++++++------ 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/src/org/jgroups/protocols/raft/RAFT.java b/src/org/jgroups/protocols/raft/RAFT.java index 20f0431..690b801 100644 --- a/src/org/jgroups/protocols/raft/RAFT.java +++ b/src/org/jgroups/protocols/raft/RAFT.java @@ -469,21 +469,13 @@ public CompletableFuture removeServer(String name) throws Exception { /** Creates a snapshot and truncates the log. See https://github.com/belaban/jgroups-raft/issues/7 for details */ @ManagedOperation(description="Creates a new snapshot and truncates the log") - public synchronized void snapshot() throws Exception { - if(state_machine == null) - throw new IllegalStateException("state machine is null"); + public void snapshot() throws Exception { + snapshotAsync().get(); + } - ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128, true); - internal_state.writeTo(out); - state_machine.writeContentTo(out); - ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position()); - log_impl.setSnapshot(buf); - log_impl.truncate(commitIndex()); - num_snapshots++; - // curr_log_size=logSizeInBytes(); - // this is faster than calling logSizeInBytes(), but may not be accurate: if commit-index is way - // behind last-appended, then this may perform the next truncation later than it should - curr_log_size=0; + public CompletableFuture snapshotAsync() { + CompletableFuture f = new CompletableFuture<>(); + offer(new SnapshotRequest(f)); return f; } /** Loads the log entries from [first .. 
commit_index] into the state machine */ @@ -690,7 +682,7 @@ public CompletableFuture setAsync(byte[] buf, int offset, int length, bo if(synchronous) // set only for testing purposes handleDownRequest(retval, buf, offset, length, internal, options); else { - add(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest() + offer(new DownRequest(retval, buf, offset, length, internal, options)); // will call handleDownRequest() } return retval; // 4. Return CompletableFuture } @@ -710,6 +702,12 @@ protected void add(Request r) { } } + protected void offer(Request r) { + if (!processing_queue.offer(r)) { + r.failed(new IllegalStateException("processing queue is full")); + } + } + /** This method is always called by a single thread only, and does therefore not need to be reentrant */ protected void handleDownRequest(CompletableFuture f, byte[] buf, int offset, int length, boolean internal, Options opts) { @@ -858,6 +856,15 @@ else if(r instanceof DownRequest) { reqtab.create(index++, raft_id, dr.f, this::majority, dr.options); length+=dr.length; } + else if (r instanceof SnapshotRequest) { + SnapshotRequest sr = (SnapshotRequest) r; + try { + takeSnapshot(); sr.f.complete(null); + } catch (Exception e) { + sr.f.completeExceptionally(e); + throw e; + } + } } catch(Throwable ex) { log.error("%s: failed handling request %s: %s", local_addr, r, ex); @@ -1048,7 +1055,7 @@ protected void resend(Address target, long from, long to) { protected void sendSnapshotTo(Address dest) throws Exception { LogEntry last_committed_entry=log_impl.get(commitIndex()); long last_index=commit_index, last_term=last_committed_entry.term; - snapshot(); + takeSnapshot(); ByteBuffer data=log_impl.getSnapshot(); log.debug("%s: sending snapshot (%s) to %s", local_addr, Util.printBytes(data.position()), dest); Message msg=new BytesMessage(dest, data) @@ -1094,7 +1101,7 @@ protected void snapshotIfNeeded(int bytes_added) { try { this.log.debug("%s: current log size 
is %d, exceeding max_log_size of %d: creating snapshot", local_addr, curr_log_size, max_log_size); - snapshot(); + takeSnapshot(); } catch(Exception ex) { log.error("%s: failed snapshotting log: %s", local_addr, ex); @@ -1102,6 +1109,23 @@ protected void snapshotIfNeeded(int bytes_added) { } } + protected void takeSnapshot() throws Exception { + if(state_machine == null) + throw new IllegalStateException("state machine is null"); + + ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128, true); + internal_state.writeTo(out); + state_machine.writeContentTo(out); + ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position()); + log_impl.setSnapshot(buf); + log_impl.truncate(commitIndex()); + num_snapshots++; + // curr_log_size=logSizeInBytes(); + // this is faster than calling logSizeInBytes(), but may not be accurate: if commit-index is way + // behind last-appended, then this may perform the next truncation later than it should + curr_log_size=0; + } + /** * Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable. * @param to_inclusive The end index (inclusive) of the log entries to apply @@ -1319,7 +1343,7 @@ public String toString() { } } - /** Generated by {@link org.jgroups.protocols.raft.RAFT#setAsync(byte[], int, int)} */ + /** Generated by {@link RAFT#setAsync(byte[], int, int)} */ protected static class DownRequest extends Request { final CompletableFuture f; final byte[] buf; @@ -1346,4 +1370,22 @@ public String toString() { return String.format("%s %d bytes", DownRequest.class.getSimpleName(), length); } } + + protected static class SnapshotRequest extends Request { + final CompletableFuture f; + + public SnapshotRequest(CompletableFuture f) { + this.f = f; + } + + @Override + protected final void failed(Throwable t) { + f.completeExceptionally(t); + } + + @Override + public String toString() { + return SnapshotRequest.class.getSimpleName(); + } + } }