Skip to content

Commit 7b4f0e7

Browse files
authored
[core] Introduce format table write (#6288)
1 parent daf482e commit 7b4f0e7

15 files changed

+948
-145
lines changed

paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.paimon.io.BundleRecords;
2727
import org.apache.paimon.io.DataFileMeta;
2828
import org.apache.paimon.io.DataFilePathFactory;
29+
import org.apache.paimon.io.FileWriterAbortExecutor;
2930
import org.apache.paimon.io.RollingFileWriter;
3031
import org.apache.paimon.io.RollingFileWriterImpl;
3132
import org.apache.paimon.io.RowDataFileWriter;
3233
import org.apache.paimon.io.SingleFileWriter;
33-
import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
3434
import org.apache.paimon.manifest.FileSource;
3535
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
3636
import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -91,7 +91,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
9191
private final long targetFileSize;
9292

9393
// State management
94-
private final List<AbortExecutor> closedWriters;
94+
private final List<FileWriterAbortExecutor> closedWriters;
9595
private final List<DataFileMeta> results;
9696
private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
9797
currentWriter;
@@ -316,7 +316,7 @@ public void abort() {
316316
currentWriter.abort();
317317
currentWriter = null;
318318
}
319-
for (AbortExecutor abortExecutor : closedWriters) {
319+
for (FileWriterAbortExecutor abortExecutor : closedWriters) {
320320
abortExecutor.abort();
321321
}
322322
blobWriter.abort();
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.fs.FileIO;
22+
import org.apache.paimon.fs.Path;
23+
24+
/** Abort executor to just have reference of path instead of whole writer. */
25+
public class FileWriterAbortExecutor {
26+
private final FileIO fileIO;
27+
private final Path path;
28+
29+
public FileWriterAbortExecutor(FileIO fileIO, Path path) {
30+
this.fileIO = fileIO;
31+
this.path = path;
32+
}
33+
34+
public void abort() {
35+
fileIO.deleteQuietly(path);
36+
}
37+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.format.FileFormat;
23+
import org.apache.paimon.fs.FileIO;
24+
import org.apache.paimon.fs.TwoPhaseOutputStream;
25+
import org.apache.paimon.types.RowType;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.function.Supplier;
34+
35+
/**
36+
* Format table's writer to roll over to a new file if the current size exceed the target file size.
37+
*/
38+
public class FormatTableRollingFileWriter implements AutoCloseable {
39+
40+
private static final Logger LOG = LoggerFactory.getLogger(FormatTableRollingFileWriter.class);
41+
42+
private static final int CHECK_ROLLING_RECORD_CNT = 1000;
43+
44+
private final Supplier<FormatTableSingleFileWriter> writerFactory;
45+
private final long targetFileSize;
46+
private final List<FileWriterAbortExecutor> closedWriters;
47+
private final List<TwoPhaseOutputStream.Committer> committers;
48+
49+
private FormatTableSingleFileWriter currentWriter = null;
50+
private long recordCount = 0;
51+
private boolean closed = false;
52+
53+
public FormatTableRollingFileWriter(
54+
FileIO fileIO,
55+
FileFormat fileFormat,
56+
long targetFileSize,
57+
RowType writeSchema,
58+
DataFilePathFactory pathFactory,
59+
String fileCompression) {
60+
this.writerFactory =
61+
() ->
62+
new FormatTableSingleFileWriter(
63+
fileIO,
64+
fileFormat.createWriterFactory(writeSchema),
65+
pathFactory.newPath(),
66+
fileCompression);
67+
this.targetFileSize = targetFileSize;
68+
this.closedWriters = new ArrayList<>();
69+
this.committers = new ArrayList<>();
70+
}
71+
72+
public long targetFileSize() {
73+
return targetFileSize;
74+
}
75+
76+
public void write(InternalRow row) throws IOException {
77+
try {
78+
if (currentWriter == null) {
79+
currentWriter = writerFactory.get();
80+
}
81+
82+
currentWriter.write(row);
83+
recordCount += 1;
84+
boolean needRolling =
85+
currentWriter.reachTargetSize(
86+
recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
87+
if (needRolling) {
88+
closeCurrentWriter();
89+
}
90+
} catch (Throwable e) {
91+
LOG.warn(
92+
"Exception occurs when writing file {}. Cleaning up.",
93+
currentWriter == null ? null : currentWriter.path(),
94+
e);
95+
abort();
96+
throw e;
97+
}
98+
}
99+
100+
private void closeCurrentWriter() throws IOException {
101+
if (currentWriter == null) {
102+
return;
103+
}
104+
105+
currentWriter.close();
106+
closedWriters.add(currentWriter.abortExecutor());
107+
if (currentWriter.committers() != null) {
108+
committers.addAll(currentWriter.committers());
109+
}
110+
111+
currentWriter = null;
112+
}
113+
114+
public void abort() {
115+
if (currentWriter != null) {
116+
currentWriter.abort();
117+
}
118+
for (FileWriterAbortExecutor abortExecutor : closedWriters) {
119+
abortExecutor.abort();
120+
}
121+
}
122+
123+
public List<TwoPhaseOutputStream.Committer> committers() {
124+
return committers;
125+
}
126+
127+
@Override
128+
public void close() throws IOException {
129+
if (closed) {
130+
return;
131+
}
132+
133+
try {
134+
closeCurrentWriter();
135+
} catch (IOException e) {
136+
LOG.warn(
137+
"Exception occurs when writing file {}. Cleaning up.", currentWriter.path(), e);
138+
abort();
139+
throw e;
140+
} finally {
141+
closed = true;
142+
}
143+
}
144+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.format.FormatWriter;
23+
import org.apache.paimon.format.FormatWriterFactory;
24+
import org.apache.paimon.format.SupportsDirectWrite;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.Path;
27+
import org.apache.paimon.fs.PositionOutputStream;
28+
import org.apache.paimon.fs.TwoPhaseOutputStream;
29+
import org.apache.paimon.utils.IOUtils;
30+
31+
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
32+
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.io.IOException;
37+
import java.io.UncheckedIOException;
38+
import java.util.List;
39+
40+
/** Format table's writer to produce a single file. */
41+
public class FormatTableSingleFileWriter {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(FormatTableSingleFileWriter.class);
44+
45+
protected final FileIO fileIO;
46+
protected final Path path;
47+
48+
private FormatWriter writer;
49+
private PositionOutputStream out;
50+
private TwoPhaseOutputStream.Committer committer;
51+
52+
protected long outputBytes;
53+
protected boolean closed;
54+
55+
public FormatTableSingleFileWriter(
56+
FileIO fileIO, FormatWriterFactory factory, Path path, String compression) {
57+
this.fileIO = fileIO;
58+
this.path = path;
59+
60+
try {
61+
if (factory instanceof SupportsDirectWrite) {
62+
throw new UnsupportedOperationException("Does not support SupportsDirectWrite.");
63+
} else {
64+
out = fileIO.newTwoPhaseOutputStream(path, false);
65+
writer = factory.create(out, compression);
66+
}
67+
} catch (IOException e) {
68+
LOG.warn(
69+
"Failed to open the bulk writer, closing the output stream and throw the error.",
70+
e);
71+
if (out != null) {
72+
abort();
73+
}
74+
throw new UncheckedIOException(e);
75+
}
76+
77+
this.closed = false;
78+
}
79+
80+
public Path path() {
81+
return path;
82+
}
83+
84+
public void write(InternalRow record) throws IOException {
85+
if (closed) {
86+
throw new RuntimeException("Writer has already closed!");
87+
}
88+
89+
try {
90+
writer.addElement(record);
91+
} catch (Throwable e) {
92+
LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
93+
abort();
94+
throw e;
95+
}
96+
}
97+
98+
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
99+
return writer.reachTargetSize(suggestedCheck, targetSize);
100+
}
101+
102+
public void abort() {
103+
if (writer != null) {
104+
IOUtils.closeQuietly(writer);
105+
writer = null;
106+
}
107+
if (out != null) {
108+
try {
109+
committer = ((TwoPhaseOutputStream) out).closeForCommit();
110+
} catch (Throwable e) {
111+
LOG.warn("Exception occurs when close for commit out {}", committer, e);
112+
}
113+
out = null;
114+
}
115+
if (committer != null) {
116+
try {
117+
committer.discard();
118+
} catch (Throwable e) {
119+
LOG.warn("Exception occurs when close out {}", committer, e);
120+
}
121+
}
122+
fileIO.deleteQuietly(path);
123+
}
124+
125+
public List<TwoPhaseOutputStream.Committer> committers() {
126+
if (!closed) {
127+
throw new RuntimeException("Writer should be closed before getting committer!");
128+
}
129+
return Lists.newArrayList(committer);
130+
}
131+
132+
public FileWriterAbortExecutor abortExecutor() {
133+
if (!closed) {
134+
throw new RuntimeException("Writer should be closed!");
135+
}
136+
137+
return new FileWriterAbortExecutor(fileIO, path);
138+
}
139+
140+
public void close() throws IOException {
141+
if (closed) {
142+
return;
143+
}
144+
145+
if (LOG.isDebugEnabled()) {
146+
LOG.debug("Closing file {}", path);
147+
}
148+
149+
try {
150+
if (writer != null) {
151+
writer.close();
152+
writer = null;
153+
}
154+
if (out != null) {
155+
out.flush();
156+
outputBytes = out.getPos();
157+
committer = ((TwoPhaseOutputStream) out).closeForCommit();
158+
out = null;
159+
}
160+
} catch (IOException e) {
161+
LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
162+
abort();
163+
throw e;
164+
} finally {
165+
closed = true;
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)