Skip to content

Commit

Permalink
HPCC4J-635 DFSClient: FileUtility add additional testing / debug opti…
Browse files Browse the repository at this point in the history
…ons (#742)

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Aug 30, 2024
1 parent 0ced7ed commit be177ec
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class FileUtility
private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

static private final int DEFAULT_READ_REQUEST_SIZE = 4096;
static private final int DEFAULT_READ_REQUEST_DELAY = 0;

private static boolean otelInitialized = false;

private static class TaskContext
Expand Down Expand Up @@ -548,6 +551,8 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand Down Expand Up @@ -801,7 +806,7 @@ public void run()
}
}

private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context, int readRequestSize, int readRequestDelay) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
for (int i = 0; i < tasks.length; i++)
Expand All @@ -818,7 +823,9 @@ public void run()
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.readSizeKB = readRequestSize;
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);

while (fileReader.hasNext())
{
Expand Down Expand Up @@ -1405,6 +1412,30 @@ private static void performReadTest(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s.");
}

int readRequestSize = DEFAULT_READ_REQUEST_SIZE;
String readRequestSizeStr = cmd.getOptionValue("read_request_size", "" + readRequestSize);
try
{
readRequestSize = Integer.parseInt(readRequestSizeStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_size: "
+ readRequestSizeStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_SIZE + "KB.");
}

int readRequestDelay = DEFAULT_READ_REQUEST_DELAY;
String readRequestDelayStr = cmd.getOptionValue("read_request_delay", "" + readRequestDelay);
try
{
readRequestDelay = Integer.parseInt(readRequestDelayStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_delay: "
+ readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand Down Expand Up @@ -1477,6 +1508,7 @@ private static void performReadTest(String[] args, TaskContext context)
context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]);
}
}
fileParts = filePartList.toArray(new DataPartition[0]);
}

Runnable[] tasks = null;
Expand All @@ -1485,7 +1517,7 @@ private static void performReadTest(String[] args, TaskContext context)
switch (format)
{
case THOR:
tasks = createReadTestTasks(fileParts, recordDef, context);
tasks = createReadTestTasks(fileParts, recordDef, context, readRequestSize, readRequestDelay);
break;
case PARQUET:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class RowServiceInputStream extends InputStream implements IProfilable
private long mutexWaitTimeNS = 0;
private long waitTimeNS = 0;
private long sleepTimeNS = 0;
private int readRequestDelayMS = 0;
private long fetchStartTimeNS = 0;
private long fetchTimeNS = 0;
private long fetchFinishTimeNS = 0;
Expand Down Expand Up @@ -668,6 +669,15 @@ public int getHandle()
return handle;
}

/**
* The delay in milliseconds between read requests. Primarily used for testing.
* @param sleepTimeMS
*/
public void setReadRequestDelay(int sleepTimeMS)
{
this.readRequestDelayMS = sleepTimeMS;
}

/**
* Simulate a handle failure and use the file token instead. The handle is set to an invalid value so the THOR node
* will indicate that the handle is unknown and request a otken.
Expand Down Expand Up @@ -1150,6 +1160,18 @@ private void finishFetch()

if (inFetchingMode == false)
{
if (readRequestDelayMS > 0)
{
try
{
Thread.sleep(readRequestDelayMS);
}
catch (InterruptedException e)
{
// We don't care about waking early
}
}

if (readSpan != null)
{
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
Expand All @@ -1158,7 +1180,6 @@ private void finishFetch()
readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
}


// Create the read ahead request
if (this.simulateFail) this.handle = -1;
String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest();
Expand Down

0 comments on commit be177ec

Please sign in to comment.