
Commit 25f0cf3

[FLINK-37162][docs] Add sinks.md to describe Flink's Data Sink API

This closes #26013.

1 parent: 3fb231e

File tree: 7 files changed, +256 -8 lines changed
docs/content.zh/docs/dev/datastream/sinks.md

Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
---
title: "Data Sinks"
weight: 12
type: docs
aliases:
  - /dev/stream/sinks.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Data Sinks

This page describes Flink's Data Sink API and the concepts and architecture behind it.
**Read this if you are interested in how data sinks in Flink work, or if you want to implement a new Data Sink.**

If you are looking for pre-defined sink connectors, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).

## The Data Sink API

This section describes the major interfaces of the new Sink API introduced in [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) and [FLIP-372](https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API), and provides guidance for developers implementing their own sinks.

### Sink

The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java" name="Sink" >}} API is a factory-style interface for creating the [SinkWriter](#sinkwriter) that writes the data.

Sink implementations must be serializable, because sink instances are serialized and uploaded to the Flink cluster at runtime.

#### Use the Sink

We can add a `Sink` to a `DataStream` by calling the `DataStream.sinkTo(Sink)` method. For example:

{{< tabs "bde5ff60-4e61-4633-a6dc-50413cfd7b45" >}}
{{< tab "Java" >}}
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Source mySource = new MySource(...);

DataStream<Integer> stream = env.fromSource(
        mySource,
        WatermarkStrategy.noWatermarks(),
        "MySourceName");

Sink mySink = new MySink(...);

stream.sinkTo(mySink);
...
```
{{< /tab >}}
{{< /tabs >}}

----

### SinkWriter

The core {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java" name="SinkWriter" >}} API is responsible for writing data to the downstream system.

The `SinkWriter` API has only three methods:
- `write(InputT element, Context context)`: Adds an element to the writer.
- `flush(boolean endOfInput)`: Called on checkpoint or at the end of input; setting the flag causes the writer to flush all pending data, which provides `at-least-once` semantics. To achieve `exactly-once` semantics, the sink should additionally implement the [SupportsCommitter](#supportscommitter) interface.
- `writeWatermark(Watermark watermark)`: Adds a watermark to the writer.

Please check the [Javadoc](https://nightlies.apache.org/flink/flink-docs-release-2.0/api/java//org/apache/flink/api/connector/sink2/SinkWriter.html) of the class for more details.
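
To make the contract concrete, below is a minimal sketch of a stateless, at-least-once sink that prints every record to stdout. The `PrintSink` and `PrintSinkWriter` names are made up for illustration; the interface methods follow the `org.apache.flink.api.connector.sink2` sources linked above, so please verify the exact signatures against your Flink version.

```java
import java.io.IOException;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

/** A minimal, stateless sink: the Sink is only a serializable factory for its writer. */
public class PrintSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        return new PrintSinkWriter();
    }

    private static class PrintSinkWriter implements SinkWriter<String> {

        @Override
        public void write(String element, Context context) throws IOException, InterruptedException {
            // A real writer would typically buffer records or hand them to an async client.
            System.out.println(element);
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            // Nothing is buffered here; a buffering writer must push out all pending
            // records in this method to provide at-least-once semantics.
        }

        @Override
        public void close() {
            // Release clients, connections, or other resources here.
        }
    }
}
```

Such a sink is attached to a pipeline with `stream.sinkTo(new PrintSink())`, exactly as in the earlier example.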

## Advanced Sink API

### SupportsWriterState

The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java" name="SupportsWriterState" >}} interface indicates that the sink supports writer state, which means that the writer can be restored after a failure.

The `SupportsWriterState` interface requires the `SinkWriter` to implement the {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java" name="StatefulSinkWriter" >}} interface.
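
A sketch of how these pieces fit together is shown below; `MyWriterState`, `MyStatefulSinkWriter`, and `MyWriterStateSerializer` are hypothetical placeholder types, and the signatures should be double-checked against the linked sources.

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class MyStatefulSink implements Sink<String>, SupportsWriterState<String, MyWriterState> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) {
        // Fresh start: no recovered state.
        return new MyStatefulSinkWriter(Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<String, MyWriterState> restoreWriter(
            WriterInitContext context, Collection<MyWriterState> recoveredState) {
        // Rebuild the writer from the state snapshotted before the failure.
        return new MyStatefulSinkWriter(recoveredState);
    }

    @Override
    public SimpleVersionedSerializer<MyWriterState> getWriterStateSerializer() {
        // Writer state is stored in checkpoints and therefore needs a versioned serializer.
        return new MyWriterStateSerializer();
    }
}
```

The writer side implements `StatefulSinkWriter`, whose `snapshotState(long checkpointId)` returns the state objects to store in the checkpoint.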

### SupportsCommitter

The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java" name="SupportsCommitter" >}} interface indicates that the sink supports exactly-once semantics using a two-phase commit protocol.

The `Sink` consists of a `CommittingSinkWriter` that performs the pre-commits and a `Committer` that actually commits the data. To facilitate this separation, the `CommittingSinkWriter` creates `committables` on checkpoint or at the end of input and sends them to the `Committer`.

The `Sink` needs to be serializable. All configuration should be validated eagerly. The respective sink writers and committers are transient and are only created in the subtasks on the TaskManagers.
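
Below is a sketch of the two-phase flow, with hypothetical `MyCommittable` and transaction helpers; see the linked `SupportsCommitter`, `CommittingSinkWriter`, and `Committer` sources for the exact signatures.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;

// Phase 1: on checkpoint, the writer finishes its current transaction and emits a
// committable that describes it; the framework stores it with the checkpoint.
class MyCommittingWriter implements CommittingSinkWriter<String, MyCommittable> {

    @Override
    public Collection<MyCommittable> prepareCommit() throws IOException, InterruptedException {
        return Collections.singletonList(finishCurrentTransaction());
    }

    // write(...), flush(...), close(), and finishCurrentTransaction() omitted
}

// Phase 2: the committer runs after the checkpoint completes and makes the data visible.
class MyCommitter implements Committer<MyCommittable> {

    @Override
    public void commit(Collection<CommitRequest<MyCommittable>> requests) {
        for (CommitRequest<MyCommittable> request : requests) {
            commitTransaction(request.getCommittable());
        }
    }

    @Override
    public void close() {}

    // commitTransaction(...) omitted
}
```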

### Custom sink topology

Advanced developers may want to specify their own sink operator topology (a structure composed of a series of operators), for example to collect `committables` in one subtask and process them together, or to perform operations such as merging small files after the `Committer`. Flink provides the following interfaces to allow expert users to customize the sink operator topology.

#### SupportsPreWriteTopology

The `SupportsPreWriteTopology` interface allows expert users to implement a custom operator topology before the `SinkWriter`, which can be used to process or redistribute the input data, for example sending all data of the same partition to the same `SinkWriter` for Kafka or Iceberg.

The following figure shows the operator topology when using {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java" name="SupportsPreWriteTopology" >}}:

{{< img src="/fig/dev/datastream/SupportsPreWriteTopology.png" class="center" >}}

In the figure above, the user adds `PrePartition` and `PostPartition` operators in the `SupportsPreWriteTopology` topology to redistribute the input data among the `SinkWriter` subtasks.
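
For example, here is a sketch that keys the input so that all records of one partition land in the same `SinkWriter` subtask; `MyRecord` and its `getPartition()` accessor are hypothetical.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

public class MyPartitionedSink implements Sink<MyRecord>, SupportsPreWriteTopology<MyRecord> {

    @Override
    public DataStream<MyRecord> addPreWriteTopology(DataStream<MyRecord> inputDataStream) {
        // Redistribute the input before it reaches the writers, so that every
        // record of a given partition is handled by the same SinkWriter subtask.
        return inputDataStream.keyBy(MyRecord::getPartition);
    }

    // createWriter(...) omitted
}
```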

#### SupportsPreCommitTopology

The `SupportsPreCommitTopology` interface allows expert users to implement a custom operator topology after the `SinkWriter` and before the `Committer`, which can be used to process or redistribute the commit messages.

The following figure shows the operator topology when using {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java" name="SupportsPreCommitTopology" >}}:

{{< img src="/fig/dev/datastream/SupportsPreCommitTopology.png" class="center" >}}

In the figure above, the user adds a `CollectCommit` operator in the `SupportsPreCommitTopology` topology that collects all the commit messages from the `SinkWriter` subtasks into one subtask and then sends them to the `Committer` to be processed centrally; this can reduce the number of interactions with the external system.

Please note that the parallelism has been modified here for display purposes only. In practice, the parallelism can be set by the user.
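
Below is a sketch of the idea, glossing over the handling of `CommittableSummary` messages that a complete implementation must take care of; `MyWriteResult`, `MyCommittable`, `CombineCommittables`, and `MyWriteResultSerializer` are hypothetical, and the generic parameters and method names should be verified against the linked source.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

public class MyCollectingSink
        implements Sink<String>, SupportsPreCommitTopology<MyWriteResult, MyCommittable> {

    @Override
    public DataStream<CommittableMessage<MyCommittable>> addPreCommitTopology(
            DataStream<CommittableMessage<MyWriteResult>> committables) {
        // Funnel all write results into a single subtask and combine them there,
        // so the Committer talks to the external system once per checkpoint.
        return committables
                .global()
                .map(new CombineCommittables())
                .setParallelism(1);
    }

    @Override
    public SimpleVersionedSerializer<MyWriteResult> getWriteResultSerializer() {
        return new MyWriteResultSerializer();
    }

    // createWriter(...) and the SupportsCommitter methods omitted
}
```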

#### SupportsPostCommitTopology

The `SupportsPostCommitTopology` interface allows expert users to implement a custom operator topology after the `Committer`.

The following figure shows the operator topology when using {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java" name="SupportsPostCommitTopology" >}}:

{{< img src="/fig/dev/datastream/SupportsPostCommitTopology.png" class="center" >}}

In the figure above, the user adds a `MergeFile` operator in the `SupportsPostCommitTopology` topology; the `MergeFile` operator merges small files into larger files to speed up reads from the file system.
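
A sketch of that pattern, with a hypothetical `MergeFiles` process function standing in for the small-file compaction logic:

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

public class MyCompactingSink implements Sink<String>, SupportsPostCommitTopology<MyCommittable> {

    @Override
    public void addPostCommitTopology(DataStream<CommittableMessage<MyCommittable>> committables) {
        // Runs only after the Committer has made the data visible: read the
        // committed messages and merge the small files they point to.
        committables
                .process(new MergeFiles())
                .name("MergeFile");
    }

    // createWriter(...) and the SupportsCommitter methods omitted
}
```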

docs/content.zh/docs/dev/datastream/sources.md

Lines changed: 0 additions & 4 deletions

@@ -371,10 +371,6 @@ This feature is not yet supported in the Python API.

 The Source implementation needs to take over part of the *event time* assignment and *watermark generation* work. The event streams leaving the SourceReader need to carry event timestamps and (during streaming execution) contain watermarks. See [Timely Stream Processing]({{< ref "docs/concepts/time" >}}) for an introduction to event time and watermarks.

-{{< hint warning >}}
-Applications based on the legacy {{< gh_link file="flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java" name="SourceFunction" >}} typically generate timestamps and watermarks in a separate later step via `stream.assignTimestampsAndWatermarks(WatermarkStrategy)`. This function should not be used with the new sources, because timestamps will already have been assigned, and it would override the previous split-aware watermarks.
-{{< /hint >}}

 #### API

 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java" name="TimestampAssigner" >}} and {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java" name="WatermarkGenerator" >}}.
docs/content/docs/dev/datastream/sinks.md

Lines changed: 128 additions & 0 deletions

@@ -0,0 +1,128 @@

(The English version of the new page; its content is identical to the `docs/content.zh` file shown above.)

docs/content/docs/dev/datastream/sources.md

Lines changed: 0 additions & 4 deletions

@@ -356,10 +356,6 @@ The `SourceReader` implementations can also implement their own threading model

 *Event Time* assignment and *Watermark Generation* happen as part of the data sources. The event streams leaving the Source Readers have event timestamps and (during streaming execution) contain watermarks. See [Timely Stream Processing]({{< ref "docs/concepts/time" >}}) for an introduction to Event Time and Watermarks.

-{{< hint warning >}}
-Applications based on the legacy {{< gh_link file="flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java" name="SourceFunction" >}} typically generate timestamps and watermarks in a separate later step via `stream.assignTimestampsAndWatermarks(WatermarkStrategy)`. This function should not be used with the new sources, because timestamps will be already assigned, and it will override the previous split-aware watermarks.
-{{< /hint >}}

 #### API

 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java" name="TimestampAssigner" >}} and {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java" name="WatermarkGenerator" >}}.
Three image files added (307 KB, 276 KB, 340 KB): the `SupportsPreWriteTopology`, `SupportsPreCommitTopology`, and `SupportsPostCommitTopology` figures referenced above.
