From 21357eea3b53904963fc0daaee2d7e8f7fe83162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20K=C3=B6ninger?= Date: Thu, 14 Sep 2023 20:28:54 +0200 Subject: [PATCH] fix: status update trigger (#2719) (#2760) (cherry picked from commit 521b43cbd345c33c5e836cf8546b6d39089d1512) --- .../server/services/AbstractEventHandler.java | 4 +-- .../admin/server/services/IntervalCheck.java | 23 ++++++------- .../server/services/StatusUpdateTrigger.java | 9 +++--- .../admin/server/services/StatusUpdater.java | 32 ++++++++++++------- .../services/StatusUpdateTriggerTest.java | 1 + 5 files changed, 36 insertions(+), 33 deletions(-) diff --git a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java index 0dbd00f84a4..d7e42c12612 100644 --- a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java +++ b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/AbstractEventHandler.java @@ -27,7 +27,6 @@ import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; import de.codecentric.boot.admin.server.domain.events.InstanceEvent; @@ -55,8 +54,7 @@ public void start() { this.subscription = Flux.from(this.publisher).subscribeOn(this.scheduler).log(this.log.getName(), Level.FINEST) .doOnSubscribe((s) -> this.log.debug("Subscribed to {} events", this.eventType)).ofType(this.eventType) .cast(this.eventType).transform(this::handle) - .retryWhen(Retry.indefinitely().doBeforeRetry((s) -> this.log.warn("Unexpected error", s.failure()))) - .subscribe(); + .onErrorContinue((throwable, o) -> this.log.warn("Unexpected error", throwable)).subscribe(); } protected abstract Publisher handle(Flux publisher); diff --git a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java index a1591b5b873..3993755f0b3 100644 --- a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java +++ b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java @@ -25,8 +25,9 @@ import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,18 +44,20 @@ * * @author Johannes Edmeier */ +@Slf4j public class IntervalCheck { - private static final Logger log = LoggerFactory.getLogger(IntervalCheck.class); - private final String name; private final Map lastChecked = new ConcurrentHashMap<>(); private final Function> checkFn; + @Getter + @Setter private Duration interval; + @Setter private Duration minRetention; @Nullable @@ -82,7 +85,7 @@ public void start() { .log(log.getName(), Level.FINEST).subscribeOn(this.scheduler).concatMap((i) -> this.checkAllInstances()) .retryWhen(Retry.indefinitely() .doBeforeRetry((s) -> log.warn("Unexpected error in {}-check", this.name, s.failure()))) - .subscribe(); + .subscribe(null, (error) -> log.error("Unexpected error in {}-check", name, error)); } public void markAsChecked(InstanceId instanceId) { @@ -92,7 +95,7 @@ public void markAsChecked(InstanceId instanceId) { protected Mono checkAllInstances() { log.debug("check {} for all instances", this.name); Instant expiration = Instant.now().minus(this.minRetention); - return Flux.fromIterable(this.lastChecked.entrySet()).filter((e) -> e.getValue().isBefore(expiration)) + return Flux.fromIterable(this.lastChecked.entrySet()).filter((entry) -> entry.getValue().isBefore(expiration)) .map(Map.Entry::getKey).flatMap(this.checkFn).then(); } @@ -107,12 +110,4 @@ public void stop() { } } - public void setInterval(Duration interval) { - this.interval = interval; - } - - public void setMinRetention(Duration minRetention) { - this.minRetention = minRetention; - } - } diff --git a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java index fc6e7f7012a..ef15a919033 100644 --- a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java +++ b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdateTrigger.java @@ -52,10 +52,11 @@ protected Publisher handle(Flux publisher) { } protected Mono updateStatus(InstanceId instanceId) { - return this.statusUpdater.updateStatus(instanceId).onErrorResume((e) -> { - log.warn("Unexpected error while updating status for {}", instanceId, e); - return Mono.empty(); - }).doFinally((s) -> this.intervalCheck.markAsChecked(instanceId)); + return this.statusUpdater.timeout(this.intervalCheck.getInterval()).updateStatus(instanceId) + .onErrorResume((e) -> { + log.warn("Unexpected error while updating status for {}", instanceId, e); + return Mono.empty(); + }).doFinally((s) -> this.intervalCheck.markAsChecked(instanceId)); } @Override diff --git a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdater.java b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdater.java index 4ca7ed6d728..2b5c11f7c57 100644 --- a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdater.java +++ b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/StatusUpdater.java @@ -16,13 +16,14 @@ package de.codecentric.boot.admin.server.services; +import java.time.Duration; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.logging.Level; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -44,10 +45,10 @@ * * @author Johannes Edmeier */ +@Slf4j +@RequiredArgsConstructor public class StatusUpdater { - private static final Logger log = LoggerFactory.getLogger(StatusUpdater.class); - private static final ParameterizedTypeReference> RESPONSE_TYPE = new ParameterizedTypeReference>() { }; @@ -57,16 +58,15 @@ public class StatusUpdater { private final ApiMediaTypeHandler apiMediaTypeHandler; - public StatusUpdater(InstanceRepository repository, InstanceWebClient instanceWebClient, - ApiMediaTypeHandler apiMediaTypeHandler) { - this.repository = repository; - this.instanceWebClient = instanceWebClient; - this.apiMediaTypeHandler = apiMediaTypeHandler; + private Duration timeout = Duration.ofSeconds(10); + + public StatusUpdater timeout(Duration timeout) { + this.timeout = timeout; + return this; } public Mono updateStatus(InstanceId id) { return this.repository.computeIfPresent(id, (key, instance) -> this.doUpdateStatus(instance)).then(); - } protected Mono doUpdateStatus(Instance instance) { @@ -77,8 +77,16 @@ protected Mono doUpdateStatus(Instance instance) { log.debug("Update status for {}", instance); return this.instanceWebClient.instance(instance).get().uri(Endpoint.HEALTH) .exchangeToMono(this::convertStatusInfo).log(log.getName(), Level.FINEST) - .doOnError((ex) -> logError(instance, ex)).onErrorResume(this::handleError) - .map(instance::withStatusInfo); + .timeout(getTimeoutWithMargin()).doOnError((ex) -> logError(instance, ex)) + .onErrorResume(this::handleError).map(instance::withStatusInfo); + } + + /* + * return a timeout less than the given one to prevent backdrops in concurrent get + * request. This prevents flakyness of health checks. + */ + private Duration getTimeoutWithMargin() { + return this.timeout.minusSeconds(1).abs(); } protected Mono convertStatusInfo(ClientResponse response) { diff --git a/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/StatusUpdateTriggerTest.java b/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/StatusUpdateTriggerTest.java index 3032a6ff89d..0b2ae6736dc 100644 --- a/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/StatusUpdateTriggerTest.java +++ b/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/StatusUpdateTriggerTest.java @@ -56,6 +56,7 @@ public class StatusUpdateTriggerTest { @BeforeEach public void setUp() throws Exception { when(this.updater.updateStatus(any(InstanceId.class))).thenReturn(Mono.empty()); + when(this.updater.timeout(any())).thenReturn(this.updater); this.trigger = new StatusUpdateTrigger(this.updater, this.events.flux()); this.trigger.start();