Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Snapshot file to interrupt concurrent transmission(#503) #632

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ public interface LocalFileMetaOrBuilder extends
* <code>optional string checksum = 3;</code>
*/
com.google.protobuf.ByteString getChecksumBytes();

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
boolean hasSliceTotal();

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
int getSliceTotal();
}

/**
Expand All @@ -173,6 +183,7 @@ private LocalFileMeta() {
userMeta_ = com.google.protobuf.ByteString.EMPTY;
source_ = 0;
checksum_ = "";
sliceTotal_ = 0;
}

@java.lang.Override
Expand Down Expand Up @@ -227,6 +238,11 @@ private LocalFileMeta(com.google.protobuf.CodedInputStream input,
checksum_ = bs;
break;
}
case 32: {
bitField0_ |= 0x00000008;
sliceTotal_ = input.readInt32();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -328,6 +344,23 @@ public com.google.protobuf.ByteString getChecksumBytes() {
}
}

public static final int SLICETOTAL_FIELD_NUMBER = 4;
private int sliceTotal_;

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public boolean hasSliceTotal() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public int getSliceTotal() {
return sliceTotal_;
}

private byte memoizedIsInitialized = -1;

public final boolean isInitialized() {
Expand All @@ -351,6 +384,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io
if (((bitField0_ & 0x00000004) == 0x00000004)) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 3, checksum_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeInt32(4, sliceTotal_);
}
unknownFields.writeTo(output);
}

Expand All @@ -369,6 +405,9 @@ public int getSerializedSize() {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, checksum_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream.computeInt32Size(4, sliceTotal_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
Expand Down Expand Up @@ -397,6 +436,10 @@ public boolean equals(final java.lang.Object obj) {
if (hasChecksum()) {
result = result && getChecksum().equals(other.getChecksum());
}
result = result && (hasSliceTotal() == other.hasSliceTotal());
if (hasSliceTotal()) {
result = result && (getSliceTotal() == other.getSliceTotal());
}
result = result && unknownFields.equals(other.unknownFields);
return result;
}
Expand All @@ -420,6 +463,10 @@ public int hashCode() {
hash = (37 * hash) + CHECKSUM_FIELD_NUMBER;
hash = (53 * hash) + getChecksum().hashCode();
}
if (hasSliceTotal()) {
hash = (37 * hash) + SLICETOTAL_FIELD_NUMBER;
hash = (53 * hash) + getSliceTotal();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
Expand Down Expand Up @@ -555,6 +602,8 @@ public Builder clear() {
bitField0_ = (bitField0_ & ~0x00000002);
checksum_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
sliceTotal_ = 0;
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}

Expand Down Expand Up @@ -591,6 +640,10 @@ public com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta buildParti
to_bitField0_ |= 0x00000004;
}
result.checksum_ = checksum_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.sliceTotal_ = sliceTotal_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
Expand Down Expand Up @@ -645,6 +698,9 @@ public Builder mergeFrom(com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalF
checksum_ = other.checksum_;
onChanged();
}
if (other.hasSliceTotal()) {
setSliceTotal(other.getSliceTotal());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
Expand Down Expand Up @@ -832,6 +888,42 @@ public Builder setChecksumBytes(com.google.protobuf.ByteString value) {
return this;
}

private int sliceTotal_;

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public boolean hasSliceTotal() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public int getSliceTotal() {
return sliceTotal_;
}

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public Builder setSliceTotal(int value) {
bitField0_ |= 0x00000008;
sliceTotal_ = value;
onChanged();
return this;
}

/**
* <code>optional int32 sliceTotal = 4;</code>
*/
public Builder clearSliceTotal() {
bitField0_ = (bitField0_ & ~0x00000008);
sliceTotal_ = 0;
onChanged();
return this;
}

