diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
index b7f3e14bb3d..35d0e583443 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
@@ -320,7 +320,8 @@ public DaVinciBackend(
           aggVersionedBlobTransferStats,
           backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()
               ? BlobTransferTableFormat.PLAIN_TABLE
-              : BlobTransferTableFormat.BLOCK_BASED_TABLE);
+              : BlobTransferTableFormat.BLOCK_BASED_TABLE,
+          backendConfig.getBlobTransferPeersConnectivityFreshnessInSeconds());
     } else {
       aggVersionedBlobTransferStats = null;
       blobTransferManager = null;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
index 5ee5620ac2d..9b09257bed3 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
@@ -351,6 +351,16 @@ public BlobTransferPartitionMetadata getTransferredSnapshotMetadata(String topic
    * @return the metadata for the blob transfer request
    */
  public BlobTransferPartitionMetadata prepareMetadata(BlobTransferPayload blobTransferRequest) {
+    if (storageMetadataService == null || storeVersionStateSerializer == null) {
+      throw new VeniceException("StorageMetadataService or storeVersionStateSerializer is not initialized");
+    }
+
+    if (storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()) == null
+        || storageMetadataService
+            .getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()) == null) {
+      throw new VeniceException("Cannot get store version state or offset record from storage metadata service.");
+    }
+
     // prepare metadata
     StoreVersionState storeVersionState =
         storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName());
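A note on the guard above: the store version state and offset record are each read once for the null check and then again just below when the metadata is actually assembled, which is why the Mockito expectations later in this diff move from times(1) to times(2). A minimal stand-alone sketch of the fail-fast shape (MetadataStore and the exception type are stand-ins, not the Venice classes):

```java
import java.util.Objects;

final class PrepareMetadataSketch {
  interface MetadataStore {
    Object getStoreVersionState(String topic); // null when the version state is missing
    Object getLastOffset(String topic, int partition); // null when the offset record is missing
  }

  static void validate(MetadataStore store, String topic, int partition) {
    Objects.requireNonNull(store, "StorageMetadataService is not initialized");
    if (store.getStoreVersionState(topic) == null || store.getLastOffset(topic, partition) == null) {
      // Failing fast lets the sender reject the transfer request before streaming any snapshot files.
      throw new IllegalStateException("Cannot get store version state or offset record");
    }
  }
}
```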
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java
index 76db7bedbfa..37652e27989 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java
@@ -40,7 +40,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart(
       int snapshotRetentionTimeInMin,
       int blobTransferMaxTimeoutInMin,
       AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
-      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
+      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
+      int peersConnectivityFreshnessInSeconds) {
     try {
       BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
           readOnlyStoreRepository,
@@ -51,7 +52,11 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart(
           transferSnapshotTableFormat);
       BlobTransferManager manager = new NettyP2PBlobTransferManager(
           new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
-          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
+          new NettyFileTransferClient(
+              p2pTransferClientPort,
+              baseDir,
+              storageMetadataService,
+              peersConnectivityFreshnessInSeconds),
           new DaVinciBlobFinder(clientConfig),
           baseDir,
           aggVersionedBlobTransferStats);
@@ -84,7 +89,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta
       int snapshotRetentionTimeInMin,
       int blobTransferMaxTimeoutInMin,
       AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
-      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
+      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
+      int peersConnectivityFreshnessInSeconds) {
     try {
       BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
           readOnlyStoreRepository,
@@ -95,7 +101,11 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta
           transferSnapshotTableFormat);
       BlobTransferManager manager = new NettyP2PBlobTransferManager(
           new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
-          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
+          new NettyFileTransferClient(
+              p2pTransferClientPort,
+              baseDir,
+              storageMetadataService,
+              peersConnectivityFreshnessInSeconds),
           new ServerBlobFinder(customizedViewFuture),
           baseDir,
           aggVersionedBlobTransferStats);
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java
index 5f520d5ef10..ec0477d5c6b 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java
@@ -17,9 +17,12 @@
 import java.io.InputStream;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -92,11 +95,10 @@ public CompletionStage<InputStream> get(
     }
 
     List<String> discoverPeers = response.getDiscoveryResult();
-    LOGGER
-        .info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
+    Set<String> connectablePeers = getConnectableHosts(discoverPeers, storeName, version, partition);
 
     // 2: Process peers sequentially to fetch the blob
-    processPeersSequentially(discoverPeers, storeName, version, partition, tableFormat, resultFuture);
+    processPeersSequentially(connectablePeers, storeName, version, partition, tableFormat, resultFuture);
     return resultFuture;
   }
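The net effect of this hunk: discovery results are deduplicated and probed for connectivity before any fetch is attempted, instead of logging the raw peer list and trying every entry. A toy version of the dedupe-and-filter step, with the connectivity probe replaced by a plain predicate (hostA/hostB and the suffix values are invented for the demo):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class PeerSelectionSketch {
  public static void main(String[] args) {
    // Discovery can return the same host more than once, each entry carrying a "_"-delimited suffix.
    List<String> discovered = List.of("hostA_7777", "hostB_7777", "hostA_7777");
    Predicate<String> canConnect = host -> !host.equals("hostB"); // pretend hostB is down

    // Dedupe on the host part, mirroring peer.split("_")[0] in the hunk above.
    Set<String> unique = new HashSet<>();
    for (String peer : discovered) {
      unique.add(peer.split("_")[0]);
    }

    // Keep only hosts that accept a connection; bad hosts never reach the fetch loop.
    Set<String> connectable = new HashSet<>();
    for (String host : unique) {
      if (canConnect.test(host)) {
        connectable.add(host);
      }
    }
    System.out.println(connectable); // [hostA]
  }
}
```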
@@ -121,7 +123,7 @@ public CompletionStage<InputStream> get(
    * - Success case:
    *   1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
    *
-   * @param peers the list of peers to process
+   * @param uniqueConnectablePeers the set of peers to process
    * @param storeName the name of the store
    * @param version the version of the store
    * @param partition the partition of the store
@@ -129,7 +131,7 @@
    * @param resultFuture the future to complete with the InputStream of the blob
    */
   private void processPeersSequentially(
-      List<String> peers,
+      Set<String> uniqueConnectablePeers,
       String storeName,
       int version,
       int partition,
@@ -142,11 +144,9 @@ private void processPeersSequentially(
     CompletableFuture<Void> chainOfPeersFuture = CompletableFuture.completedFuture(null);
 
     // Iterate through each peer and chain the futures
-    for (int currentPeerIndex = 0; currentPeerIndex < peers.size(); currentPeerIndex++) {
-      final int peerIndex = currentPeerIndex;
+    for (String chosenHost: uniqueConnectablePeers) {
       // Chain the next operation to the previous future
       chainOfPeersFuture = chainOfPeersFuture.thenCompose(v -> {
-        String chosenHost = peers.get(peerIndex).split("_")[0];
         if (resultFuture.isDone()) {
           // If the result future is already completed, skip the current peer
@@ -154,7 +154,13 @@
         }
 
         // Attempt to fetch the blob from the current peer asynchronously
-        LOGGER.info("Attempting to connect to host: {}", chosenHost);
+        LOGGER.info(
+            "Attempting to connect to host: {} for store {} version {} partition {} table format {}",
+            chosenHost,
+            storeName,
+            version,
+            partition,
+            tableFormat);
 
         return nettyClient.get(chosenHost, storeName, version, partition, tableFormat)
             .toCompletableFuture()
@@ -244,4 +250,38 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
           e);
     }
   }
+
+  /**
+   * Get the connectable hosts for the given store name, version, and partition
+   * @param discoverPeers the list of discovered peers
+   * @param storeName the name of the store
+   * @param version the version of the store
+   * @param partition the partition of the store
+   * @return the set of unique connectable hosts
+   */
+  private Set<String> getConnectableHosts(List<String> discoverPeers, String storeName, int version, int partition) {
+    // Extract unique hosts from the discovered peers
+    Set<String> uniquePeers = discoverPeers.stream().map(peer -> peer.split("_")[0]).collect(Collectors.toSet());
+
+    LOGGER.info(
+        "Discovered {} unique peers for store {} version {} partition {}, peers are {}",
+        uniquePeers.size(),
+        storeName,
+        version,
+        partition,
+        uniquePeers);
+
+    // Get the connectable hosts for this store, version, and partition
+    Set<String> connectablePeers =
+        nettyClient.getConnectableHosts((HashSet<String>) uniquePeers, storeName, version, partition);
+
+    LOGGER.info(
+        "Total {} unique connectable peers for store {} version {} partition {}, peers are {}",
+        connectablePeers.size(),
+        storeName,
+        version,
+        partition,
+        connectablePeers);
+    return connectablePeers;
+  }
 }
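processPeersSequentially keeps its existing chain-of-thenCompose structure; the change is only that it now iterates the pre-filtered set and no longer strips the "_" suffix on every attempt. A self-contained sketch of that chaining pattern, with the actual blob fetch stubbed out (fetchFrom and host1..host3 are invented for the demo):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SequentialChainSketch {
  public static void main(String[] args) {
    CompletableFuture<String> resultFuture = new CompletableFuture<>();
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);

    for (String host : List.of("host1", "host2", "host3")) {
      // Each peer attempt is appended to the chain; the chain short-circuits once
      // resultFuture completes, so later peers are skipped.
      chain = chain.thenCompose(v -> {
        if (resultFuture.isDone()) {
          return CompletableFuture.completedFuture(null); // a previous peer already succeeded
        }
        return fetchFrom(host).thenAccept(resultFuture::complete)
            .exceptionally(t -> null); // swallow the failure and let the next peer try
      });
    }
    System.out.println(resultFuture.join()); // blob from host2
  }

  static CompletableFuture<String> fetchFrom(String host) {
    return "host2".equals(host)
        ? CompletableFuture.completedFuture("blob from " + host)
        : CompletableFuture.failedFuture(new RuntimeException(host + " unreachable"));
  }
}
```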
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
index d8584e53c6a..6a77da7aba1 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
@@ -1,8 +1,11 @@
 package com.linkedin.davinci.blobtransfer.client;
 
+import com.linkedin.alpini.base.concurrency.Executors;
 import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
 import com.linkedin.davinci.storage.StorageMetadataService;
 import com.linkedin.venice.exceptions.VenicePeersConnectionException;
+import com.linkedin.venice.utils.DaemonThreadFactory;
+import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -18,8 +21,18 @@
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.timeout.IdleStateHandler;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,18 +40,33 @@ public class NettyFileTransferClient {
   private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class);
   private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100;
+  private static final int REQUEST_TIMEOUT_IN_MINUTES = 5;
+  private static final int CONNECTION_TIMEOUT_IN_MINUTES = 1;
   EventLoopGroup workerGroup;
   Bootstrap clientBootstrap;
   private final String baseDir;
   private final int serverPort;
+  private final int peersConnectivityFreshnessInSeconds;
   private StorageMetadataService storageMetadataService;
+  private final ExecutorService hostConnectExecutorService;
+  private final ScheduledExecutorService connectTimeoutScheduler;
+
+  // Maps tracking the connectable and unconnectable hosts, so that repeated connection attempts can be skipped.
+  // Format: host -> timestamp of the last connection attempt
+  private VeniceConcurrentHashMap<String, Long> unconnectableHostsToTimestamp = new VeniceConcurrentHashMap<>();
+  private VeniceConcurrentHashMap<String, Long> connectedHostsToTimestamp = new VeniceConcurrentHashMap<>();
 
   // TODO 1: move tunable configs to a config class
   // TODO 2: consider either increasing worker threads or have a dedicated thread pool to handle requests.
-  public NettyFileTransferClient(int serverPort, String baseDir, StorageMetadataService storageMetadataService) {
+  public NettyFileTransferClient(
+      int serverPort,
+      String baseDir,
+      StorageMetadataService storageMetadataService,
+      int peersConnectivityFreshnessInSeconds) {
     this.baseDir = baseDir;
     this.serverPort = serverPort;
     this.storageMetadataService = storageMetadataService;
+    this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;
 
     clientBootstrap = new Bootstrap();
     workerGroup = new NioEventLoopGroup();
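The two new maps behave like a TTL cache keyed by host, with the last probe time as the value and peersConnectivityFreshnessInSeconds as the window. A minimal stand-alone version of that bookkeeping, using a plain ConcurrentHashMap in place of VeniceConcurrentHashMap:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class ConnectivityCacheSketch {
  private final Map<String, Long> lastProbe = new ConcurrentHashMap<>();
  private final long freshnessMillis;

  ConnectivityCacheSketch(int freshnessInSeconds) {
    this.freshnessMillis = TimeUnit.SECONDS.toMillis(freshnessInSeconds);
  }

  void record(String host) {
    lastProbe.put(host, System.currentTimeMillis());
  }

  // Drop entries older than the freshness window so those hosts get re-probed.
  void purgeStale() {
    Iterator<Map.Entry<String, Long>> it = lastProbe.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (e.getValue() == null || System.currentTimeMillis() - e.getValue() > freshnessMillis) {
        it.remove();
      }
    }
  }

  boolean isFresh(String host) {
    return lastProbe.containsKey(host);
  }

  public static void main(String[] args) throws InterruptedException {
    ConnectivityCacheSketch cache = new ConnectivityCacheSketch(1); // 1s freshness window
    cache.record("hostA");
    System.out.println(cache.isFresh("hostA")); // true
    Thread.sleep(1100);
    cache.purgeStale();
    System.out.println(cache.isFresh("hostA")); // false: hostA will be probed again
  }
}
```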
@@ -51,6 +79,127 @@ public void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new HttpClientCodec());
       }
     });
+    this.hostConnectExecutorService =
+        Executors.newCachedThreadPool(new DaemonThreadFactory("Venice-BlobTransfer-Host-Connect-Executor-Service"));
+    this.connectTimeoutScheduler = Executors
+        .newSingleThreadScheduledExecutor(new DaemonThreadFactory("Venice-BlobTransfer-Client-Timeout-Checker"));
   }
+
+  /**
+   * A method to get the connectable hosts for the given store, version, and partition.
+   * This method is only used for checking connectivity to the hosts; the channel is closed after the check.
+   * @param discoveredHosts the set of discovered hosts for the store, version, and partition, but not necessarily connectable
+   * @param storeName the store name
+   * @param version the version
+   * @param partition the partition
+   * @return the set of connectable hosts
+   */
+  public Set<String> getConnectableHosts(
+      HashSet<String> discoveredHosts,
+      String storeName,
+      int version,
+      int partition) {
+    List<CompletableFuture<String>> futures = new ArrayList<>();
+
+    // 1. Purge the host connectivity records that are stale
+    purgeStaleConnectivityRecords(unconnectableHostsToTimestamp);
+    purgeStaleConnectivityRecords(connectedHostsToTimestamp);
+
+    // 2. Remove the hosts that are already marked as unconnectable
+    discoveredHosts.removeAll(unconnectableHostsToTimestamp.keySet());
+
+    // 3. Check if the discovered hosts are already known to be connectable
+    Set<String> connectableHostsResult = new HashSet<>();
+    Iterator<String> discoveredHostsIterator = discoveredHosts.iterator();
+    while (discoveredHostsIterator.hasNext()) {
+      String host = discoveredHostsIterator.next();
+      if (connectedHostsToTimestamp.containsKey(host)) {
+        connectableHostsResult.add(host);
+        discoveredHostsIterator.remove();
+      }
+    }
+
+    // 4. Check connectivity of the remaining hosts via connectToHost
+    for (String host: discoveredHosts) {
+      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
+        try {
+          // Check if the host is connectable
+          Channel channel = connectToHost(host, storeName, version, partition);
+          if (channel != null && channel.isActive()) {
+            // Mark the host as connected
+            connectedHostsToTimestamp.put(host, System.currentTimeMillis());
+            channel.close(); // this connection is only for checking connectivity; no need to keep it open.
+            return host;
+          } else {
+            LOGGER.warn(
+                "Failed to connect to host: {} for store {} version {} partition {}",
+                host,
+                storeName,
+                version,
+                partition);
+            unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
+            return null;
+          }
+        } catch (Exception e) {
+          LOGGER.warn(
+              "Failed to connect to host: {} for store {} version {} partition {}",
+              host,
+              storeName,
+              version,
+              partition,
+              e);
+          unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
+          return null;
+        }
+      }, hostConnectExecutorService);
+
+      futures.add(future);
+    }
+
+    // Wait for all futures to complete
+    CompletableFuture<Void> allConnections = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+
+    connectTimeoutScheduler.schedule(() -> {
+      if (!allConnections.isDone()) {
+        for (CompletableFuture<String> future: futures) {
+          if (!future.isDone()) {
+            future.complete(null);
+          }
+        }
+        allConnections.complete(null);
+      }
+    }, CONNECTION_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
+
+    allConnections.join();
+
+    // 5. Collect only the successfully connected hosts
+    for (CompletableFuture<String> future: futures) {
+      try {
+        String host = future.get();
+        if (host != null) {
+          connectableHostsResult.add(host);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error getting result from future", e);
+      }
+    }
+
+    return connectableHostsResult;
+  }
+
+  /**
+   * Check the freshness of the connectivity records and purge the stale records
+   * @param hostsToTimestamp the map of hosts to the timestamp of the last connection attempt
+   */
+  public void purgeStaleConnectivityRecords(VeniceConcurrentHashMap<String, Long> hostsToTimestamp) {
+    Iterator<Map.Entry<String, Long>> iterator = hostsToTimestamp.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, Long> entry = iterator.next();
+      if (entry.getValue() == null || System.currentTimeMillis() - entry.getValue() > TimeUnit.SECONDS
+          .toMillis(peersConnectivityFreshnessInSeconds)) {
+        iterator.remove();
+      }
+    }
+  }
 
   public CompletionStage<InputStream> get(
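Step 4 above fans the probes out on the cached thread pool, then arms a scheduled escape hatch: if the fan-out has not finished within CONNECTION_TIMEOUT_IN_MINUTES, every pending future is force-completed with null so the join() cannot block forever. A compact reproduction of that pattern with shortened times (the host values are placeholders):

```java
import java.util.List;
import java.util.concurrent.*;

public class BoundedFanOutSketch {
  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    ExecutorService pool = Executors.newCachedThreadPool();

    List<CompletableFuture<String>> probes = List.of(
        CompletableFuture.supplyAsync(() -> "fast-host", pool),
        new CompletableFuture<>()); // simulates a probe that never returns

    CompletableFuture<Void> all = CompletableFuture.allOf(probes.toArray(new CompletableFuture[0]));
    timer.schedule(() -> {
      for (CompletableFuture<String> p : probes) {
        if (!p.isDone()) {
          p.complete(null); // treat the host as unreachable for this round
        }
      }
    }, 1, TimeUnit.SECONDS);

    all.join(); // returns once the timeout fires
    probes.forEach(p -> System.out.println(p.join())); // fast-host, null
    timer.shutdown();
    pool.shutdown();
  }
}
```

Force-completing the individual futures is what unblocks allOf here; the explicit allConnections.complete(null) in the hunk is belt-and-braces on top of that.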
@@ -62,6 +211,8 @@
     CompletionStage<InputStream> inputStream = new CompletableFuture<>();
     try {
       // Connects to the remote host
+      // Must open a new connection for each request (at the per-store, per-version, per-partition level);
+      // otherwise the responses will be mixed up
       Channel ch = connectToHost(host, storeName, version, partition);
 
       // Request to get the blob file and metadata
@@ -88,6 +239,16 @@
               requestedTableFormat));
       // Send a GET request
       ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));
+      // Set a timeout; otherwise, if the host is not responding, the future will never complete
+      connectTimeoutScheduler.schedule(() -> {
+        if (!inputStream.toCompletableFuture().isDone()) {
+          inputStream.toCompletableFuture()
+              .completeExceptionally(
+                  new TimeoutException(
+                      "Request timed out for store " + storeName + " version " + version + " partition " + partition
+                          + " table format " + requestedTableFormat + " from host " + host));
+        }
+      }, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
     } catch (Exception e) {
       if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
         inputStream.toCompletableFuture().completeExceptionally(e);
@@ -98,6 +259,10 @@
 
   public void close() {
     workerGroup.shutdownGracefully();
+    hostConnectExecutorService.shutdown();
+    connectTimeoutScheduler.shutdown();
+    unconnectableHostsToTimestamp.clear();
+    connectedHostsToTimestamp.clear();
   }
 
   private FullHttpRequest prepareRequest(
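The request-path timeout uses the same scheduler trick at per-request granularity, completing the pending future exceptionally so a silent peer cannot strand the caller. A stand-alone version (the 500 ms deadline is just for the demo):

```java
import java.util.concurrent.*;

public class RequestTimeoutSketch {
  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> response = new CompletableFuture<>(); // never completed by the "peer"

    timer.schedule(() -> {
      if (!response.isDone()) {
        response.completeExceptionally(new TimeoutException("Request timed out"));
      }
    }, 500, TimeUnit.MILLISECONDS);

    try {
      response.get();
    } catch (InterruptedException | ExecutionException e) {
      System.out.println(e.getCause()); // java.util.concurrent.TimeoutException: Request timed out
    } finally {
      timer.shutdown();
    }
  }
}
```

On Java 9+, CompletableFuture#orTimeout would give equivalent behavior without a hand-rolled scheduler, at the cost of a less descriptive exception message.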
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
index 5e505a7bbd2..83fff7a40fb 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
@@ -9,6 +9,7 @@
 import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MANAGER_ENABLED;
 import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER;
 import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MAX_TIMEOUT_IN_MIN;
+import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_PEERS_CONNECTIVITY_FRESHNESS_IN_SECONDS;
 import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN;
 import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
 import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_CLIENT_PORT;
@@ -556,6 +557,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final int snapshotRetentionTimeInMin;
   private final int maxConcurrentSnapshotUser;
   private final int blobTransferMaxTimeoutInMin;
+  private final int blobTransferPeersConnectivityFreshnessInSeconds;
   private final long blobTransferDisabledOffsetLagThreshold;
   private final int dvcP2pBlobTransferServerPort;
   private final int dvcP2pBlobTransferClientPort;
@@ -608,6 +610,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map
+    VeniceConcurrentHashMap<String, Long> connectivityMap = new VeniceConcurrentHashMap<>();
+    connectivityMap.put("oldestHost", System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(120));
+    connectivityMap.put("newestHost", System.currentTimeMillis());
+    client.purgeStaleConnectivityRecords(connectivityMap);
+
+    Assert.assertEquals(connectivityMap.size(), 1);
+  }
+
   /**
    * Host order: localhost, badhost1, badhost2
    * Test when the localhost (good host) is the first host in the list, the nettyP2PBlobTransferManager should use the localhost,
@@ -425,9 +441,9 @@ private void verifyFileTransferSuccess(OffsetRecord expectOffsetRecord) throws I
     Assert.assertTrue(Arrays.equals(Files.readAllBytes(file3), Files.readAllBytes(destFile3)));
 
     // Verify the metadata is retrieved
-    Mockito.verify(storageMetadataService, Mockito.times(1))
+    Mockito.verify(storageMetadataService, Mockito.times(2))
         .getLastOffset(TEST_STORE + "_v" + TEST_VERSION, TEST_PARTITION);
-    Mockito.verify(storageMetadataService, Mockito.times(1)).getStoreVersionState(TEST_STORE + "_v" + TEST_VERSION);
+    Mockito.verify(storageMetadataService, Mockito.times(2)).getStoreVersionState(TEST_STORE + "_v" + TEST_VERSION);
 
     // Verify the record is updated
     Mockito.verify(storageMetadataService, Mockito.times(1))
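Wiring-wise, the new knob follows the usual path: a server property is read into VeniceServerConfig and handed down to NettyFileTransferClient via the getter used in DaVinciBackend and VeniceServer. A hypothetical snippet showing the property being set — the "30" is an arbitrary example value, not a default shipped by this diff:

```java
import java.util.Properties;

public class FreshnessConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Key defined in ConfigKeys as BLOB_TRANSFER_PEERS_CONNECTIVITY_FRESHNESS_IN_SECONDS.
    props.setProperty("blob.transfer.peers.connectivity.freshness.in.seconds", "30");

    int freshness = Integer.parseInt(
        props.getProperty("blob.transfer.peers.connectivity.freshness.in.seconds"));
    System.out.println("connectivity records expire after " + freshness + "s");
  }
}
```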
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index c1f5420da42..15a94b55f5d 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -1827,6 +1827,10 @@ private ConfigKeys() {
   // this is a config to decide the max allowed offset lag to use kafka, even if the blob transfer is enable.
   public static final String BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD =
       "blob.transfer.disabled.offset.lag.threshold";
+  // The freshness window, in seconds, of a peer connectivity record;
+  // once a record is older than this window, the connection to that peer is retried.
+  public static final String BLOB_TRANSFER_PEERS_CONNECTIVITY_FRESHNESS_IN_SECONDS =
+      "blob.transfer.peers.connectivity.freshness.in.seconds";
 
   // Port used by peer-to-peer transfer service. It should be used by both server and client
   public static final String DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT = "davinci.p2p.blob.transfer.server.port";
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java
index 10d6b6c0b80..d577fd635b0 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java
@@ -9,6 +9,7 @@
 import com.linkedin.venice.client.store.AbstractAvroStoreClient;
 import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
 import com.linkedin.venice.client.store.ClientConfig;
+import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
 import com.linkedin.venice.utils.ObjectMapperFactory;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import java.io.IOException;
@@ -66,6 +67,16 @@ public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int versio
     CompletableFuture<BlobPeersDiscoveryResponse> futureResponse = CompletableFuture.supplyAsync(() -> {
       try {
         byte[] responseBody = (byte[]) storeClient.getRaw(uri).get(3, TimeUnit.SECONDS);
+        if (responseBody == null) {
+          return handleError(
+              ERROR_DISCOVERY_MESSAGE,
+              storeName,
+              version,
+              partition,
+              new VenicePeersNotFoundException(
+                  "The response body is null for store: " + storeName + ", version: " + version + ", partition: "
+                      + partition));
+        }
         ObjectMapper mapper = ObjectMapperFactory.getInstance();
         return mapper.readValue(responseBody, BlobPeersDiscoveryResponse.class);
       } catch (Exception e) {
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
index 15fe2060505..10463363783 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
@@ -491,7 +491,8 @@ private List<AbstractVeniceService> createServices() {
           aggVersionedBlobTransferStats,
           serverConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()
               ? BlobTransferTableFormat.PLAIN_TABLE
-              : BlobTransferTableFormat.BLOCK_BASED_TABLE);
+              : BlobTransferTableFormat.BLOCK_BASED_TABLE,
+          serverConfig.getBlobTransferPeersConnectivityFreshnessInSeconds());
     } else {
       aggVersionedBlobTransferStats = null;
       blobTransferManager = null;
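The DaVinciBlobFinder change closes a gap where a null discovery payload fell through to the JSON mapper and surfaced as an opaque parsing failure; it is now reported as a peers-not-found condition. A stub-typed sketch of that guard (discover and the exception type here are illustrative, not the Venice API):

```java
import java.util.concurrent.CompletableFuture;

public class DiscoveryGuardSketch {
  static CompletableFuture<String> discover(byte[] responseBody) {
    if (responseBody == null) {
      // Surface a targeted "peers not found" failure instead of an NPE from parsing.
      return CompletableFuture.failedFuture(new IllegalStateException("The response body is null"));
    }
    return CompletableFuture.completedFuture(new String(responseBody));
  }

  public static void main(String[] args) {
    discover(null).whenComplete((r, t) -> System.out.println(t.getMessage()));
  }
}
```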