TASK-4372 - Hadoop ExportSnapshot #2333

Merged 3 commits on Sep 8, 2023
Changes from all commits:
```diff
@@ -122,24 +122,52 @@ protected static ObjectMap getArgsMap(String[] args, int firstIdx, String... key
         ObjectMap argsMap;
         argsMap = new ObjectMap();
         int i = firstIdx;
+        int offset;
         while (i < args.length) {
-            String key = args[i];
-            while (key.startsWith("-")) {
-                key = key.substring(1);
+            String inputKey = args[i];
+            offset = 1;
+            String key;
+            String value;
+            if (inputKey.startsWith("--")) {
+                key = inputKey.substring(2);
+                value = safeArg(args, i + 1);
+                offset++; // one extra value read
+            } else if (inputKey.startsWith("-")) {
+                key = inputKey.substring(1, 2);
+                value = inputKey.substring(2);
+                if (value.isEmpty()) {
+                    value = safeArg(args, i + 1);
+                    offset++; // one extra value read
+                }
+            } else {
+                throw new IllegalArgumentException("Unknown argument '" + inputKey + "'");
+            }
+            if (value == null || value.startsWith("-")) {
+                value = "true";
+                offset--; // extra value discarded
             }
             if (!acceptedKeys.isEmpty()) {
                 if (!acceptedKeys.contains(key)) {
-                    throw new IllegalArgumentException("Unknown argument '" + args[i] + "'");
+                    throw new IllegalArgumentException("Unknown argument '" + inputKey + "'");
                 }
             }
-            String value = safeArg(args, i + 1);
-            if (value == null || value.startsWith("-")) {
-                argsMap.put(key, true);
-                i += 1;
+
+            if (key.equals("D")) {
+                ObjectMap dynamic = (ObjectMap) argsMap.computeIfAbsent(key, k -> new ObjectMap());
+                String[] split = value.split("=", 2);
+                if (split.length != 2) {
+                    throw new IllegalArgumentException("Expected '-D key=value'");
+                }
+                if (dynamic.put(split[0], split[1]) != null) {
+                    throw new IllegalArgumentException("Duplicated argument '-D " + value + "'");
+                }
             } else {
-                argsMap.put(key, value);
-                i += 2;
+                if (argsMap.put(key, value) != null) {
+                    throw new IllegalArgumentException("Duplicated param '" + inputKey + "'");
+                }
             }
+            i += offset;
+
         }
         return argsMap;
     }
```
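To make the reworked parsing rules easy to trace, here is a hedged, standalone sketch of the behavior the new loop implements. It is not code from this PR: it uses a plain `HashMap` instead of `ObjectMap`, inlines `safeArg` as a bounds check, and omits the `acceptedKeys` validation.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the reworked argument parsing (illustrative, not the PR class):
//   --key value          -> {key: value}
//   --flag               -> {flag: "true"}  (next token missing or another option)
//   -Xvalue or -X value  -> {X: value}      (single-dash keys are one letter)
//   -Dk=v (repeatable)   -> {D: {k: v}}     (duplicates rejected)
public class ArgsMapSketch {

    static Map<String, Object> parse(String... args) {
        Map<String, Object> argsMap = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String inputKey = args[i];
            int offset = 1; // tokens consumed by this argument
            String key;
            String value;
            if (inputKey.startsWith("--")) {
                key = inputKey.substring(2);
                value = i + 1 < args.length ? args[i + 1] : null;
                offset++; // one extra value read
            } else if (inputKey.startsWith("-")) {
                key = inputKey.substring(1, 2);
                value = inputKey.substring(2);
                if (value.isEmpty()) {
                    value = i + 1 < args.length ? args[i + 1] : null;
                    offset++; // one extra value read
                }
            } else {
                throw new IllegalArgumentException("Unknown argument '" + inputKey + "'");
            }
            if (value == null || value.startsWith("-")) {
                value = "true"; // it was a flag; the next token is another option
                offset--;       // extra value discarded
            }
            if (key.equals("D")) { // collect -Dkey=value pairs into a nested map
                @SuppressWarnings("unchecked")
                Map<String, String> dynamic = (Map<String, String>)
                        argsMap.computeIfAbsent(key, k -> new HashMap<String, String>());
                String[] split = value.split("=", 2);
                if (split.length != 2) {
                    throw new IllegalArgumentException("Expected '-D key=value'");
                }
                if (dynamic.put(split[0], split[1]) != null) {
                    throw new IllegalArgumentException("Duplicated argument '-D " + value + "'");
                }
            } else if (argsMap.put(key, value) != null) {
                throw new IllegalArgumentException("Duplicated param '" + inputKey + "'");
            }
            i += offset;
        }
        return argsMap;
    }

    public static void main(String[] args) {
        // Prints {snapshot=snp1, overwrite=true, D={mapreduce.job.maps=10}} (map order may vary)
        System.out.println(parse("--snapshot", "snp1", "--overwrite", "-Dmapreduce.job.maps=10"));
    }
}
```

The `offset` counter is the key change: each branch records how many tokens it consumed, so flags, `--key value` pairs, and inline `-Dkey=value` arguments all share a single `i += offset` at the end of the loop instead of the old per-branch `i += 1` / `i += 2`.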
```diff
@@ -10,14 +10,21 @@
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.tools.ant.types.Commandline;
 import org.opencb.commons.ProgressLogger;
 import org.opencb.commons.datastore.core.ObjectMap;
 import org.opencb.opencga.core.common.TimeUtils;
+import org.opencb.opencga.core.config.storage.StorageConfiguration;
 import org.opencb.opencga.storage.hadoop.utils.HBaseManager;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
+import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -36,6 +43,8 @@ public class HBaseMain extends AbstractMain {
     public static final String SNAPSHOT_TABLE = "snapshot-table";
     public static final String DELETE_SNAPSHOTS = "delete-snapshots";
     public static final String CLONE_SNAPSHOTS = "clone-snapshots";
+    public static final String EXPORT_SNAPSHOTS = "export-snapshots";
+    public static final String EXEC = "exec";
     public static final String DISABLE_TABLE = "disable-table";
     public static final String DROP_TABLE = "drop-table";
     public static final String ENABLE_TABLE = "enable-table";
```
```diff
@@ -137,6 +146,25 @@ public void run(String[] args) throws Exception {
                 );
                 break;
             }
+            case EXPORT_SNAPSHOTS: {
+                ObjectMap argsMap = getArgsMap(args, 1, "dryRun", "snapshot", "copy-to", "copy-to-local", "copy-from", "target",
+                        "mappers", "overwrite", "D");
+                exportSnapshot(null,
+                        argsMap.getString("snapshot"),
+                        argsMap.getString("copy-to"),
+                        argsMap.getBoolean("copy-to-local"),
+                        argsMap.getString("copy-from"),
+                        argsMap.getString("target"),
+                        argsMap.getString("mappers"),
+                        argsMap.getBoolean("overwrite"),
+                        argsMap.getBoolean("dryRun"),
+                        argsMap);
+                break;
+            }
+            case EXEC: {
+                exec(getArg(args, 1), Arrays.asList(args).subList(2, args.length));
+                break;
+            }
             case DISABLE_TABLE: {
                 ObjectMap argsMap = getArgsMap(args, 2, "dryRun");
                 disableTables(getArg(args, 1), argsMap.getBoolean("dryRun"));
```
```diff
@@ -186,6 +214,25 @@ public void run(String[] args) throws Exception {
                 + "[--onExistingTables [fail|skip|drop] ]");
         System.out.println("      Clone all snapshots into tables matching the regex. "
                 + "Generated tables can have a table prefix change.");
+        System.out.println("  " + EXPORT_SNAPSHOTS + " \n"
+                + "      --dryRun <arg>       Dry run.\n"
+                + "      --snapshot <arg>     Snapshot to restore.\n"
+                + "      --copy-to <arg>      Remote destination hdfs://\n"
+                + "      --copy-to-local      Flag to indicate that it must copy to the local hbase.rootdir (for imports)\n"
+                + "      --copy-from <arg>    Input folder hdfs:// (default hbase.rootdir)\n"
+                + "      --target <arg>       Target name for the snapshot.\n"
+//                + "      --no-checksum-verify Do not verify checksum, use name+length only.\n"
+//                + "      --no-target-verify   Do not verify the integrity of the exported snapshot.\n"
+                + "      --overwrite          Rewrite the snapshot manifest if it already exists.\n"
+//                + "      --chuser <arg>       Change the owner of the files to the specified one.\n"
+//                + "      --chgroup <arg>      Change the group of the files to the specified one.\n"
+//                + "      --chmod <arg>        Change the permission of the files to the specified one.\n"
+//                + "      --bandwidth <arg>    Limit bandwidth to this value in MB/second.\n"
+                + "      --mappers <arg>      Number of mappers to use during the copy (mapreduce.job.maps).\n"
+                + "      -Dkey=value          Other key-value fields");
+        System.out.println("      Export a given snapshot to an external location.");
+        System.out.println("  " + EXEC + " [hadoop|yarn|hbase|hdfs]");
+        System.out.println("      Execute a MR job on the hadoop cluster. Use \"exec yarn jar ....\"");
         System.out.println("  " + DISABLE_TABLE + " <table-name-regex> [--dryRun]");
         System.out.println("      Disable all tables matching the regex.");
         System.out.println("  " + DROP_TABLE + " <table-name-regex> [--dryRun]");
```
```diff
@@ -199,6 +246,85 @@ public void run(String[] args) throws Exception {
 
     }
 
+    private void exec(String tool, List<String> args) throws Exception {
+        Path opencgaHome = Paths.get(System.getProperty("app.home"));
+        String storageConfigurationPath = opencgaHome.resolve("conf").resolve("storage-configuration.yml").toString();
+        StorageConfiguration storageConfiguration;
+        try (FileInputStream is = new FileInputStream(storageConfigurationPath)) {
+            storageConfiguration = StorageConfiguration.load(is);
+        }
+
+        HadoopVariantStorageEngine engine = new HadoopVariantStorageEngine();
+        engine.setConfiguration(storageConfiguration, HadoopVariantStorageEngine.STORAGE_ENGINE_ID, "");
+
+        MRExecutor mrExecutor = engine.getMRExecutor();
+        int exitError = mrExecutor.run(tool, args.toArray(new String[0]));
+        if (exitError != 0) {
+            throw new Exception("Exec failed with exit number '" + exitError + "'");
+        }
+    }
+
+    private void exportSnapshot(String storageConfigurationPath, String snapshot, String copyTo, boolean copyToLocal,
+                                String copyFrom, String target,
+                                String mappers, boolean overwrite, boolean dryRun, ObjectMap options) throws Exception {
+        if (storageConfigurationPath == null) {
+            Path opencgaHome = Paths.get(System.getProperty("app.home"));
+            storageConfigurationPath = opencgaHome.resolve("conf").resolve("storage-configuration.yml").toString();
+        }
+        StorageConfiguration storageConfiguration;
+        try (FileInputStream is = new FileInputStream(storageConfigurationPath)) {
+            storageConfiguration = StorageConfiguration.load(is);
+        }
+
+        List<String> args = new LinkedList<>();
+        args.add(org.apache.hadoop.hbase.snapshot.ExportSnapshot.class.getName());
+        for (Map.Entry<String, Object> entry : options.get("D", ObjectMap.class, new ObjectMap()).entrySet()) {
+            args.add("-D" + entry.getKey() + "=" + entry.getValue().toString());
+        }
+        args.add("--snapshot");
+        args.add(snapshot);
+
+        args.add("--copy-to");
+        if (StringUtils.isNotEmpty(copyTo)) {
+            args.add(copyTo);
+            if (copyToLocal) {
+                throw new Exception("Incompatible arguments `--copy-to` and `--copy-to-local`. Use only one of them");
+            }
+        } else if (copyToLocal) {
+            args.add(hBaseManager.getConf().get(HConstants.HBASE_DIR));
+        } else {
+            throw new Exception("Missing copy destination. Add either `--copy-to` or `--copy-to-local`");
+        }
+        if (StringUtils.isNotEmpty(copyFrom)) {
+            args.add("--copy-from");
+            args.add(copyFrom);
+        }
+        if (StringUtils.isNotEmpty(target)) {
+            args.add("--target");
+            args.add(target);
+        }
+        if (overwrite) {
+            args.add("--overwrite");
+        }
+        if (StringUtils.isNotEmpty(mappers)) {
+            args.add("--mappers");
+            args.add(mappers);
+        }
+
+        if (dryRun) {
+            System.out.println("hbase " + Commandline.toString(args.toArray(new String[0])));
+        } else {
+            HadoopVariantStorageEngine engine = new HadoopVariantStorageEngine();
+            engine.setConfiguration(storageConfiguration, HadoopVariantStorageEngine.STORAGE_ENGINE_ID, "");
+
+            MRExecutor mrExecutor = engine.getMRExecutor();
+            int exitError = mrExecutor.run("hbase", args.toArray(new String[0]));
+            if (exitError != 0) {
+                throw new Exception("ExportSnapshot failed with exit number '" + exitError + "'");
+            }
+        }
+    }
+
     private void regionsPerTable(String tableNameStr) throws Exception {
 //        TableName tableName = getTable(tableNameStr);
 //        hBaseManager.act(tableName.getNameAsString(), (table, admin) -> {
```
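With `--dryRun`, `exportSnapshot` prints the assembled command line instead of submitting it through `MRExecutor`. Following the argument order built above (and reusing the illustrative values from the earlier example), the output would look roughly like:

    hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -Dmapreduce.job.queuename=default --snapshot variants_snapshot --copy-to hdfs://backup-cluster/hbase --overwrite --mappers 8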
```diff
@@ -35,7 +35,6 @@
 import java.io.IOException;
 import java.net.URI;
 
-import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.MR_HADOOP_BIN;
 import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.INTERMEDIATE_HDFS_DIRECTORY;
 
 /**
@@ -96,29 +95,11 @@ public URI preLoad(URI input, URI output) throws StorageEngineException {
 
     protected void load(URI input, URI outdir, int studyId, int fileId) throws StorageEngineException {
         URI vcfMeta = URI.create(VariantReaderUtils.getMetaFromTransformedFile(input.toString()));
-
-        String hadoopRoute = options.getString(MR_HADOOP_BIN.key(), MR_HADOOP_BIN.defaultValue());
-        String jar = MRExecutor.getJarWithDependencies(getOptions());
-
-        Class execClass = ArchiveDriver.class;
-        String executable = hadoopRoute + " jar " + jar + " " + execClass.getName();
         String args = ArchiveDriver.buildCommandLineArgs(input, vcfMeta,
                 dbAdaptor.getVariantTable(), getArchiveTable(), studyId,
                 fileId, options);
-
-        long startTime = System.currentTimeMillis();
-        logger.info("------------------------------------------------------");
-        logger.info("Loading file {} into archive table '{}'", fileId, getArchiveTable());
-        logger.debug(executable + " " + args);
-        logger.info("------------------------------------------------------");
-        int exitValue = mrExecutor.run(executable, Commandline.translateCommandline(args));
-        logger.info("------------------------------------------------------");
-        logger.info("Exit value: {}", exitValue);
-        logger.info("Total time: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
-        if (exitValue != 0) {
-            throw new StorageEngineException("Error loading file " + input + " into archive table \""
-                    + getArchiveTable() + "\"");
-        }
+        mrExecutor.run(ArchiveDriver.class, Commandline.translateCommandline(args),
+                "Loading file " + fileId + " into archive table '" + getArchiveTable() + "'");
     }
 
 }
```
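The refactor folds the removed boilerplate (command assembly, timing, and the exit-value check) into an `MRExecutor.run(Class, args, taskDescription)` convenience overload. A minimal sketch of what such an overload wraps, assuming only what the removed lines show; the PR's actual signature is not visible in this hunk, and the extra `hadoopBin`/`jarWithDependencies` parameters here stand in for lookups the real executor presumably performs from its own options:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only: recreates the inline boilerplate that load() used to carry.
public abstract class MRExecutorSketch {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Run "<hadoopBin> jar <jar> <MainClass> <args...>" and return the exit value. */
    protected abstract int run(String executable, String[] args);

    public void run(Class<?> execClass, String[] args, String taskDescription,
                    String hadoopBin, String jarWithDependencies) throws Exception {
        String executable = hadoopBin + " jar " + jarWithDependencies + " " + execClass.getName();
        long startTime = System.currentTimeMillis();
        logger.info("------------------------------------------------------");
        logger.info(taskDescription);
        logger.info("------------------------------------------------------");
        int exitValue = run(executable, args);
        logger.info("Exit value: {}", exitValue);
        logger.info("Total time: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
        if (exitValue != 0) {
            throw new Exception("Error: " + taskDescription + " (exit value " + exitValue + ")");
        }
    }
}
```

The gain is visible at the call site: `load()` shrinks from roughly twenty lines of process plumbing to a single call that names the task being run.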
```diff
@@ -10,7 +10,10 @@
  *
  * @author Jacobo Coll <[email protected]>
  */
-public class MRExecutorFactory {
+public final class MRExecutorFactory {
+
+    private MRExecutorFactory() {
+    }
 
     public static MRExecutor getMRExecutor(ObjectMap options) throws StorageEngineException {
         MRExecutor mrExecutor;
```