Commit
Merge branch 'apache:master' into master
pixeeai authored Dec 1, 2024
2 parents 9c44f13 + e72c06c commit 735dd1c
Showing 62 changed files with 1,405 additions and 349 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/table-types.md
@@ -33,7 +33,7 @@ Paimon supports table types:
3. view: metastore required, views in SQL are a kind of virtual table
4. format-table: file format table refers to a directory that contains multiple files of the same format, where
operations on this table allow for reading or writing to these files, compatible with Hive tables
5. object table: provides metadata indexes for unstructured data objects in the specified Object Storage storage directory.
5. object table: provides metadata indexes for unstructured data objects in the specified Object Storage directory.
6. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)

4 changes: 2 additions & 2 deletions docs/content/engines/doris.md
@@ -89,11 +89,11 @@ See [Apache Doris Website](https://doris.apache.org/docs/lakehouse/datalake-anal

- Read optimized for Primary Key Table

Doris can utilize the [Read optimized](https://paimon.apache.org/releases/release-0.6/#read-optimized) feature for Primary Key Table(release in Paimon 0.6), by reading base data files using native Parquet/ORC reader and delta file using JNI.
Doris can utilize the [Read optimized](https://paimon.apache.org/docs/0.8/primary-key-table/read-optimized/) feature for Primary Key Table(release in Paimon 0.6), by reading base data files using native Parquet/ORC reader and delta file using JNI.

- Deletion Vectors

Doris(2.1.4+) natively supports [Deletion Vectors](https://paimon.apache.org/releases/release-0.8/#deletion-vectors)(released in Paimon 0.8).
Doris(2.1.4+) natively supports [Deletion Vectors](https://paimon.apache.org/docs/0.8/primary-key-table/deletion-vectors/)(released in Paimon 0.8).
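Taken together, the two notes above say that Doris scans fully compacted Paimon base files with its own columnar Parquet/ORC reader, falls back to a JNI reader for delta files, and uses deletion vectors to skip deleted rows without merge-on-read. The Java sketch below illustrates that flow only conceptually; DataFile, ColumnarReader, Row and every method on them are hypothetical placeholders, not Doris or Paimon APIs.

import java.util.BitSet;
import java.util.List;
import java.util.stream.IntStream;

/**
 * Conceptual sketch only (hypothetical types, not Doris/Paimon APIs): fully
 * compacted base files can be scanned by the engine's native Parquet/ORC
 * reader, other files fall back to the JNI reader, and a deletion vector
 * filters out deleted row positions without merge-on-read.
 */
class ReadOptimizedSketch {

    interface Row {}

    interface ColumnarReader {
        List<Row> read(DataFile file);
    }

    record DataFile(String path, boolean fullyCompacted, BitSet deletionVector) {}

    static List<Row> scan(List<DataFile> files, ColumnarReader nativeReader, ColumnarReader jniReader) {
        return files.stream()
                .flatMap(file -> {
                    // Base files need no key merging, so the fast native reader is enough.
                    ColumnarReader reader = file.fullyCompacted() ? nativeReader : jniReader;
                    List<Row> rows = reader.read(file);
                    BitSet deleted = file.deletionVector();
                    // Skip positions marked deleted by the deletion vector, if present.
                    return IntStream.range(0, rows.size())
                            .filter(i -> deleted == null || !deleted.get(i))
                            .mapToObj(rows::get);
                })
                .toList();
    }
}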

## Doris to Paimon type mapping

2 changes: 1 addition & 1 deletion docs/content/engines/starrocks.md
@@ -81,7 +81,7 @@ SELECT * FROM paimon_catalog.test_db.partition_tbl$partitions;
## StarRocks to Paimon type mapping

This section lists all supported type conversion between StarRocks and Paimon.
All StarRocks’s data types can be found in this doc [StarRocks Data type overview](https://docs.starrocks.io/docs/sql-reference/data-types/data-type-list/).
All StarRocks’s data types can be found in this doc [StarRocks Data type overview](https://docs.starrocks.io/docs/sql-reference/data-types/).

<table class="table table-bordered">
<thead>
2 changes: 1 addition & 1 deletion docs/content/primary-key-table/overview.md
@@ -56,6 +56,6 @@ Records within a data file are sorted by their primary keys. Within a sorted run

{{< img src="/img/sorted-runs.png">}}

As you can see, different sorted runs may have overlapping primary key ranges, and may even contain the same primary key. When querying the LSM tree, all sorted runs must be combined and all records with the same primary key must be merged according to the user-specified [merge engine]({{< ref "primary-key-table/merge-engine" >}}) and the timestamp of each record.
As you can see, different sorted runs may have overlapping primary key ranges, and may even contain the same primary key. When querying the LSM tree, all sorted runs must be combined and all records with the same primary key must be merged according to the user-specified [merge engine]({{< ref "primary-key-table/merge-engine/overview" >}}) and the timestamp of each record.

New records written into the LSM tree will be first buffered in memory. When the memory buffer is full, all records in memory will be sorted and flushed to disk. A new sorted run is now created.
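The two paragraphs above describe the read path in prose; the following is a minimal, self-contained Java sketch of the same idea, assuming the simplest possible merge rule (the record with the highest sequence number wins). The Record type and its fields are illustrative stand-ins, not Paimon classes, and the real merge engines are pluggable.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Toy illustration of combining sorted runs: runs may have overlapping key
 * ranges and even the same primary key, so the reader keeps one record per key.
 * Here the record with the highest sequence number wins, mimicking the simplest
 * ("deduplicate"-style) merge; other merge engines apply different rules.
 */
class SortedRunMergeSketch {

    record Record(String key, long sequenceNumber, String value) {}

    static List<Record> merge(List<List<Record>> sortedRuns) {
        TreeMap<String, Record> latestPerKey = new TreeMap<>();
        for (List<Record> run : sortedRuns) {
            for (Record r : run) {
                latestPerKey.merge(
                        r.key(), r, (a, b) -> a.sequenceNumber() >= b.sequenceNumber() ? a : b);
            }
        }
        // TreeMap keeps the result ordered by primary key, like a new sorted run.
        return new ArrayList<>(latestPerKey.values());
    }

    public static void main(String[] args) {
        List<Record> olderRun = List.of(new Record("a", 1, "v1"), new Record("c", 2, "v2"));
        List<Record> newerRun = List.of(new Record("a", 3, "v3"), new Record("b", 4, "v4"));
        // Key "a" appears in both runs; the record written later survives the merge.
        merge(List.of(olderRun, newerRun)).forEach(System.out::println);
    }
}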
@@ -21,11 +21,11 @@
import org.apache.paimon.rest.RESTResponse;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.beans.ConstructorProperties;
import java.util.Map;
@@ -18,8 +18,8 @@

package org.apache.paimon.rest.responses;

import org.apache.paimon.shade.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.beans.ConstructorProperties;
import java.io.PrintWriter;
@@ -18,9 +18,9 @@

package org.apache.paimon.rest;

import org.apache.paimon.shade.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.beans.ConstructorProperties;

@@ -25,7 +25,7 @@
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
@@ -42,8 +42,8 @@ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema) {
}

@Override
protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
protected OneInputStreamOperatorFactory<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator.Factory(table, writeProvider, commitUser);
}
}
@@ -19,13 +19,17 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -43,11 +47,12 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

private final long retrySleepMillis;

public CdcDynamicBucketWriteOperator(
private CdcDynamicBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
@@ -85,4 +90,30 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcDynamicBucketWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcDynamicBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcDynamicBucketWriteOperator.class;
}
}
}
@@ -24,7 +24,7 @@
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
* A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
@@ -39,8 +39,8 @@ public CdcFixedBucketSink(FileStoreTable table) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
return new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser);
}
}
@@ -38,6 +38,9 @@

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -74,12 +77,13 @@ public class CdcRecordStoreMultiWriteOperator
private String commitUser;
private ExecutorService compactExecutor;

public CdcRecordStoreMultiWriteOperator(
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
super(parameters, options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
@@ -254,4 +258,42 @@ public Map<Identifier, StoreSinkWrite> writes() {
public String commitUser() {
return commitUser;
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> {
private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
private final String initialCommitUser;
private final Catalog.Loader catalogLoader;

public Factory(
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
StreamOperatorParameters<MultiTableCommittable> parameters) {
return (T)
new CdcRecordStoreMultiWriteOperator(
parameters,
catalogLoader,
storeSinkWriteProvider,
initialCommitUser,
options);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreMultiWriteOperator.class;
}
}
}
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
@@ -27,6 +28,9 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -50,11 +54,12 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {

private final long retrySleepMillis;

public CdcRecordStoreWriteOperator(
protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
@@ -92,4 +97,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<CdcRecord> {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcRecordStoreWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreWriteOperator.class;
}
}
}
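The sinks in this diff now return a OneInputStreamOperatorFactory instead of a constructed operator, and each write operator gains a nested Factory whose createStreamOperator(parameters) builds the operator from StreamOperatorParameters when the task is created. The sketch below shows, in hedged form, how such a factory is typically handed to the DataStream API; the wrapper class, method and variable names are illustrative, and the real FlinkSink wiring adds parallelism, uids and state handling on top.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.table.FileStoreTable;

/** Illustrative wiring only; not the code paths this commit actually touches. */
class WriterWiringSketch {

    static SingleOutputStreamOperator<Committable> wireWriter(
            DataStream<CdcRecord> input,
            TypeInformation<Committable> committableType,
            FileStoreTable table,
            StoreSinkWrite.Provider writeProvider,
            String commitUser) {
        // transform(...) accepts an operator *factory*; the operator itself is only
        // created later, when Flink calls Factory.createStreamOperator(parameters).
        return input.transform(
                "CDC Writer",
                committableType,
                new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser));
    }
}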
@@ -24,7 +24,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

@@ -42,9 +42,9 @@ public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
return new CdcUnawareBucketWriteOperator.Factory(table, writeProvider, commitUser);
}

@Override
@@ -18,21 +18,26 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;

import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {

public CdcUnawareBucketWriteOperator(
private CdcUnawareBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@@ -42,4 +47,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
super.processElement(element);
}
}

/** {@link StreamOperatorFactory} of {@link CdcUnawareBucketWriteOperator}. */
public static class Factory extends CdcRecordStoreWriteOperator.Factory {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcUnawareBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcUnawareBucketWriteOperator.class;
}
}
}