Skip to content

Commit

Permalink
address code review
Browse files Browse the repository at this point in the history
  • Loading branch information
jingy-li committed Feb 6, 2025
1 parent d512964 commit 1ac9187
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,14 @@ public class BlobTransferUtil {

/**
* Get a P2P blob transfer manager for DaVinci Client and start it.
* @param p2pTransferPort, the port used by the P2P transfer server and client
* @param p2pTransferServerPort, the port used by the P2P transfer server
* @param p2pTransferClientPort, the port used by the P2P transfer client
* @param baseDir, the base directory of the underlying storage
* @param clientConfig, the client config to start up a transport client
* @param storageMetadataService, the storage metadata service
* @return the blob transfer manager
* @throws Exception
*/
public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferPort,
String baseDir,
ClientConfig clientConfig,
StorageMetadataService storageMetadataService,
ReadOnlyStoreRepository readOnlyStoreRepository,
StorageEngineRepository storageEngineRepository,
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
return getP2PBlobTransferManagerForDVCAndStart(
p2pTransferPort,
p2pTransferPort,
baseDir,
clientConfig,
storageMetadataService,
readOnlyStoreRepository,
storageEngineRepository,
maxConcurrentSnapshotUser,
snapshotRetentionTimeInMin,
blobTransferMaxTimeoutInMin,
aggVersionedBlobTransferStats,
transferSnapshotTableFormat);
}

public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferServerPort,
int p2pTransferClientPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ClientFactory {
private static Function<ClientConfig, TransportClient> configToTransportClientProviderForTests = null;

// Visible for testing
static void setUnitTestMode() {
public static void setUnitTestMode() {
unitTestMode = true;
}

Expand All @@ -33,7 +33,7 @@ static void resetUnitTestMode() {

// Allow for overriding with mock D2Client for unit tests. The caller must release the object to prevent side-effects
// VisibleForTesting
static void setTransportClientProvider(Function<ClientConfig, TransportClient> transportClientProvider) {
public static void setTransportClientProvider(Function<ClientConfig, TransportClient> transportClientProvider) {
if (!unitTestMode) {
throw new VeniceUnsupportedOperationException("setTransportClientProvider in non-unit-test-mode");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,14 @@ public DaVinciBlobFinder(ClientConfig clientConfig) {
* @return the store client
*/
AbstractAvroStoreClient getStoreClient(String storeName) {
if (!storeToClientMap.containsKey(storeName)) {
return storeToClientMap.computeIfAbsent(storeName, k -> {
// update the config with respective store name
ClientConfig storeClientConfig = ClientConfig.cloneConfig(clientConfig).setStoreName(storeName);
AbstractAvroStoreClient storeLevelClient =
new AvroGenericStoreClientImpl<>(getTransportClient(storeClientConfig), false, storeClientConfig);
storeToClientMap.put(storeName, storeLevelClient);
LOGGER.info("Created store client for store: {}", storeName);
}
return storeToClientMap.get(storeName);
return storeLevelClient;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.ArgumentCaptor;
Expand All @@ -42,11 +45,13 @@ public void setUp() {
storeClient = mock(AbstractAvroStoreClient.class);
daVinciBlobFinder = spy(new DaVinciBlobFinder(clientConfig));

Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);
Mockito.doReturn(storeName).when(storeClient).getStoreName();
}

@Test
public void testDiscoverBlobPeers_Success() {
Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);

String responseBodyJson =
"{\"error\":false,\"errorMessage\":\"\",\"discoveryResult\":[\"host1\",\"host2\",\"host3\"]}";
byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8);
Expand All @@ -63,6 +68,7 @@ public void testDiscoverBlobPeers_Success() {

@Test
public void testDiscoverBlobPeers_CallsTransportClientWithCorrectURI() {
Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);
String responseBodyJson =
"{\"error\":false,\"errorMessage\":\"\",\"discoveryResult\":[\"host1\",\"host2\",\"host3\"]}";
byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8);
Expand All @@ -84,6 +90,8 @@ public void testDiscoverBlobPeers_CallsTransportClientWithCorrectURI() {

@Test
public void testDiscoverBlobPeers_ContentDeserializationError() throws Exception {
Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);

String responseBodyJson = "{\"error\":true,\"errorMessage\":\"some error\",\"discoveryResult\":[]}";
byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8);
TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody);
Expand All @@ -104,6 +112,8 @@ public void testDiscoverBlobPeers_ContentDeserializationError() throws Exception

@Test
public void testDiscoverBlobPeers_ClientWithIncorrectUri() {
Mockito.doReturn(storeClient).when(daVinciBlobFinder).getStoreClient(storeName);

CompletableFuture<byte[]> futureResponse = new CompletableFuture<>();
futureResponse.completeExceptionally(new RuntimeException("Test Exception"));
when(storeClient.getRaw(anyString())).thenReturn(futureResponse);
Expand All @@ -115,4 +125,34 @@ public void testDiscoverBlobPeers_ClientWithIncorrectUri() {
response.getErrorMessage(),
"Error finding DVC peers for blob transfer in store: testStore, version: 1, partition: 1");
}

@Test
public void testGetStoreClient() {
// set up the transport client provider used to initialize the store client
TransportClient transportClient1 = mock(TransportClient.class);
TransportClient transportClient2 = mock(TransportClient.class);
Function<ClientConfig, TransportClient> clientConfigTransportClientFunction = (clientConfig) -> {
if (clientConfig.getStoreName().equals(storeName)) {
return transportClient1;
} else if (clientConfig.getStoreName().equals("storeName2")) {
return transportClient2;
} else {
// Create TransportClient the regular way
return null;
}
};
ClientFactory.setUnitTestMode();
ClientFactory.setTransportClientProvider(clientConfigTransportClientFunction);

// ClientConfig is initialized with storeName
AbstractAvroStoreClient storeClient = daVinciBlobFinder.getStoreClient(storeName);
Assert.assertNotNull(storeClient);
Assert.assertEquals(storeClient.getStoreName(), storeName);

// Even if the daVinciBlobFinder is initialized at the beginning with "storeName", the getStoreClient
// method should be able to return a store client for "storeName2"
AbstractAvroStoreClient storeClient2 = daVinciBlobFinder.getStoreClient("storeName2");
Assert.assertNotNull(storeClient2);
Assert.assertEquals(storeClient2.getStoreName(), "storeName2");
}
}

0 comments on commit 1ac9187

Please sign in to comment.