Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-577 Added Read Retry to HPCCRemoteFileReader #706

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class FileUtility
private static final int DEFAULT_SPLIT_TABLE_SIZE = 128;

private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

private static class TaskContext
{
Expand Down Expand Up @@ -425,6 +426,7 @@ private static Options getReadTestOptions()
options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand Down Expand Up @@ -633,11 +635,6 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format

private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception
{
if (tasks.length > numThreads)
{
numThreads = tasks.length;
}

int numTasksPerThread = tasks.length / numThreads;
int numResidualTasks = tasks.length % numThreads;

Expand Down Expand Up @@ -686,16 +683,15 @@ private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDe
{
final int taskIndex = i;
final DataPartition filePart = fileParts[taskIndex];
final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

tasks[taskIndex] = new Runnable()
{
HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;

public void run()
{
try
{
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
Expand Down Expand Up @@ -1250,6 +1246,18 @@ private static void performReadTest(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

int expirySeconds = DEFAULT_ACCESS_EXPIRY_SECONDS;
String expirySecondsStr = cmd.getOptionValue("access_expiry_seconds", "" + expirySeconds);
try
{
expirySeconds = Integer.parseInt(expirySecondsStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for access_expiry_seconds: "
+ numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand Down Expand Up @@ -1277,6 +1285,7 @@ private static void performReadTest(String[] args, TaskContext context)
try
{
file = new HPCCFile(datasetName, connString, user, pass);
file.setFileAccessExpirySecs(expirySeconds);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private boolean handlePrefetch = true;
private boolean isClosed = false;
private boolean canReadNext = true;
private boolean createPrefetchThread = true;
private int retryCount = 0;
private int connectTimeout = 0;
private int readSizeKB = 0;
private int limit = -1;
private int maxReadRetries = DEFAULT_READ_RETRIES;
private int socketOpTimeoutMs = 0;
private long openTimeMs = 0;
private long recordsRead = 0;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
public static final int DEFAULT_READ_RETRIES = 3;

public static class FileReadResumeInfo
{
Expand Down Expand Up @@ -189,18 +197,23 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Original record definition is null.");
}
this.dataPartition = dp;
this.recordBuilder = recBuilder;
this.readSizeKB = readSizeKB;
this.limit = limit;
this.createPrefetchThread = createPrefetchThread;
this.socketOpTimeoutMs = socketOpTimeoutMs;

if (connectTimeout < 1)
{
connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
}
this.connectTimeout = connectTimeout;

this.dataPartition = dp;
this.recordBuilder = recBuilder;
if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
}

FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
if (projectedRecordDefinition == null)
Expand Down Expand Up @@ -246,6 +259,61 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
{
log.info("Retrying read for " + this.dataPartition.toString() + " retry count: " + retryCount);
retryCount++;

FileReadResumeInfo resumeInfo = getFileReadResumeInfo();
RowServiceInputStream.RestartInformation restartInfo = new RowServiceInputStream.RestartInformation();
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

try
{
this.inputStream.close();
}
catch (Exception e) {}

try
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
throw new Exception("Unable to restart read stream, unexpected stream position in record reader.");
}
this.inputStream.skip(bytesToSkip);

this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos);
this.binaryRecordReader.initialize(this.recordBuilder);
}
catch (Exception e)
{
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
drealeed marked this conversation as resolved.
Show resolved Hide resolved
}

return true;
}

return false;
}

/**
* Sets the maximum number of times to retry a read operation before failing.
*
* @param maxReadRetries maximum number of read retries
*/
public void setMaxReadRetries(int maxReadRetries)
{
this.maxReadRetries = maxReadRetries;
}

/**
* Returns the stream position within the file.
*
Expand Down Expand Up @@ -363,11 +431,16 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
if (!retryRead())
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString(), e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
}

return hasNext();
}

return canReadNext;
Expand All @@ -393,10 +466,15 @@ public T next()
}
catch (HpccFileException e)
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
drealeed marked this conversation as resolved.
Show resolved Hide resolved
throw exception;
}

return next();
}

recordsRead++;
Expand Down
Loading