From 758ebee3b43c42a425d230b58240d57aded7349a Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Wed, 7 Aug 2024 12:51:32 +0200 Subject: [PATCH] kie-kogito-apps-2069: Improve Jobs Service liveness check to limit the amount of time to get the leader status --- .../management/JobServiceInstanceManager.java | 2 +- .../JobServiceLeaderLivenessHealthCheck.java | 76 +++++++++++++++++++ .../JobServiceManagementRepository.java | 2 + ...DefaultJobServiceManagementRepository.java | 6 ++ .../META-INF/microprofile-config.properties | 2 + .../JobServiceInstanceManagerTest.java | 20 ++++- ...bServiceLeaderLivenessHealthCheckTest.java | 67 ++++++++++++++++ ...tgreSqlJobServiceManagementRepository.java | 10 +++ 8 files changed, 181 insertions(+), 4 deletions(-) create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java create mode 100644 jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java index fea4350a02..3e986ba48d 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java @@ -174,7 +174,7 @@ protected Uni tryBecomeLeader(JobServiceManagementInfo protected Uni release(JobServiceManagementInfo info) { leader.set(false); - return repository.set(new JobServiceManagementInfo(info.getId(), null, null)) + return repository.release(info) .onItem().invoke(this::disableCommunication) .onItem().invoke(i -> LOGGER.info("Leader instance released")) .onFailure().invoke(ex -> 
LOGGER.error("Error releasing leader")) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java new file mode 100644 index 0000000000..30c9998726 --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.kie.kogito.jobs.service.management;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.health.HealthCheck;
+import org.eclipse.microprofile.health.HealthCheckResponse;
+import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
+import org.eclipse.microprofile.health.Liveness;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+
+/**
+ * Liveness check that reports {@code DOWN} once an expiration is configured and the instance has
+ * stayed disabled for longer than that expiration.
+ * <p>
+ * The measured window starts in {@link #init()} and restarts on every observed
+ * {@link MessagingChangeEvent}. With a non-positive expiration (the default is {@code -1}) the
+ * check always reports {@code UP}.
+ * <p>
+ * NOTE(review): the event presumably signals this instance gaining or losing leadership; confirm
+ * against the code that fires {@link MessagingChangeEvent}.
+ */
+@Liveness
+@ApplicationScoped
+public class JobServiceLeaderLivenessHealthCheck implements HealthCheck {
+
+    /** Flag carried by the last observed {@link MessagingChangeEvent}; {@code false} until one arrives. */
+    private final AtomicBoolean enabled = new AtomicBoolean(false);
+
+    /** Value of {@link #getCurrentTimeMillis()} at which the current window started. */
+    private final AtomicLong startTime = new AtomicLong();
+
+    private static final String EXPIRATION_IN_SECONDS = "kogito.jobs-service.management.leader-check.expiration-in-seconds";
+
+    /** Longest tolerated disabled period, in seconds; values {@code <= 0} switch the limit off. */
+    @ConfigProperty(name = EXPIRATION_IN_SECONDS, defaultValue = "-1")
+    long expirationInSeconds;
+
+    /** Starts the first window when the bean is created. */
+    @PostConstruct
+    void init() {
+        startTime.set(getCurrentTimeMillis());
+    }
+
+    /**
+     * Reports {@code DOWN} only while the instance is not enabled and the current window has
+     * expired; otherwise reports {@code UP}.
+     *
+     * @return the response named "Get Leader Instance Timeout" with the computed status
+     */
+    @Override
+    public HealthCheckResponse call() {
+        final HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.named("Get Leader Instance Timeout");
+        // Read the flag BEFORE the clock. Combined with the write order used in
+        // onMessagingStatusChange, a freshly cleared flag can never be paired with a stale,
+        // already expired window, which would otherwise yield a spurious DOWN.
+        if (!enabled.get() && hasExpired()) {
+            return responseBuilder.down().build();
+        }
+        return responseBuilder.up().build();
+    }
+
+    /**
+     * @return {@code true} when a positive expiration is configured and more than that many
+     *         seconds have elapsed since the current window started
+     */
+    boolean hasExpired() {
+        return (expirationInSeconds > 0) && (getCurrentTimeMillis() - startTime.get()) > (expirationInSeconds * 1000);
+    }
+
+    /**
+     * Records the enabled flag of the event and restarts the window.
+     *
+     * @param event the observed messaging status change
+     */
+    protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
+        // Restart the window first and publish the flag second: a concurrent call() that already
+        // sees the new flag is then guaranteed to see the new start time as well.
+        startTime.set(getCurrentTimeMillis());
+        this.enabled.set(event.isEnabled());
+    }
+
+    /**
+     * Facilitates testing
+     */
+    long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+}
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java
index 405c6de6e5..6b17c370e7 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java
@@ -30,6 +30,14 @@ public interface JobServiceManagementRepository {
 
     Uni set(JobServiceManagementInfo info);
 
+    /**
+     * Clears the token and last heartbeat stored for the given instance.
+     *
+     * @param info instance to release
+     * @return a Uni emitting {@code true} when the release was applied
+     */
+    Uni<Boolean> release(JobServiceManagementInfo info);
+
     Uni heartbeat(JobServiceManagementInfo info);
 
 }
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java
index 7a7124127c..8d377e0f9c 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java
@@ -52,4 +52,16 @@ public Uni heartbeat(JobServiceManagementInfo info) {
         info.setLastHeartbeat(DateUtil.now().toOffsetDateTime());
         return set(info);
     }
+
+    /**
+     * Replaces the stored info with a new one that keeps only the id of {@code info}; the other
+     * two constructor arguments (presumably token and last heartbeat) are {@code null}.
+     *
+     * @return a Uni that always emits {@code true}
+     */
+    @Override
+    public Uni<Boolean> release(JobServiceManagementInfo info) {
+        instance.set(new JobServiceManagementInfo(info.getId(), null, null));
+        return Uni.createFrom().item(true);
+    }
 }
