Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller][server] Remove SIT ready-to-serve check for A/A and non-AGG store during L/F transition #1409

Merged
merged 9 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1127,10 +1127,6 @@ protected void leaderExecuteTopicSwitch(
partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
// Calculate leader offset and start consumption
prepareLeaderOffsetCheckpointAndStartConsumptionAsLeader(newSourceTopic, partitionConsumptionState, true);

// In case new topic is empty and leader can never become online
// TODO: Remove this check after AGG mode is deprecated.
defaultReadyToServeChecker.apply(partitionConsumptionState);
}

/**
Expand All @@ -1155,7 +1151,6 @@ protected boolean processTopicSwitch(
syncTopicSwitchToIngestionMetadataService(topicSwitch, partitionConsumptionState);
if (!isLeader(partitionConsumptionState)) {
partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic);
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
* In extreme case, if there is no message in real-time topic, there will be no new message after leader switch
* to the real-time topic, so `isReadyToServe()` check will never be invoked.
*/
defaultReadyToServeChecker.apply(partitionConsumptionState);
maybeApplyReadyToServeCheck(partitionConsumptionState);
}
break;

Expand Down Expand Up @@ -1064,7 +1064,7 @@ protected void leaderExecuteTopicSwitch(
upstreamStartOffset);

// In case new topic is empty and leader can never become online
defaultReadyToServeChecker.apply(partitionConsumptionState);
maybeApplyReadyToServeCheck(partitionConsumptionState);
}

protected void syncConsumedUpstreamRTOffsetMapIfNeeded(
Expand Down Expand Up @@ -1326,7 +1326,7 @@ protected boolean processTopicSwitch(
* Real time topic for that partition is empty or the rewind start offset is very close to the end, followers
* calculate the lag of the leader and decides the lag is small enough.
*/
return true;
return isHybridAggregateMode();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -2071,6 +2072,7 @@ private void checkConsumptionStateWhenStart(
}
}
}
// This ready-to-serve check is acceptable in SIT thread as it happens before subscription.
if (!isCompletedReport) {
defaultReadyToServeChecker.apply(newPartitionConsumptionState);
}
Expand Down Expand Up @@ -4566,7 +4568,22 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
}

