Skip to content

Commit

Permalink
[INLONG-9124][Agent] Add task and instance db
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Oct 26, 2023
1 parent 3ee9921 commit f32af99
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.db;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* db interface for task profile.
*/
public class InstanceDb {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskProfileDb.class);

private final Db db;

public InstanceDb(Db db) {
this.db = db;
}

/**
* get instance list from db.
*
* @return list of task
*/
public List<InstanceProfile> getInstances(String taskId) {
List<KeyValueEntity> result = this.db.findAll(getKeyByTaskId(taskId));
List<InstanceProfile> instanceList = new ArrayList<>();
for (KeyValueEntity entity : result) {
instanceList.add(entity.getAsInstanceProfile());
}
return instanceList;
}

/**
* store instance profile.
*
* @param instance instance
*/
public void storeInstance(InstanceProfile instance) {
if (instance.allRequiredKeyExist()) {
String keyName = getKeyByTaskAndInstanceId(instance.get(TaskConstants.TASK_ID),
instance.get(TaskConstants.INSTANCE_ID));
KeyValueEntity entity = new KeyValueEntity(keyName,
instance.toJsonStr(), instance.get(TaskConstants.INSTANCE_ID));
KeyValueEntity oldEntity = db.put(entity);
if (oldEntity != null) {
LOGGER.warn("instance profile {} has been replaced", oldEntity.getKey());
}
} else {
LOGGER.error("instance profile invalid!");
}
}

/**
* get instance profile.
*
* @param taskId task id from manager
* @param instanceId it can be file name(file collect), table name(db sync) etc
*/
public InstanceProfile getInstance(String taskId, String instanceId) {
KeyValueEntity result = this.db.get(getKeyByTaskAndInstanceId(taskId, instanceId));
if (result == null) {
return null;
}
return result.getAsInstanceProfile();
}

/**
* delete instance
*
* @param taskId task id from manager
* @param instanceId it can be file name(file collect), table name(db sync) etc
*/
public void deleteInstance(String taskId, String instanceId) {
db.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
}

private String getKey() {
return CommonConstants.INSTANCE_ID_PREFIX;
}

private String getKeyByTaskId(String taskId) {
return CommonConstants.INSTANCE_ID_PREFIX + taskId;
}

private String getKeyByTaskAndInstanceId(String taskId, String instanceId) {
return CommonConstants.INSTANCE_ID_PREFIX + taskId + "_" + instanceId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.inlong.agent.db;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.conf.TriggerProfile;

/**
Expand Down Expand Up @@ -98,6 +100,20 @@ public OffsetProfile getAsOffsetProfile() {
return OffsetProfile.parseJsonStr(getJsonValue());
}

/**
* convert keyValue to task profile
*/
public TaskProfile getAsTaskProfile() {
return TaskProfile.parseJsonStr(getJsonValue());
}

/**
* convert keyValue to instance profile
*/
public InstanceProfile getAsInstanceProfile() {
return InstanceProfile.parseJsonStr(getJsonValue());
}

/**
* check whether the entity is finished
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.db;

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* db interface for task profile.
*/
public class TaskProfileDb {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskProfileDb.class);

private final Db db;

public TaskProfileDb(Db db) {
this.db = db;
}

/**
* get task list from db.
*
* @return list of task
*/
public List<TaskProfile> getTasks() {
List<KeyValueEntity> result = this.db.findAll(getKey());
List<TaskProfile> taskList = new ArrayList<>();
for (KeyValueEntity entity : result) {
taskList.add(entity.getAsTaskProfile());
}
return taskList;
}

/**
* store task profile.
*
* @param task task
*/
public void storeTask(TaskProfile task) {
if (task.allRequiredKeyExist()) {
String keyName = getKeyByTaskId(task.getTaskId());
KeyValueEntity entity = new KeyValueEntity(keyName,
task.toJsonStr(), task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS));
KeyValueEntity oldEntity = db.put(entity);
if (oldEntity != null) {
LOGGER.warn("task profile {} has been replaced", oldEntity.getKey());
}
}
}

/**
* get task profile.
*
* @param taskId taskId
*/
public TaskProfile getTask(String taskId) {
KeyValueEntity result = this.db.get(getKeyByTaskId(taskId));
if (result == null) {
return null;
}
return result.getAsTaskProfile();
}

/**
* delete task by id.
*/
public void deleteTask(String taskId) {
db.remove(getKeyByTaskId(taskId));
}

private String getKey() {
return CommonConstants.TASK_ID_PREFIX;
}

private String getKeyByTaskId(String taskId) {
return CommonConstants.TASK_ID_PREFIX + taskId;
}
}

0 comments on commit f32af99

Please sign in to comment.