diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 7dddb841afe..bdd1f2a9ea8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -71,6 +71,7 @@ public class ControllerApiConstants { public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled"; public static final String BATCH_GET_LIMIT = "batch_get_limit"; public static final String LARGEST_USED_VERSION_NUMBER = "largest_used_version_number"; + public static final String LARGEST_USED_RT_VERSION_NUMBER = "largest_used_rt_version_number"; public static final String NUM_VERSIONS_TO_PRESERVE = "num_versions_to_preserve"; public static final String DISABLE_META_STORE = "disable_meta_store"; public static final String DISABLE_DAVINCI_PUSH_STATUS_STORE = "disable_davinci_push_status_store"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index 11452d95fe3..9d5e30b3b7a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -1190,6 +1190,11 @@ public VersionResponse getStoreLargestUsedVersion(String clusterName, String sto return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class); } + public VersionResponse getStoreLargestUsedRTVersion(String clusterName, String storeName) { + QueryParams params = newParams().add(CLUSTER, clusterName).add(NAME, storeName); + return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, 
VersionResponse.class); + } + public RegionPushDetailsResponse getRegionPushDetails(String storeName, boolean isPartitionDetailEnabled) { QueryParams params = newParams().add(NAME, storeName).add(PARTITION_DETAIL_ENABLED, isPartitionDetailEnabled); return request(ControllerRoute.GET_REGION_PUSH_DETAILS, params, RegionPushDetailsResponse.class); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixStoreGraveyard.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixStoreGraveyard.java index 65bef764c74..8c9bce19a8b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixStoreGraveyard.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixStoreGraveyard.java @@ -77,9 +77,35 @@ public int getLargestUsedVersionNumber(String storeName) { return largestUsedVersionNumber; } + @Override + public int getLargestUsedRTVersionNumber(String storeName) { + if (VeniceSystemStoreUtils.isSystemStore(storeName)) { + VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); + if (systemStoreType != null && systemStoreType.isStoreZkShared()) { + String userStoreName = systemStoreType.extractRegularStoreName(storeName); + return getPerUserStoreSystemStoreLargestUsedRTVersionNumber(userStoreName, systemStoreType); + } + } + + List stores = getStoreFromAllClusters(storeName); + if (stores.isEmpty()) { + // If store does NOT exist in graveyard, it means store has never been deleted, return 0 which is the default + // value of largestUsedRTVersionNumber for a new store. 
+ return Store.NON_EXISTING_VERSION; + } + int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION; + for (Store deletedStore: stores) { + if (deletedStore.getLargestUsedRTVersionNumber() > largestUsedRTVersionNumber) { + largestUsedRTVersionNumber = deletedStore.getLargestUsedRTVersionNumber(); + } + } + return largestUsedRTVersionNumber; + } + @Override public void putStoreIntoGraveyard(String clusterName, Store store) { int largestUsedVersionNumber = getLargestUsedVersionNumber(store.getName()); + int largestUsedRTVersionNumber = getLargestUsedRTVersionNumber(store.getName()); if (store.isMigrating()) { /** @@ -88,7 +114,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) { * P: C1:v3*, C2:null * C: C1:v3*, C2:null * - * After migration, both clusters shoud have the same store with same largest version and cluster discovery points to C2 + * After migration, both clusters should have the same store with same largest version and cluster discovery points to C2 * P: C1:v3, C2:v3* * C: C1:v3, C2:v3* * @@ -101,7 +127,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) { * C: C1:v4, C2:null* * * Then I realized the error and want to delete the other store as well, but now I can't delete it because the largest - * version number (3) doesn't match with the one retrived from graveyard (4). + * version number (3) doesn't match with the one retrieved from graveyard (4). * This check will address to this situation, and keep the largest version number in both graveyards the same. */ if (largestUsedVersionNumber > store.getLargestUsedVersionNumber()) { @@ -112,23 +138,45 @@ public void putStoreIntoGraveyard(String clusterName, Store store) { largestUsedVersionNumber); store.setLargestUsedVersionNumber(largestUsedVersionNumber); } - } else if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) { - // largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue. 
- String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: " - + store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber; - LOGGER.error(errorMsg); - throw new VeniceException(errorMsg); + if (largestUsedRTVersionNumber > store.getLargestUsedRTVersionNumber()) { + LOGGER.info( + "Increased largestUsedRTVersionNumber for migrating store {} from {} to {}.", + store.getName(), + store.getLargestUsedRTVersionNumber(), + largestUsedRTVersionNumber); + store.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber); + } + } else { + if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) { + // largestUsedVersion number in re-created store is smaller than the deleted store. It is likely an issue. + String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: " + + store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber; + LOGGER.error(errorMsg); + throw new VeniceException(errorMsg); + } + if (store.getLargestUsedRTVersionNumber() < largestUsedRTVersionNumber) { + // largestUsedRTVersion number in re-created store is smaller than the deleted store. It is likely an issue. + String errorMsg = "Invalid largestUsedRTVersionNumber: " + store.getLargestUsedRTVersionNumber() + " in Store: " + + store.getName() + ", it's smaller than one found in graveyard: " + largestUsedRTVersionNumber; + LOGGER.error(errorMsg); + throw new VeniceException(errorMsg); + } + } } // Store does not exist in graveyard OR store already exists but the re-created store is deleted again so we need to // update the ZNode. 
- HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store); + updateZNode(clusterName, store); + LOGGER.info( "Put store: {} into graveyard with largestUsedVersionNumber {}.", store.getName(), largestUsedVersionNumber); } + void updateZNode(String clusterName, Store store) { + HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store); + } + @Override public Store getStoreFromGraveyard(String clusterName, String storeName, Stat stat) { String path = getStoreGraveyardPath(clusterName, storeName); @@ -156,7 +204,7 @@ public List listStoreNamesFromGraveyard(String clusterName) { * @return Matching store from each venice. Normally contains one element. * If the store existed in some other cluster before, there will be more than one element in the return value. */ - private List getStoreFromAllClusters(String storeName) { + List getStoreFromAllClusters(String storeName) { List stores = new ArrayList<>(); for (String clusterName: clusterNames) { Store store = dataAccessor.get(getStoreGraveyardPath(clusterName, storeName), null, AccessOption.PERSISTENT); @@ -196,11 +244,40 @@ private int getPerUserStoreSystemStoreLargestUsedVersionNumber( return largestUsedVersionNumber; } + int getPerUserStoreSystemStoreLargestUsedRTVersionNumber( + String userStoreName, + VeniceSystemStoreType systemStoreType) { + String systemStoreName = systemStoreType.getSystemStoreName(userStoreName); + List deletedStores = getStoreFromAllClusters(userStoreName); + if (deletedStores.isEmpty()) { + LOGGER.info( + "User store: {} does NOT exist in the store graveyard. 
Hence, no largest used RT version for its system store: {}", + userStoreName, + systemStoreName); + return Store.NON_EXISTING_VERSION; + } + int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION; + for (Store deletedStore: deletedStores) { + Map systemStoreNamesToAttributes = deletedStore.getSystemStores(); + SystemStoreAttributes systemStoreAttributes = + systemStoreNamesToAttributes.get(VeniceSystemStoreType.getSystemStoreType(systemStoreName).getPrefix()); + if (systemStoreAttributes != null) { + largestUsedRTVersionNumber = + Math.max(largestUsedRTVersionNumber, systemStoreAttributes.getLargestUsedRTVersionNumber()); + } + } + + if (largestUsedRTVersionNumber == Store.NON_EXISTING_VERSION) { + LOGGER.info("Can not find largest used RT version number for {}.", systemStoreName); + } + return largestUsedRTVersionNumber; + } + private String getGeneralStoreGraveyardPath() { return getStoreGraveyardPath(PathResourceRegistry.WILDCARD_MATCH_ANY, PathResourceRegistry.WILDCARD_MATCH_ANY); } - private String getStoreGraveyardPath(String clusterName, String storeName) { + String getStoreGraveyardPath(String clusterName, String storeName) { return getStoreGraveyardParentPath(clusterName) + "/" + storeName; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 2137d1686f6..76ac143f2dd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -715,6 +715,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) { throw new UnsupportedOperationException(); } + @Override + public int getLargestUsedRTVersionNumber() { + return this.delegate.getLargestUsedRTVersionNumber(); + } + + @Override + public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) { + throw new 
UnsupportedOperationException(); + } + @Override public int getCurrentVersion() { return this.delegate.getCurrentVersion(); @@ -855,6 +865,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) { throw new UnsupportedOperationException(); } + @Override + public int getLargestUsedRTVersionNumber() { + return this.delegate.getLargestUsedRTVersionNumber(); + } + + @Override + public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) { + throw new UnsupportedOperationException(); + } + @Override public long getStorageQuotaInByte() { return this.delegate.getStorageQuotaInByte(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index 917353cfa34..6e9e61a9d98 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -80,6 +80,10 @@ static boolean isSystemStore(String storeName) { void setLargestUsedVersionNumber(int largestUsedVersionNumber); + int getLargestUsedRTVersionNumber(); + + void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber); + long getStorageQuotaInByte(); void setStorageQuotaInByte(long storageQuotaInByte); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreGraveyard.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreGraveyard.java index 83b5511f254..e9a4b619829 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreGraveyard.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreGraveyard.java @@ -12,11 +12,17 @@ */ public interface StoreGraveyard { /** - * Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store dose not + * Retrieve the largest used version number by the given store name from graveyard. 
Return 0 if the store does not * exist in the graveyard, which is the default value we used for the new store. */ int getLargestUsedVersionNumber(String storeName); + /** + * Retrieve the largest used version number for the real time topic by the given store name from graveyard. + * Return 0 if the store does not exist in the graveyard, which is the default value we used for the new store. + */ + int getLargestUsedRTVersionNumber(String storeName); + /** * Put the given store into graveyard. If the store has already existed in the graveyard, update it by this given * store. diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java index 8e2e2091a4e..954e4e5f1f7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java @@ -186,6 +186,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) { systemStoreAttributes.setLargestUsedVersionNumber(largestUsedVersionNumber); } + @Override + public int getLargestUsedRTVersionNumber() { + SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(true); + return systemStoreAttributes.getLargestUsedRTVersionNumber(); + } + + @Override + public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) { + SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(false); + systemStoreAttributes.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber); + } + @Override public long getStorageQuotaInByte() { return zkSharedStore.getStorageQuotaInByte(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStoreAttributes.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStoreAttributes.java index 3a09f1586a8..799e1273060 100644 --- 
a/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStoreAttributes.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStoreAttributes.java @@ -11,6 +11,10 @@ public interface SystemStoreAttributes extends DataModelBackedStructure REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + }, + {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"} + ] + } + ], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": { + "type":"map", + "values": { + "name": "StoreViewConfig", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }, + "default": {} + }, + {"name": "accessControlled", "type": "boolean", "default": true, "doc": "Store-level ACL switch. 
When disabled, Venice Router should accept every request."}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "Strategy used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "clientDecompressionEnabled", "type": "boolean", "default": true, "doc": "Enable/Disable client-side record decompression (default: true)"}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large value (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large replication metadata (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "batchGetLimit", "type": "int", "default": -1, "doc": "Batch get key number limit, and Venice will use cluster-level config if it is not positive."}, + {"name": "numVersionsToPreserve", "type": "int", "default": 0, "doc": "How many versions this store preserve at most. 
By default it's 0 means we use the cluster level config to determine how many version is preserved."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "migrating", "type": "boolean", "default": false, "doc": "Whether or not the store is in the process of migration."}, + {"name": "writeComputationEnabled", "type": "boolean", "default": false, "doc": "Whether or not write-path computation feature is enabled for this store."}, + {"name": "readComputationEnabled", "type": "boolean", "default": false, "doc": "Whether read-path computation is enabled for this store."}, + {"name": "bootstrapToOnlineTimeoutInHours", "type": "int", "default": 24, "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state."}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition model for upcoming version."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native should be enabled for this store. Will only successfully apply if leaderFollowerModelEnabled is also true either in this update or a previous version of the store."}, + {"name": "replicationMetadataVersionID", "type": "int", "default": -1, "doc": "RMD (Replication metadata) version ID on the store-level. 
Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "backupStrategy", "type": "int", "default": 1, "doc": "Strategies to store backup versions, and default is 'DELETE_ON_NEW_PUSH_START'"}, + {"name": "schemaAutoRegisteFromPushJobEnabled", "type": "boolean", "default": false, "doc": "Whether or not value schema auto registration enabled from push job for this store."}, + {"name": "latestSuperSetValueSchemaId", "type": "int", "default": -1, "doc": "For read compute stores with auto super-set schema enabled, stores the latest super-set value schema ID."}, + {"name": "hybridStoreDiskQuotaEnabled", "type": "boolean", "default": false, "doc": "Whether or not storage disk quota is enabled for a hybrid store. This store config cannot be enabled until the routers and servers in the corresponding cluster are upgraded to the right version: 0.2.249 or above for routers and servers."}, + {"name": "storeMetadataSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store metadata system store is enabled for this store."}, + { + "name": "etlConfig", + "doc": "Properties related to ETL Store behavior.", + "type": [ + "null", + { + "name": "StoreETLConfig", + "type": "record", + "fields": [ + {"name": "etledUserProxyAccount", "type": "string", "doc": "If enabled regular ETL or future version ETL, this account name is part of path for where the ETLed snapshots will go. 
for example, for user account veniceetl001, snapshots will be published to HDFS /jobs/veniceetl001/storeName."}, + {"name": "regularVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable regular version ETL for this store."}, + {"name": "futureVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable future version ETL - the version that might come online in future - for this store."} + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "doc": "", + "type": [ + "null", + { + "name": "StorePartitionerConfig", + "type": "record", + "fields": [ + {"name": "partitionerClass", "type": "string"}, + {"name": "partitionerParams", "type": {"type": "map", "values": "string"}}, + {"name": "amplificationFactor", "type": "int"} + ] + } + ], + "default": null + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes, and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1, "doc": "This is used to track the time when a new version is promoted to current version. For now, it is mostly to decide whether a backup version can be removed or not based on retention. For the existing store before this code change, it will be set to be current timestamp."}, + {"name": "backupVersionRetentionMs", "type": "long", "default": -1, "doc": "Backup retention time, and if it is not set (-1), Venice Controller will use the default configured retention. 
{@link com.linkedin.venice.ConfigKeys#CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS}."}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica each store version will keep."}, + {"name": "migrationDuplicateStore", "type": "boolean", "default": false, "doc": "Whether or not the store is a duplicate store in the process of migration."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. Remote consumption will happen from kafka in this fabric."}, + {"name": "daVinciPushStatusStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not davinci push status store is enabled."}, + {"name": "storeMetaSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store meta system store is enabled for this store."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "applyTargetVersionFilterForIncPush", "type": "boolean", "default": false, "doc": "Whether or not the target version field in Kafka messages will be used in increment push to RT policy"}, + {"name": "minCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level min compaction lag config and if not specified, it will use the global config for version topics"}, + {"name": "maxCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level max compaction lag config and if not specified, 'max.compaction.lag.ms' config won't be setup in the corresponding version topics"}, + {"name": "maxRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes. 
If not specified (-1), the controller config 'default.max.record.size.bytes' (100MB default) will be backfilled"}, + {"name": "maxNearlineRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes for nearline jobs with partial updates. If not specified (-1), the server config 'default.max.record.size.bytes' (100MB default) will be backfilled. This may converge with maxRecordSizeBytes in the future"}, + {"name": "unusedSchemaDeletionEnabled", "type": "boolean", "default": false, "doc": "Store level config to indicate whether unused schema deletion is enabled or not."}, + { + "name": "versions", + "doc": "List of non-retired versions. It's currently sorted and there is code run under the assumption that the last element in the list is the largest. Check out {VeniceHelixAdmin#getIncrementalPushVersion}, and please make it in mind if you want to change this logic", + "type": { + "type": "array", + "items": { + "name": "StoreVersion", + "type": "record", + "doc": "Type describes all the version attributes", + "fields": [ + {"name": "storeName", "type": "string", "doc": "Name of the store which this version belong to."}, + {"name": "number", "type": "int", "doc": "Version number."}, + {"name": "createdTime", "type": "long", "doc": "Time when this version was created."}, + {"name": "status", "type": "int", "default": 1, "doc": "Status of version, and default is 'STARTED'"}, + {"name": "pushJobId", "type": "string", "default": ""}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "strategies used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native replication is enabled."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address 
to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "bufferReplayEnabledForHybrid", "type": "boolean", "default": true, "doc": "Whether or not to enable buffer replay for hybrid."}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large values are supported (via chunking)."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large replication metadata are supported (via chunking)."}, + {"name": "pushType", "type": "int", "default": 0, "doc": "Producer type for this version, and default is 'BATCH'"}, + {"name": "partitionCount", "type": "int", "default": 0, "doc": "Partition count of this version."}, + { + "name": "partitionerConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StorePartitionerConfig" + ], + "default": null, + "doc": "Config for custom partitioning." + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes., and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica this store version is keeping."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. 
Remote consumption will happen from kafka in this fabric."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "useVersionLevelIncrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if incrementalPushEnabled config at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. We can deprecate this field later."}, + { + "name": "hybridConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StoreHybridConfig" + ], + "default": null, + "doc": "Properties related to Hybrid Store behavior. If absent (null), then the store is not hybrid." + }, + {"name": "useVersionLevelHybridConfig", "type": "boolean", "default": false, "doc": "Flag to see if hybridConfig at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. 
We can deprecate this field later."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "timestampMetadataVersionId", "type": "int", "default": -1, "doc": "The A/A timestamp metadata schema version ID that will be used to deserialize metadataPayload."}, + { + "name": "dataRecoveryConfig", + "type": [ + "null", + { + "name": "DataRecoveryConfig", + "type": "record", + "fields": [ + {"name": "dataRecoverySourceFabric", "type": "string", "doc": "The fabric name to be used as the source for data recovery."}, + {"name": "isDataRecoveryComplete", "type": "boolean", "doc": "Whether or not data recovery is complete."}, + {"name": "dataRecoverySourceVersionNumber", "type": "int", "default": 0, "doc": "The store version number to be used as the source for data recovery."} + ] + } + ], + "default": null, + "doc": "Properties related to data recovery mode behavior for this version. If absent (null), then the version never went go through data recovery." + }, + {"name": "deferVersionSwap", "type": "boolean", "default": false, "doc": "flag that informs venice controller to defer marking this version as the serving version after instances report ready to serve. 
This version must be marked manually as the current version in order to serve traffic from it."}, + { + "name": "views", + "doc": "A list of views which describe and configure a downstream view of a venice store.", + "type": { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": "com.linkedin.venice.systemstore.schemas.StoreViewConfig" + }, + "default": {} + }, + {"name": "repushSourceVersion", "type": "int", "default": -1, "doc": "For store version created from repush, indicates the source store version its created from."}, + {"name": "targetSwapRegion", "type": "string", "default": "", "doc": "Controls what region to swap in the current version during target colo push"}, + {"name": "targetSwapRegionWaitTime", "type": "int", "default": 60, "doc": "Controls how long to wait in minutes before swapping the version on the regions"}, + {"name": "isDaVinciHeartBeatReported", "type": "boolean", "default": false, "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats"} + ] + } + }, + "default": [] + }, + { + "name": "systemStores", + "doc": "This field is used to maintain a mapping between each type of system store and the corresponding distinct properties", + "type": { + "type": "map", + "values": { + "name": "SystemStoreProperties", + "type": "record", + "doc": "This type describes all the distinct properties", + "fields": [ + {"name": "largestUsedVersionNumber", "type": "int", "default": 0}, + {"name": "largestUsedRTVersionNumber", "type": "int", "default": 0}, + {"name": "currentVersion", "type": "int", "default": 0}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1}, + {"name": "versions", "type": {"type": "array", "items": "com.linkedin.venice.systemstore.schemas.StoreVersion"}, "default": []} + ] + } + }, + "default": {} + }, + {"name": "storageNodeReadQuotaEnabled", "type": "boolean", "default": false, "doc": "Controls the storage node read quota enforcement for 
the given Venice store"}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "nearlineProducerCompressionEnabled", "type": "boolean", "default": true, "doc": "Flag to control whether the producer in Server for near-line workload will enable compression or not"}, + {"name": "nearlineProducerCountPerWriter", "type": "int", "default": 1, "doc": "How many producers will be used for the nearline producer in Server to improve producing throughput"}, + {"name": "targetSwapRegion", "type": "string", "default": "", "doc": "Controls what region to swap in the current version during target colo push"}, + {"name": "targetSwapRegionWaitTime", "type": "int", "default": 60, "doc": "Controls how long to wait in minutes before swapping the version on the regions"}, + {"name": "isDaVinciHeartBeatReported", "type": "boolean", "default": false, "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats"} + ] + } + ], + "default": null + }, + { + "name": "storeKeySchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreKeySchemas", + "doc": "This type describes the key schemas of the store", + "type": "record", + "fields": [ + { + "name": "keySchemaMap", + "doc": "A string to string map representing the mapping from id to key schema.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchemas", + "doc": "This type describes the value schemas of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchemaMap", + "doc": "A string to string map representing the mapping from schema id to value schema string. 
The value could be an empty string indicating the value schema is stored in another field.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchema", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchema", + "doc": "This type describes a single version of the value schema of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchema", + "doc": "Store value schema string.", + "type": "string", + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "storeReplicaStatuses", + "doc": "This field describes the replica statuses per version per partition, and the mapping is 'host_port' -> 'replica status'", + "type": [ + "null", + { + "type": "map", + "values": { + "name": "StoreReplicaStatus", + "type": "record", + "doc": "This structure will contain all kinds of info related to one replica", + "fields": [ + {"name": "status", "type": "int", "doc": "replica status"} + ] + } + } + ], + "default": null + }, + { + "name": "storeValueSchemaIdsWrittenPerStoreVersion", + "doc": "This field described the set of value schemas id written by a store version.", + "type": [ + "null", + { + "name": "StoreValueSchemaIdsWrittenPerStoreVersion", + "doc": "This type describes value schema IDs written by the store version.", + "type": "array", + "items": "int" + } + ], + "default": null + }, + { + "name": "storeClusterConfig", + "doc": "This is the Zk's StoreConfig equivalent which contains various Venice cluster information", + "type": [ + "null", + { + "name": "StoreClusterConfig", + "doc": "This type describes the various Venice cluster information for a store", + "type": "record", + "fields": [ + {"name": "cluster", "type": "string", "default": "", "doc": "The Venice cluster of the store."}, + {"name": "deleting", "type": "boolean", "default": false, "doc": "Is the store undergoing deletion."}, + {"name": "migrationDestCluster", "type": ["null", "string"], "default": null, 
"doc": "The destination cluster for store migration"}, + {"name": "migrationSrcCluster", "type": ["null", "string"], "default": null, "doc": "The source cluster for store migration"}, + {"name": "storeName", "type": "string", "default": "", "doc": "The name of the store"} + ] + } + ], + "default": null + } + ] +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixStoreGraveyardUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixStoreGraveyardUnitTest.java new file mode 100644 index 00000000000..d9cf4e62a4c --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixStoreGraveyardUnitTest.java @@ -0,0 +1,109 @@ +package com.linkedin.venice.helix; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.SystemStoreAttributes; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class HelixStoreGraveyardUnitTest { + private HelixStoreGraveyard graveyard; + + @BeforeMethod + void setUp() { + graveyard = + spy(new HelixStoreGraveyard(mock(ZkClient.class), new HelixAdapterSerializer(), Arrays.asList("clusterName"))); + doNothing().when(graveyard).updateZNode(anyString(), any()); + } + + @Test + void testNoStoreFoundReturnsDefault() { + String storeName = "non_existent_store"; + 
when(graveyard.getStoreFromAllClusters(storeName)).thenReturn(Collections.emptyList()); + + int result = graveyard.getLargestUsedRTVersionNumber(storeName); + assertEquals(result, Store.NON_EXISTING_VERSION); + } + + @Test + void testFindsLargestUsedRTVersionNumber() { + String storeName = "test_store"; + + Store store1 = mock(Store.class); + Store store2 = mock(Store.class); + when(store1.getLargestUsedRTVersionNumber()).thenReturn(2); + when(store2.getLargestUsedRTVersionNumber()).thenReturn(5); + + List<Store> stores = Arrays.asList(store1, store2); + when(graveyard.getStoreFromAllClusters(storeName)).thenReturn(stores); + + int result = graveyard.getLargestUsedRTVersionNumber(storeName); + assertEquals(result, 5); + } + + @Test + void testPutStoreIntoGraveyardMigratingStoreUpdatesVersions() { + Store store = mock(Store.class); + when(store.isMigrating()).thenReturn(true); + when(store.getName()).thenReturn("migrating_store"); + when(store.getLargestUsedVersionNumber()).thenReturn(2); + when(store.getLargestUsedRTVersionNumber()).thenReturn(3); + + when(graveyard.getLargestUsedVersionNumber("migrating_store")).thenReturn(4); + when(graveyard.getLargestUsedRTVersionNumber("migrating_store")).thenReturn(5); + + graveyard.putStoreIntoGraveyard("cluster1", store); + + verify(store).setLargestUsedVersionNumber(4); + verify(store).setLargestUsedRTVersionNumber(5); + } + + @Test + void testNoDeletedStores() { + String userStoreName = "userStore"; + when(graveyard.getStoreFromAllClusters(userStoreName)).thenReturn(Collections.emptyList()); + + int result = graveyard.getPerUserStoreSystemStoreLargestUsedRTVersionNumber( + userStoreName, + VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE); + + assertEquals(result, Store.NON_EXISTING_VERSION); + } + + @Test + void testStoreWithValidSystemStoreAttributes() { + String userStoreName = "userStore"; + Store deletedStore = mock(Store.class); + SystemStoreAttributes attributes = mock(SystemStoreAttributes.class); +
when(attributes.getLargestUsedRTVersionNumber()).thenReturn(5); + + Map<String, SystemStoreAttributes> systemStoreNamesToAttributes = new HashMap<>(); + systemStoreNamesToAttributes.put(VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE.getPrefix(), attributes); + when(deletedStore.getSystemStores()).thenReturn(systemStoreNamesToAttributes); + + List<Store> deletedStores = Collections.singletonList(deletedStore); + when(graveyard.getStoreFromAllClusters(userStoreName)).thenReturn(deletedStores); + + int result = graveyard.getPerUserStoreSystemStoreLargestUsedRTVersionNumber( + userStoreName, + VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE); + + assertEquals(result, 5); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestHybridStoreRepartitioningWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestHybridStoreRepartitioningWithMultiDataCenter.java new file mode 100644 index 00000000000..a822b990b21 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestHybridStoreRepartitioningWithMultiDataCenter.java @@ -0,0 +1,134 @@ +package com.linkedin.venice.helix; + +import static com.linkedin.venice.ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE; +import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS; +import static com.linkedin.venice.ConfigKeys.DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID; +import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE; + +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.NewStoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; 
+import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.BackupStrategy; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.manager.TopicManager; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestHybridStoreRepartitioningWithMultiDataCenter { + private static final int TEST_TIMEOUT = 90_000; // ms + private static final int NUMBER_OF_CHILD_DATACENTERS = 2; + private static final int NUMBER_OF_CLUSTERS = 1; + private static final String[] CLUSTER_NAMES = + IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + private List<VeniceMultiClusterWrapper> childDatacenters; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + List<TopicManager> topicManagers; + + @BeforeClass + public void setUp() { + Properties controllerProps = new Properties(); + controllerProps.put(DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID, 2); + controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3); + controllerProps.put(DEFAULT_PARTITION_SIZE, 1024); + controllerProps.put(CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, true); + VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder = + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS) + .numberOfClusters(NUMBER_OF_CLUSTERS) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .forkServer(false) + .parentControllerProperties(controllerProps) + .childControllerProperties(controllerProps); + 
multiRegionMultiClusterWrapper = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(optionsBuilder.build()); + + childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + topicManagers = new ArrayList<>(2); + topicManagers + .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager()); + topicManagers + .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager()); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testLargestUsedRTVersionNumber() { + String storeName = Utils.getUniqueString("TestLargestUsedRTVersionNumber"); + String clusterName = CLUSTER_NAMES[0]; + String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString(); + + ControllerClient parentControllerClient = + ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs); + ControllerClient[] childControllerClients = new ControllerClient[childDatacenters.size()]; + for (int i = 0; i < childDatacenters.size(); i++) { + childControllerClients[i] = + new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString()); + } + + NewStoreResponse newStoreResponse = + parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\"")); + Assert.assertFalse( + newStoreResponse.isError(), + "The NewStoreResponse returned an error: " + newStoreResponse.getError()); + + UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams(); + updateStoreParams.setEnableReads(false).setEnableWrites(false); + + TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams); + + ControllerResponse deleteStoreResponse = + childControllerClients[0].retryableRequest(5, c -> c.deleteStore(storeName)); + Assert.assertFalse( + 
deleteStoreResponse.isError(), + "The DeleteStoreResponse returned an error: " + deleteStoreResponse.getError()); + + newStoreResponse = + childControllerClients[0].retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\"")); + Assert.assertFalse( + newStoreResponse.isError(), + "The NewStoreResponse returned an error: " + newStoreResponse.getError()); + + String newRealTimeTopicName = "NewRealTimeTopicName" + Version.REAL_TIME_TOPIC_SUFFIX; + updateStoreParams = new UpdateStoreQueryParams(); + updateStoreParams.setIncrementalPushEnabled(true) + .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS) + .setNumVersionsToPreserve(2) + .setHybridRewindSeconds(1000) + .setActiveActiveReplicationEnabled(true) + .setRealTimeTopicName(newRealTimeTopicName) + .setEnableWrites(true) + .setEnableReads(true) + .setHybridOffsetLagThreshold(1000); + TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams); + + // create new version by doing an empty push + parentControllerClient + .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); + + for (ControllerClient controllerClient: childControllerClients) { + Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1); + } + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c38f7baa7af..6f5ecbc2396 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1010,7 +1010,21 @@ public void createStore( storeName, largestUsedStoreVersion); } - configureNewStore(newStore, config, largestUsedStoreVersion); + + int largestUsedRTStoreVersion = storeGraveyard.getLargestUsedRTVersionNumber(storeName); + if 
(largestUsedRTStoreVersion == Store.NON_EXISTING_VERSION) { + LOGGER.info( + "Store: {} does NOT exist in the store graveyard. Will initialize the RT version to {}.", + storeName, + Store.NON_EXISTING_VERSION); + } else { + LOGGER.info( + "Found store: {} in the store graveyard. Will initialize the RT version to {}.", + storeName, + largestUsedRTStoreVersion); + } + + configureNewStore(newStore, config, largestUsedStoreVersion, largestUsedRTStoreVersion); storeRepo.addStore(newStore); // Create global config for that store. @@ -1032,7 +1046,11 @@ public void createStore( } } - private void configureNewStore(Store newStore, VeniceControllerClusterConfig config, int largestUsedVersionNumber) { + private void configureNewStore( + Store newStore, + VeniceControllerClusterConfig config, + int largestUsedVersionNumber, + int largestUsedRTVersionNumber) { newStore.setNativeReplicationEnabled(config.isMultiRegion()); /** @@ -1044,6 +1062,7 @@ private void configureNewStore(Store newStore, VeniceControllerClusterConfig con newStore.setNativeReplicationSourceFabric(config.getNativeReplicationSourceFabricAsDefaultForBatchOnly()); } newStore.setLargestUsedVersionNumber(largestUsedVersionNumber); + newStore.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber); } /**