diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index d2b9ecbf92e..635bcb1d8db 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -128,14 +128,22 @@ public CompletableFuture subscribe(ComplementSet partitions) { return subscribe(partitions, Optional.empty()); } + private Version getCurrentVersion() { + return backend.getVeniceCurrentVersion(storeName); + } + + private Version getLatestNonFaultyVersion() { + return backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet); + } + synchronized CompletableFuture subscribe( ComplementSet partitions, Optional bootstrapVersion) { if (daVinciCurrentVersion == null) { setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> { - Version version = backend.getVeniceCurrentVersion(storeName); + Version version = getCurrentVersion(); if (version == null) { - version = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet); + version = getLatestNonFaultyVersion(); } if (version == null) { throw new VeniceException("Cannot subscribe to an empty store, storeName=" + storeName); @@ -218,9 +226,9 @@ synchronized void trySubscribeDaVinciFutureVersion() { return; } - Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName); + Version veniceCurrentVersion = getCurrentVersion(); // Latest non-faulty store version in Venice store. - Version veniceLatestVersion = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet); + Version veniceLatestVersion = getLatestNonFaultyVersion(); Version targetVersion; // Make sure current version in the store config has highest priority. if (veniceCurrentVersion != null @@ -246,7 +254,7 @@ synchronized void trySubscribeDaVinciFutureVersion() { * failure. */ synchronized void validateDaVinciAndVeniceCurrentVersion() { - Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName); + Version veniceCurrentVersion = getCurrentVersion(); if (veniceCurrentVersion != null && daVinciCurrentVersion != null) { if (veniceCurrentVersion.getNumber() > daVinciCurrentVersion.getVersion().getNumber() && faultyVersionSet.contains(veniceCurrentVersion.getNumber())) { @@ -294,7 +302,7 @@ synchronized void tryDeleteInvalidDaVinciFutureVersion() { synchronized void trySwapDaVinciCurrentVersion(Throwable failure) { if (daVinciFutureVersion != null) { // Fetch current version from store config. - Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName); + Version veniceCurrentVersion = getCurrentVersion(); if (veniceCurrentVersion == null) { LOGGER.warn("Failed to retrieve current version of store: " + storeName); return; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index a23c2dc8811..b081a3c462c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -779,7 +779,7 @@ public synchronized void start() { if (isReady()) { return; } - logger.info("Starting client, storeName=" + getStoreName()); + logger.info("Starting client, storeName={}", getStoreName()); VeniceConfigLoader configLoader = buildVeniceConfig(); Optional cacheConfig = Optional.ofNullable(daVinciConfig.getCacheConfig()); initBackend(clientConfig, configLoader, managedClients, icProvider, cacheConfig, recordTransformerConfig); @@ -790,7 +790,6 @@ public synchronized void start() { if (daVinciConfig.isCacheEnabled()) { cacheBackend = getBackend().getObjectCache(); } - storeBackend = getBackend().getStoreOrThrow(getStoreName()); if (managedClients.isPresent()) { storeBackend.setManaged(daVinciConfig.isManaged()); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java index 7479412a62b..f57bfe7c565 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java @@ -11,6 +11,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.views.VeniceView; import io.tehuti.metrics.MetricsRepository; import java.io.Closeable; import java.util.ArrayList; @@ -162,6 +163,7 @@ private Class getClientClass(DaVinciConfig daVinciConfig, boolean isSpecific) { public DaVinciClient getGenericAvroClient(String storeName, DaVinciConfig config) { return getClient( storeName, + null, config, null, new GenericDaVinciClientConstructor<>(), @@ -172,6 +174,7 @@ public DaVinciClient getGenericAvroClient(String storeName, DaVinci public DaVinciClient getGenericAvroClient(String storeName, DaVinciConfig config, Class valueClass) { return getClient( storeName, + null, config, valueClass, new GenericDaVinciClientConstructor<>(), @@ -183,6 +186,7 @@ public DaVinciClient getGenericAvroClient(String storeName, DaVinci public DaVinciClient getAndStartGenericAvroClient(String storeName, DaVinciConfig config) { return getClient( storeName, + null, config, null, new GenericDaVinciClientConstructor<>(), @@ -196,8 +200,66 @@ public DaVinciClient getAndStartGenericAvroClient( Class valueClass) { return getClient( storeName, + null, + config, + valueClass, + new GenericDaVinciClientConstructor<>(), + getClientClass(config, false), + true); + } + + @Override + public DaVinciClient getSpecificAvroClient( + String storeName, + DaVinciConfig config, + Class valueClass) { + return getClient( + storeName, + null, + config, + valueClass, + new SpecificDaVinciClientConstructor<>(), + getClientClass(config, true), + false); + } + + @Override + public DaVinciClient getAndStartSpecificAvroClient( + String storeName, + DaVinciConfig config, + Class valueClass) { + return getClient( + storeName, + null, config, valueClass, + new SpecificDaVinciClientConstructor<>(), + getClientClass(config, true), + true); + } + + @Override + public DaVinciClient getGenericAvroClient(String storeName, String viewName, DaVinciConfig config) { + return getClient( + storeName, + viewName, + config, + null, + new GenericDaVinciClientConstructor<>(), + getClientClass(config, false), + false); + } + + @Override + public DaVinciClient getAndStartGenericAvroClient( + String storeName, + String viewName, + DaVinciConfig config) { + return getClient( + storeName, + viewName, + config, + null, new GenericDaVinciClientConstructor<>(), getClientClass(config, false), true); @@ -206,10 +268,12 @@ public DaVinciClient getAndStartGenericAvroClient( @Override public DaVinciClient getSpecificAvroClient( String storeName, + String viewName, DaVinciConfig config, Class valueClass) { return getClient( storeName, + viewName, config, valueClass, new SpecificDaVinciClientConstructor<>(), @@ -220,10 +284,12 @@ public DaVinciClient getSpecificAvroClient( @Override public DaVinciClient getAndStartSpecificAvroClient( String storeName, + String viewName, DaVinciConfig config, Class valueClass) { return getClient( storeName, + viewName, config, valueClass, new SpecificDaVinciClientConstructor<>(), @@ -290,29 +356,31 @@ public DaVinciClient apply( protected synchronized DaVinciClient getClient( String storeName, + String viewName, DaVinciConfig config, Class valueClass, DaVinciClientConstructor clientConstructor, Class clientClass, boolean startClient) { + String internalStoreName = viewName == null ? storeName : VeniceView.getViewStoreName(storeName, viewName); if (closed) { - throw new VeniceException("Unable to get a client from a closed factory, storeName=" + storeName); + throw new VeniceException("Unable to get a client from a closed factory, storeName=" + internalStoreName); } - DaVinciConfig originalConfig = configs.computeIfAbsent(storeName, k -> config); + DaVinciConfig originalConfig = configs.computeIfAbsent(internalStoreName, k -> config); if (originalConfig.isManaged() != config.isManaged()) { throw new VeniceException( - "Managed flag conflict" + ", storeName=" + storeName + ", original=" + originalConfig.isManaged() + "Managed flag conflict" + ", storeName=" + internalStoreName + ", original=" + originalConfig.isManaged() + ", requested=" + config.isManaged()); } if (originalConfig.getStorageClass() != config.getStorageClass()) { throw new VeniceException( - "Storage class conflict" + ", storeName=" + storeName + ", original=" + originalConfig.getStorageClass() - + ", requested=" + config.getStorageClass()); + "Storage class conflict" + ", storeName=" + internalStoreName + ", original=" + + originalConfig.getStorageClass() + ", requested=" + config.getStorageClass()); } - ClientConfig clientConfig = new ClientConfig(storeName).setD2Client(d2Client) + ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client) .setD2ServiceName(clusterDiscoveryD2ServiceName) .setMetricsRepository(metricsRepository) .setSpecificValueClass(valueClass); @@ -325,12 +393,12 @@ protected synchronized DaVinciClient getClient( isolatedClients.add(client); } else { client = sharedClients.computeIfAbsent( - storeName, + internalStoreName, k -> clientConstructor.apply(config, clientConfig, backendConfig, managedClients, icProvider)); if (!clientClass.isInstance(client)) { throw new VeniceException( - "Client type conflict" + ", storeName=" + storeName + ", originalClientClass=" + client.getClass() + "Client type conflict" + ", storeName=" + internalStoreName + ", originalClientClass=" + client.getClass() + ", requestedClientClass=" + clientClass); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/DaVinciClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/DaVinciClientFactory.java index 1bea17cc879..2287ed0bc2a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/DaVinciClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/DaVinciClientFactory.java @@ -19,4 +19,20 @@ DaVinciClient getAndStartSpecificAvroClient( String storeName, DaVinciConfig config, Class valueClass); + + DaVinciClient getGenericAvroClient(String storeName, String viewName, DaVinciConfig config); + + DaVinciClient getAndStartGenericAvroClient(String storeName, String viewName, DaVinciConfig config); + + DaVinciClient getSpecificAvroClient( + String storeName, + String viewName, + DaVinciConfig config, + Class valueClass); + + DaVinciClient getAndStartSpecificAvroClient( + String storeName, + String viewName, + DaVinciConfig config, + Class valueClass); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 65d9413712e..97423460f92 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -3377,9 +3377,11 @@ protected void processMessageAndMaybeProduceToKafka( // Write to views if (hasViewWriters()) { Put newPut = writeComputeResultWrapper.getNewPut(); + // keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled + boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived(); queueUpVersionTopicWritesWithViewWriters( partitionConsumptionState, - (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId), + (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey), produceToVersionTopic); } else { produceToVersionTopic.run(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java new file mode 100644 index 00000000000..25206becf7b --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapter.java @@ -0,0 +1,243 @@ +package com.linkedin.davinci.repository; + +import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.meta.ClusterInfoProvider; +import com.linkedin.venice.meta.ReadOnlySchemaRepository; +import com.linkedin.venice.meta.ReadOnlyViewStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreDataChangedListener; +import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.schema.GeneratedSchemaID; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.rmd.RmdSchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.views.VeniceView; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + + +/** + * Adapter that provides read only interface to access store, schema and cluster info using the underlying + * {@link NativeMetadataRepository} for both regular Venice stores and Venice view stores. Intended for client + * libraries like DaVinci and CC clients which need to consume and materialize store views. + */ +public class NativeMetadataRepositoryViewAdapter implements SubscriptionBasedReadOnlyStoreRepository, + ReadOnlySchemaRepository, ClusterInfoProvider, SubscribedViewStoreProvider { + private final NativeMetadataRepository nativeMetadataRepository; + + // Map of store name to a set of subscribed store name and store view name(s). Used to track subscription, so we only + // unsubscribe from the internal store once all corresponding regular store and view store(s) are unsubscribed. + private final Map> subscribedStoreMap = new VeniceConcurrentHashMap<>(); + + // A map to track registered StoreDataChangedListener and the corresponding StoreDataChangedListenerViewAdapter. + private final Map storeDataChangedAdapterMap = + new VeniceConcurrentHashMap<>(); + + public NativeMetadataRepositoryViewAdapter(NativeMetadataRepository nativeMetadataRepository) { + this.nativeMetadataRepository = nativeMetadataRepository; + } + + @Override + public void refresh() { + nativeMetadataRepository.refresh(); + } + + @Override + public void clear() { + nativeMetadataRepository.clear(); + } + + @Override + public String getVeniceCluster(String storeName) { + return nativeMetadataRepository.getVeniceCluster(VeniceView.getStoreName(storeName)); + } + + @Override + public SchemaEntry getKeySchema(String storeName) { + return nativeMetadataRepository.getKeySchema(VeniceView.getStoreName(storeName)); + } + + @Override + public SchemaEntry getValueSchema(String storeName, int id) { + return nativeMetadataRepository.getValueSchema(VeniceView.getStoreName(storeName), id); + } + + @Override + public boolean hasValueSchema(String storeName, int id) { + return nativeMetadataRepository.hasValueSchema(VeniceView.getStoreName(storeName), id); + } + + @Override + public int getValueSchemaId(String storeName, String valueSchemaStr) { + return nativeMetadataRepository.getValueSchemaId(VeniceView.getStoreName(storeName), valueSchemaStr); + } + + @Override + public Collection getValueSchemas(String storeName) { + return nativeMetadataRepository.getValueSchemas(VeniceView.getStoreName(storeName)); + } + + @Override + public SchemaEntry getSupersetOrLatestValueSchema(String storeName) { + return nativeMetadataRepository.getSupersetOrLatestValueSchema(VeniceView.getStoreName(storeName)); + } + + @Override + public SchemaEntry getSupersetSchema(String storeName) { + return nativeMetadataRepository.getSupersetSchema(VeniceView.getStoreName(storeName)); + } + + @Override + public GeneratedSchemaID getDerivedSchemaId(String storeName, String derivedSchemaStr) { + return nativeMetadataRepository.getDerivedSchemaId(VeniceView.getStoreName(storeName), derivedSchemaStr); + } + + @Override + public DerivedSchemaEntry getDerivedSchema(String storeName, int valueSchemaId, int writeComputeSchemaId) { + return nativeMetadataRepository + .getDerivedSchema(VeniceView.getStoreName(storeName), valueSchemaId, writeComputeSchemaId); + } + + @Override + public Collection getDerivedSchemas(String storeName) { + return nativeMetadataRepository.getDerivedSchemas(VeniceView.getStoreName(storeName)); + } + + @Override + public DerivedSchemaEntry getLatestDerivedSchema(String storeName, int valueSchemaId) { + return nativeMetadataRepository.getLatestDerivedSchema(VeniceView.getStoreName(storeName), valueSchemaId); + } + + @Override + public RmdSchemaEntry getReplicationMetadataSchema( + String storeName, + int valueSchemaId, + int replicationMetadataVersionId) { + return nativeMetadataRepository + .getReplicationMetadataSchema(VeniceView.getStoreName(storeName), valueSchemaId, replicationMetadataVersionId); + } + + @Override + public Collection getReplicationMetadataSchemas(String storeName) { + return nativeMetadataRepository.getReplicationMetadataSchemas(VeniceView.getStoreName(storeName)); + } + + @Override + public Store getStore(String storeName) { + Store store = nativeMetadataRepository.getStore(VeniceView.getStoreName(storeName)); + if (store == null) { + return null; + } + if (!VeniceView.isViewStore(storeName)) { + return store; + } + // It's a view store, so we need to create a ReadOnlyViewStore + return new ReadOnlyViewStore(store, storeName); + } + + @Override + public Store getStoreOrThrow(String storeName) throws VeniceNoStoreException { + Store store = getStore(storeName); + if (store != null) { + return store; + } + throw new VeniceNoStoreException(storeName); + } + + @Override + public boolean hasStore(String storeName) { + return nativeMetadataRepository.hasStore(VeniceView.getStoreName(storeName)); + } + + @Override + public Store refreshOneStore(String storeName) { + return nativeMetadataRepository.refreshOneStore(VeniceView.getStoreName(storeName)); + } + + /** + * This method will only return all the subscribed Venice stores and not include the view stores for the following + * reasons: + * 1. Currently, there is no usage of get all view stores. + * 2. The purpose of this adapter is to allow consumers to work with view stores without having to leak view store + * specific logic everywhere. If the repository included view stores in getAllStores() then callers will need to + * understand and differentiate view stores in order to avoid unexpected behaviors. + */ + @Override + public List getAllStores() { + return nativeMetadataRepository.getAllStores(); + } + + @Override + public long getTotalStoreReadQuota() { + return nativeMetadataRepository.getTotalStoreReadQuota(); + } + + @Override + public void registerStoreDataChangedListener(StoreDataChangedListener listener) { + StoreDataChangedListener dataChangedListenerAdapter = new StoreDataChangedListenerViewAdapter(listener, this); + storeDataChangedAdapterMap.computeIfAbsent(listener, (ignored) -> { + nativeMetadataRepository.registerStoreDataChangedListener(dataChangedListenerAdapter); + return dataChangedListenerAdapter; + }); + } + + @Override + public void unregisterStoreDataChangedListener(StoreDataChangedListener listener) { + storeDataChangedAdapterMap.computeIfPresent(listener, (ignored, dataChangedListenerAdapter) -> { + nativeMetadataRepository.unregisterStoreDataChangedListener(dataChangedListenerAdapter); + return null; + }); + } + + @Override + public int getBatchGetLimit(String storeName) { + return nativeMetadataRepository.getBatchGetLimit(VeniceView.getStoreName(storeName)); + } + + @Override + public boolean isReadComputationEnabled(String storeName) { + return nativeMetadataRepository.isReadComputationEnabled(VeniceView.getStoreName(storeName)); + } + + @Override + public void subscribe(String storeName) throws InterruptedException { + String internalStoreName = VeniceView.getStoreName(storeName); + nativeMetadataRepository.subscribe(internalStoreName); + subscribedStoreMap.compute(internalStoreName, (ignored, subscribedStores) -> { + if (subscribedStores == null) { + subscribedStores = new ConcurrentSkipListSet<>(); + } + subscribedStores.add(storeName); + return subscribedStores; + }); + } + + @Override + public void unsubscribe(String storeName) { + String internalStoreName = VeniceView.getStoreName(storeName); + subscribedStoreMap.computeIfPresent(internalStoreName, (ignored, subscribedStores) -> { + subscribedStores.remove(storeName); + if (subscribedStores.isEmpty()) { + nativeMetadataRepository.unsubscribe(internalStoreName); + return null; + } + return subscribedStores; + }); + } + + @Override + public @Nonnull Set getSubscribedViewStores(String storeName) { + Set subscribedStores = subscribedStoreMap.get(storeName); + if (subscribedStores == null) { + return Collections.emptySet(); + } + return subscribedStores.stream().filter(VeniceView::isViewStore).collect(Collectors.toSet()); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/StoreDataChangedListenerViewAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/StoreDataChangedListenerViewAdapter.java new file mode 100644 index 00000000000..bef05bc224e --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/StoreDataChangedListenerViewAdapter.java @@ -0,0 +1,58 @@ +package com.linkedin.davinci.repository; + +import com.linkedin.venice.meta.ReadOnlyViewStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreDataChangedListener; + + +/** + * Adapter that provides store data changed listener interface for the store and any subscribed view stores. + */ +public class StoreDataChangedListenerViewAdapter implements StoreDataChangedListener { + private final StoreDataChangedListener storeDataChangedListener; + private final SubscribedViewStoreProvider viewStoreProvider; + + /** + * We need an external provider to provide us the set of subscribed view stores because the passed in Store object + * after data changed may or may not include all the view stores that were subscribed and expecting the store change + * event. e.g. A view that's being deprecated via new version pushes. + */ + public StoreDataChangedListenerViewAdapter( + StoreDataChangedListener storeDataChangedListener, + SubscribedViewStoreProvider viewStoreProvider) { + this.storeDataChangedListener = storeDataChangedListener; + this.viewStoreProvider = viewStoreProvider; + } + + @Override + public void handleStoreCreated(Store store) { + storeDataChangedListener.handleStoreCreated(store); + for (String viewStore: viewStoreProvider.getSubscribedViewStores(store.getName())) { + storeDataChangedListener.handleStoreCreated(new ReadOnlyViewStore(store, viewStore)); + } + } + + @Override + public void handleStoreDeleted(Store store) { + storeDataChangedListener.handleStoreDeleted(store); + for (String viewStore: viewStoreProvider.getSubscribedViewStores(store.getName())) { + storeDataChangedListener.handleStoreDeleted(new ReadOnlyViewStore(store, viewStore)); + } + } + + @Override + public void handleStoreDeleted(String storeName) { + storeDataChangedListener.handleStoreDeleted(storeName); + for (String viewStore: viewStoreProvider.getSubscribedViewStores(storeName)) { + storeDataChangedListener.handleStoreDeleted(viewStore); + } + } + + @Override + public void handleStoreChanged(Store store) { + storeDataChangedListener.handleStoreChanged(store); + for (String viewStore: viewStoreProvider.getSubscribedViewStores(store.getName())) { + storeDataChangedListener.handleStoreChanged(new ReadOnlyViewStore(store, viewStore)); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/SubscribedViewStoreProvider.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/SubscribedViewStoreProvider.java new file mode 100644 index 00000000000..f9e31bb1388 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/SubscribedViewStoreProvider.java @@ -0,0 +1,18 @@ +package com.linkedin.davinci.repository; + +import java.util.Set; +import javax.annotation.Nonnull; + + +/** + * Interface for {@link com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository} to provide a set of view + * stores given the parent store. + */ +public interface SubscribedViewStoreProvider { + /** + * @param storeName of the Venice store. + * @return a set of subscribed view store names associated with the provided Venice store. + */ + @Nonnull + Set getSubscribedViewStores(String storeName); +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java index ad9e298e38e..7721b09bcab 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java @@ -113,9 +113,11 @@ private void initDaVinciStoreAndSchemaRepository() { NativeMetadataRepository.getInstance(clientConfig, veniceProperties, icProvider); systemStoreBasedRepository.start(); systemStoreBasedRepository.refresh(); - clusterInfoProvider = systemStoreBasedRepository; - storeRepo = systemStoreBasedRepository; - schemaRepo = systemStoreBasedRepository; + NativeMetadataRepositoryViewAdapter repositoryViewAdapter = + new NativeMetadataRepositoryViewAdapter(systemStoreBasedRepository); + clusterInfoProvider = repositoryViewAdapter; + storeRepo = repositoryViewAdapter; + schemaRepo = repositoryViewAdapter; liveClusterConfigRepo = null; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java index d31bf4a8cb8..65c155230d3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java @@ -4,6 +4,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionTask; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.views.VeniceView; import io.tehuti.metrics.MetricsRepository; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,11 +31,14 @@ public AggVersionedIngestionStats( } public void setIngestionTask(String storeVersionTopic, StoreIngestionTask ingestionTask) { - if (!Version.isVersionTopicOrStreamReprocessingTopic(storeVersionTopic)) { + if (!Version.isATopicThatIsVersioned(storeVersionTopic)) { LOGGER.warn("Invalid store version topic name: {}", storeVersionTopic); return; } - String storeName = Version.parseStoreFromKafkaTopicName(storeVersionTopic); + // For metrics reporting purpose the store name for Venice view ingestion will be _ + String storeName = VeniceView.isViewTopic(storeVersionTopic) + ? VeniceView.parseStoreAndViewFromViewTopic(storeVersionTopic) + : Version.parseStoreFromKafkaTopicName(storeVersionTopic); int version = Version.parseVersionFromKafkaTopicName(storeVersionTopic); try { /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index 9eecd7baec3..707215c8651 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -83,7 +83,11 @@ public CompletableFuture processRecord( } @Override - public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int newValueSchemaId, + boolean isChunkedKey) { // No op return CompletableFuture.completedFuture(null); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index a529d0aef10..3c92b567e0e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -8,6 +8,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; @@ -31,6 +32,7 @@ public class MaterializedViewWriter extends VeniceViewWriter { private final MaterializedView internalView; private final String materializedViewTopicName; private Lazy veniceWriter; + private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); public MaterializedViewWriter( VeniceConfigLoader props, @@ -63,16 +65,32 @@ public CompletableFuture processRecord( int newValueSchemaId, int oldValueSchemaId, GenericRecord replicationMetadataRecord) { - return processRecord(newValue, key, newValueSchemaId); + return processRecord(newValue, key, newValueSchemaId, false); } + /** + * Before we have proper chunking support for view writers we assume that even when chunking is enabled the actual + * k/v will not be chunked. This way we don't need to worry about how to ensure all the chunks are forwarded to the + * same view partition during NR's pass-through mode. Proper chunking support can leverage message footer populated + * by the CompositeVeniceWriter in VPJ (write to views first and then VT) to figure out which view partition to + * forward the chunks to. Another alternative is trigger the view writer write upon receiving the manifest, and we + * will assemble and re-chunk. + */ @Override - public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int newValueSchemaId, + boolean isChunkedKey) { + byte[] viewTopicKey = key; + if (isChunkedKey) { + viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key); + } if (newValue == null) { // this is a delete operation - return veniceWriter.get().delete(key, null); + return veniceWriter.get().delete(viewTopicKey, null); } - return veniceWriter.get().put(key, ByteUtils.extractByteArray(newValue), newValueSchemaId); + return veniceWriter.get().put(viewTopicKey, ByteUtils.extractByteArray(newValue), newValueSchemaId); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 48919afaa40..49ee755041a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -81,13 +81,14 @@ public abstract CompletableFuture processRecord( * * @param newValue the incoming fully specified value which hasn't yet been committed to Venice * @param key the key of the record that designates newValue and oldValue - * @param version the version of the store taking this record * @param newValueSchemaId the schemaId of the incoming record + * @param isChunkedKey is the key already serialized with {@link com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer} */ public abstract CompletableFuture processRecord( ByteBuffer newValue, byte[] key, - int newValueSchemaId); + int newValueSchemaId, + boolean isChunkedKey); /** * Called when the server encounters a control message. There isn't (today) a strict ordering diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index fcade1f5f90..c3b4e17d430 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.kafka.consumer; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; @@ -279,7 +280,7 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc viewWriterMap.put("testView", materializedViewWriter); when(mockVeniceViewWriterFactory.buildStoreViewWriters(any(), anyInt(), any())).thenReturn(viewWriterMap); CompletableFuture viewWriterFuture = new CompletableFuture<>(); - when(materializedViewWriter.processRecord(any(), any(), anyInt())).thenReturn(viewWriterFuture); + when(materializedViewWriter.processRecord(any(), any(), anyInt(), anyBoolean())).thenReturn(viewWriterFuture); setUp(); WriteComputeResultWrapper mockResult = mock(WriteComputeResultWrapper.class); Put put = new Put(); @@ -290,12 +291,12 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc .thenReturn(CompletableFuture.completedFuture(null)); leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters( mockPartitionConsumptionState, - (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1), + (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1, false), () -> writeToVersionTopic.set(true)); verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); ArgumentCaptor vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class); verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture()); - verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt()); + verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt(), anyBoolean()); verify(hostLevelIngestionStats, times(1)).recordViewProducerLatency(anyDouble()); verify(hostLevelIngestionStats, never()).recordViewProducerAckLatency(anyDouble()); assertFalse(writeToVersionTopic.get()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapterTest.java new file mode 100644 index 00000000000..9c3cf8cd56c --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryViewAdapterTest.java @@ -0,0 +1,268 @@ +package com.linkedin.davinci.repository; + +import static com.linkedin.venice.views.MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX; +import static com.linkedin.venice.views.VeniceView.VIEW_NAME_SEPARATOR; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.meta.MaterializedViewParameters; +import com.linkedin.venice.meta.PartitionerConfig; +import com.linkedin.venice.meta.PartitionerConfigImpl; +import com.linkedin.venice.meta.ReadOnlyViewStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreDataChangedListener; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; +import com.linkedin.venice.partitioner.ConstantVenicePartitioner; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; + + +public class NativeMetadataRepositoryViewAdapterTest { + @Test + public void testPassThroughAPIs() { + NativeMetadataRepository nativeMetadataRepository = mock(NativeMetadataRepository.class); + NativeMetadataRepositoryViewAdapter repoViewAdapter = + new NativeMetadataRepositoryViewAdapter(nativeMetadataRepository); + String storeName = Utils.getUniqueString("testStore"); + String viewName = "testView"; + String viewStoreName = VeniceView.getViewStoreName(storeName, viewName); + invokeAndVerifyPassThroughAPIs(repoViewAdapter, nativeMetadataRepository, viewStoreName, storeName); + invokeAndVerifyPassThroughAPIs(repoViewAdapter, nativeMetadataRepository, storeName, storeName); + } + + @Test + public void testSubscribeUnsubscribe() throws InterruptedException, ExecutionException, TimeoutException { + NativeMetadataRepository nativeMetadataRepository = mock(NativeMetadataRepository.class); + NativeMetadataRepositoryViewAdapter repoViewAdapter = + new NativeMetadataRepositoryViewAdapter(nativeMetadataRepository); + String storeName = Utils.getUniqueString("testStore"); + String viewStoreName1 = VeniceView.getViewStoreName(storeName, "testView1"); + String viewStoreName2 = VeniceView.getViewStoreName(storeName, "testView2"); + repoViewAdapter.subscribe(viewStoreName1); + repoViewAdapter.subscribe(viewStoreName2); + repoViewAdapter.subscribe(storeName); + verify(nativeMetadataRepository, times(3)).subscribe(storeName); + Set subscribedViewStores = repoViewAdapter.getSubscribedViewStores(storeName); + assertEquals(subscribedViewStores.size(), 2); + assertTrue(subscribedViewStores.contains(viewStoreName1)); + assertTrue(subscribedViewStores.contains(viewStoreName2)); + repoViewAdapter.unsubscribe(viewStoreName2); + verify(nativeMetadataRepository, never()).unsubscribe(storeName); + subscribedViewStores = repoViewAdapter.getSubscribedViewStores(storeName); + assertEquals(subscribedViewStores.size(), 1); + assertTrue(subscribedViewStores.contains(viewStoreName1)); + repoViewAdapter.unsubscribe(viewStoreName1); + verify(nativeMetadataRepository, never()).unsubscribe(storeName); + assertEquals(repoViewAdapter.getSubscribedViewStores(storeName).size(), 0); + repoViewAdapter.unsubscribe(storeName); + verify(nativeMetadataRepository, times(1)).unsubscribe(storeName); + // We should be able to resubscribe + repoViewAdapter.subscribe(viewStoreName1); + verify(nativeMetadataRepository, times(4)).subscribe(storeName); + assertEquals(repoViewAdapter.getSubscribedViewStores(storeName).iterator().next(), viewStoreName1); + + // Do some simple thread-safe sanity check + Mockito.clearInvocations(nativeMetadataRepository); + Runnable subscribeRunnable = () -> { + try { + repoViewAdapter.subscribe(viewStoreName1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + Runnable unSubscribeRunnable = () -> { + repoViewAdapter.unsubscribe(viewStoreName1); + }; + CompletableFuture[] completableFutures = new CompletableFuture[50]; + for (int i = 0; i < 50; i++) { + if (i % 2 == 0) { + completableFutures[i] = CompletableFuture.runAsync(subscribeRunnable); + } else { + completableFutures[i] = CompletableFuture.runAsync(unSubscribeRunnable); + } + } + CompletableFuture.allOf(completableFutures).get(1, TimeUnit.SECONDS); + // Regardless order of events subscribe should be called 25 times + verify(nativeMetadataRepository, times(25)).subscribe(storeName); + repoViewAdapter.unsubscribe(viewStoreName1); + assertEquals(repoViewAdapter.getSubscribedViewStores(storeName).size(), 0); + } + + @Test + public void testGetStore() { + NativeMetadataRepository nativeMetadataRepository = mock(NativeMetadataRepository.class); + NativeMetadataRepositoryViewAdapter repoViewAdapter = + new NativeMetadataRepositoryViewAdapter(nativeMetadataRepository); + String storeName = Utils.getUniqueString("testStore"); + String viewName = "testView"; + String viewStoreName = VeniceView.getViewStoreName(storeName, viewName); + assertThrows(VeniceNoStoreException.class, () -> repoViewAdapter.getStoreOrThrow(viewStoreName)); + Store store = mock(Store.class); + List storeVersions = new ArrayList<>(); + Version storeVersion = new VersionImpl(storeName, 1, "dummyId"); + Version storeVersion2 = new VersionImpl(storeName, 2, "dummyId2"); + storeVersions.add(storeVersion); + storeVersions.add(storeVersion2); + // Set some common store version configs + int storePartitionCount = 6; + PartitionerConfig storePartitionerConfig = new PartitionerConfigImpl(); + storePartitionerConfig.setPartitionerClass(DefaultVenicePartitioner.class.getCanonicalName()); + for (Version version: storeVersions) { + version.setPartitionCount(storePartitionCount); + version.setPartitionerConfig(storePartitionerConfig); + } + MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(viewName); + builder.setPartitionCount(12).setPartitioner(ConstantVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); + Map storeViewConfigMap = new HashMap<>(); + storeViewConfigMap.put(viewName, viewConfig); + storeVersion2.setViewConfigs(storeViewConfigMap); + doReturn(storeVersions).when(store).getVersions(); + doReturn(storeName).when(store).getName(); + doReturn(2).when(store).getCurrentVersion(); + doReturn(store).when(nativeMetadataRepository).getStore(storeName); + assertEquals(repoViewAdapter.getStoreOrThrow(storeName).getName(), storeName); + // Now get the view store and verify its properties + Store viewStore = repoViewAdapter.getStoreOrThrow(viewStoreName); + assertTrue(viewStore instanceof ReadOnlyViewStore); + assertEquals(viewStore.getName(), viewStoreName); + // v1 should be filtered since it doesn't have view configs + assertNull(viewStore.getVersion(1)); + Version viewStoreVersion = viewStore.getVersion(viewStore.getCurrentVersion()); + assertNotNull(viewStoreVersion); + assertTrue(viewStoreVersion instanceof ReadOnlyViewStore.ReadOnlyMaterializedViewVersion); + assertEquals(viewStoreVersion.getPartitionCount(), 12); + assertEquals( + viewStoreVersion.getPartitionerConfig().getPartitionerClass(), + ConstantVenicePartitioner.class.getCanonicalName()); + String viewStoreVersionTopicName = + Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; + assertEquals(viewStoreVersion.kafkaTopicName(), viewStoreVersionTopicName); + } + + @Test + public void testStoreDataChangedListener() throws InterruptedException { + AtomicInteger storeChangeCounter = new AtomicInteger(0); + AtomicInteger viewStoreChangeCounter = new AtomicInteger(0); + StoreDataChangedListener testDataChangedListener = new StoreDataChangedListener() { + @Override + public void handleStoreChanged(Store store) { + if (VeniceView.isViewStore(store.getName())) { + viewStoreChangeCounter.incrementAndGet(); + } else { + storeChangeCounter.incrementAndGet(); + } + } + }; + NativeMetadataRepository nativeMetadataRepository = mock(NativeMetadataRepository.class); + NativeMetadataRepositoryViewAdapter repoViewAdapter = + new NativeMetadataRepositoryViewAdapter(nativeMetadataRepository); + String storeName = Utils.getUniqueString("testStore"); + String viewStoreName1 = VeniceView.getViewStoreName(storeName, "testView1"); + String viewStoreName2 = VeniceView.getViewStoreName(storeName, "testView2"); + repoViewAdapter.subscribe(viewStoreName1); + repoViewAdapter.subscribe(viewStoreName2); + assertEquals(repoViewAdapter.getSubscribedViewStores(storeName).size(), 2); + repoViewAdapter.registerStoreDataChangedListener(testDataChangedListener); + ArgumentCaptor wrappedListenerCaptor = + ArgumentCaptor.forClass(StoreDataChangedListener.class); + verify(nativeMetadataRepository, times(1)).registerStoreDataChangedListener(wrappedListenerCaptor.capture()); + Store store = mock(Store.class); + doReturn(storeName).when(store).getName(); + doReturn(Collections.emptyList()).when(store).getVersions(); + StoreDataChangedListener listenerViewAdapter = wrappedListenerCaptor.getValue(); + listenerViewAdapter.handleStoreCreated(store); + listenerViewAdapter.handleStoreChanged(store); + listenerViewAdapter.handleStoreChanged(store); + listenerViewAdapter.handleStoreDeleted(store); + assertEquals(storeChangeCounter.get(), 2); + assertEquals(viewStoreChangeCounter.get(), 4); + repoViewAdapter.unregisterStoreDataChangedListener(testDataChangedListener); + verify(nativeMetadataRepository, times(1)).unregisterStoreDataChangedListener(wrappedListenerCaptor.capture()); + // Ensure the storeDataChangedAdapterMap is working and we are unregistering the correct listener + assertEquals(wrappedListenerCaptor.getValue(), listenerViewAdapter); + // We should be able to re-register the same listener + repoViewAdapter.registerStoreDataChangedListener(testDataChangedListener); + verify(nativeMetadataRepository, times(2)).registerStoreDataChangedListener(wrappedListenerCaptor.capture()); + } + + private void invokeAndVerifyPassThroughAPIs( + NativeMetadataRepositoryViewAdapter adapter, + NativeMetadataRepository repo, + String inputStoreName, + String expectedRepoInvocationStoreName) { + Mockito.clearInvocations(repo); + adapter.refresh(); + verify(repo, times(1)).refresh(); + adapter.clear(); + verify(repo, times(1)).clear(); + adapter.getVeniceCluster(inputStoreName); + verify(repo, times(1)).getVeniceCluster(expectedRepoInvocationStoreName); + adapter.getKeySchema(inputStoreName); + verify(repo, times(1)).getKeySchema(expectedRepoInvocationStoreName); + adapter.getValueSchema(inputStoreName, 1); + verify(repo, times(1)).getValueSchema(expectedRepoInvocationStoreName, 1); + adapter.hasValueSchema(inputStoreName, 1); + verify(repo, times(1)).hasValueSchema(expectedRepoInvocationStoreName, 1); + adapter.getValueSchemaId(inputStoreName, ""); + verify(repo, times(1)).getValueSchemaId(expectedRepoInvocationStoreName, ""); + adapter.getValueSchemas(inputStoreName); + verify(repo, times(1)).getValueSchemas(expectedRepoInvocationStoreName); + adapter.getSupersetOrLatestValueSchema(inputStoreName); + verify(repo, times(1)).getSupersetOrLatestValueSchema(expectedRepoInvocationStoreName); + adapter.getSupersetSchema(inputStoreName); + verify(repo, times(1)).getSupersetSchema(expectedRepoInvocationStoreName); + adapter.getDerivedSchemaId(inputStoreName, ""); + verify(repo, times(1)).getDerivedSchemaId(expectedRepoInvocationStoreName, ""); + adapter.getDerivedSchema(inputStoreName, 1, 1); + verify(repo, times(1)).getDerivedSchema(expectedRepoInvocationStoreName, 1, 1); + adapter.getDerivedSchemas(inputStoreName); + verify(repo, times(1)).getDerivedSchemas(expectedRepoInvocationStoreName); + adapter.getLatestDerivedSchema(inputStoreName, 1); + verify(repo, times(1)).getLatestDerivedSchema(expectedRepoInvocationStoreName, 1); + adapter.getReplicationMetadataSchema(inputStoreName, 1, 1); + verify(repo, times(1)).getReplicationMetadataSchema(expectedRepoInvocationStoreName, 1, 1); + adapter.getReplicationMetadataSchemas(inputStoreName); + verify(repo, times(1)).getReplicationMetadataSchemas(expectedRepoInvocationStoreName); + adapter.hasStore(inputStoreName); + verify(repo, times(1)).hasStore(expectedRepoInvocationStoreName); + adapter.refreshOneStore(inputStoreName); + verify(repo, times(1)).refreshOneStore(expectedRepoInvocationStoreName); + adapter.getAllStores(); + verify(repo, times(1)).getAllStores(); + adapter.getTotalStoreReadQuota(); + verify(repo, times(1)).getTotalStoreReadQuota(); + adapter.getBatchGetLimit(inputStoreName); + verify(repo, times(1)).getBatchGetLimit(expectedRepoInvocationStoreName); + adapter.isReadComputationEnabled(inputStoreName); + verify(repo, times(1)).isReadComputationEnabled(expectedRepoInvocationStoreName); + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index 2518d111d57..7e11f795ff5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -5,9 +5,11 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,25 +30,30 @@ import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.CompletableFuture; import org.apache.avro.Schema; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; public class MaterializedViewWriterTest { private static final Schema SCHEMA = AvroCompatibilityHelper.parse("\"string\""); + private static final Random RANDOM = new Random(123); @Test public void testViewParametersBuilder() throws JsonProcessingException { @@ -93,7 +100,7 @@ public void testBuildWriterOptions() { VeniceWriterOptions writerOptions = materializedViewWriter.buildWriterOptions(); Assert.assertEquals( writerOptions.getTopicName(), - Version.composeKafkaTopic(storeName, 1) + VeniceView.VIEW_TOPIC_SEPARATOR + viewName + Version.composeKafkaTopic(storeName, 1) + VeniceView.VIEW_NAME_SEPARATOR + viewName + MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX); Assert.assertEquals(writerOptions.getPartitionCount(), Integer.valueOf(6)); Assert.assertEquals(writerOptions.getPartitioner().getClass(), DefaultVenicePartitioner.class); @@ -132,6 +139,37 @@ public void testProcessControlMessage() { verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()); } + @Test + public void testViewWriterCanForwardChunkedKeysCorrectly() { + String storeName = "testStoreWithChunkedKeys"; + String viewName = "testMaterializedViewWithChunkedKeys"; + Version version = mock(Version.class); + doReturn(true).when(version).isChunkingEnabled(); + doReturn(true).when(version).isRmdChunkingEnabled(); + getMockStore(storeName, 1, version); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(6); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + Map viewParamsMap = viewParamsBuilder.build(); + VeniceConfigLoader props = getMockProps(); + MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); + VeniceWriter veniceWriter = mock(VeniceWriter.class); + doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter).put(any(), any(), anyInt()); + materializedViewWriter.setVeniceWriter(veniceWriter); + KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); + ByteBuffer dummyValue = mock(ByteBuffer.class); + // Deterministic random bytes + int keySize = 5; + for (int i = 0; i < 100; i++) { + byte[] key = new byte[keySize]; + RANDOM.nextBytes(key); + materializedViewWriter + .processRecord(dummyValue, keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key), 1, true); + verify(veniceWriter, times(1)).put(eq(key), any(), eq(1)); + Mockito.clearInvocations(veniceWriter); + } + } + private VeniceConfigLoader getMockProps() { VeniceConfigLoader props = mock(VeniceConfigLoader.class); VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreViewConfigRepositoryAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreViewConfigRepositoryAdapter.java new file mode 100644 index 00000000000..abccfb666cc --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreViewConfigRepositoryAdapter.java @@ -0,0 +1,40 @@ +package com.linkedin.venice.helix; + +import com.linkedin.venice.VeniceResource; +import com.linkedin.venice.meta.ReadOnlyStoreConfigRepository; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.views.VeniceView; +import java.util.Optional; + + +/** + * This repository provides read-only interface to access store configs for both regular Venice store names and view + * store names. This adapter is only needed if the component is expected to work with view stores. + */ +public class HelixReadOnlyStoreViewConfigRepositoryAdapter implements ReadOnlyStoreConfigRepository, VeniceResource { + private final HelixReadOnlyStoreConfigRepository storeConfigRepository; + + public HelixReadOnlyStoreViewConfigRepositoryAdapter(HelixReadOnlyStoreConfigRepository storeConfigRepository) { + this.storeConfigRepository = storeConfigRepository; + } + + @Override + public void refresh() { + storeConfigRepository.refresh(); + } + + @Override + public void clear() { + storeConfigRepository.clear(); + } + + @Override + public Optional getStoreConfig(String storeName) { + return storeConfigRepository.getStoreConfig(VeniceView.getStoreName(storeName)); + } + + @Override + public StoreConfig getStoreConfigOrThrow(String storeName) { + return storeConfigRepository.getStoreConfigOrThrow(VeniceView.getStoreName(storeName)); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 304eeaabae4..2137d1686f6 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -306,7 +306,7 @@ public DataRecoveryVersionConfig clone() { * A read-only wrapper of {@link Version} */ public static class ReadOnlyVersion implements Version { - private final Version delegate; + protected final Version delegate; public ReadOnlyVersion(Version delegate) { this.delegate = delegate; @@ -769,7 +769,7 @@ public boolean equals(Object o) { } } - private final Store delegate; + protected final Store delegate; public ReadOnlyStore(Store delegate) { this.delegate = delegate; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyViewStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyViewStore.java new file mode 100644 index 00000000000..eb442ae24f8 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyViewStore.java @@ -0,0 +1,156 @@ +package com.linkedin.venice.meta; + +import com.linkedin.venice.exceptions.StoreVersionNotFoundException; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.views.ViewUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +public class ReadOnlyViewStore extends ReadOnlyStore { + private final Map viewVersionMap; + private final String viewStoreName; + + public ReadOnlyViewStore(Store delegate, String viewStoreName) { + super(delegate); + this.viewVersionMap = new HashMap<>(); + this.viewStoreName = viewStoreName; + // Decorate the Store with appropriate version list based on the provided view + List storeVersions = delegate.getVersions(); + String viewName = VeniceView.getViewNameFromViewStoreName(viewStoreName); + for (Version version: storeVersions) { + ViewConfig viewConfig = version.getViewConfigs().get(viewName); + if (viewConfig == null) { + // versions that do not contain the corresponding view name is omitted + continue; + } + viewVersionMap + .put(version.getNumber(), new ReadOnlyViewStore.ReadOnlyMaterializedViewVersion(version, viewConfig)); + } + } + + @Override + public String getName() { + return viewStoreName; + } + + @Override + public List getVersions() { + return new ArrayList<>(viewVersionMap.values()); + } + + @Override + @Nullable + public Version getVersion(int versionNumber) { + return viewVersionMap.get(versionNumber); + } + + @Override + @Nonnull + public Version getVersionOrThrow(int versionNumber) throws StoreVersionNotFoundException { + Version version = viewVersionMap.get(versionNumber); + if (version == null) { + throw new StoreVersionNotFoundException(getName(), versionNumber); + } + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReadOnlyViewStore storeView = (ReadOnlyViewStore) o; + return this.delegate.equals(storeView.delegate) && this.viewVersionMap.equals(storeView.viewVersionMap); + } + + @Override + public int hashCode() { + return Objects.hash(delegate, viewVersionMap); + } + + /** + * A read only {@link Version} representation of a materialized view for consumers (e.g. DVC client). Any view + * specific version properties will be overwritten here and provided by the corresponding {@link MaterializedView}. + */ + public static class ReadOnlyMaterializedViewVersion extends ReadOnlyStore.ReadOnlyVersion { + private static final String CONSTRUCTOR_ERROR_MESSAGE = "Cannot construct materialized view version because "; + private final MaterializedView materializedView; + private final String materializedViewTopicName; + + public ReadOnlyMaterializedViewVersion(Version delegate, ViewConfig viewConfig) { + super(delegate); + if (viewConfig == null) { + throw new VeniceException(CONSTRUCTOR_ERROR_MESSAGE + "provided viewConfig is null"); + } + VeniceView veniceView = ViewUtils.getVeniceView( + viewConfig.getViewClassName(), + new Properties(), + delegate.getStoreName(), + viewConfig.getViewParameters()); + if (!(veniceView instanceof MaterializedView)) { + throw new VeniceException( + CONSTRUCTOR_ERROR_MESSAGE + viewConfig.getViewClassName() + " is not a " + + MaterializedView.class.getCanonicalName()); + } + this.materializedView = (MaterializedView) veniceView; + Optional optionalMaterializedViewTopicName = + materializedView.getTopicNamesAndConfigsForVersion(delegate.getNumber()).keySet().stream().findAny(); + if (!optionalMaterializedViewTopicName.isPresent()) { + throw new VeniceException( + CONSTRUCTOR_ERROR_MESSAGE + "view topic map is empty for view: " + materializedView.getViewName() + + ", version: " + delegate.getNumber()); + } + this.materializedViewTopicName = optionalMaterializedViewTopicName.get(); + } + + @Override + public int getPartitionCount() { + return materializedView.getViewPartitionCount(); + } + + @Override + public String kafkaTopicName() { + return materializedViewTopicName; + } + + @Override + public PartitionerConfig getPartitionerConfig() { + return materializedView.getPartitionerConfig(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReadOnlyMaterializedViewVersion version = (ReadOnlyMaterializedViewVersion) o; + return this.delegate.equals(version.delegate) && this.materializedView.equals(version.materializedView); + } + + @Override + public int hashCode() { + return Objects.hash(delegate, materializedView); + } + + @Override + public String getStoreName() { + return VeniceView.getViewStoreName(delegate.getStoreName(), materializedView.getViewName()); + } + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java index eabce510d9d..cc15d7e4a18 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/KeyWithChunkingSuffixSerializer.java @@ -53,6 +53,13 @@ public ByteBuffer serializeNonChunkedKey(ByteBuffer key) { return serialize(key, serializedNonChunkKeySuffix); } + public byte[] getKeyFromChunkedKey(byte[] keyBytesWithChunkSuffix) { + int keyLength = keyBytesWithChunkSuffix.length - serializedNonChunkKeySuffix.length; + byte[] keyBytes = new byte[keyLength]; + System.arraycopy(keyBytesWithChunkSuffix, 0, keyBytes, 0, keyLength); + return keyBytes; + } + private ByteBuffer serialize(ByteBuffer key, byte[] encodedChunkedKeySuffix) { /** * Here will always allocate a new {@link ByteBuffer} to accommodate} the combination of key and chunked diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java index 68b14e69583..609b1ac1c76 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/PartitionUtils.java @@ -8,6 +8,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; +import java.util.Collections; import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; @@ -99,18 +100,22 @@ public static VenicePartitioner getUserPartitionLevelVenicePartitioner(Partition public static VenicePartitioner getVenicePartitioner(String partitionerClass, String partitionerParamsString) { Properties params = new Properties(); - if (partitionerParamsString != null) { - Map partitionerParamsMap = null; - try { - partitionerParamsMap = ObjectMapperFactory.getInstance().readValue(partitionerParamsString, Map.class); - } catch (JsonProcessingException e) { - throw new VeniceException("Invalid partitioner params string: " + partitionerParamsString, e); - } - params.putAll(partitionerParamsMap); - } + Map partitionerParamsMap = getPartitionerParamsMap(partitionerParamsString); + params.putAll(partitionerParamsMap); return getVenicePartitioner(partitionerClass, new VeniceProperties(params), null); } + public static Map getPartitionerParamsMap(String partitionerParamsString) { + if (partitionerParamsString == null) { + return Collections.emptyMap(); + } + try { + return ObjectMapperFactory.getInstance().readValue(partitionerParamsString, Map.class); + } catch (JsonProcessingException e) { + throw new VeniceException("Invalid partitioner params string: " + partitionerParamsString, e); + } + } + public static VenicePartitioner getVenicePartitioner(String partitionerClass, VeniceProperties params) { return getVenicePartitioner(partitionerClass, params, null); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index 67e9b584204..d0656401e13 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -4,6 +4,8 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.MaterializedViewParameters; +import com.linkedin.venice.meta.PartitionerConfig; +import com.linkedin.venice.meta.PartitionerConfigImpl; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; @@ -18,27 +20,32 @@ public class MaterializedView extends VeniceView { + private static final int DEFAULT_AMP_FACTOR = 1; public static final String MATERIALIZED_VIEW_TOPIC_SUFFIX = "_mv"; public static final String MATERIALIZED_VIEW_WRITER_CLASS_NAME = "com.linkedin.davinci.store.view.MaterializedViewWriter"; private static final String MISSING_PARAMETER_MESSAGE = "%s is required for materialized view!"; + private final String viewName; private final int viewPartitionCount; - + private final PartitionerConfig partitionerConfig; private Lazy viewPartitioner; public MaterializedView(Properties props, String storeName, Map viewParameters) { super(props, storeName, viewParameters); + this.viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()); // Override topic partition count config - viewPartitionCount = + this.viewPartitionCount = Integer.parseInt(viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name())); this.props.put(PARTITION_COUNT, viewPartitionCount); - viewPartitioner = Lazy.of(() -> { - String viewPartitionerClass = - this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); - String viewPartitionerParamsString = - this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); - return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString); - }); + String viewPartitionerClass = + this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()); + String viewPartitionerParamsString = + this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name()); + this.viewPartitioner = + Lazy.of(() -> PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString)); + Map viewPartitionerParamsMap = PartitionUtils.getPartitionerParamsMap(viewPartitionerParamsString); + this.partitionerConfig = + new PartitionerConfigImpl(viewPartitionerClass, viewPartitionerParamsMap, DEFAULT_AMP_FACTOR); } @Override @@ -54,10 +61,8 @@ public VeniceWriterOptions.Builder getWriterOptionsBuilder(String viewTopicName, @Override public Map getTopicNamesAndConfigsForVersion(int version) { VeniceProperties properties = new VeniceProperties(props); - String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()); return Collections.singletonMap( - Version.composeKafkaTopic(storeName, version) + VIEW_TOPIC_SEPARATOR + viewName - + MATERIALIZED_VIEW_TOPIC_SUFFIX, + Version.composeKafkaTopic(storeName, version) + VIEW_NAME_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX, properties); } @@ -129,4 +134,12 @@ public int getViewPartitionCount() { public VenicePartitioner getViewPartitioner() { return viewPartitioner.get(); } + + public String getViewName() { + return viewName; + } + + public PartitionerConfig getPartitionerConfig() { + return partitionerConfig; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index 1f7cb007719..cac0706ba87 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -30,7 +30,8 @@ * this interface for lifecycle management of arbitrary resources (not just kafka topics). */ public abstract class VeniceView { - public static final String VIEW_TOPIC_SEPARATOR = "_"; + public static final String VIEW_STORE_PREFIX = "view_store_"; + public static final String VIEW_NAME_SEPARATOR = "_"; protected final Properties props; protected final String storeName; protected final Map viewParameters; @@ -106,7 +107,63 @@ public static int parseVersionFromViewTopic(String topicName) { throw new VeniceException("Cannot parse version because this is not a view topic, topic name: " + topicName); } int versionStartIndex = Version.getLastIndexOfVersionSeparator(topicName) + Version.VERSION_SEPARATOR.length(); - int versionEndIndex = versionStartIndex + topicName.substring(versionStartIndex).indexOf(VIEW_TOPIC_SEPARATOR); + int versionEndIndex = versionStartIndex + topicName.substring(versionStartIndex).indexOf(VIEW_NAME_SEPARATOR); return Integer.parseInt(topicName.substring(versionStartIndex, versionEndIndex)); } + + /** + * @param topicName for the view topic, e.g. batchStore_148ff3a146001_v1_MaterializedViewTest_mv + * @return the corresponding store name and view name for the given view topic in a single string. + */ + public static String parseStoreAndViewFromViewTopic(String topicName) { + if (!isViewTopic(topicName)) { + throw new IllegalArgumentException( + "Cannot parse store and view because this is not a view topic, topic name: " + topicName); + } + String storeName = parseStoreFromViewTopic(topicName); + int viewTopicSuffixIndex = topicName.lastIndexOf(VIEW_NAME_SEPARATOR); + int viewNameStartIndex = topicName.substring(0, viewTopicSuffixIndex).lastIndexOf(VIEW_NAME_SEPARATOR); + return VIEW_STORE_PREFIX + storeName + topicName.substring(viewNameStartIndex, viewTopicSuffixIndex); + } + + public static String getViewStoreName(String storeName, String viewName) { + if (isViewStore(storeName)) { + throw new IllegalArgumentException(storeName + " is already associated with a view"); + } + return VIEW_STORE_PREFIX + storeName + VIEW_NAME_SEPARATOR + viewName; + } + + public static String getStoreNameFromViewStoreName(String viewStoreName) { + if (!isViewStore(viewStoreName)) { + throw new IllegalArgumentException(viewStoreName + " is not a view store"); + } + String viewStoreNameWithoutPrefix = viewStoreName.substring(VIEW_STORE_PREFIX.length()); + int storeNameEndIndex = viewStoreNameWithoutPrefix.lastIndexOf(VIEW_NAME_SEPARATOR); + return viewStoreNameWithoutPrefix.substring(0, storeNameEndIndex); + } + + public static String getViewNameFromViewStoreName(String viewStoreName) { + if (!isViewStore(viewStoreName)) { + throw new IllegalArgumentException(viewStoreName + " is not a view store"); + } + int viewNameStartIndex = viewStoreName.lastIndexOf(VIEW_NAME_SEPARATOR); + return viewStoreName.substring(viewNameStartIndex + 1); + } + + public static boolean isViewStore(String storeName) { + return storeName.startsWith(VIEW_STORE_PREFIX); + } + + /** + * @param storeName + * @return storeName if the provided store name is a regular Venice store name or extract the regular Venice store + * name if the provided storeName is a view store name. + */ + public static String getStoreName(String storeName) { + if (isViewStore(storeName)) { + return getStoreNameFromViewStoreName(storeName); + } else { + return storeName; + } + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyViewStoreTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyViewStoreTest.java new file mode 100644 index 00000000000..18b2800d88f --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyViewStoreTest.java @@ -0,0 +1,89 @@ +package com.linkedin.venice.meta; + +import static com.linkedin.venice.views.MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX; +import static com.linkedin.venice.views.VeniceView.VIEW_NAME_SEPARATOR; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.exceptions.StoreVersionNotFoundException; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.partitioner.ConstantVenicePartitioner; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.views.ChangeCaptureView; +import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.testng.annotations.Test; + + +public class ReadOnlyViewStoreTest { + @Test + public void testViewStoreAndVersions() { + String storeName = Utils.getUniqueString("testStore"); + String viewName = "testView"; + String viewStoreName = VeniceView.getViewStoreName(storeName, viewName); + Store store = mock(Store.class); + List storeVersions = new ArrayList<>(); + Version storeVersion = new VersionImpl(storeName, 1, "dummyId"); + Version storeVersion2 = new VersionImpl(storeName, 2, "dummyId2"); + storeVersions.add(storeVersion); + storeVersions.add(storeVersion2); + int storePartitionCount = 6; + PartitionerConfig storePartitionerConfig = new PartitionerConfigImpl(); + storePartitionerConfig.setPartitionerClass(DefaultVenicePartitioner.class.getCanonicalName()); + for (Version version: storeVersions) { + version.setPartitionCount(storePartitionCount); + version.setPartitionerConfig(storePartitionerConfig); + version.setChunkingEnabled(true); + version.setRmdChunkingEnabled(true); + } + MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(viewName); + builder.setPartitionCount(12).setPartitioner(ConstantVenicePartitioner.class.getCanonicalName()); + ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()); + Map storeViewConfigMap = new HashMap<>(); + storeViewConfigMap.put(viewName, viewConfig); + storeVersion2.setViewConfigs(storeViewConfigMap); + doReturn(storeVersions).when(store).getVersions(); + doReturn(storeName).when(store).getName(); + doReturn(2).when(store).getCurrentVersion(); + + Store viewStore = new ReadOnlyViewStore(store, viewStoreName); + assertEquals(viewStore.getName(), viewStoreName); + // v1 should be filtered since it doesn't have view configs + assertEquals(viewStore.getVersions().size(), 1); + assertNull(viewStore.getVersion(1)); + assertThrows(StoreVersionNotFoundException.class, () -> viewStore.getVersionOrThrow(1)); + Version viewStoreVersion = viewStore.getVersionOrThrow(viewStore.getCurrentVersion()); + assertNotNull(viewStoreVersion); + assertTrue(viewStoreVersion instanceof ReadOnlyViewStore.ReadOnlyMaterializedViewVersion); + assertEquals(viewStoreVersion.getPartitionCount(), 12); + assertEquals( + viewStoreVersion.getPartitionerConfig().getPartitionerClass(), + ConstantVenicePartitioner.class.getCanonicalName()); + String viewStoreVersionTopicName = + Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + viewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; + assertEquals(viewStoreVersion.kafkaTopicName(), viewStoreVersionTopicName); + assertTrue(viewStoreVersion.isChunkingEnabled()); + assertTrue(viewStoreVersion.isRmdChunkingEnabled()); + } + + @Test + public void testInvalidViewConfig() { + String storeName = Utils.getUniqueString("testStore"); + Version version = new VersionImpl(storeName, 1, "dummyId"); + assertThrows(VeniceException.class, () -> new ReadOnlyViewStore.ReadOnlyMaterializedViewVersion(version, null)); + ViewConfig invalidViewConfig = new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), new HashMap<>()); + assertThrows( + VeniceException.class, + () -> new ReadOnlyViewStore.ReadOnlyMaterializedViewVersion(version, invalidViewConfig)); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java index 387887a9df7..af14206aea7 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/views/TestMaterializedView.java @@ -85,10 +85,11 @@ public void testValidateConfigs() { @Test public void testRePartitionViewTopicProcessing() { - String storeName = "test-store"; + String storeName = "test-store_V2"; Map viewParams = new HashMap<>(); int version = 8; String rePartitionViewName = "test-view"; + String viewStoreName = VeniceView.getViewStoreName(storeName, rePartitionViewName); viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name(), rePartitionViewName); viewParams.put(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "24"); MaterializedView materializedView = new MaterializedView(new Properties(), storeName, viewParams); @@ -100,7 +101,17 @@ public void testRePartitionViewTopicProcessing() { assertTrue(VeniceView.isViewTopic(viewTopic)); assertEquals(VeniceView.parseStoreFromViewTopic(viewTopic), storeName); assertEquals(VeniceView.parseVersionFromViewTopic(viewTopic), version); + assertEquals(VeniceView.parseStoreAndViewFromViewTopic(viewTopic), viewStoreName); } + assertEquals(VeniceView.getStoreNameFromViewStoreName(viewStoreName), storeName); + assertEquals(VeniceView.getViewNameFromViewStoreName(viewStoreName), rePartitionViewName); + + assertThrows( + IllegalArgumentException.class, + () -> VeniceView.parseStoreAndViewFromViewTopic(Version.composeKafkaTopic(storeName, version))); + assertThrows(IllegalArgumentException.class, () -> VeniceView.getViewStoreName(viewStoreName, "another-test-view")); + assertThrows(IllegalArgumentException.class, () -> VeniceView.getStoreNameFromViewStoreName(storeName)); + assertThrows(IllegalArgumentException.class, () -> VeniceView.getViewNameFromViewStoreName(storeName)); } @Test diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index 8aa14960fc5..2e30a113a01 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -2,40 +2,61 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; +import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; +import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; +import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; +import static com.linkedin.venice.utils.TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX; import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; import static com.linkedin.venice.views.MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX; -import static com.linkedin.venice.views.VeniceView.VIEW_TOPIC_SEPARATOR; +import static com.linkedin.venice.views.VeniceView.VIEW_NAME_SEPARATOR; import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL; import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA; +import static org.testng.Assert.assertNotNull; +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.davinci.client.DaVinciClient; +import com.linkedin.davinci.client.DaVinciConfig; +import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; import com.linkedin.venice.ConfigKeys; +import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.meta.MaterializedViewParameters; +import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.views.VeniceView; +import io.tehuti.Metric; +import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2LongMap; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.avro.Schema; @@ -126,8 +147,8 @@ public void testLFIngestionWithMaterializedView() throws IOException { TestWriteUtils.runPushJob("Run push job", props); // TODO we will verify the actual content once the DVC consumption part of the view topic is completed. // For now just check for topic existence and that they contain some records. - String viewTopicName = Version.composeKafkaTopic(storeName, 1) + VIEW_TOPIC_SEPARATOR + testViewName - + MATERIALIZED_VIEW_TOPIC_SUFFIX; + String viewTopicName = + Version.composeKafkaTopic(storeName, 1) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; String versionTopicName = Version.composeKafkaTopic(storeName, 1); validateViewTopicAndVersionTopic(viewTopicName, versionTopicName, 6, 3, 100); @@ -137,13 +158,138 @@ public void testLFIngestionWithMaterializedView() throws IOException { rePushProps.setProperty(SOURCE_KAFKA, "true"); rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); TestWriteUtils.runPushJob("Run push job", rePushProps); - String rePushViewTopicName = Version.composeKafkaTopic(storeName, 2) + VIEW_TOPIC_SEPARATOR + testViewName - + MATERIALIZED_VIEW_TOPIC_SUFFIX; + String rePushViewTopicName = + Version.composeKafkaTopic(storeName, 2) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; String rePushVersionTopicName = Version.composeKafkaTopic(storeName, 2); validateViewTopicAndVersionTopic(rePushViewTopicName, rePushVersionTopicName, 6, 3, 100); } } + @Test(timeOut = TEST_TIMEOUT) + public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, ExecutionException, InterruptedException { + // Create a batch only store with materialized view and run batch push job with 100 records + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("batchStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3); + String testViewName = "MaterializedViewTest"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(1); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + // Start a DVC client that's subscribed to partition 0 of the store's materialized view. The DVC client should + // contain all data records. + VeniceProperties backendConfig = + new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) + .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + DaVinciConfig daVinciConfig = new DaVinciConfig(); + // Use non-source fabric region to also verify NR for materialized view. + D2Client daVinciD2RemoteFabric = D2TestUtils + .getAndStartD2Client(multiRegionMultiClusterWrapper.getChildRegions().get(1).getZkServerWrapper().getAddress()); + MetricsRepository dvcMetricsRepo = new MetricsRepository(); + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + daVinciD2RemoteFabric, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + dvcMetricsRepo, + backendConfig)) { + // Ensure compatibility of store and view DVC clients since it's possible for the same JVM to subscribe to a store + // and its view(s). + DaVinciClient storeClient = factory.getAndStartGenericAvroClient(storeName, daVinciConfig); + storeClient.subscribeAll().get(); + // writeSimpleAvroFileWithStringToStringSchema writes input starting at 1 and stops with <= recordCount (100) + for (int i = 1; i <= 100; i++) { + Assert.assertEquals(storeClient.get(Integer.toString(i)).get().toString(), DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + DaVinciClient viewClient = + factory.getAndStartGenericAvroClient(storeName, testViewName, daVinciConfig); + viewClient.subscribe(Collections.singleton(0)).get(); + Assert.assertEquals( + getMetric( + dvcMetricsRepo, + "current_version_number.Gauge", + VeniceView.getViewStoreName(storeName, testViewName)), + (double) 1); + for (int i = 1; i <= 100; i++) { + Assert.assertEquals(viewClient.get(Integer.toString(i)).get().toString(), DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + // Perform another push with 200 keys to verify future version ingestion and version swap + File newPushInputDir = getTempDataDirectory(); + TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(newPushInputDir, 200); + String newPushInputDirPath = "file:" + newPushInputDir.getAbsolutePath(); + Properties newPushProps = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), newPushInputDirPath, storeName); + TestWriteUtils.runPushJob("Run another push job", newPushProps); + Assert.assertEquals( + getMetric( + dvcMetricsRepo, + "current_version_number.Gauge", + VeniceView.getViewStoreName(storeName, testViewName)), + (double) 2); + // The materialized view DVC client should be able to read all the keys from the new push + for (int i = 1; i <= 200; i++) { + Assert.assertEquals(viewClient.get(Integer.toString(i)).get().toString(), DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + } finally { + D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); + } + // Make sure things work in the source fabric too. + VeniceProperties newBackendConfig = + new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) + .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + D2Client daVinciD2SourceFabric = D2TestUtils + .getAndStartD2Client(multiRegionMultiClusterWrapper.getChildRegions().get(1).getZkServerWrapper().getAddress()); + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + daVinciD2SourceFabric, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + new MetricsRepository(), + newBackendConfig)) { + DaVinciClient viewClient = + factory.getAndStartGenericAvroClient(storeName, testViewName, daVinciConfig); + viewClient.subscribe(Collections.singleton(0)).get(); + for (int i = 1; i <= 200; i++) { + Assert.assertEquals(viewClient.get(Integer.toString(i)).get().toString(), DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + } finally { + D2ClientUtils.shutdownClient(daVinciD2SourceFabric); + } + } + + private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) { + Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName); + assertNotNull(metric, "Expected metric " + metricName + " not found."); + return metric.value(); + } + private void validateViewTopicAndVersionTopic( String viewTopicName, String versionTopicName, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java index 0fe0e873158..266a9c10b45 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java @@ -20,8 +20,8 @@ import com.linkedin.venice.helix.HelixHybridStoreQuotaRepository; import com.linkedin.venice.helix.HelixLiveInstanceMonitor; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; -import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyStoreRepository; +import com.linkedin.venice.helix.HelixReadOnlyStoreViewConfigRepositoryAdapter; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.PartitionerConfigImpl; @@ -105,8 +105,8 @@ static StatefulServiceProvider generateService( doReturn(CONTROLLER).when(mockControllerInstance).getUrl(); doReturn(mockControllerInstance).when(mockRepo).getLeaderController(); - HelixReadOnlyStoreConfigRepository mockStoreConfigRepository = - Mockito.mock(HelixReadOnlyStoreConfigRepository.class); + HelixReadOnlyStoreViewConfigRepositoryAdapter mockStoreConfigRepository = + Mockito.mock(HelixReadOnlyStoreViewConfigRepositoryAdapter.class); HelixLiveInstanceMonitor mockLiveInstanceMonitor = Mockito.mock(HelixLiveInstanceMonitor.class); doReturn(true).when(mockLiveInstanceMonitor).isInstanceAlive(any()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java index acad086a82d..5d9f5dbcb8b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java @@ -45,7 +45,11 @@ public CompletableFuture processRecord( } @Override - public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) { + public CompletableFuture processRecord( + ByteBuffer newValue, + byte[] key, + int newValueSchemaId, + boolean isChunkedKey) { internalView.incrementRecordCount(storeName); return CompletableFuture.completedFuture(null); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index cc8a995eb86..8291c4117e2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1993,6 +1993,12 @@ private void checkStoreNameConflict(String storeName, boolean allowSystemStore) throw new VeniceException("Store name: " + storeName + " clashes with the internal usage, please change it"); } + if (VeniceView.isViewStore(storeName)) { + throw new VeniceException( + "Store name: " + storeName + " clashes with the internal usage, please remove the prefix: " + + VeniceView.VIEW_STORE_PREFIX); + } + if (!allowSystemStore && VeniceSystemStoreUtils.isSystemStore(storeName)) { throw new VeniceException( "Store name: " + storeName + " clashes with the Venice system store usage, please change it"); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 5e0a19fc557..2e680a42fb8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -74,6 +74,7 @@ import static com.linkedin.venice.meta.VersionStatus.PUSHED; import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.BATCH_JOB_HEARTBEAT; import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.PUSH_JOB_DETAILS; +import static com.linkedin.venice.views.VeniceView.VIEW_NAME_SEPARATOR; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -2864,8 +2865,11 @@ private ViewConfig validateAndDecorateStoreViewConfig(Store store, ViewConfig vi // TODO: Pass a proper properties object here. Today this isn't used in this context if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) { if (viewName.contains(VERSION_SEPARATOR)) { + throw new VeniceException(String.format("View name cannot contain version separator: %s", VERSION_SEPARATOR)); + } + if (viewName.contains(VIEW_NAME_SEPARATOR)) { throw new VeniceException( - String.format("Materialized View name cannot contain version separator: %s", VERSION_SEPARATOR)); + String.format("View name cannot contain view name separator: %s", VIEW_NAME_SEPARATOR)); } Map viewParams = viewConfig.getViewParameters(); MaterializedViewParameters.Builder decoratedViewParamBuilder = diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java index 347775feddb..5911845e237 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java @@ -33,6 +33,7 @@ import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyStoreRepository; import com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter; +import com.linkedin.venice.helix.HelixReadOnlyStoreViewConfigRepositoryAdapter; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSystemStoreRepository; import com.linkedin.venice.helix.SafeHelixManager; @@ -162,7 +163,7 @@ public class RouterServer extends AbstractVeniceService { private Optional hybridStoreQuotaRepository; private ReadOnlyStoreRepository metadataRepository; private RouterStats routerStats; - private HelixReadOnlyStoreConfigRepository storeConfigRepository; + private HelixReadOnlyStoreViewConfigRepositoryAdapter storeConfigRepository; private PushStatusStoreReader pushStatusStoreReader; private HelixLiveInstanceMonitor liveInstanceMonitor; @@ -363,7 +364,8 @@ public RouterServer( this.hybridStoreQuotaRepository = config.isHelixHybridStoreQuotaEnabled() ? Optional.of(new HelixHybridStoreQuotaRepository(manager)) : Optional.empty(); - this.storeConfigRepository = new HelixReadOnlyStoreConfigRepository(zkClient, adapter); + this.storeConfigRepository = + new HelixReadOnlyStoreViewConfigRepositoryAdapter(new HelixReadOnlyStoreConfigRepository(zkClient, adapter)); this.liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, config.getClusterName()); this.pushStatusStoreReader = new PushStatusStoreReader( @@ -419,7 +421,7 @@ public RouterServer( Optional hybridStoreQuotaRepository, HelixReadOnlyStoreRepository metadataRepository, HelixReadOnlySchemaRepository schemaRepository, - HelixReadOnlyStoreConfigRepository storeConfigRepository, + HelixReadOnlyStoreViewConfigRepositoryAdapter storeConfigRepository, List serviceDiscoveryAnnouncers, Optional sslFactory, HelixLiveInstanceMonitor liveInstanceMonitor) { diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java index e8670193a03..ffb9cb212b9 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java @@ -9,8 +9,8 @@ import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.exceptions.VeniceStoreIsMigratedException; import com.linkedin.venice.helix.HelixBaseRoutingRepository; -import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.meta.Instance; +import com.linkedin.venice.meta.ReadOnlyStoreConfigRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; @@ -41,7 +41,7 @@ public class VeniceVersionFinder { private final ReadOnlyStoreRepository metadataRepository; private final StaleVersionStats stats; - private final HelixReadOnlyStoreConfigRepository storeConfigRepo; + private final ReadOnlyStoreConfigRepository storeConfigRepo; private final Map clusterToD2Map; private final String clusterName; private final ConcurrentMap lastCurrentVersionMap = new ConcurrentHashMap<>(); @@ -57,7 +57,7 @@ public VeniceVersionFinder( ReadOnlyStoreRepository metadataRepository, HelixBaseRoutingRepository routingDataRepository, StaleVersionStats stats, - HelixReadOnlyStoreConfigRepository storeConfigRepo, + ReadOnlyStoreConfigRepository storeConfigRepo, Map clusterToD2Map, String clusterName, CompressorFactory compressorFactory,