Skip to content

Commit

Permalink
fix: status update trigger (#2719) (#2760)
Browse files Browse the repository at this point in the history
(cherry picked from commit 521b43c)
  • Loading branch information
SteKoe authored Sep 14, 2023
1 parent 568d102 commit 21357ee
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import de.codecentric.boot.admin.server.domain.events.InstanceEvent;

Expand Down Expand Up @@ -55,8 +54,7 @@ public void start() {
this.subscription = Flux.from(this.publisher).subscribeOn(this.scheduler).log(this.log.getName(), Level.FINEST)
.doOnSubscribe((s) -> this.log.debug("Subscribed to {} events", this.eventType)).ofType(this.eventType)
.cast(this.eventType).transform(this::handle)
.retryWhen(Retry.indefinitely().doBeforeRetry((s) -> this.log.warn("Unexpected error", s.failure())))
.subscribe();
.onErrorContinue((throwable, o) -> this.log.warn("Unexpected error", throwable)).subscribe();
}

protected abstract Publisher<Void> handle(Flux<T> publisher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -43,18 +44,20 @@
*
* @author Johannes Edmeier
*/
@Slf4j
public class IntervalCheck {

private static final Logger log = LoggerFactory.getLogger(IntervalCheck.class);

private final String name;

private final Map<InstanceId, Instant> lastChecked = new ConcurrentHashMap<>();

private final Function<InstanceId, Mono<Void>> checkFn;

@Getter
@Setter
private Duration interval;

@Setter
private Duration minRetention;

@Nullable
Expand Down Expand Up @@ -82,7 +85,7 @@ public void start() {
.log(log.getName(), Level.FINEST).subscribeOn(this.scheduler).concatMap((i) -> this.checkAllInstances())
.retryWhen(Retry.indefinitely()
.doBeforeRetry((s) -> log.warn("Unexpected error in {}-check", this.name, s.failure())))
.subscribe();
.subscribe(null, (error) -> log.error("Unexpected error in {}-check", name, error));
}

public void markAsChecked(InstanceId instanceId) {
Expand All @@ -92,7 +95,7 @@ public void markAsChecked(InstanceId instanceId) {
protected Mono<Void> checkAllInstances() {
log.debug("check {} for all instances", this.name);
Instant expiration = Instant.now().minus(this.minRetention);
return Flux.fromIterable(this.lastChecked.entrySet()).filter((e) -> e.getValue().isBefore(expiration))
return Flux.fromIterable(this.lastChecked.entrySet()).filter((entry) -> entry.getValue().isBefore(expiration))
.map(Map.Entry::getKey).flatMap(this.checkFn).then();
}

Expand All @@ -107,12 +110,4 @@ public void stop() {
}
}

public void setInterval(Duration interval) {
this.interval = interval;
}

public void setMinRetention(Duration minRetention) {
this.minRetention = minRetention;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
}

protected Mono<Void> updateStatus(InstanceId instanceId) {
return this.statusUpdater.updateStatus(instanceId).onErrorResume((e) -> {
log.warn("Unexpected error while updating status for {}", instanceId, e);
return Mono.empty();
}).doFinally((s) -> this.intervalCheck.markAsChecked(instanceId));
return this.statusUpdater.timeout(this.intervalCheck.getInterval()).updateStatus(instanceId)
.onErrorResume((e) -> {
log.warn("Unexpected error while updating status for {}", instanceId, e);
return Mono.empty();
}).doFinally((s) -> this.intervalCheck.markAsChecked(instanceId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package de.codecentric.boot.admin.server.services;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand All @@ -44,10 +45,10 @@
*
* @author Johannes Edmeier
*/
@Slf4j
@RequiredArgsConstructor
public class StatusUpdater {

private static final Logger log = LoggerFactory.getLogger(StatusUpdater.class);

private static final ParameterizedTypeReference<Map<String, Object>> RESPONSE_TYPE = new ParameterizedTypeReference<Map<String, Object>>() {
};

Expand All @@ -57,16 +58,15 @@ public class StatusUpdater {

private final ApiMediaTypeHandler apiMediaTypeHandler;

public StatusUpdater(InstanceRepository repository, InstanceWebClient instanceWebClient,
ApiMediaTypeHandler apiMediaTypeHandler) {
this.repository = repository;
this.instanceWebClient = instanceWebClient;
this.apiMediaTypeHandler = apiMediaTypeHandler;
private Duration timeout = Duration.ofSeconds(10);

public StatusUpdater timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public Mono<Void> updateStatus(InstanceId id) {
return this.repository.computeIfPresent(id, (key, instance) -> this.doUpdateStatus(instance)).then();

}

protected Mono<Instance> doUpdateStatus(Instance instance) {
Expand All @@ -77,8 +77,16 @@ protected Mono<Instance> doUpdateStatus(Instance instance) {
log.debug("Update status for {}", instance);
return this.instanceWebClient.instance(instance).get().uri(Endpoint.HEALTH)
.exchangeToMono(this::convertStatusInfo).log(log.getName(), Level.FINEST)
.doOnError((ex) -> logError(instance, ex)).onErrorResume(this::handleError)
.map(instance::withStatusInfo);
.timeout(getTimeoutWithMargin()).doOnError((ex) -> logError(instance, ex))
.onErrorResume(this::handleError).map(instance::withStatusInfo);
}

/*
* return a timeout less than the given one to prevent backdrops in concurrent get
* request. This prevents flakyness of health checks.
*/
private Duration getTimeoutWithMargin() {
return this.timeout.minusSeconds(1).abs();
}

protected Mono<StatusInfo> convertStatusInfo(ClientResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class StatusUpdateTriggerTest {
@BeforeEach
public void setUp() throws Exception {
when(this.updater.updateStatus(any(InstanceId.class))).thenReturn(Mono.empty());
when(this.updater.timeout(any())).thenReturn(this.updater);

this.trigger = new StatusUpdateTrigger(this.updater, this.events.flux());
this.trigger.start();
Expand Down

0 comments on commit 21357ee

Please sign in to comment.