Skip to content

Commit

Permalink
[IOTDB-4421] implement retry in RatisConsensus (apache#7334)
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam authored Sep 16, 2022
1 parent 31f21e6 commit f95d6a3
Showing 1 changed file with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
public class ApplicationStateMachineProxy extends BaseStateMachine {
private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
private final IStateMachine applicationStateMachine;
private final IStateMachine.RetryPolicy retryPolicy;

// Raft Storage sub dir for statemachine data, default (_sm)
private File statemachineDir;
Expand All @@ -58,6 +59,10 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {

public ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) {
applicationStateMachine = stateMachine;
retryPolicy =
applicationStateMachine instanceof IStateMachine.RetryPolicy
? (IStateMachine.RetryPolicy) applicationStateMachine
: new IStateMachine.RetryPolicy() {};
snapshotStorage = new SnapshotStorage(applicationStateMachine);
applicationStateMachine.start();
groupId = id;
Expand Down Expand Up @@ -117,23 +122,44 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
}

Message ret;
Message ret = null;
waitUntilSystemNotReadOnly();
TSStatus finalStatus = null;
boolean shouldRetry = false;
boolean firstTry = true;
do {
try {
if (!firstTry) {
Thread.sleep(retryPolicy.getSleepTime());
}
TSStatus result = applicationStateMachine.write(applicationRequest);
ret = new ResponseMessage(result);
break;
} catch (Exception rte) {

if (firstTry) {
finalStatus = result;
firstTry = false;
} else {
finalStatus = retryPolicy.updateResult(finalStatus, result);
}

shouldRetry = retryPolicy.shouldRetry(finalStatus);
if (!shouldRetry) {
ret = new ResponseMessage(finalStatus);
break;
}
} catch (InterruptedException i) {
logger.warn("{} interrupted when retry sleep", this);
Thread.currentThread().interrupt();
} catch (Throwable rte) {
logger.error("application statemachine throws a runtime exception: ", rte);
ret = Message.valueOf("internal error. statemachine throws a runtime exception: " + rte);
if (applicationStateMachine.isReadOnly()) {
waitUntilSystemNotReadOnly();
shouldRetry = true;
} else {
break;
}
}
} while (!applicationStateMachine.isReadOnly());
} while (shouldRetry);

return CompletableFuture.completedFuture(ret);
}
Expand Down

0 comments on commit f95d6a3

Please sign in to comment.