[feature] support managing tasks by using http_sd (#2830)
Co-authored-by: Zhang Yuxuan <[email protected]>
Co-authored-by: aias00 <[email protected]>
3 people authored Nov 27, 2024
1 parent fc8be06 commit 5acaf48
Showing 67 changed files with 1,501 additions and 186 deletions.
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.sd;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.collector.collect.AbstractCollect;
import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.entity.sd.ConnectionConfig;
import org.apache.hertzbeat.common.entity.sd.ServiceDiscoveryResponseEntity;
import org.apache.hertzbeat.common.util.CommonUtil;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.util.EntityUtils;
import org.springframework.util.CollectionUtils;


/**
 * http sd collector - fetches discovery targets from an http_sd endpoint and converts them into host/port value rows
 */
@Slf4j
public class HttpSdCollectImpl extends AbstractCollect {

    @Override
    public void preCheck(Metrics metrics) throws IllegalArgumentException {
    }

    @Override
    public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) {
        List<ConnectionConfig> configList = Lists.newArrayList();
        HttpUriRequest request = RequestBuilder.get().setUri(metrics.getSdProtocol().getSdSource()).build();

        try (CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request)) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                log.warn("Failed to fetch sd...");
                builder.setMsg("StatusCode " + statusCode);
                builder.setCode(CollectRep.Code.FAIL);
                return;
            }

            String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            TypeReference<List<ServiceDiscoveryResponseEntity>> typeReference = new TypeReference<>() {};
            final List<ServiceDiscoveryResponseEntity> responseEntityList = JsonUtil.fromJson(responseBody, typeReference);
            if (CollectionUtils.isEmpty(responseEntityList)) {
                return;
            }

            // flatten every non-empty target list into host/port connection configs
            responseEntityList.stream()
                    .filter(entity -> !CollectionUtils.isEmpty(entity.getTarget()))
                    .forEach(responseEntity -> convertTarget(configList, responseEntity));

            // one value row per discovered target: column 0 is the host, column 1 is the port
            configList.forEach(config -> {
                CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
                valueRowBuilder.addColumns(config.getHost());
                valueRowBuilder.addColumns(config.getPort());
                builder.addValues(valueRowBuilder.build());
            });
        } catch (IOException e) {
            String errorMsg = CommonUtil.getMessageFromThrowable(e);
            log.warn("Failed to fetch sd... {}", errorMsg);
            builder.setCode(CollectRep.Code.FAIL);
            builder.setMsg(errorMsg);
        }
    }

    private void convertTarget(List<ConnectionConfig> configList, ServiceDiscoveryResponseEntity responseEntity) {
        responseEntity.getTarget().stream()
                .filter(StringUtils::isNotBlank)
                .forEach(fetchedTarget -> addConfig(configList, fetchedTarget));
    }

    private void addConfig(List<ConnectionConfig> configList, String fetchedTarget) {
        // each target entry may hold several comma-separated "host:port" pairs; skip malformed ones
        for (String url : fetchedTarget.split(",")) {
            final String[] split = url.split(":");
            if (split.length != 2) {
                continue;
            }

            configList.add(ConnectionConfig.builder()
                    .host(split[0])
                    .port(split[1])
                    .build());
        }
    }

    @Override
    public String supportProtocol() {
        return DispatchConstants.PROTOCOL_HTTP_SD;
    }
}
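
The collector above expects the http_sd endpoint to return a JSON array of entries whose target list holds comma-separated "host:port" strings; the exact JSON field mapping lives in ServiceDiscoveryResponseEntity, which is not part of this diff. A minimal, self-contained sketch of the parsing step only (the class and sample values below are hypothetical):

import java.util.ArrayList;
import java.util.List;

public class HttpSdParseSketch {
    public static void main(String[] args) {
        // Hypothetical target entries, e.g. as they might appear inside an http_sd response body
        List<String> targets = List.of("10.0.0.1:9100,10.0.0.2:9100", "example.internal:8080");

        List<String[]> hostPortPairs = new ArrayList<>();
        for (String fetchedTarget : targets) {
            // same rule as addConfig(...) above: split on commas, keep only well-formed "host:port" pairs
            for (String url : fetchedTarget.split(",")) {
                String[] split = url.split(":");
                if (split.length == 2) {
                    hostPortPairs.add(split);
                }
            }
        }
        // each pair becomes one value row: column 0 host, column 1 port
        hostPortPairs.forEach(pair -> System.out.println(pair[0] + " -> " + pair[1]));
    }
}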
@@ -216,6 +216,10 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
}
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
if (job.isSd()) {
commonDataQueue.sendServiceDiscoveryData(metricsData);
}

// If it is an asynchronous periodic cyclic task, respond with the collected data directly
commonDataQueue.sendMetricsData(metricsData);
if (log.isDebugEnabled()) {
@@ -226,11 +230,12 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
}
}
}

// If metricsSet is null, the execution is completed; otherwise check whether the priority of the collection metrics is 0, that is, the availability collection metrics.
// If the availability collection fails, the scheduling of the remaining metrics is cancelled and the next round of scheduling is entered directly.
boolean isAvailableCollectFailed = metricsSet != null && !metricsSet.isEmpty()
&& metrics.getPriority() == (byte) 0 && metricsData.getCode() != CollectRep.Code.SUCCESS;
if (metricsSet == null || isAvailableCollectFailed) {
if (metricsSet == null || isAvailableCollectFailed || job.isSd()) {
// The collection and execution tasks of this job are completed.
// The periodic task pushes the task to the time wheel again.
// First, determine the execution time of the task and the task collection interval.
@@ -300,7 +305,8 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
}
}
}
if (metricsSet == null) {

if (job.isSd() || metricsSet == null) {
// The collection and execution of all metrics of this job are completed
// and the result listener is notified of the combination of all metrics data
timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp());
@@ -97,6 +97,10 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
* Start time of the collection task
*/
protected long startTime;
/**
* Whether this is a service discovery job
*/
protected boolean isSd;

protected List<UnitConvert> unitConvertList;

@@ -115,6 +119,7 @@ public MetricsCollect(Metrics metrics, Timeout timeout,
this.app = job.getApp();
this.collectDataDispatch = collectDataDispatch;
this.isCyclic = job.isCyclic();
this.isSd = job.isSd();
this.unitConvertList = unitConvertList;
// Temporary one-time tasks are executed with high priority
if (isCyclic) {
@@ -28,3 +28,4 @@ org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl
org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl
org.apache.hertzbeat.collector.collect.ipmi2.IpmiCollectImpl
org.apache.hertzbeat.collector.collect.kafka.KafkaCollectImpl
org.apache.hertzbeat.collector.collect.sd.HttpSdCollectImpl
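
The new collector is registered by appending its class name to the existing provider list, presumably a standard META-INF/services file (the file path is not visible in this excerpt). A hedged sketch of how such a registration would be discovered, assuming AbstractCollect is the service type and providers expose supportProtocol():

import java.util.ServiceLoader;

import org.apache.hertzbeat.collector.collect.AbstractCollect;

public class CollectorSpiDemo {
    public static void main(String[] args) {
        // Enumerate every registered collector implementation and print the protocol it supports;
        // for HttpSdCollectImpl this is DispatchConstants.PROTOCOL_HTTP_SD ("httpsd").
        ServiceLoader<AbstractCollect> loader = ServiceLoader.load(AbstractCollect.class);
        loader.forEach(collect -> System.out.println(collect.supportProtocol()));
    }
}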
@@ -123,6 +123,10 @@ public interface DispatchConstants {
* protocol registry
*/
String PROTOCOL_REGISTRY = "registry";
/**
* protocol http sd
*/
String PROTOCOL_HTTP_SD = "httpsd";
/**
* protocol redfish
*/
@@ -170,6 +170,17 @@ public void sendAsyncCollectData(CollectRep.MetricsData metricsData) {
this.collectServer.sendMsg(message);
}

public void sendAsyncServiceDiscoveryData(CollectRep.MetricsData metricsData) {
String data = ProtoJsonUtil.toJsonStr(metricsData);
ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
.setIdentity(collectorIdentity)
.setMsg(data)
.setDirection(ClusterMsg.Direction.REQUEST)
.setType(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA)
.build();
this.collectServer.sendMsg(message);
}

public String getCollectorIdentity() {
return collectorIdentity;
}
@@ -82,8 +82,18 @@ public CollectRep.MetricsData pollMetricsDataToRealTimeStorage() throws Interrup
return null;
}

@Override
public CollectRep.MetricsData pollServiceDiscoveryData() throws InterruptedException {
return null;
}

@Override
public void sendMetricsData(CollectRep.MetricsData metricsData) {
collectJobService.sendAsyncCollectData(metricsData);
}

@Override
public void sendServiceDiscoveryData(CollectRep.MetricsData metricsData) {
collectJobService.sendAsyncServiceDiscoveryData(metricsData);
}
}
@@ -110,6 +110,11 @@ public static class RedisProperties {
*/
private String metricsDataQueueNameToRealTimeStorage;

/**
* Queue name for service discovery
*/
private String metricsDataQueueNameForServiceDiscovery;

/**
* Queue name for alerts data
*/
@@ -128,6 +133,10 @@ public static class KafkaProperties extends BaseKafkaProperties {
* metrics data topic
*/
private String metricsDataTopic;
/**
* service discovery data topic
*/
private String serviceDiscoveryDataTopic;
/**
* alerts data topic
*/
@@ -72,6 +72,16 @@ public interface CommonConstants {
*/
byte MONITOR_DOWN_CODE = 0x02;

/**
* Monitor bind type, 0: a sub-monitor auto-created by service discovery
*/
byte MONITOR_BIND_TYPE_SD_SUB_MONITOR = 0x00;

/**
* Monitor bind type, 1: the main monitor that provides service discovery
*/
byte MONITOR_BIND_TYPE_SD_MAIN_MONITOR = 0x01;

/**
* Alarm status: 0 - normal alarm (to be processed)
*/
@@ -202,6 +212,16 @@ public interface CommonConstants {
*/
String TAG_MONITOR_NAME = "monitorName";

/**
* Inside the tag: indicates that this monitor was auto-created by a main monitor
*/
String TAG_AUTO_CREATED = "autoCreated";

/**
* Inside the tag: indicates that this monitor is a main monitor that provides service discovery
*/
String TAG_SD_MAIN_MONITOR = "sdMainMonitor";

/**
* Inside the tag: monitorHost Task host
*/
@@ -232,6 +252,19 @@ public interface CommonConstants {
*/
String TAG_CODE = "code";

/**
* Tag Type: Auto-generated
*/
byte TAG_TYPE_AUTO_GENERATE = 0;
/**
* Tag Type: User-generated
*/
byte TAG_TYPE_USER_GENERATE = 1;
/**
* Tag Type: System preset
*/
byte TAG_TYPE_SYSTEM_PRESET = 2;

/**
* notice_period type Type field, daily type
*/
@@ -367,4 +400,14 @@ public interface CommonConstants {
* status page incident state resolved
*/
byte STATUS_PAGE_INCIDENT_STATE_RESOLVED = 3;

/**
* host column for service discovery
*/
int SD_HOST_COLUMN = 0;

/**
* port column for service discovery
*/
int SD_PORT_COLUMN = 1;
}
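
SD_HOST_COLUMN and SD_PORT_COLUMN pin down the row layout that HttpSdCollectImpl produces (host in column 0, port in column 1). A sketch of how a consumer might read a row back into a ConnectionConfig, assuming the standard protobuf accessor getColumns(int) on CollectRep.ValueRow; the SdRowReader class itself is hypothetical and not part of this commit:

import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.entity.sd.ConnectionConfig;

public class SdRowReader {

    /**
     * Rebuild a ConnectionConfig from one value row of service discovery metrics data,
     * using the column order produced by HttpSdCollectImpl: SD_HOST_COLUMN = 0, SD_PORT_COLUMN = 1.
     */
    public static ConnectionConfig fromRow(CollectRep.ValueRow row) {
        return ConnectionConfig.builder()
                .host(row.getColumns(0))
                .port(row.getColumns(1))
                .build();
    }
}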
@@ -119,6 +119,10 @@ public class Job {
* Monitoring configuration parameter properties and values eg: username password timeout host
*/
private List<Configmap> configmap;
/**
* Whether this is a service discovery job
*/
private boolean isSd = false;

/**
* the collect data response metrics as env configmap for other collect use. ^o^xxx^o^
@@ -60,6 +60,7 @@
import org.apache.hertzbeat.common.entity.job.protocol.UdpProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.WebsocketProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.entity.sd.ServiceDiscoveryProtocol;

/**
* Details of the monitoring metrics collected
@@ -250,6 +251,10 @@ public class Metrics {
* Monitoring configuration information using the public kafka protocol
*/
private KafkaProtocol kclient;
/**
* Protocol configuration for collecting service discovery (sd) data
*/
private ServiceDiscoveryProtocol sdProtocol;

/**
* collector use - Temporarily store subTask metrics response data
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.common.entity.job.protocol;

/**
 * Defines the common field setter methods for each protocol in {@link org.apache.hertzbeat.common.entity.job.Metrics}
 */
public interface CommonRequestProtocol {

    void setHost(String host);

    void setPort(String port);
}
@@ -30,7 +30,7 @@
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DnsProtocol {
public class DnsProtocol implements CommonRequestProtocol {
private String dnsServerIP;
private String port;
private String address;
@@ -41,4 +41,9 @@ public class DnsProtocol {
public boolean isInvalid() {
return StringUtils.isAnyBlank(dnsServerIP, port, address, timeout, tcp, queryClass);
}

@Override
public void setHost(String host) {
this.address = host;
}
}
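
CommonRequestProtocol exists so that discovered targets can be pushed into any protocol configuration without the caller knowing its concrete type; DnsProtocol, for example, maps the host onto its address field. A minimal sketch of that idea, assuming the manager side applies each ConnectionConfig produced by HttpSdCollectImpl to the protocol of an auto-created sub-monitor (the applyTarget helper is hypothetical and not part of this commit):

import org.apache.hertzbeat.common.entity.job.protocol.CommonRequestProtocol;
import org.apache.hertzbeat.common.entity.sd.ConnectionConfig;

public class SdTargetApplier {

    /**
     * Copy one discovered host/port pair onto any protocol implementing the common interface.
     */
    public static void applyTarget(CommonRequestProtocol protocol, ConnectionConfig config) {
        protocol.setHost(config.getHost());
        protocol.setPort(config.getPort());
    }
}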