diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java index 970cc0b141e..faa6e322a66 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java @@ -69,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState; import org.apache.jackrabbit.oak.segment.RecordWriters.RecordWriter; import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation; +import org.apache.jackrabbit.oak.segment.data.PartialSegmentState; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff; @@ -156,6 +157,11 @@ public void flush() throws IOException { writeOperationHandler.flush(store); } + @Override + public @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) { + return writeOperationHandler.readPartialSegmentState(sid); + } + @NotNull RecordId writeMap(@Nullable final MapRecord base, @NotNull final Map changes) throws IOException { return new SegmentWriteOperation(writeOperationHandler.getGCGeneration()) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java index d7a8d1f3705..cb67638378b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java @@ -38,12 +38,18 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.commons.io.HexDump; import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry; +import org.apache.jackrabbit.oak.segment.data.PartialSegmentState; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -65,7 +71,9 @@ * The behaviour of this class is undefined should the pre-allocated buffer be * overrun be calling any of the write methods. *

- * Instances of this class are not thread safe + * It is safe to call {@link #readPartialSegmentState(SegmentId)} concurrently with the write operations of a single + * writer (including several concurrent calls of this method). However, it is not safe to have concurrent writers, + * notably because the order in which {@code prepare} and {@code writeXYZ} methods are called matters. */ public class SegmentBufferWriter implements WriteOperationHandler { @@ -150,7 +158,7 @@ public SegmentBufferWriter(@NotNull SegmentIdProvider idProvider, @NotNull @Override - public RecordId execute(@NotNull GCGeneration gcGeneration, + public synchronized RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation writeOperation) throws IOException { checkState(gcGeneration.equals(this.gcGeneration)); @@ -159,7 +167,7 @@ public RecordId execute(@NotNull GCGeneration gcGeneration, @Override @NotNull - public GCGeneration getGCGeneration() { + public synchronized GCGeneration getGCGeneration() { return gcGeneration; } @@ -220,22 +228,22 @@ private void newSegment(SegmentStore store) throws IOException { dirty = false; } - public void writeByte(byte value) { + public synchronized void writeByte(byte value) { position = BinaryUtils.writeByte(buffer, position, value); dirty = true; } - public void writeShort(short value) { + public synchronized void writeShort(short value) { position = BinaryUtils.writeShort(buffer, position, value); dirty = true; } - public void writeInt(int value) { + public synchronized void writeInt(int value) { position = BinaryUtils.writeInt(buffer, position, value); dirty = true; } - public void writeLong(long value) { + public synchronized void writeLong(long value) { position = BinaryUtils.writeLong(buffer, position, value); dirty = true; } @@ -245,7 +253,7 @@ public void writeLong(long value) { * * @param recordId the record ID. */ - public void writeRecordId(RecordId recordId) { + public synchronized void writeRecordId(RecordId recordId) { requireNonNull(recordId); checkState(segmentReferences.size() + 1 < 0xffff, "Segment cannot have more than 0xffff references"); @@ -278,7 +286,7 @@ private static String info(Segment segment) { return info; } - public void writeBytes(byte[] data, int offset, int length) { + public synchronized void writeBytes(byte[] data, int offset, int length) { arraycopy(data, offset, buffer, position, length); position += length; dirty = true; @@ -308,7 +316,7 @@ private String dumpSegmentBuffer() { * enough space for a record. It can also be called explicitly. */ @Override - public void flush(@NotNull SegmentStore store) throws IOException { + public synchronized void flush(@NotNull SegmentStore store) throws IOException { if (dirty) { int referencedSegmentIdCount = segmentReferences.size(); BinaryUtils.writeInt(buffer, Segment.REFERENCED_SEGMENT_ID_COUNT_OFFSET, referencedSegmentIdCount); @@ -381,7 +389,7 @@ public void flush(@NotNull SegmentStore store) throws IOException { * @param store the {@code SegmentStore} instance to write full segments to * @return a new record id */ - public RecordId prepare(RecordType type, int size, Collection ids, SegmentStore store) throws IOException { + public synchronized RecordId prepare(RecordType type, int size, Collection ids, SegmentStore store) throws IOException { checkArgument(size >= 0); requireNonNull(ids); @@ -459,4 +467,65 @@ public RecordId prepare(RecordType type, int size, Collection ids, Seg return new RecordId(segment.getSegmentId(), recordNumber); } + @Override + public synchronized @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) { + if (segment == null || !segment.getSegmentId().equals(sid)) { + return null; + } + + byte version = SegmentVersion.asByte(LATEST_VERSION); + int generation = gcGeneration.getGeneration(); + int fullGeneration = gcGeneration.getFullGeneration(); + boolean isCompacted = gcGeneration.isCompacted(); + + List segmentReferencesList = StreamSupport.stream(segmentReferences.spliterator(), false) + .map(segmentId -> new PartialSegmentState.SegmentReference(segmentId.getMostSignificantBits(), segmentId.getLeastSignificantBits())) + .collect(Collectors.toUnmodifiableList()); + + List records = getCurrentRecords(); + + return new PartialSegmentState( + version, + generation, + fullGeneration, + isCompacted, + segmentReferencesList, + records + ); + } + + /** + * Get the current records in the buffer, in descending order of their offset. + * + *

+ * The contents of the record currently being written to can be incomplete. In this case, + * {@link PartialSegmentState.Record#contents()} will only contain the data that has been written so far. + */ + private @NotNull List getCurrentRecords() { + List result = new ArrayList<>(); + + Entry previousEntry = null; + for (Entry entry : recordNumbers) { + int currentRecordStart = entry.getOffset(); + + // Record in recordNumbers are sorted in descending order of offset + assert previousEntry == null || previousEntry.getOffset() >= currentRecordStart; + + int nextRecordStart = previousEntry == null ? MAX_SEGMENT_SIZE : previousEntry.getOffset(); + boolean isPartiallyWrittenRecord = position >= currentRecordStart; + int currentRecordEnd = isPartiallyWrittenRecord ? position : nextRecordStart; + result.add( + new PartialSegmentState.Record( + entry.getRecordNumber(), + entry.getType(), + currentRecordStart, + Arrays.copyOfRange(buffer, currentRecordStart, currentRecordEnd) + ) + ); + + previousEntry = entry; + } + + return List.copyOf(result); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java index 05ebc154689..2a67bdc63e9 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java @@ -22,6 +22,7 @@ import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.data.PartialSegmentState; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -33,6 +34,19 @@ public interface SegmentWriter { void flush() throws IOException; + /** + * Get the {@link PartialSegmentState partial state} of a segment + * that has started being written to but hasn’t been flushed yet. + * + * @param sid The ID of the segment + * @return The partial state or {@code null} if no partial state was found for the given segment ID. + * @throws UnsupportedOperationException if reading partial segment states is not supported. + */ + @Nullable + default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) { + throw new UnsupportedOperationException("Trying to read partial segment state from a SegmentWriter that doesn’t support it."); + } + /** * Write a blob (as list of block records) * diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java index faf65c0007a..8e806f63e1a 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java @@ -21,9 +21,11 @@ import java.io.IOException; +import org.apache.jackrabbit.oak.segment.data.PartialSegmentState; import org.jetbrains.annotations.NotNull; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.jetbrains.annotations.Nullable; /** * A {@code WriteOperationHandler} executes {@link WriteOperation @@ -73,4 +75,10 @@ RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation wri * @throws IOException */ void flush(@NotNull SegmentStore store) throws IOException; + + /** @see SegmentWriter#readPartialSegmentState(SegmentId) */ + @Nullable + default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) { + throw new UnsupportedOperationException("Trying to read partial segment state from a WriteOperationHandler that doesn’t support it."); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/data/PartialSegmentState.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/data/PartialSegmentState.java new file mode 100644 index 00000000000..e6f11ac451e --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/data/PartialSegmentState.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.data; + +import org.apache.jackrabbit.oak.segment.RecordType; +import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.jetbrains.annotations.NotNull; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Contains the state of a partial segment, i.e. a segment that is currently being written to by {@link SegmentWriter} + * and has not been flushed yet. + * + *

+ * The data contained in the partial segment is permanent, i.e. instances of {@code PartialSegmentState} will never + * return data which is incoherent with earlier instances of {@code PartialSegmentState} for the same segment. + * For instance, {@link #version()} will always return the same value for the same segment and will never be changed. + * However, the data may be incomplete. For instance, the segment may not contain all the records that will be written + * to it before it is flushed. + * + *

+ * Instances of this class are immutable and thus thread-safe. + * + * @link Documentation of the segment structure + */ +public final class PartialSegmentState { + private final byte version; + private final int generation; + private final int fullGeneration; + private final boolean isCompacted; + private final @NotNull List<@NotNull SegmentReference> segmentReferences; + private final @NotNull List<@NotNull Record> records; + + public PartialSegmentState(byte version, int generation, int fullGeneration, boolean isCompacted, @NotNull List<@NotNull SegmentReference> segmentReferences, @NotNull List<@NotNull Record> records) { + this.version = version; + this.generation = generation; + this.fullGeneration = fullGeneration; + this.isCompacted = isCompacted; + this.segmentReferences = List.copyOf(segmentReferences); + this.records = List.copyOf(records); + } + + public byte version() { + return version; + } + + public int generation() { + return generation; + } + + public int fullGeneration() { + return fullGeneration; + } + + public boolean isCompacted() { + return isCompacted; + } + + /** + * The references to other segments, in the order of their appearance in the segment header + * (segment number = index of a reference in {@code segmentReferences} + 1). + * + *

+ * New elements might be added to this list before the segment is flushed in newer instances of + * {@code PartialSegmentState} for the same segment, but the elements that are already present will never be changed. + */ + public @NotNull List<@NotNull SegmentReference> segmentReferences() { + return segmentReferences; + } + + /** + * The records in the segment, sorted by offset in descending order (highest offset first, which is the order of + * record numbers in the segment header). + * + *

+ * This list may be incomplete if new records are added before the segment is flushed. + * Also, existing records may themselves be incomplete (see {@link Record}). + */ + public @NotNull List<@NotNull Record> records() { + return records; + } + + public static final class SegmentReference { + private final long msb; + private final long lsb; + + public SegmentReference(long msb, long lsb) { + this.msb = msb; + this.lsb = lsb; + } + + public long msb() { + return msb; + } + + public long lsb() { + return lsb; + } + } + + public static final class Record { + /** The reference number of the record (= record number) */ + private final int refNumber; + + /** The type of the record */ + private final RecordType recordType; + + /** + * The offset of the record in the segment (smaller than {@link Segment#MAX_SEGMENT_SIZE}). + * + *

+ * This offset is relative to a + * theoretical 256-KiB segment. + */ + private final int offset; + + /** + * The known contents of the segment starting at its {@link #offset}. + * + *

+ * This array can be smaller than the actual size of the record if the contents of this record are only + * partially known. + */ + private final byte @NotNull [] contents; + + public Record(int refNumber, @NotNull RecordType recordType, int offset, byte @NotNull [] contents) { + this.refNumber = refNumber; + this.recordType = requireNonNull(recordType); + this.offset = offset; + this.contents = requireNonNull(contents); + } + + /** @see #refNumber */ + public int refNumber() { + return refNumber; + } + + /** @see #recordType */ + public @NotNull RecordType recordType() { + return recordType; + } + + /** @see #offset */ + public int offset() { + return offset; + } + + /** @see #contents */ + public byte @NotNull [] contents() { + return contents; + } + } +} diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterTest.java index 95b761f8b9b..a9344aadd48 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterTest.java @@ -20,8 +20,11 @@ import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList; import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.io.File; import java.util.Collections; @@ -29,8 +32,10 @@ import java.util.Optional; import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.segment.data.PartialSegmentState; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -135,4 +140,74 @@ public void tooBigRecord() throws Exception { } } + @Test + public void readPartialSegmentState() throws Exception { + try (FileStore store = openFileStore()) { + GCGeneration gcGeneration = store.getRevisions().getHead().getSegment().getGcGeneration(); + SegmentBufferWriter writer = new SegmentBufferWriter(store.getSegmentIdProvider(), "t", gcGeneration, RecordWriteListener.DEFAULT); + + var referencedRecordId = store.getHead().getRecordId(); + writer.flush(store); + + // Write a record completely + RecordId completeRecordId = writer.prepare(RecordType.BLOCK, Long.BYTES, Collections.emptyList(), store); + writer.writeLong(0b00001010_11010000_10111110_10101101_00001011_11101010_11010000_10111110L); + assert completeRecordId.getSegmentId() != referencedRecordId.getSegmentId(); + + // Reference another segment + final int RECORD_ID_BYTES = Short.BYTES + Integer.BYTES; + RecordId referenceRecordId = writer.prepare(RecordType.VALUE, RECORD_ID_BYTES, Collections.emptyList(), store); + writer.writeRecordId(referencedRecordId); + + // Write a record partially + RecordId partialRecordId = writer.prepare(RecordType.BLOCK, Integer.BYTES * 2, Collections.emptyList(), store); + writer.writeByte((byte)42); + + // Read the segment partially + PartialSegmentState partialSegmentState = writer.readPartialSegmentState(completeRecordId.getSegmentId()); + assertNotNull(partialSegmentState); + + assertEquals(partialSegmentState.generation(), gcGeneration.getGeneration()); + assertEquals(partialSegmentState.fullGeneration(), gcGeneration.getFullGeneration()); + assertEquals(partialSegmentState.isCompacted(), gcGeneration.isCompacted()); + assertEquals(partialSegmentState.version(), (byte) 13); + + assertEquals(1, partialSegmentState.segmentReferences().size()); + assertEquals(referencedRecordId.getSegmentId().getMostSignificantBits(), partialSegmentState.segmentReferences().get(0).msb()); + assertEquals(referencedRecordId.getSegmentId().getLeastSignificantBits(), partialSegmentState.segmentReferences().get(0).lsb()); + + assertEquals(4, partialSegmentState.records().size()); + + writer.flush(store); + + int offset = partialSegmentState.records().get(1).offset(); + assertEquals(completeRecordId.getRecordNumber(), partialSegmentState.records().get(1).refNumber()); + assertEquals(RecordType.BLOCK, partialSegmentState.records().get(1).recordType()); + assertArrayEquals(new byte[] { (byte)0b00001010, (byte)0b11010000, (byte)0b10111110, (byte)0b10101101, (byte)0b00001011, (byte)0b11101010, (byte)0b11010000, (byte)0b10111110 }, partialSegmentState.records().get(1).contents()); + offset -= Long.BYTES; + + assertEquals(referenceRecordId.getRecordNumber(), partialSegmentState.records().get(2).refNumber()); + assertEquals(RecordType.VALUE, partialSegmentState.records().get(2).recordType()); + assertEquals(offset, partialSegmentState.records().get(2).offset()); + offset -= 8; // align(RECORD_ID_BYTES, 4) + + assertEquals(partialRecordId.getRecordNumber(), partialSegmentState.records().get(3).refNumber()); + assertEquals(RecordType.BLOCK, partialSegmentState.records().get(3).recordType()); + assertEquals(offset, partialSegmentState.records().get(3).offset()); + assertArrayEquals(new byte[] { 42 }, partialSegmentState.records().get(3).contents()); + } + } + + @Test + public void readPartialSegmentStateOfOtherSegmentReturnsNull() throws Exception { + try (FileStore store = openFileStore()) { + SegmentBufferWriter writer = new SegmentBufferWriter(store.getSegmentIdProvider(), "t", store.getRevisions().getHead().getSegment().getGcGeneration(), RecordWriteListener.DEFAULT); + + RecordId recordId = writer.prepare(RecordType.BLOCK, Long.BYTES, Collections.emptyList(), store); + writer.writeLong(0b101011010000101111101010110100001011111010101101000010111110L); + writer.flush(store); + + assertNull(writer.readPartialSegmentState(recordId.getSegmentId())); + } + } }