public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
}
Expand Down Expand Up @@ -887,12 +979,13 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {

private static com.google.protobuf.Descriptors.FileDescriptor descriptor;
static {
java.lang.String[] descriptorData = { "\n\025local_file_meta.proto\022\005jraft\"W\n\rLocalF"
java.lang.String[] descriptorData = { "\n\025local_file_meta.proto\022\005jraft\"k\n\rLocalF"
+ "ileMeta\022\021\n\tuser_meta\030\001 \001(\014\022!\n\006source\030\002 \001"
+ "(\0162\021.jraft.FileSource\022\020\n\010checksum\030\003 \001(\t*"
+ ">\n\nFileSource\022\025\n\021FILE_SOURCE_LOCAL\020\000\022\031\n\025"
+ "FILE_SOURCE_REFERENCE\020\001B3\n\034com.alipay.so"
+ "fa.jraft.entityB\023LocalFileMetaOutter" };
+ "(\0162\021.jraft.FileSource\022\020\n\010checksum\030\003 \001(\t\022"
+ "\022\n\nsliceTotal\030\004 \001(\005*>\n\nFileSource\022\025\n\021FIL"
+ "E_SOURCE_LOCAL\020\000\022\031\n\025FILE_SOURCE_REFERENC"
+ "E\020\001B3\n\034com.alipay.sofa.jraft.entityB\023Loc"
+ "alFileMetaOutter" };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
Expand All @@ -903,8 +996,8 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protob
new com.google.protobuf.Descriptors.FileDescriptor[] {}, assigner);
internal_static_jraft_LocalFileMeta_descriptor = getDescriptor().getMessageTypes().get(0);
internal_static_jraft_LocalFileMeta_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_jraft_LocalFileMeta_descriptor,
new java.lang.String[] { "UserMeta", "Source", "Checksum", });
internal_static_jraft_LocalFileMeta_descriptor, new java.lang.String[] { "UserMeta", "Source", "Checksum",
"SliceTotal", });
}

// @@protoc_insertion_point(outer_class_scope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,38 @@
*/
public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {

public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class) //
.first();
public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(
JRaftServiceFactory.class) //
.first();

// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
// Default: 1000 (1s)
private int electionTimeoutMs = 1000; // follower to candidate timeout
private int electionTimeoutMs = 1000; // follower to candidate timeout

// One node's local priority value would be set to | electionPriority |
// value when it starts up.If this value is set to 0,the node will never be a leader.
// If this node doesn't support priority election,then set this value to -1.
// Default: -1
private int electionPriority = ElectionPriority.Disabled;
private int electionPriority = ElectionPriority.Disabled;

// If next leader is not elected until next election timeout, it exponentially
// decay its local target priority, for example target_priority = target_priority - gap
// Default: 10
private int decayPriorityGap = 10;
private int decayPriorityGap = 10;

// Leader lease time's ratio of electionTimeoutMs,
// To minimize the effects of clock drift, we should make that:
// clockDrift + leaderLeaseTimeoutMs < electionTimeout
// Default: 90, Max: 100
private int leaderLeaseTimeRatio = 90;
private int leaderLeaseTimeRatio = 90;

// A snapshot saving would be triggered every |snapshot_interval_s| seconds
// if this was reset as a positive number
// If |snapshot_interval_s| <= 0, the time based snapshot would be disabled.
//
// Default: 3600 (1 hour)
private int snapshotIntervalSecs = 3600;
private int snapshotIntervalSecs = 3600;

// A snapshot saving would be triggered every |snapshot_interval_s| seconds,
// and at this moment when state machine's lastAppliedIndex value
Expand All @@ -74,22 +75,22 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// If |snapshotLogIndexMargin| <= 0, the distance based snapshot would be disable.
//
// Default: 0
private int snapshotLogIndexMargin = 0;
private int snapshotLogIndexMargin = 0;

// We will regard a adding peer as caught up if the margin between the
// last_log_index of this peer and the last_log_index of leader is less than
// |catchup_margin|
//
// Default: 1000
private int catchupMargin = 1000;
private int catchupMargin = 1000;

// If node is starting from a empty environment (both LogStorage and
// SnapshotStorage are empty), it would use |initial_conf| as the
// configuration of the group, otherwise it would load configuration from
// the existing environment.
//
// Default: A empty group
private Configuration initialConf = new Configuration();
private Configuration initialConf = new Configuration();

// The specific StateMachine implemented your business logic, which must be
// a valid instance.
Expand All @@ -108,37 +109,42 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// to avoid useless transmission. Two files in local and remote are duplicate,
// only if they has the same filename and the same checksum (stored in file meta).
// Default: false
private boolean filterBeforeCopyRemote = false;
private boolean filterBeforeCopyRemote = true;

