Skip to content

Commit af45eb4

Browse files
committed
[FLINK-38415][refactor] Extract single state handle processing logic for testability
1 parent 4bb9992 commit af45eb4

File tree

3 files changed

+390
-182
lines changed

3 files changed

+390
-182
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.rocksdb.restore;
20+
21+
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
22+
import org.apache.flink.runtime.state.KeyGroupRange;
23+
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
24+
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
25+
import org.apache.flink.state.rocksdb.RocksDBIncrementalCheckpointUtils;
26+
import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager;
27+
import org.apache.flink.types.Either;
28+
29+
import org.rocksdb.ColumnFamilyHandle;
30+
import org.rocksdb.ColumnFamilyOptions;
31+
import org.rocksdb.DBOptions;
32+
import org.rocksdb.ExportImportFilesMetaData;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.nio.file.Path;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.function.Function;
40+
import java.util.stream.Collectors;
41+
42+
/**
43+
* Helper class for distributing state handle data during RocksDB incremental restore. This class
44+
* encapsulates the logic for processing a single state handle.
45+
*/
46+
public class DistributeStateHandlerHelper implements AutoCloseable {
47+
48+
private static final Logger LOG = LoggerFactory.getLogger(DistributeStateHandlerHelper.class);
49+
50+
private final IncrementalLocalKeyedStateHandle stateHandle;
51+
private final RestoredDBInstance restoredDbInstance;
52+
private final int keyGroupPrefixBytes;
53+
private final KeyGroupRange keyGroupRange;
54+
private final String operatorIdentifier;
55+
private final int index;
56+
57+
/**
58+
* Creates a helper for processing a single state handle. The database instance is created in
59+
* the constructor to enable proper resource management and separation of concerns.
60+
*
61+
* @param stateHandle the state handle to process
62+
* @param columnFamilyOptionsFactory factory for creating column family options
63+
* @param dbOptions database options
64+
* @param ttlCompactFiltersManager TTL compact filters manager (can be null)
65+
* @param writeBufferManagerCapacity write buffer manager capacity (can be null)
66+
* @param keyGroupPrefixBytes number of key group prefix bytes for SST file range checking
67+
* @param keyGroupRange target key group range (for logging)
68+
* @param operatorIdentifier operator identifier (for logging)
69+
* @param index current processing index (for logging)
70+
* @throws Exception on any database opening error
71+
*/
72+
public DistributeStateHandlerHelper(
73+
IncrementalLocalKeyedStateHandle stateHandle,
74+
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
75+
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
76+
DBOptions dbOptions,
77+
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
78+
Long writeBufferManagerCapacity,
79+
int keyGroupPrefixBytes,
80+
KeyGroupRange keyGroupRange,
81+
String operatorIdentifier,
82+
int index)
83+
throws Exception {
84+
this.stateHandle = stateHandle;
85+
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
86+
this.keyGroupRange = keyGroupRange;
87+
this.operatorIdentifier = operatorIdentifier;
88+
this.index = index;
89+
90+
final String logLineSuffix = createLogLineSuffix();
91+
92+
LOG.debug("Opening temporary database : {}", logLineSuffix);
93+
94+
// Open database using restored instance helper method
95+
this.restoredDbInstance =
96+
RestoredDBInstance.restoreTempDBInstanceFromLocalState(
97+
stateHandle,
98+
stateMetaInfoSnapshots,
99+
columnFamilyOptionsFactory,
100+
dbOptions,
101+
ttlCompactFiltersManager,
102+
writeBufferManagerCapacity);
103+
}
104+
105+
/**
106+
* Distributes state handle data by checking SST file ranges and exporting column families.
107+
* Returns Left if successfully exported, Right if the handle was skipped.
108+
*
109+
* @param exportCfBasePath base path for export
110+
* @param exportedColumnFamiliesOut output parameter for exported column families
111+
* @return Either.Left containing key group range if successfully exported, Either.Right
112+
* containing the skipped state handle otherwise
113+
* @throws Exception on any export error
114+
*/
115+
public Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> tryDistribute(
116+
Path exportCfBasePath,
117+
Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>>
118+
exportedColumnFamiliesOut)
119+
throws Exception {
120+
121+
final String logLineSuffix = createLogLineSuffix();
122+
123+
List<ColumnFamilyHandle> tmpColumnFamilyHandles = restoredDbInstance.columnFamilyHandles;
124+
125+
LOG.debug("Checking actual keys of sst files {}", logLineSuffix);
126+
127+
// Check SST file range
128+
RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult =
129+
RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(
130+
restoredDbInstance.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange());
131+
132+
LOG.info("{} {}", rangeCheckResult, logLineSuffix);
133+
134+
if (rangeCheckResult.allInRange()) {
135+
LOG.debug("Start exporting {}", logLineSuffix);
136+
137+
List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases =
138+
restoredDbInstance.stateMetaInfoSnapshots.stream()
139+
.map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot)
140+
.collect(Collectors.toList());
141+
142+
// Export all the Column Families and store the result in exportedColumnFamiliesOut
143+
RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
144+
restoredDbInstance.db,
145+
tmpColumnFamilyHandles,
146+
registeredStateMetaInfoBases,
147+
exportCfBasePath,
148+
exportedColumnFamiliesOut);
149+
150+
LOG.debug("Done exporting {}", logLineSuffix);
151+
return Either.Left(stateHandle.getKeyGroupRange());
152+
} else {
153+
LOG.debug("Skipped export {}", logLineSuffix);
154+
return Either.Right(stateHandle);
155+
}
156+
}
157+
158+
@Override
159+
public void close() throws Exception {
160+
restoredDbInstance.close();
161+
}
162+
163+
/** Creates a consistent log line suffix for logging operations. */
164+
private String createLogLineSuffix() {
165+
return " for state handle at index "
166+
+ index
167+
+ " with proclaimed key-group range "
168+
+ stateHandle.getKeyGroupRange().prettyPrintInterval()
169+
+ " for backend with range "
170+
+ keyGroupRange.prettyPrintInterval()
171+
+ " in operator "
172+
+ operatorIdentifier
173+
+ ".";
174+
}
175+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.flink.state.rocksdb.restore;
21+
22+
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
23+
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
24+
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
25+
import org.apache.flink.state.rocksdb.RocksDBOperationUtils;
26+
import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager;
27+
import org.apache.flink.util.IOUtils;
28+
29+
import org.rocksdb.ColumnFamilyDescriptor;
30+
import org.rocksdb.ColumnFamilyHandle;
31+
import org.rocksdb.ColumnFamilyOptions;
32+
import org.rocksdb.DBOptions;
33+
import org.rocksdb.ReadOptions;
34+
import org.rocksdb.RocksDB;
35+
36+
import javax.annotation.Nonnull;
37+
38+
import java.nio.file.Path;
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.function.Function;
42+
43+
/** Restored DB instance containing all necessary handles and metadata. */
44+
public class RestoredDBInstance implements AutoCloseable {
45+
46+
@Nonnull public final RocksDB db;
47+
@Nonnull public final ColumnFamilyHandle defaultColumnFamilyHandle;
48+
@Nonnull public final List<ColumnFamilyHandle> columnFamilyHandles;
49+
@Nonnull public final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
50+
@Nonnull public final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
51+
public final ReadOptions readOptions;
52+
public final IncrementalLocalKeyedStateHandle srcStateHandle;
53+
54+
public RestoredDBInstance(
55+
@Nonnull RocksDB db,
56+
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
57+
@Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
58+
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
59+
IncrementalLocalKeyedStateHandle srcStateHandle) {
60+
this.db = db;
61+
this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
62+
this.columnFamilyHandles = columnFamilyHandles;
63+
this.columnFamilyDescriptors = columnFamilyDescriptors;
64+
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
65+
this.readOptions = new ReadOptions();
66+
this.srcStateHandle = srcStateHandle;
67+
}
68+
69+
@Override
70+
public void close() {
71+
List<ColumnFamilyOptions> columnFamilyOptions =
72+
new ArrayList<>(columnFamilyDescriptors.size() + 1);
73+
columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions()));
74+
RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
75+
columnFamilyOptions, defaultColumnFamilyHandle);
76+
IOUtils.closeQuietly(defaultColumnFamilyHandle);
77+
IOUtils.closeAllQuietly(columnFamilyHandles);
78+
IOUtils.closeQuietly(db);
79+
IOUtils.closeAllQuietly(columnFamilyOptions);
80+
IOUtils.closeQuietly(readOptions);
81+
}
82+
83+
/**
84+
* Restores a RocksDB instance from local state for the given state handle.
85+
*
86+
* @param stateHandle the state handle to restore from
87+
* @param columnFamilyOptionsFactory factory for creating column family options
88+
* @param dbOptions database options
89+
* @param ttlCompactFiltersManager TTL compact filters manager (can be null)
90+
* @param writeBufferManagerCapacity write buffer manager capacity (can be null)
91+
* @return restored DB instance with all necessary handles and metadata
92+
* @throws Exception on any restore error
93+
*/
94+
public static RestoredDBInstance restoreTempDBInstanceFromLocalState(
95+
IncrementalLocalKeyedStateHandle stateHandle,
96+
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
97+
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
98+
DBOptions dbOptions,
99+
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
100+
Long writeBufferManagerCapacity)
101+
throws Exception {
102+
103+
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
104+
createColumnFamilyDescriptors(
105+
stateMetaInfoSnapshots,
106+
columnFamilyOptionsFactory,
107+
ttlCompactFiltersManager,
108+
writeBufferManagerCapacity,
109+
false);
110+
111+
Path restoreSourcePath = stateHandle.getDirectoryStateHandle().getDirectory();
112+
113+
List<ColumnFamilyHandle> columnFamilyHandles =
114+
new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
115+
116+
RocksDB db =
117+
RocksDBOperationUtils.openDB(
118+
restoreSourcePath.toString(),
119+
columnFamilyDescriptors,
120+
columnFamilyHandles,
121+
RocksDBOperationUtils.createColumnFamilyOptions(
122+
columnFamilyOptionsFactory, "default"),
123+
dbOptions);
124+
125+
return new RestoredDBInstance(
126+
db,
127+
columnFamilyHandles,
128+
columnFamilyDescriptors,
129+
stateMetaInfoSnapshots,
130+
stateHandle);
131+
}
132+
133+
/**
134+
* This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state
135+
* metadata snapshot.
136+
*/
137+
public static List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
138+
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
139+
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
140+
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
141+
Long writeBufferManagerCapacity,
142+
boolean registerTtlCompactFilter) {
143+
144+
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
145+
new ArrayList<>(stateMetaInfoSnapshots.size());
146+
147+
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
148+
RegisteredStateMetaInfoBase metaInfoBase =
149+
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
150+
151+
ColumnFamilyDescriptor columnFamilyDescriptor =
152+
RocksDBOperationUtils.createColumnFamilyDescriptor(
153+
metaInfoBase,
154+
columnFamilyOptionsFactory,
155+
registerTtlCompactFilter ? ttlCompactFiltersManager : null,
156+
writeBufferManagerCapacity);
157+
158+
columnFamilyDescriptors.add(columnFamilyDescriptor);
159+
}
160+
return columnFamilyDescriptors;
161+
}
162+
}

0 commit comments

Comments
 (0)