diff --git a/src/test/java/org/jboss/xavier/integrations/EndToEndTest.java b/src/test/java/org/jboss/xavier/integrations/EndToEndTest.java index 61e447b1..c05ee713 100644 --- a/src/test/java/org/jboss/xavier/integrations/EndToEndTest.java +++ b/src/test/java/org/jboss/xavier/integrations/EndToEndTest.java @@ -132,6 +132,7 @@ public class EndToEndTest { @ClassRule public static LocalStackContainer localstack = new LocalStackContainer() .withLogConsumer(new Slf4jLogConsumer(logger).withPrefix("AWS-LOG")) + .withEnv("DEBUG", "1") .withServices(S3); private static String ingressCommitHash = "3ea33a8d793c2154f7cfa12057ca005c5f6031fa"; // 2019-11-11 @@ -154,12 +155,19 @@ public class EndToEndTest { @Value("${test.timeout.performance:60000}") // 1 minute private int timeoutMilliseconds_PerformaceTest; - @Value("${test.timetout.ics:10000}") // 10 seconds + @Value("${test.timeout.ics:10000}") // 10 seconds private int timeoutMilliseconds_InitialCostSavingsReport; @Value("${minio.host}") // Set in the Initializer private String minio_host; + @Value("${insights.kafka.host}") + private String kafkaHost; + + @Value("${test.sleep.startandcheckduplicated:10000}") + private long sleepBetweenStartAndDuplicatedReportCheck_inMillis; + + public static class Initializer implements ApplicationContextInitializer { @Override @@ -333,7 +341,6 @@ public void end2endTest() throws Exception { // given camelContext.getGlobalOptions().put(Exchange.LOG_DEBUG_BODY_MAX_CHARS, "5000"); - camelContext.start(); camelContext.getRouteDefinition("store-in-s3").adviceWith(camelContext, new AdviceWithRouteBuilder() { @Override @@ -361,6 +368,7 @@ public void configure() { .to("http4:oldhost?preserveHostHeader=true"); } }); + camelContext.start(); // 1. Check user has firstTime ResponseEntity userEntity = new RestTemplate().exchange("http://localhost:" + serverPort + "/api/xavier/user", HttpMethod.GET, getRequestEntity(), new ParameterizedTypeReference() {}); @@ -535,7 +543,42 @@ public void configure() { assertThat(workloadInventoryReport_file_wrong_cpu_cores.getBody().getContent().stream().filter(e -> e.getCpuCores() != null).count()).isEqualTo(5); assertThat(workloadInventoryReport_file_wrong_cpu_cores.getBody().getContent().size()).isEqualTo(5); + // Testing the duplication of Rows depending on the type of the Kafka consumer + long duplicatedRows = camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=latest", "latestOffsetConsumer"); + assertThat(duplicatedRows).isEqualTo(0); + + duplicatedRows = camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=earliest", "earliestOffsetConsumer"); + assertThat(duplicatedRows).isGreaterThan(0); + + assertThat(camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=latest", "secondLatestOffsetConsumer")).isEqualTo(duplicatedRows); + + camelContext.stop(); + } + + private long camelStopKafkaReconnectnumberOfDuplicatedRowsInReports(String autoOffsetReset, String consumer) throws Exception { camelContext.stop(); + + // To avoid issues with Localstack + camelContext.addComponent("aws-s3", camelContext.getComponent("stub")); + + // Test to check we will have duplicates if we set autoOffsetReset=earliest + camelContext.getRouteDefinition("kafka-upload-message").adviceWith(camelContext, new AdviceWithRouteBuilder() { + @Override + public void configure() throws Exception { + replaceFromWith("kafka:" + kafkaHost + "?topic={{insights.kafka.upload.topic}}&brokers=" + kafkaHost + autoOffsetReset + "&autoCommitEnable=true&clientId=" + consumer); + } + }); + + camelContext.start(); + Thread.sleep(sleepBetweenStartAndDuplicatedReportCheck_inMillis); + + // checking some Initial Savings Report is duplicated regarding the analysisId + return initialSavingsEstimationReportRepository.findAll() + .stream() + .collect(Collectors.groupingBy(e -> e.getAnalysis().getId())) + .values().stream() + .filter(m -> m.size() > 1) + .count(); } @NotNull