// If non-null, we will pass this throughput_snapshot_throttle to SnapshotExecutor
// Default: NULL
// scoped_refptr<SnapshotThrottle>* snapshot_throttle;

// If true, RPCs through raft_cli will be denied.
// Default: false
private boolean disableCli = false;
private boolean disableCli = false;

/**
* Whether use global timer pool, if true, the {@code timerPoolSize} will be invalid.
*/
private boolean sharedTimerPool = false;
private boolean sharedTimerPool = false;
/**
* Timer manager thread pool size
*/
private int timerPoolSize = Utils.cpus() * 3 > 20 ? 20 : Utils.cpus() * 3;
private int timerPoolSize = Utils.cpus() * 3 > 20 ? 20
: Utils.cpus() * 3;

/**
* CLI service request RPC executor pool size, use default executor if -1.
*/
private int cliRpcThreadPoolSize = Utils.cpus();
private int cliRpcThreadPoolSize = Utils.cpus();
/**
* RAFT request RPC executor pool size, use default executor if -1.
*/
private int raftRpcThreadPoolSize = Utils.cpus() * 6;
private int raftRpcThreadPoolSize = Utils.cpus() * 6;
/**
* SNAPSHOT file copy executor pool size, use default executor if -1.
*/
private int snapshotCopierThreadPoolSize = Utils.cpus();
/**
* Whether to enable metrics for node.
*/
private boolean enableMetrics = false;
private boolean enableMetrics = false;

/**
* If non-null, we will pass this SnapshotThrottle to SnapshotExecutor
Expand All @@ -149,24 +155,24 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
/**
* Whether use global election timer
*/
private boolean sharedElectionTimer = false;
private boolean sharedElectionTimer = false;
/**
* Whether use global vote timer
*/
private boolean sharedVoteTimer = false;
private boolean sharedVoteTimer = false;
/**
* Whether use global step down timer
*/
private boolean sharedStepDownTimer = false;
private boolean sharedStepDownTimer = false;
/**
* Whether use global snapshot timer
*/
private boolean sharedSnapshotTimer = false;
private boolean sharedSnapshotTimer = false;

/**
* Custom service factory.
*/
private JRaftServiceFactory serviceFactory = defaultServiceFactory;
private JRaftServiceFactory serviceFactory = defaultServiceFactory;

public JRaftServiceFactory getServiceFactory() {
return this.serviceFactory;
Expand Down Expand Up @@ -401,6 +407,14 @@ public void setSharedSnapshotTimer(boolean sharedSnapshotTimer) {
this.sharedSnapshotTimer = sharedSnapshotTimer;
}

public int getSnapshotCopierThreadPoolSize() {
return snapshotCopierThreadPoolSize;
}

public void setSnapshotCopierThreadPoolSize(int snapshotCopierThreadPoolSize) {
this.snapshotCopierThreadPoolSize = snapshotCopierThreadPoolSize;
}

@Override
public NodeOptions copy() {
final NodeOptions nodeOptions = new NodeOptions();
Expand All @@ -422,6 +436,7 @@ public NodeOptions copy() {
nodeOptions.setSharedVoteTimer(this.sharedVoteTimer);
nodeOptions.setSharedStepDownTimer(this.sharedStepDownTimer);
nodeOptions.setSharedSnapshotTimer(this.sharedSnapshotTimer);
nodeOptions.setSnapshotCopierThreadPoolSize(this.snapshotCopierThreadPoolSize);
return nodeOptions;
}

Expand All @@ -438,6 +453,7 @@ public String toString() {
+ raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle
+ ", sharedElectionTimer=" + sharedElectionTimer + ", sharedVoteTimer=" + sharedVoteTimer
+ ", sharedStepDownTimer=" + sharedStepDownTimer + ", sharedSnapshotTimer=" + sharedSnapshotTimer
+ ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString();
+ ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions
+ ", snapshotCopierThreadPoolSize=" + snapshotCopierThreadPoolSize + "}" + super.toString();
}
}
Loading