From 59777df1682b33ac7cbd20e4998a5ddd9f586c77 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Wed, 7 Aug 2024 12:51:32 +0200 Subject: [PATCH 1/5] kie-kogito-apps-2069: Improve Jobs Service liveness check to limit the amount of time to get the leader status --- .../management/JobServiceInstanceManager.java | 2 +- .../JobServiceLeaderLivenessHealthCheck.java | 76 +++++++++++++++++++ .../JobServiceManagementRepository.java | 2 + ...DefaultJobServiceManagementRepository.java | 6 ++ .../META-INF/microprofile-config.properties | 2 + .../JobServiceInstanceManagerTest.java | 20 ++++- ...bServiceLeaderLivenessHealthCheckTest.java | 67 ++++++++++++++++ ...tgreSqlJobServiceManagementRepository.java | 10 +++ 8 files changed, 181 insertions(+), 4 deletions(-) create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java create mode 100644 jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java index fea4350a02..3e986ba48d 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java @@ -174,7 +174,7 @@ protected Uni tryBecomeLeader(JobServiceManagementInfo protected Uni release(JobServiceManagementInfo info) { leader.set(false); - return repository.set(new JobServiceManagementInfo(info.getId(), null, null)) + return repository.release(info) .onItem().invoke(this::disableCommunication) .onItem().invoke(i -> LOGGER.info("Leader instance released")) .onFailure().invoke(ex -> LOGGER.error("Error releasing leader")) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java new file mode 100644 index 0000000000..30c9998726 --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheck.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.management; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; +import org.eclipse.microprofile.health.Liveness; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; + +@Liveness +@ApplicationScoped +public class JobServiceLeaderLivenessHealthCheck implements HealthCheck { + + private final AtomicBoolean enabled = new AtomicBoolean(false); + + private final AtomicLong startTime = new AtomicLong(); + + private static final String EXPIRATION_IN_SECONDS = "kogito.jobs-service.management.leader-check.expiration-in-seconds"; + + @ConfigProperty(name = EXPIRATION_IN_SECONDS, defaultValue = "-1") + long expirationInSeconds; + + @PostConstruct + void init() { + startTime.set(getCurrentTimeMillis()); + } + + @Override + public HealthCheckResponse call() { + final HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.named("Get Leader Instance Timeout"); + if (hasExpired() && !enabled.get()) { + return responseBuilder.down().build(); + } + return responseBuilder.up().build(); + } + + boolean hasExpired() { + return (expirationInSeconds > 0) && (getCurrentTimeMillis() - startTime.get()) > (expirationInSeconds * 1000); + } + + protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) { + this.enabled.set(event.isEnabled()); + startTime.set(getCurrentTimeMillis()); + } + + /** + * Facilitates testing + */ + long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java index 405c6de6e5..6b17c370e7 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/JobServiceManagementRepository.java @@ -30,6 +30,8 @@ public interface JobServiceManagementRepository { Uni set(JobServiceManagementInfo info); + Uni release(JobServiceManagementInfo info); + Uni heartbeat(JobServiceManagementInfo info); } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java index 7a7124127c..8d377e0f9c 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/DefaultJobServiceManagementRepository.java @@ -52,4 +52,10 @@ public Uni heartbeat(JobServiceManagementInfo info) { info.setLastHeartbeat(DateUtil.now().toOffsetDateTime()); return set(info); } + + @Override + public Uni release(JobServiceManagementInfo info) { + instance.set(new JobServiceManagementInfo(info.getId(), null, null)); + return Uni.createFrom().item(true); + } } diff --git a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties index a24e7828c7..c958aa2a12 100644 --- a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties +++ b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties @@ -39,6 +39,8 @@ quarkus.http.port=8080 mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter # Job Service +quarkus.smallrye-health.check."org.kie.kogito.jobs.service.management.JobServiceLeaderLivenessHealthCheck".enabled=false + kogito.jobs-service.maxIntervalLimitToRetryMillis=60000 kogito.jobs-service.backoffRetryMillis=1000 kogito.jobs-service.schedulerChunkInMinutes=10 diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java index 5fada45266..c099717b4f 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java @@ -20,6 +20,7 @@ import java.time.OffsetDateTime; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Stream; @@ -46,8 +47,10 @@ import jakarta.enterprise.inject.Instance; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -112,7 +115,7 @@ void onShutdown() { verify(tested, times(1)).release(infoCaptor.capture()); assertThat(infoCaptor.getValue()).isEqualTo(tested.getCurrentInfo()); - verify(repository, times(1)).set(new JobServiceManagementInfo()); + verify(repository, times(1)).release(tested.getCurrentInfo()); } @Test @@ -153,7 +156,18 @@ void heartbeatNotLeader() { @Test void heartbeatLeader() { tested.startup(startupEvent); - tested.heartbeat(tested.getCurrentInfo()).await().indefinitely(); - verify(repository).heartbeat(tested.getCurrentInfo()); + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(tested.isLeader()).isTrue(); + }); + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + verify(repository, atLeastOnce()).heartbeat(infoCaptor.capture()); + }); + JobServiceManagementInfo lastHeartbeat = infoCaptor.getValue(); + assertThat(lastHeartbeat).isNotNull(); + assertThat(lastHeartbeat.getId()).isEqualTo(tested.getCurrentInfo().getId()); + assertThat(lastHeartbeat.getToken()).isEqualTo(tested.getCurrentInfo().getToken()); + assertThat(lastHeartbeat.getLastHeartbeat()).isNotNull(); } } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java new file mode 100644 index 0000000000..156e27b6c5 --- /dev/null +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.management; + +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +class JobServiceLeaderLivenessHealthCheckTest { + + private static final long START_TIME = 1234; + + private JobServiceLeaderLivenessHealthCheck healthCheck; + + @BeforeEach + void setUp() { + healthCheck = spy(new JobServiceLeaderLivenessHealthCheck()); + doReturn(START_TIME).when(healthCheck).getCurrentTimeMillis(); + healthCheck.init(); + } + + @Test + void timeoutNotSet() { + doReturn(START_TIME + 1000 * 50).when(healthCheck).getCurrentTimeMillis(); + assertThat(healthCheck.call().getStatus()) + .isNotNull() + .isEqualTo(HealthCheckResponse.Status.UP); + } + + @Test + void timeoutSetButNotReached() { + healthCheck.expirationInSeconds = 60; + doReturn(START_TIME + 1000 * 10).when(healthCheck).getCurrentTimeMillis(); + assertThat(healthCheck.call().getStatus()) + .isNotNull() + .isEqualTo(HealthCheckResponse.Status.UP); + } + + @Test + void timeoutSetAndReached() { + healthCheck.expirationInSeconds = 60; + doReturn(START_TIME + 1000 * 60 + 1).when(healthCheck).getCurrentTimeMillis(); + assertThat(healthCheck.call().getStatus()) + .isNotNull() + .isEqualTo(HealthCheckResponse.Status.DOWN); + } +} diff --git a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java index 92f4f68e9e..f5141e0236 100644 --- a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java +++ b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobServiceManagementRepository.java @@ -31,6 +31,7 @@ import io.smallrye.mutiny.Uni; import io.vertx.mutiny.pgclient.PgPool; import io.vertx.mutiny.sqlclient.Row; +import io.vertx.mutiny.sqlclient.RowIterator; import io.vertx.mutiny.sqlclient.RowSet; import io.vertx.mutiny.sqlclient.SqlClient; import io.vertx.mutiny.sqlclient.Tuple; @@ -99,4 +100,13 @@ public Uni heartbeat(JobServiceManagementInfo info) { .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null) .onItem().invoke(r -> LOGGER.trace("Heartbeat {}", r))); } + + @Override + public Uni release(JobServiceManagementInfo info) { + return client.withTransaction(conn -> conn + .preparedQuery("UPDATE job_service_management SET token = null, last_heartbeat = null WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat") + .execute(Tuple.of(info.getId(), info.getToken())) + .onItem().transform(RowSet::iterator) + .onItem().transform(RowIterator::hasNext)); + } } From d89b36b6989ab0744c2713e3711316ecf84f08ca Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Mon, 2 Sep 2024 14:09:31 +0200 Subject: [PATCH 2/5] Update jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Gonzalo Muñoz --- .../JobServiceLeaderLivenessHealthCheckTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java index 156e27b6c5..bcf618c45c 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java @@ -64,4 +64,14 @@ void timeoutSetAndReached() { .isNotNull() .isEqualTo(HealthCheckResponse.Status.DOWN); } + + @Test + void statusChanged() { + healthCheck.onMessagingStatusChange(new MessagingChangeEvent(true)); + doReturn(START_TIME + 1000 * 10).when(healthCheck).getCurrentTimeMillis(); + HealthCheckResponse response = healthCheck.call(); + assertThat(response.getStatus()) + .isNotNull() + .isEqualTo(HealthCheckResponse.Status.UP); + } } From 2b2e08677ae873a1229eda1b80b8dc97c53721a1 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Tue, 3 Sep 2024 12:06:39 +0200 Subject: [PATCH 3/5] Apply formatting --- .../management/JobServiceLeaderLivenessHealthCheckTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java index bcf618c45c..9d19a8897d 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceLeaderLivenessHealthCheckTest.java @@ -64,7 +64,7 @@ void timeoutSetAndReached() { .isNotNull() .isEqualTo(HealthCheckResponse.Status.DOWN); } - + @Test void statusChanged() { healthCheck.onMessagingStatusChange(new MessagingChangeEvent(true)); From dda0d8c054267702449fe13a48c9b0013ef18063 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Tue, 3 Sep 2024 21:05:00 +0200 Subject: [PATCH 4/5] Tests improvements --- .../impl/VertxTimerServiceSchedulerTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java index b6c1421977..a48199817c 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java @@ -62,7 +62,6 @@ class VertxTimerServiceSchedulerTest { private Job job; private JobContext context; private Trigger trigger; - private JobDetails jobDetails; @Mock private JobExecutorResolver jobExecutorResolver; @@ -89,19 +88,23 @@ public void setUp() { @Test void testScheduleJob() { - ZonedDateTime time = DateUtil.now().plusSeconds(1); - final ManageableJobHandle handle = schedule(time); + JobDetails jobDetails = JobDetails.builder().build(); doReturn(jobExecutor).when(jobExecutorResolver).get(any()); JobExecutionResponse response = new JobExecutionResponse(); Uni result = Uni.createFrom().item(response); PublisherBuilder executionSuccessPublisherBuilder = ReactiveStreams.of(jobDetails); doReturn(executionSuccessPublisherBuilder).when(reactiveJobScheduler).handleJobExecutionSuccess(response); doReturn(result).when(jobExecutor).execute(jobDetails); + ZonedDateTime time = DateUtil.now().plusSeconds(1); + final ManageableJobHandle handle = schedule(jobDetails, time); verify(vertx).setTimer(timeCaptor.capture(), any()); assertThat(timeCaptor.getValue()).isGreaterThanOrEqualTo(time.toInstant().minusMillis(System.currentTimeMillis()).toEpochMilli()); given().await() .atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> verify(jobExecutorResolver).get(jobCaptor.capture())); + given().await() + .atMost(2, TimeUnit.SECONDS) + .untilAsserted(() -> verify(reactiveJobScheduler).handleJobExecutionSuccess(response)); assertThat(jobCaptor.getValue()).isEqualTo(jobDetails); assertThat(handle.isCancel()).isFalse(); assertThat(handle.getScheduledTime()).isNotNull(); @@ -109,7 +112,8 @@ void testScheduleJob() { @Test void testRemoveScheduleJob() { - final ManageableJobHandle handle = schedule(DateUtil.now().plusHours(1)); + JobDetails jobDetails = JobDetails.builder().build(); + final ManageableJobHandle handle = schedule(jobDetails, DateUtil.now().plusHours(1)); verify(vertx).setTimer(timeCaptor.capture(), any()); given().await() .atMost(1, TimeUnit.SECONDS) @@ -120,10 +124,9 @@ void testRemoveScheduleJob() { assertThat(tested.removeJob(handle)).isTrue(); } - private ManageableJobHandle schedule(ZonedDateTime time) { + private ManageableJobHandle schedule(JobDetails jobDetails, ZonedDateTime time) { final long timestamp = time.toInstant().toEpochMilli(); trigger = new PointInTimeTrigger(timestamp, null, null); - jobDetails = JobDetails.builder().build(); context = new JobDetailsContext(jobDetails); job = new DelegateJob(jobExecutorResolver, reactiveJobScheduler); return tested.scheduleJob(job, context, trigger); From 8ac4db77d34f1e8bb855fc6db018f3a32e1ccbfc Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Wed, 4 Sep 2024 10:14:32 +0200 Subject: [PATCH 5/5] JPAReactiveJobServiceManagementRepository updates --- ...eactiveJobServiceManagementRepository.java | 16 +++++++++++++++ ...iveJobServiceManagementRepositoryTest.java | 20 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java index 0634dc8442..2156126141 100644 --- a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java @@ -102,6 +102,11 @@ public Uni heartbeat(JobServiceManagementInfo info) { return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doHeartbeat(info))); } + @Override + public Uni release(JobServiceManagementInfo info) { + return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doRelease(info))); + } + private JobServiceManagementEntity findById(String id) { return repository.findById(id); } @@ -124,6 +129,17 @@ private JobServiceManagementInfo doHeartbeat(JobServiceManagementInfo info) { return from(jobService); } + private Boolean doRelease(JobServiceManagementInfo info) { + JobServiceManagementEntity jobService = findByIdAndToken(info); + if (jobService == null) { + return false; + } + jobService.setToken(null); + jobService.setLastHeartBeat(null); + repository.persist(jobService); + return true; + } + JobServiceManagementInfo from(JobServiceManagementEntity jobService) { if (Objects.isNull(jobService)) { return null; diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java index a0a18cab45..4ca80a3df7 100644 --- a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java +++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java @@ -104,4 +104,24 @@ void testConflictHeartbeat() { JobServiceManagementInfo updated = tested.heartbeat(new JobServiceManagementInfo(id, "differentToken", null)).await().indefinitely(); assertThat(updated).isNull(); } + + @Test + void testRelease() { + String id = "instance-id-5"; + String token = "token5"; + JobServiceManagementInfo created = create(id, token); + + Boolean released = tested.release(created).await().indefinitely(); + assertThat(released).isTrue(); + } + + @Test + void testReleaseNotExisting() { + String id = "instance-id-6"; + String token = "token6"; + JobServiceManagementInfo notExisting = new JobServiceManagementInfo(id, token, OffsetDateTime.now()); + + Boolean released = tested.release(notExisting).await().indefinitely(); + assertThat(released).isFalse(); + } }