Skip to content

Commit

Permalink
MODSOURMAN-1246 Data import completion notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
JavokhirAbdullayev committed Nov 12, 2024
1 parent 7f1bed6 commit 4b8f34c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [MODSOURMAN-1241](https://folio-org.atlassian.net/browse/MODSOURMAN-1241) Add missing module permissions for PUT /change-manager/parsedRecords/{id}
* [MODSOURMAN-1222](https://folio-org.atlassian.net/browse/MODSOURMAN-1222) Fix inconsistencies in permission namings
* [MODSOURMAN-1244](https://folio-org.atlassian.net/browse/MODSOURMAN-1244) Update MARC bib-instance default mapping to include additional subject types
* [MODSOURMAN-1246](https://folio-org.atlassian.net/browse/MODSOURMAN-1246) Data import completion notifications


## 2023-03-22 v3.8.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.reactivex.rxjava3.flowables.GroupedFlowable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.eventbus.EventBus;
Expand All @@ -18,16 +20,14 @@
import org.apache.logging.log4j.Logger;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
import org.folio.rest.jaxrs.model.JobExecutionProgress;
import org.folio.rest.jaxrs.model.Progress;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.jaxrs.model.*;
import org.folio.services.JobExecutionService;
import org.folio.services.Status;
import org.folio.services.progress.BatchableJobExecutionProgress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

Expand All @@ -36,12 +36,15 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.lang.String.format;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_EXECUTION_COMPLETED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;
import static org.folio.services.progress.JobExecutionProgressUtil.BATCH_JOB_PROGRESS_ADDRESS;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;


Expand All @@ -53,11 +56,19 @@
public class JobExecutionProgressVerticle extends AbstractVerticle {
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAX_NUM_EVENTS = 100;
private static final int MAX_DISTRIBUTION = 100;
private static final String USER_ID_HEADER = "userId";
private static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

private final JobExecutionProgressDao jobExecutionProgressDao;
private final JobExecutionService jobExecutionService;
private static final AtomicInteger indexer = new AtomicInteger();
private Scheduler scheduler;

@Autowired
@Qualifier("newKafkaConfig")
private KafkaConfig kafkaConfig;

public JobExecutionProgressVerticle(@Autowired JobExecutionProgressDao jobExecutionProgressDao,
@Autowired JobExecutionService jobExecutionService) {
this.jobExecutionProgressDao = jobExecutionProgressDao;
Expand Down Expand Up @@ -239,6 +250,7 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
parentExecution.withStatus(JobExecution.Status.COMMITTED)
.withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE)
.withCompletedDate(new Date());
sendEventToBulkOps(parentExecution, params);
return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params);
}
return Future.succeededFuture(parentExecution);
Expand All @@ -255,6 +267,16 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
return Future.succeededFuture(false);
}

private void sendEventToBulkOps(JobExecution jobExecution, OkapiConnectionParams params) {
var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders());
kafkaHeaders.add(new KafkaHeaderImpl(JOB_EXECUTION_ID_HEADER, jobExecution.getId()));
kafkaHeaders.add(new KafkaHeaderImpl(USER_ID_HEADER, jobExecution.getUserId()));
var key = String.valueOf(indexer.incrementAndGet() % MAX_DISTRIBUTION);
sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_EXECUTION_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
.onSuccess(event -> LOGGER.info("sendEventToBulkOps:: DI_JOB_EXECUTION_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
.onFailure(event -> LOGGER.warn("sendEventToBulkOps:: Error publishing DI_JOB_EXECUTION_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
}

private Future<JobExecution> updateJobStatusToError(String jobExecutionId, OkapiConnectionParams params) {
return jobExecutionService.updateJobExecutionStatus(jobExecutionId, new StatusDto()
.withStatus(StatusDto.Status.ERROR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.kafka.KafkaConfig;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class JobExecutionProgressVerticleTest {
private MessageProducer<BatchableJobExecutionProgress> batchJobProgressProducer;
private String jobExecutionId;
private String tenantId;
private KafkaConfig kafkaConfig;

@Before
public void setUp(TestContext context) {
Expand Down
2 changes: 1 addition & 1 deletion ramls/raml-storage

0 comments on commit 4b8f34c

Please sign in to comment.