From 58fe6ee3c8b5e76949acc79b00e9c2d9031acf05 Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Tue, 5 Nov 2024 18:55:46 +0800 Subject: [PATCH] [INLONG-11459][SDK] Add MetricConfig class to save metric-related settings (#11460) Co-authored-by: gosonzhang --- .../sdk/dataproxy/ProxyClientConfig.java | 72 +++-------- .../sdk/dataproxy/metric/MetricConfig.java | 118 ++++++++++++++++++ .../inlong/sdk/dataproxy/network/Sender.java | 38 ++++-- .../dataproxy/threads/MetricWorkerThread.java | 57 ++++----- 4 files changed, 181 insertions(+), 104 deletions(-) create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index f866b4b76df..d74f876fabe 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -17,6 +17,7 @@ package org.apache.inlong.sdk.dataproxy; +import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; import org.apache.inlong.sdk.dataproxy.network.Utils; @@ -55,8 +56,8 @@ public class ProxyClientConfig { private String protocolType; private boolean enableSaveManagerVIps = false; - - private boolean enableSlaMetric = false; + // metric configure + private MetricConfig metricConfig = new MetricConfig(); private int managerConnectionTimeout = 10000; private boolean readProxyIPFromLocal = false; @@ -77,20 +78,8 @@ public class ProxyClientConfig { // interval for async worker in microseconds. private int asyncWorkerInterval = 500; private boolean cleanHttpCacheWhenClosing = false; - - // config for metric collector - // whether use groupId as key for metric, default is true - private boolean useGroupIdAsKey = true; - // whether use StreamId as key for metric, default is true - private boolean useStreamIdAsKey = true; - // whether use localIp as key for metric, default is true - private boolean useLocalIpAsKey = true; - // metric collection interval, default is 1 mins in milliseconds. - private int metricIntervalInMs = 60 * 1000; // max cache time for proxy config. private long maxProxyCacheTimeInMs = 30 * 60 * 1000; - // metric groupId - private String metricGroupId = "inlong_sla_metric"; private int ioThreadNum = Runtime.getRuntime().availableProcessors(); private boolean enableBusyWait = false; @@ -446,46 +435,6 @@ public void setCleanHttpCacheWhenClosing(boolean cleanHttpCacheWhenClosing) { this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing; } - public boolean isUseGroupIdAsKey() { - return useGroupIdAsKey; - } - - public void setUseGroupIdAsKey(boolean useGroupIdAsKey) { - this.useGroupIdAsKey = useGroupIdAsKey; - } - - public boolean isUseStreamIdAsKey() { - return useStreamIdAsKey; - } - - public void setUseStreamIdAsKey(boolean useStreamIdAsKey) { - this.useStreamIdAsKey = useStreamIdAsKey; - } - - public boolean isUseLocalIpAsKey() { - return useLocalIpAsKey; - } - - public void setUseLocalIpAsKey(boolean useLocalIpAsKey) { - this.useLocalIpAsKey = useLocalIpAsKey; - } - - public int getMetricIntervalInMs() { - return metricIntervalInMs; - } - - public void setMetricIntervalInMs(int metricIntervalInMs) { - this.metricIntervalInMs = metricIntervalInMs; - } - - public String getMetricGroupId() { - return metricGroupId; - } - - public void setMetricGroupId(String metricGroupId) { - this.metricGroupId = metricGroupId; - } - public long getMaxProxyCacheTimeInMs() { return maxProxyCacheTimeInMs; } @@ -502,12 +451,19 @@ public void setManagerConnectionTimeout(int managerConnectionTimeout) { this.managerConnectionTimeout = managerConnectionTimeout; } - public boolean isEnableSlaMetric() { - return enableSlaMetric; + public MetricConfig getMetricConfig() { + return metricConfig; } - public void setEnableSlaMetric(boolean enableSlaMetric) { - this.enableSlaMetric = enableSlaMetric; + public boolean isEnableMetric() { + return metricConfig.isEnableMetric(); + } + + public void setMetricConfig(MetricConfig metricConfig) { + if (metricConfig == null) { + return; + } + this.metricConfig = metricConfig; } public int getIoThreadNum() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java new file mode 100644 index 00000000000..2a9543af297 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.metric; + +import org.apache.commons.lang.StringUtils; + +public class MetricConfig { + + private static final long DEF_METRIC_REPORT_INTVL_MS = 60000L; + private static final long MIN_METRIC_REPORT_INTVL_MS = 30000L; + private static final long DEF_METRIC_DATE_FORMAT_MS = 60000L; + private static final long MIN_METRIC_DATE_FORMAT_MS = 1L; + private static final String DEF_METRIC_REPORT_GROUP_ID = "inlong_sla_metric"; + // metric enable + private boolean enableMetric = false; + // whether use groupId as key for metric, default is true + private boolean useGroupIdAsKey = true; + // whether use StreamId as key for metric, default is true + private boolean useStreamIdAsKey = true; + // whether use localIp as key for metric, default is true + private boolean useLocalIpAsKey = true; + // metric report interval, default is 1 mins in milliseconds. + private long metricRptIntvlMs = DEF_METRIC_REPORT_INTVL_MS; + // metric date format + private long dateFormatIntvlMs = DEF_METRIC_DATE_FORMAT_MS; + // metric groupId + private String metricGroupId = DEF_METRIC_REPORT_GROUP_ID; + + public MetricConfig() { + + } + + public void setEnableMetric(boolean enableMetric) { + this.enableMetric = enableMetric; + } + + public boolean isEnableMetric() { + return enableMetric; + } + + public void setMetricKeyBuildParams( + boolean useGroupIdAsKey, boolean useStreamIdAsKey, boolean useLocalIpAsKey) { + this.useGroupIdAsKey = useGroupIdAsKey; + this.useStreamIdAsKey = useStreamIdAsKey; + this.useLocalIpAsKey = useLocalIpAsKey; + } + + public boolean isUseGroupIdAsKey() { + return useGroupIdAsKey; + } + + public boolean isUseStreamIdAsKey() { + return useStreamIdAsKey; + } + + public boolean isUseLocalIpAsKey() { + return useLocalIpAsKey; + } + + public void setMetricRptIntvlMs(long metricRptIntvlMs) { + if (metricRptIntvlMs >= MIN_METRIC_REPORT_INTVL_MS) { + this.metricRptIntvlMs = metricRptIntvlMs; + } + } + + public long getMetricRptIntvlMs() { + return metricRptIntvlMs; + } + + public void setDateFormatIntvlMs(long dateFormatIntvlMs) { + if (dateFormatIntvlMs >= MIN_METRIC_DATE_FORMAT_MS) { + this.dateFormatIntvlMs = dateFormatIntvlMs; + } + } + + public long getDateFormatIntvlMs() { + return dateFormatIntvlMs; + } + + public String getMetricGroupId() { + return metricGroupId; + } + + public void setMetricGroupId(String metricGroupId) { + if (StringUtils.isNotBlank(metricGroupId)) { + this.metricGroupId = metricGroupId; + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("MetricConfig{"); + sb.append("enableMetric=").append(enableMetric); + sb.append(", useGroupIdAsKey=").append(useGroupIdAsKey); + sb.append(", useStreamIdAsKey=").append(useStreamIdAsKey); + sb.append(", useLocalIpAsKey=").append(useLocalIpAsKey); + sb.append(", metricRptIntvlMs=").append(metricRptIntvlMs); + sb.append(", dateFormatIntvlMs=").append(dateFormatIntvlMs); + sb.append(", metricGroupId='").append(metricGroupId).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index d68a0c23308..9581da1f805 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -64,7 +64,7 @@ public class Sender { private final ClientMgr clientMgr; private final ProxyClientConfig configure; private final boolean isFile; - private final MetricWorkerThread metricWorker; + private MetricWorkerThread metricWorker = null; private int clusterId = -1; public Sender(ProxyClientConfig configure) throws Exception { @@ -102,8 +102,11 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro scanThread = new TimeoutScanThread(callbacks, currentBufferSize, configure, clientMgr); scanThread.start(); - metricWorker = new MetricWorkerThread(configure, this); - metricWorker.start(); + if (configure.isEnableMetric()) { + metricWorker = new MetricWorkerThread(configure, this); + metricWorker.start(); + } + LOGGER.info("proxy sdk is starting!"); } @@ -130,7 +133,9 @@ public void close() { scanThread.shutDown(); clientMgr.shutDown(); threadPool.shutdown(); - metricWorker.close(); + if (configure.isEnableMetric()) { + metricWorker.close(); + } } public String getExceptionStack(Throwable e) { @@ -227,8 +232,11 @@ private SendResult syncSendInternalMessage(NettyClient client, EncodeObject enco * @return */ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, long timeout, TimeUnit timeUnit) { - metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(), - Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt()); + if (configure.isEnableMetric()) { + metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), + encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getDt(), + encodeObject.getPackageTime(), encodeObject.getRealCnt()); + } NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), encodeObject); SendResult message = null; try { @@ -272,7 +280,9 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, lon scanThread.resetTimeoutChannel(client.getChannel()); } if (message == SendResult.OK) { - metricWorker.recordSuccessByMessageId(encodeObject.getMessageId()); + if (configure.isEnableMetric()) { + metricWorker.recordSuccessByMessageId(encodeObject.getMessageId()); + } } return message; } @@ -510,12 +520,12 @@ private boolean validAttribute(String attr) { */ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), - encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(), - encodeObject.getDt(), encodeObject.getRealCnt()); - + if (configure.isEnableMetric()) { + metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), + encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(), + encodeObject.getDt(), encodeObject.getRealCnt()); + } // send message package time - NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), encodeObject); if (client == null) { throw new ProxysdkException(SendResult.NO_CONNECTION.toString()); @@ -585,7 +595,9 @@ public void notifyFeedback(Channel channel, EncodeObject response) { SyncMessageCallable callable = syncCallables.remove(messageId); SendResult result = response.getSendResult(); if (result == SendResult.OK) { - metricWorker.recordSuccessByMessageId(messageId); + if (configure.isEnableMetric()) { + metricWorker.recordSuccessByMessageId(messageId); + } } else { LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg()); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java index ac6da06c61a..270531bf5bf 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java @@ -22,6 +22,7 @@ import org.apache.inlong.sdk.dataproxy.common.FileCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.metric.MessageRecord; +import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary; import org.apache.inlong.sdk.dataproxy.network.Sender; import org.apache.inlong.sdk.dataproxy.network.SequentialID; @@ -40,38 +41,30 @@ */ public class MetricWorkerThread extends Thread implements Closeable { + private static final long DEF_METRIC_DELAY_TIME_MS = 20 * 1000L; private static final String DEFAULT_KEY_ITEM = ""; private static final String DEFAULT_KEY_SPLITTER = "#"; private final Logger logger = LoggerFactory.getLogger(MetricWorkerThread.class); private final SequentialID idGenerator = new SequentialID(Utils.getLocalIp()); - private final ConcurrentHashMap metricValueCache = new ConcurrentHashMap<>(); - private final ConcurrentHashMap metricPackTimeMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap metricDtMap = new ConcurrentHashMap<>(); - - private final ProxyClientConfig proxyClientConfig; - - private final long delayTime; + private final MetricConfig metricConfig; + private final long delayTime = DEF_METRIC_DELAY_TIME_MS; private final Sender sender; - private final boolean enableSlaMetric; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private volatile boolean bShutdown = false; public MetricWorkerThread(ProxyClientConfig proxyClientConfig, Sender sender) { - this.proxyClientConfig = proxyClientConfig; - this.enableSlaMetric = proxyClientConfig.isEnableSlaMetric(); - - this.delayTime = 20 * 1000; + this.metricConfig = proxyClientConfig.getMetricConfig(); this.sender = sender; this.setDaemon(true); this.setName("MetricWorkerThread"); } public long getFormatKeyTime(long keyTime) { - return keyTime - keyTime % proxyClientConfig.getMetricIntervalInMs(); + return keyTime - keyTime % metricConfig.getDateFormatIntvlMs(); } /** @@ -79,9 +72,9 @@ public long getFormatKeyTime(long keyTime) { */ private String getKeyStringByConfig(String groupId, String streamId, String localIp, long keyTime) { StringBuilder builder = new StringBuilder(); - String groupIdStr = proxyClientConfig.isUseGroupIdAsKey() ? groupId : DEFAULT_KEY_ITEM; - String streamIdStr = proxyClientConfig.isUseStreamIdAsKey() ? streamId : DEFAULT_KEY_ITEM; - String localIpStr = proxyClientConfig.isUseLocalIpAsKey() ? localIp : DEFAULT_KEY_ITEM; + String groupIdStr = metricConfig.isUseGroupIdAsKey() ? groupId : DEFAULT_KEY_ITEM; + String streamIdStr = metricConfig.isUseStreamIdAsKey() ? streamId : DEFAULT_KEY_ITEM; + String localIpStr = metricConfig.isUseLocalIpAsKey() ? localIp : DEFAULT_KEY_ITEM; builder.append(groupIdStr).append(DEFAULT_KEY_SPLITTER) .append(streamIdStr).append(DEFAULT_KEY_SPLITTER) @@ -103,7 +96,7 @@ private String getKeyStringByConfig(String groupId, String streamId, String loca */ public void recordNumByKey(String msgId, String groupId, String streamId, String localIp, long packTime, long dt, int num) { - if (!enableSlaMetric) { + if (!metricConfig.isEnableMetric()) { return; } MessageRecord messageRecord = new MessageRecord(groupId, streamId, localIp, msgId, @@ -127,7 +120,7 @@ private MetricTimeNumSummary getMetricSummary(String keyName, MetricTimeNumSumma * @param msgId msg id */ public void recordSuccessByMessageId(String msgId) { - if (!enableSlaMetric) { + if (!metricConfig.isEnableMetric()) { return; } MessageRecord messageRecord = metricValueCache.remove(msgId); @@ -176,35 +169,36 @@ public void recordFailedByMessageId(String msgId) { public void close() { bShutdown = true; flushMetric(true); + logger.info("MetricWorkerThread closed!"); } @Override public void run() { - logger.info("MetricWorkerThread Thread=" + Thread.currentThread().getId() + " started!"); + logger.info("MetricWorkerThread thread=" + Thread.currentThread().getId() + " started!"); while (!bShutdown) { // check metric try { checkCacheRecords(); flushMetric(false); - TimeUnit.MILLISECONDS.sleep(proxyClientConfig.getMetricIntervalInMs()); - } catch (Exception ex) { + TimeUnit.MILLISECONDS.sleep(metricConfig.getMetricRptIntvlMs()); + } catch (Throwable ex) { // exception happens } } + logger.info("MetricWorkerThread thread existed!"); } private void tryToSendMetricToManager(EncodeObject encodeObject, MetricSendCallBack callBack) { callBack.increaseRetry(); try { - if (callBack.getRetryCount() < 4) { sender.asyncSendMessageIndex(encodeObject, callBack, String.valueOf(System.currentTimeMillis()), 20, TimeUnit.SECONDS); } else { - logger.error("error while sending {} {}", encodeObject.getBodyBytes(), encodeObject.getBodylist()); + logger.error("Send metric failure: {} {}", encodeObject.getBodyBytes(), encodeObject.getBodylist()); } - } catch (Exception ex) { - logger.warn("exception caught {}", ex.getMessage()); + } catch (Throwable ex) { + logger.warn("Send metric throw exception", ex); tryToSendMetricToManager(encodeObject, callBack); } } @@ -213,7 +207,7 @@ private void sendSingleLine(String line, String streamId, long dtTime) { EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7, false, false, false, dtTime, idGenerator.getNextInt(), - proxyClientConfig.getMetricGroupId(), streamId, "", "", Utils.getLocalIp()); + metricConfig.getMetricGroupId(), streamId, "", "", Utils.getLocalIp()); MetricSendCallBack callBack = new MetricSendCallBack(encodeObject); tryToSendMetricToManager(encodeObject, callBack); } @@ -222,7 +216,7 @@ private void flushMapRecords(boolean isClosing, ConcurrentHashMap proxyClientConfig.getMetricIntervalInMs())) { + + delayTime > metricConfig.getMetricRptIntvlMs())) { summary = cacheMap.remove(keyName); if (summary != null) { long metricDtTime = summary.getStartCalculateTime() / 1000; @@ -231,9 +225,7 @@ private void flushMapRecords(boolean isClosing, ConcurrentHashMap proxyClientConfig.getMetricIntervalInMs()) { + if (record != null && record.getMessageTime() + delayTime > metricConfig.getMetricRptIntvlMs()) { recordFailedByMessageId(msgId); } } @@ -298,7 +289,7 @@ public void onMessageAck(String result) { if (!SendResult.OK.toString().equals(result)) { tryToSendMetricToManager(encodeObject, this); } else { - logger.info("metric is ok"); + logger.debug("Send metric is ok!"); } }