Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.FileWriterAbortExecutor;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
private final long targetFileSize;

// State management
private final List<AbortExecutor> closedWriters;
private final List<FileWriterAbortExecutor> closedWriters;
private final List<DataFileMeta> results;
private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
currentWriter;
Expand Down Expand Up @@ -315,7 +315,7 @@ public void abort() {
currentWriter.abort();
currentWriter = null;
}
for (AbortExecutor abortExecutor : closedWriters) {
for (FileWriterAbortExecutor abortExecutor : closedWriters) {
abortExecutor.abort();
}
blobWriter.abort();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

/**
 * Abort handle for a written file.
 *
 * <p>Keeps only the {@link FileIO} and the file's {@link Path}, so that the file can still be
 * removed later without holding a reference to the whole writer that produced it.
 */
public class FileWriterAbortExecutor {

    /** Location of the file to remove on abort. */
    private final Path path;

    /** File system access used to perform the removal. */
    private final FileIO fileIO;

    /**
     * Creates an abort handle for the given file.
     *
     * @param fileIO file system access used to delete the file
     * @param path location of the file to delete on {@link #abort()}
     */
    public FileWriterAbortExecutor(FileIO fileIO, Path path) {
        this.path = path;
        this.fileIO = fileIO;
    }

    /** Removes the referenced file by delegating to {@code FileIO#deleteQuietly}. */
    public void abort() {
        fileIO.deleteQuietly(path);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Format table's writer that rolls over to a new file once the current file reaches the target
 * file size.
 *
 * <p>Rows are appended to one {@link FormatTableSingleFileWriter} at a time. When that writer
 * reports that it reached the target size, it is closed, its {@link FileWriterAbortExecutor} and
 * {@link TwoPhaseOutputStream.Committer}s are remembered, and the next row lazily opens a new
 * file.
 */
public class FormatTableRollingFileWriter implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(FormatTableRollingFileWriter.class);

    /** The writer is asked to perform a "suggested" size check once every this many records. */
    private static final int CHECK_ROLLING_RECORD_CNT = 1000;

    private final Supplier<FormatTableSingleFileWriter> writerFactory;
    private final long targetFileSize;
    /** Abort handles of the files that have already been closed by this writer. */
    private final List<FileWriterAbortExecutor> closedWriters;
    /** Committers collected from the files that have already been closed by this writer. */
    private final List<TwoPhaseOutputStream.Committer> committers;

    private FormatTableSingleFileWriter currentWriter = null;
    private long recordCount = 0;
    private boolean closed = false;

    /**
     * Creates a rolling writer.
     *
     * @param fileIO file system access handed to every single-file writer
     * @param fileFormat format used to create the per-file writer factory
     * @param targetFileSize size passed to the roll-over check of the current file
     * @param writeSchema row type of the written records
     * @param pathFactory source of a fresh path for every new file
     * @param fileCompression compression passed to every single-file writer
     */
    public FormatTableRollingFileWriter(
            FileIO fileIO,
            FileFormat fileFormat,
            long targetFileSize,
            RowType writeSchema,
            DataFilePathFactory pathFactory,
            String fileCompression) {
        this.writerFactory =
                () ->
                        new FormatTableSingleFileWriter(
                                fileIO,
                                fileFormat.createWriterFactory(writeSchema),
                                pathFactory.newPath(),
                                fileCompression);
        this.targetFileSize = targetFileSize;
        this.closedWriters = new ArrayList<>();
        this.committers = new ArrayList<>();
    }

    /** Returns the target file size this writer was configured with. */
    public long targetFileSize() {
        return targetFileSize;
    }

    /**
     * Appends a row to the current file, opening a new file first if none is open, and rolls the
     * file over when the current writer reports that the target size is reached.
     *
     * <p>On any failure all files produced so far are aborted and the failure is rethrown.
     *
     * @param row record to write
     * @throws IOException if writing or rolling over fails
     */
    public void write(InternalRow row) throws IOException {
        try {
            if (currentWriter == null) {
                currentWriter = writerFactory.get();
            }

            currentWriter.write(row);
            recordCount += 1;
            boolean needRolling =
                    currentWriter.reachTargetSize(
                            recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
            if (needRolling) {
                closeCurrentWriter();
            }
        } catch (Throwable e) {
            LOG.warn(
                    "Exception occurs when writing file {}. Cleaning up.",
                    currentWriter == null ? null : currentWriter.path(),
                    e);
            abort();
            throw e;
        }
    }

    /** Closes the current file, if any, and records its abort handle and committers. */
    private void closeCurrentWriter() throws IOException {
        if (currentWriter == null) {
            return;
        }

        currentWriter.close();
        closedWriters.add(currentWriter.abortExecutor());
        // Fetch once: every call builds a new list.
        List<TwoPhaseOutputStream.Committer> fileCommitters = currentWriter.committers();
        if (fileCommitters != null) {
            committers.addAll(fileCommitters);
        }

        currentWriter = null;
    }

    /**
     * Aborts the file currently being written and every file that was already closed by this
     * writer.
     */
    public void abort() {
        if (currentWriter != null) {
            currentWriter.abort();
            // Drop the aborted writer so that a later close() does not close it again and
            // collect the abort handle and committer of a file that has just been discarded.
            currentWriter = null;
        }
        for (FileWriterAbortExecutor abortExecutor : closedWriters) {
            abortExecutor.abort();
        }
    }

    /** Returns the committers collected from all files closed so far. */
    public List<TwoPhaseOutputStream.Committer> committers() {
        return committers;
    }

    /**
     * Closes the current file, if any. Subsequent calls are no-ops.
     *
     * @throws IOException if closing the current file fails; all files are aborted in that case
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }

        try {
            closeCurrentWriter();
        } catch (IOException e) {
            LOG.warn(
                    "Exception occurs when writing file {}. Cleaning up.",
                    currentWriter == null ? null : currentWriter.path(),
                    e);
            abort();
            throw e;
        } finally {
            closed = true;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SupportsDirectWrite;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.utils.IOUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/**
 * Format table's writer to produce a single file.
 *
 * <p>The file is written through a two-phase output stream: {@link #close()} finishes the stream
 * and keeps the resulting {@link TwoPhaseOutputStream.Committer}, which callers obtain through
 * {@link #committers()}. {@link #abort()} discards the committer instead and deletes the path.
 */
public class FormatTableSingleFileWriter {

    private static final Logger LOG = LoggerFactory.getLogger(FormatTableSingleFileWriter.class);

    protected final FileIO fileIO;
    protected final Path path;

    private FormatWriter writer;
    private PositionOutputStream out;
    /** Committer of the finished stream; {@code null} until closed and again after an abort. */
    private TwoPhaseOutputStream.Committer committer;

    protected long outputBytes;
    protected boolean closed;

    /**
     * Opens a two-phase output stream for {@code path} and creates the format writer on top of it.
     *
     * @param fileIO file system access
     * @param factory factory of the format writer; must not be a {@link SupportsDirectWrite}
     * @param path target path of the file
     * @param compression compression passed to the format writer factory
     * @throws UnsupportedOperationException if {@code factory} is a {@link SupportsDirectWrite}
     * @throws UncheckedIOException if the stream or the format writer cannot be created
     */
    public FormatTableSingleFileWriter(
            FileIO fileIO, FormatWriterFactory factory, Path path, String compression) {
        this.fileIO = fileIO;
        this.path = path;

        // Rejected before any resource is opened, so there is nothing to clean up.
        if (factory instanceof SupportsDirectWrite) {
            throw new UnsupportedOperationException("Does not support SupportsDirectWrite.");
        }

        try {
            out = fileIO.newTwoPhaseOutputStream(path, false);
            writer = factory.create(out, compression);
        } catch (IOException e) {
            LOG.warn(
                    "Failed to open the bulk writer, closing the output stream and throw the error.",
                    e);
            if (out != null) {
                abort();
            }
            throw new UncheckedIOException(e);
        }

        this.closed = false;
    }

    /** Returns the target path of the file. */
    public Path path() {
        return path;
    }

    /**
     * Appends a record to the file. On failure the file is aborted and the failure is rethrown.
     *
     * @param record record to write
     * @throws IOException if the format writer fails
     * @throws RuntimeException if the writer has already been closed
     */
    public void write(InternalRow record) throws IOException {
        if (closed) {
            throw new RuntimeException("Writer has already closed!");
        }

        try {
            writer.addElement(record);
        } catch (Throwable e) {
            LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
            abort();
            throw e;
        }
    }

    /** Delegates the target-size check to the underlying format writer. */
    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
        return writer.reachTargetSize(suggestedCheck, targetSize);
    }

    /**
     * Releases the format writer and the stream, discards the committer and deletes the path.
     * Failures during clean-up are logged and not propagated.
     */
    public void abort() {
        if (writer != null) {
            IOUtils.closeQuietly(writer);
            writer = null;
        }
        if (out != null) {
            try {
                committer = ((TwoPhaseOutputStream) out).closeForCommit();
            } catch (Throwable e) {
                // The committer is not available when closeForCommit fails; log the path.
                LOG.warn("Exception occurs when close for commit out {}", path, e);
            }
            out = null;
        }
        if (committer != null) {
            try {
                committer.discard();
            } catch (Throwable e) {
                LOG.warn("Exception occurs when close out {}", committer, e);
            }
            // Forget the discarded committer: a repeated abort() must not discard it again and
            // committers() must not hand it out.
            committer = null;
        }
        fileIO.deleteQuietly(path);
    }

    /**
     * Returns the committer of the closed file as a list.
     *
     * @return a new list holding the committer, or a new empty list if there is none (for
     *     example because the writer was aborted)
     * @throws RuntimeException if the writer has not been closed yet
     */
    public List<TwoPhaseOutputStream.Committer> committers() {
        if (!closed) {
            throw new RuntimeException("Writer should be closed before getting committer!");
        }
        if (committer == null) {
            return Lists.newArrayList();
        }
        return Lists.newArrayList(committer);
    }

    /**
     * Returns an abort handle that references only the file system and the path of this file.
     *
     * @throws RuntimeException if the writer has not been closed yet
     */
    public FileWriterAbortExecutor abortExecutor() {
        if (!closed) {
            throw new RuntimeException("Writer should be closed!");
        }

        return new FileWriterAbortExecutor(fileIO, path);
    }

    /**
     * Closes the format writer, flushes the stream and finishes it for commit. Subsequent calls
     * are no-ops.
     *
     * @throws IOException if closing fails; the file is aborted in that case
     */
    public void close() throws IOException {
        if (closed) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing file {}", path);
        }

        try {
            if (writer != null) {
                writer.close();
                writer = null;
            }
            if (out != null) {
                out.flush();
                outputBytes = out.getPos();
                committer = ((TwoPhaseOutputStream) out).closeForCommit();
                out = null;
            }
        } catch (IOException e) {
            LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
            abort();
            throw e;
        } finally {
            closed = true;
        }
    }
}
Loading
Loading