Skip to content

Commit

Permalink
[INLONG-11323][Manager] Modify the parameters of the data add tasks f…
Browse files Browse the repository at this point in the history
…or file collection
  • Loading branch information
fuweng11 committed Oct 10, 2024
1 parent 58fd7dd commit 4309f21
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import javax.validation.constraints.NotBlank;

import java.util.List;

/**
* Data add task information
*/
Expand All @@ -33,9 +35,16 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "sourceType")
public class DataAddTaskRequest {

@ApiModelProperty(value = "Source ID")
@ApiModelProperty(value = "Group Id")
@NotBlank(message = "inlongGroupId cannot be blank")
private String groupId;

@ApiModelProperty(value = "Source ID", hidden = true)
private Integer sourceId;

@ApiModelProperty(value = "Agent ip List")
private List<String> agentIpList;

@ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
@NotBlank(message = "sourceType cannot be blank")
@Length(min = 1, max = 20, message = "length must be between 1 and 20")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,42 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) {
if (existEntity != null) {
agentTaskConfigEntity = CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
}

LOGGER.debug("begin to get agent config info for {}", request);
Set<String> tagSet = new HashSet<>(16);
InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(request.getInlongClusterName(),
ClusterType.AGENT);
if (agentClusterInfo == null) {
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
return;
}
String clusterTag = agentClusterInfo.getClusterTags();
AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
.cluster(AgentConfigInfo.AgentClusterInfo.builder()
.parentId(agentClusterInfo.getId())
.clusterName(agentClusterInfo.getName())
.build())
.build();
if (StringUtils.isNotBlank(clusterTag)) {
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
List<String> clusterTagList = new ArrayList<>(tagSet);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.type(ClusterType.AGENT_ZK)
.clusterTagList(clusterTagList)
.build();
List<InlongClusterEntity> agentZkCluster = clusterMapper.selectByCondition(pageRequest);
if (CollectionUtils.isNotEmpty(agentZkCluster)) {
agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
}
}

String jsonStr = GSON.toJson(agentConfigInfo);
String configMd5 = DigestUtils.md5Hex(jsonStr);
agentConfigInfo.setMd5(configMd5);
agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));

List<StreamSourceEntity> normalSourceEntities = sourceMapper.selectByStatusAndCluster(
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode)
.collect(Collectors.toList()),
Expand All @@ -443,7 +479,7 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) {
return cmdConfig;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskLists)) {
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
agentTaskConfigEntity.setTaskParams(null);
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
return;
}
Expand All @@ -461,37 +497,6 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) {
agentTaskConfigEntity.setClusterName(request.getInlongClusterName());
agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult));

LOGGER.debug("begin to get agent config info for {}", request);
Set<String> tagSet = new HashSet<>(16);
InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId());
String clusterTag = groupEntity.getInlongClusterTag();
InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(request.getInlongClusterName(),
ClusterType.AGENT);
AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
.cluster(AgentConfigInfo.AgentClusterInfo.builder()
.parentId(agentClusterInfo.getId())
.clusterName(agentClusterInfo.getName())
.build())
.build();
if (StringUtils.isNotBlank(clusterTag)) {
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
List<String> clusterTagList = new ArrayList<>(tagSet);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.type(ClusterType.AGENT_ZK)
.clusterTagList(clusterTagList)
.build();
List<InlongClusterEntity> agentZkCluster = clusterMapper.selectByCondition(pageRequest);
if (CollectionUtils.isNotEmpty(agentZkCluster)) {
agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
}
}

String jsonStr = GSON.toJson(agentConfigInfo);
String configMd5 = DigestUtils.md5Hex(jsonStr);
agentConfigInfo.setMd5(configMd5);
agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
agentClusterInfo.setModifier(operator);
if (existEntity == null) {
agentTaskConfigEntity.setCreator(operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ default Boolean updateAfterApprove(String operator) {
* @param operator Operator's name.
* @return source id after saving.
*/
Integer addDataAddTask(DataAddTaskRequest request, String operator);
List<Integer> addDataAddTask(DataAddTaskRequest request, String operator);

/**
* Batch Save the data add task information
*
* @param requestList Source request list.
* @param operator Operator's name.
* @return source id list after saving.
*/
List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -533,25 +534,32 @@ private void chkUnmodifiableParams(SourceRequest request) {
}

@Override
public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
public List<Integer> addDataAddTask(DataAddTaskRequest request, String operator) {
LOGGER.info("begin to add data add task info: {}", request);
StreamSourceEntity entity = sourceMapper.selectById(request.getSourceId());
StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
int id = sourceOperator.addDataAddTask(request, operator);
LOGGER.info("success to add data add task info: {}", request);
return id;
}

@Override
public List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator) {
List<Integer> result = new ArrayList<>();
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null));
for (DataAddTaskRequest request : requestList) {
request.setAuditVersion(auditVersion);
int id = addDataAddTask(request, operator);
result.add(id);
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(request.getGroupId(), null));
request.setAuditVersion(auditVersion);
List<String> agentIpList = request.getAgentIpList();
List<StreamSourceEntity> entityList = new ArrayList<>();
List<Integer> resultIdList = new ArrayList<>();
if (CollectionUtils.isEmpty(agentIpList)) {
entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null);
} else {
for (String agentIp : agentIpList) {
List<StreamSourceEntity> sourceEntityList = sourceMapper.selectByAgentIp(agentIp);
entityList.addAll(sourceEntityList);
}
}
return result;
for (StreamSourceEntity sourceEntity : entityList) {
if (sourceEntity.getTaskMapId() != null || !Objects.equals(sourceEntity.getInlongGroupId(),
request.getGroupId())) {
continue;
}
StreamSourceOperator sourceOperator = operatorFactory.getInstance(sourceEntity.getSourceType());
request.setSourceId(sourceEntity.getId());
int id = sourceOperator.addDataAddTask(request, operator);
resultIdList.add(id);
}
LOGGER.info("success to add data add task info: {}, data add task size: {}", request, resultIdList.size());
return resultIdList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,10 @@ public Response<Boolean> forceDelete(@RequestParam String inlongGroupId, @Reques
sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/source/addDataAddTask/{groupId}", method = RequestMethod.POST)
@RequestMapping(value = "/source/addDataAddTask", method = RequestMethod.POST)
@ApiOperation(value = "Add supplementary recording task for stream source")
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true)
public Response<List<Integer>> addSub(@PathVariable String groupId,
@RequestBody List<DataAddTaskRequest> requestList) {
return Response.success(
sourceService.batchAddDataAddTask(groupId, requestList, LoginUserUtils.getLoginUser().getName()));
public Response<List<Integer>> addSub(@RequestBody DataAddTaskRequest request) {
return Response.success(sourceService.addDataAddTask(request, LoginUserUtils.getLoginUser().getName()));
}

}

0 comments on commit 4309f21

Please sign in to comment.