Skip to content

Commit

Permalink
[INLONG-10152][Sort] Refactor MetricOption code structure. (apache#10156
Browse files Browse the repository at this point in the history
)

Co-authored-by: vinnerzhang <[email protected]>
  • Loading branch information
XiaoYou201 and vinnerzhang authored May 9, 2024
1 parent 00595bb commit 011e96b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
Expand All @@ -46,7 +47,7 @@ public class MetricOption implements Serializable {
private static final long serialVersionUID = 1L;

private Map<String, String> labels;
private HashSet<String> ipPortList;
private Set<String> ipPortSet;
private String ipPorts;
private RegisteredMetric registeredMetric;
private long initRecords;
Expand All @@ -57,62 +58,34 @@ public class MetricOption implements Serializable {
private List<Integer> inlongAuditKeys;

private MetricOption(
String inlongLabels,
Map<String, String> labels,
@Nullable String inlongAudit,
RegisteredMetric registeredMetric,
long initRecords,
long initBytes,
Long initDirtyRecords,
Long initDirtyBytes,
Long readPhase,
String inlongAuditKeys) {
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
"Inlong labels must be set for register metric.");

List<Integer> inlongAuditKeys,
Set<String> ipPortSet) {
this.initRecords = initRecords;
this.initBytes = initBytes;
this.initDirtyRecords = initDirtyRecords;
this.initDirtyBytes = initDirtyBytes;
this.readPhase = readPhase;
this.labels = new LinkedHashMap<>();
String[] inLongLabelArray = inlongLabels.split(DELIMITER);
Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
"InLong metric label format must be xxx=xxx");
Stream.of(inLongLabelArray).forEach(label -> {
String key = label.substring(0, label.indexOf('='));
String value = label.substring(label.indexOf('=') + 1);
labels.put(key, value);
});

this.labels = labels;
this.ipPorts = inlongAudit;

if (ipPorts != null) {

Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
"groupId and streamId must be set when enable inlong audit collect.");

if (inlongAuditKeys == null) {
LOG.warn("should set inlongAuditKeys when enable inlong audit collect, "
+ "fallback to use id {} as audit key", AUDIT_SORT_INPUT);
inlongAuditKeys = AUDIT_SORT_INPUT;
}

this.inlongAuditKeys = AuditUtils.extractAuditKeys(inlongAuditKeys);
this.ipPortList = AuditUtils.extractAuditIpPorts(ipPorts);

}

if (registeredMetric != null) {
this.registeredMetric = registeredMetric;
}
this.inlongAuditKeys = inlongAuditKeys;
this.ipPortSet = ipPortSet;
this.registeredMetric = registeredMetric;
}

public Map<String, String> getLabels() {
return labels;
}

public HashSet<String> getIpPortList() {
return ipPortList;
public HashSet<String> getIpPortSet() {
return new HashSet<>(ipPortSet);
}

public Optional<String> getIpPorts() {
Expand Down Expand Up @@ -238,11 +211,43 @@ public MetricOption.Builder withInitReadPhase(Long initReadPhase) {
}

public MetricOption build() {
if (inlongLabels == null && inlongAudit == null) {
if (inlongAudit == null && inlongLabels == null) {
LOG.warn("The property 'metrics.audit.proxy.hosts and inlong.metric.labels' has not been set," +
" the program will not open audit function");
return null;
}
return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes,
initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeys);

Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
"Inlong labels must be set for register metric.");
String[] inLongLabelArray = inlongLabels.split(DELIMITER);
Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
"InLong metric label format must be xxx=xxx");
Map<String, String> labels = new LinkedHashMap<>();
Stream.of(inLongLabelArray).forEach(label -> {
String key = label.substring(0, label.indexOf('='));
String value = label.substring(label.indexOf('=') + 1);
labels.put(key, value);
});

List<Integer> inlongAuditKeysList = null;
Set<String> ipPortSet = null;

if (inlongAudit != null) {
Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
"The groupId and streamId must be set when enable inlong audit collect.");

if (inlongAuditKeys == null) {
LOG.warn("The inlongAuditKeys should be set when enable inlong audit collect, "
+ "fallback to use id {} as audit key", AUDIT_SORT_INPUT);
inlongAuditKeys = AUDIT_SORT_INPUT;
}

inlongAuditKeysList = AuditUtils.extractAuditKeys(inlongAuditKeys);
ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit);
}

return new MetricOption(labels, inlongAudit, registeredMetric, initRecords, initBytes,
initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
}

if (option.getIpPorts().isPresent()) {
AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
AuditOperator.getInstance().setAuditProxy(option.getIpPortSet());
this.auditOperator = AuditOperator.getInstance();
this.auditKeys = option.getInlongAuditKeys();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
}

if (option.getIpPorts().isPresent()) {
AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
AuditOperator.getInstance().setAuditProxy(option.getIpPortSet());
this.auditOperator = AuditOperator.getInstance();
this.auditKeys = option.getInlongAuditKeys();
}
Expand All @@ -114,7 +114,7 @@ public SourceMetricData(MetricOption option) {
this.labels = option.getLabels();

if (option.getIpPorts().isPresent()) {
AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
AuditOperator.getInstance().setAuditProxy(option.getIpPortSet());
this.auditOperator = AuditOperator.getInstance();
this.auditKeys = option.getInlongAuditKeys();
}
Expand Down

0 comments on commit 011e96b

Please sign in to comment.