Skip to content

Commit

Permalink
[INLONG-10464][SDK] InlongSDK support retry sending when failed
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 19, 2024
1 parent 807717a commit 5bd2fb2
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,6 @@ public class ConfigConstants {
public static String HTTP = "http://";
public static String HTTPS = "https://";

public static int SENDER_MAX_RETRY = 3;

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class DefaultMessageSender implements MessageSender {

Expand All @@ -65,6 +68,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isReport = false;
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private boolean sendSuccess = true;

public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
this(configure, null);
Expand Down Expand Up @@ -191,11 +195,93 @@ public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
}

public void setSendSuccess(boolean sendSuccess) {
this.sendSuccess = sendSuccess;
}

public boolean isSendSuccess() {
return sendSuccess;
}

private SendResult retryWhenSendMessageFail(Function<DefaultMessageSender, SendResult> sendOperation,
DefaultMessageSender initialSender) {
int attempts = 0;
SendResult sendResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < ConfigConstants.SENDER_MAX_RETRY) {
sendResult = sendOperation.apply(currentSender);
if (sendResult != null && sendResult.equals(SendResult.OK)) {
currentSender.setSendSuccess(true);
return sendResult;
}
currentSender.setSendSuccess(false);
// try to get success sender
DefaultMessageSender randomSuccessSender = getRandomSuccessSender();
if (randomSuccessSender != null) {
currentSender = randomSuccessSender;
} else {
break;
}
attempts++;
}

return sendResult;
}

private String retryWhenSendMessageIndexFail(Function<DefaultMessageSender, String> sendOperation,
DefaultMessageSender initialSender) {
int attempts = 0;
String sendIndexResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < ConfigConstants.SENDER_MAX_RETRY) {
sendIndexResult = sendOperation.apply(currentSender);
if (sendIndexResult != null && sendIndexResult.startsWith(SendResult.OK.toString())) {
currentSender.setSendSuccess(true);
return sendIndexResult;
}
currentSender.setSendSuccess(false);
// try to get success sender
DefaultMessageSender randomSuccessSender = getRandomSuccessSender();
if (randomSuccessSender != null) {
currentSender = randomSuccessSender;
} else {
break;
}
attempts++;
}

return sendIndexResult;
}

private DefaultMessageSender getRandomSuccessSender() {
List<Integer> keys = new ArrayList<>(CACHE_SENDER.keySet());
if (keys.isEmpty()) {
return null;
}
int attempts = 0;
int maxAttempts = keys.size();
// choose sending success MessageSender randomly
while (attempts < maxAttempts) {
int randomIndex = ThreadLocalRandom.current().nextInt(keys.size());
Integer randomKey = keys.get(randomIndex);
DefaultMessageSender sender = CACHE_SENDER.get(randomKey);

if (sender != null && sender.isSendSuccess()) {
return sender;
}
keys.remove(randomIndex);
attempts++;
}
return null;
}

@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sender.syncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation =
(currentSender) -> currentSender.sender.syncSendMessage(
new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
Expand Down Expand Up @@ -235,19 +321,28 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
final String finalProxySend = proxySend;
final long finalDt = dt;
if (isCompressEnd) {
return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else {
return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend,
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout,
timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
}
}

Expand Down Expand Up @@ -294,18 +389,24 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else {
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID,
timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID,
timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
}
}
return null;
Expand Down Expand Up @@ -348,20 +449,28 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
final long finalDt = dt;
final String finalProxySend = proxySend;
if (isCompress) {
return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + bodyList.size() + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(),
this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + finalDt + "&cnt=" + bodyList.size() + finalProxySend,
idGenerator.getNextId(),
this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
}
}
return null;
Expand Down Expand Up @@ -404,19 +513,25 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
}
}
return null;
Expand Down Expand Up @@ -807,7 +922,9 @@ public String sendMessageData(List<byte[]> bodyList, String groupId, String stre
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, String> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageIndexFail(sendOperation, this);
}
return null;
}
Expand All @@ -822,7 +939,9 @@ private String sendMetric(byte[] body, String groupId, String streamId, long dt,
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
Function<DefaultMessageSender, String> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageIndexFail(sendOperation, this);
}
return null;
}
Expand Down

0 comments on commit 5bd2fb2

Please sign in to comment.