Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,14 @@ public BinaryRow createReuseInstance() {
}

@Override
public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView)
throws IOException {
int skip = checkSkipWriteForFixLengthPart(headerLessView);
headerLessView.writeInt(record.getSizeInBytes());
serializeWithoutLength(record, headerLessView);
public int serializeToPages(BinaryRow record, AbstractPagedOutputView out) throws IOException {
int skip = checkSkipWriteForFixLengthPart(out);
out.writeInt(record.getSizeInBytes());
serializeWithoutLength(record, out);
return skip;
}

private static void serializeWithoutLength(BinaryRow record, MemorySegmentWritable writable)
public static void serializeWithoutLength(BinaryRow record, MemorySegmentWritable writable)
throws IOException {
if (record.getSegments().length == 1) {
writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
Expand Down Expand Up @@ -281,11 +280,11 @@ public void checkSkipReadForFixLengthPart(AbstractPagedInputView source) throws

/** Return fixed part length to serialize one row. */
public int getSerializedRowFixedPartLength() {
return getFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
return fixedLengthPartSize + LENGTH_SIZE_IN_BYTES;
}

public int getFixedLengthPartSize() {
return fixedLengthPartSize;
public static int getSerializedRowLength(BinaryRow row) {
return row.getSizeInBytes() + LENGTH_SIZE_IN_BYTES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

Expand All @@ -37,8 +36,6 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab

private int position;

private ByteBuffer wrapper;

// ------------------------------------------------------------------------

public DataOutputSerializer(int startSize) {
Expand All @@ -47,13 +44,6 @@ public DataOutputSerializer(int startSize) {
}

this.buffer = new byte[startSize];
this.wrapper = ByteBuffer.wrap(buffer);
}

public ByteBuffer wrapAsByteBuffer() {
this.wrapper.position(0);
this.wrapper.limit(this.position);
return this.wrapper;
}

/** @deprecated Replaced by {@link #getSharedBuffer()} for a better, safer name. */
Expand Down Expand Up @@ -329,7 +319,6 @@ private void resize(int minCapacityAdd) throws IOException {

System.arraycopy(this.buffer, 0, nb, 0, this.position);
this.buffer = nb;
this.wrapper = ByteBuffer.wrap(this.buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.utils.MathUtils;

import java.io.IOException;
import java.util.ArrayList;

import static org.apache.paimon.data.serializer.BinaryRowSerializer.getSerializedRowLength;
import static org.apache.paimon.data.serializer.BinaryRowSerializer.serializeWithoutLength;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A paged output serializer that efficiently handles serialization of rows using memory pages.
*
* <p>This serializer uses a two-phase approach:
*
* <ol>
* <li><strong>Initial phase:</strong> Writes data to an initial buffer until it reaches the
* specified page size.
* <li><strong>Paged phase:</strong> Once the initial buffer exceeds the page size, switches to
* using {@link SimpleCollectingOutputView} with allocated memory segments for efficient
* memory management.
* </ol>
*
* <p>The design ensures optimal performance for both small datasets (using single buffer) and large
* datasets (using paged memory allocation).
*/
public class DataPagedOutputSerializer {

private final InternalRowSerializer serializer;
private final int pageSize;

private DataOutputSerializer initialOut;
private SimpleCollectingOutputView pagedOut;

/**
* Constructs a new DataPagedOutputSerializer with the specified parameters.
*
* @param serializer the internal row serializer used for converting rows to binary format
* @param startSize the initial buffer size for storing serialized data
* @param pageSize the maximum size of each memory page before switching to paged mode
*/
public DataPagedOutputSerializer(
InternalRowSerializer serializer, int startSize, int pageSize) {
this.serializer = serializer;
this.pageSize = pageSize;
this.initialOut = new DataOutputSerializer(startSize);
}

@VisibleForTesting
SimpleCollectingOutputView pagedOut() {
return pagedOut;
}

/**
* Serializes a binary row to the output.
*
* <p>Depending on the current state and available space, this method will either:
*
* <ul>
* <li>Write directly to the initial buffer if there's sufficient space remaining
* <li>Switch to paged mode and write to memory segments once the initial buffer fills up
* </ul>
*
* @param row the binary row to serialize
* @throws IOException if an I/O error occurs during serialization
*/
public void write(InternalRow row) throws IOException {
if (pagedOut != null) {
serializer.serializeToPages(row, pagedOut);
} else {
BinaryRow binaryRow = serializer.toBinaryRow(row);
int serializedSize = getSerializedRowLength(binaryRow);
if (initialOut.length() + serializedSize > pageSize) {
pagedOut = toPagedOutput(initialOut, pageSize);
initialOut = null;
serializer.serializeToPages(row, pagedOut);
} else {
initialOut.writeInt(binaryRow.getSizeInBytes());
serializeWithoutLength(binaryRow, initialOut);
}
}
}

private static SimpleCollectingOutputView toPagedOutput(
DataOutputSerializer output, int pageSize) throws IOException {
checkArgument(output.length() <= pageSize);
SimpleCollectingOutputView pagedOut =
new SimpleCollectingOutputView(
new ArrayList<>(),
() -> MemorySegment.allocateHeapMemory(pageSize),
pageSize);
pagedOut.write(output.getSharedBuffer(), 0, output.length());
return pagedOut;
}

public SimpleCollectingOutputView close() throws IOException {
if (pagedOut != null) {
return pagedOut;
}

int pageSize = MathUtils.roundUpToPowerOf2(initialOut.length());
return toPagedOutput(initialOut, pageSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for {@link DataPagedOutputSerializer}. */
class DataPagedOutputSerializerTest {

@Test
void testSmallDatasetStaysInInitialPhase() throws IOException {
InternalRowSerializer serializer = createSimpleSerializer();
DataPagedOutputSerializer output = new DataPagedOutputSerializer(serializer, 256, 8 * 1024);
SimpleCollectingOutputView pagedView =
new SimpleCollectingOutputView(
new ArrayList<>(),
() -> MemorySegment.allocateHeapMemory(4 * 1024),
4 * 1024);

BinaryRow row = createSampleRow();

// Write a few small rows that fit within the initial buffer
for (int i = 0; i < 50; i++) {
output.write(row);
serializer.serializeToPages(row, pagedView);
}

// Should still be using initial buffer since we haven't exceeded page size
assertThat(output.pagedOut()).isNull();

// assert result
SimpleCollectingOutputView result = output.close();
assertEqual(result, pagedView);
}

@Test
void testLargeDatasetSwitchesToPagedMode() throws IOException {
InternalRowSerializer serializer = createSimpleSerializer();
DataPagedOutputSerializer output = new DataPagedOutputSerializer(serializer, 256, 4 * 1024);
SimpleCollectingOutputView pagedView =
new SimpleCollectingOutputView(
new ArrayList<>(),
() -> MemorySegment.allocateHeapMemory(4 * 1024),
4 * 1024);

BinaryRow row = createSampleRow();

// Write many rows
for (int i = 0; i < 500; i++) {
output.write(row);
serializer.serializeToPages(row, pagedView);
}

assertThat(output.pagedOut()).isNotNull();

// assert result
SimpleCollectingOutputView result = output.close();
assertEqual(result, pagedView);
}

private void assertEqual(SimpleCollectingOutputView view1, SimpleCollectingOutputView view2) {
assertThat(view1.getCurrentOffset()).isEqualTo(view2.getCurrentOffset());
assertThat(view1.fullSegments().size()).isEqualTo(view2.fullSegments().size());
for (int i = 0; i < view1.fullSegments().size(); i++) {
MemorySegment segment1 = view1.fullSegments().get(i);
MemorySegment segment2 = view2.fullSegments().get(i);
assertThat(segment1.size()).isEqualTo(segment2.size());
assertThat(segment1.getHeapMemory()).isEqualTo(segment2.getHeapMemory());
}
}

private InternalRowSerializer createSimpleSerializer() {
return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
}

private BinaryRow createSampleRow() {
GenericRow genericRow =
GenericRow.of(
42, BinaryString.fromString("sample-data-" + System.currentTimeMillis()));
InternalRowSerializer tempSerializer = createSimpleSerializer();
return tempSerializer.toBinaryRow(genericRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataPagedOutputSerializer;
import org.apache.paimon.manifest.ManifestEntrySegments.RichSegments;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -77,34 +76,31 @@ public ManifestEntryCache(

@Override
protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSize) {
Map<Triple<BinaryRow, Integer, Integer>, SimpleCollectingOutputView> segments =
Map<Triple<BinaryRow, Integer, Integer>, DataPagedOutputSerializer> segments =
new HashMap<>();
Function<InternalRow, BinaryRow> partitionGetter = partitionGetter();
Function<InternalRow, Integer> bucketGetter = bucketGetter();
Function<InternalRow, Integer> totalBucketGetter = totalBucketGetter();
MemorySegmentSource segmentSource =
() -> MemorySegment.allocateHeapMemory(cache.pageSize());
Supplier<SimpleCollectingOutputView> outViewSupplier =
() ->
new SimpleCollectingOutputView(
new ArrayList<>(), segmentSource, cache.pageSize());
int pageSize = cache.pageSize();
InternalRowSerializer formatSerializer = this.formatSerializer.get();
Supplier<DataPagedOutputSerializer> outputSupplier =
() -> new DataPagedOutputSerializer(formatSerializer, 2048, pageSize);
try (CloseableIterator<InternalRow> iterator = reader.apply(path, fileSize)) {
while (iterator.hasNext()) {
InternalRow row = iterator.next();
BinaryRow partition = partitionGetter.apply(row);
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
Triple<BinaryRow, Integer, Integer> key = Triple.of(partition, bucket, totalBucket);
SimpleCollectingOutputView view =
segments.computeIfAbsent(key, k -> outViewSupplier.get());
formatSerializer.serializeToPages(row, view);
DataPagedOutputSerializer output =
segments.computeIfAbsent(key, k -> outputSupplier.get());
output.write(row);
}
List<RichSegments> result = new ArrayList<>();
for (Map.Entry<Triple<BinaryRow, Integer, Integer>, SimpleCollectingOutputView> entry :
for (Map.Entry<Triple<BinaryRow, Integer, Integer>, DataPagedOutputSerializer> entry :
segments.entrySet()) {
Triple<BinaryRow, Integer, Integer> key = entry.getKey();
SimpleCollectingOutputView view = entry.getValue();
SimpleCollectingOutputView view = entry.getValue().close();
Segments seg =
Segments.create(view.fullSegments(), view.getCurrentPositionInSegment());
result.add(new RichSegments(key.f0, key.f1, key.f2, seg));
Expand Down
Loading