Skip to content

Commit

Permalink
[da-vinci][common] DVC consumer for materialized view (batch only) (l…
Browse files Browse the repository at this point in the history
…inkedin#1466)

* [da-vinci][common] DVC consumer for materialized view (batch only)

Limiting the change to support batch only materialized view to keep the PR small. Hybrid DVC consumer support and related features such as heartbeat will be added in a separate PR

New DaVinciClientFactory APIs for creating DVC for a given view.

Defined a new "storeName" rule for views to be used for metrics reporting and DVC client. See VeniceView.getViewStoreName for details.

Introduced NativeMetadataRepositoryViewAdapter and HelixReadOnlyStoreViewConfigRepositoryAdapter to provide read-only interface to access various store metadata for both regular Venice stores and store views with the VeniceView.getViewStoreName.

There is some issue with chunking support on the read path. When chunking is enabled the view topic keys are doubly wrapped by serializeNonChunkedKey. This is because during NR pass-through mode the view writer is essentially trying to chunk the chunk. The tactical fix now is to unwrap the key with chunked suffix bytes appended and pass it to the view writer to be wrapped again and sent to the correct partition. This only works with non-large messages. I.e. chunking is enabled but nothing is actually getting chunked. Large messages will require a proper fix.
  • Loading branch information
xunyin8 authored Jan 31, 2025
1 parent 21327c7 commit df28d1e
Show file tree
Hide file tree
Showing 32 changed files with 1,363 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,22 @@ public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
return subscribe(partitions, Optional.empty());
}

private Version getCurrentVersion() {
return backend.getVeniceCurrentVersion(storeName);
}

private Version getLatestNonFaultyVersion() {
return backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
}

