Skip to content

Commit

Permalink
[da-vinci] Bug fix for peer discovery in DVC with multi-stores
Browse files Browse the repository at this point in the history
  • Loading branch information
jingy-li committed Feb 5, 2025
1 parent 76dbc70 commit d512964
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;

import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.blobtransfer.ServerBlobFinder;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -79,12 +75,10 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
maxConcurrentSnapshotUser,
snapshotRetentionTimeInMin,
transferSnapshotTableFormat);
AbstractAvroStoreClient storeClient =
new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new DaVinciBlobFinder(storeClient),
new DaVinciBlobFinder(clientConfig),
baseDir,
aggVersionedBlobTransferStats);
manager.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package com.linkedin.venice.blobtransfer;

import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_PARTITION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VERSION;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
Expand All @@ -29,14 +34,35 @@ public class DaVinciBlobFinder implements BlobFinder {
private static final String TYPE_BLOB_DISCOVERY = "blob_discovery";
private static final String ERROR_DISCOVERY_MESSAGE =
"Error finding DVC peers for blob transfer in store: %s, version: %d, partition: %d";
private final AbstractAvroStoreClient storeClient;
private final ClientConfig clientConfig;
private ConcurrentMap<String, AbstractAvroStoreClient> storeToClientMap;

public DaVinciBlobFinder(AbstractAvroStoreClient storeClient) {
this.storeClient = storeClient;
public DaVinciBlobFinder(ClientConfig clientConfig) {
this.storeToClientMap = new ConcurrentHashMap<>();
this.clientConfig = clientConfig;
}

/**
* Get the store client for the given store name
* @param storeName
* @return the store client
*/
AbstractAvroStoreClient getStoreClient(String storeName) {
if (!storeToClientMap.containsKey(storeName)) {
// update the config with respective store name
ClientConfig storeClientConfig = ClientConfig.cloneConfig(clientConfig).setStoreName(storeName);
AbstractAvroStoreClient storeLevelClient =
new AvroGenericStoreClientImpl<>(getTransportClient(storeClientConfig), false, storeClientConfig);
storeToClientMap.put(storeName, storeLevelClient);
LOGGER.info("Created store client for store: {}", storeName);
}
return storeToClientMap.get(storeName);
}

@Override
public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int version, int partition) {
AbstractAvroStoreClient storeClient = getStoreClient(storeName);

String uri = buildUriForBlobDiscovery(storeName, version, partition);
CompletableFuture<BlobPeersDiscoveryResponse> futureResponse = CompletableFuture.supplyAsync(() -> {
try {
Expand Down Expand Up @@ -85,6 +111,8 @@ private BlobPeersDiscoveryResponse handleError(

@Override
public void close() throws IOException {
storeClient.close();
for (AbstractAvroStoreClient storeClient: storeToClientMap.values()) {
storeClient.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.io.IOException;
Expand All @@ -20,6 +21,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -35,8 +37,12 @@ public class DaVinciBlobFinderTest {

@BeforeMethod
public void setUp() {
ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName);

storeClient = mock(AbstractAvroStoreClient.class);
daVinciBlobFinder = new DaVinciBlobFinder(storeClient);
daVinciBlobFinder = spy(new DaVinciBlobFinder(clientConfig));

Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,13 @@ public void testBlobP2PTransferAmongDVC() throws Exception {
String snapshotPath = RocksDBUtils.composeSnapshotDir(dvcPath1 + "/rocksdb", storeName + "_v1", i);
Assert.assertTrue(Files.exists(Paths.get(snapshotPath)));
}

for (int i = 0; i < 3; i++) {
String partitionPath2 = RocksDBUtils.composePartitionDbDir(dvcPath2 + "/rocksdb", storeName + "_v1", i);
Assert.assertTrue(Files.exists(Paths.get(partitionPath2)));
String snapshotPath2 = RocksDBUtils.composeSnapshotDir(dvcPath2 + "/rocksdb", storeName + "_v1", i);
Assert.assertFalse(Files.exists(Paths.get(snapshotPath2)));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static com.linkedin.venice.ConfigKeys.OFFLINE_JOB_START_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;
import static com.linkedin.venice.utils.TestUtils.assertCommand;

import com.linkedin.d2.balancer.D2Client;
Expand All @@ -18,7 +17,6 @@
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -65,7 +63,9 @@ public class TestBlobDiscovery {
private static final String INT_KEY_SCHEMA = "\"int\"";
private static final String INT_VALUE_SCHEMA = "\"int\"";
String clusterName;
String clusterName2;
String storeName;
String storeName2;
private VeniceMultiClusterWrapper multiClusterVenice;
private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
D2Client daVinciD2;
Expand Down Expand Up @@ -102,7 +102,15 @@ public void setUp() {
.collect(Collectors.joining(","));

clusterName = clusterNames[0];
clusterName2 = clusterNames[1];
storeName = Utils.getUniqueString("test-store");
storeName2 = Utils.getUniqueString("test-store");
LOGGER.info(
"Cluster 1 is {}, Cluster 2 is {}, Store 1 is {}, Store 2 is {}",
clusterName,
clusterName2,
storeName,
storeName2);

List<PubSubBrokerWrapper> pubSubBrokerWrappers = multiClusterVenice.getClusters()
.values()
Expand All @@ -112,6 +120,7 @@ public void setUp() {
Map<String, String> additionalPubSubProperties =
PubSubBrokerWrapper.getBrokerDetailsForClients(pubSubBrokerWrappers);

// create store 1 in cluster 1
try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs)) {
assertCommand(parentControllerClient.createNewStore(storeName, "venice-test", INT_KEY_SCHEMA, INT_VALUE_SCHEMA));

Expand Down Expand Up @@ -141,6 +150,36 @@ public void setUp() {
multiClusterVenice.getClusters().get(clusterName).refreshAllRouterMetaData();
}

// create store 2 in cluster 2
try (ControllerClient parentControllerClient = new ControllerClient(clusterName2, parentControllerURLs)) {
assertCommand(parentControllerClient.createNewStore(storeName2, "venice-test", INT_KEY_SCHEMA, INT_VALUE_SCHEMA));

PubSubProducerAdapterFactory pubSubProducerAdapterFactory = multiClusterVenice.getClusters()
.get(clusterName2)
.getPubSubBrokerWrapper()
.getPubSubClientsFactory()
.getProducerAdapterFactory();

VersionCreationResponse response = TestUtils.createVersionWithBatchData(
parentControllerClient,
storeName2,
INT_KEY_SCHEMA,
INT_VALUE_SCHEMA,
IntStream.range(0, 10).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, 0)),
pubSubProducerAdapterFactory,
additionalPubSubProperties);

// Verify the data can be ingested by classical Venice before proceeding.
TestUtils.waitForNonDeterministicPushCompletion(
response.getKafkaTopic(),
parentControllerClient,
30,
TimeUnit.SECONDS);

makeSureSystemStoresAreOnline(parentControllerClient, storeName2);
multiClusterVenice.getClusters().get(clusterName2).refreshAllRouterMetaData();
}

VeniceProperties backendConfig =
new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
Expand All @@ -153,17 +192,24 @@ public void setUp() {
DaVinciConfig daVinciConfig = new DaVinciConfig();
daVinciD2 = D2TestUtils.getAndStartD2Client(multiClusterVenice.getZkServerWrapper().getAddress());

// create one DVC client which subscribes to all partitions for store 1 and store 2
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
daVinciD2,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
new MetricsRepository(),
backendConfig)) {
// subscribe to all partitions for store 1
List<DaVinciClient<Integer, Object>> clients = new ArrayList<>();
DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient(storeName, daVinciConfig);
client.subscribeAll().get();
clients.add(client);
// subscribe to all partitions for store 2
DaVinciClient<Integer, Object> client2 = factory.getAndStartGenericAvroClient(storeName2, daVinciConfig);
client2.subscribeAll().get();
clients.add(client2);
// This is a very dumb and basic assertion that's only purpose is to get static analysis to not be mad
Assert.assertTrue(clients.get(0).getPartitionCount() > 0);
Assert.assertTrue(clients.get(1).getPartitionCount() > 0);
} catch (ExecutionException | InterruptedException e) {
throw new VeniceException(e);
}
Expand All @@ -181,17 +227,36 @@ public void testBlobDiscovery() throws Exception {
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> {
veniceClusterWrapper.updateStore(storeName, new UpdateStoreQueryParams().setBlobTransferEnabled(true));
});
VeniceClusterWrapper veniceClusterWrapper2 = multiClusterVenice.getClusters().get(clusterName2);
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> {
veniceClusterWrapper2.updateStore(storeName2, new UpdateStoreQueryParams().setBlobTransferEnabled(true));
});

// Even if the config here only use "storeName",
// the DaVinciBlobFinder should be able to discover both stores
ClientConfig clientConfig = new ClientConfig(storeName).setD2Client(daVinciD2)
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setMetricsRepository(new MetricsRepository());

BlobFinder daVinciBlobFinder =
new DaVinciBlobFinder(new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig));
// Checking if the DVC created at the beginning is found by the DaVinciBlobFinder
// when try to find store1's peers.
BlobFinder daVinciBlobFinder = new DaVinciBlobFinder(clientConfig);
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> {
BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, 1, 1);
Assert.assertNotNull(response);
List<String> hostNames = response.getDiscoveryResult();
LOGGER.info("Discovered hosts: {} for store1 {}", hostNames, storeName);
Assert.assertNotNull(hostNames);
Assert.assertEquals(hostNames.size(), 1);
});

// Checking if the DVC created at the beginning is found by the DaVinciBlobFinder
// when try to find store2's peers.
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> {
BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName2, 1, 1);
Assert.assertNotNull(response);
List<String> hostNames = response.getDiscoveryResult();
LOGGER.info("Discovered hosts: {} for store2 {}", hostNames, storeName2);
Assert.assertNotNull(hostNames);
Assert.assertEquals(hostNames.size(), 1);
});
Expand Down

0 comments on commit d512964

Please sign in to comment.