Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Support Collector Alarm #2693

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
38e409e
collector alarm
pwallk Sep 8, 2024
98ef2c7
fix eslint error
pwallk Sep 8, 2024
f77314a
Merge branch 'master' into feat-collector-alarm
Calvin979 Sep 9, 2024
201ca71
fix test error
pwallk Sep 9, 2024
f7a3928
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 9, 2024
2cafeea
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 10, 2024
d7e6868
Merge branch 'master' into feat-collector-alarm
yuluo-yx Sep 11, 2024
ffba083
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 11, 2024
84ffd0a
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 12, 2024
5e639b7
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 13, 2024
154a8d1
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 13, 2024
b51ed2b
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 13, 2024
f007fb9
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 15, 2024
5643900
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 15, 2024
d1aaeda
reuse AlarmCommonReduce
pwallk Sep 15, 2024
7a143c2
Merge branch 'feat-collector-alarm' of https://github.com/pwallk/hert…
pwallk Sep 15, 2024
2ecc13d
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 17, 2024
e79ccab
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 19, 2024
b28a7e4
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 21, 2024
1327ce6
Merge branch 'master' into feat-collector-alarm
yuluo-yx Sep 22, 2024
c71f602
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 23, 2024
61c784b
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 24, 2024
46a801f
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 25, 2024
4305259
Merge branch 'master' into feat-collector-alarm
Aias00 Sep 30, 2024
c2be070
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 8, 2024
a9bca1d
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 16, 2024
3ec4a79
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 26, 2024
e59e1f3
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 26, 2024
1caa633
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 27, 2024
67cb93c
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 29, 2024
86591dc
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 29, 2024
75196e7
Merge branch 'master' into feat-collector-alarm
Aias00 Oct 31, 2024
41d28bc
Merge branch 'master' into feat-collector-alarm
Aias00 Nov 1, 2024
f62c8e1
Merge branch 'master' into feat-collector-alarm
Aias00 Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.alert.calculate;

import jakarta.persistence.criteria.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
import org.apache.hertzbeat.alert.dao.AlertCollectorDao;
import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce;
import org.apache.hertzbeat.alert.service.AlertService;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.alerter.Alert;
import org.apache.hertzbeat.common.entity.manager.Collector;
import org.apache.hertzbeat.common.support.event.CollectorDeletedEvent;
import org.apache.hertzbeat.common.support.event.SystemConfigChangeEvent;
import org.apache.hertzbeat.common.util.ResourceBundleUtil;
import org.springframework.context.event.EventListener;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hertzbeat.common.constants.CommonConstants.ALERT_STATUS_CODE_PENDING;

/**
 * Raises and recovers availability alarms for collectors.
 *
 * <p>An offline alert is sent the first time a collector is reported offline and is remembered in
 * {@link #offlineAlertMap}. When the collector comes back online the remembered alert is removed,
 * the matching stored alerts are marked as solved and a recovery alert is sent.
 */
@Component
@Slf4j
public class CollectorAlarmHandler {

    /**
     * Offline alerts that have not been recovered yet, keyed by collector name.
     */
    private final Map<String, Alert> offlineAlertMap;

    private final AlertService alertService;

    private final AlertCollectorDao alertCollectorDao;

    private final AlarmCommonReduce alarmCommonReduce;

    private final AlerterWorkerPool workerPool;

    /**
     * Message bundle used for alert content; reloaded on {@link SystemConfigChangeEvent}.
     */
    private ResourceBundle bundle;

