Skip to content

Commit

Permalink
Merge conflicts :()
Browse files Browse the repository at this point in the history
  • Loading branch information
bonytoni committed Aug 13, 2024
1 parent 98ca7f2 commit cb62f8f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,12 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo

PubSubClientsFactory pubSubClientsFactory = options.getKafkaBroker().getPubSubClientsFactory();
for (String clusterName: options.getClusterNames()) {
VeniceProperties clusterProps = IntegrationTestUtils
.getClusterProps(clusterName, options.getZkAddress(), options.getKafkaBroker(), options.isSslToKafka());
VeniceProperties clusterProps = IntegrationTestUtils.getClusterProps(
clusterName,
options.getZkAddress(),
options.getVeniceZkBasePath(),
options.getKafkaBroker(),
options.isSslToKafka());

// TODO: Validate that these configs are all still used.
// TODO: Centralize default config values in a single place
Expand Down Expand Up @@ -365,12 +369,6 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
.build();
VeniceController veniceController = new VeniceController(ctx);

// String zkBasePath;
// if (options.isParent()) {
// zkBasePath = "/test-venice-parent";
// } else {
// zkBasePath = "/test-venice";
// }
return new VeniceControllerWrapper(
options.getRegionName(),
serviceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer

try {
zkServer = ServiceFactory.getZkServer();
IntegrationTestUtils.ensureZkPathExists(zkServer.getAddress(), options.getParentVeniceZkBasePath());
parentPubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer)
.setRegionName(DEFAULT_PARENT_DATA_CENTER_REGION_NAME)
Expand Down Expand Up @@ -149,11 +150,16 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
.put(CHILD_DATA_CENTER_KAFKA_URL_PREFIX + "." + parentRegionName, parentPubSubBrokerWrapper.getAddress());
for (String regionName: childRegionName) {
ZkServerWrapper zkServerWrapper = ServiceFactory.getZkServer();
ZkServerWrapper parentZkServerWrapper = ServiceFactory.getZkServer();
IntegrationTestUtils.ensureZkPathExists(zkServerWrapper.getAddress(), options.getChildVeniceZkBasePath());
if (options.isParentControllerInChildRegion()) {
ZkServerWrapper parentZkServerWrapper = ServiceFactory.getZkServer();
IntegrationTestUtils
.ensureZkPathExists(parentZkServerWrapper.getAddress(), options.getParentVeniceZkBasePath());
zkServerByRegionName.put(parentRegionName + regionName, parentZkServerWrapper);
}
PubSubBrokerWrapper regionalPubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(zkServerWrapper).setRegionName(regionName).build());
allPubSubBrokerWrappers.add(regionalPubSubBrokerWrapper);
zkServerByRegionName.put(parentRegionName + regionName, parentZkServerWrapper);
zkServerByRegionName.put(regionName, zkServerWrapper);
pubSubBrokerByRegionName.put(regionName, regionalPubSubBrokerWrapper);
nativeReplicationRequiredChildControllerProps
Expand Down Expand Up @@ -192,6 +198,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
additionalServerProps.putAll(pubSubBrokerProps);

VeniceMultiClusterCreateOptions.Builder builder = new VeniceMultiClusterCreateOptions.Builder().multiRegion(true)
.veniceZkBasePath(options.getChildVeniceZkBasePath())
.numberOfClusters(options.getNumberOfClusters())
.numberOfControllers(options.getNumberOfChildControllers())
.numberOfServers(options.getNumberOfServers())
Expand Down Expand Up @@ -242,6 +249,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
clusterNames,
zkServerByRegionName.get(parentRegionName + regionName),
parentPubSubBrokerWrapper).multiRegion(true)
.veniceZkBasePath(options.getParentVeniceZkBasePath())
.replicationFactor(options.getReplicationFactor())
.childControllers(childControllers)
.extraProperties(i == 0 ? activeParentControllerProperties : passiveParentControllerProperties)
Expand All @@ -256,6 +264,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
VeniceControllerCreateOptions parentControllerCreateOptions =
new VeniceControllerCreateOptions.Builder(clusterNames, zkServer, parentPubSubBrokerWrapper)
.multiRegion(true)
.veniceZkBasePath(options.getParentVeniceZkBasePath())
.replicationFactor(options.getReplicationFactor())
.childControllers(childControllers)
.extraProperties(finalParentControllerProperties)
Expand Down

0 comments on commit cb62f8f

Please sign in to comment.