Skip to content

Commit

Permalink
feat: support number configure retry times, change logic of choose Me…
Browse files Browse the repository at this point in the history
…ssageSender
  • Loading branch information
emptyOVO committed Sep 20, 2024
1 parent 5bd2fb2 commit b5aac77
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public class ConfigConstants {
public static String HTTP = "http://";
public static String HTTPS = "https://";

public static int SENDER_MAX_RETRY = 3;
public static int DEFAULT_SENDER_MAX_RETRY = 3;

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand All @@ -69,6 +67,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private boolean sendSuccess = true;
private final int senderMaxRetry;

public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
this(configure, null);
Expand All @@ -79,6 +78,7 @@ public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefin
sender = new Sender(configure, selfDefineFactory);
groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
senderMaxRetry = configure.getSenderMaxRetry();
indexCol.start();

}
Expand Down Expand Up @@ -208,17 +208,17 @@ private SendResult retryWhenSendMessageFail(Function<DefaultMessageSender, SendR
int attempts = 0;
SendResult sendResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < ConfigConstants.SENDER_MAX_RETRY) {
while (attempts < this.senderMaxRetry) {
sendResult = sendOperation.apply(currentSender);
if (sendResult != null && sendResult.equals(SendResult.OK)) {
currentSender.setSendSuccess(true);
return sendResult;
}
currentSender.setSendSuccess(false);
// try to get success sender
DefaultMessageSender randomSuccessSender = getRandomSuccessSender();
if (randomSuccessSender != null) {
currentSender = randomSuccessSender;
// try to get another success sender
DefaultMessageSender anotherSender = getAnotherSuccessSender(currentSender);
if (anotherSender != null) {
currentSender = anotherSender;
} else {
break;
}
Expand All @@ -233,15 +233,15 @@ private String retryWhenSendMessageIndexFail(Function<DefaultMessageSender, Stri
int attempts = 0;
String sendIndexResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < ConfigConstants.SENDER_MAX_RETRY) {
while (attempts < this.senderMaxRetry) {
sendIndexResult = sendOperation.apply(currentSender);
if (sendIndexResult != null && sendIndexResult.startsWith(SendResult.OK.toString())) {
currentSender.setSendSuccess(true);
return sendIndexResult;
}
currentSender.setSendSuccess(false);
// try to get success sender
DefaultMessageSender randomSuccessSender = getRandomSuccessSender();
DefaultMessageSender randomSuccessSender = getAnotherSuccessSender(currentSender);
if (randomSuccessSender != null) {
currentSender = randomSuccessSender;
} else {
Expand All @@ -253,24 +253,11 @@ private String retryWhenSendMessageIndexFail(Function<DefaultMessageSender, Stri
return sendIndexResult;
}

private DefaultMessageSender getRandomSuccessSender() {
List<Integer> keys = new ArrayList<>(CACHE_SENDER.keySet());
if (keys.isEmpty()) {
return null;
}
int attempts = 0;
int maxAttempts = keys.size();
// choose sending success MessageSender randomly
while (attempts < maxAttempts) {
int randomIndex = ThreadLocalRandom.current().nextInt(keys.size());
Integer randomKey = keys.get(randomIndex);
DefaultMessageSender sender = CACHE_SENDER.get(randomKey);

if (sender != null && sender.isSendSuccess()) {
private DefaultMessageSender getAnotherSuccessSender(DefaultMessageSender currentSender) {
for (DefaultMessageSender sender : CACHE_SENDER.values()) {
if (sender != null && sender.isSendSuccess() && !sender.equals(currentSender)) {
return sender;
}
keys.remove(randomIndex);
attempts++;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ public class ProxyClientConfig {
private LoadBalance loadBalance;

private int maxRetry;
private int senderMaxRetry;

/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean requestByHttp, String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
LoadBalance loadBalance, int virtualNode, int maxRetry, int senderMaxRetry) throws ProxysdkException {
if (Utils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
Expand Down Expand Up @@ -135,11 +136,12 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager
this.loadBalance = loadBalance;
this.virtualNode = virtualNode;
this.maxRetry = maxRetry;
this.senderMaxRetry = senderMaxRetry;
}

/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
LoadBalance loadBalance, int virtualNode, int maxRetry, int senderMaxRetry) throws ProxysdkException {
if (Utils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP)
&& !managerAddress.startsWith(ConfigConstants.HTTPS))) {
throw new ProxysdkException("managerAddress is blank or missing http/https protocol ");
Expand All @@ -166,6 +168,7 @@ public ProxyClientConfig(String managerAddress, String inlongGroupId, String aut
this.loadBalance = loadBalance;
this.virtualNode = virtualNode;
this.maxRetry = maxRetry;
this.senderMaxRetry = senderMaxRetry;
}

private String getManagerUrl(String managerAddress, String inlongGroupId) {
Expand All @@ -184,14 +187,14 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager
String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException {
this(localHost, requestByHttp, managerIp, managerPort, inlongGroupId, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE, ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY, ConfigConstants.DEFAULT_SENDER_MAX_RETRY);
}

public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey)
throws ProxysdkException {
this(managerAddress, inlongGroupId, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE, ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY, ConfigConstants.DEFAULT_SENDER_MAX_RETRY);
}

public String getTlsServerCertFilePathAndName() {
Expand Down Expand Up @@ -548,4 +551,10 @@ public int getMaxRetry() {
public void setMaxRetry(int maxRetry) {
this.maxRetry = maxRetry;
}
public int getSenderMaxRetry() {
return senderMaxRetry;
}
public void setSenderMaxRetry(int senderMaxRetry) {
this.senderMaxRetry = senderMaxRetry;
}
}

0 comments on commit b5aac77

Please sign in to comment.