Commit daf482e

[flink] Support creating table with blob in flink sql. (#6351)
1 parent 4a83228 commit daf482e

File tree

19 files changed: +222 -55 lines changed


docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -50,6 +50,12 @@
         <td>Boolean</td>
         <td>Whether to create underlying storage when reading and writing the table.</td>
     </tr>
+    <tr>
+        <td><h5>blob.field</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>Specify the blob field.</td>
+    </tr>
     <tr>
         <td><h5>bucket</h5></td>
         <td style="word-wrap: break-word;">-1</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 6 additions & 0 deletions
@@ -1989,6 +1989,12 @@ public InlineElement getDescription() {
                     .defaultValue(false)
                     .withDescription("Format table file path only contain partition value.");
 
+    public static final ConfigOption<String> BLOB_FIELD =
+            key("blob.field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specify the blob field.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
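A minimal sketch of supplying the new option programmatically (the column name 'content' is illustrative and not from this commit; CoreOptions.DATA_EVOLUTION_ENABLED is the option referenced by the FlinkCatalog change further down):

import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class BlobFieldOptionSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Name the column that should be stored as a blob (hypothetical column name).
        props.put(CoreOptions.BLOB_FIELD.key(), "content");
        // FlinkCatalog#fromCatalogTable rejects blob.field unless data evolution is enabled too.
        props.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
        CoreOptions coreOptions = new CoreOptions(props); // constructor shown in the diff above
        System.out.println(props);
    }
}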

paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java

Lines changed: 1 addition & 36 deletions
@@ -18,22 +18,17 @@
 
 package org.apache.paimon.io;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleStatsCollector;
-import org.apache.paimon.format.avro.AvroFileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
 import java.util.List;
 
 /** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
@@ -58,7 +53,7 @@ public RowDataRollingFileWriter(
                 () ->
                         new RowDataFileWriter(
                                 fileIO,
-                                createFileWriterContext(
+                                RollingFileWriter.createFileWriterContext(
                                         fileFormat, writeSchema, statsCollectors, fileCompression),
                                 pathFactory.newPath(),
                                 writeSchema,
@@ -72,34 +67,4 @@ public RowDataRollingFileWriter(
                                 writeCols),
                 targetFileSize);
     }
-
-    @VisibleForTesting
-    static FileWriterContext createFileWriterContext(
-            FileFormat fileFormat,
-            RowType rowType,
-            SimpleColStatsCollector.Factory[] statsCollectors,
-            String fileCompression) {
-        return new FileWriterContext(
-                fileFormat.createWriterFactory(rowType),
-                createStatsProducer(fileFormat, rowType, statsCollectors),
-                fileCompression);
-    }
-
-    private static SimpleStatsProducer createStatsProducer(
-            FileFormat fileFormat,
-            RowType rowType,
-            SimpleColStatsCollector.Factory[] statsCollectors) {
-        boolean isDisabled =
-                Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
-                        .allMatch(p -> p instanceof NoneSimpleColStatsCollector);
-        if (isDisabled) {
-            return SimpleStatsProducer.disabledProducer();
-        }
-        if (fileFormat instanceof AvroFileFormat) {
-            SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
-            return SimpleStatsProducer.fromCollector(collector);
-        }
-        return SimpleStatsProducer.fromExtractor(
-                fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
-    }
 }

paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ public void initialize(String identifier, boolean statsDenseStore) {
                 () ->
                         new RowDataFileWriter(
                                 LocalFileIO.create(),
-                                RowDataRollingFileWriter.createFileWriterContext(
+                                RollingFileWriter.createFileWriterContext(
                                         fileFormat,
                                         SCHEMA,
                                         SimpleColStatsCollector.createFullStatsFactories(

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java

Lines changed: 14 additions & 1 deletion
@@ -151,6 +151,7 @@
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -1050,6 +1051,16 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
+        String blobName = options.get(CoreOptions.BLOB_FIELD.key());
+        if (blobName != null) {
+            checkArgument(
+                    options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
+                    "When setting '"
+                            + CoreOptions.BLOB_FIELD.key()
+                            + "', you must also set '"
+                            + CoreOptions.DATA_EVOLUTION_ENABLED.key()
+                            + "'");
+        }
         // Serialize virtual columns and watermark to the options
         // This is what Flink SQL needs, the storage itself does not need them
         options.putAll(columnOptions(schema));
@@ -1069,7 +1080,9 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
                         field ->
                                 schemaBuilder.column(
                                         field.getName(),
-                                        toDataType(field.getType()),
+                                        field.getName().equals(blobName)
+                                                ? toBlobType(field.getType())
+                                                : toDataType(field.getType()),
                                         columnComments.get(field.getName())));
 
         return schemaBuilder.build();
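Taken together, this enables DDL along the lines of the following sketch (table, catalog, and column names are illustrative; it also assumes CoreOptions.DATA_EVOLUTION_ENABLED maps to the 'data-evolution.enabled' key):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateBlobTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a Paimon catalog named 'paimon' has already been registered.
        tEnv.executeSql("USE CATALOG paimon");
        // The BYTES column named by 'blob.field' is converted to Paimon's BlobType in
        // fromCatalogTable above; omitting 'data-evolution.enabled' fails the checkArgument.
        tEnv.executeSql(
                "CREATE TABLE blob_demo (\n"
                        + "    id BIGINT,\n"
                        + "    content BYTES\n"
                        + ") WITH (\n"
                        + "    'blob.field' = 'content',\n"
                        + "    'data-evolution.enabled' = 'true'\n"
                        + ")");
    }
}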

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@
 /** Convert to Flink row data. */
 public class FlinkRowData implements RowData {
 
-    private InternalRow row;
+    protected InternalRow row;
 
     public FlinkRowData(InternalRow row) {
         this.row = row;
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+
+/** Convert to Flink row data with blob. */
+public class FlinkRowDataWithBlob extends FlinkRowData {
+
+    private final int blobField;
+
+    public FlinkRowDataWithBlob(InternalRow row, int blobField) {
+        super(row);
+        this.blobField = blobField;
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return pos == blobField ? row.getBlob(pos).toData() : row.getBinary(pos);
+    }
+}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java

Lines changed: 12 additions & 0 deletions
@@ -18,13 +18,18 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Conversion between {@link LogicalType} and {@link DataType}. */
 public class LogicalTypeConversion {
 
@@ -37,6 +42,13 @@ public static LogicalType toLogicalType(DataType dataType) {
        return dataType.accept(DataTypeToLogicalType.INSTANCE);
    }
 
+    public static BlobType toBlobType(LogicalType logicalType) {
+        checkArgument(
+                logicalType instanceof BinaryType || logicalType instanceof VarBinaryType,
+                "Expected BinaryType or VarBinaryType, but got: " + logicalType);
+        return new BlobType();
+    }
+
     public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
         return (RowType) toDataType(logicalType, new AtomicInteger(-1));
     }
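A small sketch of the new conversion helper in isolation, using the imports shown in the diff:

import org.apache.flink.table.types.logical.VarBinaryType;

import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.types.BlobType;

public class ToBlobTypeSketch {
    public static void main(String[] args) {
        // BINARY/VARBINARY logical types pass the checkArgument and map to BlobType;
        // any other logical type would throw an IllegalArgumentException.
        BlobType blob =
                LogicalTypeConversion.toBlobType(new VarBinaryType(VarBinaryType.MAX_LENGTH));
        System.out.println(blob);
    }
}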

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java

Lines changed: 5 additions & 2 deletions
@@ -23,6 +23,7 @@
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -50,14 +51,16 @@ public FileStoreSourceReader(
             FileStoreSourceReaderMetrics metrics,
             IOManager ioManager,
             @Nullable Long limit,
-            @Nullable NestedProjectedRowData rowData) {
+            @Nullable NestedProjectedRowData rowData,
+            @Nullable RowType readType) {
         // limiter is created in SourceReader, it can be shared in all split readers
         super(
                 () ->
                         new FileStoreSourceSplitReader(
                                 tableRead.withIOManager(ioManager),
                                 RecordLimiter.create(limit),
-                                metrics),
+                                metrics,
+                                readType),
                 (element, output, state) ->
                         FlinkRecordsWithSplitIds.emitRecord(
                                 readerContext, element, output, state, metrics, rowData),

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java

Lines changed: 24 additions & 3 deletions
@@ -20,12 +20,15 @@
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowDataWithBlob;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pool;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -77,12 +80,13 @@ public class FileStoreSourceSplitReader
     public FileStoreSourceSplitReader(
             TableRead tableRead,
             @Nullable RecordLimiter limiter,
-            FileStoreSourceReaderMetrics metrics) {
+            FileStoreSourceReaderMetrics metrics,
+            @Nullable RowType readType) {
         this.tableRead = tableRead;
         this.limiter = limiter;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
-        this.pool.add(new FileStoreRecordIterator());
+        this.pool.add(new FileStoreRecordIterator(readType));
         this.paused = false;
         this.metrics = metrics;
         this.wakeup = new AtomicBoolean(false);
@@ -260,6 +264,20 @@ private class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowDa
 
         private final MutableRecordAndPosition<RowData> recordAndPosition =
                 new MutableRecordAndPosition<>();
+        @Nullable private final Integer blobField;
+
+        private FileStoreRecordIterator(@Nullable RowType rowType) {
+            this.blobField = rowType == null ? null : blobFieldIndex(rowType);
+        }
+
+        private Integer blobFieldIndex(RowType rowType) {
+            for (int i = 0; i < rowType.getFieldCount(); i++) {
+                if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+                    return i;
+                }
+            }
+            return null;
+        }
 
         public FileStoreRecordIterator replace(RecordIterator<InternalRow> iterator) {
             this.iterator = iterator;
@@ -283,7 +301,10 @@ public RecordAndPosition<RowData> next() {
                 return null;
             }
 
-            recordAndPosition.setNext(new FlinkRowData(row));
+            recordAndPosition.setNext(
+                    blobField == null
+                            ? new FlinkRowData(row)
+                            : new FlinkRowDataWithBlob(row, blobField));
             currentNumRead++;
             if (limiter != null) {
                 limiter.increment();
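A rough sketch of the detection logic above, standing alone (RowType.of with Paimon's default field names is used here purely for illustration):

import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;

public class BlobFieldIndexSketch {
    public static void main(String[] args) {
        // A read schema whose second field is a blob.
        RowType readType = RowType.of(new BigIntType(), new BlobType());
        // Mirrors FileStoreRecordIterator#blobFieldIndex: find the first field whose
        // type root is BLOB; its index decides whether rows are wrapped in
        // FlinkRowDataWithBlob instead of plain FlinkRowData.
        Integer blobField = null;
        for (int i = 0; i < readType.getFieldCount(); i++) {
            if (readType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
                blobField = i;
                break;
            }
        }
        System.out.println("blob field index: " + blobField); // prints 1
    }
}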
