diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
index 78e3e6500..ade51a57e 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
@@ -35,6 +35,8 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
     private BinaryRecordReader binaryRecordReader;
     private IRecordBuilder recordBuilder = null;
     private boolean handlePrefetch = true;
+    private boolean isClosed = false;
+    private boolean canReadNext = true;
     private long openTimeMs = 0;
     private long recordsRead = 0;
 
@@ -234,7 +236,6 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
             this.binaryRecordReader.initialize(this.recordBuilder);
         }
 
-
         log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart()
                  + (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" ));
         log.trace("Original record definition:\n"
@@ -315,12 +316,18 @@ public String getRemoteReadMessages()
      */
     public void prefetch()
     {
-        if (this.handlePrefetch)
+        if (handlePrefetch)
         {
             log.warn("Prefetch called on an HpccRemoteFileReader that has an internal prefetch thread.");
             return;
         }
 
+        if (isClosed)
+        {
+            log.warn("Prefetch called on an HpccRemoteFileReader that has been closed.");
+            return;
+        }
+
         this.inputStream.prefetchData();
     }
 
@@ -332,10 +339,19 @@ public void prefetch()
     @Override
     public boolean hasNext()
     {
-        boolean rslt = false;
+        if (isClosed)
+        {
+            log.warn("hasNext() called on an HpccRemoteFileReader that has been closed.");
+            return false;
+        }
+
+        // Keep track of whether we have said there is another record.
+        // This allows us to handle edge cases around close() being called between hasNext() and next()
+        canReadNext = false;
+
         try
         {
-            rslt = this.binaryRecordReader.hasNext();
+            canReadNext = this.binaryRecordReader.hasNext();
 
             // Has next may not catch the prefetch exception if it occurs at the beginning of a read
             // This is due to InputStream.hasNext() being allowed to throw an IOException when closed.
@@ -346,12 +362,14 @@ public boolean hasNext()
         }
         catch (HpccFileException e)
         {
-            rslt = false;
+            canReadNext = false;
             log.error("Read failure for " + this.dataPartition.toString());
-            throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+            java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+            exception.initCause(e);
+            throw exception;
         }
 
-        return rslt;
+        return canReadNext;
     }
 
     /**
@@ -362,6 +380,11 @@ public boolean hasNext()
     @Override
     public T next()
     {
+        if (isClosed && !canReadNext)
+        {
+            throw new java.util.NoSuchElementException("Fatal read error: Attempting to read next() from a closed file reader.");
+        }
+
         Object rslt = null;
         try
         {
@@ -370,10 +393,16 @@ public T next()
         catch (HpccFileException e)
         {
             log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
-            throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+            java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+            exception.initCause(e);
+            throw exception;
         }
 
         recordsRead++;
+
+        // Reset this after each read so we can handle edge cases where close() was called between hasNext() / next()
+        canReadNext = false;
+
         return (T) rslt;
     }
 
@@ -385,8 +414,15 @@ public T next()
      */
     public void close() throws Exception
     {
+        if (isClosed)
+        {
+            log.warn("Calling close on an already closed file reader for file part: " + this.dataPartition.toString());
+            return;
+        }
+
         report();
         this.inputStream.close();
+        isClosed = true;
 
         long closeTimeMs = System.currentTimeMillis();
         double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
index 6eab980e1..44252cbdd 100644
--- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
+++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
@@ -1179,6 +1179,77 @@ public void invalidSignatureTest()
         }
     }
 
+    @Test
+    public void earlyCloseTest() throws Exception
+    {
+        HPCCFile file = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
+
+        DataPartition[] fileParts = file.getFileParts();
+        if (fileParts == null || fileParts.length == 0)
+        {
+            Assert.fail("No file parts found");
+        }
+
+        FieldDef originalRD = file.getRecordDefinition();
+        if (originalRD == null || originalRD.getNumDefs() == 0)
+        {
+            Assert.fail("Invalid or null record definition");
+        }
+
+        {
+            HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
+            HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);
+
+            int expectedRecordCounts = 10;
+            int numRecords = 0;
+            while (fileReader.hasNext())
+            {
+                try
+                {
+                    fileReader.next();
+                    numRecords++;
+                }
+                catch (Exception e)
+                {
+                    System.out.println("Error: " + e.getMessage());
+                }
+
+                if (numRecords == expectedRecordCounts)
+                {
+                    fileReader.close();
+                }
+            }
+            assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
+        }
+
+        // Check that calling close() in between hasNext() & next() allows the current record to be read
+        {
+            HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
+            HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);
+
+            int expectedRecordCounts = 11;
+            int numRecords = 0;
+            while (fileReader.hasNext())
+            {
+                if (numRecords == expectedRecordCounts - 1)
+                {
+                    fileReader.close();
+                }
+
+                try
+                {
+                    fileReader.next();
+                    numRecords++;
+                }
+                catch (Exception e)
+                {
+                    System.out.println("Error: " + e.getMessage());
+                }
+            }
+            assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
+        }
+    }
+
     public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
     {
         return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING);
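
Note (not part of the patch): a minimal consumer-side sketch of the lifecycle this change is meant to protect, mirroring earlyCloseTest() above. It assumes fileParts, originalRD and recordBuilder have been prepared exactly as in that test, that the usual org.hpccsystems.dfs.client imports (HpccRemoteFileReader, DataPartition, HPCCRecord, HPCCRecordBuilder) and org.hpccsystems.commons.ecl.FieldDef are in scope, and that the hypothetical helper name readFirstRecords is illustrative only.

    // Hypothetical helper, not part of the patch: reads the first recordsWanted records
    // and then closes the reader early. With this change, the following hasNext() call
    // returns false instead of attempting to read from the closed input stream, and a
    // second close() is a no-op that only logs a warning.
    public static void readFirstRecords(DataPartition[] fileParts, FieldDef originalRD,
                                        HPCCRecordBuilder recordBuilder, int recordsWanted) throws Exception
    {
        HpccRemoteFileReader<HPCCRecord> reader =
                new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);

        int recordsRead = 0;
        while (reader.hasNext())
        {
            reader.next();
            recordsRead++;

            if (recordsRead == recordsWanted)
            {
                reader.close(); // early close mid-iteration; hasNext() now returns false
            }
        }

        reader.close(); // idempotent after this patch; logs a warning and returns
    }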