Skip to content

Commit

Permalink
[INLONG-10756][Agent] Report file metrics for backend problem analysis (
Browse files Browse the repository at this point in the history
apache#10757)

* [INLONG-10756][Agent] Report file metrics for backend problem analysis

* [INLONG-10756][Agent] Report file metrics for backend problem analysis

* [INLONG-10756][Agent] Report file metrics for backend problem analysis
  • Loading branch information
justinwwhuang authored Aug 7, 2024
1 parent 8a27664 commit 0b2646a
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,30 @@
package org.apache.inlong.agent.core;

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ExcuteLinux;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;

import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -55,44 +51,99 @@
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;

/**
* Collect various indicators of agent processes for backend problem analysis
*/
public class AgentStatusManager {

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class AgentStatus {

    // Identity of the agent and the cluster it reports for.
    private String agentIp;
    private String tag;
    private String cluster;
    private String agentVersion;
    private String agentStartTime;

    // Resource figures for the agent process.
    private String cpuCore;
    private String procCpu;
    private String freeMem;
    private String maxMem;
    private String useMem;

    // Host and process environment.
    private String os;
    private String installPlatform;
    private String usrDir;
    private String usrName;
    private String processId;

    // Configuration fingerprints and workload counters.
    private String globalConfigMd5;
    private String taskMd5;
    private String taskNum;
    private String instanceNum;
    private String bootTime;
    private String sendPackageCount;
    private String sendDataLen;

    // Remaining permits and thread statistics.
    private String sourcePermitLeft;
    private String queuePermitLeft;
    private String writerPermitLeft;
    private String activeThreadCount;

    /**
     * Flattens every status field into one comma-separated line.
     *
     * The values are emitted in the order listed below, so a consumer can read them by position.
     *
     * @return all field values joined with ','
     */
    public String getFieldsString() {
        List<String> orderedValues = Lists.newArrayList(
                agentIp,
                tag,
                cluster,
                agentVersion,
                agentStartTime,
                cpuCore,
                procCpu,
                freeMem,
                maxMem,
                useMem,
                os,
                installPlatform,
                usrDir,
                usrName,
                processId,
                globalConfigMd5,
                taskMd5,
                taskNum,
                instanceNum,
                bootTime,
                sendPackageCount,
                sendDataLen,
                sourcePermitLeft,
                queuePermitLeft,
                writerPermitLeft,
                activeThreadCount);
        return Strings.join(orderedValues, ',');
    }
}

// Logger shared by all methods of this manager.
private static final Logger LOGGER = LoggerFactory.getLogger(AgentStatusManager.class);
// Identifiers passed to the message sender together with the status payload.
public static final String INLONG_AGENT_SYSTEM = "inlong_agent_system";
public static final String INLONG_AGENT_STATUS = "inlong_agent_status";

// Singleton instance, handed out by getInstance().
private static AgentStatusManager manager = null;
private final AgentConfiguration conf;
// NOTE(review): `format` is declared on two consecutive lines here; this looks like both sides of a
// diff were kept — confirm that only one declaration exists in the real file.
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // set the format
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private Runtime runtime = Runtime.getRuntime();
// Number of bytes in one GiB; used as the divisor when memory figures are reported.
final long GB = 1024 * 1024 * 1024;
private OperatingSystemMXBean osMxBean;
private ThreadMXBean threadBean;
private final long INVALID_CPU = -1;
private RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
private ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
private AgentManager agentManager;
// Running counters; each status collection reads them with getAndSet(0), which resets them.
public static AtomicLong sendDataLen = new AtomicLong();
public static AtomicLong sendPackageCount = new AtomicLong();
private DefaultMessageSender sender;
// Display labels, positionally aligned with the status field values.
private List<String> statusFieldsPre = Lists.newArrayList();
// JVM start time rendered with `format`.
private String processStartupTime = format.format(runtimeMXBean.getStartTime());
// Output of a shell command with line breaks removed; reported as the boot time.
// NOTE(review): `systemStartupTime` is also declared twice with different commands — confirm which
// one the real file keeps.
private String systemStartupTime = ExcuteLinux.exeCmd("who -b|awk '{print $(NF-1), $NF}'").replaceAll("\r|\n", "");
private String systemStartupTime = ExcuteLinux.exeCmd("uptime -s").replaceAll("\r|\n", "");

/**
 * Creates the manager; private, instances are obtained through getInstance.
 *
 * Stores the given agent manager, loads the agent configuration, looks up the operating-system
 * and thread MX beans, fills the status label list and finally creates the message sender.
 * The configuration is assigned before createMessageSender() runs because that method reads it.
 *
 * @param agentManager agent manager whose task manager is queried when status is collected
 */
private AgentStatusManager(AgentManager agentManager) {
this.agentManager = agentManager;
this.conf = AgentConfiguration.getAgentConf();
osMxBean = ManagementFactory.getOperatingSystemMXBean();
threadBean = ManagementFactory.getThreadMXBean();
initStatusFieldsPre();
createMessageSender();
}

public static AgentStatusManager getInstance(AgentManager agentManager) {
Expand All @@ -113,16 +164,13 @@ public static AgentStatusManager getInstance() {
return manager;
}

public void sendStatusMsg(List<String> fields) {
public void sendStatusMsg(DefaultMessageSender sender) {
AgentStatus data = AgentStatusManager.getInstance().getStatus();
LOGGER.info("status detail: {}", data);
if (sender == null) {
LOGGER.error("sender is null");
createMessageSender();
if (sender == null) {
return;
}
return;
}
SendResult ret = sender.sendMessage(
StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8),
SendResult ret = sender.sendMessage(data.getFieldsString().getBytes(StandardCharsets.UTF_8),
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
Expand All @@ -132,33 +180,6 @@ public void sendStatusMsg(List<String> fields) {
}
}

/**
 * Logs the status values, one "label: value" pair per line.
 *
 * Each label in statusFieldsPre already ends with ": ", so the value is appended directly;
 * adding another separator would print "ip: : value". Only as many pairs as both lists can
 * supply are printed, so a short (or null) value list no longer raises an exception.
 *
 * @param fields status values, positionally aligned with the labels in statusFieldsPre
 */
public void printStatusMsg(List<String> fields) {
    int pairCount = fields == null ? 0 : Math.min(statusFieldsPre.size(), fields.size());
    List<String> toPrint = new ArrayList<>(pairCount);
    for (int i = 0; i < pairCount; i++) {
        toPrint.add(statusFieldsPre.get(i) + fields.get(i));
    }
    LOGGER.info("status detail:\n{}", StringUtils.join(toPrint, "\n"));
}

/**
 * Builds the message sender used for status reports and stores it in the sender field.
 *
 * The manager address and the auth credentials are read from the agent configuration. Any
 * exception raised while building the client configuration or the sender is logged, and the
 * sender field then keeps its previous value.
 */
private void createMessageSender() {
    // Configuration lookups stay outside the try block so their failures are not swallowed.
    final String address = conf.get(AGENT_MANAGER_ADDR);
    final String secretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
    final String secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
    try {
        ProxyClientConfig clientConfig =
                new ProxyClientConfig(address, INLONG_AGENT_SYSTEM, secretId, secretKey);
        clientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
        clientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
        clientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
        clientConfig.setProtocolType(ProtocolType.TCP);
        // Sender threads inherit the daemon flag of the thread that creates the sender.
        ThreadFactory heartbeatThreadFactory = new DefaultThreadFactory(
                "agent-sender-manager-heartbeat", Thread.currentThread().isDaemon());
        sender = new DefaultMessageSender(clientConfig, heartbeatThreadFactory);
    } catch (Exception e) {
        LOGGER.error("heartbeat manager create sdk failed: ", e);
    }
}

private double getProcessCpu() {
double cpu = tryGetProcessCpu();
int tryTimes = 0;
Expand Down Expand Up @@ -199,68 +220,38 @@ private double tryGetProcessCpu() {
return (((double) usedTime) / totalPassedTime) * 100;
}

public List<String> getStatusMessage() {
List<String> fields = Lists.newArrayList();
fields.add(AgentUtils.fetchLocalIp());
fields.add(conf.get(AGENT_CLUSTER_NAME));
fields.add(conf.get(AGENT_CLUSTER_TAG));
fields.add(TaskManager.class.getPackage().getImplementationVersion());
fields.add(processStartupTime);
fields.add(String.valueOf(runtime.availableProcessors()));
fields.add(String.valueOf(twoDecimal(getProcessCpu())));
fields.add(String.valueOf(twoDecimal((double) runtime.freeMemory() / GB)));
fields.add(String.valueOf(twoDecimal((double) runtime.maxMemory() / GB)));
fields.add(String.valueOf(twoDecimal((double) runtime.totalMemory() / GB)));
fields.add(System.getProperty("os.version"));
fields.add(conf.get(AGENT_INSTALL_PLATFORM, ""));
fields.add(System.getProperty("user.dir"));
fields.add(System.getProperty("user.name"));
fields.add(String.valueOf(getProcessId()));
private AgentStatus getStatus() {
AgentStatus data = new AgentStatus();
data.setAgentIp(AgentUtils.fetchLocalIp());
data.setTag(conf.get(AGENT_CLUSTER_TAG));
data.setCluster(conf.get(AGENT_CLUSTER_NAME));
data.setAgentVersion(TaskManager.class.getPackage().getImplementationVersion());
data.setAgentStartTime(processStartupTime);
data.setCpuCore(String.valueOf(runtime.availableProcessors()));
data.setProcCpu(String.valueOf(twoDecimal(getProcessCpu())));
data.setFreeMem(String.valueOf(twoDecimal((double) runtime.freeMemory() / GB)));
data.setMaxMem(String.valueOf(twoDecimal((double) runtime.maxMemory() / GB)));
data.setUseMem(String.valueOf(twoDecimal((double) runtime.totalMemory() / GB)));
data.setOs(System.getProperty("os.version"));
data.setInstallPlatform(conf.get(AGENT_INSTALL_PLATFORM, ""));
data.setUsrDir(System.getProperty("user.dir"));
data.setUsrName(System.getProperty("user.name"));
data.setProcessId(String.valueOf(getProcessId()));
if (AgentManager.getAgentConfigInfo() != null) {
fields.add(AgentManager.getAgentConfigInfo().getMd5());
} else {
fields.add("");
data.setGlobalConfigMd5(AgentManager.getAgentConfigInfo().getMd5());
}
fields.add(agentManager.getTaskManager().getTaskResultMd5());
fields.add(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
fields.add(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
fields.add(systemStartupTime);
fields.add(String.valueOf(sendPackageCount.getAndSet(0)));
fields.add(String.valueOf(sendDataLen.getAndSet(0)));
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
fields.add(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
fields.add(String.valueOf(threadMXBean.getThreadCount()));
return fields;
}

private void initStatusFieldsPre() {
statusFieldsPre.add("ip: ");
statusFieldsPre.add("cluster: ");
statusFieldsPre.add("tag: ");
statusFieldsPre.add("agent version: ");
statusFieldsPre.add("agent start time: ");
statusFieldsPre.add("cpu core: ");
statusFieldsPre.add("proc cpu: ");
statusFieldsPre.add("free mem: ");
statusFieldsPre.add("max mem: ");
statusFieldsPre.add("use mem: ");
statusFieldsPre.add("os: ");
statusFieldsPre.add("install platform: ");
statusFieldsPre.add("usr dir: ");
statusFieldsPre.add("usr name: ");
statusFieldsPre.add("process id: ");
statusFieldsPre.add("global config md5: ");
statusFieldsPre.add("task md5: ");
statusFieldsPre.add("task num: ");
statusFieldsPre.add("instance num: ");
statusFieldsPre.add("boot time: ");
statusFieldsPre.add("send package count: ");
statusFieldsPre.add("send data len: ");
statusFieldsPre.add("source permit left: ");
statusFieldsPre.add("queue permit left: ");
statusFieldsPre.add("writer permit left: ");
statusFieldsPre.add("active thread count: ");
data.setTaskMd5(agentManager.getTaskManager().getTaskResultMd5());
data.setTaskNum(String.valueOf(agentManager.getTaskManager().getTaskStore().getTasks().size()));
data.setInstanceNum(String.valueOf(OffsetManager.getInstance().getRunningInstanceCount()));
data.setBootTime(systemStartupTime);
data.setSendPackageCount(String.valueOf(sendPackageCount.getAndSet(0)));
data.setSendDataLen(String.valueOf(sendDataLen.getAndSet(0)));
data.setSourcePermitLeft(
String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_SOURCE_PERMIT)));
data.setQueuePermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT)));
data.setWriterPermitLeft(String.valueOf(MemoryManager.getInstance().getLeft(AGENT_GLOBAL_WRITER_PERMIT)));
data.setActiveThreadCount(String.valueOf(threadMXBean.getThreadCount()));
return data;
}

public double twoDecimal(double doubleValue) {
Expand Down
Loading

0 comments on commit 0b2646a

Please sign in to comment.