diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index 8773e734d891..49dcee73ef27 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -113,15 +113,14 @@ public BinaryRow createReuseInstance() {
}
@Override
- public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView)
- throws IOException {
- int skip = checkSkipWriteForFixLengthPart(headerLessView);
- headerLessView.writeInt(record.getSizeInBytes());
- serializeWithoutLength(record, headerLessView);
+ public int serializeToPages(BinaryRow record, AbstractPagedOutputView out) throws IOException {
+ int skip = checkSkipWriteForFixLengthPart(out);
+ out.writeInt(record.getSizeInBytes());
+ serializeWithoutLength(record, out);
return skip;
}
- private static void serializeWithoutLength(BinaryRow record, MemorySegmentWritable writable)
+ public static void serializeWithoutLength(BinaryRow record, MemorySegmentWritable writable)
throws IOException {
if (record.getSegments().length == 1) {
writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
@@ -281,11 +280,11 @@ public void checkSkipReadForFixLengthPart(AbstractPagedInputView source) throws
/** Return fixed part length to serialize one row. */
public int getSerializedRowFixedPartLength() {
- return getFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
+ return fixedLengthPartSize + LENGTH_SIZE_IN_BYTES;
}
- public int getFixedLengthPartSize() {
- return fixedLengthPartSize;
+ public static int getSerializedRowLength(BinaryRow row) {
+ return row.getSizeInBytes() + LENGTH_SIZE_IN_BYTES;
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
index a0923d94da9f..33f2a8f0bf06 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
@@ -26,7 +26,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@@ -37,8 +36,6 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
private int position;
- private ByteBuffer wrapper;
-
// ------------------------------------------------------------------------
public DataOutputSerializer(int startSize) {
@@ -47,13 +44,6 @@ public DataOutputSerializer(int startSize) {
}
this.buffer = new byte[startSize];
- this.wrapper = ByteBuffer.wrap(buffer);
- }
-
- public ByteBuffer wrapAsByteBuffer() {
- this.wrapper.position(0);
- this.wrapper.limit(this.position);
- return this.wrapper;
}
/** @deprecated Replaced by {@link #getSharedBuffer()} for a better, safer name. */
@@ -329,7 +319,6 @@ private void resize(int minCapacityAdd) throws IOException {
System.arraycopy(this.buffer, 0, nb, 0, this.position);
this.buffer = nb;
- this.wrapper = ByteBuffer.wrap(this.buffer);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
new file mode 100644
index 000000000000..c7ce3c94ec8f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MathUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.paimon.data.serializer.BinaryRowSerializer.getSerializedRowLength;
+import static org.apache.paimon.data.serializer.BinaryRowSerializer.serializeWithoutLength;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A paged output serializer that efficiently handles serialization of rows using memory pages.
+ *
+ *
This serializer uses a two-phase approach:
+ *
+ *
+ * - Initial phase: Writes data to an initial buffer until it reaches the
+ * specified page size.
+ *
- Paged phase: Once the initial buffer exceeds the page size, switches to
+ * using {@link SimpleCollectingOutputView} with allocated memory segments for efficient
+ * memory management.
+ *
+ *
+ * The design ensures optimal performance for both small datasets (using single buffer) and large
+ * datasets (using paged memory allocation).
+ */
+public class DataPagedOutputSerializer {
+
+ private final InternalRowSerializer serializer;
+ private final int pageSize;
+
+ private DataOutputSerializer initialOut;
+ private SimpleCollectingOutputView pagedOut;
+
+ /**
+ * Constructs a new DataPagedOutputSerializer with the specified parameters.
+ *
+ * @param serializer the internal row serializer used for converting rows to binary format
+ * @param startSize the initial buffer size for storing serialized data
+ * @param pageSize the maximum size of each memory page before switching to paged mode
+ */
+ public DataPagedOutputSerializer(
+ InternalRowSerializer serializer, int startSize, int pageSize) {
+ this.serializer = serializer;
+ this.pageSize = pageSize;
+ this.initialOut = new DataOutputSerializer(startSize);
+ }
+
+ @VisibleForTesting
+ SimpleCollectingOutputView pagedOut() {
+ return pagedOut;
+ }
+
+ /**
+ * Serializes a binary row to the output.
+ *
+ *
Depending on the current state and available space, this method will either:
+ *
+ *
+ * - Write directly to the initial buffer if there's sufficient space remaining
+ *
- Switch to paged mode and write to memory segments once the initial buffer fills up
+ *
+ *
+ * @param row the binary row to serialize
+ * @throws IOException if an I/O error occurs during serialization
+ */
+ public void write(InternalRow row) throws IOException {
+ if (pagedOut != null) {
+ serializer.serializeToPages(row, pagedOut);
+ } else {
+ BinaryRow binaryRow = serializer.toBinaryRow(row);
+ int serializedSize = getSerializedRowLength(binaryRow);
+ if (initialOut.length() + serializedSize > pageSize) {
+ pagedOut = toPagedOutput(initialOut, pageSize);
+ initialOut = null;
+ serializer.serializeToPages(row, pagedOut);
+ } else {
+ initialOut.writeInt(binaryRow.getSizeInBytes());
+ serializeWithoutLength(binaryRow, initialOut);
+ }
+ }
+ }
+
+ private static SimpleCollectingOutputView toPagedOutput(
+ DataOutputSerializer output, int pageSize) throws IOException {
+ checkArgument(output.length() <= pageSize);
+ SimpleCollectingOutputView pagedOut =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(pageSize),
+ pageSize);
+ pagedOut.write(output.getSharedBuffer(), 0, output.length());
+ return pagedOut;
+ }
+
+ public SimpleCollectingOutputView close() throws IOException {
+ if (pagedOut != null) {
+ return pagedOut;
+ }
+
+ int pageSize = MathUtils.roundUpToPowerOf2(initialOut.length());
+ return toPagedOutput(initialOut, pageSize);
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
new file mode 100644
index 000000000000..1156b8533a5f
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link DataPagedOutputSerializer}. */
+class DataPagedOutputSerializerTest {
+
+ @Test
+ void testSmallDatasetStaysInInitialPhase() throws IOException {
+ InternalRowSerializer serializer = createSimpleSerializer();
+ DataPagedOutputSerializer output = new DataPagedOutputSerializer(serializer, 256, 8 * 1024);
+ SimpleCollectingOutputView pagedView =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(4 * 1024),
+ 4 * 1024);
+
+ BinaryRow row = createSampleRow();
+
+ // Write a few small rows that fit within the initial buffer
+ for (int i = 0; i < 50; i++) {
+ output.write(row);
+ serializer.serializeToPages(row, pagedView);
+ }
+
+ // Should still be using initial buffer since we haven't exceeded page size
+ assertThat(output.pagedOut()).isNull();
+
+ // assert result
+ SimpleCollectingOutputView result = output.close();
+ assertEqual(result, pagedView);
+ }
+
+ @Test
+ void testLargeDatasetSwitchesToPagedMode() throws IOException {
+ InternalRowSerializer serializer = createSimpleSerializer();
+ DataPagedOutputSerializer output = new DataPagedOutputSerializer(serializer, 256, 4 * 1024);
+ SimpleCollectingOutputView pagedView =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(4 * 1024),
+ 4 * 1024);
+
+ BinaryRow row = createSampleRow();
+
+ // Write many rows
+ for (int i = 0; i < 500; i++) {
+ output.write(row);
+ serializer.serializeToPages(row, pagedView);
+ }
+
+ assertThat(output.pagedOut()).isNotNull();
+
+ // assert result
+ SimpleCollectingOutputView result = output.close();
+ assertEqual(result, pagedView);
+ }
+
+ private void assertEqual(SimpleCollectingOutputView view1, SimpleCollectingOutputView view2) {
+ assertThat(view1.getCurrentOffset()).isEqualTo(view2.getCurrentOffset());
+ assertThat(view1.fullSegments().size()).isEqualTo(view2.fullSegments().size());
+ for (int i = 0; i < view1.fullSegments().size(); i++) {
+ MemorySegment segment1 = view1.fullSegments().get(i);
+ MemorySegment segment2 = view2.fullSegments().get(i);
+ assertThat(segment1.size()).isEqualTo(segment2.size());
+ assertThat(segment1.getHeapMemory()).isEqualTo(segment2.getHeapMemory());
+ }
+ }
+
+ private InternalRowSerializer createSimpleSerializer() {
+ return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
+ }
+
+ private BinaryRow createSampleRow() {
+ GenericRow genericRow =
+ GenericRow.of(
+ 42, BinaryString.fromString("sample-data-" + System.currentTimeMillis()));
+ InternalRowSerializer tempSerializer = createSimpleSerializer();
+ return tempSerializer.toBinaryRow(genericRow);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 6e43ba01a32b..59b2a34650a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -24,9 +24,8 @@
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataPagedOutputSerializer;
import org.apache.paimon.manifest.ManifestEntrySegments.RichSegments;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.memory.MemorySegmentSource;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
import org.apache.paimon.types.RowType;
@@ -77,18 +76,15 @@ public ManifestEntryCache(
@Override
protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSize) {
- Map, SimpleCollectingOutputView> segments =
+ Map, DataPagedOutputSerializer> segments =
new HashMap<>();
Function partitionGetter = partitionGetter();
Function bucketGetter = bucketGetter();
Function totalBucketGetter = totalBucketGetter();
- MemorySegmentSource segmentSource =
- () -> MemorySegment.allocateHeapMemory(cache.pageSize());
- Supplier outViewSupplier =
- () ->
- new SimpleCollectingOutputView(
- new ArrayList<>(), segmentSource, cache.pageSize());
+ int pageSize = cache.pageSize();
InternalRowSerializer formatSerializer = this.formatSerializer.get();
+ Supplier outputSupplier =
+ () -> new DataPagedOutputSerializer(formatSerializer, 2048, pageSize);
try (CloseableIterator iterator = reader.apply(path, fileSize)) {
while (iterator.hasNext()) {
InternalRow row = iterator.next();
@@ -96,15 +92,15 @@ protected ManifestEntrySegments createSegments(Path path, @Nullable Long fileSiz
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
Triple key = Triple.of(partition, bucket, totalBucket);
- SimpleCollectingOutputView view =
- segments.computeIfAbsent(key, k -> outViewSupplier.get());
- formatSerializer.serializeToPages(row, view);
+ DataPagedOutputSerializer output =
+ segments.computeIfAbsent(key, k -> outputSupplier.get());
+ output.write(row);
}
List result = new ArrayList<>();
- for (Map.Entry, SimpleCollectingOutputView> entry :
+ for (Map.Entry, DataPagedOutputSerializer> entry :
segments.entrySet()) {
Triple key = entry.getKey();
- SimpleCollectingOutputView view = entry.getValue();
+ SimpleCollectingOutputView view = entry.getValue().close();
Segments seg =
Segments.create(view.fullSegments(), view.getCurrentPositionInSegment());
result.add(new RichSegments(key.f0, key.f1, key.f2, seg));