Skip to content

Commit

Permalink
[producer][samza] Reinitalize transport client if a store has moved (…
Browse files Browse the repository at this point in the history
…resulting in store not found errors)

This is to allow store moves to not break system producers.
  • Loading branch information
ZacAttack committed Feb 1, 2024
1 parent bf13822 commit 8f3c92c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse;
Expand Down Expand Up @@ -41,7 +42,8 @@ public RouterBasedHybridStoreQuotaMonitor(
TransportClient transportClient,
String storeName,
Version.PushType pushType,
String topicName) {
String topicName,
TransportClientReinitProvider reinitProvider) {
final String requestPath;
if (Version.PushType.STREAM.equals(pushType)) {
requestPath = buildStreamHybridStoreQuotaRequestPath(storeName);
Expand All @@ -54,7 +56,7 @@ public RouterBasedHybridStoreQuotaMonitor(
+ " can monitor hybrid store quota.");
}
executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("RouterBasedHybridQuotaMonitor"));
hybridQuotaMonitorTask = new HybridQuotaMonitorTask(transportClient, storeName, requestPath, this);
hybridQuotaMonitorTask = new HybridQuotaMonitorTask(transportClient, storeName, requestPath, this, reinitProvider);
}

public void start() {
Expand Down Expand Up @@ -87,20 +89,24 @@ private static class HybridQuotaMonitorTask implements Runnable, Closeable {

private final AtomicBoolean isRunning;
private final String storeName;
private final TransportClient transportClient;
private TransportClient transportClient;

private TransportClientReinitProvider reinitProvider;
private final String requestPath;
private final RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService;

public HybridQuotaMonitorTask(
TransportClient transportClient,
String storeName,
String requestPath,
RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService) {
RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService,
TransportClientReinitProvider reinitProvider) {
this.transportClient = transportClient;
this.storeName = storeName;
this.requestPath = requestPath;
this.hybridStoreQuotaMonitorService = hybridStoreQuotaMonitorService;
this.isRunning = new AtomicBoolean(true);
this.reinitProvider = reinitProvider;
}

@Override
Expand All @@ -114,6 +120,13 @@ public void run() {
HybridStoreQuotaStatusResponse quotaStatusResponse =
mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class);
if (quotaStatusResponse.isError()) {
if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) {
LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError());
// TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's
// a bit dangerous.
transportClient = reinitProvider.apply();
continue;
}
LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError());
continue;
}
Expand Down Expand Up @@ -144,4 +157,9 @@ public void close() {
isRunning.set(false);
}
}

@FunctionalInterface
public interface TransportClientReinitProvider {
TransportClient apply();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ public synchronized void start() {
this.isStarted = true;

final TransportClient transportClient;
RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider reinitProvider;
if (discoveryUrl.isPresent()) {
this.controllerClient =
ControllerClientFactory.discoverAndConstructControllerClient(storeName, discoveryUrl.get(), sslFactory, 1);
Expand All @@ -448,10 +449,11 @@ public synchronized void start() {
}

if (sslFactory.isPresent()) {
transportClient = new HttpsTransportClient(discoveryUrl.get(), 0, 0, false, sslFactory.get());
reinitProvider = () -> new HttpsTransportClient(discoveryUrl.get(), 0, 0, false, sslFactory.get());
} else {
transportClient = new HttpTransportClient(discoveryUrl.get(), 0, 0);
reinitProvider = () -> new HttpTransportClient(discoveryUrl.get(), 0, 0);
}
transportClient = reinitProvider.apply();
} else {
this.primaryControllerColoD2Client = getStartedD2Client(primaryControllerColoD2ZKHost);
this.childColoD2Client = getStartedD2Client(veniceChildD2ZkHost);
Expand All @@ -461,6 +463,7 @@ public synchronized void start() {
() -> D2ControllerClient
.discoverCluster(primaryControllerColoD2Client, primaryControllerD2ServiceName, this.storeName),
10);

String clusterName = discoveryResponse.getCluster();
LOGGER.info("Found cluster: {} for store: {}", clusterName, storeName);

Expand Down Expand Up @@ -496,6 +499,15 @@ public synchronized void start() {
primaryControllerColoD2Client,
sslFactory);
transportClient = new D2TransportClient(discoveryResponse.getD2Service(), childColoD2Client);

reinitProvider = () -> {
D2ServiceDiscoveryResponse d2DiscoveryResponse = (D2ServiceDiscoveryResponse) controllerRequestWithRetry(
() -> D2ControllerClient
.discoverCluster(primaryControllerColoD2Client, primaryControllerD2ServiceName, this.storeName),
10);
LOGGER.info("Found cluster: {} for store: {}", clusterName, storeName);
return new D2TransportClient(d2DiscoveryResponse.getD2Service(), childColoD2Client);
};
}

// Request all the necessary info from Venice Controller
Expand Down Expand Up @@ -568,8 +580,8 @@ public synchronized void start() {

if ((pushType.equals(Version.PushType.STREAM) || pushType.equals(Version.PushType.STREAM_REPROCESSING))
&& hybridStoreDiskQuotaEnabled) {
hybridStoreQuotaMonitor =
Optional.of(new RouterBasedHybridStoreQuotaMonitor(transportClient, storeName, pushType, topicName));
hybridStoreQuotaMonitor = Optional
.of(new RouterBasedHybridStoreQuotaMonitor(transportClient, storeName, pushType, topicName, reinitProvider));
hybridStoreQuotaMonitor.get().start();
}
}
Expand Down

0 comments on commit 8f3c92c

Please sign in to comment.