From 7b799e5dd8fa4e5089fd3076a148578ad069a56b Mon Sep 17 00:00:00 2001 From: James McMullan Date: Tue, 17 Sep 2024 13:24:04 -0400 Subject: [PATCH] HPCC4J-645 FileUtility filtering support and additional testing improvements - Added FileUtility options: ignore_tlk, read_retries,socket_time_seconds,filter Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 883cec3f8..e522b1e50 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -87,6 +87,7 @@ private static class TaskOperation public String currentOperationDesc = ""; public long operationStartNS = 0; + public List errorMessages = new ArrayList(); public List warnMessages = new ArrayList(); @@ -156,6 +157,9 @@ public JSONObject end(boolean success) private Stack operations = new Stack(); public List operationResults = new ArrayList(); + public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES; + public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS; + public void setCurrentOperationSpanAttributes(Attributes attributes) { if (!hasCurrentOperation()) @@ -545,6 +549,46 @@ public void save(OutputStream outStream) throws IOException } } + private static int getReadRetries(CommandLine cmd) + { + int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES; + String retriesStr = cmd.getOptionValue("read_retries"); + if (retriesStr != null) + { + try + { + readRetries = Integer.parseInt(retriesStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for read_retries: " + + retriesStr + ", must be an integer. Defaulting to: " + HpccRemoteFileReader.DEFAULT_READ_RETRIES + " retries."); + } + } + + return readRetries; + } + + private static int getSocketOpTimeoutMS(CommandLine cmd) + { + int socketOpTimeoutS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS / 1000; + String timeoutStr = cmd.getOptionValue("socket_timeout_seconds"); + if (timeoutStr != null) + { + try + { + socketOpTimeoutS = Integer.parseInt(timeoutStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for socket_timeout: " + + timeoutStr + ", must be an integer. Defaulting to: " + socketOpTimeoutS + " seconds."); + } + } + + return socketOpTimeoutS * 1000; + } + private static Options getReadOptions() { Options options = new Options(); @@ -554,6 +598,10 @@ private static Options getReadOptions() options.addOption("format", true, "Specifies the output format to be used when writing files to disk. Defaults to Thor files."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("out", true, "Specifies the directory that the files should be written to."); + options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); + options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); + options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); + options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); options.addOption(Option.builder("read") .argName("files") @@ -576,6 +624,10 @@ private static Options getReadTestOptions() options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice."); options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice."); + options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); + options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); + options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); + options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -595,6 +647,10 @@ private static Options getCopyOptions() options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to."); options.addOption("dest_url", "Destination Cluster URL", true, "Specifies the URL of the ESP to write to."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); + options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); + options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); + options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); + options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); options.addOption(Option.builder("copy") .argName("files") @@ -616,6 +672,7 @@ private static Options getWriteOptions() options.addOption("dest_url", "Destination Cluster URL", true, "Specifies the URL of the ESP to write to."); options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); + options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); options.addOption(Option.builder("write") .argName("files") @@ -847,8 +904,11 @@ public void run() readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; readContext.readSizeKB = readRequestSize; + readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); fileReader.getInputStream().setReadRequestDelay(readRequestDelay); + fileReader.setMaxReadRetries(context.readRetries); while (fileReader.hasNext()) { @@ -881,7 +941,10 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; + readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef)); + filePartReader.setMaxReadRetries(context.readRetries); final String filePath = outFilePaths[taskIndex]; final FileOutputStream outStream = new FileOutputStream(filePath); @@ -1004,7 +1067,9 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; + readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); + filePartReaders[j].setMaxReadRetries(context.readRetries); } incomingFilePartIndex += numIncomingParts; @@ -1142,6 +1207,7 @@ private static Runnable[] createWriteTasks(String[] srcFiles, SplitTable[] split writeContext.parentSpan = context.getCurrentOperation().operationSpan; writeContext.recordDef = recordDef; writeContext.fileCompression = CompressionAlgorithm.NONE; + writeContext.socketOpTimeoutMs = context.socketOpTimeoutMS; HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); tasks[taskIndex] = new Runnable() @@ -1253,6 +1319,9 @@ private static void performRead(String[] args, TaskContext context) formatStr = "THOR"; } + context.readRetries = getReadRetries(cmd); + context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + FileFormat format = FileFormat.THOR; switch (formatStr.toUpperCase()) { @@ -1267,6 +1336,9 @@ private static void performRead(String[] args, TaskContext context) return; } + String filter = cmd.getOptionValue("filter"); + boolean ignoreTLK = cmd.hasOption("ignore_tlk"); + String[] datasets = cmd.getOptionValues("read"); for (int i = 0; i < datasets.length; i++) { @@ -1286,6 +1358,22 @@ private static void performRead(String[] args, TaskContext context) return; } + file.setUseTLK(!ignoreTLK); + + if (filter != null) + { + try + { + file.setFilter(filter); + } + catch (Exception e) + { + String error = "Error while attempting to set filter for: '" + datasetName + "': " + e.getMessage(); + context.addError(error); + return; + } + } + DataPartition[] fileParts = null; FieldDef recordDef = null; try @@ -1463,6 +1551,9 @@ private static void performReadTest(String[] args, TaskContext context) + readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms."); } + context.readRetries = getReadRetries(cmd); + context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + String formatStr = cmd.getOptionValue("format"); if (formatStr == null) { @@ -1483,6 +1574,9 @@ private static void performReadTest(String[] args, TaskContext context) return; } + String filter = cmd.getOptionValue("filter"); + boolean ignoreTLK = cmd.hasOption("ignore_tlk"); + String datasetName = cmd.getOptionValue("read_test"); context.startOperation("FileUtility.ReadTest_" + datasetName); @@ -1500,6 +1594,22 @@ private static void performReadTest(String[] args, TaskContext context) return; } + file.setUseTLK(!ignoreTLK); + + if (filter != null) + { + try + { + file.setFilter(filter); + } + catch (Exception e) + { + String error = "Error while attempting to set filter for: '" + datasetName + "': " + e.getMessage(); + context.addError(error); + return; + } + } + DataPartition[] fileParts = null; FieldDef recordDef = null; try @@ -1666,6 +1776,12 @@ private static void performCopy(String[] args, TaskContext context) return; } + String filter = cmd.getOptionValue("filter"); + boolean ignoreTLK = cmd.hasOption("ignore_tlk"); + + context.readRetries = getReadRetries(cmd); + context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + for (int i = 0; i < copyPairs.length; i+=2) { String srcFile = copyPairs[i]; @@ -1686,6 +1802,22 @@ private static void performCopy(String[] args, TaskContext context) return; } + file.setUseTLK(!ignoreTLK); + + if (filter != null) + { + try + { + file.setFilter(filter); + } + catch (Exception e) + { + String error = "Error while attempting to set filter for: '" + srcFile + "': " + e.getMessage(); + context.addError(error); + return; + } + } + DataPartition[] srcFileParts = null; try {