    /**
     * Creates the handler and preloads an offline marker for every collector that is currently
     * stored with the offline status, so that a later {@link #online(String)} call can recover it.
     *
     * @param alarmCommonReduce reducer through which alerts are sent
     * @param alertService      service used to query and update stored alerts
     * @param alertCollectorDao collector repository
     * @param workerPool        pool on which the recovery work is executed
     */
    public CollectorAlarmHandler(AlarmCommonReduce alarmCommonReduce, AlertService alertService, AlertCollectorDao alertCollectorDao,
                                 AlerterWorkerPool workerPool) {
        this.offlineAlertMap = new ConcurrentHashMap<>(16);
        this.alarmCommonReduce = alarmCommonReduce;
        this.alertService = alertService;
        this.alertCollectorDao = alertCollectorDao;
        this.workerPool = workerPool;
        this.bundle = ResourceBundleUtil.getBundle("alerter");
        List<Collector> collectors = this.alertCollectorDao.findCollectorsByStatus(CommonConstants.COLLECTOR_STATUS_OFFLINE);
        if (!CollectionUtils.isEmpty(collectors)) {
            for (Collector collector : collectors) {
                Map<String, String> tags = new HashMap<>(8);
                tags.put(CommonConstants.TAG_COLLECTOR_ID, String.valueOf(collector.getId()));
                tags.put(CommonConstants.TAG_COLLECTOR_NAME, collector.getName());
                // Same target as the alerts built in offline()/online(). No alarm time is known
                // here, so lastAlarmTime stays unset; online() compensates for that.
                this.offlineAlertMap.put(collector.getName(),
                        Alert.builder()
                                .tags(tags)
                                .target(CommonConstants.AVAILABILITY_COLLECTOR)
                                .status(ALERT_STATUS_CODE_PENDING)
                                .build());
            }
        }
    }

