Skip to content

Commit

Permalink
[INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be cons…
Browse files Browse the repository at this point in the history
…tructed with a child path

Agent will have more than one local db instance including task, instance and offset. So we need to init the db instance with different child path.
  • Loading branch information
justinwwhuang committed Oct 26, 2023
1 parent 0abbc5d commit 8d325f0
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.agent.db;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TriggerProfile;

/**
Expand Down Expand Up @@ -90,6 +91,13 @@ public TriggerProfile getAsTriggerProfile() {
return TriggerProfile.parseJsonStr(getJsonValue());
}

/**
* convert keyValue to offset profile
*/
public OffsetProfile getAsOffsetProfile() {
return OffsetProfile.parseJsonStr(getJsonValue());
}

/**
* check whether the entity is finished
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public class RocksDbImp implements Db {
private ConcurrentHashMap<String, ColumnFamilyDescriptor> columnDescriptorMap;
private String storePath;

public RocksDbImp() {
public RocksDbImp(String childPath) {
// init rocks db
this.conf = AgentConfiguration.getAgentConf();
this.db = initEnv();
this.db = initEnv(childPath);
// add a command column family
addColumnFamily(commandFamilyName);
}
Expand All @@ -74,10 +74,10 @@ private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFam
return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions());
}

private RocksDB initEnv() {
String configPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
private RocksDB initEnv(String childPath) {
String parentPath = conf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
File finalPath = new File(parentPath, configPath);
LOGGER.info("parentPath {} childPath {}", parentPath, childPath);
File finalPath = new File(parentPath, childPath);
storePath = finalPath.getAbsolutePath();
RocksDB.loadLibrary();
final Options options = new Options();
Expand Down Expand Up @@ -160,23 +160,12 @@ public KeyValueEntity get(String key) {

@Override
public CommandEntity getCommand(String commandId) {
try {
byte[] bytes = db.get(columnHandlesMap.get(commandFamilyName), commandId.getBytes());
return bytes == null ? null : GSON.fromJson(new String(bytes), CommandEntity.class);
} catch (Exception e) {
throw new RuntimeException("get command value error", e);
}
return null;
}

@Override
public CommandEntity putCommand(CommandEntity entity) {
requireNonNull(entity);
try {
db.put(columnHandlesMap.get(commandFamilyName), entity.getId().getBytes(), GSON.toJson(entity).getBytes());
} catch (Exception e) {
throw new RuntimeException("put value to rocks db error", e);
}
return entity;
return null;
}

@Override
Expand Down Expand Up @@ -228,6 +217,11 @@ public List<KeyValueEntity> searchWithKeyPrefix(StateSearchKey searchKey, String
return results;
}

@Override
public List<CommandEntity> searchCommands(boolean isAcked) {
return null;
}

@Override
public List<KeyValueEntity> search(StateSearchKey searchKey) {
List<KeyValueEntity> results = new LinkedList<>();
Expand Down Expand Up @@ -260,22 +254,6 @@ public List<KeyValueEntity> search(List<StateSearchKey> searchKeys) {
return results;
}

@Override
public List<CommandEntity> searchCommands(boolean isAcked) {
List<CommandEntity> results = new LinkedList<>();
try (final RocksIterator it = db.newIterator(columnHandlesMap.get(commandFamilyName))) {
it.seekToFirst();
while (it.isValid()) {
CommandEntity commandEntity = GSON.fromJson(new String(it.value()), CommandEntity.class);
if (commandEntity.isAcked() == isAcked) {
results.add(commandEntity);
}
it.next();
}
}
return results;
}

@Override
public KeyValueEntity searchOne(StateSearchKey searchKey) {
try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package org.apache.inlong.agent.db;

import org.apache.inlong.agent.AgentBaseTestsHelper;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.db.CommandEntity;

import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -30,10 +27,6 @@
import java.io.IOException;
import java.util.List;

import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;

public class TestRocksDbImp {

private static RocksDbImp db;
Expand All @@ -42,7 +35,7 @@ public class TestRocksDbImp {
@BeforeClass
public static void setup() throws Exception {
helper = new AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
db = new RocksDbImp();
db = new RocksDbImp("/localdb");
}

@AfterClass
Expand Down Expand Up @@ -81,22 +74,6 @@ public void testKeyValueDB() {
db.put(entity);
KeyValueEntity newEntity = db.get("test1");
Assert.assertEquals("testC", newEntity.getJsonValue());

}

@Test
public void testCommandDb() {
CommandEntity commandEntity = new CommandEntity();
commandEntity.setId("1");
commandEntity.setCommandResult(0);
commandEntity.setAcked(false);
commandEntity.setTaskId(1);
commandEntity.setVersion(1);
db.putCommand(commandEntity);
CommandEntity command = db.getCommand("1");
Assert.assertEquals("1", command.getId());
List<CommandEntity> commandEntities = db.searchCommands(false);
Assert.assertEquals("1", commandEntities.get(0).getId());
}

@Test
Expand All @@ -115,16 +92,4 @@ public void testFileNameSearch() {
KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
Assert.assertEquals("searchKey1", entityResult.getKey());
}

@Test
public void testBinlogJobStore() {
JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json");
JobProfileDb jobDb = new JobProfileDb(db);
String jobId = jobProfile.get(JOB_ID);
jobProfile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
jobDb.storeJobFirstTime(jobProfile);
List<JobProfile> restarts = jobDb.getRestartJobs();
Assert.assertEquals(1, restarts.size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.JobProfileDb;
import org.apache.inlong.agent.db.RocksDbImp;
import org.apache.inlong.agent.db.TriggerProfileDb;

import org.slf4j.Logger;
Expand Down Expand Up @@ -102,11 +103,8 @@ private ProfileFetcher initFetcher(AgentManager agentManager) {
*/
private Db initDb() {
try {
// db is a required component, so if not init correctly,
// throw exception and stop running.
return (Db) Class.forName(conf.get(
AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
.newInstance();
String childPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
return new RocksDbImp(childPath);
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.agent.plugin.utils;

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
Expand All @@ -31,7 +33,9 @@
public class RocksDBUtils {

public static void main(String[] args) {
Db db = new RocksDbImp();
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
Db db = new RocksDbImp(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
upgrade(db);
}

Expand Down

0 comments on commit 8d325f0

Please sign in to comment.