From 0875eef0d347dc7c6db65ddad7cc8c05977ac1db Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 26 Oct 2023 17:21:42 +0800 Subject: [PATCH] [INLONG-9120][Agent] Add offset db to store the offset data --- .../inlong/agent/constant/AgentConstants.java | 3 + .../agent/constant/CommonConstants.java | 13 ++- .../org/apache/inlong/agent/db/OffsetDb.java | 83 +++++++++++++++++++ 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100755 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 58976a289cb..20bf4af7933 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -44,6 +44,9 @@ public class AgentConstants { public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path"; public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb"; + public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task"; + public static final String AGENT_LOCAL_DB_PATH_INSTANCE = ".localdb/instance"; + public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset"; public static final String AGENT_UNIQ_ID = "agent.uniq.id"; public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index 1cd9cbf0290..45fd34f69f3 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -61,8 +61,8 @@ public class CommonConstants { // the same task must have the same Partition Key if choose sync public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey"; - // max size of single batch in bytes, default is 800KB. - public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 800000; + // max size of single batch in bytes, default is 500KB + public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 500000; public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber"; public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000; @@ -141,11 +141,18 @@ public class CommonConstants { public static final String PULSAR_PRODUCER_BLOCK_QUEUE = "pulsar.producer.block.queue"; public static final boolean DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE = true; - public static final String FILE_MAX_NUM = "file.max.num"; public static final int DEFAULT_FILE_MAX_NUM = 4096; + public static final String FILE_MAX_NUM = "file.max.num"; + public static final String TRIGGER_ID_PREFIX = "trigger_"; + public static final String TASK_ID_PREFIX = "task_"; + + public static final String INSTANCE_ID_PREFIX = "ins_"; + + public static final String OFFSET_ID_PREFIX = "offset_"; + public static final String COMMAND_STORE_INSTANCE_NAME = "commandStore"; public static final String AGENT_OS_NAME = "os.name"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java new file mode 100755 index 00000000000..5ceeb2e4ea6 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.db; + +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * db interface for task profile. + */ +public class OffsetDb { + + private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class); + private final Db db; + private final AgentConfiguration agentConf; + + public OffsetDb() { + agentConf = AgentConfiguration.getAgentConf(); + db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); + } + + /** + * init db by class name + * + * @return db + */ + private Db initDb(String childPath) { + try { + return new RocksDbImp(childPath); + } catch (Exception ex) { + throw new UnsupportedClassVersionError(ex.getMessage()); + } + } + + public OffsetProfile getOffset(String taskId, String instanceId) { + KeyValueEntity result = db.get(getKey(taskId, instanceId)); + if (result == null) { + return null; + } + return result.getAsOffsetProfile(); + } + + public void deleteOffset(String taskId, String instanceId) { + db.remove(getKey(taskId, instanceId)); + } + + public void setOffset(OffsetProfile offsetProfile) { + offsetProfile.setLastUpdateTime(AgentUtils.getCurrentTime()); + if (offsetProfile.allRequiredKeyExist()) { + String keyName = getKey(offsetProfile.getTaskId(), + offsetProfile.getInstanceId()); + KeyValueEntity entity = new KeyValueEntity(keyName, + offsetProfile.toJsonStr(), offsetProfile.get(TaskConstants.INSTANCE_ID)); + db.put(entity); + } + } + + private String getKey(String taskId, String instanceId) { + return CommonConstants.OFFSET_ID_PREFIX + taskId + "_" + instanceId; + } +}