From 685c31d31e6c5f6ebc681a02d540087466b8cf25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pere=20Fern=C3=A1ndez?= Date: Mon, 26 Aug 2024 10:00:15 +0200 Subject: [PATCH] incubator-kie-issues#458: Create JPA Storage for `Job Service` --- jobs-service/jobs-service-storage-jpa/pom.xml | 122 ++++++++++ .../jpa/JPAReactiveJobRepository.java | 223 ++++++++++++++++++ ...eactiveJobServiceManagementRepository.java | 133 +++++++++++ .../jpa/converter/JsonBinaryConverter.java | 50 ++++ .../jpa/model/JobDetailsEntity.java | 188 +++++++++++++++ .../jpa/model/JobServiceManagementEntity.java | 66 ++++++ .../JobDetailsEntityRepository.java | 30 +++ .../JobServiceManagementEntityRepository.java | 30 +++ .../jpa/utils/ReactiveRepositoryHelper.java | 51 ++++ .../src/main/resources/META-INF/beans.xml | 20 ++ .../db/jobs-service/V2.0.0__Create_Tables.sql | 46 ++++ .../jpa/JPAReactiveJobRepositoryTest.java | 48 ++++ ...iveJobServiceManagementRepositoryTest.java | 107 +++++++++ .../service/resource/JPAJobResourceTest.java | 29 +++ .../src/test/resources/application.properties | 32 +++ jobs-service/pom.xml | 1 + kogito-apps-bom/pom.xml | 11 + 17 files changed, 1187 insertions(+) create mode 100644 jobs-service/jobs-service-storage-jpa/pom.xml create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml create mode 100644 jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql create mode 100644 jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java create mode 100644 jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties diff --git a/jobs-service/jobs-service-storage-jpa/pom.xml b/jobs-service/jobs-service-storage-jpa/pom.xml new file mode 100644 index 0000000000..e756309a0f --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/pom.xml @@ -0,0 +1,122 @@ + + + + 4.0.0 + + org.kie.kogito + jobs-service + 999-SNAPSHOT + + + jobs-service-storage-jpa + Kogito Apps :: Jobs Service :: Storage :: JPA + Jobs Service (Timers and Async Jobs) JPA Storage + + + org.kie.kogito.job.service.repository.jpa + + + + + org.kie.kogito + jobs-service-common + + + jakarta.persistence + jakarta.persistence-api + + + io.smallrye.reactive + mutiny-zero-flow-adapters + + + io.quarkus + quarkus-hibernate-orm-panache + + + io.quarkus + quarkus-jdbc-h2 + + + io.quarkus + quarkus-flyway + + + org.kie.kogito + jobs-service-common + test-jar + test + + + io.quarkus + quarkus-test-h2 + test + + + org.kie.kogito + kogito-quarkus-test-utils + test + + + io.quarkus + quarkus-junit5 + test + + + org.mockito + mockito-junit-jupiter + test + + + org.mockito + mockito-core + test + + + org.assertj + assertj-core + test + + + org.awaitility + awaitility + test + + + io.rest-assured + rest-assured + test + + + org.keycloak + keycloak-core + test + + + com.github.tomakehurst + wiremock-jre8 + test + + + \ No newline at end of file diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java new file mode 100644 index 0000000000..e67ed85081 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa; + +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.kie.kogito.jackson.utils.ObjectMapperFactory; +import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.jobs.service.model.JobStatus; +import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; +import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository; +import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity; +import org.kie.kogito.jobs.service.repository.jpa.repository.JobDetailsEntityRepository; +import org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper; +import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller; +import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller; +import org.kie.kogito.jobs.service.stream.JobEventPublisher; +import org.kie.kogito.jobs.service.utils.DateUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.quarkus.panache.common.Parameters; +import io.quarkus.panache.common.Sort; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import static java.time.OffsetDateTime.now; +import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher; +import static org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE; + +@ApplicationScoped +public class JPAReactiveJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository { + + private static final String JOBS_BETWEEN_FIRE_TIMES_QUERY = "select job " + + "from JobDetailsEntity job " + + "where job.fireTime between :from and :to and job.status in :status"; + + private final JobDetailsEntityRepository repository; + private final ReactiveRepositoryHelper reactiveRepositoryHelper; + + private final TriggerMarshaller triggerMarshaller; + private final RecipientMarshaller recipientMarshaller; + + JPAReactiveJobRepository() { + this(null, null, null, null, null, null); + } + + @Inject + public JPAReactiveJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher, JobDetailsEntityRepository repository, + ReactiveRepositoryHelper reactiveRepositoryHelper, + TriggerMarshaller triggerMarshaller, RecipientMarshaller recipientMarshaller) { + super(vertx, jobEventPublisher); + this.repository = repository; + this.reactiveRepositoryHelper = reactiveRepositoryHelper; + this.triggerMarshaller = triggerMarshaller; + this.recipientMarshaller = recipientMarshaller; + } + + @Override + public CompletionStage doSave(JobDetails job) { + return this.reactiveRepositoryHelper.runAsync(() -> persist(job)) + .thenApply(this::from); + } + + private JobDetailsEntity persist(JobDetails job) { + JobDetailsEntity jobDetailsInstance = repository.findByIdOptional(job.getId()).orElseGet(JobDetailsEntity::new); + + merge(job, jobDetailsInstance); + + repository.persist(jobDetailsInstance); + + return jobDetailsInstance; + } + + @Override + public CompletionStage get(String id) { + return this.reactiveRepositoryHelper.runAsync(() -> repository.findById(id)) + .thenApply(this::from); + } + + @Override + public CompletionStage exists(String id) { + return this.reactiveRepositoryHelper.runAsync(() -> repository.findByIdOptional(id)) + .thenApply(Optional::isPresent); + } + + @Override + public CompletionStage delete(String id) { + return this.reactiveRepositoryHelper.runAsync(() -> this.deleteJob(id)) + .thenApply(this::from); + + } + + private JobDetailsEntity deleteJob(String id) { + JobDetailsEntity jobDetailsInstance = repository.findById(id); + + if (Objects.isNull(jobDetailsInstance)) { + return null; + } + + repository.delete(jobDetailsInstance); + + return jobDetailsInstance; + } + + String toColumName(SortTermField field) { + return switch (field) { + case FIRE_TIME -> "fireTime"; + case CREATED -> "created"; + case ID -> "id"; + default -> throw new IllegalArgumentException("No colum name is defined for field: " + field); + }; + } + + @Override + public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy) { + + Parameters params = Parameters.with("from", fromFireTime.toOffsetDateTime()) + .and("to", toFireTime.toOffsetDateTime()) + .and("status", Arrays.stream(status).map(Enum::toString).toList()); + + Sort sort = Sort.empty(); + + Arrays.stream(orderBy).forEach(sortTerm -> { + String columnName = toColumName(sortTerm.getField()); + sort.and(columnName, sortTerm.isAsc() ? Sort.Direction.Ascending : Sort.Direction.Descending); + }); + + return ReactiveStreams.fromPublisher(publisher(Multi.createFrom() + .completionStage(this.reactiveRepositoryHelper.runAsync(() -> repository.list(JOBS_BETWEEN_FIRE_TIMES_QUERY, sort, params.map()))) + .flatMap(jobDetailsEntities -> Multi.createFrom().iterable(jobDetailsEntities)) + .map(this::from))); + + } + + JobDetailsEntity merge(JobDetails job, JobDetailsEntity instance) { + if (Objects.isNull(instance)) { + instance = new JobDetailsEntity(); + } + + ObjectMapper mapper = ObjectMapperFactory.get(); + + instance.setId(job.getId()); + instance.setCorrelationId(job.getCorrelationId()); + instance.setStatus(mapOptionalValue(job.getStatus(), Enum::name)); + instance.setLastUpdate(now()); + instance.setRetries(job.getRetries()); + instance.setExecutionCounter(job.getExecutionCounter()); + instance.setScheduledId(job.getScheduledId()); + instance.setPriority(job.getPriority()); + + instance.setRecipient(mapOptionalValue(job.getRecipient(), recipient -> mapper.valueToTree(recipientMarshaller.marshall(recipient).getMap()))); + instance.setTrigger(mapOptionalValue(job.getTrigger(), trigger -> mapper.valueToTree(triggerMarshaller.marshall(job.getTrigger()).getMap()))); + instance.setFireTime(mapOptionalValue(job.getTrigger().hasNextFireTime(), DateUtil::dateToOffsetDateTime)); + + instance.setExecutionTimeout(job.getExecutionTimeout()); + instance.setExecutionTimeoutUnit(mapOptionalValue(job.getExecutionTimeoutUnit(), Enum::name)); + + instance.setCreated(Optional.ofNullable(job.getCreated()).map(ZonedDateTime::toOffsetDateTime).orElse(now())); + + return instance; + } + + JobDetails from(JobDetailsEntity instance) { + if (instance == null) { + return null; + } + + return JobDetails.builder() + .id(instance.getId()) + .correlationId(instance.getCorrelationId()) + .status(mapOptionalValue(instance.getStatus(), JobStatus::valueOf)) + .lastUpdate(instance.getLastUpdate().atZoneSameInstant(DEFAULT_ZONE)) + .retries(instance.getRetries()) + .executionCounter(instance.getExecutionCounter()) + .scheduledId(instance.getScheduledId()) + .priority(instance.getPriority()) + .recipient(mapOptionalValue(instance.getRecipient(), recipient -> recipientMarshaller.unmarshall(JsonObject.mapFrom(recipient)))) + .trigger(mapOptionalValue(instance.getTrigger(), trigger -> triggerMarshaller.unmarshall(JsonObject.mapFrom(trigger)))) + .executionTimeout(instance.getExecutionTimeout()) + .executionTimeoutUnit(mapOptionalValue(instance.getExecutionTimeoutUnit(), ChronoUnit::valueOf)) + .created(instance.getCreated().atZoneSameInstant(DEFAULT_ZONE)) + .build(); + } + + private R mapOptionalValue(T object, Function mapper) { + return Optional.ofNullable(object) + .map(mapper) + .orElse(null); + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java new file mode 100644 index 0000000000..0634dc8442 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa; + +import java.util.Objects; +import java.util.function.Function; + +import org.kie.kogito.jobs.service.model.JobServiceManagementInfo; +import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository; +import org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity; +import org.kie.kogito.jobs.service.repository.jpa.repository.JobServiceManagementEntityRepository; +import org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.panache.common.Parameters; +import io.smallrye.mutiny.Uni; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import static java.time.OffsetDateTime.now; + +@ApplicationScoped +public class JPAReactiveJobServiceManagementRepository implements JobServiceManagementRepository { + + private static final Logger LOGGER = LoggerFactory.getLogger(JPAReactiveJobServiceManagementRepository.class); + + private final JobServiceManagementEntityRepository repository; + private final ReactiveRepositoryHelper reactiveRepositoryHelper; + + @Inject + public JPAReactiveJobServiceManagementRepository(JobServiceManagementEntityRepository repository, + ReactiveRepositoryHelper reactiveRepositoryHelper) { + this.repository = repository; + this.reactiveRepositoryHelper = reactiveRepositoryHelper; + } + + @Override + public Uni getAndUpdate(String id, Function computeUpdate) { + LOGGER.info("get {}", id); + return Uni.createFrom() + .completionStage(this.reactiveRepositoryHelper.runAsync(() -> doGetAndUpdate(id, computeUpdate))) + .onItem().ifNotNull().invoke(info -> LOGGER.trace("got {}", info)); + } + + private JobServiceManagementInfo doGetAndUpdate(String id, Function computeUpdate) { + + JobServiceManagementInfo info = this.repository.findByIdOptional(id) + .map(this::from) + .orElse(null); + + return this.update(computeUpdate.apply(info)); + } + + @Override + public Uni set(JobServiceManagementInfo info) { + LOGGER.info("set {}", info); + return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doSet(info))); + } + + public JobServiceManagementInfo doSet(JobServiceManagementInfo info) { + return this.update(info); + } + + private JobServiceManagementInfo update(JobServiceManagementInfo info) { + + if (Objects.isNull(info)) { + return null; + } + + JobServiceManagementEntity jobService = this.repository.findByIdOptional(info.getId()).orElse(new JobServiceManagementEntity()); + + jobService.setId(info.getId()); + jobService.setToken(info.getToken()); + jobService.setLastHeartBeat(info.getLastHeartbeat()); + + repository.persist(jobService); + + return from(jobService); + } + + @Override + public Uni heartbeat(JobServiceManagementInfo info) { + return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doHeartbeat(info))); + } + + private JobServiceManagementEntity findById(String id) { + return repository.findById(id); + } + + private JobServiceManagementEntity findByIdAndToken(JobServiceManagementInfo info) { + return repository.find("#JobServiceManagementEntity.GetServiceByIdAndToken", Parameters.with("id", info.getId()).and("token", info.getToken()).map()) + .firstResultOptional().orElse(null); + } + + private JobServiceManagementInfo doHeartbeat(JobServiceManagementInfo info) { + JobServiceManagementEntity jobService = findByIdAndToken(info); + + if (jobService == null) { + return null; + } + + jobService.setLastHeartBeat(now()); + repository.persist(jobService); + + return from(jobService); + } + + JobServiceManagementInfo from(JobServiceManagementEntity jobService) { + if (Objects.isNull(jobService)) { + return null; + } + return new JobServiceManagementInfo(jobService.getId(), jobService.getToken(), jobService.getLastHeartBeat()); + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java new file mode 100644 index 0000000000..b269ab9eaf --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.repository.jpa.converter; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.kie.kogito.jackson.utils.ObjectMapperFactory; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import jakarta.persistence.AttributeConverter; + +public class JsonBinaryConverter implements AttributeConverter { + + @Override + public byte[] convertToDatabaseColumn(ObjectNode attribute) { + try { + return attribute == null ? null : ObjectMapperFactory.get().writeValueAsBytes(attribute); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public ObjectNode convertToEntityAttribute(byte[] dbData) { + try { + return dbData == null ? null : ObjectMapperFactory.get().readValue(dbData, ObjectNode.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java new file mode 100644 index 0000000000..9c1b317b47 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.repository.jpa.model; + +import java.time.OffsetDateTime; + +import org.kie.kogito.jobs.service.repository.jpa.converter.JsonBinaryConverter; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import jakarta.persistence.*; + +@Entity +@Table(name = "job_details", + indexes = { + @Index(name = "job_details_fire_time_idx", columnList = "fire_time"), + @Index(name = "job_details_created_idx", columnList = "created") + }) +public class JobDetailsEntity { + + @Id + private String id; + + @Column(name = "correlation_id") + private String correlationId; + + private String status; + + @Column(name = "last_update") + @Temporal(TemporalType.TIMESTAMP) + private OffsetDateTime lastUpdate; + + private Integer retries; + + @Column(name = "execution_counter") + private Integer executionCounter; + + @Column(name = "scheduled_id") + private String scheduledId; + + private Integer priority; + + @Convert(converter = JsonBinaryConverter.class) + private ObjectNode recipient; + + @Convert(converter = JsonBinaryConverter.class) + private ObjectNode trigger; + + @Column(name = "fire_time") + @Temporal(TemporalType.TIMESTAMP) + private OffsetDateTime fireTime; + + @Column(name = "execution_timeout") + private Long executionTimeout; + @Column(name = "execution_timeout_unit") + private String executionTimeoutUnit; + + @Temporal(TemporalType.TIMESTAMP) + private OffsetDateTime created; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(String correlationId) { + this.correlationId = correlationId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public OffsetDateTime getLastUpdate() { + return lastUpdate; + } + + public void setLastUpdate(OffsetDateTime lastUpdate) { + this.lastUpdate = lastUpdate; + } + + public Integer getRetries() { + return retries; + } + + public void setRetries(Integer retries) { + this.retries = retries; + } + + public Integer getExecutionCounter() { + return executionCounter; + } + + public void setExecutionCounter(Integer executionCounter) { + this.executionCounter = executionCounter; + } + + public String getScheduledId() { + return scheduledId; + } + + public void setScheduledId(String scheduledId) { + this.scheduledId = scheduledId; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + public ObjectNode getRecipient() { + return recipient; + } + + public void setRecipient(ObjectNode recipient) { + this.recipient = recipient; + } + + public ObjectNode getTrigger() { + return trigger; + } + + public void setTrigger(ObjectNode trigger) { + this.trigger = trigger; + } + + public OffsetDateTime getFireTime() { + return fireTime; + } + + public void setFireTime(OffsetDateTime fireTime) { + this.fireTime = fireTime; + } + + public Long getExecutionTimeout() { + return executionTimeout; + } + + public void setExecutionTimeout(Long executionTimeout) { + this.executionTimeout = executionTimeout; + } + + public String getExecutionTimeoutUnit() { + return executionTimeoutUnit; + } + + public void setExecutionTimeoutUnit(String executionTimeoutUnit) { + this.executionTimeoutUnit = executionTimeoutUnit; + } + + public OffsetDateTime getCreated() { + return created; + } + + public void setCreated(OffsetDateTime created) { + this.created = created; + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java new file mode 100644 index 0000000000..77ac0a67ac --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa.model; + +import java.time.OffsetDateTime; + +import jakarta.persistence.*; + +@Entity +@NamedQuery(name = "JobServiceManagementEntity.GetServiceByIdAndToken", + query = "select service " + + "from JobServiceManagementEntity service " + + "where service.id = :id and service.token = :token") +@Table(name = "job_service_management") +public class JobServiceManagementEntity { + + @Id + private String id; + + @Column(name = "last_heartbeat") + @Temporal(TemporalType.TIMESTAMP) + private OffsetDateTime lastHeartBeat; + + private String token; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public OffsetDateTime getLastHeartBeat() { + return lastHeartBeat; + } + + public void setLastHeartBeat(OffsetDateTime lastHeartBeat) { + this.lastHeartBeat = lastHeartBeat; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java new file mode 100644 index 0000000000..dd1003f635 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa.repository; + +import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity; + +import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class JobDetailsEntityRepository implements PanacheRepositoryBase { +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java new file mode 100644 index 0000000000..1a528ccb27 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa.repository; + +import org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity; + +import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class JobServiceManagementEntityRepository implements PanacheRepositoryBase { +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java new file mode 100644 index 0000000000..9dab31a5b2 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.jobs.service.repository.jpa.utils; + +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.narayana.jta.TransactionRunnerOptions; +import io.vertx.core.Vertx; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class ReactiveRepositoryHelper { + + private Vertx vertx; + + @Inject + public ReactiveRepositoryHelper(Vertx vertx) { + this.vertx = vertx; + } + + public CompletionStage runAsync(Supplier blockingFunction) { + return vertx.executeBlocking(() -> wrapInTransaction(blockingFunction)).toCompletionStage(); + } + + private T wrapInTransaction(Supplier function) { + TransactionRunnerOptions runner = QuarkusTransaction.isActive() ? QuarkusTransaction.joiningExisting() : QuarkusTransaction.requiringNew(); + + return runner.call(function::get); + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a0eb9fbf8c --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml @@ -0,0 +1,20 @@ + diff --git a/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql new file mode 100644 index 0000000000..b48a32a95c --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +create table job_details +( + id varchar(50) primary key, + correlation_id varchar(50), + status varchar(40), + last_update timestamp, + retries integer, + execution_counter integer, + scheduled_id varchar(40), + priority integer, + recipient varbinary(max), + trigger varbinary(max), + fire_time timestamp, + execution_timeout bigint, + execution_timeout_unit varchar(40), + created timestamp +); + +create index job_details_fire_time_idx on job_details (fire_time); +create index job_details_created_idx on job_details (created); + +CREATE TABLE job_service_management +( + id varchar(40) primary key, + last_heartbeat timestamp, + token varchar(40) unique +); \ No newline at end of file diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java new file mode 100644 index 0000000000..d1fe3cafee --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.repository.jpa; + +import org.junit.jupiter.api.BeforeEach; +import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; +import org.kie.kogito.jobs.service.repository.impl.BaseJobRepositoryTest; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.h2.H2DatabaseTestResource; +import io.quarkus.test.junit.QuarkusTest; + +import jakarta.inject.Inject; + +@QuarkusTest +@QuarkusTestResource(H2DatabaseTestResource.class) +public class JPAReactiveJobRepositoryTest extends BaseJobRepositoryTest { + + @Inject + JPAReactiveJobRepository tested; + + @BeforeEach + public void setUp() throws Exception { + + super.setUp(); + } + + @Override + public ReactiveJobRepository tested() { + return tested; + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java new file mode 100644 index 0000000000..a0a18cab45 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.repository.jpa; + +import java.time.OffsetDateTime; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.kie.kogito.jobs.service.model.JobServiceManagementInfo; +import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository; +import org.kie.kogito.jobs.service.utils.DateUtil; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.h2.H2DatabaseTestResource; +import io.quarkus.test.junit.QuarkusTest; + +import jakarta.inject.Inject; + +import static org.assertj.core.api.Assertions.assertThat; + +@QuarkusTest +@QuarkusTestResource(H2DatabaseTestResource.class) +class JPAReactiveJobServiceManagementRepositoryTest { + + @Inject + JobServiceManagementRepository tested; + + @BeforeEach + void setUp() { + } + + @Test + void testGetAndUpdate() { + String id = "instance-id-1"; + String token = "token1"; + create(id, token); + + AtomicReference date = new AtomicReference<>(); + JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> { + date.set(DateUtil.now().toOffsetDateTime()); + info.setLastHeartbeat(date.get()); + return info; + }).await().indefinitely(); + assertThat(updated.getId()).isEqualTo(id); + assertThat(date.get()).isNotNull(); + assertThat(updated.getLastHeartbeat()).isEqualTo(date.get()); + assertThat(updated.getToken()).isEqualTo(token); + } + + @Test + void testGetAndUpdateNotExisting() { + String id = "instance-id-2"; + AtomicReference found = new AtomicReference<>(new JobServiceManagementInfo()); + JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> { + found.set(info); + return info; + }).await().indefinitely(); + assertThat(updated).isNull(); + assertThat(found.get()).isNull(); + } + + private JobServiceManagementInfo create(String id, String token) { + JobServiceManagementInfo created = tested.set(new JobServiceManagementInfo(id, token, null)).await().indefinitely(); + assertThat(created.getId()).isEqualTo(id); + assertThat(created.getToken()).isEqualTo(token); + assertThat(created.getLastHeartbeat()).isNull(); + return created; + } + + @Test + void testHeartbeat() { + String id = "instance-id-3"; + String token = "token3"; + JobServiceManagementInfo created = create(id, token); + + JobServiceManagementInfo updated = tested.heartbeat(created).await().indefinitely(); + assertThat(updated.getLastHeartbeat()).isNotNull(); + assertThat(updated.getLastHeartbeat()).isBefore(DateUtil.now().plusSeconds(1).toOffsetDateTime()); + } + + @Test + void testConflictHeartbeat() { + String id = "instance-id-4"; + String token = "token4"; + create(id, token); + + JobServiceManagementInfo updated = tested.heartbeat(new JobServiceManagementInfo(id, "differentToken", null)).await().indefinitely(); + assertThat(updated).isNull(); + } +} diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java new file mode 100644 index 0000000000..0fdc67d712 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.resource; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.h2.H2DatabaseTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(H2DatabaseTestResource.class) +public class JPAJobResourceTest extends BaseJobResourceTest { + +} diff --git a/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties new file mode 100644 index 0000000000..ef10d96215 --- /dev/null +++ b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Kogito +kogito.apps.persistence.type=jdbc +# Data source +quarkus.datasource.db-kind=h2 +quarkus.datasource.username=kogito +quarkus.datasource.jdbc.url=jdbc:h2:mem:default;NON_KEYWORDS=VALUE,KEY +quarkus.flyway.migrate-at-start=true +quarkus.flyway.clean-at-start=true +quarkus.flyway.locations=db/jobs-service +# Disabling Security for tests +quarkus.oidc.enabled=false +quarkus.oidc.tenant-enabled=false +quarkus.oidc.auth-server-url=none +quarkus.keycloak.devservices.enabled=false \ No newline at end of file diff --git a/jobs-service/pom.xml b/jobs-service/pom.xml index df5f21f771..8eb5f0ff2a 100644 --- a/jobs-service/pom.xml +++ b/jobs-service/pom.xml @@ -39,6 +39,7 @@ jobs-recipients jobs-service-common jobs-service-postgresql-common + jobs-service-storage-jpa jobs-service-postgresql jobs-service-inmemory kogito-addons-jobs-service diff --git a/kogito-apps-bom/pom.xml b/kogito-apps-bom/pom.xml index 5e903d4972..774cd46967 100644 --- a/kogito-apps-bom/pom.xml +++ b/kogito-apps-bom/pom.xml @@ -78,6 +78,17 @@ ${project.version} sources + + org.kie.kogito + jobs-service-storage-jpa + ${project.version} + + + org.kie.kogito + jobs-service-storage-jpa + ${project.version} + sources + org.kie.kogito jobs-service-postgresql