Skip to content

Commit

Permalink
Resolving merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
bonytoni committed Aug 13, 2024
1 parent 7af0ba4 commit 669a396
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.controller.ParentControllerRegionState.*;

import com.linkedin.venice.ZkCopier;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
Expand Down Expand Up @@ -51,10 +62,6 @@ public void cleanUp() {
* 2. Perform store operation and VPJ job to both parent controllers.
* 3. Verify ACTIVE parent controller successfully completes operations and PASSIVE parent controller throws exception.
* 4. Verify child controllers are updated.
*
* TODO now: create 2 zk in child region: 1 for parent controller and 1 for child controller
*
* TODO after:
* 5. To imitate a region going down, we switch ACTIVE and PASSIVE states for both parent controllers
* 6. Migrate metadata from old ACTIVE parent controller to the new ACTIVE parent controller
* 7. Verify both parent controllers are synced.
Expand Down Expand Up @@ -110,38 +117,106 @@ public void testActiveParentControllerRegion() {
LOGGER.info("Creating new store in parent controller 0");
NewStoreResponse parentController0NewStoreResponse =
parentControllerClient0.createNewStore(storeName, "", keySchemaStr, valueSchemaStr);
LOGGER.info("parent controller 0 new store response: " + parentController0NewStoreResponse);
Assert.assertFalse(parentController0NewStoreResponse.isError());

LOGGER.info("Creating new store in parent controller 1");
LOGGER.info("Error creating new store in parent controller 1");
NewStoreResponse parentController1NewStoreResponse =
parentControllerClient1.createNewStore(storeName, "", keySchemaStr, valueSchemaStr);
LOGGER.info("parent controller 0 new store response: " + parentController0NewStoreResponse);
Assert.assertTrue(parentController1NewStoreResponse.isError());

LOGGER.info("empty push to parent controller 0");
LOGGER.info("Empty push to parent controller 0");
VersionCreationResponse parentController0VersionCreationResponse =
parentControllerClient0.emptyPush(storeName, "test", 1L);
LOGGER.info("parent controller 0 version creation response: " + parentController0VersionCreationResponse);
parentControllerClient0.emptyPush(storeName, "test-push-1", 1L);
Assert.assertFalse(parentController0VersionCreationResponse.isError());
Assert.assertEquals(parentController0VersionCreationResponse.getVersion(), 1);

LOGGER.info("Error empty push to parent controller 1");
VersionCreationResponse parentController1VersionCreationResponse =
parentControllerClient1.emptyPush(storeName, "test", 1L);
parentControllerClient1.emptyPush(storeName, "test-push-1", 1L);
Assert.assertTrue(parentController1VersionCreationResponse.isError());

// step 4
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc0Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1);

});
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc1Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 1);
});

LOGGER.info("Empty push to parent controller 0");
VersionCreationResponse parentController0VersionCreationResponseRepush =
parentControllerClient0.emptyPush(storeName, "test-push-2", 1L);
Assert.assertFalse(parentController0VersionCreationResponseRepush.isError());
Assert.assertEquals(parentController0VersionCreationResponseRepush.getVersion(), 2);

TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc0Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 2);
});
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc1Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
Assert.assertEquals(storeResponse.getStore().getCurrentVersion(), 2);
});

String parentController0ZkAddress = parentController0.getZkAddress();
String parentController1ZkAddress = parentController1.getZkAddress();
// parentController0ZkAddress = parentController0ZkAddress.substring(0,
// parentController0ZkAddress.lastIndexOf('/'));
// parentController1ZkAddress = parentController1ZkAddress.substring(0,
// parentController1ZkAddress.lastIndexOf('/'));
ZkClient parent0ZkClient = ZkClientFactory.newZkClient(parentController0ZkAddress);
ZkClient parent1ZkClient = ZkClientFactory.newZkClient(parentController1ZkAddress);

String parentController1RegionName = parentController1.getVeniceAdmin().getRegionName();
String[] clusterNames = new String[] { clusterName };
ZkServerWrapper zkServerWrapper = venice.getZkServerByRegionName().get("dc-parent-0dc-1");
PubSubBrokerWrapper pubSubBrokerWrapper = venice.getParentKafkaBrokerWrapper();
VeniceControllerWrapper[] childControllers = childDatacenters.stream()
.map(VeniceMultiClusterWrapper::getRandomController)
.toArray(VeniceControllerWrapper[]::new);
Map<String, String> clusterToD2 = Collections.singletonMap(clusterName, "venice-0");
Map<String, String> clusterToServerD2 =
Collections.singletonMap(clusterName, parentController0.getVeniceAdmin().getServerD2Service(clusterName));
Properties props = new Properties();
props.put(CONTROLLER_PARENT_REGION_STATE, ACTIVE);
props.setProperty(PARTICIPANT_MESSAGE_STORE_ENABLED, "true");
VeniceControllerWrapper newActiveParentController = ServiceFactory.getVeniceController(
new VeniceControllerCreateOptions.Builder(clusterNames, zkServerWrapper, pubSubBrokerWrapper)
.multiRegion(true)
.replicationFactor(1)
.childControllers(childControllers)
.extraProperties(props)
.clusterToD2(clusterToD2)
.clusterToServerD2(clusterToServerD2)
.regionName("dc1")
.build());

