Skip to content

Commit

Permalink
[ISSUE openmessaging#140] Fix obtain pair from batchPendingMap (openm…
Browse files Browse the repository at this point in the history
…essaging#141)

Signed-off-by: zhangyang <[email protected]>
  • Loading branch information
Git-Yang authored Apr 29, 2022
1 parent 80d9f2b commit eb1b2e2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,15 @@ private void doAppend() throws Exception {
private void sendBatchAppendEntryRequest() throws Exception {
batchAppendEntryRequest.setCommitIndex(dLedgerStore.getCommittedIndex());
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
batchPendingMap.put(batchAppendEntryRequest.getLastEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
batchPendingMap.put(batchAppendEntryRequest.getFirstEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
responseFuture.whenComplete((x, ex) -> {
try {
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
batchPendingMap.remove(x.getIndex());
updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
updatePeerWaterMark(x.getTerm(), peerId, x.getIndex() + x.getCount() - 1);
break;
case INCONSISTENT_STATE:
logger.info("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}", peerId, x.getIndex(), x.getTerm());
Expand Down Expand Up @@ -891,7 +891,8 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
response.setCode(code);
response.setTerm(request.getTerm());
if (request.getType() != PushEntryRequest.Type.COMMIT) {
response.setIndex(request.getLastEntryIndex());
response.setIndex(request.getFirstEntryIndex());
response.setCount(request.getCount());
}
response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
response.setEndIndex(dLedgerStore.getLedgerEndIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ public long getLastEntryIndex() {
}

public int getCount() {
return batchEntry.size();
if (!batchEntry.isEmpty()) {
return batchEntry.size();
} else if (entry != null) {
return 1;
} else {
return 0;
}
}

public long getTotalSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class PushEntryResponse extends RequestOrResponse {

private long beginIndex;
private long endIndex;
private int count;

public Long getIndex() {
return index;
Expand All @@ -45,4 +46,12 @@ public long getEndIndex() {
public void setEndIndex(long endIndex) {
this.endIndex = endIndex;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}
}

0 comments on commit eb1b2e2

Please sign in to comment.