Skip to content

Commit

Permalink
add largestUsedRTVersionNumber in store config
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Feb 12, 2025
1 parent aa7dd9a commit 667dd27
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ subprojects {
doFirst {
def versionOverrides = [
// project(':internal:venice-common').file('src/main/resources/avro/StoreVersionState/v5', PathValidation.DIRECTORY)
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v27', PathValidation.DIRECTORY),
// project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v27', PathValidation.DIRECTORY),
project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v15', PathValidation.DIRECTORY),
project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v11', PathValidation.DIRECTORY)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ControllerApiConstants {
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
public static final String LARGEST_USED_VERSION_NUMBER = "largest_used_version_number";
public static final String LARGEST_USED_RT_VERSION_NUMBER = "largest_used_rt_version_number";
public static final String NUM_VERSIONS_TO_PRESERVE = "num_versions_to_preserve";
public static final String DISABLE_META_STORE = "disable_meta_store";
public static final String DISABLE_DAVINCI_PUSH_STATUS_STORE = "disable_davinci_push_status_store";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,11 @@ public VersionResponse getStoreLargestUsedVersion(String clusterName, String sto
return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class);
}

public VersionResponse getStoreLargestUsedRTVersion(String clusterName, String storeName) {
QueryParams params = newParams().add(CLUSTER, clusterName).add(NAME, storeName);
return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class);
}

public RegionPushDetailsResponse getRegionPushDetails(String storeName, boolean isPartitionDetailEnabled) {
QueryParams params = newParams().add(NAME, storeName).add(PARTITION_DETAIL_ENABLED, isPartitionDetailEnabled);
return request(ControllerRoute.GET_REGION_PUSH_DETAILS, params, RegionPushDetailsResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,35 @@ public int getLargestUsedVersionNumber(String storeName) {
return largestUsedVersionNumber;
}

@Override
public int getLargestUsedRTVersionNumber(String storeName) {
if (VeniceSystemStoreUtils.isSystemStore(storeName)) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (systemStoreType != null && systemStoreType.isStoreZkShared()) {
String userStoreName = systemStoreType.extractRegularStoreName(storeName);
return getPerUserStoreSystemStoreLargestUsedRTVersionNumber(userStoreName, systemStoreType);
}
}

List<Store> stores = getStoreFromAllClusters(storeName);
if (stores.isEmpty()) {
// If store does NOT existing in graveyard, it means store has never been deleted, return 0 which is the default
// value of largestUsedRTVersionNumber for a new store.
return Store.NON_EXISTING_VERSION;
}
int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION;
for (Store deletedStore: stores) {
if (deletedStore.getLargestUsedRTVersionNumber() > largestUsedRTVersionNumber) {
largestUsedRTVersionNumber = deletedStore.getLargestUsedRTVersionNumber();
}
}
return largestUsedRTVersionNumber;
}

@Override
public void putStoreIntoGraveyard(String clusterName, Store store) {
int largestUsedVersionNumber = getLargestUsedVersionNumber(store.getName());
int largestUsedRTVersionNumber = getLargestUsedRTVersionNumber(store.getName());

if (store.isMigrating()) {
/**
Expand All @@ -88,7 +114,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
* P: C1:v3*, C2:null
* C: C1:v3*, C2:null
*
* After migration, both clusters shoud have the same store with same largest version and cluster discovery points to C2
* After migration, both clusters should have the same store with same largest version and cluster discovery points to C2
* P: C1:v3, C2:v3*
* C: C1:v3, C2:v3*
*
Expand All @@ -101,7 +127,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
* C: C1:v4, C2:null*
*
* Then I realized the error and want to delete the other store as well, but now I can't delete it because the largest
* version number (3) doesn't match with the one retrived from graveyard (4).
* version number (3) doesn't match with the one retrieved from graveyard (4).
* This check will address to this situation, and keep the largest version number in both graveyards the same.
*/
if (largestUsedVersionNumber > store.getLargestUsedVersionNumber()) {
Expand All @@ -112,23 +138,45 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
largestUsedVersionNumber);
store.setLargestUsedVersionNumber(largestUsedVersionNumber);
}
} else if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
if (largestUsedRTVersionNumber > store.getLargestUsedRTVersionNumber()) {
LOGGER.info(
"Increased largestUsedRTVersionNumber for migrating store {} from {} to {}.",
store.getName(),
store.getLargestUsedVersionNumber(),
largestUsedVersionNumber);
store.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}
} else {
if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
if (store.getLargestUsedRTVersionNumber() < largestUsedRTVersionNumber) {
// largestUsedRTVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedRTVersionNumber: " + store.getLargestUsedRTVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedRTVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
}

// Store does not exist in graveyard OR store already exists but the re-created store is deleted again so we need to
// update the ZNode.
HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store);
updateZNode(clusterName, store);

LOGGER.info(
"Put store: {} into graveyard with largestUsedVersionNumber {}.",
store.getName(),
largestUsedVersionNumber);
}

