Closed
51 changes: 51 additions & 0 deletions docs/content.zh/docs/connectors/datastream/pubsub.md
@@ -63,6 +63,20 @@ SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
streamExecEnv.addSource(pubsubSource);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
stream_exec_env = StreamExecutionEnvironment.get_execution_environment()

deserializer = ...
pubsub_source = PubSubSource.new_builder() \
    .with_deserialization_schema(deserializer) \
    .with_project_name("project") \
    .with_subscription_name("subscription") \
    .build()

stream_exec_env.add_source(pubsub_source)
```
{{< /tab >}}
{{< /tabs >}}

Currently the source function [pulls](https://cloud.google.com/pubsub/docs/pull) messages from PubSub; [push endpoints](https://cloud.google.com/pubsub/docs/push) are not supported.
@@ -90,6 +104,20 @@ SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
dataStream.addSink(pubsubSink);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream = ...

serialization_schema = ...
pubsub_sink = PubSubSink.new_builder() \
    .with_serialization_schema(serialization_schema) \
    .with_project_name("project") \
    .with_topic_name("topic") \
    .build()

data_stream.add_sink(pubsub_sink)
```
{{< /tab >}}
{{< /tabs >}}

### Google Credentials
@@ -130,6 +158,29 @@ env.addSource(pubsubSource)
.addSink(pubsubSink);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
host_and_port = "localhost:1234"
deserialization_schema = ...
pubsub_source = PubSubSource.new_builder() \
    .with_deserialization_schema(deserialization_schema) \
    .with_project_name("my-fake-project") \
    .with_subscription_name("subscription") \
    .with_pubsub_subscriber_factory(PubSubSubscriberFactory.default(10, Duration.of_seconds(15), 100)) \
    .build()
serialization_schema = ...
pubsub_sink = PubSubSink.new_builder() \
    .with_serialization_schema(serialization_schema) \
    .with_project_name("my-fake-project") \
    .with_topic_name("topic") \
    .with_host_and_port_for_emulator(host_and_port) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
env.add_source(pubsub_source) \
    .add_sink(pubsub_sink)
```
{{< /tab >}}
{{< /tabs >}}

### At-least-once guarantee
59 changes: 59 additions & 0 deletions docs/content/docs/connectors/datastream/pubsub.md
@@ -71,6 +71,20 @@ SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
streamExecEnv.addSource(pubsubSource);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
stream_exec_env = StreamExecutionEnvironment.get_execution_environment()

deserializer = ...
pubsub_source = PubSubSource.new_builder() \
    .with_deserialization_schema(deserializer) \
    .with_project_name("project") \
    .with_subscription_name("subscription") \
    .build()

stream_exec_env.add_source(pubsub_source)
```
{{< /tab >}}
{{< /tabs >}}

Currently the source function [pulls](https://cloud.google.com/pubsub/docs/pull) messages from PubSub; [push endpoints](https://cloud.google.com/pubsub/docs/push) are not supported.
@@ -83,6 +97,8 @@ This builder works in a similar way to the PubSubSource.

Example:

{{< tabs "f5af7878-b460-4e05-8072-a0fd077ba6e5" >}}
{{< tab "Java" >}}
```java
DataStream<SomeObject> dataStream = (...);

@@ -95,6 +111,22 @@ SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()

dataStream.addSink(pubsubSink);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream = ...

serialization_schema = ...
pubsub_sink = PubSubSink.new_builder() \
    .with_serialization_schema(serialization_schema) \
    .with_project_name("project") \
    .with_topic_name("topic") \
    .build()

data_stream.add_sink(pubsub_sink)
```
{{< /tab >}}
{{< /tabs >}}

### Google Credentials

@@ -110,6 +142,8 @@ When running integration tests you might not want to connect to PubSub directly

The following example shows how you would create a source to read messages from the emulator and send them back:

{{< tabs "f5af7878-b460-4e05-8072-a0fd078ba6e1" >}}
{{< tab "Java" >}}
```java
String hostAndPort = "localhost:1234";
DeserializationSchema<SomeObject> deserializationSchema = (...);
@@ -131,6 +165,31 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
env.addSource(pubsubSource)
.addSink(pubsubSink);
```
{{< /tab >}}
{{< tab "Python" >}}
```python
host_and_port = "localhost:1234"
deserialization_schema = ...
pubsub_source = PubSubSource.new_builder() \
    .with_deserialization_schema(deserialization_schema) \
    .with_project_name("my-fake-project") \
    .with_subscription_name("subscription") \
    .with_pubsub_subscriber_factory(PubSubSubscriberFactory.default(10, Duration.of_seconds(15), 100)) \
    .build()
serialization_schema = ...
pubsub_sink = PubSubSink.new_builder() \
    .with_serialization_schema(serialization_schema) \
    .with_project_name("my-fake-project") \
    .with_topic_name("topic") \
    .with_host_and_port_for_emulator(host_and_port) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
env.add_source(pubsub_source) \
    .add_sink(pubsub_sink)
```
{{< /tab >}}
{{< /tabs >}}

### At least once guarantee

36 changes: 36 additions & 0 deletions flink-connectors/flink-connector-gcp-pubsub/pom.xml
@@ -101,4 +101,40 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>

<build>
Contributor review comment:

It would be great to add a `flink-sql-connector-gcp-pubsub` module and move the following shading logic there.

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.apache.flink:flink-connector-gcp-pubsub*</include>
<include>com.google.cloud:google-cloud-pubsub</include>
<include>com.google.*:*</include>
<include>org.threeten:*</include>
<include>io.grpc:*</include>
<include>io.opencensus:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.flink.pubsub.shaded.com.google</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
12 changes: 12 additions & 0 deletions flink-python/pom.xml
@@ -327,6 +327,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
@@ -556,6 +564,10 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
7 changes: 7 additions & 0 deletions flink-python/pyflink/datastream/connectors/__init__.py
@@ -73,6 +73,13 @@ def _install():
setattr(connectors, 'StreamFormat', file_system.StreamFormat)
setattr(connectors, 'StreamingFileSink', file_system.StreamingFileSink)

# pubsub
from pyflink.datastream.connectors import pubsub
Contributor review comment:

This `setattr` block exists for backward compatibility. Since pubsub is newly introduced, it is not necessary here and could be removed.

Besides, we may need to add some documentation here: https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/__init__.py#L209

setattr(connectors, 'PubSubSource', pubsub.PubSubSource)
setattr(connectors, 'PubSubSink', pubsub.PubSubSink)
setattr(connectors, 'Credentials', pubsub.Credentials)
setattr(connectors, 'PubSubSubscriberFactory', pubsub.PubSubSubscriberFactory)


# for backward compatibility
_install()
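
The re-export pattern that the review comment on this hunk refers to can be sketched in isolation. This is a minimal illustration using only the standard library, not the PyFlink implementation: the `connectors` and `pubsub` modules are built with `types.ModuleType`, and `install_legacy_aliases` and the placeholder classes are hypothetical names introduced here.

```python
import types

# Hypothetical stand-ins for a package and one of its submodules.
connectors = types.ModuleType("connectors")
pubsub = types.ModuleType("connectors.pubsub")


class PubSubSource:
    """Placeholder for a class defined in the submodule."""


class PubSubSink:
    """Placeholder for a class defined in the submodule."""


pubsub.PubSubSource = PubSubSource
pubsub.PubSubSink = PubSubSink


def install_legacy_aliases(package, submodule, names):
    """Re-export the given names from a submodule on the parent package."""
    for name in names:
        setattr(package, name, getattr(submodule, name))


# Alias only one name, as a package would do for an import path it must keep working.
install_legacy_aliases(connectors, pubsub, ["PubSubSource"])

# The alias is the same object as the submodule attribute, not a copy.
assert connectors.PubSubSource is pubsub.PubSubSource
# A name that was never aliased is reachable only through the submodule.
assert not hasattr(connectors, "PubSubSink")
```

Aliases like these only matter for names that users could already import from the parent package. A connector that ships for the first time has no such legacy import path, which appears to be the reviewer's reason for suggesting the block be dropped.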