diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index c5c902f304..f30b7a26ea 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -18,6 +18,7 @@ package net.openhft.chronicle.queue.impl.single; +import net.openhft.chronicle.assertions.AssertUtil; import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.MappedBytes; import net.openhft.chronicle.bytes.MappedBytesStore; @@ -42,6 +43,7 @@ import java.io.StreamCorruptedException; import java.text.ParseException; +import static net.openhft.chronicle.assertions.AssertUtil.SKIP_ASSERTIONS; import static net.openhft.chronicle.queue.TailerDirection.*; import static net.openhft.chronicle.queue.TailerState.*; import static net.openhft.chronicle.queue.impl.single.ScanResult.*; @@ -202,11 +204,17 @@ public String toString() { @Override public DocumentContext readingDocument(final boolean includeMetaData) { DocumentContext documentContext = readingDocument0(includeMetaData); - // this check was added after a strange behaviour seen by one client. It should be impossible. + checkReadRemainingPrecondition(documentContext); + return documentContext; + } + + /** + * Added in response to client bug please see 801. + */ + private void checkReadRemainingPrecondition(DocumentContext documentContext) { if (documentContext.wire() != null) if (documentContext.wire().bytes().readRemaining() >= 1 << 30) throw new AssertionError("readRemaining " + documentContext.wire().bytes().readRemaining()); - return documentContext; } DocumentContext readingDocument0(final boolean includeMetaData) { @@ -228,16 +236,14 @@ DocumentContext readingDocument0(final boolean includeMetaData) { if (tryAgain) next = next0(includeMetaData); - Wire wire = context.wire(); - if (wire != null && context.present(next)) { - Bytes bytes = wire.bytes(); - context.setStart(bytes.readPosition() - 4); - readingDocumentFound = true; - this.lastReadIndex = this.index(); - return context; + // An entry has been found, prepare the context and return it. + if (next && (context.wire() != null)) { + return prepareStoreTailerContext(); } - readingDocumentCycleNotFound(next); + if (state == CYCLE_NOT_FOUND) { + readingDocumentCycleNotFound(next); + } } catch (StreamCorruptedException e) { throw new IllegalStateException(e); @@ -251,6 +257,16 @@ DocumentContext readingDocument0(final boolean includeMetaData) { return INSTANCE; } + /** + * A document has been found - prepare the tailer context for return to the caller. + */ + private StoreTailerContext prepareStoreTailerContext() { + context.init(); + readingDocumentFound = true; + this.lastReadIndex = this.index(); + return context; + } + private void readingDocumentDBUE(DecoratedBufferUnderflowException e) { if (queue.isReadOnly()) { Jvm.warn().on(StoreTailer.class, @@ -262,12 +278,12 @@ private void readingDocumentDBUE(DecoratedBufferUnderflowException e) { } private void readingDocumentCycleNotFound(boolean next) { - RollCycle rollCycle = queue.rollCycle(); - if (state == CYCLE_NOT_FOUND && direction == FORWARD) { + if (direction == FORWARD) { + RollCycle rollCycle = queue.rollCycle(); int firstCycle = queue.firstCycle(); if (rollCycle.toCycle(index()) < firstCycle) toStart(); - } else if (!next && state == CYCLE_NOT_FOUND && cycle != queue.cycle()) { + } else if (!next && cycle != queue.cycle()) { // appenders have moved on, it's possible that linearScan is hitting EOF, which is ignored // since we can't find an entry at current index, indicate that we're at the end of a cycle state = TailerState.END_OF_CYCLE; @@ -452,15 +468,15 @@ private boolean inACycle2(boolean includeMetaData, Wire wire, Bytes bytes) th bytes.readLimitToCapacity(); switch (wire.readDataHeader(includeMetaData)) { + case DATA: + context.metaData(false); + break; case NONE: // no more polling - appender will always write (or recover) EOF return false; case META_DATA: context.metaData(true); break; - case DATA: - context.metaData(false); - break; case EOF: throw EOF_EXCEPTION; } @@ -1420,6 +1436,9 @@ protected void finalize() throws Throwable { } } + /** + * Supports a view to a document somewhere in the queue. + */ class StoreTailerContext extends BinaryReadDocumentContext { StoreTailerContext() { super(null); @@ -1446,8 +1465,14 @@ public void close() { super.close(); } - boolean present(final boolean present) { - return this.present = present; + /** + * Initialise the context with a new document ensuring that the start of this context points to the start of + * this document in the underlying buffer. + */ + public void init() { + present = true; + long bytesReadPosition = wire.bytes().readPosition(); + setStart(bytesReadPosition - Wires.SPB_HEADER_SIZE); } public void wire(@Nullable final AbstractWire wire) {