Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
  • Loading branch information
GordonSmith committed Sep 5, 2024
2 parents 4501cc5 + 8936914 commit 3789bdb
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 163 deletions.
81 changes: 71 additions & 10 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.io.BufferedInputStream;
import java.io.Console;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -74,6 +75,9 @@ public class FileUtility
private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

static private final int DEFAULT_READ_REQUEST_SIZE = 4096;
static private final int DEFAULT_READ_REQUEST_DELAY = 0;

private static boolean otelInitialized = false;

private static class TaskContext
Expand Down Expand Up @@ -322,6 +326,28 @@ public JSONArray generateResultsMessage()
}
};

private static String[] getCredentials(CommandLine cmd)
{
Console console = System.console();

String user = cmd.getOptionValue("user");
boolean userIsEmpty = user == null || user.isEmpty();
if (userIsEmpty)
{
user = new String(console.readLine("Enter username: "));
userIsEmpty = user == null || user.isEmpty();
}

String pass = cmd.getOptionValue("pass");
boolean passIsEmpty = pass == null || pass.isEmpty();
if (!userIsEmpty && passIsEmpty)
{
pass = new String(console.readPassword("Enter password for " + user + ": "));
}

return new String[] {user, pass};
}

private static enum FileFormat
{
THOR,
Expand Down Expand Up @@ -548,6 +574,8 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand Down Expand Up @@ -801,7 +829,7 @@ public void run()
}
}

private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context, int readRequestSize, int readRequestDelay) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
for (int i = 0; i < tasks.length; i++)
Expand All @@ -818,7 +846,9 @@ public void run()
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.readSizeKB = readRequestSize;
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);

while (fileReader.hasNext())
{
Expand Down Expand Up @@ -1198,8 +1228,10 @@ private static void performRead(String[] args, TaskContext context)
}

String connString = cmd.getOptionValue("url");
String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");

String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String outputPath = cmd.getOptionValue("out",".");

Expand Down Expand Up @@ -1376,8 +1408,10 @@ private static void performReadTest(String[] args, TaskContext context)
}

String connString = cmd.getOptionValue("url");
String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");

String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String outputPath = cmd.getOptionValue("out",".");

Expand Down Expand Up @@ -1405,6 +1439,30 @@ private static void performReadTest(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s.");
}

int readRequestSize = DEFAULT_READ_REQUEST_SIZE;
String readRequestSizeStr = cmd.getOptionValue("read_request_size", "" + readRequestSize);
try
{
readRequestSize = Integer.parseInt(readRequestSizeStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_size: "
+ readRequestSizeStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_SIZE + "KB.");
}

int readRequestDelay = DEFAULT_READ_REQUEST_DELAY;
String readRequestDelayStr = cmd.getOptionValue("read_request_delay", "" + readRequestDelay);
try
{
readRequestDelay = Integer.parseInt(readRequestDelayStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_delay: "
+ readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand Down Expand Up @@ -1477,6 +1535,7 @@ private static void performReadTest(String[] args, TaskContext context)
context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]);
}
}
fileParts = filePartList.toArray(new DataPartition[0]);
}

Runnable[] tasks = null;
Expand All @@ -1485,7 +1544,7 @@ private static void performReadTest(String[] args, TaskContext context)
switch (format)
{
case THOR:
tasks = createReadTestTasks(fileParts, recordDef, context);
tasks = createReadTestTasks(fileParts, recordDef, context, readRequestSize, readRequestDelay);
break;
case PARQUET:
default:
Expand Down Expand Up @@ -1560,8 +1619,9 @@ private static void performCopy(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");
String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String destClusterName = cmd.getOptionValue("dest_cluster");

Expand Down Expand Up @@ -1741,8 +1801,9 @@ private static void performWrite(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");
String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String destClusterName = cmd.getOptionValue("dest_cluster");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName);
this.writeSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
Expand All @@ -154,7 +156,7 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
Expand All @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces
*/
public void writeRecord(T record) throws Exception
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
try
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
}

/**
Expand All @@ -197,7 +211,20 @@ public void writeRecords(Iterator<T> it) throws Exception
{
while (it.hasNext())
{
this.binaryRecordWriter.writeRecord(it.next());
try
{
this.binaryRecordWriter.writeRecord(it.next());
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
this.recordsWritten++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -73,6 +74,7 @@ public static class FileReadContext
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
};

Expand Down Expand Up @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.dataPartition = dp;
this.recordBuilder = recBuilder;

String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000));
this.readSpan.setAllAttributes(attributes);
this.readSpan = createReadSpan(ctx, dp);

if (context.originalRD == null)
{
Expand All @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -321,13 +310,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}
this.inputStream.skip(bytesToSkip);

Expand All @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

private static Span createReadSpan(FileReadContext context, DataPartition dp)
{
String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart();
Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
readSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

long readSize = context.readSizeKB;
if (readSize < 0)
{
readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB;
}
readSize *= 1000;

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSize));
readSpan.setAllAttributes(attributes);

return readSpan;
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
Expand All @@ -364,20 +384,12 @@ private boolean retryRead()

try
{
String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
if (context.parentSpan != null)
{
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
}
else
{
this.readSpan = Utils.createSpan(readSpanName);
}
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);

this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -391,6 +403,7 @@ private boolean retryRead()
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
Expand Down Expand Up @@ -529,6 +542,10 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
canReadNext = false;
Expand Down Expand Up @@ -564,6 +581,10 @@ public T next()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
Expand Down
Loading

0 comments on commit 3789bdb

Please sign in to comment.