From 19b35063ad7c9737551b0c96bf5cf9cb767191f2 Mon Sep 17 00:00:00 2001
From: Sushant Mane
Date: Wed, 11 Dec 2024 09:39:06 -0800
Subject: [PATCH] [test] Fix TestChangelogConsumer and address related issues
 (#1384)

- Fixed a regression in `TestChangelogConsumer`, introduced by PR #1328, which
  caused the test to attempt to register incompatible schemas.
- Removed the pub-sub broker created inside the test; it incorrectly relied on
  the ZK state of the child controllers' Kafka broker.
- Miscellaneous: ensured proper cleanup by closing the cluster setup during
  teardown in `TestDumpIngestionContext`.
---
 .../consumer/TestChangelogConsumer.java       | 1085 ++++++++---------
 .../endToEnd/TestDumpIngestionContext.java    |    6 +
 .../utils/VeniceMultiClusterWrapper.java      |    8 +
 3 files changed, 545 insertions(+), 554 deletions(-)

diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java
index 2d68dc0940..d12b085e99 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java
@@ -19,7 +19,6 @@
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V1_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V2_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
-import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA;
 import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V7_SCHEMA;
@@ -49,7 +48,6 @@
 import com.linkedin.venice.controllerapi.MultiStoreTopicsResponse;
 import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
 import com.linkedin.venice.endToEnd.TestChangelogValue;
-import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
 import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
@@ -66,7 +64,6 @@
 import com.linkedin.venice.samza.VeniceSystemFactory;
 import com.linkedin.venice.samza.VeniceSystemProducer;
 import com.linkedin.venice.utils.MockCircularTime;
-import com.linkedin.venice.utils.TestMockTime;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.TestWriteUtils;
 import com.linkedin.venice.utils.Time;
@@ -78,6 +75,7 @@
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,21 +113,17 @@ public class TestChangelogConsumer {
   private VeniceClusterWrapper clusterWrapper;
   private ControllerClient parentControllerClient;
-  List<Schema> SCHEMA_HISTORY = new ArrayList<Schema>() {
-    {
-      add(NAME_RECORD_V1_SCHEMA);
-      add(NAME_RECORD_V2_SCHEMA);
-      add(NAME_RECORD_V3_SCHEMA);
-      add(NAME_RECORD_V4_SCHEMA);
-      add(NAME_RECORD_V5_SCHEMA);
-      add(NAME_RECORD_V6_SCHEMA);
-      add(NAME_RECORD_V7_SCHEMA);
-      add(NAME_RECORD_V8_SCHEMA);
-      add(NAME_RECORD_V9_SCHEMA);
-      add(NAME_RECORD_V10_SCHEMA);
-      add(NAME_RECORD_V11_SCHEMA);
-    }
-  };
+  private static final List<Schema> SCHEMA_HISTORY = Arrays.asList(
+      NAME_RECORD_V1_SCHEMA,
+      NAME_RECORD_V2_SCHEMA,
+      NAME_RECORD_V3_SCHEMA,
+      NAME_RECORD_V5_SCHEMA,
+      NAME_RECORD_V6_SCHEMA,
+      NAME_RECORD_V7_SCHEMA,
+      NAME_RECORD_V8_SCHEMA,
+      NAME_RECORD_V9_SCHEMA,
+      NAME_RECORD_V10_SCHEMA,
+      NAME_RECORD_V11_SCHEMA);
 
   protected boolean isAAWCParallelProcessingEnabled() {
     return false;
@@ -251,411 +245,404 @@ public void testAAIngestionWithStoreView() throws Exception {
     // Use a unique key for DELETE with RMD validation
     int deleteWithRmdKeyIndex = 1000;
 
-    TestMockTime testMockTime = new TestMockTime();
     ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
-    try (PubSubBrokerWrapper localKafka = ServiceFactory.getPubSubBroker(
-        new PubSubBrokerConfigs.Builder().setZkWrapper(localZkServer)
-            .setMockTime(testMockTime)
-            .setRegionName("local-pubsub")
-            .build())) {
-      Properties consumerProperties = new Properties();
-      String localKafkaUrl = localKafka.getAddress();
-      consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl);
-      ChangelogClientConfig globalChangelogClientConfig = new ChangelogClientConfig().setViewName("changeCaptureView")
-          .setConsumerProperties(consumerProperties)
-          .setControllerD2ServiceName(D2_SERVICE_NAME)
-          .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
-          .setLocalD2ZkHosts(localZkServer.getAddress())
-          .setControllerRequestRetryCount(3);
-      VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
-          new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
-
-      ChangelogClientConfig globalAfterImageClientConfig =
-          ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setViewName("");
-      VeniceChangelogConsumerClientFactory veniceAfterImageConsumerClientFactory =
-          new VeniceChangelogConsumerClientFactory(globalAfterImageClientConfig, metricsRepository);
-
-      VeniceChangelogConsumer<Utf8, Utf8> versionTopicConsumer =
-          veniceAfterImageConsumerClientFactory.getChangelogConsumer(storeName);
-      Assert.assertTrue(versionTopicConsumer instanceof VeniceAfterImageConsumerImpl);
-      versionTopicConsumer.subscribeAll().get();
-
-      // Let's consume those 100 records off of version 1
-      Map<String, Utf8> versionTopicEvents = new HashMap<>();
-      pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer);
-      Assert.assertEquals(versionTopicEvents.size(), 100);
-
-      VeniceChangelogConsumer<Utf8, Utf8> veniceChangelogConsumer =
-          veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName);
-      veniceChangelogConsumer.subscribeAll().get();
-      try (VeniceSystemProducer veniceProducer =
-          factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) {
-        veniceProducer.start();
-        // Run Samza job to send PUT and DELETE requests.
- runSamzaStreamJob(veniceProducer, storeName, null, 10, 10, 100); - // Produce a DELETE record with large timestamp - sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex, 1000, true); - } - - try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(clusterWrapper.getRandomRouterURL()) - .setMetricsRepository(metricsRepository))) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); - }); - } + PubSubBrokerWrapper localKafka = multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper(); + Properties consumerProperties = new Properties(); + String localKafkaUrl = localKafka.getAddress(); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl); + ChangelogClientConfig globalChangelogClientConfig = new ChangelogClientConfig().setViewName("changeCaptureView") + .setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localZkServer.getAddress()) + .setControllerRequestRetryCount(3); + VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); + + ChangelogClientConfig globalAfterImageClientConfig = + ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setViewName(""); + VeniceChangelogConsumerClientFactory veniceAfterImageConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(globalAfterImageClientConfig, metricsRepository); + + VeniceChangelogConsumer versionTopicConsumer = + veniceAfterImageConsumerClientFactory.getChangelogConsumer(storeName); + Assert.assertTrue(versionTopicConsumer instanceof VeniceAfterImageConsumerImpl); + versionTopicConsumer.subscribeAll().get(); + + // Let's consume those 100 records off of version 1 + Map versionTopicEvents = new HashMap<>(); + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 100); + + VeniceChangelogConsumer veniceChangelogConsumer = + veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName); + veniceChangelogConsumer.subscribeAll().get(); + try ( + VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { + veniceProducer.start(); + // Run Samza job to send PUT and DELETE requests. + runSamzaStreamJob(veniceProducer, storeName, null, 10, 10, 100); + // Produce a DELETE record with large timestamp + sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex, 1000, true); + } - // Validate change events for version 1. 100 records exist in version 1. 
- Map, VeniceChangeCoordinate>> polledChangeEvents = new HashMap<>(); - Map, VeniceChangeCoordinate>> allChangeEvents = new HashMap<>(); + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(clusterWrapper.getRandomRouterURL()) + .setMetricsRepository(metricsRepository))) { TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 100); + Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); }); + } - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); + // Validate change events for version 1. 100 records exist in version 1. + Map, VeniceChangeCoordinate>> polledChangeEvents = new HashMap<>(); + Map, VeniceChangeCoordinate>> allChangeEvents = new HashMap<>(); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 100); + }); + + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + + // 21 changes in nearline. 10 puts, 10 deletes, and 1 record with a producer timestamp + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + // 21 events for nearline events + Assert.assertEquals(polledChangeEvents.size(), 21); + for (int i = 100; i < 110; i++) { + String key = Integer.toString(i); + ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); + Assert.assertNotNull(changeEvent); + if (i != 100) { + Assert.assertNull(changeEvent.getPreviousValue()); + } else { + Assert.assertTrue(changeEvent.getPreviousValue().toString().contains(key)); + } + Assert.assertEquals(changeEvent.getCurrentValue().toString(), "stream_" + i); + } + for (int i = 110; i < 120; i++) { + String key = Integer.toString(i); + ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); + Assert.assertNotNull(changeEvent); + Assert.assertNull(changeEvent.getPreviousValue()); // schema id is negative, so we did not parse. + Assert.assertNull(changeEvent.getCurrentValue()); + } + }); - // 21 changes in nearline. 10 puts, 10 deletes, and 1 record with a producer timestamp + versionTopicEvents.clear(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 21); + }); + + /** + * Now we have store version v2. + */ + + // run repush. 
Repush will reapply all existing events to the new store and trim all events from the RT + props.setProperty(SOURCE_KAFKA, "true"); + props.setProperty(KAFKA_INPUT_BROKER_URL, clusterWrapper.getPubSubBrokerWrapper().getAddress()); + props.setProperty(KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, "5"); + // intentionally stop re-consuming from RT so stale records don't affect the testing results + props.put(REWIND_TIME_IN_SECONDS_OVERRIDE, 0); + TestWriteUtils.runPushJob("Run repush job", props); + ControllerClient controllerClient = + new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString()); + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2)); + clusterWrapper.refreshAllRouterMetaData(); + // Validate repush from version 2 + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(clusterWrapper.getRandomRouterURL()) + .setMetricsRepository(metricsRepository))) { TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - // 21 events for nearline events - Assert.assertEquals(polledChangeEvents.size(), 21); + // test single get for (int i = 100; i < 110; i++) { String key = Integer.toString(i); - ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); - Assert.assertNotNull(changeEvent); - if (i != 100) { - Assert.assertNull(changeEvent.getPreviousValue()); - } else { - Assert.assertTrue(changeEvent.getPreviousValue().toString().contains(key)); - } - Assert.assertEquals(changeEvent.getCurrentValue().toString(), "stream_" + i); + Utf8 value = client.get(key).get(); + Assert.assertNotNull(value); + Assert.assertEquals(value.toString(), "stream_" + i); } + // test deletes for (int i = 110; i < 120; i++) { String key = Integer.toString(i); - ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); - Assert.assertNotNull(changeEvent); - Assert.assertNull(changeEvent.getPreviousValue()); // schema id is negative, so we did not parse. - Assert.assertNull(changeEvent.getCurrentValue()); + Utf8 value = client.get(key).get(); + Assert.assertNull(value); + } + // test old data + for (int i = 20; i < 100; i++) { + String key = Integer.toString(i); + Utf8 value = client.get(key).get(); + Assert.assertNotNull(value); + Assert.assertTrue(value.toString().contains(String.valueOf(i).substring(0, 0))); } }); - - versionTopicEvents.clear(); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); - Assert.assertEquals(versionTopicEvents.size(), 21); + } + try ( + VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { + veniceProducer.start(); + // Produce a new PUT with smaller logical timestamp, it is expected to be ignored as there was a DELETE with + // larger timestamp + sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex, 2, false); + // Produce another record to the same partition to make sure the above PUT is processed during validation stage. 
+ sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex + 1, 1, false); + } + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(clusterWrapper.getRandomRouterURL()) + .setMetricsRepository(metricsRepository))) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertNotNull(client.get(Integer.toString(deleteWithRmdKeyIndex + 1)).get()); }); - - /** - * Now we have store version v2. - */ - - // run repush. Repush will reapply all existing events to the new store and trim all events from the RT - props.setProperty(SOURCE_KAFKA, "true"); - props.setProperty(KAFKA_INPUT_BROKER_URL, clusterWrapper.getPubSubBrokerWrapper().getAddress()); - props.setProperty(KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, "5"); - // intentionally stop re-consuming from RT so stale records don't affect the testing results - props.put(REWIND_TIME_IN_SECONDS_OVERRIDE, 0); - TestWriteUtils.runPushJob("Run repush job", props); - ControllerClient controllerClient = - new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString()); - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2)); - clusterWrapper.refreshAllRouterMetaData(); - // Validate repush from version 2 - try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(clusterWrapper.getRandomRouterURL()) - .setMetricsRepository(metricsRepository))) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - // test single get - for (int i = 100; i < 110; i++) { - String key = Integer.toString(i); - Utf8 value = client.get(key).get(); - Assert.assertNotNull(value); - Assert.assertEquals(value.toString(), "stream_" + i); - } - // test deletes - for (int i = 110; i < 120; i++) { - String key = Integer.toString(i); - Utf8 value = client.get(key).get(); - Assert.assertNull(value); - } - // test old data - for (int i = 20; i < 100; i++) { - String key = Integer.toString(i); - Utf8 value = client.get(key).get(); - Assert.assertNotNull(value); - Assert.assertTrue(value.toString().contains(String.valueOf(i).substring(0, 0))); - } - }); - } - try (VeniceSystemProducer veniceProducer = - factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { - veniceProducer.start(); - // Produce a new PUT with smaller logical timestamp, it is expected to be ignored as there was a DELETE with - // larger timestamp - sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex, 2, false); - // Produce another record to the same partition to make sure the above PUT is processed during validation stage. 
- sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex + 1, 1, false); - } - try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(clusterWrapper.getRandomRouterURL()) - .setMetricsRepository(metricsRepository))) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - Assert.assertNotNull(client.get(Integer.toString(deleteWithRmdKeyIndex + 1)).get()); - }); - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); - }); - } - - // Validate changed events for version 2. - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - // As records keys from VPJ start from 1, real-time produced records' key starts from 0, the message with key as 0 - // is new message. - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - // poll enough to get through the empty push and the topic jump to RT. - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - String deleteWithRmdKey = Integer.toString(deleteWithRmdKeyIndex); - String persistWithRmdKey = Integer.toString(deleteWithRmdKeyIndex + 1); - Assert.assertNull(polledChangeEvents.get(deleteWithRmdKey)); - Assert.assertNotNull(polledChangeEvents.get(persistWithRmdKey)); - Assert.assertEquals( - polledChangeEvents.get(persistWithRmdKey).getValue().getCurrentValue().toString(), - "stream_" + persistWithRmdKey); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); }); + } - /** - * Now we have store version v3. - */ - - // run empty push to clean up batch data - parentControllerClient.sendEmptyPushAndWait(storeName, "Run empty push job", 1000, 30 * Time.MS_PER_SECOND); - // set up mocked time for Samza records so some records can be stale intentionally. - List mockTimestampInMs = new LinkedList<>(); - Instant now = Instant.now(); - // always-valid record - mockTimestampInMs.add(now.toEpochMilli()); - // always-stale records since ttl time is 360 sec - Instant past = now.minus(1, ChronoUnit.HOURS); - mockTimestampInMs.add(past.toEpochMilli()); - Time mockTime = new MockCircularTime(mockTimestampInMs); - try (VeniceSystemProducer veniceProducer = - factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { - veniceProducer.start(); - // run samza to stream put and delete - runSamzaStreamJob(veniceProducer, storeName, mockTime, 10, 10, 20); - } - // Validate changed events for version 3. + // Validate changed events for version 2. + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + // As records keys from VPJ start from 1, real-time produced records' key starts from 0, the message with key as 0 + // is new message. + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + // poll enough to get through the empty push and the topic jump to RT. 
+ pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + String deleteWithRmdKey = Integer.toString(deleteWithRmdKeyIndex); + String persistWithRmdKey = Integer.toString(deleteWithRmdKeyIndex + 1); + Assert.assertNull(polledChangeEvents.get(deleteWithRmdKey)); + Assert.assertNotNull(polledChangeEvents.get(persistWithRmdKey)); + Assert.assertEquals( + polledChangeEvents.get(persistWithRmdKey).getValue().getCurrentValue().toString(), + "stream_" + persistWithRmdKey); + }); - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - // Filter previous 21 messages. - Assert.assertEquals(polledChangeEvents.size(), 1); - }); + /** + * Now we have store version v3. + */ + + // run empty push to clean up batch data + parentControllerClient.sendEmptyPushAndWait(storeName, "Run empty push job", 1000, 30 * Time.MS_PER_SECOND); + // set up mocked time for Samza records so some records can be stale intentionally. + List mockTimestampInMs = new LinkedList<>(); + Instant now = Instant.now(); + // always-valid record + mockTimestampInMs.add(now.toEpochMilli()); + // always-stale records since ttl time is 360 sec + Instant past = now.minus(1, ChronoUnit.HOURS); + mockTimestampInMs.add(past.toEpochMilli()); + Time mockTime = new MockCircularTime(mockTimestampInMs); + try ( + VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { + veniceProducer.start(); + // run samza to stream put and delete + runSamzaStreamJob(veniceProducer, storeName, mockTime, 10, 10, 20); + } + // Validate changed events for version 3. - // Drain the remaining events on version 3 and verify that we got everything. We don't verify the count - // because at this stage, the total events which will get polled will be determined by how far back the rewind - // managed to get (and test run duration might be variable) - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - for (int i = 20; i < 40; i++) { - String key = Integer.toString(i); - ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); - Assert.assertNotNull(changeEvent); - Assert.assertNull(changeEvent.getPreviousValue()); - if (i < 30) { - Assert.assertEquals(changeEvent.getCurrentValue().toString(), "stream_" + i); - } else { - Assert.assertNull(changeEvent.getCurrentValue()); - } + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + // Filter previous 21 messages. + Assert.assertEquals(polledChangeEvents.size(), 1); + }); + + // Drain the remaining events on version 3 and verify that we got everything. 
We don't verify the count + // because at this stage, the total events which will get polled will be determined by how far back the rewind + // managed to get (and test run duration might be variable) + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + for (int i = 20; i < 40; i++) { + String key = Integer.toString(i); + ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); + Assert.assertNotNull(changeEvent); + Assert.assertNull(changeEvent.getPreviousValue()); + if (i < 30) { + Assert.assertEquals(changeEvent.getCurrentValue().toString(), "stream_" + i); + } else { + Assert.assertNull(changeEvent.getCurrentValue()); } - }); + } + }); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); - // This should get everything submitted to the CC topic on this version since the timestamp is before anything got - // transmitted - veniceChangelogConsumer.seekToTimestamp(timestamp); + // This should get everything submitted to the CC topic on this version since the timestamp is before anything got + // transmitted + veniceChangelogConsumer.seekToTimestamp(timestamp); - // test pause and resume - veniceChangelogConsumer.pause(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); - veniceChangelogConsumer.resume(); + // test pause and resume + veniceChangelogConsumer.pause(); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); + veniceChangelogConsumer.resume(); - // This should get everything submitted to the CC topic on this version since the timestamp is before anything got - // transmitted - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 42); - }); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - - /** - * Now we have store version v4. 
- */ - // enable repush ttl - props.setProperty(REPUSH_TTL_ENABLE, "true"); - TestWriteUtils.runPushJob("Run repush job with TTL", props); - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 4)); - // Validate repush from version 4 - clusterWrapper.refreshAllRouterMetaData(); - try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(clusterWrapper.getRandomRouterURL()) - .setMetricsRepository(metricsRepository))) { - // test single get - int validGet = 0, filteredGet = 0; - for (int i = 20; i < 30; i++) { - Object result = client.get(Integer.toString(i)).get(); - if (result == null) { - filteredGet++; - } else { - validGet++; - } - } - // Half records are valid, another half is not - Assert.assertEquals(validGet, 5); - Assert.assertEquals(filteredGet, 5); - // test deletes - for (int i = 30; i < 40; i++) { - // not matter the DELETE is TTLed or not, the value should always be null - Assert.assertNull(client.get(Integer.toString(i)).get()); - } - // test old data - should be empty due to empty push - for (int i = 40; i < 100; i++) { - Assert.assertNull(client.get(Integer.toString(i)).get()); + // This should get everything submitted to the CC topic on this version since the timestamp is before anything got + // transmitted + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 42); + }); + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + + /** + * Now we have store version v4. + */ + // enable repush ttl + props.setProperty(REPUSH_TTL_ENABLE, "true"); + TestWriteUtils.runPushJob("Run repush job with TTL", props); + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 4)); + // Validate repush from version 4 + clusterWrapper.refreshAllRouterMetaData(); + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(clusterWrapper.getRandomRouterURL()) + .setMetricsRepository(metricsRepository))) { + // test single get + int validGet = 0, filteredGet = 0; + for (int i = 20; i < 30; i++) { + Object result = client.get(Integer.toString(i)).get(); + if (result == null) { + filteredGet++; + } else { + validGet++; } } + // Half records are valid, another half is not + Assert.assertEquals(validGet, 5); + Assert.assertEquals(filteredGet, 5); + // test deletes + for (int i = 30; i < 40; i++) { + // not matter the DELETE is TTLed or not, the value should always be null + Assert.assertNull(client.get(Integer.toString(i)).get()); + } + // test old data - should be empty due to empty push + for (int i = 40; i < 100; i++) { + Assert.assertNull(client.get(Integer.toString(i)).get()); + } + } - // Since nothing is produced, so no changed events generated. - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer2(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); + // Since nothing is produced, so no changed events generated. 
+ TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer2(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); - // Seek to the beginning of the push - veniceChangelogConsumer.seekToBeginningOfPush().join(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 30); - }); + // Seek to the beginning of the push + veniceChangelogConsumer.seekToBeginningOfPush().join(); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 30); + }); - // Save a checkpoint and clear the map - Set checkpointSet = new HashSet<>(); - checkpointSet.add(polledChangeEvents.get(Integer.toString(20)).getOffset()); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - - // Seek to a bogus checkpoint - PubSubPosition badPubSubPosition = new ApacheKafkaOffsetPosition(1337L); - VeniceChangeCoordinate badCoordinate = - new MockVeniceChangeCoordinate(storeName + "_v777777", badPubSubPosition, 0); - Set badCheckpointSet = new HashSet<>(); - badCheckpointSet.add(badCoordinate); - - Assert.assertThrows(() -> veniceChangelogConsumer.seekToCheckpoint(badCheckpointSet).get()); - - // Seek the consumer by checkpoint - veniceChangelogConsumer.seekToCheckpoint(checkpointSet).join(); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - - // Poll Change events again, verify we get everything - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - // Repush with TTL will include delete events in the topic - Assert.assertEquals(polledChangeEvents.size(), 16); - }); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - Assert.assertEquals(allChangeEvents.size(), 121); - - // Seek the consumer to the beginning of push (since the latest is version 4 with no nearline writes, shouldn't - // have any new writes) - // veniceAfterImageConsumer.seekToEndOfPush().join(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); + // Save a checkpoint and clear the map + Set checkpointSet = new HashSet<>(); + checkpointSet.add(polledChangeEvents.get(Integer.toString(20)).getOffset()); + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + + // Seek to a bogus checkpoint + PubSubPosition badPubSubPosition = new ApacheKafkaOffsetPosition(1337L); + VeniceChangeCoordinate badCoordinate = new MockVeniceChangeCoordinate(storeName + "_v777777", badPubSubPosition, 0); + Set badCheckpointSet = new HashSet<>(); + badCheckpointSet.add(badCoordinate); + + Assert.assertThrows(() -> veniceChangelogConsumer.seekToCheckpoint(badCheckpointSet).get()); + + // Seek the consumer by checkpoint + veniceChangelogConsumer.seekToCheckpoint(checkpointSet).join(); + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + + // Poll Change events again, verify we get everything + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + 
pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + // Repush with TTL will include delete events in the topic + Assert.assertEquals(polledChangeEvents.size(), 16); + }); + allChangeEvents.putAll(polledChangeEvents); + polledChangeEvents.clear(); + Assert.assertEquals(allChangeEvents.size(), 121); + + // Seek the consumer to the beginning of push (since the latest is version 4 with no nearline writes, shouldn't + // have any new writes) + // veniceAfterImageConsumer.seekToEndOfPush().join(); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); - // Also should be nothing on the tail - // veniceAfterImageConsumer.seekToTail().join(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); + // Also should be nothing on the tail + // veniceAfterImageConsumer.seekToTail().join(); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); - // This should get everything submitted to the CC topic on this version (version 4 doesn't have anything) - veniceChangelogConsumer.seekToTimestamp(timestamp); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); + // This should get everything submitted to the CC topic on this version (version 4 doesn't have anything) + veniceChangelogConsumer.seekToTimestamp(timestamp); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); - versionTopicEvents.clear(); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); - // At this point, the consumer should have auto tracked to version 4, and since we didn't apply any nearline - // writes to version 4, there should be no events to consume at this point - Assert.assertEquals(versionTopicEvents.size(), 0); - }); + versionTopicEvents.clear(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // At this point, the consumer should have auto tracked to version 4, and since we didn't apply any nearline + // writes to version 4, there should be no events to consume at this point + Assert.assertEquals(versionTopicEvents.size(), 0); + }); - versionTopicConsumer.seekToEndOfPush().get(); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); - // Again, no events to consume here. 
- Assert.assertEquals(versionTopicEvents.size(), 0); - }); + versionTopicConsumer.seekToEndOfPush().get(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // Again, no events to consume here. + Assert.assertEquals(versionTopicEvents.size(), 0); + }); - versionTopicConsumer.seekToBeginningOfPush().get(); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); - // Reconsuming the events from the version topic, which at this point should just contain the same 16 - // events we consumed with the before/after image consumer earlier. - Assert.assertEquals(versionTopicEvents.size(), 30); - }); + versionTopicConsumer.seekToBeginningOfPush().get(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // Reconsuming the events from the version topic, which at this point should just contain the same 16 + // events we consumed with the before/after image consumer earlier. + Assert.assertEquals(versionTopicEvents.size(), 30); + }); - // Verify version swap count matches with version count - 1 (since we don't transmit from version 0 to version 1). - // This will include messages for all partitions, so (4 version -1)*3 partitions=9 messages - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> Assert.assertEquals(TestView.getInstance().getVersionSwapCountForStore(storeName), 9)); - // Verify total updates match up (first 20 + next 20 should make 40, And then double it again as rewind updates - // are - // applied to a version) - TestUtils.waitForNonDeterministicAssertion( - 8, - TimeUnit.SECONDS, - () -> Assert.assertEquals(TestView.getInstance().getRecordCountForStore(storeName), 85)); - parentControllerClient.disableAndDeleteStore(storeName); - // Verify that topics and store is cleaned up - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); - Assert.assertFalse(storeTopicsResponse.isError()); - Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); - }); - } + // Verify version swap count matches with version count - 1 (since we don't transmit from version 0 to version 1). 
+ // This will include messages for all partitions, so (4 version -1)*3 partitions=9 messages + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertEquals(TestView.getInstance().getVersionSwapCountForStore(storeName), 9)); + // Verify total updates match up (first 20 + next 20 should make 40, And then double it again as rewind updates + // are + // applied to a version) + TestUtils.waitForNonDeterministicAssertion( + 8, + TimeUnit.SECONDS, + () -> Assert.assertEquals(TestView.getInstance().getRecordCountForStore(storeName), 85)); + parentControllerClient.disableAndDeleteStore(storeName); + // Verify that topics and store is cleaned up + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); + Assert.assertFalse(storeTopicsResponse.isError()); + Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); + }); } @Test(timeOut = TEST_TIMEOUT, priority = 3) @@ -687,89 +674,82 @@ public void testSpecificRecordBootstrappingVeniceChangelogConsumer() throws Exce controllerClient1 -> setupControllerClient.addValueSchema(storeName, NAME_RECORD_V1_SCHEMA.toString())); TestWriteUtils.runPushJob("Run push job", props); - - TestMockTime testMockTime = new TestMockTime(); ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper(); - try (PubSubBrokerWrapper localKafka = ServiceFactory.getPubSubBroker( - new PubSubBrokerConfigs.Builder().setZkWrapper(localZkServer) - .setMockTime(testMockTime) - .setRegionName("local-pubsub") - .build())) { - Properties consumerProperties = new Properties(); - String localKafkaUrl = localKafka.getAddress(); - consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl); - consumerProperties.put(CLUSTER_NAME, clusterName); - consumerProperties.put(ZOOKEEPER_ADDRESS, localZkServer.getAddress()); - ChangelogClientConfig globalChangelogClientConfig = - new ChangelogClientConfig().setConsumerProperties(consumerProperties) - .setControllerD2ServiceName(D2_SERVICE_NAME) - .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) - .setLocalD2ZkHosts(localZkServer.getAddress()) - .setControllerRequestRetryCount(3) - .setSpecificValue(TestChangelogValue.class) - .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath)); - VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = - new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); - BootstrappingVeniceChangelogConsumer specificChangelogConsumer = - veniceChangelogConsumerClientFactory - .getBootstrappingChangelogConsumer(storeName, "0", TestChangelogValue.class); - specificChangelogConsumer.start().get(); - - Map, VeniceChangeCoordinate>> polledChangeEventsMap = - new HashMap<>(); - List, VeniceChangeCoordinate>> polledChangeEventsList = - new ArrayList<>(); - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer( - polledChangeEventsMap, - polledChangeEventsList, - specificChangelogConsumer); - Assert.assertEquals(polledChangeEventsList.size(), 101); - }); - Assert.assertTrue( - polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue() instanceof SpecificRecord); - TestChangelogValue value = new TestChangelogValue(); - value.firstName = "first_name_1"; - value.lastName = "last_name_1"; - 
Assert.assertEquals(polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue(), value); - polledChangeEventsList.clear(); - polledChangeEventsMap.clear(); - - GenericRecord genericRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); - genericRecord.put("firstName", "Venice"); - genericRecord.put("lastName", "Italy"); - - GenericRecord genericRecordV2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA); - genericRecordV2.put("firstName", "Barcelona"); - genericRecordV2.put("lastName", "Spain"); - - VeniceSystemFactory factory = new VeniceSystemFactory(); - try (VeniceSystemProducer veniceProducer = factory - .getClosableProducer("venice", new MapConfig(getSamzaProducerConfig(childDatacenters, 0, storeName)), null)) { - veniceProducer.start(); - // Run Samza job to send PUT and DELETE requests. - sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecord, null); - sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecordV2, null); - } - - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer( - polledChangeEventsMap, - polledChangeEventsList, - specificChangelogConsumer); - Assert.assertEquals(polledChangeEventsList.size(), 2); - }); - Assert.assertTrue( - polledChangeEventsMap.get(Integer.toString(10000)).getValue().getCurrentValue() instanceof SpecificRecord); + PubSubBrokerWrapper localKafka = multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper(); + Properties consumerProperties = new Properties(); + String localKafkaUrl = localKafka.getAddress(); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl); + consumerProperties.put(CLUSTER_NAME, clusterName); + consumerProperties.put(ZOOKEEPER_ADDRESS, localZkServer.getAddress()); + ChangelogClientConfig globalChangelogClientConfig = + new ChangelogClientConfig().setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localZkServer.getAddress()) + .setControllerRequestRetryCount(3) + .setSpecificValue(TestChangelogValue.class) + .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath)); + VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); + BootstrappingVeniceChangelogConsumer specificChangelogConsumer = + veniceChangelogConsumerClientFactory + .getBootstrappingChangelogConsumer(storeName, "0", TestChangelogValue.class); + specificChangelogConsumer.start().get(); + + Map, VeniceChangeCoordinate>> polledChangeEventsMap = + new HashMap<>(); + List, VeniceChangeCoordinate>> polledChangeEventsList = + new ArrayList<>(); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer( + polledChangeEventsMap, + polledChangeEventsList, + specificChangelogConsumer); + Assert.assertEquals(polledChangeEventsList.size(), 101); + }); + Assert.assertTrue( + polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue() instanceof SpecificRecord); + TestChangelogValue value = new TestChangelogValue(); + value.firstName = "first_name_1"; + value.lastName = "last_name_1"; + Assert.assertEquals(polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue(), value); + polledChangeEventsList.clear(); + 
polledChangeEventsMap.clear(); + + GenericRecord genericRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); + genericRecord.put("firstName", "Venice"); + genericRecord.put("lastName", "Italy"); + + GenericRecord genericRecordV2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA); + genericRecordV2.put("firstName", "Barcelona"); + genericRecordV2.put("lastName", "Spain"); - parentControllerClient.disableAndDeleteStore(storeName); - // Verify that topics and store is cleaned up - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); - Assert.assertFalse(storeTopicsResponse.isError()); - Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); - }); + VeniceSystemFactory factory = new VeniceSystemFactory(); + try (VeniceSystemProducer veniceProducer = factory + .getClosableProducer("venice", new MapConfig(getSamzaProducerConfig(childDatacenters, 0, storeName)), null)) { + veniceProducer.start(); + // Run Samza job to send PUT and DELETE requests. + sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecord, null); + sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecordV2, null); } + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer( + polledChangeEventsMap, + polledChangeEventsList, + specificChangelogConsumer); + Assert.assertEquals(polledChangeEventsList.size(), 2); + }); + Assert.assertTrue( + polledChangeEventsMap.get(Integer.toString(10000)).getValue().getCurrentValue() instanceof SpecificRecord); + + parentControllerClient.disableAndDeleteStore(storeName); + // Verify that topics and store is cleaned up + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); + Assert.assertFalse(storeTopicsResponse.isError()); + Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); + }); } @Test(timeOut = TEST_TIMEOUT, priority = 3) @@ -793,101 +773,98 @@ public void testSpecificRecordVeniceChangelogConsumer() throws Exception { MetricsRepository metricsRepository = new MetricsRepository(); ControllerClient setupControllerClient = createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms); - setupControllerClient - .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms)); + TestUtils.assertCommand( + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms))); // Registering real data schema as schema v2. 
     for (Schema schema: SCHEMA_HISTORY) {
-      setupControllerClient
-          .retryableRequest(5, controllerClient1 -> setupControllerClient.addValueSchema(storeName, schema.toString()));
+      TestUtils.assertCommand(
+          setupControllerClient.retryableRequest(
+              5,
+              controllerClient1 -> setupControllerClient.addValueSchema(storeName, schema.toString())),
+          "Failed to add schema: " + schema.toString() + " to store " + storeName);
     }
     TestWriteUtils.runPushJob("Run push job", props);
 
-    TestMockTime testMockTime = new TestMockTime();
     ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
-    try (PubSubBrokerWrapper localKafka = ServiceFactory.getPubSubBroker(
-        new PubSubBrokerConfigs.Builder().setZkWrapper(localZkServer)
-            .setMockTime(testMockTime)
-            .setRegionName("local-pubsub")
-            .build())) {
-      Properties consumerProperties = new Properties();
-      String localKafkaUrl = localKafka.getAddress();
-      consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl);
-      consumerProperties.put(CLUSTER_NAME, clusterName);
-      consumerProperties.put(ZOOKEEPER_ADDRESS, localZkServer.getAddress());
-      ChangelogClientConfig globalChangelogClientConfig =
-          new ChangelogClientConfig().setConsumerProperties(consumerProperties)
-              .setControllerD2ServiceName(D2_SERVICE_NAME)
-              .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
-              .setLocalD2ZkHosts(localZkServer.getAddress())
-              .setControllerRequestRetryCount(3)
-              .setSpecificValue(TestChangelogValue.class)
-              .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath));
-      VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
-          new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
-      VeniceChangelogConsumer<Utf8, TestChangelogValue> specificChangelogConsumer =
-          veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, "0", TestChangelogValue.class);
-
-      specificChangelogConsumer.subscribeAll().get();
-
-      Assert.assertFalse(specificChangelogConsumer.isCaughtUp());
-
-      Map<String, PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsMap =
-          new HashMap<>();
-      List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsList =
-          new ArrayList<>();
-      TestUtils.waitForNonDeterministicAssertion(120, TimeUnit.SECONDS, true, () -> {
-        pollChangeEventsFromSpecificChangeCaptureConsumer(
-            polledChangeEventsMap,
-            polledChangeEventsList,
-            specificChangelogConsumer);
-        Assert.assertEquals(polledChangeEventsList.size(), 100);
-        Assert.assertTrue(specificChangelogConsumer.isCaughtUp());
-      });
+    PubSubBrokerWrapper localKafka = multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper();
+    Properties consumerProperties = new Properties();
+    String localKafkaUrl = localKafka.getAddress();
+    consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl);
+    consumerProperties.put(CLUSTER_NAME, clusterName);
+    consumerProperties.put(ZOOKEEPER_ADDRESS, localZkServer.getAddress());
+    ChangelogClientConfig globalChangelogClientConfig =
+        new ChangelogClientConfig().setConsumerProperties(consumerProperties)
+            .setControllerD2ServiceName(D2_SERVICE_NAME)
+            .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
+            .setLocalD2ZkHosts(localZkServer.getAddress())
+            .setControllerRequestRetryCount(3)
+            .setSpecificValue(TestChangelogValue.class)
+            .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath));
+    VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
+        new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
+    VeniceChangelogConsumer<Utf8, TestChangelogValue> specificChangelogConsumer =
+        veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, "0", TestChangelogValue.class);
+
+    specificChangelogConsumer.subscribeAll().get();
+
+    Assert.assertFalse(specificChangelogConsumer.isCaughtUp());
+
+    Map<String, PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsMap =
+        new HashMap<>();
+    List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsList =
+        new ArrayList<>();
+    TestUtils.waitForNonDeterministicAssertion(120, TimeUnit.SECONDS, true, () -> {
+      pollChangeEventsFromSpecificChangeCaptureConsumer(
+          polledChangeEventsMap,
+          polledChangeEventsList,
+          specificChangelogConsumer);
+      Assert.assertEquals(polledChangeEventsList.size(), 100);
+      Assert.assertTrue(specificChangelogConsumer.isCaughtUp());
+    });
 
-      Assert.assertTrue(
-          polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue() instanceof SpecificRecord);
-      TestChangelogValue value = new TestChangelogValue();
-      value.firstName = "first_name_1";
-      value.lastName = "last_name_1";
-      Assert.assertEquals(polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue(), value);
-      polledChangeEventsList.clear();
-      polledChangeEventsMap.clear();
-
-      GenericRecord genericRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
-      genericRecord.put("firstName", "Venice");
-      genericRecord.put("lastName", "Italy");
-
-      GenericRecord genericRecordV2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
-      genericRecordV2.put("firstName", "Barcelona");
-      genericRecordV2.put("lastName", "Spain");
-
-      VeniceSystemFactory factory = new VeniceSystemFactory();
-      try (VeniceSystemProducer veniceProducer = factory
-          .getClosableProducer("venice", new MapConfig(getSamzaProducerConfig(childDatacenters, 0, storeName)), null)) {
-        veniceProducer.start();
-        // Run Samza job to send PUT and DELETE requests.
- sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecord, null); - sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecordV2, null); - } + Assert.assertTrue( + polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue() instanceof SpecificRecord); + TestChangelogValue value = new TestChangelogValue(); + value.firstName = "first_name_1"; + value.lastName = "last_name_1"; + Assert.assertEquals(polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue(), value); + polledChangeEventsList.clear(); + polledChangeEventsMap.clear(); - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromSpecificChangeCaptureConsumer( - polledChangeEventsMap, - polledChangeEventsList, - specificChangelogConsumer); - Assert.assertEquals(polledChangeEventsList.size(), 2); - }); - Assert.assertTrue( - polledChangeEventsMap.get(Integer.toString(10000)).getValue().getCurrentValue() instanceof SpecificRecord); - parentControllerClient.disableAndDeleteStore(storeName); - // Verify that topics and store is cleaned up - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { - MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); - Assert.assertFalse(storeTopicsResponse.isError()); - Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); - }); + GenericRecord genericRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); + genericRecord.put("firstName", "Venice"); + genericRecord.put("lastName", "Italy"); + + GenericRecord genericRecordV2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA); + genericRecordV2.put("firstName", "Barcelona"); + genericRecordV2.put("lastName", "Spain"); + + VeniceSystemFactory factory = new VeniceSystemFactory(); + try (VeniceSystemProducer veniceProducer = factory + .getClosableProducer("venice", new MapConfig(getSamzaProducerConfig(childDatacenters, 0, storeName)), null)) { + veniceProducer.start(); + // Run Samza job to send PUT and DELETE requests. 
+ sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecord, null); + sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecordV2, null); } + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromSpecificChangeCaptureConsumer( + polledChangeEventsMap, + polledChangeEventsList, + specificChangelogConsumer); + Assert.assertEquals(polledChangeEventsList.size(), 2); + }); + Assert.assertTrue( + polledChangeEventsMap.get(Integer.toString(10000)).getValue().getCurrentValue() instanceof SpecificRecord); + parentControllerClient.disableAndDeleteStore(storeName); + // Verify that topics and store is cleaned up + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics(); + Assert.assertFalse(storeTopicsResponse.isError()); + Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0); + }); } private void runSamzaStreamJob( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java index d6dc2bd365..a5a67cd9fc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -78,6 +79,11 @@ public void setUp() { this.parentController = parentControllers.get(0); } + @AfterClass(alwaysRun = true) + public void tearDown() { + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); + } + @Test(timeOut = TEST_TIMEOUT_MS) public void testDumpHostHeartbeatLag() { final String storeName = Utils.getUniqueString("dumpInfo"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java index d03c9518ce..88baea8393 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java @@ -231,10 +231,18 @@ public ZkServerWrapper getZkServerWrapper() { return zkServerWrapper; } + /** + * @deprecated Use {@link #getPubSubBrokerWrapper()} instead. + */ + @Deprecated public PubSubBrokerWrapper getKafkaBrokerWrapper() { return pubSubBrokerWrapper; } + public PubSubBrokerWrapper getPubSubBrokerWrapper() { + return pubSubBrokerWrapper; + } + public VeniceControllerWrapper getRandomController() { return this.controllers.values().stream().filter(controller -> controller.isRunning()).findAny().get(); }
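---

Example of the setup pattern this patch converges on (a minimal sketch, not part
of the diff above: it assumes a test class that, like those in this patch, holds
a `multiRegionMultiClusterWrapper` field and imports the same constants; the
helper name `buildChangelogClientConfig` is illustrative only):

    private ChangelogClientConfig buildChangelogClientConfig() {
      // Reuse the ZK server and pub-sub broker that the first child region
      // already runs, instead of spinning up a standalone mock-time broker
      // against the child controllers' ZK state.
      ZkServerWrapper localZkServer =
          multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
      PubSubBrokerWrapper localKafka =
          multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper();

      // Point the changelog consumer at the shared broker.
      Properties consumerProperties = new Properties();
      consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafka.getAddress());

      return new ChangelogClientConfig().setConsumerProperties(consumerProperties)
          .setControllerD2ServiceName(D2_SERVICE_NAME)
          .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
          .setLocalD2ZkHosts(localZkServer.getAddress())
          .setControllerRequestRetryCount(3);
    }

A test then passes this config, together with a MetricsRepository, to
VeniceChangelogConsumerClientFactory, exactly as the hunks above do.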