diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveParentControllerRegion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveParentControllerRegion.java index 5596016302..983617de8c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveParentControllerRegion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveParentControllerRegion.java @@ -1,20 +1,31 @@ package com.linkedin.venice.endToEnd; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.controller.ParentControllerRegionState.*; +import com.linkedin.venice.ZkCopier; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.NewStoreResponse; import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.helix.ZkClientFactory; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.integration.utils.ZkServerWrapper; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer; +import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.testng.Assert; @@ -51,10 +62,6 @@ public void cleanUp() { * 2. Perform store operation and VPJ job to both parent controllers. * 3. Verify ACTIVE parent controller successfully completes operations and PASSIVE parent controller throws exception. * 4. Verify child controllers are updated. - * - * TODO now: create 2 zk in child region: 1 for parent controller and 1 for child controller - * - * TODO after: * 5. To imitate a region going down, we switch ACTIVE and PASSIVE states for both parent controllers * 6. Migrate metadata from old ACTIVE parent controller to the new ACTIVE parent controller * 7. Verify both parent controllers are synced. @@ -110,38 +117,106 @@ public void testActiveParentControllerRegion() { LOGGER.info("Creating new store in parent controller 0"); NewStoreResponse parentController0NewStoreResponse = parentControllerClient0.createNewStore(storeName, "", keySchemaStr, valueSchemaStr); - LOGGER.info("parent controller 0 new store response: " + parentController0NewStoreResponse); Assert.assertFalse(parentController0NewStoreResponse.isError()); - LOGGER.info("Creating new store in parent controller 1"); + LOGGER.info("Error creating new store in parent controller 1"); NewStoreResponse parentController1NewStoreResponse = parentControllerClient1.createNewStore(storeName, "", keySchemaStr, valueSchemaStr); - LOGGER.info("parent controller 0 new store response: " + parentController0NewStoreResponse); Assert.assertTrue(parentController1NewStoreResponse.isError()); - LOGGER.info("empty push to parent controller 0"); + LOGGER.info("Empty push to parent controller 0"); VersionCreationResponse parentController0VersionCreationResponse = - parentControllerClient0.emptyPush(storeName, "test", 1L); - LOGGER.info("parent controller 0 version creation response: " + parentController0VersionCreationResponse); + parentControllerClient0.emptyPush(storeName, "test-push-1", 1L); Assert.assertFalse(parentController0VersionCreationResponse.isError()); Assert.assertEquals(parentController0VersionCreationResponse.getVersion(), 1); + LOGGER.info("Error empty push to parent controller 1"); VersionCreationResponse parentController1VersionCreationResponse = - parentControllerClient1.emptyPush(storeName, "test", 1L); + parentControllerClient1.emptyPush(storeName, "test-push-1", 1L); Assert.assertTrue(parentController1VersionCreationResponse.isError()); - // step 4 TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> { StoreResponse storeResponse = dc0Client.getStore(storeName); Assert.assertFalse(storeResponse.isError()); Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1); - }); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> { StoreResponse storeResponse = dc1Client.getStore(storeName); Assert.assertFalse(storeResponse.isError()); Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1); }); + + LOGGER.info("Empty push to parent controller 0"); + VersionCreationResponse parentController0VersionCreationResponseRepush = + parentControllerClient0.emptyPush(storeName, "test-push-2", 1L); + Assert.assertFalse(parentController0VersionCreationResponseRepush.isError()); + Assert.assertEquals(parentController0VersionCreationResponseRepush.getVersion(), 2); + + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> { + StoreResponse storeResponse = dc0Client.getStore(storeName); + Assert.assertFalse(storeResponse.isError()); + Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 2); + }); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> { + StoreResponse storeResponse = dc1Client.getStore(storeName); + Assert.assertFalse(storeResponse.isError()); + Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 2); + }); + + String parentController0ZkAddress = parentController0.getZkAddress(); + String parentController1ZkAddress = parentController1.getZkAddress(); + // parentController0ZkAddress = parentController0ZkAddress.substring(0, + // parentController0ZkAddress.lastIndexOf('/')); + // parentController1ZkAddress = parentController1ZkAddress.substring(0, + // parentController1ZkAddress.lastIndexOf('/')); + ZkClient parent0ZkClient = ZkClientFactory.newZkClient(parentController0ZkAddress); + ZkClient parent1ZkClient = ZkClientFactory.newZkClient(parentController1ZkAddress); + + String parentController1RegionName = parentController1.getVeniceAdmin().getRegionName(); + String[] clusterNames = new String[] { clusterName }; + ZkServerWrapper zkServerWrapper = venice.getZkServerByRegionName().get("dc-parent-0dc-1"); + PubSubBrokerWrapper pubSubBrokerWrapper = venice.getParentKafkaBrokerWrapper(); + VeniceControllerWrapper[] childControllers = childDatacenters.stream() + .map(VeniceMultiClusterWrapper::getRandomController) + .toArray(VeniceControllerWrapper[]::new); + Map clusterToD2 = Collections.singletonMap(clusterName, "venice-0"); + Map clusterToServerD2 = + Collections.singletonMap(clusterName, parentController0.getVeniceAdmin().getServerD2Service(clusterName)); + Properties props = new Properties(); + props.put(CONTROLLER_PARENT_REGION_STATE, ACTIVE); + props.setProperty(PARTICIPANT_MESSAGE_STORE_ENABLED, "true"); + VeniceControllerWrapper newActiveParentController = ServiceFactory.getVeniceController( + new VeniceControllerCreateOptions.Builder(clusterNames, zkServerWrapper, pubSubBrokerWrapper) + .multiRegion(true) + .replicationFactor(1) + .childControllers(childControllers) + .extraProperties(props) + .clusterToD2(clusterToD2) + .clusterToServerD2(clusterToServerD2) + .regionName("dc1") + .build()); + + // kill parent controllers and zkservers + Utils.closeQuietlyWithErrorLogged(parentController0); + Utils.closeQuietlyWithErrorLogged(parentController1); + // TODO: kill zkservers + + LOGGER.info( + "new active parent controller is ready" + + newActiveParentController.getVeniceAdmin().getParentControllerRegionState()); + ControllerClient newActiveParentControllerClient = + new ControllerClient(clusterName, newActiveParentController.getControllerUrl()); + + String newActiveParentControllerZkAddress = newActiveParentController.getZkAddress(); + // newActiveParentControllerZkAddress = newActiveParentControllerZkAddress.substring(0, + // newActiveParentControllerZkAddress.lastIndexOf('/')); + ZkClient newActiveParentZkClient = ZkClientFactory.newZkClient(newActiveParentControllerZkAddress); + newActiveParentZkClient.setZkSerializer(new ByteArraySerializer()); + parent0ZkClient.setZkSerializer(new ByteArraySerializer()); + LOGGER.info("WWWW" + newActiveParentZkClient.getChildren("/")); + // TODO: change to admintool + ZkCopier + .migrateVenicePaths(parent0ZkClient, newActiveParentZkClient, Collections.singleton("venice-cluster0"), "/"); } } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index 1cd9bb01ac..a13c8ba6eb 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -368,13 +368,6 @@ static StatefulServiceProvider generateService(VeniceCo .setExternalSupersetSchemaGenerator(supersetSchemaGenerator.orElse(null)) .build(); VeniceController veniceController = new VeniceController(ctx); - - String zkBasePath; - if (options.isParent()) { - zkBasePath = "/test-venice-parent"; - } else { - zkBasePath = "/test-venice"; - } return new VeniceControllerWrapper( options.getRegionName(), serviceName, @@ -385,7 +378,7 @@ static StatefulServiceProvider generateService(VeniceCo propertiesList, options.isParent(), d2ServerList, - options.getZkAddress() + zkBasePath, + options.getZkAddress(), metricsRepository); }; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 471a40844b..ecebc55cf7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -147,7 +147,13 @@ static ServiceProvider generateSer .put(CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + parentRegionName, parentPubSubBrokerWrapper.getAddress()); for (String regionName: childRegionName) { ZkServerWrapper zkServerWrapper = ServiceFactory.getZkServer(); - IntegrationTestUtils.ensureZkPathExists(zkServer.getAddress(), options.getChildVeniceZkBasePath()); + IntegrationTestUtils.ensureZkPathExists(zkServerWrapper.getAddress(), options.getChildVeniceZkBasePath()); + if (options.isParentControllerInChildRegion()) { + ZkServerWrapper parentZkServerWrapper = ServiceFactory.getZkServer(); + IntegrationTestUtils + .ensureZkPathExists(parentZkServerWrapper.getAddress(), options.getParentVeniceZkBasePath()); + zkServerByRegionName.put(parentRegionName + regionName, parentZkServerWrapper); + } PubSubBrokerWrapper regionalPubSubBrokerWrapper = ServiceFactory.getPubSubBroker( new PubSubBrokerConfigs.Builder().setZkWrapper(zkServerWrapper).setRegionName(regionName).build()); allPubSubBrokerWrappers.add(regionalPubSubBrokerWrapper); @@ -238,8 +244,9 @@ static ServiceProvider generateSer VeniceControllerWrapper parentController = ServiceFactory.getVeniceController( new VeniceControllerCreateOptions.Builder( clusterNames, - zkServerByRegionName.get(regionName), + zkServerByRegionName.get(parentRegionName + regionName), parentPubSubBrokerWrapper).multiRegion(true) + .veniceZkBasePath(options.getChildVeniceZkBasePath()) .replicationFactor(options.getReplicationFactor()) .childControllers(childControllers) .extraProperties(i == 0 ? activeParentControllerProperties : passiveParentControllerProperties) @@ -254,6 +261,7 @@ static ServiceProvider generateSer VeniceControllerCreateOptions parentControllerCreateOptions = new VeniceControllerCreateOptions.Builder(clusterNames, zkServer, parentPubSubBrokerWrapper) .multiRegion(true) + .veniceZkBasePath(options.getChildVeniceZkBasePath()) .replicationFactor(options.getReplicationFactor()) .childControllers(childControllers) .extraProperties(finalParentControllerProperties)