// kill parent controllers and zkservers
Utils.closeQuietlyWithErrorLogged(parentController0);
Utils.closeQuietlyWithErrorLogged(parentController1);
// TODO: kill zkservers

LOGGER.info(
"new active parent controller is ready"
+ newActiveParentController.getVeniceAdmin().getParentControllerRegionState());
ControllerClient newActiveParentControllerClient =
new ControllerClient(clusterName, newActiveParentController.getControllerUrl());

String newActiveParentControllerZkAddress = newActiveParentController.getZkAddress();
// newActiveParentControllerZkAddress = newActiveParentControllerZkAddress.substring(0,
// newActiveParentControllerZkAddress.lastIndexOf('/'));
ZkClient newActiveParentZkClient = ZkClientFactory.newZkClient(newActiveParentControllerZkAddress);
newActiveParentZkClient.setZkSerializer(new ByteArraySerializer());
parent0ZkClient.setZkSerializer(new ByteArraySerializer());
LOGGER.info("WWWW" + newActiveParentZkClient.getChildren("/"));
// TODO: change to admintool
ZkCopier
.migrateVenicePaths(parent0ZkClient, newActiveParentZkClient, Collections.singleton("venice-cluster0"), "/");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,6 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
.setExternalSupersetSchemaGenerator(supersetSchemaGenerator.orElse(null))
.build();
VeniceController veniceController = new VeniceController(ctx);

String zkBasePath;
if (options.isParent()) {
zkBasePath = "/test-venice-parent";
} else {
zkBasePath = "/test-venice";
}
return new VeniceControllerWrapper(
options.getRegionName(),
serviceName,
Expand All @@ -385,7 +378,7 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
propertiesList,
options.isParent(),
d2ServerList,
options.getZkAddress() + zkBasePath,
options.getZkAddress(),
metricsRepository);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,13 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
.put(CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + parentRegionName, parentPubSubBrokerWrapper.getAddress());
for (String regionName: childRegionName) {
ZkServerWrapper zkServerWrapper = ServiceFactory.getZkServer();
IntegrationTestUtils.ensureZkPathExists(zkServer.getAddress(), options.getChildVeniceZkBasePath());
IntegrationTestUtils.ensureZkPathExists(zkServerWrapper.getAddress(), options.getChildVeniceZkBasePath());
if (options.isParentControllerInChildRegion()) {
ZkServerWrapper parentZkServerWrapper = ServiceFactory.getZkServer();
IntegrationTestUtils
.ensureZkPathExists(parentZkServerWrapper.getAddress(), options.getParentVeniceZkBasePath());
zkServerByRegionName.put(parentRegionName + regionName, parentZkServerWrapper);
}
PubSubBrokerWrapper regionalPubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServerWrapper).setRegionName(regionName).build());
allPubSubBrokerWrappers.add(regionalPubSubBrokerWrapper);
Expand Down Expand Up @@ -238,8 +244,9 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
VeniceControllerWrapper parentController = ServiceFactory.getVeniceController(
new VeniceControllerCreateOptions.Builder(
clusterNames,
zkServerByRegionName.get(regionName),
zkServerByRegionName.get(parentRegionName + regionName),
parentPubSubBrokerWrapper).multiRegion(true)
.veniceZkBasePath(options.getChildVeniceZkBasePath())
.replicationFactor(options.getReplicationFactor())
.childControllers(childControllers)
.extraProperties(i == 0 ? activeParentControllerProperties : passiveParentControllerProperties)
Expand All @@ -254,6 +261,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
VeniceControllerCreateOptions parentControllerCreateOptions =
new VeniceControllerCreateOptions.Builder(clusterNames, zkServer, parentPubSubBrokerWrapper)
.multiRegion(true)
.veniceZkBasePath(options.getChildVeniceZkBasePath())
.replicationFactor(options.getReplicationFactor())
.childControllers(childControllers)
.extraProperties(finalParentControllerProperties)
Expand Down

0 comments on commit 669a396

Please sign in to comment.