HBASE-28988 Enhance WALPlayer for restore of BulkLoad #6523

Draft. Wants to merge 1 commit into base: HBASE-28957
@@ -81,6 +81,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -945,7 +946,7 @@ public static class WALMapperSearcher extends WALMapper {
private AtomicInteger rows = new AtomicInteger(0);

@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Pair<Mutation, List<String>>>.Context context)
throws IOException {
super.setup(context);
try {
@@ -70,6 +70,7 @@
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
@@ -387,7 +388,7 @@ public static class WALMapperSearcher extends WALMapper {
private AtomicInteger rows = new AtomicInteger(0);

@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Pair<Mutation, List<String>>>.Context context)
throws IOException {
super.setup(context);
try {
@@ -18,10 +18,16 @@
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
@@ -30,8 +36,12 @@
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -56,7 +66,7 @@
* </p>
*/
@InterfaceAudience.Public
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Pair<Mutation, List<String>>> {
/** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
/** Property value to use write-ahead logging */
@@ -68,7 +78,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable,
* Record writer for outputting to multiple HTables.
*/
protected static class MultiTableRecordWriter
extends RecordWriter<ImmutableBytesWritable, Mutation> {
extends RecordWriter<ImmutableBytesWritable, Pair<Mutation, List<String>>> {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableRecordWriter.class);
Connection connection;
Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
@@ -119,7 +129,15 @@ public void close(TaskAttemptContext context) throws IOException {
* either a put or a delete. if the action is not a put or a delete.
*/
@Override
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
public void write(ImmutableBytesWritable tableName, Pair<Mutation, List<String>> action) throws IOException {
if (action.getFirst() != null) {
handleMutation(tableName, action.getFirst());
return;
}
handleBulkLoad(tableName, action.getSecond());
}

private void handleMutation(ImmutableBytesWritable tableName, Mutation action) throws IOException {
BufferedMutator mutator = getBufferedMutator(tableName);
// The actions are not immutable, so we defensively copy them
if (action instanceof Put) {
@@ -131,6 +149,39 @@ public void write(ImmutableBytesWritable tableName, Mutation action) throws IOEx
mutator.mutate(delete);
} else throw new IllegalArgumentException("action must be either Delete or Put");
}

private void handleBulkLoad(ImmutableBytesWritable tableName, List<String> bulkLoadFiles)
throws IOException {

TableName table = TableName.valueOf(tableName.get());
LOG.info("Starting bulk load for table: {}", table);

BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
LOG.info("Processing {} HFiles for bulk loading into table: {}", bulkLoadFiles.size(), table);

// This map will hold the family-to-files mapping needed for the bulk load operation
Map<byte[], List<Path>> family2Files = new HashMap<>();

try {
for (String file : bulkLoadFiles) {
Path filePath = new Path(file);
String family = filePath.getParent().getName();
byte[] familyBytes = Bytes.toBytes(family);

// Add the file to the list of files for the corresponding column family
family2Files.computeIfAbsent(familyBytes, k -> new ArrayList<>()).add(filePath);
LOG.info("Mapped file {} to family {}", filePath, family);
}

LOG.info("Executing bulk load into table: {}", table);
bulkLoader.bulkLoad(table, family2Files);

LOG.info("Bulk load completed successfully for table: {}", table);
} catch (IOException e) {
LOG.error("Error during bulk load for table: {}. Exception: {}", table, e.getMessage(), e);
throw new IOException("Failed to complete bulk load for table: " + table, e);
}
}
}

@Override
@@ -146,7 +197,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
}

@Override
public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
public RecordWriter<ImmutableBytesWritable, Pair<Mutation, List<String>>> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
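For illustration, a toy example of the two value shapes MultiTableOutputFormat now accepts after this change. This is a sketch only, not part of the diff: the class name, table name, and HFile path below are made up, and it simply mirrors what WALPlayer's WALMapper does further down.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/** Sketch only: the two value shapes MultiTableOutputFormat accepts after this change. */
public class PairValueExamples {

  static void emit(
    TaskInputOutputContext<?, ?, ImmutableBytesWritable, Pair<Mutation, List<String>>> ctx)
      throws IOException, InterruptedException {
    // Key is the destination table name, as before.
    ImmutableBytesWritable table = new ImmutableBytesWritable(Bytes.toBytes("default:t1"));

    // Case 1: an ordinary mutation to replay; the bulk-load file list is null.
    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    ctx.write(table, new Pair<>(put, null));

    // Case 2: a bulk-load request; the mutation is null and the list holds staged HFile paths.
    List<String> stagedHFiles = Arrays.asList("/hbase/staging/default/t1/region/cf/hfile1");
    ctx.write(table, new Pair<>(null, stagedHFiles));
  }
}
```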
@@ -27,13 +27,17 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -46,9 +50,12 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -79,6 +86,7 @@ public class WALPlayer extends Configured implements Tool {
public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
public final static String BULKLOAD_BACKUP_LOCATION = "wal.bulk.backup.location";

protected static final String tableSeparator = ";";

@@ -156,7 +164,7 @@ protected static enum Counter {
* A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
*/
protected static class WALMapper
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Pair<Mutation, List<String>>> {
Contributor: Instead of Pair here, can we use a custom class, so that the exclusivity between Mutation and bulk-load files is enforced programmatically?
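For illustration, a minimal sketch of such a value class. The name WalOutputValue is hypothetical, and a real implementation would also need Writable or other serialization support to be usable as a MapReduce value, which is omitted here.

```java
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Sketch only: holds exactly one of a Mutation or a list of bulk-load HFile paths,
 * so the either/or invariant is enforced by construction rather than by a Pair with nulls.
 */
public final class WalOutputValue {
  private final Mutation mutation;          // non-null for replayed Puts/Deletes
  private final List<String> bulkLoadFiles; // non-null for bulk-load restore entries

  private WalOutputValue(Mutation mutation, List<String> bulkLoadFiles) {
    this.mutation = mutation;
    this.bulkLoadFiles = bulkLoadFiles;
  }

  public static WalOutputValue ofMutation(Mutation mutation) {
    return new WalOutputValue(Objects.requireNonNull(mutation), null);
  }

  public static WalOutputValue ofBulkLoadFiles(List<String> files) {
    return new WalOutputValue(null, Objects.requireNonNull(files));
  }

  public boolean isBulkLoad() {
    return bulkLoadFiles != null;
  }

  public Mutation getMutation() {
    return mutation;
  }

  public List<String> getBulkLoadFiles() {
    return bulkLoadFiles;
  }
}
```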

private Map<TableName, TableName> tables = new TreeMap<>();

@Override
@@ -172,6 +180,52 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
ExtendedCell lastCell = null;
for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
context.getCounter(Counter.CELLS_READ).increment(1);

if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
Contributor: I believe the processing of bulk-loaded files can be simplified, and we could reduce the log level from INFO to DEBUG or TRACE in some cases.

String namespace = key.getTableName().getNamespaceAsString();
String tableName = key.getTableName().getQualifierAsString();
LOG.info("Processing bulk load for namespace: {}, table: {}", namespace, tableName);

List<String> bulkloadFiles = handleBulkLoadCell(cell);
LOG.info("Found {} bulk load files for table: {}", bulkloadFiles.size(), tableName);

// Prefix each file path with namespace and table name to construct the full paths
List<String> bulkloadFilesWithFullPath = bulkloadFiles.stream()
.map(filePath -> new Path(namespace, new Path(tableName, filePath)).toString())
.collect(Collectors.toList());
LOG.info("Bulk load files with full paths: {}", bulkloadFilesWithFullPath.size());

// Retrieve configuration and set up file systems for backup and staging locations
Configuration conf = context.getConfiguration();
Path backupLocation = new Path(conf.get(BULKLOAD_BACKUP_LOCATION));
Contributor: Check for the case where backupLocation is not specified.
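A minimal sketch of the suggested guard, written as a drop-in for the surrounding code; the exact error handling (throwing IOException) is an assumption, not part of the diff.

```java
// Sketch: fail fast when no bulk-load backup location was configured.
String backupLocationStr = conf.get(BULKLOAD_BACKUP_LOCATION);
if (backupLocationStr == null || backupLocationStr.isEmpty()) {
  throw new IOException("Found a bulk load entry in the WAL, but "
    + BULKLOAD_BACKUP_LOCATION + " is not set; cannot restore bulk-loaded HFiles");
}
Path backupLocation = new Path(backupLocationStr);
```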

FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); // HDFS filesystem
Path hbaseStagingDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
FileSystem backupFs = FileSystem.get(backupLocation.toUri(), conf);

List<String> stagingPaths = new ArrayList<>();

try {
for (String file : bulkloadFilesWithFullPath) {
Contributor: These are not full paths, but the relative paths from the namespace.

// Full file path from S3
Contributor: Remove the hardcoded S3 here.

Path fullBackupFilePath = new Path(backupLocation, file);
// Staging path on HDFS
Path stagingPath = new Path(hbaseStagingDir, file);

LOG.info("Copying file from backup location (S3): {} to HDFS staging: {}", fullBackupFilePath, stagingPath);
// Copy the file from S3 to HDFS
FileUtil.copy(backupFs, fullBackupFilePath, rootFs, stagingPath, false, conf);

stagingPaths.add(stagingPath.toString());
}
} catch (IOException e) {
LOG.error("Error copying files for bulk load: {}", e.getMessage(), e);
throw new IOException("Failed to copy files for bulk load.", e);
}

Pair<Mutation, List<String>> p = new Pair<>(null, stagingPaths);
context.write(tableOut, p);
}

// Filtering WAL meta marker entries.
if (WALEdit.isMetaEditFamily(cell)) {
continue;
@@ -188,11 +242,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
) {
// row or type changed, write out aggregate KVs.
if (put != null) {
context.write(tableOut, put);
Pair<Mutation, List<String>> p = new Pair<>(put, null);
context.write(tableOut, p);
context.getCounter(Counter.PUTS).increment(1);
}
if (del != null) {
context.write(tableOut, del);
Pair<Mutation, List<String>> p = new Pair<>(del, null);
context.write(tableOut, p);
context.getCounter(Counter.DELETES).increment(1);
}
if (CellUtil.isDelete(cell)) {
@@ -212,12 +268,14 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
}
// write residual KVs
if (put != null) {
context.write(tableOut, put);
Pair<Mutation, List<String>> p = new Pair<>(put, null);
context.write(tableOut, p);
context.getCounter(Counter.PUTS).increment(1);
}
if (del != null) {
Pair<Mutation, List<String>> p = new Pair<>(del, null);
context.write(tableOut, p);
context.getCounter(Counter.DELETES).increment(1);
context.write(tableOut, del);
}
}
} catch (InterruptedException e) {
@@ -230,9 +288,56 @@ protected boolean filter(Context context, final Cell cell) {
return true;
}

private List<String> handleBulkLoadCell(Cell cell) throws IOException {
List<String> resultFiles = new ArrayList<>();
LOG.info("Bulk load detected in cell. Processing...");

WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);

if (bld == null) {
LOG.info("BulkLoadDescriptor is null for cell: {}", cell);
return resultFiles;
}
if (!bld.getReplicate()) {
LOG.info("Replication is disabled for bulk load cell: {}", cell);
}

String regionName = bld.getEncodedRegionName().toStringUtf8();

LOG.info("Encoded region name: {}", regionName);

List<WALProtos.StoreDescriptor> storesList = bld.getStoresList();
if (storesList == null) {
LOG.info("Store descriptor list is null for region: {}", regionName);
return resultFiles;
}

for (WALProtos.StoreDescriptor storeDescriptor : storesList) {
String columnFamilyName = storeDescriptor.getFamilyName().toStringUtf8();
LOG.info("Processing column family: {}", columnFamilyName);

List<String> storeFileList = storeDescriptor.getStoreFileList();
if (storeFileList == null) {
LOG.info("Store file list is null for column family: {}", columnFamilyName);
continue;
}

for (String storeFile : storeFileList) {
String hFilePath = getHFilePath(regionName, columnFamilyName, storeFile);
LOG.info("Adding HFile path to bulk load file paths: {}", hFilePath);
resultFiles.add(hFilePath);
}
}
return resultFiles;
}

private String getHFilePath(String regionName, String columnFamilyName, String storeFileName) {
return new Path(regionName, new Path(columnFamilyName, storeFileName)).toString();
}

@Override
protected void
cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Pair<Mutation, List<String>>>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
@@ -293,6 +398,8 @@ public Job createSubmittableJob(String[] args) throws IOException {
setupTime(conf, WALInputFormat.START_TIME_KEY);
setupTime(conf, WALInputFormat.END_TIME_KEY);
String inputDirs = args[0];
String walDir = new Path(inputDirs, "WALs").toString();
Contributor: I don't think this is correct. We are hard-coding the directories here. We could introduce a new optional parameter that the user can specify if they have bulk-loaded files for us to process.
For example:
hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir oldTable1,oldTable2 newTable1,newTable2 -Dwal.bulk.backup.location=/bulkload-files-dir
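A sketch of how that could look in createSubmittableJob, assuming the property is only honoured when the user supplies it via -D; the variable name and the debug message are illustrative, not part of the diff.

```java
// Sketch: keep the user-supplied input directory as-is and treat the bulk-load
// backup location as optional, e.g. passed with -Dwal.bulk.backup.location=...
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
String bulkLoadBackupDir = conf.get(BULKLOAD_BACKUP_LOCATION);
if (bulkLoadBackupDir == null || bulkLoadBackupDir.isEmpty()) {
  LOG.debug("{} not set; bulk load entries found in the WAL will not be restored",
    BULKLOAD_BACKUP_LOCATION);
}
```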

String bulkLoadFilesDir = new Path(inputDirs, "bulk-load-files").toString();
String[] tables = args.length == 1 ? new String[] {} : args[1].split(",");
String[] tableMap;
if (args.length > 2) {
@@ -306,7 +413,8 @@ }
}
conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap);
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
conf.set(FileInputFormat.INPUT_DIR, walDir);
conf.set(BULKLOAD_BACKUP_LOCATION, bulkLoadFilesDir);
Job job = Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
job.setJarByClass(WALPlayer.class);