Skip to content

Commit

Permalink
[INLONG-9484]][Manager] Improve logic of sortstandalone sink auto-ass…
Browse files Browse the repository at this point in the history
…igned cluster (apache#9485)
  • Loading branch information
vernedeng authored Dec 15, 2023
1 parent bfc7750 commit 47c4fa0
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public abstract class ClusterRequest {
private Integer id;

@ApiModelProperty(value = "Cluster name")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports letters, numbers, '-', or '_'")
@Pattern(regexp = "^[A-Za-z0-9._-]{1,128}$", message = "only supports letters, numbers, '.', '-', or '_'")
private String name;

@ApiModelProperty(value = "Cluster display name, just for display")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public abstract class DataNodeRequest {
private Integer id;

@ApiModelProperty(value = "Data node name")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports letters, numbers, '-', or '_'")
@Pattern(regexp = "^[A-Za-z0-9._-]{1,128}$", message = "only supports letters, numbers, '.', '-', or '_'")
private String name;

@ApiModelProperty(value = "Data node display name, just for display")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.inlong.manager.service.resource.sink;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
Expand All @@ -31,15 +31,15 @@
import org.apache.inlong.manager.service.sink.StreamSinkService;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

@Slf4j
public abstract class AbstractStandaloneSinkResourceOperator implements SinkResourceOperator {

@Autowired
Expand All @@ -51,14 +51,16 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso
@Autowired
private StreamSinkService sinkService;

private static final String SORT_PREFIX = "SORT_";

private Random rand = new Random();

@VisibleForTesting
protected void assignCluster(SinkInfo sinkInfo) {
if (StringUtils.isBlank(sinkInfo.getSinkType())) {
throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
}

if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
String info = "success to create es resource";
String info = "no need to auto-assign cluster since the cluster has already assigned";
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
return;
}
Expand Down Expand Up @@ -88,18 +90,23 @@ private String assignFromExist(String dataNodeName) {
private String assignFromRelated(String sinkType, String groupId) {
InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
String sortClusterType = SinkType.relatedSortClusterType(sinkType);
List<InlongClusterEntity> clusters = clusterEntityMapper
.selectByKey(null, null, sortClusterType).stream()
.filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag()))
.collect(Collectors.toList());
if (StringUtils.isBlank(sortClusterType)) {
log.error("find no relate sort cluster type for sink type={}", sinkType);
return null;
}

return CollectionUtils.isEmpty(clusters) ? null : clusters.get(rand.nextInt(clusters.size())).getName();
// if some clusters have the same tag
List<InlongClusterEntity> clusters =
clusterEntityMapper.selectByKey(group.getInlongClusterTag(), null, sortClusterType);
if (!CollectionUtils.isEmpty(clusters)) {
return clusters.get(rand.nextInt(clusters.size())).getName();
}

}
// if no cluster with the same tag
clusters = clusterEntityMapper.selectByKey(null, null, sortClusterType);

return CollectionUtils.isEmpty(clusters) ? null : clusters.get(rand.nextInt(clusters.size())).getName();

private boolean checkCluster(String clusterTags, String targetTag) {
return StringUtils.isBlank(clusterTags)
|| Sets.newHashSet(clusterTags.split(InlongConstants.COMMA)).contains(targetTag);
}

}

0 comments on commit 47c4fa0

Please sign in to comment.