From feb19f520eb7caf0c2e10654b0e08b6455aaa73c Mon Sep 17 00:00:00 2001
From: James McMullan <James.McMullan@lexisnexis.com>
Date: Wed, 8 Nov 2023 08:48:07 -0500
Subject: [PATCH] HPCC4J-552 File Utility Read Individual File Parts (#658)

- Added support for reading a file without writing it to disk
- Added support for test reading individual file parts

Signed-off-by: James McMullan <James.McMullan@lexisnexis.com>
---
 .../hpccsystems/dfs/client/FileUtility.java   | 222 ++++++++++++++++++
 .../dfs/client/FileUtilityTest.java           |  12 +
 2 files changed, 234 insertions(+)

diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
index 3da02c805..2cf0e3b25 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
@@ -417,6 +417,24 @@ private static Options getReadOptions()
         return options;
     }
 
+    private static Options getReadTestOptions()
+    {
+        Options options = new Options();
+        options.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read.");
+        options.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to.");
+        options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
+        options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
+        options.addOption("num_threads", true, "Specifies the number of parallel threads to use to perform operations.");
+
+        options.addOption(Option.builder("file_parts")
+                                .argName("_file_parts")
+                                .hasArgs()
+                                .valueSeparator(',')
+                                .desc("Specifies the file parts that should be read. Defaults to all file parts.")
+                                .build());
+        return options;
+    }
+
     private static Options getCopyOptions()
     {
         Options options = new Options();
@@ -463,6 +481,7 @@ private static Options getTopLevelOptions()
     {
         Options options = new Options();
         options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory.");
+        options.addOption("read_test", "Reads the specified file and/or particular file parts without writing the data locally.");
         options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file.");
         options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file.");
 
@@ -660,6 +679,44 @@ public void run()
         }
     }
 
+    private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
+    {
+        Runnable[] tasks = new Runnable[fileParts.length];
+        for (int i = 0; i < tasks.length; i++)
+        {
+            final int taskIndex = i;
+            final DataPartition filePart = fileParts[taskIndex];
+            final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));
+
+            tasks[taskIndex] = new Runnable()
+            {
+                HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;
+
+                public void run()
+                {
+                    try
+                    {
+                        while (fileReader.hasNext())
+                        {
+                            HPCCRecord record = fileReader.next();
+                            context.recordsRead.incrementAndGet();
+                        }
+
+                        fileReader.close();
+                        context.bytesRead.addAndGet(fileReader.getStreamPosition());
+                    }
+                    catch (Exception e)
+                    {
+                        context.addError("Error while reading file part index: '" + filePart.getThisPart() + "' Error message: " + e.getMessage());
+                        return;
+                    }
+                }
+            };
+        }
+
+        return tasks;
+    }
+
     private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception
     {
         Runnable[] tasks = new Runnable[fileParts.length];
@@ -1159,6 +1216,167 @@ private static void performRead(String[] args, TaskContext context)
         }
     }
 
+    private static void performReadTest(String[] args, TaskContext context)
+    {
+        Options options = getReadTestOptions();
+        CommandLineParser parser = new DefaultParser();
+
+        CommandLine cmd = null;
+        try
+        {
+            cmd = parser.parse(options, args);
+        }
+        catch (ParseException e)
+        {
+            System.out.println("Error parsing commandline options:\n" + e.getMessage());
+            return;
+        }
+
+        String connString = cmd.getOptionValue("url");
+        String user = cmd.getOptionValue("user");
+        String pass = cmd.getOptionValue("pass");
+
+        String outputPath = cmd.getOptionValue("out",".");
+
+        int numThreads = NUM_DEFAULT_THREADS;
+        String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads);
+        try
+        {
+            numThreads = Integer.parseInt(numThreadsStr);
+        }
+        catch(Exception e)
+        {
+            System.out.println("Invalid option value for num_threads: "
+                            + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
+        }
+
+        String formatStr = cmd.getOptionValue("format");
+        if (formatStr == null)
+        {
+            formatStr = "THOR";
+        }
+
+        FileFormat format = FileFormat.THOR;
+        switch (formatStr.toUpperCase())
+        {
+            case "THOR":
+                format = FileFormat.THOR;
+                break;
+            case "PARQUET":
+                format = FileFormat.PARQUET;
+                break;
+            default:
+                System.out.println("Error unsupported format specified: " + formatStr);
+                return;
+        }
+
+        String datasetName = cmd.getOptionValue("read_test");
+        context.startOperation("Read Test " + datasetName);
+
+        HPCCFile file = null;
+        try
+        {
+            file = new HPCCFile(datasetName, connString, user, pass);
+        }
+        catch (Exception e)
+        {
+            System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
+            return;
+        }
+
+        DataPartition[] fileParts = null;
+        FieldDef recordDef = null;
+        try
+        {
+            fileParts = file.getFileParts();
+            recordDef = file.getRecordDefinition();
+        }
+        catch (Exception e)
+        {
+            System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
+            return;
+        }
+
+        String[] filePartsStrs = cmd.getOptionValues("file_parts");
+        if (filePartsStrs != null && filePartsStrs.length > 0)
+        {
+            ArrayList<DataPartition> filePartList = new ArrayList<DataPartition>();
+            for (int i = 0; i < filePartsStrs.length; i++)
+            {
+                try
+                {
+                    int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1;
+                    if (filePartIndex < 0 || filePartIndex >= fileParts.length)
+                    {
+                        System.out.println("Skipping invalid file part index: " + filePartsStrs[i]
+                                        + " outside of range: [1," + fileParts.length + "]");
+                        continue;
+                    }
+
+                    filePartList.add(fileParts[filePartIndex]);
+                }
+                catch (NumberFormatException e)
+                {
+                    System.out.println("Skipping invalid file part index: " + filePartsStrs[i]);
+                }
+            }
+
+            fileParts = filePartList.toArray(new DataPartition[0]);
+        }
+
+        Runnable[] tasks = null;
+        try
+        {
+            switch (format)
+            {
+                case THOR:
+                    tasks = createReadTestTasks(fileParts, recordDef, context);
+                    break;
+                case PARQUET:
+                default:
+                    throw new Exception("Error unsupported format specified: " + format);
+            }
+        }
+        catch (Exception e)
+        {
+            context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage());
+            return;
+        }
+
+        try
+        {
+            executeTasks(tasks, numThreads);
+        }
+        catch (Exception e)
+        {
+            context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage());
+            return;
+        }
+
+        if (context.hasError())
+        {
+            return;
+        }
+
+        try
+        {
+            String fileName = file.getFileName().replace(":","_");
+            String filePath = outputPath + File.separator + fileName + ".meta";
+            FileOutputStream metaFile = new FileOutputStream(filePath);
+
+            String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString();
+            metaFile.write(metaStr.getBytes());
+            metaFile.close();
+        }
+        catch (Exception e)
+        {
+            context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage());
+            return;
+        }
+
+        context.endOperation();
+    }
+
     private static void performCopy(String[] args, TaskContext context)
     {
         Options options = getCopyOptions();
@@ -1576,6 +1794,10 @@ public static JSONArray run(String[] args)
         {
             performRead(args, context);
         }
+        else if (cmd.hasOption("read_test"))
+        {
+            performReadTest(args, context);
+        }
         else if (cmd.hasOption("copy"))
         {
             performCopy(args, context);
diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
index 84ea5b022..e89b87483 100644
--- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
+++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
@@ -56,6 +56,18 @@ public void thorFileTests()
             Assert.assertTrue("FileUtility operation didn't complete successfully", success);
         }
 
+        {
+            String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString,
+                                 "-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" };
+
+            JSONArray results = FileUtility.run(readArgs);
+            JSONObject result = results.optJSONObject(0);
+            Assert.assertNotNull("FileUtility result should not be null.", result);
+
+            boolean success = result.optBoolean("successful",false);
+            Assert.assertTrue("FileUtility operation didn't complete successfully", success);
+        }
+
         {
             String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy",
                                 "-url", this.connString, "-dest_url", this.connString,
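Usage note (not part of the patch): a minimal sketch of driving the new -read_test mode through FileUtility.run(), mirroring the unit test above. The flags come from getReadTestOptions()/getTopLevelOptions() in this patch and the JSON classes are the org.json types used by the test; the class name ReadTestExample and the dataset name, ESP URL, and credentials below are placeholder assumptions, not values taken from any real cluster.

    import org.hpccsystems.dfs.client.FileUtility;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class ReadTestExample
    {
        public static void main(String[] cliArgs)
        {
            // Read only file parts 1 and 2 of a hypothetical logical file,
            // streaming the records without writing anything to disk.
            String[] args = { "-read_test", "example::dataset",   // placeholder logical file
                              "-url", "http://127.0.0.1:8010",    // placeholder ESP URL
                              "-user", "hpccuser",                // placeholder credentials
                              "-pass", "hpccpass",
                              "-file_parts", "1,2",               // comma-separated, 1-based
                              "-num_threads", "4" };

            JSONArray results = FileUtility.run(args);

            // Each operation reports its status as a JSON object; "successful"
            // is the same flag the unit test above asserts on.
            JSONObject result = results.optJSONObject(0);
            System.out.println(result != null && result.optBoolean("successful", false)
                    ? "Read test succeeded" : "Read test failed");
        }
    }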