diff --git a/jobs-service/jobs-service-storage-jpa/pom.xml b/jobs-service/jobs-service-storage-jpa/pom.xml
new file mode 100644
index 0000000000..e756309a0f
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/pom.xml
@@ -0,0 +1,122 @@
+
+
+
+ 4.0.0
+
+ org.kie.kogito
+ jobs-service
+ 999-SNAPSHOT
+
+
+ jobs-service-storage-jpa
+ Kogito Apps :: Jobs Service :: Storage :: JPA
+ Jobs Service (Timers and Async Jobs) JPA Storage
+
+
+ org.kie.kogito.job.service.repository.jpa
+
+
+
+
+ org.kie.kogito
+ jobs-service-common
+
+
+ jakarta.persistence
+ jakarta.persistence-api
+
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+
+
+ io.quarkus
+ quarkus-hibernate-orm-panache
+
+
+ io.quarkus
+ quarkus-jdbc-h2
+
+
+ io.quarkus
+ quarkus-flyway
+
+
+ org.kie.kogito
+ jobs-service-common
+ test-jar
+ test
+
+
+ io.quarkus
+ quarkus-test-h2
+ test
+
+
+ org.kie.kogito
+ kogito-quarkus-test-utils
+ test
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.keycloak
+ keycloak-core
+ test
+
+
+ com.github.tomakehurst
+ wiremock-jre8
+ test
+
+
+
\ No newline at end of file
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
new file mode 100644
index 0000000000..e67ed85081
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepository.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+
+import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+import org.kie.kogito.jobs.service.repository.jpa.repository.JobDetailsEntityRepository;
+import org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
+import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
+import org.kie.kogito.jobs.service.stream.JobEventPublisher;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.panache.common.Parameters;
+import io.quarkus.panache.common.Sort;
+import io.smallrye.mutiny.Multi;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher;
+import static org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE;
+
+@ApplicationScoped
+public class JPAReactiveJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository {
+
+ private static final String JOBS_BETWEEN_FIRE_TIMES_QUERY = "select job " +
+ "from JobDetailsEntity job " +
+ "where job.fireTime between :from and :to and job.status in :status";
+
+ private final JobDetailsEntityRepository repository;
+ private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+ private final TriggerMarshaller triggerMarshaller;
+ private final RecipientMarshaller recipientMarshaller;
+
+ JPAReactiveJobRepository() {
+ this(null, null, null, null, null, null);
+ }
+
+ @Inject
+ public JPAReactiveJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher, JobDetailsEntityRepository repository,
+ ReactiveRepositoryHelper reactiveRepositoryHelper,
+ TriggerMarshaller triggerMarshaller, RecipientMarshaller recipientMarshaller) {
+ super(vertx, jobEventPublisher);
+ this.repository = repository;
+ this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+ this.triggerMarshaller = triggerMarshaller;
+ this.recipientMarshaller = recipientMarshaller;
+ }
+
+ @Override
+ public CompletionStage doSave(JobDetails job) {
+ return this.reactiveRepositoryHelper.runAsync(() -> persist(job))
+ .thenApply(this::from);
+ }
+
+ private JobDetailsEntity persist(JobDetails job) {
+ JobDetailsEntity jobDetailsInstance = repository.findByIdOptional(job.getId()).orElseGet(JobDetailsEntity::new);
+
+ merge(job, jobDetailsInstance);
+
+ repository.persist(jobDetailsInstance);
+
+ return jobDetailsInstance;
+ }
+
+ @Override
+ public CompletionStage get(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() -> repository.findById(id))
+ .thenApply(this::from);
+ }
+
+ @Override
+ public CompletionStage exists(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() -> repository.findByIdOptional(id))
+ .thenApply(Optional::isPresent);
+ }
+
+ @Override
+ public CompletionStage delete(String id) {
+ return this.reactiveRepositoryHelper.runAsync(() -> this.deleteJob(id))
+ .thenApply(this::from);
+
+ }
+
+ private JobDetailsEntity deleteJob(String id) {
+ JobDetailsEntity jobDetailsInstance = repository.findById(id);
+
+ if (Objects.isNull(jobDetailsInstance)) {
+ return null;
+ }
+
+ repository.delete(jobDetailsInstance);
+
+ return jobDetailsInstance;
+ }
+
+ String toColumName(SortTermField field) {
+ return switch (field) {
+ case FIRE_TIME -> "fireTime";
+ case CREATED -> "created";
+ case ID -> "id";
+ default -> throw new IllegalArgumentException("No colum name is defined for field: " + field);
+ };
+ }
+
+ @Override
+ public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime,
+ ZonedDateTime toFireTime,
+ JobStatus[] status,
+ SortTerm[] orderBy) {
+
+ Parameters params = Parameters.with("from", fromFireTime.toOffsetDateTime())
+ .and("to", toFireTime.toOffsetDateTime())
+ .and("status", Arrays.stream(status).map(Enum::toString).toList());
+
+ Sort sort = Sort.empty();
+
+ Arrays.stream(orderBy).forEach(sortTerm -> {
+ String columnName = toColumName(sortTerm.getField());
+ sort.and(columnName, sortTerm.isAsc() ? Sort.Direction.Ascending : Sort.Direction.Descending);
+ });
+
+ return ReactiveStreams.fromPublisher(publisher(Multi.createFrom()
+ .completionStage(this.reactiveRepositoryHelper.runAsync(() -> repository.list(JOBS_BETWEEN_FIRE_TIMES_QUERY, sort, params.map())))
+ .flatMap(jobDetailsEntities -> Multi.createFrom().iterable(jobDetailsEntities))
+ .map(this::from)));
+
+ }
+
+ JobDetailsEntity merge(JobDetails job, JobDetailsEntity instance) {
+ if (Objects.isNull(instance)) {
+ instance = new JobDetailsEntity();
+ }
+
+ ObjectMapper mapper = ObjectMapperFactory.get();
+
+ instance.setId(job.getId());
+ instance.setCorrelationId(job.getCorrelationId());
+ instance.setStatus(mapOptionalValue(job.getStatus(), Enum::name));
+ instance.setLastUpdate(now());
+ instance.setRetries(job.getRetries());
+ instance.setExecutionCounter(job.getExecutionCounter());
+ instance.setScheduledId(job.getScheduledId());
+ instance.setPriority(job.getPriority());
+
+ instance.setRecipient(mapOptionalValue(job.getRecipient(), recipient -> mapper.valueToTree(recipientMarshaller.marshall(recipient).getMap())));
+ instance.setTrigger(mapOptionalValue(job.getTrigger(), trigger -> mapper.valueToTree(triggerMarshaller.marshall(job.getTrigger()).getMap())));
+ instance.setFireTime(mapOptionalValue(job.getTrigger().hasNextFireTime(), DateUtil::dateToOffsetDateTime));
+
+ instance.setExecutionTimeout(job.getExecutionTimeout());
+ instance.setExecutionTimeoutUnit(mapOptionalValue(job.getExecutionTimeoutUnit(), Enum::name));
+
+ instance.setCreated(Optional.ofNullable(job.getCreated()).map(ZonedDateTime::toOffsetDateTime).orElse(now()));
+
+ return instance;
+ }
+
+ JobDetails from(JobDetailsEntity instance) {
+ if (instance == null) {
+ return null;
+ }
+
+ return JobDetails.builder()
+ .id(instance.getId())
+ .correlationId(instance.getCorrelationId())
+ .status(mapOptionalValue(instance.getStatus(), JobStatus::valueOf))
+ .lastUpdate(instance.getLastUpdate().atZoneSameInstant(DEFAULT_ZONE))
+ .retries(instance.getRetries())
+ .executionCounter(instance.getExecutionCounter())
+ .scheduledId(instance.getScheduledId())
+ .priority(instance.getPriority())
+ .recipient(mapOptionalValue(instance.getRecipient(), recipient -> recipientMarshaller.unmarshall(JsonObject.mapFrom(recipient))))
+ .trigger(mapOptionalValue(instance.getTrigger(), trigger -> triggerMarshaller.unmarshall(JsonObject.mapFrom(trigger))))
+ .executionTimeout(instance.getExecutionTimeout())
+ .executionTimeoutUnit(mapOptionalValue(instance.getExecutionTimeoutUnit(), ChronoUnit::valueOf))
+ .created(instance.getCreated().atZoneSameInstant(DEFAULT_ZONE))
+ .build();
+ }
+
+ private R mapOptionalValue(T object, Function mapper) {
+ return Optional.ofNullable(object)
+ .map(mapper)
+ .orElse(null);
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
new file mode 100644
index 0000000000..0634dc8442
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepository.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+import org.kie.kogito.jobs.service.repository.jpa.repository.JobServiceManagementEntityRepository;
+import org.kie.kogito.jobs.service.repository.jpa.utils.ReactiveRepositoryHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.quarkus.panache.common.Parameters;
+import io.smallrye.mutiny.Uni;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import static java.time.OffsetDateTime.now;
+
+@ApplicationScoped
+public class JPAReactiveJobServiceManagementRepository implements JobServiceManagementRepository {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JPAReactiveJobServiceManagementRepository.class);
+
+ private final JobServiceManagementEntityRepository repository;
+ private final ReactiveRepositoryHelper reactiveRepositoryHelper;
+
+ @Inject
+ public JPAReactiveJobServiceManagementRepository(JobServiceManagementEntityRepository repository,
+ ReactiveRepositoryHelper reactiveRepositoryHelper) {
+ this.repository = repository;
+ this.reactiveRepositoryHelper = reactiveRepositoryHelper;
+ }
+
+ @Override
+ public Uni getAndUpdate(String id, Function computeUpdate) {
+ LOGGER.info("get {}", id);
+ return Uni.createFrom()
+ .completionStage(this.reactiveRepositoryHelper.runAsync(() -> doGetAndUpdate(id, computeUpdate)))
+ .onItem().ifNotNull().invoke(info -> LOGGER.trace("got {}", info));
+ }
+
+ private JobServiceManagementInfo doGetAndUpdate(String id, Function computeUpdate) {
+
+ JobServiceManagementInfo info = this.repository.findByIdOptional(id)
+ .map(this::from)
+ .orElse(null);
+
+ return this.update(computeUpdate.apply(info));
+ }
+
+ @Override
+ public Uni set(JobServiceManagementInfo info) {
+ LOGGER.info("set {}", info);
+ return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doSet(info)));
+ }
+
+ public JobServiceManagementInfo doSet(JobServiceManagementInfo info) {
+ return this.update(info);
+ }
+
+ private JobServiceManagementInfo update(JobServiceManagementInfo info) {
+
+ if (Objects.isNull(info)) {
+ return null;
+ }
+
+ JobServiceManagementEntity jobService = this.repository.findByIdOptional(info.getId()).orElse(new JobServiceManagementEntity());
+
+ jobService.setId(info.getId());
+ jobService.setToken(info.getToken());
+ jobService.setLastHeartBeat(info.getLastHeartbeat());
+
+ repository.persist(jobService);
+
+ return from(jobService);
+ }
+
+ @Override
+ public Uni heartbeat(JobServiceManagementInfo info) {
+ return Uni.createFrom().completionStage(this.reactiveRepositoryHelper.runAsync(() -> this.doHeartbeat(info)));
+ }
+
+ private JobServiceManagementEntity findById(String id) {
+ return repository.findById(id);
+ }
+
+ private JobServiceManagementEntity findByIdAndToken(JobServiceManagementInfo info) {
+ return repository.find("#JobServiceManagementEntity.GetServiceByIdAndToken", Parameters.with("id", info.getId()).and("token", info.getToken()).map())
+ .firstResultOptional().orElse(null);
+ }
+
+ private JobServiceManagementInfo doHeartbeat(JobServiceManagementInfo info) {
+ JobServiceManagementEntity jobService = findByIdAndToken(info);
+
+ if (jobService == null) {
+ return null;
+ }
+
+ jobService.setLastHeartBeat(now());
+ repository.persist(jobService);
+
+ return from(jobService);
+ }
+
+ JobServiceManagementInfo from(JobServiceManagementEntity jobService) {
+ if (Objects.isNull(jobService)) {
+ return null;
+ }
+ return new JobServiceManagementInfo(jobService.getId(), jobService.getToken(), jobService.getLastHeartBeat());
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
new file mode 100644
index 0000000000..b269ab9eaf
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/converter/JsonBinaryConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.converter;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.AttributeConverter;
+
+public class JsonBinaryConverter implements AttributeConverter {
+
+ @Override
+ public byte[] convertToDatabaseColumn(ObjectNode attribute) {
+ try {
+ return attribute == null ? null : ObjectMapperFactory.get().writeValueAsBytes(attribute);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public ObjectNode convertToEntityAttribute(byte[] dbData) {
+ try {
+ return dbData == null ? null : ObjectMapperFactory.get().readValue(dbData, ObjectNode.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
new file mode 100644
index 0000000000..9c1b317b47
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobDetailsEntity.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import org.kie.kogito.jobs.service.repository.jpa.converter.JsonBinaryConverter;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import jakarta.persistence.*;
+
+@Entity
+@Table(name = "job_details",
+ indexes = {
+ @Index(name = "job_details_fire_time_idx", columnList = "fire_time"),
+ @Index(name = "job_details_created_idx", columnList = "created")
+ })
+public class JobDetailsEntity {
+
+ @Id
+ private String id;
+
+ @Column(name = "correlation_id")
+ private String correlationId;
+
+ private String status;
+
+ @Column(name = "last_update")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime lastUpdate;
+
+ private Integer retries;
+
+ @Column(name = "execution_counter")
+ private Integer executionCounter;
+
+ @Column(name = "scheduled_id")
+ private String scheduledId;
+
+ private Integer priority;
+
+ @Convert(converter = JsonBinaryConverter.class)
+ private ObjectNode recipient;
+
+ @Convert(converter = JsonBinaryConverter.class)
+ private ObjectNode trigger;
+
+ @Column(name = "fire_time")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime fireTime;
+
+ @Column(name = "execution_timeout")
+ private Long executionTimeout;
+ @Column(name = "execution_timeout_unit")
+ private String executionTimeoutUnit;
+
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime created;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public OffsetDateTime getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public void setLastUpdate(OffsetDateTime lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ public Integer getRetries() {
+ return retries;
+ }
+
+ public void setRetries(Integer retries) {
+ this.retries = retries;
+ }
+
+ public Integer getExecutionCounter() {
+ return executionCounter;
+ }
+
+ public void setExecutionCounter(Integer executionCounter) {
+ this.executionCounter = executionCounter;
+ }
+
+ public String getScheduledId() {
+ return scheduledId;
+ }
+
+ public void setScheduledId(String scheduledId) {
+ this.scheduledId = scheduledId;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Integer priority) {
+ this.priority = priority;
+ }
+
+ public ObjectNode getRecipient() {
+ return recipient;
+ }
+
+ public void setRecipient(ObjectNode recipient) {
+ this.recipient = recipient;
+ }
+
+ public ObjectNode getTrigger() {
+ return trigger;
+ }
+
+ public void setTrigger(ObjectNode trigger) {
+ this.trigger = trigger;
+ }
+
+ public OffsetDateTime getFireTime() {
+ return fireTime;
+ }
+
+ public void setFireTime(OffsetDateTime fireTime) {
+ this.fireTime = fireTime;
+ }
+
+ public Long getExecutionTimeout() {
+ return executionTimeout;
+ }
+
+ public void setExecutionTimeout(Long executionTimeout) {
+ this.executionTimeout = executionTimeout;
+ }
+
+ public String getExecutionTimeoutUnit() {
+ return executionTimeoutUnit;
+ }
+
+ public void setExecutionTimeoutUnit(String executionTimeoutUnit) {
+ this.executionTimeoutUnit = executionTimeoutUnit;
+ }
+
+ public OffsetDateTime getCreated() {
+ return created;
+ }
+
+ public void setCreated(OffsetDateTime created) {
+ this.created = created;
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
new file mode 100644
index 0000000000..77ac0a67ac
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/model/JobServiceManagementEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.model;
+
+import java.time.OffsetDateTime;
+
+import jakarta.persistence.*;
+
+@Entity
+@NamedQuery(name = "JobServiceManagementEntity.GetServiceByIdAndToken",
+ query = "select service " +
+ "from JobServiceManagementEntity service " +
+ "where service.id = :id and service.token = :token")
+@Table(name = "job_service_management")
+public class JobServiceManagementEntity {
+
+ @Id
+ private String id;
+
+ @Column(name = "last_heartbeat")
+ @Temporal(TemporalType.TIMESTAMP)
+ private OffsetDateTime lastHeartBeat;
+
+ private String token;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public OffsetDateTime getLastHeartBeat() {
+ return lastHeartBeat;
+ }
+
+ public void setLastHeartBeat(OffsetDateTime lastHeartBeat) {
+ this.lastHeartBeat = lastHeartBeat;
+ }
+
+ public String getToken() {
+ return token;
+ }
+
+ public void setToken(String token) {
+ this.token = token;
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
new file mode 100644
index 0000000000..dd1003f635
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobDetailsEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import org.kie.kogito.jobs.service.repository.jpa.model.JobDetailsEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobDetailsEntityRepository implements PanacheRepositoryBase {
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
new file mode 100644
index 0000000000..1a528ccb27
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/repository/JobServiceManagementEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.repository;
+
+import org.kie.kogito.jobs.service.repository.jpa.model.JobServiceManagementEntity;
+
+import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JobServiceManagementEntityRepository implements PanacheRepositoryBase {
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
new file mode 100644
index 0000000000..9dab31a5b2
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/java/org/kie/kogito/jobs/service/repository/jpa/utils/ReactiveRepositoryHelper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.jobs.service.repository.jpa.utils;
+
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import io.quarkus.narayana.jta.QuarkusTransaction;
+import io.quarkus.narayana.jta.TransactionRunnerOptions;
+import io.vertx.core.Vertx;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class ReactiveRepositoryHelper {
+
+ private Vertx vertx;
+
+ @Inject
+ public ReactiveRepositoryHelper(Vertx vertx) {
+ this.vertx = vertx;
+ }
+
+ public CompletionStage runAsync(Supplier blockingFunction) {
+ return vertx.executeBlocking(() -> wrapInTransaction(blockingFunction)).toCompletionStage();
+ }
+
+ private T wrapInTransaction(Supplier function) {
+ TransactionRunnerOptions runner = QuarkusTransaction.isActive() ? QuarkusTransaction.joiningExisting() : QuarkusTransaction.requiringNew();
+
+ return runner.call(function::get);
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
new file mode 100644
index 0000000000..a0eb9fbf8c
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,20 @@
+
diff --git a/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
new file mode 100644
index 0000000000..b48a32a95c
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/main/resources/db/jobs-service/V2.0.0__Create_Tables.sql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+create table job_details
+(
+ id varchar(50) primary key,
+ correlation_id varchar(50),
+ status varchar(40),
+ last_update timestamp,
+ retries integer,
+ execution_counter integer,
+ scheduled_id varchar(40),
+ priority integer,
+ recipient varbinary(max),
+ trigger varbinary(max),
+ fire_time timestamp,
+ execution_timeout bigint,
+ execution_timeout_unit varchar(40),
+ created timestamp
+);
+
+create index job_details_fire_time_idx on job_details (fire_time);
+create index job_details_created_idx on job_details (created);
+
+CREATE TABLE job_service_management
+(
+ id varchar(40) primary key,
+ last_heartbeat timestamp,
+ token varchar(40) unique
+);
\ No newline at end of file
diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
new file mode 100644
index 0000000000..d1fe3cafee
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobRepositoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.impl.BaseJobRepositoryTest;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAReactiveJobRepositoryTest extends BaseJobRepositoryTest {
+
+ @Inject
+ JPAReactiveJobRepository tested;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+
+ super.setUp();
+ }
+
+ @Override
+ public ReactiveJobRepository tested() {
+ return tested;
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
new file mode 100644
index 0000000000..a0a18cab45
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/repository/jpa/JPAReactiveJobServiceManagementRepositoryTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.repository.jpa;
+
+import java.time.OffsetDateTime;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
+import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
+import org.kie.kogito.jobs.service.utils.DateUtil;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import jakarta.inject.Inject;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+class JPAReactiveJobServiceManagementRepositoryTest {
+
+ @Inject
+ JobServiceManagementRepository tested;
+
+ @BeforeEach
+ void setUp() {
+ }
+
+ @Test
+ void testGetAndUpdate() {
+ String id = "instance-id-1";
+ String token = "token1";
+ create(id, token);
+
+ AtomicReference date = new AtomicReference<>();
+ JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+ date.set(DateUtil.now().toOffsetDateTime());
+ info.setLastHeartbeat(date.get());
+ return info;
+ }).await().indefinitely();
+ assertThat(updated.getId()).isEqualTo(id);
+ assertThat(date.get()).isNotNull();
+ assertThat(updated.getLastHeartbeat()).isEqualTo(date.get());
+ assertThat(updated.getToken()).isEqualTo(token);
+ }
+
+ @Test
+ void testGetAndUpdateNotExisting() {
+ String id = "instance-id-2";
+ AtomicReference found = new AtomicReference<>(new JobServiceManagementInfo());
+ JobServiceManagementInfo updated = tested.getAndUpdate(id, info -> {
+ found.set(info);
+ return info;
+ }).await().indefinitely();
+ assertThat(updated).isNull();
+ assertThat(found.get()).isNull();
+ }
+
+ private JobServiceManagementInfo create(String id, String token) {
+ JobServiceManagementInfo created = tested.set(new JobServiceManagementInfo(id, token, null)).await().indefinitely();
+ assertThat(created.getId()).isEqualTo(id);
+ assertThat(created.getToken()).isEqualTo(token);
+ assertThat(created.getLastHeartbeat()).isNull();
+ return created;
+ }
+
+ @Test
+ void testHeartbeat() {
+ String id = "instance-id-3";
+ String token = "token3";
+ JobServiceManagementInfo created = create(id, token);
+
+ JobServiceManagementInfo updated = tested.heartbeat(created).await().indefinitely();
+ assertThat(updated.getLastHeartbeat()).isNotNull();
+ assertThat(updated.getLastHeartbeat()).isBefore(DateUtil.now().plusSeconds(1).toOffsetDateTime());
+ }
+
+ @Test
+ void testConflictHeartbeat() {
+ String id = "instance-id-4";
+ String token = "token4";
+ create(id, token);
+
+ JobServiceManagementInfo updated = tested.heartbeat(new JobServiceManagementInfo(id, "differentToken", null)).await().indefinitely();
+ assertThat(updated).isNull();
+ }
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
new file mode 100644
index 0000000000..0fdc67d712
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/test/java/org/kie/kogito/jobs/service/resource/JPAJobResourceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.jobs.service.resource;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+public class JPAJobResourceTest extends BaseJobResourceTest {
+
+}
diff --git a/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
new file mode 100644
index 0000000000..ef10d96215
--- /dev/null
+++ b/jobs-service/jobs-service-storage-jpa/src/test/resources/application.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Kogito
+kogito.apps.persistence.type=jdbc
+# Data source
+quarkus.datasource.db-kind=h2
+quarkus.datasource.username=kogito
+quarkus.datasource.jdbc.url=jdbc:h2:mem:default;NON_KEYWORDS=VALUE,KEY
+quarkus.flyway.migrate-at-start=true
+quarkus.flyway.clean-at-start=true
+quarkus.flyway.locations=db/jobs-service
+# Disabling Security for tests
+quarkus.oidc.enabled=false
+quarkus.oidc.tenant-enabled=false
+quarkus.oidc.auth-server-url=none
+quarkus.keycloak.devservices.enabled=false
\ No newline at end of file
diff --git a/jobs-service/pom.xml b/jobs-service/pom.xml
index df5f21f771..8eb5f0ff2a 100644
--- a/jobs-service/pom.xml
+++ b/jobs-service/pom.xml
@@ -39,6 +39,7 @@
jobs-recipients
jobs-service-common
jobs-service-postgresql-common
+ jobs-service-storage-jpa
jobs-service-postgresql
jobs-service-inmemory
kogito-addons-jobs-service
diff --git a/kogito-apps-bom/pom.xml b/kogito-apps-bom/pom.xml
index 5e903d4972..774cd46967 100644
--- a/kogito-apps-bom/pom.xml
+++ b/kogito-apps-bom/pom.xml
@@ -78,6 +78,17 @@
${project.version}
sources
+
+ org.kie.kogito
+ jobs-service-storage-jpa
+ ${project.version}
+
+
+ org.kie.kogito
+ jobs-service-storage-jpa
+ ${project.version}
+ sources
+
org.kie.kogito
jobs-service-postgresql