HBASE-28767 Simplify backup bulk-loading code (#6134)
Signed-off-by: Nick Dimiduk <[email protected]>
DieterDP-ng authored and ndimiduk committed Dec 13, 2024
1 parent 454cd6b commit 9595552
Showing 7 changed files with 334 additions and 216 deletions.
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -103,7 +104,7 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
}

try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
return Collections.emptyList();
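The cleaner above now builds its set of protected tables from the new BackupSystemTable#getTablesIncludedInBackups(). A minimal usage sketch of that call outside the cleaner (the table name and standalone main method are illustrative assumptions, not part of this patch):

import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class BackedUpTablesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
      BackupSystemTable tbl = new BackupSystemTable(connection)) {
      // Tables referenced by any known backup; incremental backups cover the same
      // tables as the preceding full backup, so full backups are what get recorded.
      Set<TableName> backedUp = tbl.getTablesIncludedInBackups();
      System.out.println("covered by a backup: " + backedUp.contains(TableName.valueOf("my_table")));
    }
  }
}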
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -64,21 +67,8 @@ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
return;
}
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
RegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
}
return;
}
tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up", ioe);
}

registerBulkLoad(ctx, finalPaths);
}

@Override
@@ -89,19 +79,31 @@ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironmen
LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
return;
}

List<Path> hfiles = new ArrayList<>(pairs.size());
for (Pair<Path, Path> pair : pairs) {
hfiles.add(pair.getSecond());
}
registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
}

private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
Configuration cfg = ctx.getEnvironment().getConfiguration();
RegionInfo region = ctx.getEnvironment().getRegionInfo();
TableName tableName = region.getTable();

try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
RegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();

if (fullyBackedUpTables.contains(tableName)) {
tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
}
return;
}
tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
return;
}
}
}
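Both hooks above now delegate to the shared registerBulkLoad() helper; preCommitStoreFile() only adapts its path pairs into the per-family map that postBulkLoadHFile() already receives. A small standalone illustration of that adaptation (family and paths are made-up values, and treating Pair#getSecond() as the destination path is an assumption about the hook's contract):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class HookPayloadExample {
  public static void main(String[] args) {
    byte[] family = Bytes.toBytes("cf");
    List<Pair<Path, Path>> pairs = new ArrayList<>();
    pairs.add(new Pair<>(new Path("/staging/cf/hfile1"), new Path("/data/ns/tbl/region/cf/hfile1")));

    // Keep only the second path of each pair and key the result by column family,
    // matching the Map<byte[], List<Path>> shape used by postBulkLoadHFile().
    List<Path> hfiles = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      hfiles.add(pair.getSecond());
    }
    Map<byte[], List<Path>> cfToHFilePaths = Collections.singletonMap(family, hfiles);
    System.out.println(cfToHFilePaths.get(family));
  }
}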
@@ -43,7 +43,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -355,8 +354,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOExcept
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}

public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}

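readBulkloadRows() now returns a flat List<BulkLoad> instead of the previous Pair of nested maps plus row keys. The BulkLoad class itself is one of the other changed files and is not shown in this diff; a rough sketch of a value object matching the constructor call used further down (new BulkLoad(table, region, fam, path, row)) might look as follows, where the package and accessor names are assumptions:

package org.apache.hadoop.hbase.backup.impl; // assumed location

import org.apache.hadoop.hbase.TableName;

/** One bulk-loaded HFile recorded in the backup system table. */
public class BulkLoad {
  private final TableName tableName;
  private final String region;
  private final String columnFamily;
  private final String hfilePath;
  private final byte[] rowKey;

  public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
    byte[] rowKey) {
    this.tableName = tableName;
    this.region = region;
    this.columnFamily = columnFamily;
    this.hfilePath = hfilePath;
    this.rowKey = rowKey;
  }

  public TableName getTableName() { return tableName; }
  public String getRegion() { return region; }
  public String getColumnFamily() { return columnFamily; }
  public String getHfilePath() { return hfilePath; }
  /** Row key of the backing record, usable with deleteBulkLoadedRows(). */
  public byte[] getRowKey() { return rowKey; }
}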
@@ -70,7 +70,6 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public String toString() {
final static byte[] TBL_COL = Bytes.toBytes("tbl");
final static byte[] FAM_COL = Bytes.toBytes("fam");
final static byte[] PATH_COL = Bytes.toBytes("path");
final static byte[] STATE_COL = Bytes.toBytes("state");
// the two states a bulk loaded file can be
final static byte[] BL_PREPARE = Bytes.toBytes("R");
final static byte[] BL_COMMIT = Bytes.toBytes("D");

private final static String SET_KEY_PREFIX = "backupset:";

@@ -378,7 +373,7 @@ public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<Table
}
files.add(new Path(path));
if (LOG.isDebugEnabled()) {
LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
}
}

@@ -401,43 +396,22 @@ public void deleteBackupInfo(String backupId) throws IOException {
}
}

/*
* For postBulkLoadHFile() hook.
* @param tabName table name
* @param region the region receiving hfile
* @param finalPaths family and associated hfiles
/**
* Registers a bulk load.
* @param tableName table name
* @param region the region receiving hfile
* @param cfToHfilePath column family and associated hfiles
*/
public void writePathsPostBulkLoad(TableName tabName, byte[] region,
Map<byte[], List<Path>> finalPaths) throws IOException {
public void registerBulkLoad(TableName tableName, byte[] region,
Map<byte[], List<Path>> cfToHfilePath) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+ " entries");
LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
cfToHfilePath.size());
}
try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
bufferedMutator.mutate(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}

/*
* For preCommitStoreFile() hook
* @param tabName table name
* @param region the region receiving hfile
* @param family column family
* @param pairs list of paths for hfiles
*/
public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
final List<Pair<Path, Path>> pairs) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(
"write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
}
try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts =
BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
}
}
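A caller-side sketch of the renamed registerBulkLoad() API shown above (table, region and file names are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class RegisterBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Map<byte[], List<Path>> cfToHfilePath = Collections.singletonMap(Bytes.toBytes("cf"),
      Collections.singletonList(new Path("/staging/cf/hfile1")));
    try (Connection connection = ConnectionFactory.createConnection(conf);
      BackupSystemTable tbl = new BackupSystemTable(connection)) {
      // Record the bulk-loaded files for the given table and encoded region name.
      tbl.registerBulkLoad(TableName.valueOf("my_table"), Bytes.toBytes("region-encoded-name"),
        cfToHfilePath);
    }
  }
}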

@@ -459,33 +433,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
}
}

/*
/**
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
* @return The keys of the Map are table, region and column family. Value of the map reflects
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {

Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
Result res;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
boolean raw = false;
byte[] row;
String region = null;
byte[] row = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -498,35 +464,14 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
BackupSystemTable.PATH_COL.length) == 0
) {
path = Bytes.toString(CellUtil.cloneValue(cell));
} else if (
CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
BackupSystemTable.STATE_COL.length) == 0
) {
byte[] state = CellUtil.cloneValue(cell);
if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
raw = true;
} else {
raw = false;
}
}
}
if (map.get(tTable) == null) {
map.put(tTable, new HashMap<>());
tblMap = map.get(tTable);
}
if (tblMap.get(region) == null) {
tblMap.put(region, new HashMap<>());
}
Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
if (famMap.get(fam) == null) {
famMap.put(fam, new ArrayList<>());
}
famMap.get(fam).add(new Pair<>(path, raw));
result.add(new BulkLoad(table, region, fam, path, row));
LOG.debug("found orig " + path + " for " + fam + " of table " + region);
}
}
}
return new Pair<>(map, rows);
return result;
}
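With the flattened return type, consumers can iterate the records directly and, once the files are captured by a backup, delete the bookkeeping rows by row key via the existing deleteBulkLoadedRows(). An illustrative sketch of such a caller (the BulkLoad accessors follow the assumed sketch earlier and may differ from the real class):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad; // assumed location
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ReadBulkloadRowsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
      BackupSystemTable tbl = new BackupSystemTable(connection)) {
      List<BulkLoad> bulkLoads =
        tbl.readBulkloadRows(Collections.singletonList(TableName.valueOf("my_table")));
      List<byte[]> rowsToDelete = new ArrayList<>(bulkLoads.size());
      for (BulkLoad bulkLoad : bulkLoads) {
        System.out.println("bulk-loaded file: " + bulkLoad.getHfilePath());
        rowsToDelete.add(bulkLoad.getRowKey());
      }
      // Drop the bookkeeping rows once the files have been captured in a backup.
      tbl.deleteBulkLoadedRows(rowsToDelete);
    }
  }
}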

/*
@@ -793,20 +738,19 @@ public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) th
return result;
}

/*
* Retrieve TableName's for completed backup of given type
* @param type backup type
* @return List of table names
/**
* Retrieve all table names that are part of any known backup
*/
public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
public Set<TableName> getTablesIncludedInBackups() throws IOException {
Set<TableName> names = new HashSet<>();
List<BackupInfo> infos = getBackupHistory(true);
for (BackupInfo info : infos) {
if (info.getType() == type) {
// Incremental backups have the same tables as the preceding full backups
if (info.getType() == BackupType.FULL) {
names.addAll(info.getTableNames());
}
}
return new ArrayList<>(names);
return names;
}

/**
@@ -1500,13 +1444,13 @@ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
return s.substring(index + 1);
}

/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
/**
* Creates Put's for bulk loads.
*/
static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
Map<byte[], List<Path>> finalPaths) {
private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
Map<byte[], List<Path>> columnFamilyToHFilePaths) {
List<Put> puts = new ArrayList<>();
for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
for (Path path : entry.getValue()) {
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
puts.add(put);
LOG
.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
}
}
return puts;
@@ -1580,29 +1522,6 @@ public static void deleteSnapshot(Connection conn) throws IOException {
}
}

/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
*/
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
final List<Pair<Path, Path>> pairs) {
List<Put> puts = new ArrayList<>(pairs.size());
for (Pair<Path, Path> pair : pairs) {
Path path = pair.getSecond();
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
String filename = file.substring(lastSlash + 1);
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
Bytes.toString(region), BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
puts.add(put);
LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
}
return puts;
}

public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
List<Delete> lstDels = new ArrayList<>(lst.size());
for (TableName table : lst) {