Skip to content

Commit

Permalink
[INLONG-9120][Agent] Add offset db to store the offset data
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Oct 26, 2023
1 parent a194f57 commit 0875eef
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class AgentConstants {

public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task";
public static final String AGENT_LOCAL_DB_PATH_INSTANCE = ".localdb/instance";
public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset";

public static final String AGENT_UNIQ_ID = "agent.uniq.id";
public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class CommonConstants {
// the same task must have the same Partition Key if choose sync
public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey";

// max size of single batch in bytes, default is 800KB.
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 800000;
// max size of single batch in bytes, default is 500KB
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 500000;

public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber";
public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000;
Expand Down Expand Up @@ -141,11 +141,18 @@ public class CommonConstants {
public static final String PULSAR_PRODUCER_BLOCK_QUEUE = "pulsar.producer.block.queue";
public static final boolean DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE = true;

public static final String FILE_MAX_NUM = "file.max.num";
public static final int DEFAULT_FILE_MAX_NUM = 4096;

public static final String FILE_MAX_NUM = "file.max.num";

public static final String TRIGGER_ID_PREFIX = "trigger_";

public static final String TASK_ID_PREFIX = "task_";

public static final String INSTANCE_ID_PREFIX = "ins_";

public static final String OFFSET_ID_PREFIX = "offset_";

public static final String COMMAND_STORE_INSTANCE_NAME = "commandStore";

public static final String AGENT_OS_NAME = "os.name";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.db;

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* db interface for task profile.
*/
public class OffsetDb {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class);
private final Db db;
private final AgentConfiguration agentConf;

public OffsetDb() {
agentConf = AgentConfiguration.getAgentConf();
db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
}

/**
* init db by class name
*
* @return db
*/
private Db initDb(String childPath) {
try {
return new RocksDbImp(childPath);
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
}
}

public OffsetProfile getOffset(String taskId, String instanceId) {
KeyValueEntity result = db.get(getKey(taskId, instanceId));
if (result == null) {
return null;
}
return result.getAsOffsetProfile();
}

public void deleteOffset(String taskId, String instanceId) {
db.remove(getKey(taskId, instanceId));
}

public void setOffset(OffsetProfile offsetProfile) {
offsetProfile.setLastUpdateTime(AgentUtils.getCurrentTime());
if (offsetProfile.allRequiredKeyExist()) {
String keyName = getKey(offsetProfile.getTaskId(),
offsetProfile.getInstanceId());
KeyValueEntity entity = new KeyValueEntity(keyName,
offsetProfile.toJsonStr(), offsetProfile.get(TaskConstants.INSTANCE_ID));
db.put(entity);
}
}

private String getKey(String taskId, String instanceId) {
return CommonConstants.OFFSET_ID_PREFIX + taskId + "_" + instanceId;
}
}

0 comments on commit 0875eef

Please sign in to comment.