Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[changelog] Make changes for venice view consumption #1497

Merged
merged 13 commits into from
Feb 12, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private Properties consumerProperties;
private SchemaReader schemaReader;
private String viewName;
private Boolean isBeforeImageView = false;

private String consumerName = "";

Expand Down Expand Up @@ -219,7 +220,17 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
.setShouldCompactMessages(config.shouldCompactMessages());
.setShouldCompactMessages(config.shouldCompactMessages())
.setIsBeforeImageView(config.isBeforeImageView());
return newConfig;
}

protected Boolean isBeforeImageView() {
return isBeforeImageView;
}

public ChangelogClientConfig setIsBeforeImageView(Boolean beforeImageView) {
isBeforeImageView = beforeImageView;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
Expand Down Expand Up @@ -93,7 +94,8 @@ public InternalLocalBootstrappingVeniceChangelogConsumer(
bootstrapStateMap = new VeniceConcurrentHashMap<>();
syncBytesInterval = changelogClientConfig.getDatabaseSyncBytesInterval();
metricsRepository = changelogClientConfig.getInnerClientConfig().getMetricsRepository();
String localStateTopicNameTemp = changelogClientConfig.getStoreName() + LOCAL_STATE_TOPIC_SUFFIX;
String viewNamePath = changelogClientConfig.getViewName() == null ? "" : "-" + changelogClientConfig.getViewName();
String localStateTopicNameTemp = changelogClientConfig.getStoreName() + viewNamePath + LOCAL_STATE_TOPIC_SUFFIX;
String bootstrapFileSystemPath = changelogClientConfig.getBootstrapFileSystemPath();
if (StringUtils.isNotEmpty(consumerId)) {
localStateTopicNameTemp += "-" + consumerId;
Expand Down Expand Up @@ -183,7 +185,8 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
protected boolean handleVersionSwapControlMessage(
ControlMessage controlMessage,
PubSubTopicPartition pubSubTopicPartition,
String topicSuffix) {
String topicSuffix,
Integer upstreamPartition) {
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion;
Expand Down Expand Up @@ -506,10 +509,9 @@ public CompletableFuture<Void> start(Set<Integer> partitions) {

storageService.start();
try {
storeRepository.start();
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new RuntimeException(e);
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}

return seekWithBootStrap(partitions);
Expand All @@ -518,7 +520,13 @@ public CompletableFuture<Void> start(Set<Integer> partitions) {
@Override
public CompletableFuture<Void> start() {
Set<Integer> allPartitions = new HashSet<>();
for (int partition = 0; partition < partitionCount; partition++) {
try {
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}
Store store = storeRepository.getStore(storeName);
for (int partition = 0; partition < store.getVersion(store.getCurrentVersion()).getPartitionCount(); partition++) {
allPartitions.add(partition);
}
return this.start(allPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.ScheduledExecutorService;
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
Expand Down Expand Up @@ -92,6 +93,11 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
try {
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}
if (!versionSwapThreadScheduled.get()) {
// schedule the version swap thread and set up the callback listener
this.storeRepository.registerStoreDataChangedListener(versionSwapListener);
Expand Down Expand Up @@ -208,4 +214,10 @@ public void run() {
versionSwapListener.handleStoreChanged(null);
}
}

@Override
public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) {
super.setStoreRepository(repository);
versionSwapListener.setStoreRepository(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeNam
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
if (viewClass.equals(ChangeCaptureView.class.getCanonicalName())) {
// TODO: This is a little bit of a hack. This is to deal with the an issue where the before image change
// capture topic doesn't follow the same naming convention as view topics.
newStoreChangelogClientConfig.setIsBeforeImageView(true);
return new VeniceChangelogConsumerImpl(
newStoreChangelogClientConfig,
consumer != null
Expand Down
Loading
Loading