[server][controller] Add MaterializedViewWriter and support view writers in L/F #1296

Merged
merged 11 commits on Jan 17, 2025
@@ -22,7 +22,6 @@
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.VeniceException;
@@ -68,7 +67,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
@@ -641,66 +639,40 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
if (this.viewWriters.size() > 0) {
Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
if (hasViewWriters()) {
/**
* The ordering guarantees we want are the following:
*
* 1. Write to all view topics (in parallel).
* 2. Write to the VT only after we get the ack for all views AND the previous write to VT was queued into the
* producer (but not necessarily acked).
*/
long preprocessingTime = System.currentTimeMillis();
CompletableFuture currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
}
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(
mergeConflictResultWrapper.getUpdatedValueBytes(),
oldValueBB,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
// after
// this call.
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// after this call.
produceToVersionTopic.run();
}
}
}
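The queueUpVersionTopicWritesWithViewWriters helper used above is defined further down in this diff. The ordering rule it enforces boils down to chaining each version-topic (VT) write behind the previous VT produce future plus the futures returned by the view writers, so view writes fan out in parallel while VT writes stay in arrival order. A condensed, self-contained sketch of that rule (illustrative names only, not the actual Venice API) could look like this:

import java.util.List;
import java.util.concurrent.CompletableFuture;

class VtWriteOrderingSketch {
  // Plays the role of PartitionConsumptionState#getLastVTProduceCallFuture: the last queued VT write.
  private CompletableFuture<Void> lastVtWrite = CompletableFuture.completedFuture(null);

  // Run vtWrite only after (1) the previous VT write has been queued and (2) every view write is acked.
  CompletableFuture<Void> queueVtWrite(List<CompletableFuture<?>> viewWrites, Runnable vtWrite) {
    CompletableFuture<?>[] gates = new CompletableFuture<?>[viewWrites.size() + 1];
    gates[0] = lastVtWrite;
    for (int i = 0; i < viewWrites.size(); i++) {
      gates[i + 1] = viewWrites.get(i);
    }
    CompletableFuture<Void> current = new CompletableFuture<>();
    CompletableFuture.allOf(gates).whenCompleteAsync((ignored, error) -> {
      if (error == null) {
        vtWrite.run(); // queue the VT write into the producer; its ack is not awaited here
        current.complete(null);
      } else {
        current.completeExceptionally(error); // a failed view write must also fail the chained VT write
      }
    });
    lastVtWrite = current;
    return current;
  }
}

The real helper additionally records view-producer latency metrics and reports failures through setIngestionException, as shown later in this diff.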
@@ -9,6 +9,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE;
import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;
@@ -100,6 +101,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -108,6 +110,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -2402,7 +2405,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
// UPDATE messages are only expected in the LEADER and must be produced to Kafka.
MessageType msgType = MessageType.valueOf(kafkaValue);
if (msgType == MessageType.UPDATE && !produceToLocalKafka) {
if (msgType == UPDATE && !produceToLocalKafka) {
throw new VeniceMessageException(
ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: "
+ consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset());
@@ -2436,6 +2439,15 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.flush());
partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime);
}
/**
* Materialized views need to produce to the corresponding view topic for the batch portion of the data. This is
* achieved in the following ways:
* 1. Remote fabric(s) will leverage NR, where the leader will replicate the VT from the NR source fabric and
* produce to the local view topic(s).
* 2. The NR source fabric's view topic will be produced by VPJ. This is because there is no checkpointing and no
* easy way to add checkpointing for leaders consuming the local VT, which would make it difficult and error prone
* to let the leader produce to view topic(s) in the NR source fabric.
*/
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
}

@@ -2464,7 +2476,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(

if (kafkaKey.isControlMessage()) {
boolean producedFinally = true;
ControlMessage controlMessage = (ControlMessage) kafkaValue.payloadUnion;
ControlMessage controlMessage = (ControlMessage) kafkaValue.getPayloadUnion();
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
leaderProducedRecordContext = LeaderProducedRecordContext
.newControlMessageRecord(kafkaClusterId, consumerRecord.getOffset(), kafkaKey.getKey(), controlMessage);
@@ -2489,6 +2501,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* consumes the first message; potential message type: SOS, EOS, SOP, EOP, data message (consider server restart).
*/
case END_OF_PUSH:
// CMs that are produced with DIV pass-through mode can break DIV without synchronization with the view writers.
// This is because for data (PUT) records we queue their produceToLocalKafka behind the completion of the view
// writers, while the main SIT moves on to subsequent messages, and CMs that don't need to be propagated to view
// topics are produced directly. If we don't check the previous write before producing such CMs, the VT might end
// up with out-of-order messages, which is a problem with pass-through DIV. e.g. a PUT record belonging to seg:0
// can come after the EOS of seg:0 due to view writer delays. Since SOP and EOP are rare we can simply wait for
// the last VT produce future.
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
/**
* Simply produce this EOP to local VT. It will be processed in order in the drainer queue later
* after successfully producing to kafka.
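To make the segment-ordering concern above concrete: if a pass-through control message were produced directly while earlier PUTs of the same segment are still gated on their view-topic acks, the CM would land in the VT ahead of those PUTs and pass-through DIV would see a broken segment. A minimal sketch of the gate applied before such a direct produce (hypothetical helper names, not the PR's API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class PassThroughCmGate {
  // Block on the last queued VT write before producing a pass-through CM (e.g. EOP) directly,
  // so the CM cannot overtake data records that are still waiting on their view-topic acks.
  static void produceControlMessageGated(CompletableFuture<Void> lastVtProduceFuture, Runnable produceControlMessage)
      throws ExecutionException, InterruptedException {
    lastVtProduceFuture.get(); // SOP/EOP are rare, so a blocking wait is acceptable here
    produceControlMessage.run();
  }
}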
@@ -2533,35 +2553,46 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
*
* There is one exception that overrules the above conditions, i.e. when the SOS is a heartbeat from the RT topic.
* In such a case the heartbeat is produced to VT with an updated {@link LeaderMetadataWrapper}.
*
* We want to ensure correct ordering for any SOS and EOS that we do decide to write to VT. This is done by
* coordinating with the corresponding {@link PartitionConsumptionState#getLastVTProduceCallFuture}.
* However, this coordination is only needed if there are view writers, i.e. the VT writes and the CM writes
* need to be in the same mode: either both coordinate with lastVTProduceCallFuture or neither does.
*/
if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
produceToLocalKafka(
consumerRecord,
final LeaderProducedRecordContext segmentCMLeaderProduceRecordContext = leaderProducedRecordContext;
maybeQueueCMWritesToVersionTopic(
partitionConsumptionState,
leaderProducedRecordContext,
(callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef()
.get()
.put(
consumerRecord.getKey(),
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper),
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs);
() -> produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
segmentCMLeaderProduceRecordContext,
(callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef()
.get()
.put(
consumerRecord.getKey(),
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper),
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs));
} else {
if (controlMessageType == START_OF_SEGMENT
&& Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
final LeaderProducedRecordContext heartbeatLeaderProducedRecordContext = leaderProducedRecordContext;
maybeQueueCMWritesToVersionTopic(
partitionConsumptionState,
consumerRecord,
leaderProducedRecordContext,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs);
() -> propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
partitionConsumptionState,
consumerRecord,
heartbeatLeaderProducedRecordContext,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs));
} else {
/**
* Based on the current design, handling this case (especially EOS) is tricky as we don't produce the SOS/EOS
@@ -3334,9 +3365,43 @@ protected void processMessageAndMaybeProduceToKafka(
beforeProcessingRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs).getWriteComputeResultWrapper();
}
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
produceToVersionTopic);
} else {
produceToVersionTopic.run();
}
}

Put newPut = writeComputeResultWrapper.getNewPut();
private void produceToLocalKafkaHelper(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
PartitionConsumptionState partitionConsumptionState,
WriteComputeResultWrapper writeComputeResultWrapper,
int partition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {
KafkaKey kafkaKey = consumerRecord.getKey();
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
LeaderProducedRecordContext leaderProducedRecordContext;
Put newPut = writeComputeResultWrapper.getNewPut();
switch (msgType) {
case PUT:
leaderProducedRecordContext =
@@ -3390,10 +3455,6 @@ protected void processMessageAndMaybeProduceToKafka(
break;

case UPDATE:
if (writeComputeResultWrapper.isSkipProduce()) {
return;
}

leaderProducedRecordContext =
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, newPut);
BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction =
@@ -3610,15 +3671,17 @@ protected long measureRTOffsetLagForSingleRegion(
}

@Override
protected void processVersionSwapMessage(
protected void processControlMessageForViews(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState) {

// Iterate through the list of views for the store and process the control message.
for (VeniceViewWriter viewWriter: viewWriters.values()) {
// TODO: at some point we should potentially do this for more (or all) control messages as we add more view types
viewWriter.processControlMessage(controlMessage, partition, partitionConsumptionState, this.versionNumber);
viewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
}
}

@@ -3879,6 +3942,35 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
}
}

protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer);
}
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
versionTopicWrite.run();
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});

partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
}

/**
* Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp}
* such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics
@@ -3904,4 +3996,34 @@ Set<String> getKafkaUrlSetFromTopicSwitch(TopicSwitchWrapper topicSwitchWrapper)
}
return topicSwitchWrapper.getSourceServers();
}

private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partitionConsumptionState)
throws ExecutionException, InterruptedException {
partitionConsumptionState.getLastVTProduceCallFuture().get();
}

protected boolean hasViewWriters() {
return viewWriters != null && !viewWriters.isEmpty();
}

private void maybeQueueCMWritesToVersionTopic(
PartitionConsumptionState partitionConsumptionState,
Runnable produceCall) {
if (hasViewWriters()) {
CompletableFuture<Void> propagateSegmentCMWrite = new CompletableFuture<>();
partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> {
if (exception == null) {
produceCall.run();
propagateSegmentCMWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
propagateSegmentCMWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite);
} else {
produceCall.run();
}
}
}
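As a closing illustration of the property that queueUpVersionTopicWritesWithViewWriters and maybeQueueCMWritesToVersionTopic rely on, here is a small self-contained demo (plain JDK code, not Venice code) showing that chaining each VT write behind the previous VT future keeps VT order intact even when a later record's view-topic ack completes first:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class VtOrderingDemo {
  public static void main(String[] args) throws Exception {
    List<String> vtLog = new CopyOnWriteArrayList<>(); // stands in for the version topic
    CompletableFuture<Void> lastVt = CompletableFuture.completedFuture(null);

    CompletableFuture<Void> slowViewAck = new CompletableFuture<>(); // record A's view-topic ack (slow)
    CompletableFuture<Void> fastViewAck = new CompletableFuture<>(); // record B's view-topic ack (fast)

    lastVt = queue(lastVt, slowViewAck, () -> vtLog.add("A"));
    lastVt = queue(lastVt, fastViewAck, () -> vtLog.add("B"));

    fastViewAck.complete(null); // B's view write acks first...
    slowViewAck.complete(null); // ...but A's VT write still lands before B's
    lastVt.get();
    System.out.println(vtLog); // prints [A, B]
  }

  // Same chaining rule as the helpers above: gate the VT write on the previous VT future and the view ack.
  static CompletableFuture<Void> queue(CompletableFuture<Void> previousVt, CompletableFuture<Void> viewAck, Runnable vtWrite) {
    CompletableFuture<Void> current = new CompletableFuture<>();
    CompletableFuture.allOf(previousVt, viewAck).whenCompleteAsync((ignored, error) -> {
      if (error == null) {
        vtWrite.run();
        current.complete(null);
      } else {
        current.completeExceptionally(error);
      }
    });
    return current;
  }
}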