From 433f531853c24b2236e29f1991be44e1437700fe Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Wed, 8 Jan 2025 10:37:37 +0800 Subject: [PATCH] [INLONG-11656][SDK] DataProxy Java SDK lacks a common-lang dependency (#11657) * [INLONG-11656][SDK] DataProxy Java SDK lacks a common-lang dependency * [INLONG-11656][SDK] DataProxy Java SDK lacks a common-lang dependency --------- Co-authored-by: gosonzhang --- .../dataproxy/config/ProxyConfigManager.java | 42 +++++++++---------- .../sdk/dataproxy/metric/MetricConfig.java | 2 +- .../sdk/dataproxy/network/ClientMgr.java | 5 ++- .../inlong/sdk/dataproxy/network/Sender.java | 2 +- .../ContextProxyClusterConfigLoader.java | 2 +- 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index 97952eb1ff8..8ff1dc0ea92 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -374,8 +374,8 @@ private void doEncryptConfigEntryQueryWork() throws Exception { } if (result.getF0() == null) { if (this.userEncryptConfigEntry != null) { - logger.warn("ConfigManager({}) connect manager({}) failure, using the last pubKey, secretId={}", - this.callerId, this.encryptConfigVisitUrl, this.clientConfig.getAuthSecretId()); + logger.warn("ConfigManager({}) connect manager({}) failure, using the last pubKey, userName={}", + this.callerId, this.encryptConfigVisitUrl, this.clientConfig.getUserName()); return; } throw new Exception("Visit manager error:" + result.getF1()); @@ -570,8 +570,8 @@ private Tuple2 requestPubKeyFromManager() { pubKeyConf = JsonParser.parseString(queryResult.getF1()).getAsJsonObject(); } catch (Throwable ex) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) parse failure, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) parse failure, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } errorMsg = "parse pubkey failure:" + ex.getMessage(); bookManagerQryFailStatus(false, errorMsg); @@ -585,23 +585,23 @@ private Tuple2 requestPubKeyFromManager() { try { if (!pubKeyConf.has("resultCode")) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultCode field not exist, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: resultCode field not exist, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("resultCode field not exist"); } int resultCode = pubKeyConf.get("resultCode").getAsInt(); if (resultCode != 0) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultCode != 0, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: resultCode != 0, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("resultCode != 0!"); } if (!pubKeyConf.has("resultData")) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultData field not exist, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: resultData field not exist, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("resultData field not exist"); } @@ -610,24 +610,24 @@ private Tuple2 requestPubKeyFromManager() { String publicKey = resultData.get("publicKey").getAsString(); if (StringUtils.isBlank(publicKey)) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: publicKey is blank, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: publicKey is blank, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("publicKey is blank!"); } String username = resultData.get("username").getAsString(); if (StringUtils.isBlank(username)) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: username is blank, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: username is blank, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("username is blank!"); } String versionStr = resultData.get("version").getAsString(); if (StringUtils.isBlank(versionStr)) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: version is blank, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + logger.warn("ConfigManager({}) config failure: version is blank, userName={}, config={}!", + this.callerId, this.clientConfig.getUserName(), queryResult.getF1()); } throw new Exception("version is blank!"); } @@ -670,8 +670,8 @@ private Tuple2 readCachedPubKeyEntry() { } } catch (Throwable ex) { if (exptCounter.shouldPrint()) { - logger.warn("ConfigManager({}) read({}) file exception, secretId={}", - callerId, encryptConfigCacheFile, clientConfig.getAuthSecretId(), ex); + logger.warn("ConfigManager({}) read({}) file exception, userName={}", + callerId, encryptConfigCacheFile, clientConfig.getUserName(), ex); } return new Tuple2<>(null, "read PubKeyEntry file failure:" + ex.getMessage()); } finally { @@ -705,8 +705,8 @@ private void writeCachePubKeyEntryFile(EncryptConfigEntry entry) { // p.close(); } catch (Throwable ex) { if (exptCounter.shouldPrint()) { - logger.warn("ConfigManager({}) write file({}) exception, secretId={}, content={}", - callerId, encryptConfigCacheFile, clientConfig.getAuthSecretId(), entry.toString(), ex); + logger.warn("ConfigManager({}) write file({}) exception, userName={}, content={}", + callerId, encryptConfigCacheFile, clientConfig.getUserName(), entry.toString(), ex); } } finally { if (fos != null) { @@ -836,7 +836,7 @@ private void storeAndBuildMetaConfigure(ProxyClientConfig config) { this.encryptConfigCacheFile = strBuff .append(clientConfig.getConfigStoreBasePath()) .append(ConfigConstants.META_STORE_SUB_DIR) - .append(clientConfig.getAuthSecretId()) + .append(clientConfig.getUserName()) .append(ConfigConstants.REMOTE_ENCRYPT_CACHE_FILE_SUFFIX) .toString(); strBuff.delete(0, strBuff.length()); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java index 2a9543af297..29ea696c7b0 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java @@ -17,7 +17,7 @@ package org.apache.inlong.sdk.dataproxy.metric; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; public class MetricConfig { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index ea9fc15a510..27003f64111 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ -33,7 +33,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,6 +322,9 @@ public void updateProxyInfoList(boolean nodeChanged, List newNodes) { do { int selectCnt = 0; for (HostInfo hostInfo : candidateNodes) { + if (realHosts.contains(hostInfo.getReferenceName())) { + continue; + } try { client = new NettyClient(this.sender.getInstanceId(), this.bootstrap, hostInfo, this.configure); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index eb320b4a2ac..5ee4c93e4b2 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -28,8 +28,8 @@ import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import io.netty.channel.Channel; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/config/ContextProxyClusterConfigLoader.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/config/ContextProxyClusterConfigLoader.java index dc14458830f..c23c623973d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/config/ContextProxyClusterConfigLoader.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/config/ContextProxyClusterConfigLoader.java @@ -23,7 +23,7 @@ import org.apache.inlong.sdk.dataproxy.pb.config.pojo.ProxyInfo; import com.alibaba.fastjson.JSON; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flume.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory;