Skip to content

Commit

Permalink
[INLONG-11459][SDK] Add MetricConfig class to save metric-related set…
Browse files Browse the repository at this point in the history
…tings (apache#11460)



Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Nov 5, 2024
1 parent d339ed7 commit 58fe6ee
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Utils;

Expand Down Expand Up @@ -55,8 +56,8 @@ public class ProxyClientConfig {
private String protocolType;

private boolean enableSaveManagerVIps = false;

private boolean enableSlaMetric = false;
// metric configure
private MetricConfig metricConfig = new MetricConfig();

private int managerConnectionTimeout = 10000;
private boolean readProxyIPFromLocal = false;
Expand All @@ -77,20 +78,8 @@ public class ProxyClientConfig {
// interval for async worker in microseconds.
private int asyncWorkerInterval = 500;
private boolean cleanHttpCacheWhenClosing = false;

// config for metric collector
// whether use groupId as key for metric, default is true
private boolean useGroupIdAsKey = true;
// whether use StreamId as key for metric, default is true
private boolean useStreamIdAsKey = true;
// whether use localIp as key for metric, default is true
private boolean useLocalIpAsKey = true;
// metric collection interval, default is 1 mins in milliseconds.
private int metricIntervalInMs = 60 * 1000;
// max cache time for proxy config.
private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
// metric groupId
private String metricGroupId = "inlong_sla_metric";

private int ioThreadNum = Runtime.getRuntime().availableProcessors();
private boolean enableBusyWait = false;
Expand Down Expand Up @@ -446,46 +435,6 @@ public void setCleanHttpCacheWhenClosing(boolean cleanHttpCacheWhenClosing) {
this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
}

public boolean isUseGroupIdAsKey() {
return useGroupIdAsKey;
}

public void setUseGroupIdAsKey(boolean useGroupIdAsKey) {
this.useGroupIdAsKey = useGroupIdAsKey;
}

public boolean isUseStreamIdAsKey() {
return useStreamIdAsKey;
}

public void setUseStreamIdAsKey(boolean useStreamIdAsKey) {
this.useStreamIdAsKey = useStreamIdAsKey;
}

public boolean isUseLocalIpAsKey() {
return useLocalIpAsKey;
}

public void setUseLocalIpAsKey(boolean useLocalIpAsKey) {
this.useLocalIpAsKey = useLocalIpAsKey;
}

public int getMetricIntervalInMs() {
return metricIntervalInMs;
}

public void setMetricIntervalInMs(int metricIntervalInMs) {
this.metricIntervalInMs = metricIntervalInMs;
}

public String getMetricGroupId() {
return metricGroupId;
}

public void setMetricGroupId(String metricGroupId) {
this.metricGroupId = metricGroupId;
}

public long getMaxProxyCacheTimeInMs() {
return maxProxyCacheTimeInMs;
}
Expand All @@ -502,12 +451,19 @@ public void setManagerConnectionTimeout(int managerConnectionTimeout) {
this.managerConnectionTimeout = managerConnectionTimeout;
}

public boolean isEnableSlaMetric() {
return enableSlaMetric;
public MetricConfig getMetricConfig() {
return metricConfig;
}

public void setEnableSlaMetric(boolean enableSlaMetric) {
this.enableSlaMetric = enableSlaMetric;
public boolean isEnableMetric() {
return metricConfig.isEnableMetric();
}

public void setMetricConfig(MetricConfig metricConfig) {
if (metricConfig == null) {
return;
}
this.metricConfig = metricConfig;
}

public int getIoThreadNum() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.metric;

import org.apache.commons.lang.StringUtils;

public class MetricConfig {

private static final long DEF_METRIC_REPORT_INTVL_MS = 60000L;
private static final long MIN_METRIC_REPORT_INTVL_MS = 30000L;
private static final long DEF_METRIC_DATE_FORMAT_MS = 60000L;
private static final long MIN_METRIC_DATE_FORMAT_MS = 1L;
private static final String DEF_METRIC_REPORT_GROUP_ID = "inlong_sla_metric";
// metric enable
private boolean enableMetric = false;
// whether use groupId as key for metric, default is true
private boolean useGroupIdAsKey = true;
// whether use StreamId as key for metric, default is true
private boolean useStreamIdAsKey = true;
// whether use localIp as key for metric, default is true
private boolean useLocalIpAsKey = true;
// metric report interval, default is 1 mins in milliseconds.
private long metricRptIntvlMs = DEF_METRIC_REPORT_INTVL_MS;
// metric date format
private long dateFormatIntvlMs = DEF_METRIC_DATE_FORMAT_MS;
// metric groupId
private String metricGroupId = DEF_METRIC_REPORT_GROUP_ID;

public MetricConfig() {

}

public void setEnableMetric(boolean enableMetric) {
this.enableMetric = enableMetric;
}

public boolean isEnableMetric() {
return enableMetric;
}

public void setMetricKeyBuildParams(
boolean useGroupIdAsKey, boolean useStreamIdAsKey, boolean useLocalIpAsKey) {
this.useGroupIdAsKey = useGroupIdAsKey;
this.useStreamIdAsKey = useStreamIdAsKey;
this.useLocalIpAsKey = useLocalIpAsKey;
}

public boolean isUseGroupIdAsKey() {
return useGroupIdAsKey;
}

public boolean isUseStreamIdAsKey() {
return useStreamIdAsKey;
}

public boolean isUseLocalIpAsKey() {
return useLocalIpAsKey;
}

public void setMetricRptIntvlMs(long metricRptIntvlMs) {
if (metricRptIntvlMs >= MIN_METRIC_REPORT_INTVL_MS) {
this.metricRptIntvlMs = metricRptIntvlMs;
}
}

public long getMetricRptIntvlMs() {
return metricRptIntvlMs;
}

public void setDateFormatIntvlMs(long dateFormatIntvlMs) {
if (dateFormatIntvlMs >= MIN_METRIC_DATE_FORMAT_MS) {
this.dateFormatIntvlMs = dateFormatIntvlMs;
}
}

public long getDateFormatIntvlMs() {
return dateFormatIntvlMs;
}

public String getMetricGroupId() {
return metricGroupId;
}

public void setMetricGroupId(String metricGroupId) {
if (StringUtils.isNotBlank(metricGroupId)) {
this.metricGroupId = metricGroupId;
}
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("MetricConfig{");
sb.append("enableMetric=").append(enableMetric);
sb.append(", useGroupIdAsKey=").append(useGroupIdAsKey);
sb.append(", useStreamIdAsKey=").append(useStreamIdAsKey);
sb.append(", useLocalIpAsKey=").append(useLocalIpAsKey);
sb.append(", metricRptIntvlMs=").append(metricRptIntvlMs);
sb.append(", dateFormatIntvlMs=").append(dateFormatIntvlMs);
sb.append(", metricGroupId='").append(metricGroupId).append('\'');
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class Sender {
private final ClientMgr clientMgr;
private final ProxyClientConfig configure;
private final boolean isFile;
private final MetricWorkerThread metricWorker;
private MetricWorkerThread metricWorker = null;
private int clusterId = -1;

public Sender(ProxyClientConfig configure) throws Exception {
Expand Down Expand Up @@ -102,8 +102,11 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro
scanThread = new TimeoutScanThread(callbacks, currentBufferSize, configure, clientMgr);
scanThread.start();

metricWorker = new MetricWorkerThread(configure, this);
metricWorker.start();
if (configure.isEnableMetric()) {
metricWorker = new MetricWorkerThread(configure, this);
metricWorker.start();
}

LOGGER.info("proxy sdk is starting!");
}

Expand All @@ -130,7 +133,9 @@ public void close() {
scanThread.shutDown();
clientMgr.shutDown();
threadPool.shutdown();
metricWorker.close();
if (configure.isEnableMetric()) {
metricWorker.close();
}
}

public String getExceptionStack(Throwable e) {
Expand Down Expand Up @@ -227,8 +232,11 @@ private SendResult syncSendInternalMessage(NettyClient client, EncodeObject enco
* @return
*/
public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, long timeout, TimeUnit timeUnit) {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(),
Utils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt());
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
}
NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), encodeObject);
SendResult message = null;
try {
Expand Down Expand Up @@ -272,7 +280,9 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, lon
scanThread.resetTimeoutChannel(client.getChannel());
}
if (message == SendResult.OK) {
metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
if (configure.isEnableMetric()) {
metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
}
}
return message;
}
Expand Down Expand Up @@ -510,12 +520,12 @@ private boolean validAttribute(String attr) {
*/
public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());

if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
}
// send message package time

NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), encodeObject);
if (client == null) {
throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
Expand Down Expand Up @@ -585,7 +595,9 @@ public void notifyFeedback(Channel channel, EncodeObject response) {
SyncMessageCallable callable = syncCallables.remove(messageId);
SendResult result = response.getSendResult();
if (result == SendResult.OK) {
metricWorker.recordSuccessByMessageId(messageId);
if (configure.isEnableMetric()) {
metricWorker.recordSuccessByMessageId(messageId);
}
} else {
LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg());
}
Expand Down
Loading

0 comments on commit 58fe6ee

Please sign in to comment.