From 4e862c1394cee0f1c7f21612645c8138c37b6168 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 26 Aug 2024 17:05:00 +0800 Subject: [PATCH] [INLONG-10889][Agent] When the oom is detected, the process exits --- .../org/apache/inlong/agent/constant/AgentConstants.java | 2 +- .../java/org/apache/inlong/agent/utils/ThreadUtils.java | 4 ++-- .../org/apache/inlong/agent/utils/file/FileUtils.java | 4 ++-- .../inlong/agent/plugin/instance/CommonInstance.java | 1 + .../agent/plugin/sinks/filecollect/SenderManager.java | 8 ++++---- .../apache/inlong/agent/plugin/sources/LogFileSource.java | 3 +++ .../inlong/agent/plugin/sources/file/AbstractSource.java | 2 ++ .../org/apache/inlong/agent/plugin/task/AbstractTask.java | 2 ++ 8 files changed, 17 insertions(+), 9 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index ec5831973be..29bf91ed7e2 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -73,7 +73,7 @@ public class AgentConstants { public static final String DEFAULT_AGENT_HISTORY_PATH = ".history"; public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit"; - public static final boolean DEFAULT_ENABLE_OOM_EXIT = false; + public static final boolean DEFAULT_ENABLE_OOM_EXIT = true; // pulsar sink config public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ThreadUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ThreadUtils.java index 42270ad7651..e438712ab80 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ThreadUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ThreadUtils.java @@ -35,7 +35,7 @@ public static void threadThrowableHandler(Thread t, Throwable e) { } private static void handleOOM(Thread t, Throwable e) { - if (ExceptionUtils.indexOfThrowable(e, java.lang.OutOfMemoryError.class) != -1) { + if (ExceptionUtils.indexOfThrowable(e, OutOfMemoryError.class) != -1) { LOGGER.error("Agent exit caused by {} OutOfMemory: ", t.getName(), e); forceShutDown(); } @@ -43,7 +43,7 @@ private static void handleOOM(Thread t, Throwable e) { private static void forceShutDown() { try { - Runtime.getRuntime().exit(-1); + Runtime.getRuntime().halt(-1); } catch (Throwable e) { LOGGER.error("exit failed, just halt, exception: ", e); Runtime.getRuntime().halt(-2); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/file/FileUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/file/FileUtils.java index b141bad43a9..a9ffd0dec5d 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/file/FileUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/file/FileUtils.java @@ -58,7 +58,7 @@ public static long getFileCreationTime(String fileName) { creationTime = Files.readAttributes(Paths.get(fileName), BasicFileAttributes.class).creationTime() .toMillis(); } catch (IOException e) { - LOGGER.error("getFileCreationTime error {}", e.getMessage()); + LOGGER.error("getFileCreationTime error {}", e); } return creationTime; } @@ -68,7 +68,7 @@ public static long getFileLastModifyTime(String fileName) { try { lastModify = Files.getLastModifiedTime(Paths.get(fileName)).toMillis(); } catch (IOException e) { - LOGGER.error("getFileLastModifyTime error {}", e.getMessage()); + LOGGER.error("getFileLastModifyTime error {}", e); } return lastModify; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 7eb77c7237e..415b05825af 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -114,6 +114,7 @@ public void run() { doRun(); } catch (Throwable e) { LOGGER.error("do run error: ", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); } running = false; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index fff55577c68..984baf6de63 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -262,6 +262,7 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { } retry++; AgentUtils.silenceSleepInMs(retrySleepTime); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), exception); } } } @@ -299,10 +300,9 @@ private Runnable flushResendQueue() { message.getTotalSize(), auditVersion); sendBatchWithRetryCount(callback.message, callback.retry + 1); } - } catch (Exception ex) { - LOGGER.error("error caught", ex); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); + } catch (Exception e) { + LOGGER.error("error caught", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); } finally { AgentUtils.silenceSleepInMs(batchFlushInterval); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 929a3317779..4f2048b87f6 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -332,6 +332,9 @@ protected void releaseSource() { data.setReadBytes(String.valueOf(bytePosition)); data.setReadLines(String.valueOf(linePosition)); OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); + if (offsetProfile == null) { + return; + } data.setSendLines(offsetProfile.getOffset()); FileStaticManager.getInstance().putStaticMsg(data); randomAccessFile.close(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index fde4cb4ff46..6ee2950d3c7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -32,6 +32,7 @@ import org.apache.inlong.agent.plugin.file.Source; import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.metric.MetricRegister; import lombok.AllArgsConstructor; @@ -153,6 +154,7 @@ private Runnable run() { doRun(); } catch (Throwable e) { LOGGER.error("do run error maybe file deleted: ", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); } running = false; }; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index c463543bf56..75d87bb235e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -28,6 +28,7 @@ import org.apache.inlong.agent.state.State; import org.apache.inlong.agent.store.Store; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,6 +112,7 @@ public void run() { doRun(); } catch (Throwable e) { LOGGER.error("do run error: ", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); } running = false; }