Skip to content

Commit

Permalink
[compat] [controller] add a field largestUsedRTVersionNumber in sto…
Browse files Browse the repository at this point in the history
…re config (#1512)

When we delete and recreate a store, we do not want to reuse the old real-time topic name, because that topic might not have been fully cleaned up. We want to use a new name instead, and to generate a new name we need to know which names have already been used. The new store config `largestUsedRTVersionNumber` makes it possible to find this out.
  • Loading branch information
arjun4084346 authored Feb 21, 2025
1 parent a6c0bf5 commit e5ad505
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 62 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ subprojects {
doFirst {
def versionOverrides = [
// project(':internal:venice-common').file('src/main/resources/avro/StoreVersionState/v5', PathValidation.DIRECTORY)
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v27', PathValidation.DIRECTORY),
project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v15', PathValidation.DIRECTORY),
project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v11', PathValidation.DIRECTORY)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ControllerApiConstants {
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
public static final String LARGEST_USED_VERSION_NUMBER = "largest_used_version_number";
public static final String LARGEST_USED_RT_VERSION_NUMBER = "largest_used_rt_version_number";
public static final String NUM_VERSIONS_TO_PRESERVE = "num_versions_to_preserve";
public static final String DISABLE_META_STORE = "disable_meta_store";
public static final String DISABLE_DAVINCI_PUSH_STATUS_STORE = "disable_davinci_push_status_store";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.venice.zk.VeniceZkPaths.STORE_GRAVEYARD;

import com.linkedin.venice.annotation.VisibleForTesting;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -52,43 +53,86 @@ public HelixStoreGraveyard(
dataAccessor = new ZkBaseDataAccessor<>(zkClient);
}

/**
 * Looks up the largest real-time (RT) topic version number recorded in the graveyard for the given store.
 *
 * @param storeName name of the store to look up
 * @return the result of the shared graveyard lookup performed with the RT flag enabled
 */
@Override
public int getLargestUsedRTVersionNumber(String storeName) {
  final boolean isRTVersion = true;
  return getLargestUsedVersionNumber(storeName, isRTVersion);
}

/**
 * Looks up the largest regular (non-RT) version number recorded in the graveyard for the given store.
 *
 * @param storeName name of the store to look up
 * @return the result of the shared graveyard lookup performed with the RT flag disabled
 */
@Override
public int getLargestUsedVersionNumber(String storeName) {
  final boolean isRTVersion = false;
  return getLargestUsedVersionNumber(storeName, isRTVersion);
}

private int getLargestUsedVersionNumber(String storeName, boolean isRTVersion) {
if (VeniceSystemStoreUtils.isSystemStore(storeName)) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (systemStoreType != null && systemStoreType.isStoreZkShared()) {
String userStoreName = systemStoreType.extractRegularStoreName(storeName);
return getPerUserStoreSystemStoreLargestUsedVersionNumber(userStoreName, systemStoreType);
return getPerUserStoreSystemStoreLargestUsedVersionNumber(userStoreName, systemStoreType, isRTVersion);
}
}

List<Store> stores = getStoreFromAllClusters(storeName);
if (stores.isEmpty()) {
// If the store does NOT exist in the graveyard, it has never been deleted, so return 0, which is the default
// value of largestUsedVersionNumber for a new store.
return Store.NON_EXISTING_VERSION;
}

int largestUsedVersionNumber = Store.NON_EXISTING_VERSION;
for (Store deletedStore: stores) {
if (deletedStore.getLargestUsedVersionNumber() > largestUsedVersionNumber) {
largestUsedVersionNumber = deletedStore.getLargestUsedVersionNumber();
int versionNumber =
isRTVersion ? deletedStore.getLargestUsedRTVersionNumber() : deletedStore.getLargestUsedVersionNumber();
largestUsedVersionNumber = Math.max(largestUsedVersionNumber, versionNumber);
}
return largestUsedVersionNumber;
}

/**
 * Finds the largest used version number of a per-user-store system store by scanning the graveyard entries of
 * its user store in every cluster.
 *
 * @param userStoreName name of the user store that owns the system store
 * @param systemStoreType type of the system store; its prefix is the key into each deleted store's system store map
 * @param isRTVersion {@code true} to read the largest used RT version number, {@code false} to read the largest
 *        used regular version number
 * @return the maximum value found across all graveyard entries, or {@link Store#NON_EXISTING_VERSION} when the
 *         user store is not in the graveyard or none of its entries carries attributes for this system store type
 */
@VisibleForTesting
int getPerUserStoreSystemStoreLargestUsedVersionNumber(
    String userStoreName,
    VeniceSystemStoreType systemStoreType,
    boolean isRTVersion) {
  String systemStoreName = systemStoreType.getSystemStoreName(userStoreName);
  // Build the complete label once; substituting an empty string into a "{} version" template would leave a
  // stray double space in the non-RT log messages.
  String versionLabel = isRTVersion ? "RT version" : "version";
  List<Store> deletedStores = getStoreFromAllClusters(userStoreName);
  if (deletedStores.isEmpty()) {
    LOGGER.info(
        "User store: {} does NOT exist in the store graveyard. Hence, no largest used {} for its system store: {}",
        userStoreName,
        versionLabel,
        systemStoreName);
    return Store.NON_EXISTING_VERSION;
  }
  int largestUsedVersionNumber = Store.NON_EXISTING_VERSION;
  for (Store deletedStore: deletedStores) {
    Map<String, SystemStoreAttributes> systemStoreNamesToAttributes = deletedStore.getSystemStores();
    SystemStoreAttributes systemStoreAttributes = systemStoreNamesToAttributes.get(systemStoreType.getPrefix());
    // A deleted store may carry no attributes for this system store type; such entries are skipped.
    if (systemStoreAttributes != null) {
      largestUsedVersionNumber = Math.max(
          largestUsedVersionNumber,
          isRTVersion
              ? systemStoreAttributes.getLargestUsedRTVersionNumber()
              : systemStoreAttributes.getLargestUsedVersionNumber());
    }
  }

  if (largestUsedVersionNumber == Store.NON_EXISTING_VERSION) {
    LOGGER.info("Can not find largest used {} number for {}.", versionLabel, systemStoreName);
  }
  return largestUsedVersionNumber;
}

@Override
public void putStoreIntoGraveyard(String clusterName, Store store) {
int largestUsedVersionNumber = getLargestUsedVersionNumber(store.getName());
int largestUsedRTVersionNumber = getLargestUsedRTVersionNumber(store.getName());

if (store.isMigrating()) {
/**
* Suppose I have two datacenters Parent and Child, each has two clusters C1 and C2
* Before migration, I have a store with largest version 3:
* Before migration, I have a store with the largest version 3:
* P: C1:v3*, C2:null
* C: C1:v3*, C2:null
*
* After migration, both clusters shoud have the same store with same largest version and cluster discovery points to C2
* After migration, both clusters should have the same store with same largest version and cluster discovery points to C2
* P: C1:v3, C2:v3*
* C: C1:v3, C2:v3*
*
Expand All @@ -101,7 +145,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
* C: C1:v4, C2:null*
*
* Then I realized the error and want to delete the other store as well, but now I can't delete it because the largest
* version number (3) doesn't match with the one retrived from graveyard (4).
* version number (3) doesn't match with the one retrieved from graveyard (4).
* This check will address to this situation, and keep the largest version number in both graveyards the same.
*/
if (largestUsedVersionNumber > store.getLargestUsedVersionNumber()) {
Expand All @@ -112,23 +156,45 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
largestUsedVersionNumber);
store.setLargestUsedVersionNumber(largestUsedVersionNumber);
}
} else if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
if (largestUsedRTVersionNumber > store.getLargestUsedRTVersionNumber()) {
  // Bring the migrating store's RT version number up to the one already recorded in the graveyard.
  // The log must report the RT values that are actually being changed, not the regular version numbers.
  LOGGER.info(
      "Increased largestUsedRTVersionNumber for migrating store {} from {} to {}.",
      store.getName(),
      store.getLargestUsedRTVersionNumber(),
      largestUsedRTVersionNumber);
  store.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}
} else {
if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// The largestUsedVersionNumber of the re-created store is smaller than that of the deleted store, which indicates a problem.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
if (store.getLargestUsedRTVersionNumber() < largestUsedRTVersionNumber) {
// The largestUsedRTVersionNumber of the re-created store is smaller than that of the deleted store, which indicates a problem.
String errorMsg = "Invalid largestUsedRTVersionNumber: " + store.getLargestUsedRTVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedRTVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
}

// Store does not exist in graveyard OR store already exists but the re-created store is deleted again so we need to
// update the ZNode.
HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store);
updateZNode(clusterName, store);

LOGGER.info(
"Put store: {} into graveyard with largestUsedVersionNumber {}.",
store.getName(),
largestUsedVersionNumber);
}

/**
 * Writes the given store to its graveyard ZNode in the given cluster through {@code HelixUtils.update}.
 *
 * @param clusterName cluster whose graveyard path is used
 * @param store store to persist; its name determines the ZNode path
 */
void updateZNode(String clusterName, Store store) {
  String graveyardPath = getStoreGraveyardPath(clusterName, store.getName());
  HelixUtils.update(dataAccessor, graveyardPath, store);
}

@Override
public Store getStoreFromGraveyard(String clusterName, String storeName, Stat stat) {
String path = getStoreGraveyardPath(clusterName, storeName);
Expand Down Expand Up @@ -156,7 +222,8 @@ public List<String> listStoreNamesFromGraveyard(String clusterName) {
* @return Matching store from each venice. Normally contains one element.
* If the store existed in some other cluster before, there will be more than one element in the return value.
*/
private List<Store> getStoreFromAllClusters(String storeName) {
@VisibleForTesting
List<Store> getStoreFromAllClusters(String storeName) {
List<Store> stores = new ArrayList<>();
for (String clusterName: clusterNames) {
Store store = dataAccessor.get(getStoreGraveyardPath(clusterName, storeName), null, AccessOption.PERSISTENT);
Expand All @@ -167,40 +234,11 @@ private List<Store> getStoreFromAllClusters(String storeName) {
return stores;
}

/**
 * Finds the largest used version number of a per-user-store system store by scanning the graveyard entries of
 * its user store in every cluster.
 *
 * @param userStoreName name of the user store that owns the system store
 * @param systemStoreType type of the system store; its prefix is the key into each deleted store's system store map
 * @return the maximum value found across all graveyard entries, or {@link Store#NON_EXISTING_VERSION} when the
 *         user store is not in the graveyard or none of its entries carries attributes for this system store type
 */
private int getPerUserStoreSystemStoreLargestUsedVersionNumber(
    String userStoreName,
    VeniceSystemStoreType systemStoreType) {
  String systemStoreName = systemStoreType.getSystemStoreName(userStoreName);
  List<Store> deletedStores = getStoreFromAllClusters(userStoreName);
  if (deletedStores.isEmpty()) {
    LOGGER.info(
        "User store: {} does NOT exist in the store graveyard. Hence, no largest used version for its system store: {}",
        userStoreName,
        systemStoreName);
    return Store.NON_EXISTING_VERSION;
  }
  // Use the type handed in by the caller instead of re-deriving it from the system store name on every
  // iteration: the lookup is loop-invariant, and dereferencing a re-derived type would fail if it were null.
  String systemStorePrefix = systemStoreType.getPrefix();
  int largestUsedVersionNumber = Store.NON_EXISTING_VERSION;
  for (Store deletedStore: deletedStores) {
    Map<String, SystemStoreAttributes> systemStoreNamesToAttributes = deletedStore.getSystemStores();
    SystemStoreAttributes systemStoreAttributes = systemStoreNamesToAttributes.get(systemStorePrefix);
    // A deleted store may carry no attributes for this system store type; such entries are skipped.
    if (systemStoreAttributes != null) {
      largestUsedVersionNumber =
          Math.max(largestUsedVersionNumber, systemStoreAttributes.getLargestUsedVersionNumber());
    }
  }

  if (largestUsedVersionNumber == Store.NON_EXISTING_VERSION) {
    LOGGER.info("Can not find largest used version number for {}.", systemStoreName);
  }
  return largestUsedVersionNumber;
}

/**
 * Builds the graveyard path in which both the cluster name and the store name are the
 * {@code PathResourceRegistry.WILDCARD_MATCH_ANY} wildcard.
 *
 * @return the wildcard graveyard path
 */
private String getGeneralStoreGraveyardPath() {
  String anyCluster = PathResourceRegistry.WILDCARD_MATCH_ANY;
  String anyStore = PathResourceRegistry.WILDCARD_MATCH_ANY;
  return getStoreGraveyardPath(anyCluster, anyStore);
}

private String getStoreGraveyardPath(String clusterName, String storeName) {
/**
 * Builds the graveyard path of a store: the cluster's graveyard parent path, a {@code "/"} separator, and the
 * store name.
 *
 * @param clusterName cluster whose graveyard parent path is used
 * @param storeName name appended as the last path segment
 * @return the graveyard path for the store in that cluster
 */
String getStoreGraveyardPath(String clusterName, String storeName) {
  String parentPath = getStoreGraveyardParentPath(clusterName);
  return parentPath + "/" + storeName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

/**
 * Reads the largest used RT version number from the wrapped delegate.
 *
 * @return the delegate's largest used RT version number
 */
@Override
public int getLargestUsedRTVersionNumber() {
  final int largestUsedRTVersion = this.delegate.getLargestUsedRTVersionNumber();
  return largestUsedRTVersion;
}

/**
 * Not supported on this delegate-backed view: the value is never applied.
 *
 * @param largestUsedRTVersionNumber ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getCurrentVersion() {
return this.delegate.getCurrentVersion();
Expand Down Expand Up @@ -855,6 +865,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

/**
 * Reads the largest used RT version number from the wrapped delegate.
 *
 * @return the delegate's largest used RT version number
 */
@Override
public int getLargestUsedRTVersionNumber() {
  final int largestUsedRTVersion = this.delegate.getLargestUsedRTVersionNumber();
  return largestUsedRTVersion;
}

/**
 * Not supported on this delegate-backed view: the value is never applied.
 *
 * @param largestUsedRTVersionNumber ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public long getStorageQuotaInByte() {
return this.delegate.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static boolean isSystemStore(String storeName) {

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

/**
 * @return the largest version number used so far for this store's real-time (RT) topic
 */
int getLargestUsedRTVersionNumber();

/**
 * Sets the largest version number used so far for this store's real-time (RT) topic.
 *
 * @param largestUsedRTVersionNumber the new value
 */
void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

long getStorageQuotaInByte();

void setStorageQuotaInByte(long storageQuotaInByte);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
*/
public interface StoreGraveyard {
/**
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store dose not
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store does not
* exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedVersionNumber(String storeName);

/**
* Retrieve the largest used version number for the real time topic by the given store name from graveyard.
* Return 0 if the store does not exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedRTVersionNumber(String storeName);

/**
* Put the given store into graveyard. If the store has already existed in the graveyard, update it by this given
* store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setHybridStoreDiskQuotaEnabled(store.isHybridStoreDiskQuotaEnabled());
storeInfo.setIncrementalPushEnabled(store.isIncrementalPushEnabled());
storeInfo.setLargestUsedVersionNumber(store.getLargestUsedVersionNumber());
storeInfo.setLargestUsedRTVersionNumber(store.getLargestUsedRTVersionNumber());
storeInfo.setLatestSuperSetValueSchemaId(store.getLatestSuperSetValueSchemaId());
storeInfo.setLowWatermark(store.getLowWatermark());
storeInfo.setMigrating(store.isMigrating());
Expand Down Expand Up @@ -185,6 +186,11 @@ public static StoreInfo fromStore(Store store) {
*/
private int largestUsedVersionNumber;

/**
* Largest used version number of the RT topic.
*/
private int largestUsedRTVersionNumber;

/**
* a flag to see if the store supports incremental push or not
*/
Expand Down Expand Up @@ -550,6 +556,14 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.largestUsedVersionNumber = largestUsedVersionNumber;
}

/**
 * @return the largest used version number of the RT topic held by this object
 */
public int getLargestUsedRTVersionNumber() {
return largestUsedRTVersionNumber;
}

/**
 * Stores the largest used version number of the RT topic on this object.
 *
 * @param largestUsedRTVersionNumber the value to store
 */
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

/**
 * @return the value of the {@code incrementalPushEnabled} flag held by this object
 */
public boolean isIncrementalPushEnabled() {
return incrementalPushEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
systemStoreAttributes.setLargestUsedVersionNumber(largestUsedVersionNumber);
}

/**
 * Reads the largest used RT version number from this system store's attributes.
 *
 * @return the value held by the attributes obtained from {@code fetchAndBackfillSystemStoreAttributes(true)}
 */
@Override
public int getLargestUsedRTVersionNumber() {
  return fetchAndBackfillSystemStoreAttributes(true).getLargestUsedRTVersionNumber();
}

/**
 * Writes the largest used RT version number into this system store's attributes, obtained from
 * {@code fetchAndBackfillSystemStoreAttributes(false)}.
 *
 * @param largestUsedRTVersionNumber the value to store
 */
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
  fetchAndBackfillSystemStoreAttributes(false).setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}

@Override
public long getStorageQuotaInByte() {
return zkSharedStore.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ public interface SystemStoreAttributes extends DataModelBackedStructure<SystemSt

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

/**
 * @return the largest version number used so far for this system store's real-time (RT) topic
 */
int getLargestUsedRTVersionNumber();

/**
 * Sets the largest version number used so far for this system store's real-time (RT) topic.
 *
 * @param largestUsedRTVersionNumber the new value
 */
void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

int getCurrentVersion();

void setCurrentVersion(int currentVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.dataModel.largestUsedVersionNumber = largestUsedVersionNumber;
}

/**
 * @return the {@code largestUsedRTVersionNumber} field of the backing data model
 */
@Override
public int getLargestUsedRTVersionNumber() {
return this.dataModel.largestUsedRTVersionNumber;
}

/**
 * Writes the given value to the {@code largestUsedRTVersionNumber} field of the backing data model.
 *
 * @param largestUsedRTVersionNumber the value to store
 */
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.dataModel.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

@Override
public int getCurrentVersion() {
return this.dataModel.currentVersion;
Expand Down
Loading

0 comments on commit e5ad505

Please sign in to comment.