Commit

add test case
JavokhirAbdullayev committed Nov 12, 2024
1 parent 4b8f34c commit 072df21
Showing 5 changed files with 59 additions and 33 deletions.
File 1 of 5: org/folio/verticle/JobExecutionProgressVerticle.java
@@ -1,10 +1,6 @@
 package org.folio.verticle;

-import io.reactivex.rxjava3.core.Completable;
-import io.reactivex.rxjava3.core.Flowable;
-import io.reactivex.rxjava3.core.Maybe;
-import io.reactivex.rxjava3.core.Scheduler;
-import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.core.*;
 import io.reactivex.rxjava3.flowables.GroupedFlowable;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
@@ -22,7 +18,12 @@
 import org.folio.dataimport.util.OkapiConnectionParams;
 import org.folio.kafka.KafkaConfig;
 import org.folio.kafka.KafkaHeaderUtils;
-import org.folio.rest.jaxrs.model.*;
+import org.folio.rest.jaxrs.model.JobExecution;
+import org.folio.rest.jaxrs.model.JobExecutionDto;
+import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
+import org.folio.rest.jaxrs.model.JobExecutionProgress;
+import org.folio.rest.jaxrs.model.Progress;
+import org.folio.rest.jaxrs.model.StatusDto;
 import org.folio.services.JobExecutionService;
 import org.folio.services.Status;
 import org.folio.services.progress.BatchableJobExecutionProgress;
@@ -40,7 +41,7 @@
 import java.util.function.Function;

 import static java.lang.String.format;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_EXECUTION_COMPLETED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
 import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
 import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;
 import static org.folio.services.progress.JobExecutionProgressUtil.BATCH_JOB_PROGRESS_ADDRESS;
@@ -67,12 +68,14 @@ public class JobExecutionProgressVerticle extends AbstractVerticle {

   @Autowired
   @Qualifier("newKafkaConfig")
-  private KafkaConfig kafkaConfig;
+  private final KafkaConfig kafkaConfig;

   public JobExecutionProgressVerticle(@Autowired JobExecutionProgressDao jobExecutionProgressDao,
-                                      @Autowired JobExecutionService jobExecutionService) {
+                                      @Autowired JobExecutionService jobExecutionService,
+                                      KafkaConfig kafkaConfig) {
     this.jobExecutionProgressDao = jobExecutionProgressDao;
     this.jobExecutionService = jobExecutionService;
+    this.kafkaConfig = kafkaConfig;
   }


@@ -105,7 +108,7 @@ private void consumeJobExecutionProgress(MessageConsumer<BatchableJobExecutionProgress>
       .flatMapCompletable(flowable ->
         groupByTenantIdAndJobExecutionId(flowable)
           .map(groupedMessages -> reduceManyJobExecutionProgressObjectsIntoSingleJobExecutionProgress(groupedMessages.toList(), groupedMessages.getKey().jobExecutionId()))
-          .flatMapCompletable(progressMaybe -> saveJobExecutionProgress(progressMaybe))
+          .flatMapCompletable(this::saveJobExecutionProgress)
       )
       .subscribeOn(scheduler)
       .observeOn(scheduler)
@@ -272,9 +275,9 @@ private void sendEventToBulkOps(JobExecution jobExecution, OkapiConnectionParams
     kafkaHeaders.add(new KafkaHeaderImpl(JOB_EXECUTION_ID_HEADER, jobExecution.getId()));
     kafkaHeaders.add(new KafkaHeaderImpl(USER_ID_HEADER, jobExecution.getUserId()));
     var key = String.valueOf(indexer.incrementAndGet() % MAX_DISTRIBUTION);
-    sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_EXECUTION_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
-      .onSuccess(event -> LOGGER.info("sendEventToBulkOps:: DI_JOB_EXECUTION_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
-      .onFailure(event -> LOGGER.warn("sendEventToBulkOps:: Error publishing DI_JOB_EXECUTION_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
+    sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
+      .onSuccess(event -> LOGGER.info("sendEventToBulkOps:: DI_JOB_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
+      .onFailure(event -> LOGGER.warn("sendEventToBulkOps:: Error publishing DI_JOB_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
   }

   private Future<JobExecution> updateJobStatusToError(String jobExecutionId, OkapiConnectionParams params) {
File 2 of 5: org/folio/rest/impl/AbstractRestTest.java
@@ -57,6 +57,7 @@
 import org.folio.MappingProfile;
 import org.folio.MatchProfile;
 import org.folio.TestUtil;
+import org.folio.kafka.KafkaConfig;
 import org.folio.kafka.KafkaTopicNameHelper;
 import org.folio.rest.RestVerticle;
 import org.folio.rest.client.TenantClient;
@@ -144,10 +145,11 @@ public abstract class AbstractRestTest {
   private static final String KAFKA_HOST = "KAFKA_HOST";
   private static final String KAFKA_PORT = "KAFKA_PORT";
   private static final String KAFKA_ENV = "ENV";
-  private static final String KAFKA_ENV_VALUE = "test-env";
+  protected static final String KAFKA_ENV_VALUE = "test-env";
   public static final String OKAPI_URL_ENV = "OKAPI_URL";
   private static final int PORT = NetworkUtils.nextFreePort();
   protected static final String OKAPI_URL = "http://localhost:" + PORT;
+  protected static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

   private final JsonObject userResponse = new JsonObject()
     .put("users",
@@ -342,6 +344,7 @@ public abstract class AbstractRestTest {
   );

   public static EmbeddedKafkaCluster kafkaCluster;
+  protected static KafkaConfig kafkaConfig;

   @BeforeClass
   public static void setUpClass(final TestContext context) throws Exception {
@@ -356,6 +359,11 @@ public static void setUpClass(final TestContext context) throws Exception {
     System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE);
     System.setProperty(OKAPI_URL_ENV, OKAPI_URL);
     runDatabase();
+    kafkaConfig = KafkaConfig.builder()
+      .kafkaHost(hostAndPort[0])
+      .kafkaPort(hostAndPort[1])
+      .envId(KAFKA_ENV_VALUE)
+      .build();
     deployVerticle(context);
   }

File 3 of 5
@@ -189,7 +189,7 @@ public void setUp() throws IOException {
     MockitoAnnotations.openMocks(this);

     registerCodecs(vertx);
-    vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService));
+    vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig));

     MappingRuleCache mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx);
     marcRecordAnalyzer = new MarcRecordAnalyzer();
File 4 of 5: JobExecutionProgressVerticleTest.java
@@ -3,19 +3,18 @@
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.MessageProducer;
+import io.vertx.core.json.Json;
 import io.vertx.ext.unit.Async;
 import io.vertx.ext.unit.TestContext;
 import io.vertx.ext.unit.junit.RunTestOnContext;
 import io.vertx.ext.unit.junit.VertxUnitRunner;
+import net.mguenther.kafka.junit.KeyValue;
+import net.mguenther.kafka.junit.ObserveKeyValues;
+import net.mguenther.kafka.junit.SendKeyValues;
 import org.folio.dao.JobExecutionProgressDao;
 import org.folio.dataimport.util.OkapiConnectionParams;
-import org.folio.kafka.KafkaConfig;
-import org.folio.rest.jaxrs.model.JobExecution;
-import org.folio.rest.jaxrs.model.JobExecutionDto;
-import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
-import org.folio.rest.jaxrs.model.JobExecutionProgress;
-import org.folio.rest.jaxrs.model.JobProfileInfo;
-import org.folio.rest.jaxrs.model.StatusDto;
+import org.folio.rest.impl.AbstractRestTest;
+import org.folio.rest.jaxrs.model.*;
 import org.folio.services.JobExecutionService;
 import org.folio.services.progress.BatchableJobExecutionProgress;
 import org.folio.services.progress.BatchableJobExecutionProgressCodec;
@@ -33,20 +32,19 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;

+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
 import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
 import static org.folio.services.progress.JobExecutionProgressUtil.getBatchJobProgressProducer;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

 @RunWith(VertxUnitRunner.class)
-public class JobExecutionProgressVerticleTest {
+public class JobExecutionProgressVerticleTest extends AbstractRestTest {

   private final int AWAIT_TIME = 3;

@@ -64,15 +62,14 @@ public class JobExecutionProgressVerticleTest {
   private MessageProducer<BatchableJobExecutionProgress> batchJobProgressProducer;
   private String jobExecutionId;
   private String tenantId;
-  private KafkaConfig kafkaConfig;

   @Before
   public void setUp(TestContext context) {
     MockitoAnnotations.openMocks(this);
     vertx = rule.vertx();
     vertx.eventBus().registerCodec(new BatchableJobExecutionProgressCodec());
     JobExecutionProgressVerticle jobExecutionProgressVerticle =
-      new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService);
+      new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig);
     vertx.deployVerticle(jobExecutionProgressVerticle,
       context.asyncAssertSuccess());
     batchJobProgressProducer = getBatchJobProgressProducer(vertx);
@@ -89,7 +86,7 @@ private OkapiConnectionParams createOkapiConnectionParams(String tenantId) {
   }

   @Test
-  public void testSingleProgressUpdate(TestContext context) {
+  public void testSingleProgressUpdate(TestContext context) throws InterruptedException {
     Async async = context.async();

     // Arrange
@@ -141,6 +138,10 @@ public void testSingleProgressUpdate(TestContext context) {
         )
       )
     );
+    var topic = formatToKafkaTopicName(DI_JOB_COMPLETED.value());
+    var request = prepareWithSpecifiedEventPayload(Json.encode(parentJobExecution), topic);
+
+    kafkaCluster.send(request);

     // Act
     batchJobProgressProducer.write(batchableJobExecutionProgress)
@@ -152,6 +153,9 @@ public void testSingleProgressUpdate(TestContext context) {
         .atMost(AWAIT_TIME, TimeUnit.SECONDS)
         .untilAsserted(() -> verify(jobExecutionProgressDao)
           .updateCompletionCounts(any(), eq(2), eq(1), eq(tenantId)));
+      kafkaCluster.observeValues(ObserveKeyValues.on(topic, 1)
+        .observeFor(30, TimeUnit.SECONDS)
+        .build());
     } catch (Exception e) {
       context.fail(e);
     }
@@ -162,6 +166,17 @@
     });
   }

+  private SendKeyValues<String, String> prepareWithSpecifiedEventPayload(String eventPayload, String topic) {
+    Event event = new Event().withId(UUID.randomUUID().toString()).withEventPayload(eventPayload);
+    KeyValue<String, String> kafkaRecord = new KeyValue<>("key", Json.encode(event));
+    kafkaRecord.addHeader(OKAPI_TENANT_HEADER, TENANT_ID, UTF_8);
+    kafkaRecord.addHeader(OKAPI_URL_HEADER, snapshotMockServer.baseUrl(), UTF_8);
+    kafkaRecord.addHeader(JOB_EXECUTION_ID_HEADER, jobExecutionId, UTF_8);
+
+    return SendKeyValues.to(topic, singletonList(kafkaRecord))
+      .useDefaults();
+  }
+
   @Test
   public void testMultipleProgressUpdateShouldBatch(TestContext context) {
     Async async = context.async();
File 5 of 5: ramls/raml-storage (2 changes: 1 addition & 1 deletion)
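
For readers unfamiliar with kafka-junit (the net.mguenther.kafka.junit classes the new test imports), the sketch below illustrates the send/observe round trip that testSingleProgressUpdate performs against the shared EmbeddedKafkaCluster. It is a minimal standalone sketch, not code from this repository: it assumes kafka-junit 3.x, where a cluster is provisioned with defaultClusterConfig(), and the topic name and payload are made-up values; only the send/observeValues pattern is taken from the diff above.

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;

import java.util.List;
import java.util.concurrent.TimeUnit;

import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.SendKeyValues;

public class KafkaRoundTripSketch {

  public static void main(String[] args) throws InterruptedException {
    // Provision and start an in-process Kafka cluster (illustrative setup;
    // the test instead reuses the cluster created in AbstractRestTest).
    EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
    kafkaCluster.start();
    try {
      // Send one record, as prepareWithSpecifiedEventPayload(...) does in the test.
      KeyValue<String, String> record = new KeyValue<>("key", "{\"id\":\"42\"}");
      kafkaCluster.send(SendKeyValues.to("sketch-topic", List.of(record)).useDefaults());

      // Block until one value arrives on the topic or the timeout expires,
      // the same assertion style the test applies to the DI_JOB_COMPLETED topic.
      List<String> values = kafkaCluster.observeValues(
        ObserveKeyValues.on("sketch-topic", 1).observeFor(30, TimeUnit.SECONDS).build());
      System.out.println("observed " + values.size() + " record(s)");
    } finally {
      kafkaCluster.stop();
    }
  }
}

The sketch provisions and stops its own cluster only to stay self-contained; the test keeps a single cluster in AbstractRestTest and shares it across test classes, which is why KAFKA_ENV_VALUE and kafkaConfig were widened to protected in this commit.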

0 comments on commit 072df21
