diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java index edae8e5f39129..34d1c019dd0a5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java @@ -454,6 +454,11 @@ public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) { sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex()); } + @VisibleForTesting + LinkedHashMap getKvStateInformation() { + return kvStateInformation; + } + /** Should only be called by one thread, and only after all accesses to the DB happened. */ @Override public void dispose() { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java new file mode 100644 index 0000000000000..2cb8c1a708f27 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.rocksdb.restore; + +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.rocksdb.RocksDBIncrementalCheckpointUtils; +import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.types.Either; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ExportImportFilesMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Helper class for distributing state handle data during RocksDB incremental restore. This class + * encapsulates the logic for processing a single state handle. 
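+ *
+ * <p>A usage sketch, mirroring the caller in {@code RocksDBIncrementalRestoreOperation}; the
+ * surrounding variables are assumed to be in scope:
+ *
+ * <pre>{@code
+ * try (DistributeStateHandlerHelper helper =
+ *         new DistributeStateHandlerHelper(
+ *                 stateHandle,
+ *                 stateMetaInfoSnapshots,
+ *                 columnFamilyOptionsFactory,
+ *                 dbOptions,
+ *                 ttlCompactFiltersManager,
+ *                 writeBufferManagerCapacity,
+ *                 keyGroupPrefixBytes,
+ *                 keyGroupRange,
+ *                 operatorIdentifier,
+ *                 index)) {
+ *     // The temporary DB is opened in the constructor and closed together with the helper.
+ *     helper.tryDistribute(exportCfBasePath, exportedColumnFamiliesOut);
+ * }
+ * }</pre>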
+ */ +public class DistributeStateHandlerHelper implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(DistributeStateHandlerHelper.class); + + private final IncrementalLocalKeyedStateHandle stateHandle; + private final RestoredDBInstance restoredDbInstance; + private final int keyGroupPrefixBytes; + private final KeyGroupRange keyGroupRange; + private final String operatorIdentifier; + private final int index; + + /** + * Creates a helper for processing a single state handle. The database instance is created in + * the constructor to enable proper resource management and separation of concerns. + * + * @param stateHandle the state handle to process + * @param columnFamilyOptionsFactory factory for creating column family options + * @param dbOptions database options + * @param ttlCompactFiltersManager TTL compact filters manager (can be null) + * @param writeBufferManagerCapacity write buffer manager capacity (can be null) + * @param keyGroupPrefixBytes number of key group prefix bytes for SST file range checking + * @param keyGroupRange target key group range (for logging) + * @param operatorIdentifier operator identifier (for logging) + * @param index current processing index (for logging) + * @throws Exception on any database opening error + */ + public DistributeStateHandlerHelper( + IncrementalLocalKeyedStateHandle stateHandle, + List stateMetaInfoSnapshots, + Function columnFamilyOptionsFactory, + DBOptions dbOptions, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity, + int keyGroupPrefixBytes, + KeyGroupRange keyGroupRange, + String operatorIdentifier, + int index) + throws Exception { + this.stateHandle = stateHandle; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.keyGroupRange = keyGroupRange; + this.operatorIdentifier = operatorIdentifier; + this.index = index; + + final String logLineSuffix = createLogLineSuffix(); + + LOG.debug("Opening temporary database : {}", logLineSuffix); + + // Open database using restored instance helper method + this.restoredDbInstance = + RestoredDBInstance.restoreTempDBInstanceFromLocalState( + stateHandle, + stateMetaInfoSnapshots, + columnFamilyOptionsFactory, + dbOptions, + ttlCompactFiltersManager, + writeBufferManagerCapacity); + } + + /** + * Distributes state handle data by checking SST file ranges and exporting column families. + * Returns Left if successfully exported, Right if the handle was skipped. 
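+     *
+     * <p>Callers branch on the result; a sketch of the expected handling, with the caller-side
+     * variables assumed:
+     *
+     * <pre>{@code
+     * Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result =
+     *         helper.tryDistribute(exportCfBasePath, exportedColumnFamiliesOut);
+     * if (result.isLeft()) {
+     *     KeyGroupRange exported = result.left(); // widen the overall exported range
+     * } else {
+     *     skipped.add(result.right()); // fall back to copying via a temporary DB
+     * }
+     * }</pre>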
+ * + * @param exportCfBasePath base path for export + * @param exportedColumnFamiliesOut output parameter for exported column families + * @return Either.Left containing key group range if successfully exported, Either.Right + * containing the skipped state handle otherwise + * @throws Exception on any export error + */ + public Either tryDistribute( + Path exportCfBasePath, + Map> + exportedColumnFamiliesOut) + throws Exception { + + final String logLineSuffix = createLogLineSuffix(); + + List tmpColumnFamilyHandles = restoredDbInstance.columnFamilyHandles; + + LOG.debug("Checking actual keys of sst files {}", logLineSuffix); + + // Check SST file range + RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = + RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange( + restoredDbInstance.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange()); + + LOG.info("{} {}", rangeCheckResult, logLineSuffix); + + if (rangeCheckResult.allInRange()) { + LOG.debug("Start exporting {}", logLineSuffix); + + List registeredStateMetaInfoBases = + restoredDbInstance.stateMetaInfoSnapshots.stream() + .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) + .collect(Collectors.toList()); + + // Export all the Column Families and store the result in exportedColumnFamiliesOut + RocksDBIncrementalCheckpointUtils.exportColumnFamilies( + restoredDbInstance.db, + tmpColumnFamilyHandles, + registeredStateMetaInfoBases, + exportCfBasePath, + exportedColumnFamiliesOut); + + LOG.debug("Done exporting {}", logLineSuffix); + return Either.Left(stateHandle.getKeyGroupRange()); + } else { + LOG.debug("Skipped export {}", logLineSuffix); + return Either.Right(stateHandle); + } + } + + @Override + public void close() throws Exception { + restoredDbInstance.close(); + } + + /** Creates a consistent log line suffix for logging operations. */ + private String createLogLineSuffix() { + return " for state handle at index " + + index + + " with proclaimed key-group range " + + stateHandle.getKeyGroupRange().prettyPrintInterval() + + " for backend with range " + + keyGroupRange.prettyPrintInterval() + + " in operator " + + operatorIdentifier + + "."; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java new file mode 100644 index 0000000000000..9b5c4b74ad31e --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.rocksdb.restore; + +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.rocksdb.RocksDBOperationUtils; +import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.util.IOUtils; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; + +import javax.annotation.Nonnull; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** Restored DB instance containing all necessary handles and metadata. */ +public class RestoredDBInstance implements AutoCloseable { + + @Nonnull public final RocksDB db; + @Nonnull public final ColumnFamilyHandle defaultColumnFamilyHandle; + @Nonnull public final List columnFamilyHandles; + @Nonnull public final List columnFamilyDescriptors; + @Nonnull public final List stateMetaInfoSnapshots; + public final ReadOptions readOptions; + public final IncrementalLocalKeyedStateHandle srcStateHandle; + + public RestoredDBInstance( + @Nonnull RocksDB db, + @Nonnull List columnFamilyHandles, + @Nonnull List columnFamilyDescriptors, + @Nonnull List stateMetaInfoSnapshots, + IncrementalLocalKeyedStateHandle srcStateHandle) { + this.db = db; + this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); + this.columnFamilyHandles = columnFamilyHandles; + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + this.readOptions = new ReadOptions(); + this.srcStateHandle = srcStateHandle; + } + + @Override + public void close() { + List columnFamilyOptions = + new ArrayList<>(columnFamilyDescriptors.size() + 1); + columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions())); + RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( + columnFamilyOptions, defaultColumnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamilyHandle); + IOUtils.closeAllQuietly(columnFamilyHandles); + IOUtils.closeQuietly(db); + IOUtils.closeAllQuietly(columnFamilyOptions); + IOUtils.closeQuietly(readOptions); + } + + /** + * Restores a RocksDB instance from local state for the given state handle. 
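+     *
+     * <p>A sketch of how callers in this change use the returned instance; the variable names
+     * are assumptions:
+     *
+     * <pre>{@code
+     * try (RestoredDBInstance tmp =
+     *         RestoredDBInstance.restoreTempDBInstanceFromLocalState(
+     *                 stateHandle,
+     *                 stateMetaInfoSnapshots,
+     *                 columnFamilyOptionsFactory,
+     *                 dbOptions,
+     *                 ttlCompactFiltersManager,
+     *                 writeBufferManagerCapacity)) {
+     *     // Read via tmp.db and tmp.columnFamilyHandles; close() releases all native handles.
+     * }
+     * }</pre>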
+ * + * @param stateHandle the state handle to restore from + * @param columnFamilyOptionsFactory factory for creating column family options + * @param dbOptions database options + * @param ttlCompactFiltersManager TTL compact filters manager (can be null) + * @param writeBufferManagerCapacity write buffer manager capacity (can be null) + * @return restored DB instance with all necessary handles and metadata + * @throws Exception on any restore error + */ + public static RestoredDBInstance restoreTempDBInstanceFromLocalState( + IncrementalLocalKeyedStateHandle stateHandle, + List stateMetaInfoSnapshots, + Function columnFamilyOptionsFactory, + DBOptions dbOptions, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity) + throws Exception { + + Function tempDBCfFactory = + stateName -> + columnFamilyOptionsFactory.apply(stateName).setDisableAutoCompactions(true); + + List columnFamilyDescriptors = + createColumnFamilyDescriptors( + stateMetaInfoSnapshots, + tempDBCfFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + false); + + Path restoreSourcePath = stateHandle.getDirectoryStateHandle().getDirectory(); + + List columnFamilyHandles = + new ArrayList<>(stateMetaInfoSnapshots.size() + 1); + + RocksDB db = + RocksDBOperationUtils.openDB( + restoreSourcePath.toString(), + columnFamilyDescriptors, + columnFamilyHandles, + RocksDBOperationUtils.createColumnFamilyOptions(tempDBCfFactory, "default"), + dbOptions); + + return new RestoredDBInstance( + db, + columnFamilyHandles, + columnFamilyDescriptors, + stateMetaInfoSnapshots, + stateHandle); + } + + /** + * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state + * metadata snapshot. + */ + public static List createColumnFamilyDescriptors( + List stateMetaInfoSnapshots, + Function columnFamilyOptionsFactory, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity, + boolean registerTtlCompactFilter) { + + List columnFamilyDescriptors = + new ArrayList<>(stateMetaInfoSnapshots.size()); + + for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { + RegisteredStateMetaInfoBase metaInfoBase = + RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); + + ColumnFamilyDescriptor columnFamilyDescriptor = + RocksDBOperationUtils.createColumnFamilyDescriptor( + metaInfoBase, + columnFamilyOptionsFactory, + registerTtlCompactFilter ? 
ttlCompactFiltersManager : null, + writeBufferManagerCapacity); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + } + return columnFamilyDescriptors; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java index 79bc7bba2019a..d5a04132238cb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java @@ -48,6 +48,7 @@ import org.apache.flink.state.rocksdb.RocksIteratorWrapper; import org.apache.flink.state.rocksdb.StateHandleDownloadSpec; import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.types.Either; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -61,8 +62,6 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.ExportImportFilesMetaData; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +87,6 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION; @@ -492,7 +490,7 @@ private void mergeStateHandlesWithClipAndIngest( notImportableHandles); if (exportedColumnFamilyMetaData.isEmpty()) { - // Nothing coule be exported, so we fall back to + // Nothing could be exported, so we fall back to // #mergeStateHandlesWithCopyFromTemporaryInstance mergeStateHandlesWithCopyFromTemporaryInstance( notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); @@ -542,74 +540,39 @@ private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange( int minExportKeyGroup = Integer.MAX_VALUE; int maxExportKeyGroup = Integer.MIN_VALUE; int index = 0; - for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { - - final String logLineSuffix = - " for state handle at index " - + index - + " with proclaimed key-group range " - + stateHandle.getKeyGroupRange().prettyPrintInterval() - + " for backend with range " - + keyGroupRange.prettyPrintInterval() - + " in operator " - + operatorIdentifier - + "."; - - logger.debug("Opening temporary database" + logLineSuffix); - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromLocalState(stateHandle)) { - - List tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - logger.debug("Checking actual keys of sst files" + logLineSuffix); - - // Check if the data in all SST files referenced in the handle is within the - // proclaimed key-groups range of the handle. 
- RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = - RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange( - tmpRestoreDBInfo.db, - keyGroupPrefixBytes, - stateHandle.getKeyGroupRange()); - - logger.info("{}" + logLineSuffix, rangeCheckResult); - - if (rangeCheckResult.allInRange()) { - - logger.debug("Start exporting" + logLineSuffix); - - List registeredStateMetaInfoBases = - tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() - .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) - .collect(Collectors.toList()); - - // Export all the Column Families and store the result in - // exportedColumnFamiliesOut - RocksDBIncrementalCheckpointUtils.exportColumnFamilies( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandles, - registeredStateMetaInfoBases, - exportCfBasePath, - exportedColumnFamiliesOut); + for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { + KeyedBackendSerializationProxy serializationProxy = + readMetaData(stateHandle.getMetaDataStateHandle()); + List stateMetaInfoSnapshots = + serializationProxy.getStateMetaInfoSnapshots(); + // Use Helper to encapsulate single stateHandle processing + try (DistributeStateHandlerHelper helper = + new DistributeStateHandlerHelper( + stateHandle, + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getDbOptions(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity(), + keyGroupPrefixBytes, + keyGroupRange, + operatorIdentifier, + index)) { + + Either result = + helper.tryDistribute(exportCfBasePath, exportedColumnFamiliesOut); + + // Handle the result and collect skipped handles + if (result.isLeft()) { + KeyGroupRange exportedRange = result.left(); minExportKeyGroup = - Math.min( - minExportKeyGroup, - stateHandle.getKeyGroupRange().getStartKeyGroup()); - maxExportKeyGroup = - Math.max( - maxExportKeyGroup, - stateHandle.getKeyGroupRange().getEndKeyGroup()); - - logger.debug("Done exporting" + logLineSuffix); + Math.min(minExportKeyGroup, exportedRange.getStartKeyGroup()); + maxExportKeyGroup = Math.max(maxExportKeyGroup, exportedRange.getEndKeyGroup()); } else { - // Actual key range in files exceeds proclaimed range, cannot import. We - // will copy this handle using a temporary DB later. - skipped.add(stateHandle); - logger.debug("Skipped export" + logLineSuffix); + skipped.add(result.right()); } } - ++index; } @@ -737,7 +700,7 @@ private void restorePreviousIncrementalFilesStatus( /** * Restores the base DB from local state of a single state handle. * - * @param localKeyedStateHandle the state handle tor estore from. + * @param localKeyedStateHandle the state handle to restore from. * @throws Exception on any restore error. 
*/ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) @@ -750,7 +713,12 @@ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localK Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory(); this.rocksHandle.openDB( - createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), + RestoredDBInstance.createColumnFamilyDescriptors( + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity(), + true), stateMetaInfoSnapshots, restoreSourcePath, cancelStreamRegistryForRestore); @@ -812,10 +780,20 @@ private void copyToBaseDBUsingTempDBs( cancelStreamRegistryForRestore.registerCloseableTemporarily( writeBatchWrapper.getCancelCloseable())) { for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) { - try (RestoredDBInstance restoredDBInstance = - restoreTempDBInstanceFromLocalState(handleToCopy)) { + KeyedBackendSerializationProxy serializationProxy = + readMetaData(handleToCopy.getMetaDataStateHandle()); + List stateMetaInfoSnapshots = + serializationProxy.getStateMetaInfoSnapshots(); + try (RestoredDBInstance restoredDbInstance = + RestoredDBInstance.restoreTempDBInstanceFromLocalState( + handleToCopy, + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getDbOptions(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity())) { copyTempDbIntoBaseDb( - restoredDBInstance, + restoredDbInstance, writeBatchWrapper, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); @@ -824,7 +802,7 @@ private void copyToBaseDBUsingTempDBs( } logger.info( - "Competed copying state handles for backend with range {} in operator {} using temporary instances.", + "Completed copying state handles for backend with range {} in operator {} using temporary instances.", keyGroupRange.prettyPrintInterval(), operatorIdentifier); } @@ -856,8 +834,7 @@ private void copyTempDbIntoBaseDb( List tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; // iterating only the requested descriptors automatically skips the default - // column - // family handle + // column family handle for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); @@ -905,114 +882,8 @@ private void cleanUpPathQuietly(@Nonnull Path path) { try { FileUtils.deleteDirectory(path.toFile()); } catch (IOException ex) { - logger.warn("Failed to clean up path " + path, ex); - } - } - - /** Entity to hold the temporary RocksDB instance created for restore. 
*/ - private static class RestoredDBInstance implements AutoCloseable { - - @Nonnull private final RocksDB db; - - @Nonnull private final ColumnFamilyHandle defaultColumnFamilyHandle; - - @Nonnull private final List columnFamilyHandles; - - @Nonnull private final List columnFamilyDescriptors; - - @Nonnull private final List stateMetaInfoSnapshots; - - private final ReadOptions readOptions; - - private final IncrementalLocalKeyedStateHandle srcStateHandle; - - private RestoredDBInstance( - @Nonnull RocksDB db, - @Nonnull List columnFamilyHandles, - @Nonnull List columnFamilyDescriptors, - @Nonnull List stateMetaInfoSnapshots, - @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) { - this.db = db; - this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); - this.columnFamilyHandles = columnFamilyHandles; - this.columnFamilyDescriptors = columnFamilyDescriptors; - this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; - this.readOptions = new ReadOptions(); - this.srcStateHandle = srcStateHandle; - } - - @Override - public void close() { - List columnFamilyOptions = - new ArrayList<>(columnFamilyDescriptors.size() + 1); - columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions())); - RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( - columnFamilyOptions, defaultColumnFamilyHandle); - IOUtils.closeQuietly(defaultColumnFamilyHandle); - IOUtils.closeAllQuietly(columnFamilyHandles); - IOUtils.closeQuietly(db); - IOUtils.closeAllQuietly(columnFamilyOptions); - IOUtils.closeQuietly(readOptions); - } - } - - private RestoredDBInstance restoreTempDBInstanceFromLocalState( - IncrementalLocalKeyedStateHandle stateHandle) throws Exception { - KeyedBackendSerializationProxy serializationProxy = - readMetaData(stateHandle.getMetaDataStateHandle()); - // read meta data - List stateMetaInfoSnapshots = - serializationProxy.getStateMetaInfoSnapshots(); - - List columnFamilyDescriptors = - createColumnFamilyDescriptors(stateMetaInfoSnapshots, false); - - List columnFamilyHandles = - new ArrayList<>(stateMetaInfoSnapshots.size() + 1); - - RocksDB restoreDb = - RocksDBOperationUtils.openDB( - stateHandle.getDirectoryStateHandle().getDirectory().toString(), - columnFamilyDescriptors, - columnFamilyHandles, - RocksDBOperationUtils.createColumnFamilyOptions( - this.rocksHandle.getColumnFamilyOptionsFactory(), "default"), - this.rocksHandle.getDbOptions()); - - return new RestoredDBInstance( - restoreDb, - columnFamilyHandles, - columnFamilyDescriptors, - stateMetaInfoSnapshots, - stateHandle); - } - - /** - * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state - * metadata snapshot. - */ - private List createColumnFamilyDescriptors( - List stateMetaInfoSnapshots, boolean registerTtlCompactFilter) { - - List columnFamilyDescriptors = - new ArrayList<>(stateMetaInfoSnapshots.size()); - - for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { - RegisteredStateMetaInfoBase metaInfoBase = - RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); - - ColumnFamilyDescriptor columnFamilyDescriptor = - RocksDBOperationUtils.createColumnFamilyDescriptor( - metaInfoBase, - this.rocksHandle.getColumnFamilyOptionsFactory(), - registerTtlCompactFilter - ? 
this.rocksHandle.getTtlCompactFiltersManager() - : null, - this.rocksHandle.getWriteBufferManagerCapacity()); - - columnFamilyDescriptors.add(columnFamilyDescriptor); + logger.warn("Failed to clean up path {}", path, ex); } - return columnFamilyDescriptors; } private void runAndReportDuration(RunnableWithException runnable, String metricName) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java new file mode 100644 index 0000000000000..165d1d7df333c --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.rocksdb; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; +import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.RocksDbKvStateInfo; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; + +import java.util.LinkedHashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test to verify that auto-compaction is correctly configured during RocksDB incremental restore + * with ingest DB mode. This test ensures that production DBs maintain auto-compaction enabled while + * temporary DBs used during restore have auto-compaction disabled for performance. 
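+ *
+ * <p>The distinction under test, sketched with the column-family factory used for temporary
+ * instances in this change (the surrounding wiring is illustrative):
+ *
+ * <pre>{@code
+ * // Temporary restore DBs: compactions are deferred to speed up restore.
+ * Function<String, ColumnFamilyOptions> tempDBCfFactory =
+ *         stateName -> columnFamilyOptionsFactory.apply(stateName).setDisableAutoCompactions(true);
+ * // Production DB: options come from the unmodified factory, so
+ * // disableAutoCompactions() must remain false.
+ * }</pre>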
+ */ +public class RocksDBAutoCompactionIngestRestoreTest { + + @TempDir private java.nio.file.Path tempFolder; + + private static final int MAX_PARALLELISM = 10; + + @Test + public void testAutoCompactionEnabledWithIngestDBRestore() throws Exception { + // Create two subtask snapshots and merge them to trigger the multi-state-handle scenario + // required for reproducing the ingest DB restore path + OperatorSubtaskState operatorSubtaskState = + AbstractStreamOperatorTestHarness.repackageState( + createSubtaskSnapshot(0), createSubtaskSnapshot(1)); + + OperatorSubtaskState initState = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + operatorSubtaskState, MAX_PARALLELISM, 2, 1, 0); + + // Restore with ingest DB mode and verify auto-compaction + try (KeyedOneInputStreamOperatorTestHarness, String> + harness = createTestHarness(new TestKeyedFunction(), MAX_PARALLELISM, 1, 0)) { + + EmbeddedRocksDBStateBackend stateBackend = createStateBackend(true); + harness.setStateBackend(stateBackend); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + tempFolder.resolve("checkpoint-restore").toAbsolutePath())); + + harness.initializeState(initState); + harness.open(); + + verifyAutoCompactionEnabled(harness); + } + } + + private OperatorSubtaskState createSubtaskSnapshot(int subtaskIndex) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + harness = + createTestHarness( + new TestKeyedFunction(), MAX_PARALLELISM, 2, subtaskIndex)) { + + harness.setStateBackend(createStateBackend(false)); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + + tempFolder + .resolve("checkpoint-subtask" + subtaskIndex) + .toAbsolutePath())); + harness.open(); + + // Create an empty snapshot - data content doesn't matter for this test + return harness.snapshot(0, 0); + } + } + + private void verifyAutoCompactionEnabled( + KeyedOneInputStreamOperatorTestHarness, String> harness) + throws Exception { + KeyedStateBackend backend = harness.getOperator().getKeyedStateBackend(); + assertThat(backend).isNotNull(); + + LinkedHashMap kvStateInformation = + ((RocksDBKeyedStateBackend) backend).getKvStateInformation(); + + assertThat(kvStateInformation).as("kvStateInformation should not be empty").isNotEmpty(); + + for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) { + ColumnFamilyHandle handle = stateInfo.columnFamilyHandle; + assertThat(handle).isNotNull(); + + ColumnFamilyDescriptor descriptor = handle.getDescriptor(); + ColumnFamilyOptions options = descriptor.getOptions(); + + assertThat(options.disableAutoCompactions()) + .as( + "Production DB should have auto-compaction enabled for column family: " + + stateInfo.metaInfo.getName()) + .isFalse(); + } + } + + private KeyedOneInputStreamOperatorTestHarness, String> + createTestHarness( + TestKeyedFunction keyedFunction, + int maxParallelism, + int parallelism, + int subtaskIndex) + throws Exception { + + return new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedProcessOperator<>(keyedFunction), + tuple2 -> tuple2.f0, + BasicTypeInfo.STRING_TYPE_INFO, + maxParallelism, + parallelism, + subtaskIndex); + } + + private EmbeddedRocksDBStateBackend createStateBackend(boolean useIngestDbRestoreMode) { + Configuration config = new Configuration(); + config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode); + + EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true); + return stateBackend.configure(config, 
getClass().getClassLoader()); + } + + private static class TestKeyedFunction + extends KeyedProcessFunction, String> { + private ValueState state; + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + state = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("test-state", String.class)); + } + + @Override + public void processElement(Tuple2 value, Context ctx, Collector out) + throws Exception { + state.update(value.f1); + out.collect(value.f1); + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java new file mode 100644 index 0000000000000..93daf0455e458 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.state.rocksdb.restore;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link DistributeStateHandlerHelper}. */
+public class DistributeStateHandlerHelperTest extends TestLogger {
+
+    private static final int NUM_KEY_GROUPS = 128;
+    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, NUM_KEY_GROUPS - 1);
+    private static final int KEY_GROUP_PREFIX_BYTES =
+            CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(NUM_KEY_GROUPS);
+    private static final String CF_NAME = "test-column-family";
+
+    @TempDir private Path tempDir;
+
+    /** Tests whether sst files are exported when all key groups are in range. */
+    @Test
+    public void testAutoCompactionIsDisabled() throws Exception {
+        Path rocksDir = tempDir.resolve("rocksdb_dir");
+        Path dbPath = rocksDir.resolve("db");
+        Path chkDir = rocksDir.resolve("chk");
+        Path exportDir = rocksDir.resolve("export");
+
+        Files.createDirectories(dbPath);
+        Files.createDirectories(exportDir);
+
+        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(2);
+
+        try (RocksDB db = openDB(dbPath.toString(), columnFamilyHandles)) {
+            ColumnFamilyHandle testCfHandler = columnFamilyHandles.get(1);
+
+            // Create SST files and verify their creation
+            for (int i = 0; i < 4; i++) {
+                db.flush(new FlushOptions().setWaitForFlush(true), testCfHandler);
+                for (int j = 10; j < NUM_KEY_GROUPS / 2; j++) {
+                    byte[] bytes = new byte[KEY_GROUP_PREFIX_BYTES];
+                    CompositeKeySerializationUtils.serializeKeyGroup(j, bytes);
+                    db.delete(testCfHandler, bytes);
+                }
+                assertThat(
+                                dbPath.toFile()
+                                        .listFiles(
+                                                (file, name) ->
+                                                        name.toLowerCase().endsWith(".sst")))
+                        .hasSize(i);
+            }
+
+            // Create checkpoint
+            try (Checkpoint checkpoint = Checkpoint.create(db)) {
+                checkpoint.createCheckpoint(chkDir.toString());
+            }
+        }
+
+        // Verify there are 4 sst files in level 0. If auto-compaction were enabled, opening the
+        // DB would trigger a compaction of these files.
+        assertThat(chkDir.toFile().listFiles((file, name) -> name.toLowerCase().endsWith(".sst")))
+                .hasSize(4);
+
+        // Create IncrementalLocalKeyedStateHandle for testing
+        IncrementalLocalKeyedStateHandle stateHandle = createTestStateHandle(chkDir.toString());
+
+        try (DistributeStateHandlerHelper helper =
+                createDistributeStateHandlerHelper(
+                        stateHandle, (name) -> new ColumnFamilyOptions())) {
+
+            // This simulates the delay that allows background compaction to clean up SST files if
+            // auto-compaction is enabled.
+            Thread.sleep(500);
+            Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                    exportedColumnFamiliesOut = new HashMap<>();
+            List<IncrementalLocalKeyedStateHandle> skipped = new ArrayList<>();
+
+            Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result =
+                    helper.tryDistribute(exportDir, exportedColumnFamiliesOut);
+            if (result.isRight()) {
+                skipped.add(result.right());
+            }
+            assertThat(result.isLeft()).isTrue();
+            assertThat(exportedColumnFamiliesOut).isNotEmpty();
+            assertThat(skipped).isEmpty();
+        }
+    }
+
+    private RocksDB openDB(String path, ArrayList<ColumnFamilyHandle> columnFamilyHandles)
+            throws RocksDBException {
+
+        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(2);
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        RocksDB.DEFAULT_COLUMN_FAMILY,
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        CF_NAME.getBytes(ConfigConstants.DEFAULT_CHARSET),
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+
+        return RocksDB.open(
+                new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true),
+                path,
+                columnFamilyDescriptors,
+                columnFamilyHandles);
+    }
+
+    /**
+     * Creates a minimal IncrementalLocalKeyedStateHandle for testing. Uses empty metadata to focus
+     * on SST file distribution behavior.
+     */
+    private IncrementalLocalKeyedStateHandle createTestStateHandle(String checkpointDir) {
+        return new IncrementalLocalKeyedStateHandle(
+                UUID.randomUUID(),
+                1L,
+                new DirectoryStateHandle(Paths.get(checkpointDir), 0L),
+                KEY_GROUP_RANGE,
+                new ByteStreamStateHandle("meta", new byte[0]),
+                Collections.emptyList());
+    }
+
+    /** Creates a {@link DistributeStateHandlerHelper} with test-specific configuration.
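+     * Passes {@code null} for the TTL compaction filters manager and the write buffer manager
+     * capacity, since neither is needed to exercise the distribution logic. A call sketch:
+     *
+     * <pre>{@code
+     * DistributeStateHandlerHelper helper =
+     *         createDistributeStateHandlerHelper(stateHandle, (name) -> new ColumnFamilyOptions());
+     * }</pre>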
+     */
+    private DistributeStateHandlerHelper createDistributeStateHandlerHelper(
+            IncrementalLocalKeyedStateHandle stateHandle,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory)
+            throws Exception {
+        TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+        TypeSerializer<Double> stateSerializer = DoubleSerializer.INSTANCE;
+
+        List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
+        stateMetaInfoList.add(
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                                StateDescriptor.Type.VALUE,
+                                CF_NAME,
+                                namespaceSerializer,
+                                stateSerializer)
+                        .snapshot());
+        return new DistributeStateHandlerHelper(
+                stateHandle,
+                stateMetaInfoList,
+                columnFamilyOptionsFactory,
+                new DBOptions().setCreateIfMissing(true),
+                null,
+                null,
+                KEY_GROUP_PREFIX_BYTES,
+                KEY_GROUP_RANGE,
+                "test-operator",
+                0);
+    }
+}
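
For reference, a minimal sketch of opting in to the ingest-DB restore mode exercised by these tests. The option and backend names are taken from the tests above; the wrapper class and its method are illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;

public class IngestRestoreModeExample {
    public static EmbeddedRocksDBStateBackend createBackend() {
        Configuration config = new Configuration();
        // Enable the clip/ingest-based incremental restore path.
        config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
        // 'true' enables incremental checkpoints, matching the tests above.
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        return backend.configure(config, IngestRestoreModeExample.class.getClassLoader());
    }
}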