diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index a55e077c9..5108c19ab 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -69,12 +69,36 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso * @param fileCompression * the file compression * @param connectTimeoutMs - * the socket timeout in ms (default is 1000) + * the socket timeout in ms (default is 5000) * @throws Exception * the exception */ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs) throws Exception + { + this(dp,recordDef,recordAccessor,fileCompression,RowServiceOutputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, RowServiceOutputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); + } + + /** + * A remote file writer. + * + * @param dp + * the part of the file, name and location + * @param recordDef + * the record def + * @param recordAccessor + * the record accessor + * @param fileCompression + * the file compression + * @param connectTimeoutMs + * the socket timeout in ms (default is 5000) + * @param socketOpTimeoutMs + * the socket operation timeout in ms (default is 15000) + * @throws Exception + * the exception + */ + public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) + throws Exception { this.recordDef = recordDef; this.dataPartition = dp; @@ -83,7 +107,7 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), - fileCompression, connectTimeoutMs); + fileCompression, connectTimeoutMs, socketOpTimeoutMs); this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 38e283215..78e3e6500 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -76,7 +76,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @param recBuilder * the IRecordBuilder used to construct records * @param connectTimeout - * the connection timeout in seconds, -1 for default + * the connection timeout in milliseconds, -1 for default * @throws Exception * the exception */ @@ -95,7 +95,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @param recBuilder * the IRecordBuilder used to construct records * @param connectTimeout - * the connection timeout in seconds, -1 for default + * the connection timeout in milliseconds, -1 for default * @param limit * the maximum number of records to read from the provided data partition, -1 specifies no limit * @throws Exception @@ -116,7 +116,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @param recBuilder * the IRecordBuilder used to construct records * @param connectTimeout - * the connection timeout in seconds, -1 for default + * the connection timeout in milliseconds, -1 for default * @param limit * the maximum number of records to read from the provided data partition, -1 specifies no limit * @param createPrefetchThread @@ -141,7 +141,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @param recBuilder * the IRecordBuilder used to construct records * @param connectTimeout - * the connection timeout in seconds, -1 for default + * the connection timeout in milliseconds, -1 for default * @param limit * the maximum number of records to read from the provided data partition, -1 specifies no limit * @param createPrefetchThread @@ -154,6 +154,35 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * general exception */ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo) throws Exception + { + this(dp, originalRD, recBuilder, connectTimeout, limit, true, readSizeKB, resumeInfo, RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param dp + * the part of the file, name and location + * @param originalRD + * the record defintion for the dataset + * @param recBuilder + * the IRecordBuilder used to construct records + * @param connectTimeout + * the connection timeout in milliseconds, -1 for default + * @param limit + * the maximum number of records to read from the provided data partition, -1 specifies no limit + * @param createPrefetchThread + * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. + * @param readSizeKB + * read request size in KB, -1 specifies use default value + * @param resumeInfo + * FileReadeResumeInfo data required to restart a read from a particular point in a file, null for reading from start + * @param socketOpTimeoutMs + * Socket (read / write) operation timeout in milliseconds + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -178,7 +207,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -193,7 +222,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 794c73b1c..795fd8478 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -108,7 +108,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable private Socket sock; public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout + public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations private int connectTimeout = DEFAULT_CONNECT_TIMEOUT_MILIS; + private int socketOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS; private static final Charset HPCCCharSet = Charset.forName("ISO-8859-1"); private static final Logger log = LogManager.getLogger(RowServiceInputStream.class); @@ -293,6 +295,37 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co * general exception */ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching) throws Exception + { + this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS); + } + + /** + * A plain socket connect to a THOR node for remote read + * + * @param dp + * the data partition to read + * @param rd + * the JSON definition for the read input and output + * @param pRd + * the projected record definition + * @param connectTimeout + * the connection timeout in milliseconds + * @param limit + * the record limit to use for reading the dataset. -1 implies no limit + * @param createPrefetchThread + * Wether or not this inputstream should handle prefetching itself or if prefetch will be called externally + * @param maxReadSizeInKB + * max readsize in kilobytes + * @param restartInfo + * information used to restart a read from a particular stream position + * @param isFetching + * Will this input stream be used to serviced batched fetch requests + * @param socketOpTimeoutMS + * Socket (read / write) operation timeout in milliseconds + * @throws Exception + * general exception + */ + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { this.recordDefinition = rd; this.projectedRecordDefinition = pRd; @@ -325,6 +358,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = null; this.simulateFail = false; this.connectTimeout = connectTimeout; + this.socketOpTimeoutMs = socketOpTimeoutMS; this.recordLimit = limit; this.readBufferCapacity.set(this.maxReadSizeKB*1024*2); @@ -1530,6 +1564,9 @@ private void makeActive() throws HpccFileException sock.setPerformancePreferences(0, 1, 2); sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); } + + this.sock.setSoTimeout(socketOpTimeoutMs); + log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); } catch (java.net.UnknownHostException e) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index 5e5b95ceb..f6ea288cd 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -40,6 +40,7 @@ public class RowServiceOutputStream extends OutputStream { private static final Logger log = LogManager.getLogger(RowServiceOutputStream.class); public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout + public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on reads private static int SCRATCH_BUFFER_LEN = 2048; private String rowServiceVersion = ""; @@ -50,6 +51,7 @@ public class RowServiceOutputStream extends OutputStream private int filePartIndex = -1; private String accessToken = null; private CompressionAlgorithm compressionAlgo = CompressionAlgorithm.NONE; + private int sockOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS; private Socket socket = null; private DataInputStream dis = null; @@ -125,7 +127,6 @@ private static class RowServiceResponse this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, DEFAULT_CONNECT_TIMEOUT_MILIS); } - /** * Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster. * @@ -146,12 +147,44 @@ private static class RowServiceResponse * @param fileCompression * the file compression * @param connectTimeoutMs - * the socket timeout in ms (default is 1000) + * the socket connect timeout in ms (default is 5000) * @throws Exception * the exception */ RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, CompressionAlgorithm fileCompression, int connectTimeoutMs) throws Exception + { + this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS); + } + + /** + * Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster. + * + * @param ip + * the ip + * @param port + * the port + * @param useSSL + * the use SSL + * @param accessToken + * the access token + * @param recordDef + * the record def + * @param filePartIndex + * the file part index + * @param filePartPath + * the file part path + * @param fileCompression + * the file compression + * @param connectTimeoutMs + * the socket connect timeout in ms (default is 5000) + * @param socketOpTimeoutMS + * the socket operation(read/write) timeout in ms (default is 15000) + * @throws Exception + * the exception + */ + RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, + CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS) throws Exception { this.rowServiceIP = ip; this.rowServicePort = port; @@ -160,6 +193,7 @@ private static class RowServiceResponse this.filePath = filePartPath; this.accessToken = accessToken; this.compressionAlgo = fileCompression; + this.sockOpTimeoutMs = sockOpTimeoutMS; try { @@ -192,6 +226,7 @@ private static class RowServiceResponse this.socket.connect(new InetSocketAddress(rowServiceIP, rowServicePort), DEFAULT_CONNECT_TIMEOUT_MILIS); } + this.socket.setSoTimeout(sockOpTimeoutMs); this.dos = new DataOutputStream(socket.getOutputStream()); this.dis = new DataInputStream(socket.getInputStream());