Skip to content

Commit

Permalink
HPCC4J-543 Empty compressed file test failure
Browse files Browse the repository at this point in the history
- Introduced socket operation timeouts

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Sep 25, 2023
1 parent 3db1e96 commit 4f488c7
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,36 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso
* @param fileCompression
* the file compression
* @param connectTimeoutMs
* the socket timeout in ms (default is 1000)
* the socket timeout in ms (default is 5000)
* @throws Exception
* the exception
*/
public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs)
throws Exception
{
this(dp,recordDef,recordAccessor,fileCompression,RowServiceOutputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, RowServiceOutputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS);
}

/**
* A remote file writer.
*
* @param dp
* the part of the file, name and location
* @param recordDef
* the record def
* @param recordAccessor
* the record accessor
* @param fileCompression
* the file compression
* @param connectTimeoutMs
* the socket timeout in ms (default is 5000)
* @param socketOpTimeoutMs
* the socket operation timeout in ms (default is 15000)
* @throws Exception
* the exception
*/
public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs)
throws Exception
{
this.recordDef = recordDef;
this.dataPartition = dp;
Expand All @@ -83,7 +107,7 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
fileCompression, connectTimeoutMs);
fileCompression, connectTimeoutMs, socketOpTimeoutMs);

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* the connection timeout in milliseconds, -1 for default
* @throws Exception
* the exception
*/
Expand All @@ -95,7 +95,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* the connection timeout in milliseconds, -1 for default
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @throws Exception
Expand All @@ -116,7 +116,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* the connection timeout in milliseconds, -1 for default
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
Expand All @@ -141,7 +141,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* the connection timeout in milliseconds, -1 for default
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
Expand All @@ -154,6 +154,35 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo) throws Exception
{
this(dp, originalRD, recBuilder, connectTimeout, limit, true, readSizeKB, resumeInfo, RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS);
}

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* the connection timeout in milliseconds, -1 for default
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @param resumeInfo
* FileReadeResumeInfo data required to restart a read from a particular point in a file, null for reading from start
* @param socketOpTimeoutMs
* Socket (read / write) operation timeout in milliseconds
* @throws Exception
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
Expand All @@ -178,7 +207,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -193,7 +222,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable

private Socket sock;
public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout
public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations
private int connectTimeout = DEFAULT_CONNECT_TIMEOUT_MILIS;
private int socketOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS;

private static final Charset HPCCCharSet = Charset.forName("ISO-8859-1");
private static final Logger log = LogManager.getLogger(RowServiceInputStream.class);
Expand Down Expand Up @@ -293,6 +295,37 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
* general exception
*/
public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching) throws Exception
{
this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS);
}

/**
* A plain socket connect to a THOR node for remote read
*
* @param dp
* the data partition to read
* @param rd
* the JSON definition for the read input and output
* @param pRd
* the projected record definition
* @param connectTimeout
* the connection timeout in milliseconds
* @param limit
* the record limit to use for reading the dataset. -1 implies no limit
* @param createPrefetchThread
* Wether or not this inputstream should handle prefetching itself or if prefetch will be called externally
* @param maxReadSizeInKB
* max readsize in kilobytes
* @param restartInfo
* information used to restart a read from a particular stream position
* @param isFetching
* Will this input stream be used to serviced batched fetch requests
* @param socketOpTimeoutMS
* Socket (read / write) operation timeout in milliseconds
* @throws Exception
* general exception
*/
public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception
{
this.recordDefinition = rd;
this.projectedRecordDefinition = pRd;
Expand Down Expand Up @@ -325,6 +358,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
this.tokenBin = null;
this.simulateFail = false;
this.connectTimeout = connectTimeout;
this.socketOpTimeoutMs = socketOpTimeoutMS;
this.recordLimit = limit;

this.readBufferCapacity.set(this.maxReadSizeKB*1024*2);
Expand Down Expand Up @@ -1530,6 +1564,9 @@ private void makeActive() throws HpccFileException
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
}

this.sock.setSoTimeout(socketOpTimeoutMs);

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class RowServiceOutputStream extends OutputStream
{
private static final Logger log = LogManager.getLogger(RowServiceOutputStream.class);
public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout
public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on reads
private static int SCRATCH_BUFFER_LEN = 2048;

private String rowServiceVersion = "";
Expand All @@ -50,6 +51,7 @@ public class RowServiceOutputStream extends OutputStream
private int filePartIndex = -1;
private String accessToken = null;
private CompressionAlgorithm compressionAlgo = CompressionAlgorithm.NONE;
private int sockOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS;

private Socket socket = null;
private DataInputStream dis = null;
Expand Down Expand Up @@ -125,7 +127,6 @@ private static class RowServiceResponse
this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, DEFAULT_CONNECT_TIMEOUT_MILIS);
}


/**
* Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster.
*
Expand All @@ -146,12 +147,44 @@ private static class RowServiceResponse
* @param fileCompression
* the file compression
* @param connectTimeoutMs
* the socket timeout in ms (default is 1000)
* the socket connect timeout in ms (default is 5000)
* @throws Exception
* the exception
*/
RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath,
CompressionAlgorithm fileCompression, int connectTimeoutMs) throws Exception
{
this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS);
}

/**
* Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster.
*
* @param ip
* the ip
* @param port
* the port
* @param useSSL
* the use SSL
* @param accessToken
* the access token
* @param recordDef
* the record def
* @param filePartIndex
* the file part index
* @param filePartPath
* the file part path
* @param fileCompression
* the file compression
* @param connectTimeoutMs
* the socket connect timeout in ms (default is 5000)
* @param socketOpTimeoutMS
* the socket operation(read/write) timeout in ms (default is 15000)
* @throws Exception
* the exception
*/
RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath,
CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS) throws Exception
{
this.rowServiceIP = ip;
this.rowServicePort = port;
Expand All @@ -160,6 +193,7 @@ private static class RowServiceResponse
this.filePath = filePartPath;
this.accessToken = accessToken;
this.compressionAlgo = fileCompression;
this.sockOpTimeoutMs = sockOpTimeoutMS;

try
{
Expand Down Expand Up @@ -192,6 +226,7 @@ private static class RowServiceResponse
this.socket.connect(new InetSocketAddress(rowServiceIP, rowServicePort), DEFAULT_CONNECT_TIMEOUT_MILIS);
}

this.socket.setSoTimeout(sockOpTimeoutMs);

this.dos = new DataOutputStream(socket.getOutputStream());
this.dis = new DataInputStream(socket.getInputStream());
Expand Down

0 comments on commit 4f488c7

Please sign in to comment.