synchronized CompletableFuture<Void> subscribe(
ComplementSet<Integer> partitions,
Optional<Version> bootstrapVersion) {
if (daVinciCurrentVersion == null) {
setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> {
Version version = backend.getVeniceCurrentVersion(storeName);
Version version = getCurrentVersion();
if (version == null) {
version = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
version = getLatestNonFaultyVersion();
}
if (version == null) {
throw new VeniceException("Cannot subscribe to an empty store, storeName=" + storeName);
Expand Down Expand Up @@ -218,9 +226,9 @@ synchronized void trySubscribeDaVinciFutureVersion() {
return;
}

Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
Version veniceCurrentVersion = getCurrentVersion();
// Latest non-faulty store version in Venice store.
Version veniceLatestVersion = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
Version veniceLatestVersion = getLatestNonFaultyVersion();
Version targetVersion;
// Make sure current version in the store config has highest priority.
if (veniceCurrentVersion != null
Expand All @@ -246,7 +254,7 @@ synchronized void trySubscribeDaVinciFutureVersion() {
* failure.
*/
synchronized void validateDaVinciAndVeniceCurrentVersion() {
Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
Version veniceCurrentVersion = getCurrentVersion();
if (veniceCurrentVersion != null && daVinciCurrentVersion != null) {
if (veniceCurrentVersion.getNumber() > daVinciCurrentVersion.getVersion().getNumber()
&& faultyVersionSet.contains(veniceCurrentVersion.getNumber())) {
Expand Down Expand Up @@ -294,7 +302,7 @@ synchronized void tryDeleteInvalidDaVinciFutureVersion() {
synchronized void trySwapDaVinciCurrentVersion(Throwable failure) {
if (daVinciFutureVersion != null) {
// Fetch current version from store config.
Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
Version veniceCurrentVersion = getCurrentVersion();
if (veniceCurrentVersion == null) {
LOGGER.warn("Failed to retrieve current version of store: " + storeName);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ public synchronized void start() {
if (isReady()) {
return;
}
logger.info("Starting client, storeName=" + getStoreName());
logger.info("Starting client, storeName={}", getStoreName());
VeniceConfigLoader configLoader = buildVeniceConfig();
Optional<ObjectCacheConfig> cacheConfig = Optional.ofNullable(daVinciConfig.getCacheConfig());
initBackend(clientConfig, configLoader, managedClients, icProvider, cacheConfig, recordTransformerConfig);
Expand All @@ -790,7 +790,6 @@ public synchronized void start() {
if (daVinciConfig.isCacheEnabled()) {
cacheBackend = getBackend().getObjectCache();
}

storeBackend = getBackend().getStoreOrThrow(getStoreName());
if (managedClients.isPresent()) {
storeBackend.setManaged(daVinciConfig.isManaged());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.views.VeniceView;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -162,6 +163,7 @@ private Class getClientClass(DaVinciConfig daVinciConfig, boolean isSpecific) {
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinciConfig config) {
return getClient(
storeName,
null,
config,
null,
new GenericDaVinciClientConstructor<>(),
Expand All @@ -172,6 +174,7 @@ public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinci
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinciConfig config, Class<V> valueClass) {
return getClient(
storeName,
null,
config,
valueClass,
new GenericDaVinciClientConstructor<>(),
Expand All @@ -183,6 +186,7 @@ public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinci
public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(String storeName, DaVinciConfig config) {
return getClient(
storeName,
null,
config,
null,
new GenericDaVinciClientConstructor<>(),
Expand All @@ -196,8 +200,66 @@ public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
Class<V> valueClass) {
return getClient(
storeName,
null,
config,
valueClass,
new GenericDaVinciClientConstructor<>(),
getClientClass(config, false),
true);
}

@Override
public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
String storeName,
DaVinciConfig config,
Class<V> valueClass) {
return getClient(
storeName,
null,
config,
valueClass,
new SpecificDaVinciClientConstructor<>(),
getClientClass(config, true),
false);
}

@Override
public <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
String storeName,
DaVinciConfig config,
Class<V> valueClass) {
return getClient(
storeName,
null,
config,
valueClass,
new SpecificDaVinciClientConstructor<>(),
getClientClass(config, true),
true);
}

@Override
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, String viewName, DaVinciConfig config) {
return getClient(
storeName,
viewName,
config,
null,
new GenericDaVinciClientConstructor<>(),
getClientClass(config, false),
false);
}

@Override
public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
String storeName,
String viewName,
DaVinciConfig config) {
return getClient(
storeName,
viewName,
config,
null,
new GenericDaVinciClientConstructor<>(),
getClientClass(config, false),
true);
Expand All @@ -206,10 +268,12 @@ public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
@Override
public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
String storeName,
String viewName,
DaVinciConfig config,
Class<V> valueClass) {
return getClient(
storeName,
viewName,
config,
valueClass,
new SpecificDaVinciClientConstructor<>(),
Expand All @@ -220,10 +284,12 @@ public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
@Override
public <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
String storeName,
String viewName,
DaVinciConfig config,
Class<V> valueClass) {
return getClient(
storeName,
viewName,
config,
valueClass,
new SpecificDaVinciClientConstructor<>(),
Expand Down Expand Up @@ -290,29 +356,31 @@ public DaVinciClient<K, V> apply(

protected synchronized DaVinciClient getClient(
String storeName,
String viewName,
DaVinciConfig config,
Class valueClass,
DaVinciClientConstructor clientConstructor,
Class clientClass,
boolean startClient) {
String internalStoreName = viewName == null ? storeName : VeniceView.getViewStoreName(storeName, viewName);
if (closed) {
throw new VeniceException("Unable to get a client from a closed factory, storeName=" + storeName);
throw new VeniceException("Unable to get a client from a closed factory, storeName=" + internalStoreName);
}

DaVinciConfig originalConfig = configs.computeIfAbsent(storeName, k -> config);
DaVinciConfig originalConfig = configs.computeIfAbsent(internalStoreName, k -> config);
if (originalConfig.isManaged() != config.isManaged()) {
throw new VeniceException(
"Managed flag conflict" + ", storeName=" + storeName + ", original=" + originalConfig.isManaged()
"Managed flag conflict" + ", storeName=" + internalStoreName + ", original=" + originalConfig.isManaged()
+ ", requested=" + config.isManaged());
}

if (originalConfig.getStorageClass() != config.getStorageClass()) {
throw new VeniceException(
"Storage class conflict" + ", storeName=" + storeName + ", original=" + originalConfig.getStorageClass()
+ ", requested=" + config.getStorageClass());
"Storage class conflict" + ", storeName=" + internalStoreName + ", original="
+ originalConfig.getStorageClass() + ", requested=" + config.getStorageClass());
}

ClientConfig clientConfig = new ClientConfig(storeName).setD2Client(d2Client)
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
Expand All @@ -325,12 +393,12 @@ protected synchronized DaVinciClient getClient(
isolatedClients.add(client);
} else {
client = sharedClients.computeIfAbsent(
storeName,
internalStoreName,
k -> clientConstructor.apply(config, clientConfig, backendConfig, managedClients, icProvider));

if (!clientClass.isInstance(client)) {
throw new VeniceException(
"Client type conflict" + ", storeName=" + storeName + ", originalClientClass=" + client.getClass()
"Client type conflict" + ", storeName=" + internalStoreName + ", originalClientClass=" + client.getClass()
+ ", requestedClientClass=" + clientClass);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,20 @@ <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
String storeName,
DaVinciConfig config,
Class<V> valueClass);

<K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, String viewName, DaVinciConfig config);

<K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(String storeName, String viewName, DaVinciConfig config);

<K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
String storeName,
String viewName,
DaVinciConfig config,
Class<V> valueClass);

<K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
String storeName,
String viewName,
DaVinciConfig config,
Class<V> valueClass);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3377,9 +3377,11 @@ protected void processMessageAndMaybeProduceToKafka(
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
// keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey),
produceToVersionTopic);
} else {
produceToVersionTopic.run();
Expand Down
Loading

0 comments on commit df28d1e

Please sign in to comment.