From 4f267484633607baa0b2cba7c944ea1eb24ed1ad Mon Sep 17 00:00:00 2001 From: DieterDP <90392398+DieterDP-ng@users.noreply.github.com> Date: Fri, 13 Dec 2024 14:43:13 +0100 Subject: [PATCH] HBASE-28767 Simplify backup bulk-loading code (#6134) Signed-off-by: Nick Dimiduk --- .../hbase/backup/BackupHFileCleaner.java | 3 +- .../hadoop/hbase/backup/BackupObserver.java | 48 +++--- .../hbase/backup/impl/BackupManager.java | 4 +- .../hbase/backup/impl/BackupSystemTable.java | 153 +++++------------- .../hadoop/hbase/backup/impl/BulkLoad.java | 93 +++++++++++ .../impl/IncrementalTableBackupClient.java | 112 +++++-------- 6 files changed, 197 insertions(+), 216 deletions(-) create mode 100644 hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java index 619cecaeaaac..d7474997412a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.backup; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -103,7 +104,7 @@ public Iterable getDeletableFiles(Iterable files) { } try (BackupSystemTable tbl = new BackupSystemTable(connection)) { - fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL); + fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups()); } catch (IOException ioe) { LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe); return Collections.emptyList(); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java index 73f97365adbe..db8c29c4c0a3 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hbase.backup; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -64,21 +67,8 @@ public void postBulkLoadHFile(ObserverContext ctx, LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled"); return; } - try (Connection connection = ConnectionFactory.createConnection(cfg); - BackupSystemTable tbl = new BackupSystemTable(connection)) { - List fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL); - RegionInfo info = ctx.getEnvironment().getRegionInfo(); - TableName tableName = info.getTable(); - if (!fullyBackedUpTables.contains(tableName)) { - if (LOG.isTraceEnabled()) { - LOG.trace(tableName + " has not gone thru full backup"); - } - return; - } - tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths); - } catch (IOException ioe) { - LOG.error("Failed to get tables which have been fully backed up", ioe); - } + + registerBulkLoad(ctx, finalPaths); } @Override @@ -89,19 +79,31 @@ public void preCommitStoreFile(final ObserverContext hfiles = new ArrayList<>(pairs.size()); + for (Pair pair 
: pairs) { + hfiles.add(pair.getSecond()); + } + registerBulkLoad(ctx, Collections.singletonMap(family, hfiles)); + } + + private void registerBulkLoad(ObserverContext ctx, + Map> cfToHFilePaths) throws IOException { + Configuration cfg = ctx.getEnvironment().getConfiguration(); + RegionInfo region = ctx.getEnvironment().getRegionInfo(); + TableName tableName = region.getTable(); + try (Connection connection = ConnectionFactory.createConnection(cfg); BackupSystemTable tbl = new BackupSystemTable(connection)) { - List fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL); - RegionInfo info = ctx.getEnvironment().getRegionInfo(); - TableName tableName = info.getTable(); - if (!fullyBackedUpTables.contains(tableName)) { + Set fullyBackedUpTables = tbl.getTablesIncludedInBackups(); + + if (fullyBackedUpTables.contains(tableName)) { + tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths); + } else { if (LOG.isTraceEnabled()) { - LOG.trace(tableName + " has not gone thru full backup"); + LOG.trace("Table {} has not gone through full backup - skipping.", tableName); } - return; } - tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs); - return; } } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index e3644c7d7f06..5afd580a649d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.procedure.ProcedureManagerHost; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -355,8 +354,7 @@ public HashMap readRegionServerLastLogRollResult() throws IOExcept return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir()); } - public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { + public List readBulkloadRows(List tableList) throws IOException { return systemTable.readBulkloadRows(tableList); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 01106b4cd0ea..203f3f61b0fb 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -179,10 +178,6 @@ public String toString() { final static byte[] TBL_COL = Bytes.toBytes("tbl"); final static byte[] FAM_COL = Bytes.toBytes("fam"); final static byte[] PATH_COL = Bytes.toBytes("path"); - final static byte[] STATE_COL = Bytes.toBytes("state"); - // the two states a bulk loaded file can be - final static byte[] BL_PREPARE = Bytes.toBytes("R"); - final static byte[] BL_COMMIT = Bytes.toBytes("D"); private final static String 
SET_KEY_PREFIX = "backupset:"; @@ -378,7 +373,7 @@ public Map>[] readBulkLoadedFiles(String backupId, List> finalPaths) throws IOException { + public void registerBulkLoad(TableName tableName, byte[] region, + Map> cfToHfilePath) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() - + " entries"); + LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, + cfToHfilePath.size()); } try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { - List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); + List puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); bufferedMutator.mutate(puts); - LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); - } - } - - /* - * For preCommitStoreFile() hook - * @param tabName table name - * @param region the region receiving hfile - * @param family column family - * @param pairs list of paths for hfiles - */ - public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, - final List> pairs) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug( - "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); - } - try (Table table = connection.getTable(bulkLoadTableName)) { - List puts = - BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); - table.put(puts); - LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); + LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName); } } @@ -459,33 +433,25 @@ public void deleteBulkLoadedRows(List rows) throws IOException { } } - /* + /** * Reads the rows from backup table recording bulk loaded hfiles * @param tableList list of table names - * @return The keys of the Map are table, region and column family. 
Value of the map reflects - * whether the hfile was recorded by preCommitStoreFile hook (true) - */ - public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { - - Map>>>> map = new HashMap<>(); - List rows = new ArrayList<>(); - for (TableName tTable : tableList) { - Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); - Map>>> tblMap = map.get(tTable); - try (Table table = connection.getTable(bulkLoadTableName); - ResultScanner scanner = table.getScanner(scan)) { - Result res = null; + */ + public List readBulkloadRows(List tableList) throws IOException { + List result = new ArrayList<>(); + for (TableName table : tableList) { + Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); + try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); + ResultScanner scanner = bulkLoadTable.getScanner(scan)) { + Result res; while ((res = scanner.next()) != null) { res.advance(); String fam = null; String path = null; - boolean raw = false; - byte[] row; String region = null; + byte[] row = null; for (Cell cell : res.listCells()) { row = CellUtil.cloneRow(cell); - rows.add(row); String rowStr = Bytes.toString(row); region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); if ( @@ -498,35 +464,14 @@ public void deleteBulkLoadedRows(List rows) throws IOException { BackupSystemTable.PATH_COL.length) == 0 ) { path = Bytes.toString(CellUtil.cloneValue(cell)); - } else if ( - CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, - BackupSystemTable.STATE_COL.length) == 0 - ) { - byte[] state = CellUtil.cloneValue(cell); - if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { - raw = true; - } else { - raw = false; - } } } - if (map.get(tTable) == null) { - map.put(tTable, new HashMap<>()); - tblMap = map.get(tTable); - } - if (tblMap.get(region) == null) { - tblMap.put(region, new HashMap<>()); - } - Map>> famMap = tblMap.get(region); - if (famMap.get(fam) == null) { - famMap.put(fam, new ArrayList<>()); - } - famMap.get(fam).add(new Pair<>(path, raw)); + result.add(new BulkLoad(table, region, fam, path, row)); LOG.debug("found orig " + path + " for " + fam + " of table " + region); } } } - return new Pair<>(map, rows); + return result; } /* @@ -793,20 +738,19 @@ public List getBackupHistory(int n, BackupInfo.Filter... filters) th return result; } - /* - * Retrieve TableName's for completed backup of given type - * @param type backup type - * @return List of table names + /** + * Retrieve all table names that are part of any known backup */ - public List getTablesForBackupType(BackupType type) throws IOException { + public Set getTablesIncludedInBackups() throws IOException { Set names = new HashSet<>(); List infos = getBackupHistory(true); for (BackupInfo info : infos) { - if (info.getType() == type) { + // Incremental backups have the same tables as the preceding full backups + if (info.getType() == BackupType.FULL) { names.addAll(info.getTableNames()); } } - return new ArrayList<>(names); + return names; } /** @@ -1500,13 +1444,13 @@ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { return s.substring(index + 1); } - /* - * Creates Put's for bulk load resulting from running LoadIncrementalHFiles + /** + * Creates Put's for bulk loads. 
*/ - static List createPutForCommittedBulkload(TableName table, byte[] region, - Map> finalPaths) { + private static List createPutForBulkLoad(TableName table, byte[] region, + Map> columnFamilyToHFilePaths) { List puts = new ArrayList<>(); - for (Map.Entry> entry : finalPaths.entrySet()) { + for (Map.Entry> entry : columnFamilyToHFilePaths.entrySet()) { for (Path path : entry.getValue()) { String file = path.toString(); int lastSlash = file.lastIndexOf("/"); @@ -1516,10 +1460,8 @@ static List createPutForCommittedBulkload(TableName table, byte[] region, put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); - put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); puts.add(put); - LOG - .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); + LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); } } return puts; @@ -1580,29 +1522,6 @@ public static void deleteSnapshot(Connection conn) throws IOException { } } - /* - * Creates Put's for bulk load resulting from running LoadIncrementalHFiles - */ - static List createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family, - final List> pairs) { - List puts = new ArrayList<>(pairs.size()); - for (Pair pair : pairs) { - Path path = pair.getSecond(); - String file = path.toString(); - int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash + 1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); - put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); - put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); - put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); - puts.add(put); - LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); - } - return puts; - } - public static List createDeleteForOrigBulkLoad(List lst) { List lstDels = new ArrayList<>(lst.size()); for (TableName table : lst) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java new file mode 100644 index 000000000000..0f1e79c976bb --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The data corresponding to a single bulk-loaded file that is being tracked by the backup logic. + */ +@InterfaceAudience.Private +public class BulkLoad { + private final TableName tableName; + private final String region; + private final String columnFamily; + private final String hfilePath; + private final byte[] rowKey; + + public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath, + byte[] rowKey) { + this.tableName = tableName; + this.region = region; + this.columnFamily = columnFamily; + this.hfilePath = hfilePath; + this.rowKey = rowKey; + } + + public TableName getTableName() { + return tableName; + } + + public String getRegion() { + return region; + } + + public String getColumnFamily() { + return columnFamily; + } + + public String getHfilePath() { + return hfilePath; + } + + public byte[] getRowKey() { + return rowKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof BulkLoad)) { + return false; + } + BulkLoad that = (BulkLoad) o; + return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region) + .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath) + .append(rowKey, that.rowKey).isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(tableName).append(region).append(columnFamily) + .append(hfilePath).append(rowKey).toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE) + .append("tableName", tableName).append("region", region).append("columnFamily", columnFamily) + .append("hfilePath", hfilePath).append("rowKey", rowKey).toString(); + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 6ad487337c22..03a6ecc02f37 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,16 +48,16 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.util.Tool; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + import 
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; /** @@ -110,22 +110,17 @@ protected static int getIndex(TableName tbl, List sTableList) { return -1; } - /* + /** * Reads bulk load records from backup table, iterates through the records and forms the paths for * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT * clean up the entries in the bulk load system table. Those entries should not be cleaned until * the backup is marked as complete. - * @param sTableList list of tables to be backed up - * @return the rowkeys of bulk loaded files + * @param tablesToBackup list of tables to be backed up */ - @SuppressWarnings("unchecked") - protected List handleBulkLoad(List sTableList) throws IOException { - Map>[] mapForSrc = new Map[sTableList.size()]; + protected List handleBulkLoad(List tablesToBackup) throws IOException { List activeFiles = new ArrayList<>(); List archiveFiles = new ArrayList<>(); - Pair>>>>, List> pair = - backupManager.readBulkloadRows(sTableList); - Map>>>> map = pair.getFirst(); + List bulkLoads = backupManager.readBulkloadRows(tablesToBackup); FileSystem tgtFs; try { tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); @@ -135,74 +130,46 @@ protected List handleBulkLoad(List sTableList) throws IOExcep Path rootdir = CommonFSUtils.getRootDir(conf); Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); - for (Map.Entry>>>> tblEntry : map - .entrySet()) { - TableName srcTable = tblEntry.getKey(); + for (BulkLoad bulkLoad : bulkLoads) { + TableName srcTable = bulkLoad.getTableName(); + String regionName = bulkLoad.getRegion(); + String fam = bulkLoad.getColumnFamily(); + String filename = FilenameUtils.getName(bulkLoad.getHfilePath()); - int srcIdx = getIndex(srcTable, sTableList); - if (srcIdx < 0) { - LOG.warn("Couldn't find " + srcTable + " in source table List"); + if (!tablesToBackup.contains(srcTable)) { + LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); continue; } - if (mapForSrc[srcIdx] == null) { - mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); - } Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); - Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()), - srcTable.getQualifierAsString()); - for (Map.Entry>>> regionEntry : tblEntry - .getValue().entrySet()) { - String regionName = regionEntry.getKey(); - Path regionDir = new Path(tblDir, regionName); - // map from family to List of hfiles - for (Map.Entry>> famEntry : regionEntry.getValue() - .entrySet()) { - String fam = famEntry.getKey(); - Path famDir = new Path(regionDir, fam); - List files; - if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) { - files = new ArrayList<>(); - mapForSrc[srcIdx].put(Bytes.toBytes(fam), files); - } else { - files = mapForSrc[srcIdx].get(Bytes.toBytes(fam)); - } - Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); - String tblName = srcTable.getQualifierAsString(); - Path tgtFam = new Path(new Path(tgtTable, regionName), fam); - if (!tgtFs.mkdirs(tgtFam)) { - throw new IOException("couldn't create " + tgtFam); - } - for (Pair fileWithState : famEntry.getValue()) { - String file = fileWithState.getFirst(); - int idx = file.lastIndexOf("/"); - String filename = file; - if (idx > 0) { - filename = file.substring(idx + 1); - } - Path p = new Path(famDir, filename); - Path tgt = new Path(tgtFam, filename); - Path archive = new Path(archiveDir, filename); - if (fs.exists(p)) { - if 
(LOG.isTraceEnabled()) { - LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName); - } - if (LOG.isTraceEnabled()) { - LOG.trace("copying " + p + " to " + tgt); - } - activeFiles.add(p.toString()); - } else if (fs.exists(archive)) { - LOG.debug("copying archive " + archive + " to " + tgt); - archiveFiles.add(archive.toString()); - } - files.add(tgt); - } + Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); + + String srcTableQualifier = srcTable.getQualifierAsString(); + String srcTableNs = srcTable.getNamespaceAsString(); + Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier + + Path.SEPARATOR + regionName + Path.SEPARATOR + fam); + if (!tgtFs.mkdirs(tgtFam)) { + throw new IOException("couldn't create " + tgtFam); + } + Path tgt = new Path(tgtFam, filename); + + Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); + Path archive = new Path(archiveDir, filename); + + if (fs.exists(p)) { + if (LOG.isTraceEnabled()) { + LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(), + srcTableQualifier); + LOG.trace("copying {} to {}", p, tgt); } + activeFiles.add(p.toString()); + } else if (fs.exists(archive)) { + LOG.debug("copying archive {} to {}", archive, tgt); + archiveFiles.add(archive.toString()); } } copyBulkLoadedFiles(activeFiles, archiveFiles); - - return pair.getSecond(); + return bulkLoads; } private void copyBulkLoadedFiles(List activeFiles, List archiveFiles) @@ -327,11 +294,12 @@ public void execute() throws IOException, ColumnFamilyMismatchException { BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); - List bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames()); + List bulkLoads = handleBulkLoad(backupInfo.getTableNames()); // backup complete completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf); + List bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey); backupManager.deleteBulkLoadedRows(bulkLoadedRows); } catch (IOException e) { failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
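
A few illustrative sketches of the refactored flow follow; none of them are part of the patch itself. First, both coprocessor hooks now funnel into a single registration step. A rough standalone restatement of BackupObserver#registerBulkLoad, assuming an already-open Connection (the class and method names here are hypothetical):

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;

public final class BulkLoadRegistrationSketch {
  private BulkLoadRegistrationSketch() {
  }

  /**
   * Illustrative stand-in for BackupObserver#registerBulkLoad: record bulk-loaded hfiles in the
   * backup system table only when the table is covered by at least one full backup.
   */
  static void recordIfBackedUp(Connection connection, TableName tableName, byte[] encodedRegion,
    Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
    try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
      Set<TableName> backedUpTables = tbl.getTablesIncludedInBackups();
      if (backedUpTables.contains(tableName)) {
        tbl.registerBulkLoad(tableName, encodedRegion, cfToHFilePaths);
      }
    }
  }
}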
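
preCommitStoreFile reuses that same path by adapting its (source, destination) hfile pairs into the single-family map shape that postBulkLoadHFile already receives. A minimal sketch of that adaptation with the generic types the hook signatures use (wrapper class name is hypothetical):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;

public final class PreCommitAdapterSketch {
  private PreCommitAdapterSketch() {
  }

  /**
   * Mirrors the new preCommitStoreFile logic: keep only the destination hfile of each
   * (source, destination) pair and wrap the result in the single-family map that
   * registerBulkLoad expects.
   */
  static Map<byte[], List<Path>> toFamilyMap(byte[] family, List<Pair<Path, Path>> pairs) {
    List<Path> hfiles = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      hfiles.add(pair.getSecond());
    }
    return Collections.singletonMap(family, hfiles);
  }
}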
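
The rewritten handleBulkLoad derives three locations directly from each BulkLoad entry instead of walking nested maps: the live hfile under the table directory, its possible archive location, and the copy target under the backup root. A sketch of that path arithmetic under the same layout assumptions the patch relies on (helper class name is hypothetical):

import java.io.IOException;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public final class BulkLoadPathsSketch {
  private BulkLoadPathsSketch() {
  }

  /** Live location of a bulk-loaded hfile: <rootDir>/data/<ns>/<table>/<region>/<family>/<file>. */
  static Path activePath(Path rootDir, TableName table, String region, String family,
    String hfilePath) {
    Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
    return new Path(tableDir,
      region + Path.SEPARATOR + family + Path.SEPARATOR + FilenameUtils.getName(hfilePath));
  }

  /** Fallback location if the cleaner has already moved the hfile to the archive. */
  static Path archivePath(Configuration conf, TableName table, String region, String family,
    String hfilePath) throws IOException {
    Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, table, region, family);
    return new Path(archiveDir, FilenameUtils.getName(hfilePath));
  }

  /** Copy target: <backupRoot>/<backupId>/<ns>/<table>/<region>/<family>/<file>. */
  static Path targetPath(Path backupRoot, String backupId, TableName table, String region,
    String family, String hfilePath) {
    Path tgtFam = new Path(new Path(backupRoot, backupId),
      table.getNamespaceAsString() + Path.SEPARATOR + table.getQualifierAsString()
        + Path.SEPARATOR + region + Path.SEPARATOR + family);
    return new Path(tgtFam, FilenameUtils.getName(hfilePath));
  }
}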
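
Finally, because each BulkLoad carries the system-table row key it was read from, the incremental client no longer threads a separate list of row keys through the code; it derives the rows to delete only after completeBackup succeeds. A small sketch of that step (helper name hypothetical); Lists.transform returns a lazy view, which is adequate here since deleteBulkLoadedRows iterates it once:

import java.util.List;

import org.apache.hadoop.hbase.backup.impl.BulkLoad;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public final class BulkLoadCleanupSketch {
  private BulkLoadCleanupSketch() {
  }

  /**
   * Row keys of the bulk-load entries that can be removed from the backup system table once the
   * incremental backup has been marked complete.
   */
  static List<byte[]> rowKeysToDelete(List<BulkLoad> copiedBulkLoads) {
    return Lists.transform(copiedBulkLoads, BulkLoad::getRowKey);
  }
}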