Skip to content

Commit

Permalink
[INLONG-11595][SDK] Optimize the implementation of node connection ma…
Browse files Browse the repository at this point in the history
…nagement (#11596)



Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Dec 11, 2024
1 parent 4d597ad commit d3be4d4
Show file tree
Hide file tree
Showing 15 changed files with 963 additions and 1,618 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ConfigConstants {
public static final int VAL_MAX_CONFIG_SYNC_INTERVAL_MIN = 30;
public static final long VAL_UNIT_MIN_TO_MS = 60 * 1000L;
// config info sync max retry if failure
public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 3;
public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 2;
public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
// cache config expired time in ms
public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
Expand All @@ -48,17 +48,30 @@ public class ConfigConstants {
public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;

// connection timeout in milliseconds
public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 10000;
public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 8000;
public static final int VAL_MIN_CONNECT_TIMEOUT_MS = 2000;
public static final int VAL_MAX_CONNECT_TIMEOUT_MS = 60000;
public static final int VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500;
// socket timeout in milliseconds
public static final int VAL_DEF_SOCKET_TIMEOUT_MS = 20000;
public static final int VAL_MIN_SOCKET_TIMEOUT_MS = 2000;
public static final int VAL_MAX_SOCKET_TIMEOUT_MS = 60000;
// active connects
public static final int VAL_DEF_ALIVE_CONNECTIONS = 6;
public static final int VAL_MIN_ALIVE_CONNECTIONS = 1;
// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L;
// reconnect wait ms
public static final long VAL_DEF_RECONNECT_WAIT_MS = 1000L;
public static final long VAL_MAX_RECONNECT_WAIT_MS = 180000L;
// socket buffer size
public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
// max inflight msg count per connection
public static final long MAX_INFLIGHT_MSG_COUNT_PER_CONNECTION = 10000L;

public static final int ALIVE_CONNECTIONS = 3;
public static final int MAX_TIMEOUT_CNT = 3;
public static final int MAX_TIMEOUT_CNT = 10;
public static final int LOAD_THRESHOLD = 0;
public static final int CYCLE = 30;

Expand All @@ -81,24 +94,13 @@ public class ConfigConstants {

public static final int MAX_LINE_CNT = 30;

// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L;

public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;

public static final String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
public static final String SEND_BUFFER_SIZE = "sendBufferSize";

public static final int FLAG_ALLOW_AUTH = 1 << 7;
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;

public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
public static int DEFAULT_VIRTUAL_NODE = 1000;
public static int DEFAULT_RANDOM_MAX_RETRY = 1000;

public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;

/* Reserved attribute data size(bytes). */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
ProxyUtils.validClientConfig(configure);
sender = new Sender(configure, selfDefineFactory);
sender.start();
groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
senderMaxAttempt = configure.getSenderMaxAttempt();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public class ProxyClientConfig {
private boolean enableAuthentication = false;
private String authSecretId = "";
private String authSecretKey = "";
private String inlongGroupId;
private int aliveConnections = ConfigConstants.VAL_DEF_ALIVE_CONNECTIONS;

private int aliveConnections;
private int syncThreadPoolSize;
private int asyncCallbackSize;

private String inlongGroupId;
private boolean isNeedDataEncry = false;
private String rsaPubKeyUrl = "";
private String tlsServerCertFilePathAndName;
Expand All @@ -68,6 +68,14 @@ public class ProxyClientConfig {
// connect close wait period in milliseconds
private long conCloseWaitPeriodMs =
ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS + ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS;
// client reconnect wait period in ms
private long reConnectWaitMs = ConfigConstants.VAL_DEF_RECONNECT_WAIT_MS;
// socket receive buffer
private int recvBufferSize = ConfigConstants.DEFAULT_RECEIVE_BUFFER_SIZE;
// socket send buffer
private int sendBufferSize = ConfigConstants.DEFAULT_SEND_BUFFER_SIZE;
// max message count per connection
private long maxMsgInFlightPerConn = ConfigConstants.MAX_INFLIGHT_MSG_COUNT_PER_CONNECTION;

// configuration for http client
// whether discard old metric when cache is full.
Expand All @@ -82,17 +90,11 @@ public class ProxyClientConfig {
private int ioThreadNum = Runtime.getRuntime().availableProcessors();
private boolean enableBusyWait = false;

private int virtualNode;

private LoadBalance loadBalance;

private int maxRetry;
private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;

/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException {
if (StringUtils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
Expand All @@ -110,48 +112,26 @@ public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String ma
this.managerPort = managerPort;
this.managerIP = managerIp;
IpUtils.validLocalIp(localHost);
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
this.virtualNode = virtualNode;
this.maxRetry = maxRetry;
}

/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
public ProxyClientConfig(String managerAddress,
String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException {
checkAndParseAddress(managerAddress);
if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
this.inlongGroupId = inlongGroupId.trim();
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
this.virtualNode = virtualNode;
this.maxRetry = maxRetry;
}

public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp, int managerPort,
String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException {
this(localHost, visitManagerByHttp, managerIp, managerPort, inlongGroupId, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE, ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}

public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey)
throws ProxysdkException {
this(managerAddress, inlongGroupId, authSecretId, authSecretKey,
ConfigConstants.DEFAULT_LOAD_BALANCE, ConfigConstants.DEFAULT_VIRTUAL_NODE,
ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}

public String getManagerIP() {
Expand Down Expand Up @@ -269,14 +249,6 @@ public void setForceReChooseInrMs(long forceReChooseInrMs) {
Math.max(ConfigConstants.VAL_MIN_FORCE_CHOOSE_INR_MS, forceReChooseInrMs);
}

public String getTlsServerCertFilePathAndName() {
return tlsServerCertFilePathAndName;
}

public String getTlsServerKey() {
return tlsServerKey;
}

public String getInlongGroupId() {
return inlongGroupId;
}
Expand All @@ -286,7 +258,16 @@ public int getAliveConnections() {
}

public void setAliveConnections(int aliveConnections) {
this.aliveConnections = aliveConnections;
this.aliveConnections =
Math.max(ConfigConstants.VAL_MIN_ALIVE_CONNECTIONS, aliveConnections);
}

public String getTlsServerCertFilePathAndName() {
return tlsServerCertFilePathAndName;
}

public String getTlsServerKey() {
return tlsServerKey;
}

public int getSyncThreadPoolSize() {
Expand Down Expand Up @@ -351,6 +332,44 @@ public void setConCloseWaitPeriodMs(long conCloseWaitPeriodMs) {
}
}

public long getReConnectWaitMs() {
return reConnectWaitMs;
}

public void setReConnectWaitMs(long reConnectWaitMs) {
if (reConnectWaitMs > ConfigConstants.VAL_MAX_RECONNECT_WAIT_MS) {
this.reConnectWaitMs = ConfigConstants.VAL_MAX_RECONNECT_WAIT_MS;
}
}

public int getRecvBufferSize() {
return recvBufferSize;
}

public void setRecvBufferSize(int recvBufferSize) {
if (recvBufferSize > 0 && recvBufferSize < Integer.MAX_VALUE) {
this.recvBufferSize = recvBufferSize;
}
}

public int getSendBufferSize() {
return sendBufferSize;
}

public void setSendBufferSize(int sendBufferSize) {
if (sendBufferSize > 0 && sendBufferSize < Integer.MAX_VALUE) {
this.sendBufferSize = sendBufferSize;
}
}

public long getMaxMsgInFlightPerConn() {
return maxMsgInFlightPerConn;
}

public void setMaxMsgInFlightPerConn(long maxMsgInFlightPerConn) {
this.maxMsgInFlightPerConn = maxMsgInFlightPerConn;
}

public String getRsaPubKeyUrl() {
return rsaPubKeyUrl;
}
Expand Down Expand Up @@ -449,29 +468,6 @@ public void setEnableBusyWait(boolean enableBusyWait) {
this.enableBusyWait = enableBusyWait;
}

public int getVirtualNode() {
return virtualNode;
}

public void setVirtualNode(int virtualNode) {
this.virtualNode = virtualNode;
}

public LoadBalance getLoadBalance() {
return loadBalance;
}

public void setLoadBalance(LoadBalance loadBalance) {
this.loadBalance = loadBalance;
}

public int getMaxRetry() {
return maxRetry;
}

public void setMaxRetry(int maxRetry) {
this.maxRetry = maxRetry;
}
public int getSenderMaxAttempt() {
return senderMaxAttempt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public class EncodeObject {
private boolean isAuth = false;
private boolean isEncrypt = false;
private boolean isCompress = true;
private int groupIdNum;
private int streamIdNum;
private int groupIdNum = 0;
private int streamIdNum = 0;
private String groupId;
private String streamId;
private short load;
Expand Down Expand Up @@ -210,8 +210,20 @@ public boolean isGroupIdTransfer() {
return isGroupIdTransfer;
}

public void setGroupIdTransfer(boolean isGroupIdTransfer) {
this.isGroupIdTransfer = isGroupIdTransfer;
public int getGroupIdNum() {
return groupIdNum;
}

public int getStreamIdNum() {
return streamIdNum;
}

public void setGroupIdAndStreamIdNum(int groupIdNum, int streamIdNum) {
this.groupIdNum = groupIdNum;
this.streamIdNum = streamIdNum;
if (groupIdNum != 0 && streamIdNum != 0) {
this.isGroupIdTransfer = true;
}
}

public short getLoad() {
Expand Down Expand Up @@ -280,22 +292,6 @@ public void setEncryptEntry(boolean isEncrypt, String userName, EncryptConfigEnt
this.encryptEntry = encryptEntry;
}

public int getGroupIdNum() {
return groupIdNum;
}

public void setGroupIdNum(int groupIdNum) {
this.groupIdNum = groupIdNum;
}

public int getStreamIdNum() {
return streamIdNum;
}

public void setStreamIdNum(int streamIdNum) {
this.streamIdNum = streamIdNum;
}

public long getDt() {
return dt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public enum SendResult {
UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113)
TOPIC_IS_BLANK, // DataProxyErrCode(115)
DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120)
MESSAGE_TOO_LARGE,
WRITE_OVER_WATERMARK, /* error when water overflow */
MAX_FLIGHT_ON_ALL_CONNECTION,
NO_REMOTE_NODE_META_INFOS,
EMPTY_ACTIVE_NODE_SET,
NO_VALID_REMOTE_NODE,
SENDER_CLOSED,

UNKOWN_ERROR
}
Loading

0 comments on commit d3be4d4

Please sign in to comment.