protected boolean isDaVinciClient() {
/** Returns the {@code isDaVinciClient} flag for this ingestion task. */
boolean isDaVinciClient() {
  return this.isDaVinciClient;
}

/**
 * Returns {@code true} when this store version is hybrid and its hybrid config uses the
 * {@link DataReplicationPolicy#AGGREGATE} data replication policy.
 *
 * <p>Used to gate the legacy ready-to-serve check that is only still required for AGG-mode
 * stores (see {@code maybeApplyReadyToServeCheck}).
 */
boolean isHybridAggregateMode() {
  // Compare enums with == rather than .equals(): identical semantics for enum constants,
  // but null-safe — a null replication policy yields false instead of throwing NPE.
  return hybridStoreConfig.isPresent()
      && hybridStoreConfig.get().getDataReplicationPolicy() == DataReplicationPolicy.AGGREGATE;
}

// Returns the default ready-to-serve checker held by this task.
// NOTE(review): method name contains a typo ("Server" should be "Serve"); renaming would
// change the (package-visible) interface and touch all call sites, so it is left as-is.
ReadyToServeCheck getReadyToServerChecker() {
  return defaultReadyToServeChecker;
}

/**
 * Applies the default ready-to-serve check for the given partition, but only when the store
 * is a hybrid store with the AGGREGATE data replication policy; otherwise this is a no-op.
 *
 * @param partitionConsumptionState state of the partition to check
 */
void maybeApplyReadyToServeCheck(PartitionConsumptionState partitionConsumptionState) {
  // Non-AGG stores no longer need this legacy check — bail out early.
  if (!isHybridAggregateMode()) {
    return;
  }
  getReadyToServerChecker().apply(partitionConsumptionState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ void testRollbackAndRollForward() {
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}
// Bootstrap checker thread can also modify the versionMap, adding wait-assert here to avoid NPE.
assertNotNull(versionMap.get(version2.kafkaTopicName()));
});
versionMap.get(version2.kafkaTopicName()).completePartition(partition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3683,16 +3683,20 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node

@DataProvider
public static Object[][] testProcessTopicSwitchProvider() {
return new Object[][] { { LEADER }, { DA_VINCI } };
return DataProviderUtils.allPermutationGenerator(new NodeType[] { DA_VINCI, LEADER }, DataProviderUtils.BOOLEAN);
}

@Test(dataProvider = "testProcessTopicSwitchProvider")
public void testProcessTopicSwitch(NodeType nodeType) {
public void testProcessTopicSwitch(NodeType nodeType, boolean isAggregateMode) {
VenicePartitioner partitioner = getVenicePartitioner();
PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
partitionerConfig.setPartitionerClass(partitioner.getClass().getName());
HybridStoreConfig hybridStoreConfig =
new HybridStoreConfigImpl(100, 100, 100, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl(
100,
100,
100,
isAggregateMode ? DataReplicationPolicy.AGGREGATE : DataReplicationPolicy.NON_AGGREGATE,
BufferReplayPolicy.REWIND_FROM_EOP);
MockStoreVersionConfigs storeAndVersionConfigs =
setupStoreAndVersionMocks(2, partitionerConfig, Optional.of(hybridStoreConfig), false, true, AA_OFF);
StorageService storageService = mock(StorageService.class);
Expand Down Expand Up @@ -3740,7 +3744,8 @@ public void testProcessTopicSwitch(NodeType nodeType) {
doReturn(mockOffsetRecord).when(mockPcs).getOffsetRecord();
doReturn(PARTITION_FOO).when(mockPcs).getPartition();
doReturn(PARTITION_FOO).when(mockPcs).getPartition();
storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
boolean result = storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, PARTITION_FOO, 10, mockPcs);
Assert.assertEquals(isAggregateMode, result);
verify(mockTopicManagerRemoteKafka, never()).getOffsetByTime(any(), anyLong());
verify(mockOffsetRecord, never()).setLeaderUpstreamOffset(anyString(), anyLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopicName);

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
updateStoreParams.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(2)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ public void testIncrementalPushPartialUpdateNewFormat(boolean useSparkCompute) t
UpdateStoreQueryParams updateStoreParams =
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(CompressionStrategy.NO_OP)
.setActiveActiveReplicationEnabled(true)
.setWriteComputationEnabled(true)
.setChunkingEnabled(true)
.setIncrementalPushEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ public void testPushJobDetails(boolean useCustomCheckpoints) throws IOException
// because hadoop job client cannot fetch counters properly.
parentControllerClient.updateStore(
testStoreName,
new UpdateStoreQueryParams().setStorageQuotaInByte(-1).setPartitionCount(2).setIncrementalPushEnabled(true));
new UpdateStoreQueryParams().setStorageQuotaInByte(-1)
.setPartitionCount(2)
.setHybridOffsetLagThreshold(10)
.setHybridRewindSeconds(10)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true));
Properties pushJobProps = defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPathForFullPush, testStoreName);
pushJobProps.setProperty(PUSH_JOB_STATUS_UPLOAD_ENABLE, String.valueOf(true));
try (VenicePushJob testPushJob = new VenicePushJob("test-push-job-details-job", pushJobProps)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ public void testNativeReplicationForIncrementalPush() throws Exception {
updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1)
.setHybridOffsetLagThreshold(TEST_TIMEOUT)
.setHybridRewindSeconds(2L)
.setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true),
100,
(parentControllerClient, clusterName, storeName, props, inputDir) -> {
Expand Down Expand Up @@ -509,8 +510,7 @@ public void testActiveActiveForHeartbeatSystemStores() throws Exception {
int partitionCount = 2;
motherOfAllTests(
"testActiveActiveForHeartbeatSystemStores",
updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(partitionCount)
.setIncrementalPushEnabled(true),
updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(partitionCount),
recordCount,
(parentControllerClient, clusterName, storeName, props, inputDir) -> {
try (
Expand Down Expand Up @@ -595,7 +595,8 @@ public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
parentControllerClient
.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
.setIncrementalPushEnabled(true)
.setHybridOffsetLagThreshold(1)
.setHybridRewindSeconds(Time.SECONDS_PER_DAY))
.isError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public void testIngestionHeartBeat(
ControllerResponse updateStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams));

// If config combination for incremental push is wrong, update store should fail loudly.
if (!isActiveActiveEnabled && isIncrementalPushEnabled) {
assertTrue(updateStoreResponse.isError(), "Update store does not error on invalid config combination.");
return;
}
assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError());

VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ public static void waitForNonDeterministicPushCompletion(
ControllerClient controllerClient,
long timeout,
TimeUnit timeoutUnit) {
waitForNonDeterministicAssertion(timeout, timeoutUnit, () -> {
waitForNonDeterministicAssertion(timeout, timeoutUnit, true, () -> {
JobStatusQueryResponse jobStatusQueryResponse =
assertCommand(controllerClient.queryJobStatus(topicName, Optional.empty()));
ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatusQueryResponse.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2768,6 +2768,18 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g
updatedConfigsList.add(PARTITION_COUNT);
}

/**
* Pre-flight check for incremental push config update. We only allow incremental push config to be turned on
* when store is A/A. Otherwise, we should fail store update.
*/
if (setStore.hybridStoreConfig != null && setStore.incrementalPushEnabled
&& !setStore.activeActiveReplicationEnabled) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
"Hybrid store config invalid. Cannot have incremental push enabled while A/A not enabled",
ErrorType.BAD_REQUEST);
}

/**
* By default, parent controllers will not try to replicate the unchanged store configs to child controllers;
* an updatedConfigsList will be used to represent which configs are updated by users.
Expand Down
Loading