diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 58c329c0cd76..3a17bdc698ea 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -81,6 +81,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -945,8 +946,8 @@ public static class WALMapperSearcher extends WALMapper {
private AtomicInteger rows = new AtomicInteger(0);
@Override
- public void setup(Mapper.Context context)
- throws IOException {
+ public void setup(Mapper>>.Context context) throws IOException {
super.setup(context);
try {
this.keysToFind = readKeysToSearch(context.getConfiguration());
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index 5566bd79cab0..fba82f73f79f 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
@@ -387,8 +388,8 @@ public static class WALMapperSearcher extends WALMapper {
private AtomicInteger rows = new AtomicInteger(0);
@Override
- public void setup(Mapper.Context context)
- throws IOException {
+ public void setup(Mapper>>.Context context) throws IOException {
super.setup(context);
try {
this.keysToFind = readKeysToSearch(context.getConfiguration());
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
index 35c12672deac..17ad0a64b60a 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
@@ -31,7 +34,9 @@
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -56,7 +61,8 @@
*
*/
@InterfaceAudience.Public
-public class MultiTableOutputFormat extends OutputFormat {
+public class MultiTableOutputFormat
+ extends OutputFormat>> {
/** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
/** Property value to use write-ahead logging */
@@ -68,7 +74,7 @@ public class MultiTableOutputFormat extends OutputFormat {
+ extends RecordWriter>> {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableRecordWriter.class);
Connection connection;
Map mutatorMap = new HashMap<>();
@@ -119,7 +125,17 @@ public void close(TaskAttemptContext context) throws IOException {
* either a put or a delete. if the action is not a put or a delete.
*/
@Override
- public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
+ public void write(ImmutableBytesWritable tableName, Pair> action)
+ throws IOException {
+ if (action.getFirst() != null) {
+ handleMutation(tableName, action.getFirst());
+ return;
+ }
+ handleBulkLoad(tableName, action.getSecond());
+ }
+
+ private void handleMutation(ImmutableBytesWritable tableName, Mutation action)
+ throws IOException {
BufferedMutator mutator = getBufferedMutator(tableName);
// The actions are not immutable, so we defensively copy them
if (action instanceof Put) {
@@ -131,6 +147,39 @@ public void write(ImmutableBytesWritable tableName, Mutation action) throws IOEx
mutator.mutate(delete);
} else throw new IllegalArgumentException("action must be either Delete or Put");
}
+
+ private void handleBulkLoad(ImmutableBytesWritable tableName, List bulkLoadFiles)
+ throws IOException {
+
+ TableName table = TableName.valueOf(tableName.get());
+ LOG.info("Starting bulk load for table: {}", table);
+
+ BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
+ LOG.info("Processing {} HFiles for bulk loading into table: {}", bulkLoadFiles.size(), table);
+
+ // This map will hold the family-to-files mapping needed for the bulk load operation
+ Map> family2Files = new HashMap<>();
+
+ try {
+ for (String file : bulkLoadFiles) {
+ Path filePath = new Path(file);
+ String family = filePath.getParent().getName();
+ byte[] familyBytes = Bytes.toBytes(family);
+
+ // Add the file to the list of files for the corresponding column family
+ family2Files.computeIfAbsent(familyBytes, k -> new ArrayList<>()).add(filePath);
+ LOG.info("Mapped file {} to family {}", filePath, family);
+ }
+
+ LOG.info("Executing bulk load into table: {}", table);
+ bulkLoader.bulkLoad(table, family2Files);
+
+ LOG.info("Bulk load completed successfully for table: {}", table);
+ } catch (IOException e) {
+ LOG.error("Error during bulk load for table: {}. Exception: {}", table, e.getMessage(), e);
+ throw new IOException("Failed to complete bulk load for table: " + table, e);
+ }
+ }
}
@Override
@@ -146,8 +195,8 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
}
@Override
- public RecordWriter getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
+ public RecordWriter>>
+ getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
conf.getBoolean(WAL_PROPERTY, WAL_ON));
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 99b1dd112b98..97e8c966efa0 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -27,13 +27,17 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -47,8 +51,10 @@
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -62,6 +68,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
/**
* A tool to replay WAL files as a M/R job. The WAL can be replayed for a set of tables or all
* tables, and a time range can be provided (in milliseconds). The WAL is filtered to the passed set
@@ -79,6 +87,7 @@ public class WALPlayer extends Configured implements Tool {
public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
+ public final static String BULKLOAD_BACKUP_LOCATION = "wal.bulk.backup.location";
protected static final String tableSeparator = ";";
@@ -156,7 +165,7 @@ protected static enum Counter {
* A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
*/
protected static class WALMapper
- extends Mapper {
+ extends Mapper>> {
private Map tables = new TreeMap<>();
@Override
@@ -172,6 +181,54 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
ExtendedCell lastCell = null;
for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
context.getCounter(Counter.CELLS_READ).increment(1);
+
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ String namespace = key.getTableName().getNamespaceAsString();
+ String tableName = key.getTableName().getQualifierAsString();
+ LOG.info("Processing bulk load for namespace: {}, table: {}", namespace, tableName);
+
+ List bulkloadFiles = handleBulkLoadCell(cell);
+ LOG.info("Found {} bulk load files for table: {}", bulkloadFiles.size(), tableName);
+
+ // Prefix each file path with namespace and table name to construct the full paths
+ List bulkloadFilesWithFullPath = bulkloadFiles.stream()
+ .map(filePath -> new Path(namespace, new Path(tableName, filePath)).toString())
+ .collect(Collectors.toList());
+ LOG.info("Bulk load files with full paths: {}", bulkloadFilesWithFullPath.size());
+
+ // Retrieve configuration and set up file systems for backup and staging locations
+ Configuration conf = context.getConfiguration();
+ Path backupLocation = new Path(conf.get(BULKLOAD_BACKUP_LOCATION));
+ FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); // HDFS filesystem
+ Path hbaseStagingDir =
+ new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
+ FileSystem backupFs = FileSystem.get(backupLocation.toUri(), conf);
+
+ List stagingPaths = new ArrayList<>();
+
+ try {
+ for (String file : bulkloadFilesWithFullPath) {
+ // Full file path from S3
+ Path fullBackupFilePath = new Path(backupLocation, file);
+ // Staging path on HDFS
+ Path stagingPath = new Path(hbaseStagingDir, file);
+
+ LOG.info("Copying file from backup location (S3): {} to HDFS staging: {}",
+ fullBackupFilePath, stagingPath);
+ // Copy the file from S3 to HDFS
+ FileUtil.copy(backupFs, fullBackupFilePath, rootFs, stagingPath, false, conf);
+
+ stagingPaths.add(stagingPath.toString());
+ }
+ } catch (IOException e) {
+ LOG.error("Error copying files for bulk load: {}", e.getMessage(), e);
+ throw new IOException("Failed to copy files for bulk load.", e);
+ }
+
+ Pair> p = new Pair<>(null, stagingPaths);
+ context.write(tableOut, p);
+ }
+
// Filtering WAL meta marker entries.
if (WALEdit.isMetaEditFamily(cell)) {
continue;
@@ -188,11 +245,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
) {
// row or type changed, write out aggregate KVs.
if (put != null) {
- context.write(tableOut, put);
+ Pair> p = new Pair<>(put, null);
+ context.write(tableOut, p);
context.getCounter(Counter.PUTS).increment(1);
}
if (del != null) {
- context.write(tableOut, del);
+ Pair> p = new Pair<>(del, null);
+ context.write(tableOut, p);
context.getCounter(Counter.DELETES).increment(1);
}
if (CellUtil.isDelete(cell)) {
@@ -212,12 +271,14 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
}
// write residual KVs
if (put != null) {
- context.write(tableOut, put);
+ Pair> p = new Pair<>(put, null);
+ context.write(tableOut, p);
context.getCounter(Counter.PUTS).increment(1);
}
if (del != null) {
+ Pair> p = new Pair<>(del, null);
+ context.write(tableOut, p);
context.getCounter(Counter.DELETES).increment(1);
- context.write(tableOut, del);
}
}
} catch (InterruptedException e) {
@@ -230,10 +291,57 @@ protected boolean filter(Context context, final Cell cell) {
return true;
}
+ private List handleBulkLoadCell(Cell cell) throws IOException {
+ List resultFiles = new ArrayList<>();
+ LOG.info("Bulk load detected in cell. Processing...");
+
+ WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+
+ if (bld == null) {
+ LOG.info("BulkLoadDescriptor is null for cell: {}", cell);
+ return resultFiles;
+ }
+ if (!bld.getReplicate()) {
+ LOG.info("Replication is disabled for bulk load cell: {}", cell);
+ }
+
+ String regionName = bld.getEncodedRegionName().toStringUtf8();
+
+ LOG.info("Encoded region name: {}", regionName);
+
+ List storesList = bld.getStoresList();
+ if (storesList == null) {
+ LOG.info("Store descriptor list is null for region: {}", regionName);
+ return resultFiles;
+ }
+
+ for (WALProtos.StoreDescriptor storeDescriptor : storesList) {
+ String columnFamilyName = storeDescriptor.getFamilyName().toStringUtf8();
+ LOG.info("Processing column family: {}", columnFamilyName);
+
+ List storeFileList = storeDescriptor.getStoreFileList();
+ if (storeFileList == null) {
+ LOG.info("Store file list is null for column family: {}", columnFamilyName);
+ continue;
+ }
+
+ for (String storeFile : storeFileList) {
+ String hFilePath = getHFilePath(regionName, columnFamilyName, storeFile);
+ LOG.info("Adding HFile path to bulk load file paths: {}", hFilePath);
+ resultFiles.add(hFilePath);
+ }
+ }
+ return resultFiles;
+ }
+
+ private String getHFilePath(String regionName, String columnFamilyName, String storeFileName) {
+ return new Path(regionName, new Path(columnFamilyName, storeFileName)).toString();
+ }
+
@Override
- protected void
- cleanup(Mapper.Context context)
- throws IOException, InterruptedException {
+ protected void cleanup(
+ Mapper>>.Context context)
+ throws IOException, InterruptedException {
super.cleanup(context);
}
@@ -293,6 +401,8 @@ public Job createSubmittableJob(String[] args) throws IOException {
setupTime(conf, WALInputFormat.START_TIME_KEY);
setupTime(conf, WALInputFormat.END_TIME_KEY);
String inputDirs = args[0];
+ String walDir = new Path(inputDirs, "WALs").toString();
+ String bulkLoadFilesDir = new Path(inputDirs, "bulk-load-files").toString();
String[] tables = args.length == 1 ? new String[] {} : args[1].split(",");
String[] tableMap;
if (args.length > 2) {
@@ -306,7 +416,8 @@ public Job createSubmittableJob(String[] args) throws IOException {
}
conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap);
- conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+ conf.set(FileInputFormat.INPUT_DIR, walDir);
+ conf.set(BULKLOAD_BACKUP_LOCATION, bulkLoadFilesDir);
Job job = Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
job.setJarByClass(WALPlayer.class);