[dvc][server] verifying discovered host connectivity before initiating blob transfer request #1534
base: main
Conversation
@@ -92,11 +95,10 @@ public CompletionStage<InputStream> get(
      }

      List<String> discoverPeers = response.getDiscoveryResult();
      LOGGER
          .info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
      Set<String> uniqueConnectablePeers = getUniqueConnectableHosts(discoverPeers, storeName, version, partition);
So even for a single partition, the backend can return duplicate peers?
I read more of the code and realized that this function is used to discover connectable hosts concurrently, so why did you put the 'unique' keyword into the function name?
Even for a single partition, the router may return the same host name but with different tags (for example, host1_app1_i000, host1_app1_i001). Therefore, we changed the list to a set to ensure uniqueness.
However, the term 'unique' seems misleading, so I will remove it.
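To illustrate the deduplication idea, here is a minimal sketch (not the actual Venice code; the instance-tag format and helper names are assumptions for illustration only):

```java
// Hypothetical sketch: deduplicating discovered peers when the router returns the
// same host under different instance tags, e.g. "host1_app1_i000" and "host1_app1_i001".
// The suffix-stripping rule below is an assumption, not the real parsing logic.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PeerDeduplicationSketch {
  static Set<String> toConnectableHosts(List<String> discoveredPeers) {
    return discoveredPeers.stream()
        .map(peer -> peer.split("_")[0]) // assumed format: <host>_<app>_<instance>
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<String> peers = List.of("host1_app1_i000", "host1_app1_i001", "host2_app1_i000");
    System.out.println(toConnectableHosts(peers)); // prints something like [host1, host2]
  }
}
```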
Emm..
For different tags, I assume DaVinci will use different listening ports for blob transfer, right?
Different instances on the same node can host different partitions in a sharded environment.
In theory, if a single host is running two DVC instances, it should be able to perform the transfer, as long as their ports are different. This is how we conduct our integration tests.
However, in practice, this is not the case. We default both the client and server ports to 27017, so if instance 1 is running on port 27017 and instance 2 on port 27018, another host that tries to send a connect request will only be able to connect to instance 1.
Here is the config in global-config, da-vinci.src:
<property name="daVinciClient.davinci.p2p.blob.transfer.client.port" value="27017"/>
<property name="daVinciClient.davinci.p2p.blob.transfer.server.port" value="27017"/>
This is certainly an issue and we can follow up offline.
Do we have a production case where they deploy different tags to the same host? There should be a way to set the port by instance id per JVM, but I wonder whether we should do that in the first phase.
We did indeed observe this case in flip testing.
Synced with @gaojieliu offline: in the short term, we will ask clients to reserve port 27017 before they onboard to blob transfer; in the long term, we need to record both the host address and the listener (server-side) port number in the push status system, so that when trying to connect to a server, the port info is also obtained from peer finding.
…shness timeframe. 2. Add a null check for the peer-finding result. 3. Rename some variables.
Left some more minor comments.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NettyFileTransferClient {
  private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class);
  private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100;
  private static final int TIMEOUT_IN_MINUTES = 5;
5 minutes seems long; typically, will the connect request take more than 1 minute?
In the rare case where a host sends a response but the receiving host never receives it, the receiver's thread could remain in a waiting state indefinitely. To address this, we've added a timeout for such cases. Rather than setting it to a typical transfer time, I've configured the timeout with a larger buffer to prevent the "waiting forever" scenario.
The static timeout value is used for both the connection timeout and the transfer-complete timeout.
The connection timeout has been updated to 1 minute, while the input stream timeout is now set to 5 minutes in the get method.
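Roughly, the split looks like the following sketch (assumed constant names and structure, not the actual NettyFileTransferClient code):

```java
// Hypothetical sketch of keeping the connection timeout (1 minute) separate from the
// transfer/input-stream timeout (5 minutes). Names and shapes are assumptions.
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {
  private static final int CONNECTION_TIMEOUT_IN_MINUTES = 1;
  private static final int GET_TIMEOUT_IN_MINUTES = 5;

  InputStream get(CompletableFuture<Boolean> connectFuture,
                  CompletableFuture<InputStream> transferFuture) throws Exception {
    // Fail fast if the connection cannot be established within 1 minute.
    if (!connectFuture.get(CONNECTION_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) {
      throw new TimeoutException("Peer is not connectable");
    }
    // Allow a larger buffer for the actual blob transfer to complete.
    return transferFuture.get(GET_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
  }
}
```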
[dvc][server] verifying discovered host connectivity before initiating blob transfer request
During testing on the dark hosts, we observed a few issues:
Previously, the per-partition bootstrap process worked as follows: it found peers (e.g., 1500+ peers) and attempted to connect to each one. If the connection succeeded, it sent the bootstrap request; if the connection failed, it moved on to the next candidate host.
When there are many partitions, such as 200, this results in 200 * 1500 connection attempts. This process is unnecessarily time-consuming (waiting time = reject response time * 200 * 1500). Some partitions share the same peer-finding results, leading to redundant connection attempts to unreachable hosts. To address this, this PR introduces two maps (a connectable host map and an unconnectable host map) to avoid redundant reconnections. A cached connectivity status is only trusted within a preset freshness window; once it exceeds that freshness threshold, the Netty client re-checks the host's connectivity.
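A minimal sketch of the connectivity-cache idea (map names, the freshness value, and the probe helper are assumptions, not the actual PR code):

```java
// Hypothetical sketch: cache connectivity results with a freshness window so that
// multiple partition bootstraps do not repeatedly probe the same unreachable hosts.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectivityCacheSketch {
  private static final long FRESHNESS_MS = 60_000; // assumed freshness window

  private final Map<String, Long> connectableHosts = new ConcurrentHashMap<>();
  private final Map<String, Long> unconnectableHosts = new ConcurrentHashMap<>();

  boolean isConnectable(String host) {
    long now = System.currentTimeMillis();
    Long goodAt = connectableHosts.get(host);
    if (goodAt != null && now - goodAt < FRESHNESS_MS) {
      return true; // recently verified as reachable, skip the probe
    }
    Long badAt = unconnectableHosts.get(host);
    if (badAt != null && now - badAt < FRESHNESS_MS) {
      return false; // recently verified as unreachable, skip the probe
    }
    // Stale or unknown: probe once and record the result for other partitions to reuse.
    boolean reachable = probe(host);
    (reachable ? connectableHosts : unconnectableHosts).put(host, now);
    return reachable;
  }

  private boolean probe(String host) {
    // Placeholder for the real connect attempt (e.g. a Netty connect with a timeout).
    return false;
  }
}
```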
During testing, we encountered a case where a partition bootstrap thread stalled after sending the connection request. It was found that the sender had already sent a reject response for the connection, but the receiver did not receive this response at all. As a result, the thread continued waiting for a response, and the partition was never successfully bootstrapped, whether via blob transfer or Kafka ingestion. To resolve this, this PR introduces a timeout so that if the blob transfer or host connection does not complete in time, the process falls back to Kafka ingestion instead of waiting indefinitely.
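As a rough sketch of the fallback idea (assumed names and timeout value; the real fallback path in the PR may differ):

```java
// Hypothetical sketch: bound the blob-transfer attempt with a timeout and fall back
// to Kafka ingestion if it does not finish, instead of waiting indefinitely.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BootstrapFallbackSketch {
  void bootstrapPartition(CompletableFuture<Void> blobTransfer, Runnable kafkaIngestion) {
    blobTransfer
        .orTimeout(5, TimeUnit.MINUTES) // give up instead of waiting forever
        .exceptionally(t -> {
          kafkaIngestion.run(); // fall back to regular Kafka ingestion
          return null;
        });
  }
}
```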
How was this PR tested?
Tested on dark hosts.
Does this PR introduce any user-facing changes?