    /**
     * handle collector online
     *
     * <p>Does nothing when the collector is unknown or has no remembered offline alert.
     *
     * @param identity collector name
     */
    public void online(final String identity) {
        Collector collector = alertCollectorDao.findCollectorByName(identity);
        if (collector == null) {
            return;
        }
        long currentTimeMill = System.currentTimeMillis();
        Alert preAlert = offlineAlertMap.remove(identity);
        if (preAlert == null) {
            return;
        }
        Map<String, String> tags = preAlert.getTags();
        tags.put(CommonConstants.TAG_COLLECTOR_HOST, collector.getIp());
        tags.put(CommonConstants.TAG_COLLECTOR_VERSION, collector.getVersion());
        String content = this.bundle.getString("alerter.availability.collector.recover");
        // Markers preloaded by the constructor have no lastAlarmTime. Fall back to "now" so the
        // recovery alert is complete and the stored-alert lookup below has a usable upper bound.
        Long lastAlarmTime = Objects.requireNonNullElse(preAlert.getLastAlarmTime(), currentTimeMill);
        Alert resumeAlert = Alert.builder()
                .tags(tags)
                .target(CommonConstants.AVAILABILITY_COLLECTOR)
                .content(content)
                .priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING)
                .status(CommonConstants.ALERT_STATUS_CODE_RESTORED)
                .firstAlarmTime(currentTimeMill)
                .lastAlarmTime(lastAlarmTime)
                .build();
        workerPool.executeJob(() -> recoverAlarm(identity, resumeAlert));
    }

    /**
     * Marks the stored offline alerts of the collector as solved and, if there were any,
     * sends the recovery alert.
     *
     * @param identity     collector name
     * @param restoreAlert recovery alert to send
     */
    private void recoverAlarm(String identity, Alert restoreAlert) {
        List<Long> alertIds = queryAvailabilityAlerts(identity, restoreAlert)
                .stream()
                // the query matches tags with LIKE, so confirm the exact collector name here
                .filter(alert -> Objects.equals(alert.getTags().get(CommonConstants.TAG_COLLECTOR_NAME), identity))
                .map(Alert::getId)
                .toList();

        if (alertIds.isEmpty()) {
            return;
        }
        alertService.editAlertStatus(CommonConstants.ALERT_STATUS_CODE_SOLVED, alertIds);

        // Recovery notifications are generated only after an alarm has occurred
        alarmCommonReduce.reduceAndSendAlarm(restoreAlert);
    }

    /**
     * Queries stored alerts that are pending, have emergency priority, whose tags contain the
     * collector name and whose last alarm time is not later than that of the recovery alert.
     *
     * @param identity     collector name
     * @param restoreAlert recovery alert providing the upper time bound
     * @return candidate alerts; tags are matched loosely, callers must verify the collector name
     */
    private List<Alert> queryAvailabilityAlerts(String identity, Alert restoreAlert) {
        Long upperBound = restoreAlert.getLastAlarmTime();
        //create query condition
        Specification<Alert> specification = (root, query, criteriaBuilder) -> {
            List<Predicate> andList = new ArrayList<>();

            andList.add(criteriaBuilder.like(root.get("tags").as(String.class), "%" + identity + "%"));
            andList.add(criteriaBuilder.equal(root.get("priority"), CommonConstants.ALERT_PRIORITY_CODE_EMERGENCY));
            andList.add(criteriaBuilder.equal(root.get("status"), ALERT_STATUS_CODE_PENDING));

            // A comparison against a missing bound can never match, so only restrict by time
            // when a bound is actually available.
            if (upperBound != null) {
                andList.add(criteriaBuilder.lessThanOrEqualTo(root.get("lastAlarmTime"), upperBound));
            }

            return criteriaBuilder.and(andList.toArray(new Predicate[0]));
        };

        //query results
        return alertService.getAlerts(specification);
    }

    /**
     * handle collector offline
     *
     * <p>Sends an offline alert only if none is remembered for this collector yet; does nothing
     * when the collector is unknown.
     *
     * @param identity collector name
     */
    public void offline(final String identity) {
        Collector collector = alertCollectorDao.findCollectorByName(identity);
        if (collector == null) {
            return;
        }
        long currentTimeMill = System.currentTimeMillis();

        if (offlineAlertMap.get(identity) != null) {
            // already alerted and not recovered yet
            return;
        }
        Map<String, String> tags = new HashMap<>();
        tags.put(CommonConstants.TAG_COLLECTOR_ID, String.valueOf(collector.getId()));
        tags.put(CommonConstants.TAG_COLLECTOR_NAME, collector.getName());
        tags.put(CommonConstants.TAG_COLLECTOR_HOST, collector.getIp());
        tags.put(CommonConstants.TAG_COLLECTOR_VERSION, collector.getVersion());
        tags.put(CommonConstants.TAG_CODE, "OFFLINE");

        String content = this.bundle.getString("alerter.availability.collector.offline");
        Alert alert = Alert.builder()
                .tags(tags)
                .priority(CommonConstants.ALERT_PRIORITY_CODE_EMERGENCY)
                .status(CommonConstants.ALERT_STATUS_CODE_PENDING)
                .target(CommonConstants.AVAILABILITY_COLLECTOR)
                .content(content)
                .firstAlarmTime(currentTimeMill)
                .lastAlarmTime(currentTimeMill)
                .times(1)
                .build();
        this.offlineAlertMap.put(identity, alert);
        alarmCommonReduce.reduceAndSendAlarm(alert);
    }

    /**
     * Reloads the message bundle after a system configuration change.
     *
     * @param event system config change event
     */
    @EventListener(SystemConfigChangeEvent.class)
    public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) {
        log.info("calculate alarm receive system config change event: {}.", event.getSource());
        this.bundle = ResourceBundleUtil.getBundle("alerter");
    }

    /**
     * Forgets the remembered offline alert of a collector that has been deleted.
     *
     * @param event collector deleted event carrying the collector identity
     */
    @EventListener(CollectorDeletedEvent.class)
    public void onCollectorDeletedEvent(CollectorDeletedEvent event) {
        log.info("collector alarm handler receive collector {} has been deleted.", event.getIdentity());
        offlineAlertMap.remove(event.getIdentity());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.alert.dao;

import org.apache.hertzbeat.common.entity.manager.Collector;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

import java.util.List;

/**
 * Repository through which the alerter module reads {@link Collector} records.
 * No queries are declared explicitly; both finders rely on Spring Data's
 * method-name query derivation, so their names must not be changed casually.
 */
public interface AlertCollectorDao extends JpaRepository<Collector, Long>, JpaSpecificationExecutor<Collector> {

    /**
     * Look up a single collector by its name.
     *
     * @param name collector name
     * @return the matching collector, or {@code null} when none exists
     */
    Collector findCollectorByName(String name);

    /**
     * List all collectors whose status equals the given value.
     *
     * @param status status value
     * @return collectors in that status
     */
    List<Collector> findCollectorsByStatus(Byte status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ public void reduceAndSendAlarm(Alert alert) {
alert.setTags(tags);
}
String monitorIdStr = tags.get(CommonConstants.TAG_MONITOR_ID);
if (monitorIdStr == null) {
log.debug("receiver extern alarm message: {}", alert);
} else {
if (monitorIdStr != null){
long monitorId = Long.parseLong(monitorIdStr);
List<Tag> tagList = alertMonitorDao.findMonitorIdBindTags(monitorId);
for (Tag tag : tagList) {
if (!tags.containsKey(tag.getName())) {
tags.put(tag.getName(), tag.getTagValue());
}
}
} else if (tags.get(CommonConstants.TAG_COLLECTOR_NAME) == null){
log.debug("receiver extern alarm message: {}", alert);
}
// converge -> silence
if (alarmConvergeReduce.filterConverge(alert) && alarmSilenceReduce.filterSilence(alert)) {
Expand Down
2 changes: 2 additions & 0 deletions alerter/src/main/resources/alerter_en_US.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.

alerter.availability.recover = Availability Alert Resolved, Monitor Status Normal Now
alerter.availability.collector.recover = Collector Availability Alert Resolved, The collector is online
alerter.availability.collector.offline = Collector Availability Alert Notify, The collector is offline
alerter.alarm.recover = Alert Resolved Notice
alerter.notify.title = HertzBeat Alert Notify
alerter.notify.target = Monitor Target
Expand Down
2 changes: 2 additions & 0 deletions alerter/src/main/resources/alerter_zh_CN.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.

alerter.availability.recover = 可用性告警恢复通知, 任务状态已恢复正常
alerter.availability.collector.recover = 采集器可用性恢复通知,采集器已上线
alerter.availability.collector.offline = 采集器可用性告警通知,采集器已下线
alerter.alarm.recover = 告警恢复通知
alerter.notify.title = HertzBeat告警通知
alerter.notify.target = 告警目标对象
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public interface CommonConstants {
*/
String AVAILABILITY = "availability";

/**
 * Collector availability: alert target value for collector online/offline alerts
 */
String AVAILABILITY_COLLECTOR = "collectorAvailability";

/**
* Parameter Type Number
*/
Expand Down Expand Up @@ -192,6 +197,26 @@ public interface CommonConstants {
*/
byte AUTH_TYPE_GITEE = 5;

/**
 * Inside the tag: collectorId
 */
String TAG_COLLECTOR_ID = "collectorId";

/**
 * Inside the tag: collectorName
 */
String TAG_COLLECTOR_NAME = "collectorName";

/**
 * Inside the tag: collectorHost
 */
String TAG_COLLECTOR_HOST = "collectorHost";

/**
 * Inside the tag: collectorVersion
 */
String TAG_COLLECTOR_VERSION = "collectorVersion";

/**
* Inside the tag: monitorId Monitor task ID
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.persistence.Entity;
import jakarta.persistence.EntityListeners;
import jakarta.persistence.FetchType;
import jakarta.persistence.ForeignKey;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.CascadeType;
import jakarta.persistence.ConstraintMode;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.JoinTable;
import jakarta.persistence.UniqueConstraint;
import jakarta.persistence.ManyToMany;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import java.time.LocalDateTime;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -89,4 +98,12 @@ public class Collector {
@Schema(title = "Record the latest modification time (timestamp in milliseconds)")
@LastModifiedDate
private LocalDateTime gmtUpdate;

/**
 * Tags bound to this collector.
 * Mapped as a many-to-many association through the {@code hzb_tag_collector_bind} join table
 * ({@code collector_id} -> {@code tag_id}); no foreign-key constraints are generated on either side.
 * The list is fetched eagerly, and persist/merge operations cascade to the tags.
 */
@ManyToMany(targetEntity = Tag.class, cascade = {CascadeType.PERSIST, CascadeType.MERGE}, fetch = FetchType.EAGER)
@JoinTable(name = "hzb_tag_collector_bind",
foreignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT),
inverseForeignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT),
joinColumns = {@JoinColumn(name = "collector_id", referencedColumnName = "id")},
inverseJoinColumns = {@JoinColumn(name = "tag_id", referencedColumnName = "id")})
private List<Tag> tags;
}
Loading
Loading