diff --git a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
index a24e7828c7..c958aa2a12 100644
--- a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
+++ b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
@@ -39,6 +39,8
@@ quarkus.http.port=8080 mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter # Job Service +quarkus.smallrye-health.check."org.kie.kogito.jobs.service.management.JobServiceLeaderLivenessHealthCheck".enabled=false + kogito.jobs-service.maxIntervalLimitToRetryMillis=60000 kogito.jobs-service.backoffRetryMillis=1000 kogito.jobs-service.schedulerChunkInMinutes=10 diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java index 5fada45266..c099717b4f 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java @@ -20,6 +20,7 @@ import java.time.OffsetDateTime; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Stream; @@ -46,8 +47,10 @@ import jakarta.enterprise.inject.Instance; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -112,7 +115,7 @@ void onShutdown() { verify(tested, times(1)).release(infoCaptor.capture()); assertThat(infoCaptor.getValue()).isEqualTo(tested.getCurrentInfo()); - verify(repository, times(1)).set(new JobServiceManagementInfo()); + verify(repository, times(1)).release(tested.getCurrentInfo()); } @Test @@ -153,7 +156,18 @@ void heartbeatNotLeader() { @Test void heartbeatLeader() { tested.startup(startupEvent); - 
tested.heartbeat(tested.getCurrentInfo()).await().indefinitely(); - verify(repository).heartbeat(tested.getCurrentInfo()); + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(tested.isLeader()).isTrue(); + }); + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + verify(repository, atLeastOnce()).heartbeat(infoCaptor.capture()); + }); + JobServiceManagementInfo lastHeartbeat = infoCaptor.getValue(); + assertThat(lastHeartbeat).isNotNull(); + assertThat(lastHeartbeat.getId()).isEqualTo(tested.getCurrentInfo().getId()); + assertThat(lastHeartbeat.getToken()).isEqualTo(tested.getCurrentInfo().getToken()); + assertThat(lastHeartbeat.getLastHeartbeat()).isNotNull(); } } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java new file mode 100644 index 0000000000..156e27b6c5 --- /dev/null +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.management;
+
+import org.eclipse.microprofile.health.HealthCheckResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Exercises {@link JobServiceLeaderLivenessHealthCheck#call()} against a stubbed clock.
+ */
+class JobServiceLeaderLivenessHealthCheckTest {
+
+    private static final long START_TIME = 1234;
+
+    private JobServiceLeaderLivenessHealthCheck tested;
+
+    @BeforeEach
+    void setUp() {
+        tested = spy(new JobServiceLeaderLivenessHealthCheck());
+        // init() reads the clock, so it has to be stubbed before the window is started.
+        stubClock(START_TIME);
+        tested.init();
+    }
+
+    @Test
+    void timeoutNotSet() {
+        // expirationInSeconds is never assigned in this scenario.
+        stubClock(START_TIME + 1000 * 50);
+        assertStatus(HealthCheckResponse.Status.UP);
+    }
+
+    @Test
+    void timeoutSetButNotReached() {
+        tested.expirationInSeconds = 60;
+        stubClock(START_TIME + 1000 * 10);
+        assertStatus(HealthCheckResponse.Status.UP);
+    }
+
+    @Test
+    void timeoutSetAndReached() {
+        tested.expirationInSeconds = 60;
+        // One millisecond past the 60 second limit.
+        stubClock(START_TIME + 1000 * 60 + 1);
+        assertStatus(HealthCheckResponse.Status.DOWN);
+    }
+
+    /** Makes the spied health check observe {@code millis} as the current time. */
+    private void stubClock(long millis) {
+        doReturn(millis).when(tested).getCurrentTimeMillis();
+    }
+
+    /** Invokes the check and verifies the reported status. */
+    private void assertStatus(HealthCheckResponse.Status expected) {
+        assertThat(tested.call().getStatus())
+                .isNotNull()
+                .isEqualTo(expected);
+    }
+}
diff --git a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java
index 92f4f68e9e..f5141e0236 100644
---
a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java
+++ b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java
@@ -31,6 +31,7 @@
 import io.smallrye.mutiny.Uni;
 import io.vertx.mutiny.pgclient.PgPool;
 import io.vertx.mutiny.sqlclient.Row;
+import io.vertx.mutiny.sqlclient.RowIterator;
 import io.vertx.mutiny.sqlclient.RowSet;
 import io.vertx.mutiny.sqlclient.SqlClient;
 import io.vertx.mutiny.sqlclient.Tuple;
@@ -99,4 +100,20 @@ public Uni heartbeat(JobServiceManagementInfo info) {
                 .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null)
                 .onItem().invoke(r -> LOGGER.trace("Heartbeat {}", r)));
     }
+
+    /**
+     * Sets token and last heartbeat to null on the row that matches both the id and the token
+     * of {@code info}.
+     *
+     * @param info instance whose id and token select the row to release
+     * @return a Uni emitting {@code true} when a row was updated, {@code false} otherwise
+     */
+    @Override
+    public Uni<Boolean> release(JobServiceManagementInfo info) {
+        return client.withTransaction(conn -> conn
+                .preparedQuery("UPDATE job_service_management SET token = null, last_heartbeat = null WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat")
+                .execute(Tuple.of(info.getId(), info.getToken()))
+                .onItem().transform(RowSet::iterator)
+                .onItem().transform(RowIterator::hasNext));
+    }
 }