void updateZNode(String clusterName, Store store) {
HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store);
}

@Override
public Store getStoreFromGraveyard(String clusterName, String storeName, Stat stat) {
String path = getStoreGraveyardPath(clusterName, storeName);
Expand Down Expand Up @@ -156,7 +204,7 @@ public List<String> listStoreNamesFromGraveyard(String clusterName) {
* @return Matching store from each venice. Normally contains one element.
* If the store existed in some other cluster before, there will be more than one element in the return value.
*/
private List<Store> getStoreFromAllClusters(String storeName) {
List<Store> getStoreFromAllClusters(String storeName) {
List<Store> stores = new ArrayList<>();
for (String clusterName: clusterNames) {
Store store = dataAccessor.get(getStoreGraveyardPath(clusterName, storeName), null, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -196,11 +244,40 @@ private int getPerUserStoreSystemStoreLargestUsedVersionNumber(
return largestUsedVersionNumber;
}

int getPerUserStoreSystemStoreLargestUsedRTVersionNumber(
String userStoreName,
VeniceSystemStoreType systemStoreType) {
String systemStoreName = systemStoreType.getSystemStoreName(userStoreName);
List<Store> deletedStores = getStoreFromAllClusters(userStoreName);
if (deletedStores.isEmpty()) {
LOGGER.info(
"User store: {} does NOT exist in the store graveyard. Hence, no largest used RT version for its system store: {}",
userStoreName,
systemStoreName);
return Store.NON_EXISTING_VERSION;
}
int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION;
for (Store deletedStore: deletedStores) {
Map<String, SystemStoreAttributes> systemStoreNamesToAttributes = deletedStore.getSystemStores();
SystemStoreAttributes systemStoreAttributes =
systemStoreNamesToAttributes.get(VeniceSystemStoreType.getSystemStoreType(systemStoreName).getPrefix());
if (systemStoreAttributes != null) {
largestUsedRTVersionNumber =
Math.max(largestUsedRTVersionNumber, systemStoreAttributes.getLargestUsedRTVersionNumber());
}
}

if (largestUsedRTVersionNumber == Store.NON_EXISTING_VERSION) {
LOGGER.info("Can not find largest used RT version number for {}.", systemStoreName);
}
return largestUsedRTVersionNumber;
}

private String getGeneralStoreGraveyardPath() {
return getStoreGraveyardPath(PathResourceRegistry.WILDCARD_MATCH_ANY, PathResourceRegistry.WILDCARD_MATCH_ANY);
}

private String getStoreGraveyardPath(String clusterName, String storeName) {
String getStoreGraveyardPath(String clusterName, String storeName) {
return getStoreGraveyardParentPath(clusterName) + "/" + storeName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.delegate.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getCurrentVersion() {
return this.delegate.getCurrentVersion();
Expand Down Expand Up @@ -855,6 +865,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.delegate.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public long getStorageQuotaInByte() {
return this.delegate.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static boolean isSystemStore(String storeName) {

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

int getLargestUsedRTVersionNumber();

void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

long getStorageQuotaInByte();

void setStorageQuotaInByte(long storageQuotaInByte);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
*/
public interface StoreGraveyard {
/**
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store dose not
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store does not
* exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedVersionNumber(String storeName);

/**
* Retrieve the largest used version number for the real time topic by the given store name from graveyard.
* Return 0 if the store does not exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedRTVersionNumber(String storeName);

/**
* Put the given store into graveyard. If the store has already existed in the graveyard, update it by this given
* store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
systemStoreAttributes.setLargestUsedVersionNumber(largestUsedVersionNumber);
}

@Override
public int getLargestUsedRTVersionNumber() {
SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(true);
return systemStoreAttributes.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(false);
systemStoreAttributes.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}

@Override
public long getStorageQuotaInByte() {
return zkSharedStore.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ public interface SystemStoreAttributes extends DataModelBackedStructure<SystemSt

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

int getLargestUsedRTVersionNumber();

void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

int getCurrentVersion();

void setCurrentVersion(int currentVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.dataModel.largestUsedVersionNumber = largestUsedVersionNumber;
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.dataModel.largestUsedRTVersionNumber;
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.dataModel.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

@Override
public int getCurrentVersion() {
return this.dataModel.currentVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.storeProperties.largestUsedVersionNumber = largestUsedVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public int getLargestUsedRTVersionNumber() {
return this.storeProperties.largestUsedRTVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.storeProperties.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(27, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(28, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading

0 comments on commit 667dd27

Please sign in to comment.