MIGENG-371 avoid message duplication issues #70

Open
wants to merge 21 commits into base: master
Changes from all commits (21 commits)
cdf6246  MIGENG-365 - End2end integration test (#20) (Jan 9, 2020)
dba6ad4  MIGENG-371 adding idempotentcomponent and removing kafka offset reset (Jan 17, 2020)
e8ac231  MIGENG-371 adding idempotentcomponent and removing kafka offset reset (Jan 17, 2020)
06d2f8b  MIGENG-371 upgraded testcontainers version (Jan 20, 2020)
1f0dfc6  MIGENG-371 upgraded testcontainers version (Jan 20, 2020)
745eca0  MIGENG-371 commented out to test on CI (Jan 21, 2020)
b5b5e18  MIGENG-371 commented out to test on CI (Jan 21, 2020)
0975cb5  MIGENG-371 added tests to check duplication of rows with the Kafka co… (Jan 21, 2020)
f99745b  MIGENG-371 added tests to check duplication of rows with the Kafka co… (Jan 21, 2020)
29c1139  MIGENG-371 removed the comment on the idempotentConsumer, for the mom… (Jan 21, 2020)
30095a5  MIGENG-371 removed the comment on the idempotentConsumer, for the mom… (Jan 21, 2020)
2130709  MIGENG-371 refactoring and giving more time as we have discrepancies … (Jan 22, 2020)
e6a73d6  MIGENG-371 refactoring and giving more time as we have discrepancies … (Jan 22, 2020)
e889ada  MIGENG-371 reintroduced autoOffsetReset=latest (Jan 22, 2020)
98d9870  MIGENG-371 reintroduced autoOffsetReset=latest (Jan 22, 2020)
b556339  MIGENG-371 parametrised sleep between duplication check and start (Jan 22, 2020)
2adbdf5  MIGENG-371 parametrised sleep between duplication check and start (Jan 22, 2020)
01c8eb4  MIGENG-371 fixed typo on value injection (Jan 23, 2020)
fad6692  MIGENG-371 fixed typo on value injection (Jan 23, 2020)
544d971  Merge branch 'master' into MIGENG-371-avoid-messages-duplication-issues (Feb 25, 2020)
04b0c0d  Merging master (Feb 25, 2020)
47 changes: 45 additions & 2 deletions src/test/java/org/jboss/xavier/integrations/EndToEndTest.java
@@ -132,6 +132,7 @@ public class EndToEndTest {
@ClassRule
public static LocalStackContainer localstack = new LocalStackContainer()
.withLogConsumer(new Slf4jLogConsumer(logger).withPrefix("AWS-LOG"))
.withEnv("DEBUG", "1")
.withServices(S3);

private static String ingressCommitHash = "3ea33a8d793c2154f7cfa12057ca005c5f6031fa"; // 2019-11-11
@@ -154,12 +155,19 @@ public class EndToEndTest {
@Value("${test.timeout.performance:60000}") // 1 minute
private int timeoutMilliseconds_PerformaceTest;

@Value("${test.timetout.ics:10000}") // 10 seconds
@Value("${test.timeout.ics:10000}") // 10 seconds
private int timeoutMilliseconds_InitialCostSavingsReport;

@Value("${minio.host}") // Set in the Initializer
private String minio_host;

@Value("${insights.kafka.host}")
private String kafkaHost;

@Value("${test.sleep.startandcheckduplicated:10000}")
private long sleepBetweenStartAndDuplicatedReportCheck_inMillis;


public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

@Override
@@ -333,7 +341,6 @@ public void end2endTest() throws Exception {

// given
camelContext.getGlobalOptions().put(Exchange.LOG_DEBUG_BODY_MAX_CHARS, "5000");
- camelContext.start();

camelContext.getRouteDefinition("store-in-s3").adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
@@ -361,6 +368,7 @@ public void configure() {
.to("http4:oldhost?preserveHostHeader=true");
}
});
+ camelContext.start();

// 1. Check user has firstTime
ResponseEntity<User> userEntity = new RestTemplate().exchange("http://localhost:" + serverPort + "/api/xavier/user", HttpMethod.GET, getRequestEntity(), new ParameterizedTypeReference<User>() {});
@@ -535,7 +543,42 @@ public void configure() {
assertThat(workloadInventoryReport_file_wrong_cpu_cores.getBody().getContent().stream().filter(e -> e.getCpuCores() != null).count()).isEqualTo(5);
assertThat(workloadInventoryReport_file_wrong_cpu_cores.getBody().getContent().size()).isEqualTo(5);

// Test whether report rows get duplicated depending on the Kafka consumer's autoOffsetReset strategy
long duplicatedRows = camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=latest", "latestOffsetConsumer");
assertThat(duplicatedRows).isEqualTo(0);
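// a consumer starting at the latest offset never sees the already-processed uploads, hence zero duplicates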

duplicatedRows = camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=earliest", "earliestOffsetConsumer");
assertThat(duplicatedRows).isGreaterThan(0);
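// earliest makes the reconnected consumer replay the topic from the beginning, re-processing every upload and duplicating report rows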

assertThat(camelStopKafkaReconnectnumberOfDuplicatedRowsInReports("&autoOffsetReset=latest", "secondLatestOffsetConsumer")).isEqualTo(duplicatedRows);
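// a further latest consumer starts at the tail again, so the duplicate count must not grow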

camelContext.stop();
}

private long camelStopKafkaReconnectnumberOfDuplicatedRowsInReports(String autoOffsetReset, String consumer) throws Exception {
camelContext.stop();

// Stub out the aws-s3 component so the restarted context does not hit Localstack again
camelContext.addComponent("aws-s3", camelContext.getComponent("stub"));

// Replace the route's Kafka consumer endpoint so each invocation can use its own autoOffsetReset strategy and clientId
camelContext.getRouteDefinition("kafka-upload-message").adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
replaceFromWith("kafka:" + kafkaHost + "?topic={{insights.kafka.upload.topic}}&brokers=" + kafkaHost + autoOffsetReset + "&autoCommitEnable=true&clientId=" + consumer);
}
});

camelContext.start();
Thread.sleep(sleepBetweenStartAndDuplicatedReportCheck_inMillis);
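// give the restarted routes time to consume and persist any (re)delivered messages before counting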

// count the analysisIds that have more than one Initial Savings Estimation Report, i.e. duplicated reports
return initialSavingsEstimationReportRepository.findAll()
.stream()
.collect(Collectors.groupingBy(e -> e.getAnalysis().getId()))
.values().stream()
.filter(m -> m.size() > 1)
.count();
}

@NotNull
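Side note on the fix the commit list refers to: several MIGENG-371 commits mention adding an idempotent component to the consumer route. The sketch below shows, as a minimal illustration, how Camel's idempotentConsumer EIP can drop replayed Kafka messages. The dedup header, cache size, and target endpoint are assumptions for illustration (only the route id and the two Kafka properties appear in the diff above), and it assumes each upload message carries a unique Kafka key.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;

public class DeduplicatedUploadRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kafka:{{insights.kafka.host}}?topic={{insights.kafka.upload.topic}}&brokers={{insights.kafka.host}}")
            .routeId("kafka-upload-message")
            // Skip any exchange whose Kafka key was already processed, so a
            // replayed offset (e.g. after autoOffsetReset=earliest) cannot
            // produce duplicate report rows.
            .idempotentConsumer(
                    header("kafka.KEY"),
                    MemoryIdempotentRepository.memoryIdempotentRepository(1000))
            .to("direct:process-upload"); // hypothetical next step
    }
}

MemoryIdempotentRepository is per-JVM, so with several application instances consuming the same topic a shared IdempotentRepository implementation (for example the JDBC- or Infinispan-backed ones Camel provides) would be needed for the deduplication to hold across instances.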