Skip to content

Commit

Permalink
kie-kogito-apps-2069: Improve Jobs Service liveness check to limit th…
Browse files Browse the repository at this point in the history
…e amount of time to get the leader status
  • Loading branch information
wmedvede committed Aug 9, 2024
1 parent a0812e4 commit 758ebee
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected Uni<JobServiceManagementInfo> tryBecomeLeader(JobServiceManagementInfo

protected Uni<Void> release(JobServiceManagementInfo info) {
leader.set(false);
return repository.set(new JobServiceManagementInfo(info.getId(), null, null))
return repository.release(info)
.onItem().invoke(this::disableCommunication)
.onItem().invoke(i -> LOGGER.info("Leader instance released"))
.onFailure().invoke(ex -> LOGGER.error("Error releasing leader"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.management;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Liveness;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

/**
 * Liveness check that reports {@code DOWN} when the configured expiration time has elapsed since the
 * last reference point while the messaging status is still disabled.
 * <p>
 * The reference point is taken when the bean is initialized and is reset every time a
 * {@link MessagingChangeEvent} is observed. An expiration value lower than or equal to zero (the default
 * is {@code -1}) turns the timeout off, in which case the check always reports {@code UP}.
 * <p>
 * NOTE(review): presumably messaging gets enabled once this instance has resolved its leader status;
 * confirm against the component that fires {@link MessagingChangeEvent}.
 */
@Liveness
@ApplicationScoped
public class JobServiceLeaderLivenessHealthCheck implements HealthCheck {

    private static final String EXPIRATION_IN_SECONDS = "kogito.jobs-service.management.leader-check.expiration-in-seconds";

    /** Last messaging status received through {@link MessagingChangeEvent}. */
    private final AtomicBoolean enabled = new AtomicBoolean(false);

    /** Reference instant, in milliseconds, from which the expiration is measured. */
    private final AtomicLong startTime = new AtomicLong();

    /** Timeout in seconds; values lower than or equal to zero disable the expiration. */
    @ConfigProperty(name = EXPIRATION_IN_SECONDS, defaultValue = "-1")
    long expirationInSeconds;

    /**
     * Records the initial reference instant right after the bean is constructed.
     */
    @PostConstruct
    void init() {
        final long now = getCurrentTimeMillis();
        startTime.set(now);
    }

    /**
     * Builds the liveness response.
     *
     * @return {@code DOWN} when the timeout has expired and messaging is not enabled, {@code UP} otherwise.
     */
    @Override
    public HealthCheckResponse call() {
        final HealthCheckResponseBuilder builder = HealthCheckResponse.named("Get Leader Instance Timeout");
        if (!hasExpired()) {
            return builder.up().build();
        }
        if (enabled.get()) {
            return builder.up().build();
        }
        return builder.down().build();
    }

    /**
     * @return true when a positive expiration is configured and more than that amount of time has passed
     *         since the reference instant, false otherwise.
     */
    boolean hasExpired() {
        if (expirationInSeconds <= 0) {
            // No positive timeout configured: the check never expires.
            return false;
        }
        final long elapsedMillis = getCurrentTimeMillis() - startTime.get();
        return elapsedMillis > (expirationInSeconds * 1000);
    }

    /**
     * Stores the messaging status carried by the event and restarts the expiration period.
     *
     * @param event the observed messaging status change.
     */
    protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
        final boolean messagingEnabled = event.isEnabled();
        enabled.set(messagingEnabled);
        final long now = getCurrentTimeMillis();
        startTime.set(now);
    }

    /**
     * Time source kept in its own method so tests can override it.
     *
     * @return the current time in milliseconds.
     */
    long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface JobServiceManagementRepository {

Uni<JobServiceManagementInfo> set(JobServiceManagementInfo info);

/**
 * Releases the management record associated with the given instance information.
 * <p>
 * NOTE(review): contract inferred from the implementations visible alongside this declaration; confirm.
 * Those implementations clear the stored token and last heartbeat for {@code info.getId()}; the
 * SQL-backed one only does so when the stored token matches {@code info.getToken()}.
 *
 * @param info management information identifying the record to release.
 * @return a {@link Uni} emitting a boolean result; in the visible implementations it is {@code true}
 *         when the record was cleared.
 */
Uni<Boolean> release(JobServiceManagementInfo info);

Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info);

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,10 @@ public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info) {
info.setLastHeartbeat(DateUtil.now().toOffsetDateTime());
return set(info);
}

/**
 * Replaces the stored instance with one that keeps only the id of the given info, leaving the other
 * two constructor arguments null.
 *
 * @param info management information whose id is preserved.
 * @return a {@link Uni} that always emits {@code true}.
 */
@Override
public Uni<Boolean> release(JobServiceManagementInfo info) {
    final JobServiceManagementInfo released = new JobServiceManagementInfo(info.getId(), null, null);
    instance.set(released);
    return Uni.createFrom().item(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ quarkus.http.port=8080
mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter

# Job Service
quarkus.smallrye-health.check."org.kie.kogito.jobs.service.management.JobServiceLeaderLivenessHealthCheck".enabled=false

kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.jobs-service.schedulerChunkInMinutes=10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -46,8 +47,10 @@
import jakarta.enterprise.inject.Instance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -112,7 +115,7 @@ void onShutdown() {

verify(tested, times(1)).release(infoCaptor.capture());
assertThat(infoCaptor.getValue()).isEqualTo(tested.getCurrentInfo());
verify(repository, times(1)).set(new JobServiceManagementInfo());
verify(repository, times(1)).release(tested.getCurrentInfo());
}

@Test
Expand Down Expand Up @@ -153,7 +156,18 @@ void heartbeatNotLeader() {
/**
 * Checks that, after startup, the instance reports itself as leader and that heartbeats carrying the
 * current instance id and token reach the repository.
 */
@Test
void heartbeatLeader() {
tested.startup(startupEvent);
// NOTE(review): the next two statements look like the pre-change version of this test shown next to
// the new polling-based assertions below; confirm whether both are meant to be present.
tested.heartbeat(tested.getCurrentInfo()).await().indefinitely();
verify(repository).heartbeat(tested.getCurrentInfo());
// Poll for up to 30 seconds until the instance reports itself as leader.
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(tested.isLeader()).isTrue();
});
// Poll for up to 30 seconds until at least one heartbeat call has been captured.
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(repository, atLeastOnce()).heartbeat(infoCaptor.capture());
});
// The most recently captured heartbeat must match the current instance and carry a timestamp.
JobServiceManagementInfo lastHeartbeat = infoCaptor.getValue();
assertThat(lastHeartbeat).isNotNull();
assertThat(lastHeartbeat.getId()).isEqualTo(tested.getCurrentInfo().getId());
assertThat(lastHeartbeat.getToken()).isEqualTo(tested.getCurrentInfo().getToken());
assertThat(lastHeartbeat.getLastHeartbeat()).isNotNull();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.management;

import org.eclipse.microprofile.health.HealthCheckResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

/**
 * Unit tests for {@link JobServiceLeaderLivenessHealthCheck} that drive the check with a stubbed clock.
 */
class JobServiceLeaderLivenessHealthCheckTest {

    /** Instant returned by the stubbed clock while the health check is initialized. */
    private static final long START_TIME = 1234;

    private JobServiceLeaderLivenessHealthCheck healthCheck;

    @BeforeEach
    void setUp() {
        healthCheck = spy(new JobServiceLeaderLivenessHealthCheck());
        doReturn(START_TIME).when(healthCheck).getCurrentTimeMillis();
        healthCheck.init();
    }

    /** The expiration field is left untouched, so the check stays UP regardless of the elapsed time. */
    @Test
    void timeoutNotSet() {
        stubClockAt(START_TIME + 1000 * 50);
        assertStatus(HealthCheckResponse.Status.UP);
    }

    /** Ten seconds elapsed out of a sixty-second expiration: still UP. */
    @Test
    void timeoutSetButNotReached() {
        healthCheck.expirationInSeconds = 60;
        stubClockAt(START_TIME + 1000 * 10);
        assertStatus(HealthCheckResponse.Status.UP);
    }

    /** One millisecond past a sixty-second expiration with no messaging event received: DOWN. */
    @Test
    void timeoutSetAndReached() {
        healthCheck.expirationInSeconds = 60;
        stubClockAt(START_TIME + 1000 * 60 + 1);
        assertStatus(HealthCheckResponse.Status.DOWN);
    }

    /** Makes the spied health check report the given instant as the current time. */
    private void stubClockAt(long currentTimeMillis) {
        doReturn(currentTimeMillis).when(healthCheck).getCurrentTimeMillis();
    }

    /** Invokes the health check and verifies the resulting status. */
    private void assertStatus(HealthCheckResponse.Status expected) {
        assertThat(healthCheck.call().getStatus())
                .isNotNull()
                .isEqualTo(expected);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowIterator;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.SqlClient;
import io.vertx.mutiny.sqlclient.Tuple;
Expand Down Expand Up @@ -99,4 +100,13 @@ public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info) {
.onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null)
.onItem().invoke(r -> LOGGER.trace("Heartbeat {}", r)));
}

/**
 * Clears the token and last heartbeat of the management row whose id and token match the given info,
 * executing the update inside a transaction.
 *
 * @param info management information providing the id and token used to select the row.
 * @return a {@link Uni} emitting {@code true} when the update returned at least one row, {@code false}
 *         otherwise.
 */
@Override
public Uni<Boolean> release(JobServiceManagementInfo info) {
    final String releaseSql =
            "UPDATE job_service_management SET token = null, last_heartbeat = null WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat";
    return client.withTransaction(conn -> conn
            .preparedQuery(releaseSql)
            .execute(Tuple.of(info.getId(), info.getToken()))
            .onItem().transform(rows -> {
                // A returned row means the WHERE clause matched and the record was cleared.
                final RowIterator<Row> updatedRows = rows.iterator();
                return updatedRows.hasNext();
            }));
}
}

0 comments on commit 758ebee

Please sign in to comment.