Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODSOURMAN-1246 Data import completion notifications #946

Merged
merged 15 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [MODSOURMAN-1241](https://folio-org.atlassian.net/browse/MODSOURMAN-1241) Add missing module permissions for PUT /change-manager/parsedRecords/{id}
* [MODSOURMAN-1222](https://folio-org.atlassian.net/browse/MODSOURMAN-1222) Fix inconsistencies in permission namings
* [MODSOURMAN-1244](https://folio-org.atlassian.net/browse/MODSOURMAN-1244) Update MARC bib-instance default mapping to include additional subject types
* [MODSOURMAN-1246](https://folio-org.atlassian.net/browse/MODSOURMAN-1246) Data import completion notifications
* [MODSOURMAN-1240](https://folio-org.atlassian.net/browse/MODSOURMAN-1240) The title of record is not displayed on the JSON data after importing file for creating order

## 2023-03-22 v3.8.0
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ After setup, it is good to check logs in all related modules for errors. Data im
There are another important properties - `number of partitions` for topics `DI_COMPLETED`, `DI_ERROR`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED`,
`DI_SRS_MARC_AUTHORITY_RECORD_CREATED`, `DI_SRS_MARC_HOLDING_RECORD_CREATED`, `DI_MARC_FOR_UPDATE_RECEIVED`,
`DI_MARC_FOR_DELETE_RECEIVED`,
`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`
`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`, `DI_JOB_COMPLETED`
and `DI_RAW_RECORDS_CHUNK_PARSED`
which are created during tenant initialization, the values of which can be customized with
`DI_COMPLETED_PARTITIONS`, `DI_ERROR_PARTITIONS`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS`,
`DI_SRS_MARC_AUTHORITY_RECORD_CREATED_PARTITIONS`, `DI_SRS_MARC_HOLDINGS_RECORD_CREATED_PARTITIONS`,
`DI_MARC_FOR_UPDATE_RECEIVED_PARTITIONS`, `DI_MARC_FOR_DELETE_RECEIVED_PARTITIONS`,
`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`
`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`, `DI_JOB_COMPLETED_PARTITIONS`
and `DI_RAW_RECORDS_CHUNK_PARSED_PARTITIONS` env variables respectively.
Default value - `1`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class SRMKafkaTopicService {
@Value("${di_edifact_parsed.partitions}")
private Integer diEdifactRecordParsedNumPartitions;

@Value("${di_job_completed.partitions}")
private Integer diJobCompletedNumPartitions;

public KafkaTopic[] createTopicObjects() {
return new SRMKafkaTopic[] {
new SRMKafkaTopic("DI_COMPLETED", diCompletedNumPartitions),
Expand All @@ -51,7 +54,8 @@ public KafkaTopic[] createTopicObjects() {
new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", diMarcForDeleteReceivedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", diMarcOrderParsedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", diMarcBibRecordParsedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions)
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions),
new SRMKafkaTopic("DI_JOB_COMPLETED", diJobCompletedNumPartitions),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.flowables.GroupedFlowable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.eventbus.EventBus;
Expand All @@ -18,6 +20,8 @@
import org.apache.logging.log4j.Logger;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
Expand All @@ -28,6 +32,7 @@
import org.folio.services.Status;
import org.folio.services.progress.BatchableJobExecutionProgress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

Expand All @@ -36,12 +41,15 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.lang.String.format;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;
import static org.folio.services.progress.JobExecutionProgressUtil.BATCH_JOB_PROGRESS_ADDRESS;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;


Expand All @@ -53,15 +61,24 @@
public class JobExecutionProgressVerticle extends AbstractVerticle {
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAX_NUM_EVENTS = 100;
private static final int MAX_DISTRIBUTION = 100;
private static final String USER_ID_HEADER = "userId";
private static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

private final JobExecutionProgressDao jobExecutionProgressDao;
private final JobExecutionService jobExecutionService;
private static final AtomicInteger indexer = new AtomicInteger();
private Scheduler scheduler;

public JobExecutionProgressVerticle(@Autowired JobExecutionProgressDao jobExecutionProgressDao,
@Autowired JobExecutionService jobExecutionService) {
private final KafkaConfig kafkaConfig;

@Autowired
public JobExecutionProgressVerticle(JobExecutionProgressDao jobExecutionProgressDao,
JobExecutionService jobExecutionService,
@Qualifier("newKafkaConfig") KafkaConfig kafkaConfig) {
this.jobExecutionProgressDao = jobExecutionProgressDao;
this.jobExecutionService = jobExecutionService;
this.kafkaConfig = kafkaConfig;
}


Expand Down Expand Up @@ -94,7 +111,7 @@ private void consumeJobExecutionProgress(MessageConsumer<BatchableJobExecutionPr
.flatMapCompletable(flowable ->
groupByTenantIdAndJobExecutionId(flowable)
.map(groupedMessages -> reduceManyJobExecutionProgressObjectsIntoSingleJobExecutionProgress(groupedMessages.toList(), groupedMessages.getKey().jobExecutionId()))
.flatMapCompletable(progressMaybe -> saveJobExecutionProgress(progressMaybe))
.flatMapCompletable(this::saveJobExecutionProgress)
)
.subscribeOn(scheduler)
.observeOn(scheduler)
Expand Down Expand Up @@ -239,6 +256,7 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
parentExecution.withStatus(JobExecution.Status.COMMITTED)
.withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE)
.withCompletedDate(new Date());
sendDiJobCompletedEvent(parentExecution, params);
return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params);
}
return Future.succeededFuture(parentExecution);
Expand All @@ -255,6 +273,16 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
return Future.succeededFuture(false);
}

private void sendDiJobCompletedEvent(JobExecution jobExecution, OkapiConnectionParams params) {
var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders());
kafkaHeaders.add(new KafkaHeaderImpl(JOB_EXECUTION_ID_HEADER, jobExecution.getId()));
kafkaHeaders.add(new KafkaHeaderImpl(USER_ID_HEADER, jobExecution.getUserId()));
var key = String.valueOf(indexer.incrementAndGet() % MAX_DISTRIBUTION);
sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
.onSuccess(event -> LOGGER.info("sendEventToBulkOps:: DI_JOB_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
.onFailure(event -> LOGGER.warn("sendEventToBulkOps:: Error publishing DI_JOB_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
}

private Future<JobExecution> updateJobStatusToError(String jobExecutionId, OkapiConnectionParams params) {
return jobExecutionService.updateJobExecutionStatus(jobExecutionId, new StatusDto()
.withStatus(StatusDto.Status.ERROR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ di_marc_for_delete_received.partitions = ${DI_MARC_FOR_DELETE_RECEIVED_PARTITION
di_marc_for_order_parsed.partitions = ${DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS:1}
di_marc_bib_record_parsed.partitions = ${DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS:1}
di_edifact_parsed.partitions = ${DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS:1}
di_job_completed.partitions = ${DI_JOB_COMPLETED_PARTITIONS:1}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.folio.MappingProfile;
import org.folio.MatchProfile;
import org.folio.TestUtil;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.rest.RestVerticle;
import org.folio.rest.client.TenantClient;
Expand Down Expand Up @@ -144,10 +145,11 @@ public abstract class AbstractRestTest {
private static final String KAFKA_HOST = "KAFKA_HOST";
private static final String KAFKA_PORT = "KAFKA_PORT";
private static final String KAFKA_ENV = "ENV";
private static final String KAFKA_ENV_VALUE = "test-env";
protected static final String KAFKA_ENV_VALUE = "test-env";
public static final String OKAPI_URL_ENV = "OKAPI_URL";
private static final int PORT = NetworkUtils.nextFreePort();
protected static final String OKAPI_URL = "http://localhost:" + PORT;
protected static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

private final JsonObject userResponse = new JsonObject()
.put("users",
Expand Down Expand Up @@ -342,6 +344,7 @@ public abstract class AbstractRestTest {
);

public static EmbeddedKafkaCluster kafkaCluster;
protected static KafkaConfig kafkaConfig;

@BeforeClass
public static void setUpClass(final TestContext context) throws Exception {
Expand All @@ -356,6 +359,11 @@ public static void setUpClass(final TestContext context) throws Exception {
System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE);
System.setProperty(OKAPI_URL_ENV, OKAPI_URL);
runDatabase();
kafkaConfig = KafkaConfig.builder()
.kafkaHost(hostAndPort[0])
.kafkaPort(hostAndPort[1])
.envId(KAFKA_ENV_VALUE)
.build();
deployVerticle(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void setUp() {
new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", 10),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", 10),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", 10),
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10)
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10),
new SRMKafkaTopic("DI_JOB_COMPLETED", 10)
};

when(srmKafkaTopicService.createTopicObjects()).thenReturn(topicObjects);
Expand Down Expand Up @@ -157,6 +158,7 @@ private Future<Void> createKafkaTopicsAsync(KafkaAdminClient client) {
env + ".Default.foo-tenant.DI_MARC_FOR_DELETE_RECEIVED",
env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED",
env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_RECORD_PARSED",
env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED"
env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED",
env + ".Default.foo-tenant.DI_JOB_COMPLETED"
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void setUp() throws IOException {
MockitoAnnotations.openMocks(this);

registerCodecs(vertx);
vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService));
vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig));

MappingRuleCache mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx);
marcRecordAnalyzer = new MarcRecordAnalyzer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.SendKeyValues;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.impl.AbstractRestTest;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
Expand All @@ -32,20 +38,19 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.services.progress.JobExecutionProgressUtil.getBatchJobProgressProducer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

@RunWith(VertxUnitRunner.class)
public class JobExecutionProgressVerticleTest {
public class JobExecutionProgressVerticleTest extends AbstractRestTest {

private final int AWAIT_TIME = 3;

Expand All @@ -64,13 +69,14 @@ public class JobExecutionProgressVerticleTest {
private String jobExecutionId;
private String tenantId;

@Override
@Before
public void setUp(TestContext context) {
MockitoAnnotations.openMocks(this);
vertx = rule.vertx();
vertx.eventBus().registerCodec(new BatchableJobExecutionProgressCodec());
JobExecutionProgressVerticle jobExecutionProgressVerticle =
new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService);
new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig);
vertx.deployVerticle(jobExecutionProgressVerticle,
context.asyncAssertSuccess());
batchJobProgressProducer = getBatchJobProgressProducer(vertx);
Expand All @@ -87,7 +93,7 @@ private OkapiConnectionParams createOkapiConnectionParams(String tenantId) {
}

@Test
public void testSingleProgressUpdate(TestContext context) {
public void testSingleProgressUpdate(TestContext context) throws InterruptedException {
Async async = context.async();

// Arrange
Expand Down Expand Up @@ -139,6 +145,10 @@ public void testSingleProgressUpdate(TestContext context) {
)
)
);
var topic = formatToKafkaTopicName(DI_JOB_COMPLETED.value());
var request = prepareWithSpecifiedEventPayload(Json.encode(parentJobExecution), topic);

kafkaCluster.send(request);

// Act
batchJobProgressProducer.write(batchableJobExecutionProgress)
Expand All @@ -150,6 +160,9 @@ public void testSingleProgressUpdate(TestContext context) {
.atMost(AWAIT_TIME, TimeUnit.SECONDS)
.untilAsserted(() -> verify(jobExecutionProgressDao)
.updateCompletionCounts(any(), eq(2), eq(1), eq(tenantId)));
kafkaCluster.observeValues(ObserveKeyValues.on(topic, 1)
.observeFor(30, TimeUnit.SECONDS)
.build());
} catch (Exception e) {
context.fail(e);
}
Expand All @@ -160,6 +173,17 @@ public void testSingleProgressUpdate(TestContext context) {
});
}

private SendKeyValues<String, String> prepareWithSpecifiedEventPayload(String eventPayload, String topic) {
Event event = new Event().withId(UUID.randomUUID().toString()).withEventPayload(eventPayload);
KeyValue<String, String> kafkaRecord = new KeyValue<>("key", Json.encode(event));
kafkaRecord.addHeader(OKAPI_TENANT_HEADER, TENANT_ID, UTF_8);
kafkaRecord.addHeader(OKAPI_URL_HEADER, snapshotMockServer.baseUrl(), UTF_8);
kafkaRecord.addHeader(JOB_EXECUTION_ID_HEADER, jobExecutionId, UTF_8);

return SendKeyValues.to(topic, singletonList(kafkaRecord))
.useDefaults();
}

@Test
public void testMultipleProgressUpdateShouldBatch(TestContext context) {
Async async = context.async();
Expand Down
2 changes: 1 addition & 1 deletion ramls/raml-storage
Loading