diff --git a/docs/content.zh/docs/connectors/datastream/pubsub.md b/docs/content.zh/docs/connectors/datastream/pubsub.md index de1ea93ea6928..f1b79b4970651 100644 --- a/docs/content.zh/docs/connectors/datastream/pubsub.md +++ b/docs/content.zh/docs/connectors/datastream/pubsub.md @@ -63,6 +63,20 @@ SourceFunction pubsubSource = PubSubSource.newBuilder() streamExecEnv.addSource(pubsubSource); ``` {{< /tab >}} +{{< tab "Python" >}} +```python +stream_exec_env = StreamExecutionEnvironment.get_execution_environment() + +deserializer = ... +pubsub_source = PubSubSource.new_builder() \ + .with_deserialization_schema(deserializer) \ + .with_project_name("project") \ + .with_subscription_name("subscription") \ + .build() + +stream_exec_env.add_source(pubsub_source) +``` +{{< /tab >}} {{< /tabs >}} 当前还不支持 PubSub 的 source functions [pulls](https://cloud.google.com/pubsub/docs/pull) messages 和 [push endpoints](https://cloud.google.com/pubsub/docs/push)。 @@ -90,6 +104,20 @@ SinkFunction pubsubSink = PubSubSink.newBuilder() dataStream.addSink(pubsubSink); ``` {{< /tab >}} +{{< tab "Python" >}} +```python +data_stream = ... + +serialization_schema = ... +pubsub_sink = PubSubSink.new_builder() \ + .with_serialization_schema(serialization_schema) \ + .with_project_name("project") \ + .with_topic_name("topic") \ + .build() + +data_stream.add_sink(pubsub_sink) +``` +{{< /tab >}} {{< /tabs >}} ### Google Credentials @@ -130,6 +158,29 @@ env.addSource(pubsubSource) .addSink(pubsubSink); ``` {{< /tab >}} +{{< tab "Python" >}} +```python +host_and_port = "localhost:1234" +deserialization_schema = ... +pubsub_source = PubSubSource.new_builder() \ + .with_deserialization_schema(deserialization_schema) \ + .with_project_name("my-fake-project") \ + .with_subscription_name("subscription") \ + .with_pubsub_subscriber_factory(PubSubSubscriberFactory.emulator(host_and_port, "my-fake-project", "subscription", 10, Duration.of_seconds(15), 100)) \ + .build() +serialization_schema = ... 
+pubsub_sink = PubSubSink.new_builder() \ + .with_serialization_schema(serialization_schema) \ + .with_project_name("my-fake-project") \ + .with_topic_name("topic") \ + .with_host_and_port_for_emulator(host_and_port) \ + .build() + +env = StreamExecutionEnvironment.get_execution_environment() +env.add_source(pubsub_source) \ + .add_sink(pubsub_sink) +``` +{{< /tab >}} {{< /tabs >}} ### 至少一次语义保证 diff --git a/docs/content/docs/connectors/datastream/pubsub.md b/docs/content/docs/connectors/datastream/pubsub.md index 0b5db1c63f9a5..c68015a93b93e 100644 --- a/docs/content/docs/connectors/datastream/pubsub.md +++ b/docs/content/docs/connectors/datastream/pubsub.md @@ -71,6 +71,20 @@ SourceFunction pubsubSource = PubSubSource.newBuilder() streamExecEnv.addSource(pubsubSource); ``` {{< /tab >}} +{{< tab "Python" >}} +```python +stream_exec_env = StreamExecutionEnvironment.get_execution_environment() + +deserializer = ... +pubsub_source = PubSubSource.new_builder() \ + .with_deserialization_schema(deserializer) \ + .with_project_name("project") \ + .with_subscription_name("subscription") \ + .build() + +stream_exec_env.add_source(pubsub_source) +``` +{{< /tab >}} {{< /tabs >}} Currently the source functions [pulls](https://cloud.google.com/pubsub/docs/pull) messages from PubSub, [push endpoints](https://cloud.google.com/pubsub/docs/push) are not supported. @@ -83,6 +97,8 @@ This builder works in a similar way to the PubSubSource. Example: +{{< tabs "f5af7878-b460-4e05-8072-a0fd077ba6e5" >}} +{{< tab "Java" >}} ```java DataStream dataStream = (...); @@ -95,6 +111,22 @@ SinkFunction pubsubSink = PubSubSink.newBuilder() dataStream.addSink(pubsubSink); ``` +{{< /tab >}} +{{< tab "Python" >}} +```python +data_stream = ... + +serialization_schema = ... 
+pubsub_sink = PubSubSink.new_builder() \ + .with_serialization_schema(serialization_schema) \ + .with_project_name("project") \ + .with_topic_name("topic") \ + .build() + +data_stream.add_sink(pubsub_sink) +``` +{{< /tab >}} +{{< /tabs >}} ### Google Credentials @@ -110,6 +142,8 @@ When running integration tests you might not want to connect to PubSub directly The following example shows how you would create a source to read messages from the emulator and send them back: +{{< tabs "f5af7878-b460-4e05-8072-a0fd078ba6e1" >}} +{{< tab "Java" >}} ```java String hostAndPort = "localhost:1234"; DeserializationSchema deserializationSchema = (...); @@ -131,6 +165,31 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm env.addSource(pubsubSource) .addSink(pubsubSink); ``` +{{< /tab >}} +{{< tab "Python" >}} +```python +host_and_port = "localhost:1234" +deserialization_schema = ... +pubsub_source = PubSubSource.new_builder() \ + .with_deserialization_schema(deserialization_schema) \ + .with_project_name("my-fake-project") \ + .with_subscription_name("subscription") \ + .with_pubsub_subscriber_factory(PubSubSubscriberFactory.emulator(host_and_port, "my-fake-project", "subscription", 10, Duration.of_seconds(15), 100)) \ + .build() +serialization_schema = ... +pubsub_sink = PubSubSink.new_builder() \ + .with_serialization_schema(serialization_schema) \ + .with_project_name("my-fake-project") \ + .with_topic_name("topic") \ + .with_host_and_port_for_emulator(host_and_port) \ + .build() + +env = StreamExecutionEnvironment.get_execution_environment() +env.add_source(pubsub_source) \ + .add_sink(pubsub_sink) +``` +{{< /tab >}} +{{< /tabs >}} ### At least once guarantee diff --git a/flink-connectors/flink-connector-gcp-pubsub/pom.xml b/flink-connectors/flink-connector-gcp-pubsub/pom.xml index ff45f0dd8f11b..4ee455b2d59c3 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/pom.xml +++ b/flink-connectors/flink-connector-gcp-pubsub/pom.xml @@ -101,4 +101,40 @@ under the License. 
test + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-connector-gcp-pubsub* + com.google.cloud:google-cloud-pubsub + com.google.*:* + org.threeten:* + io.grpc:* + io.opencensus:* + + + + + com.google + org.apache.flink.pubsub.shaded.com.google + + + + + + + + diff --git a/flink-python/pom.xml b/flink-python/pom.xml index cb61c21180687..8d61bec0b3694 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -327,6 +327,14 @@ under the License. test + + + org.apache.flink + flink-connector-gcp-pubsub + ${project.version} + test + + org.apache.flink @@ -556,6 +564,10 @@ under the License. org.apache.flink flink-connector-cassandra_${scala.binary.version} + + org.apache.flink + flink-connector-gcp-pubsub + org.apache.flink flink-statebackend-rocksdb diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py index 02681d299a814..8f68d78a37214 100644 --- a/flink-python/pyflink/datastream/connectors/__init__.py +++ b/flink-python/pyflink/datastream/connectors/__init__.py @@ -73,6 +73,13 @@ def _install(): setattr(connectors, 'StreamFormat', file_system.StreamFormat) setattr(connectors, 'StreamingFileSink', file_system.StreamingFileSink) + # pubsub + from pyflink.datastream.connectors import pubsub + setattr(connectors, 'PubSubSource', pubsub.PubSubSource) + setattr(connectors, 'PubSubSink', pubsub.PubSubSink) + setattr(connectors, 'Credentials', pubsub.Credentials) + setattr(connectors, 'PubSubSubscriberFactory', pubsub.PubSubSubscriberFactory) + # for backward compatibility _install() diff --git a/flink-python/pyflink/datastream/connectors/pubsub.py b/flink-python/pyflink/datastream/connectors/pubsub.py new file mode 100644 index 0000000000000..9096341aa7c01 --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/pubsub.py @@ -0,0 +1,291 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import Union, Tuple + +from py4j.java_gateway import JavaObject + +from pyflink.common import Duration, DeserializationSchema, SerializationSchema +from pyflink.datastream.connectors import Source, Sink +from pyflink.java_gateway import get_gateway + +__all__ = [ + 'PubSubSource', + 'PubSubSink', + 'Credentials', + 'PubSubSubscriberFactory' +] + + +class Credentials(object): + """ + The authority authentication policy. + """ + + def __init__(self, j_credentials): + self._j_credentials = j_credentials + + @staticmethod + def emulator_credentials() -> 'Credentials': + """ + A placeholder for credentials to signify that requests sent to the server should not be + authenticated. + This is typically useful when using local service emulators. + """ + JEmulatorCredentialsProvider = get_gateway().jvm. \ + org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider + return Credentials(JEmulatorCredentialsProvider.create().getCredentials()) + + +class PubSubSubscriberFactory(object): + """ + A factory class to create a SubscriberStub. + This allows for customized Subscribers with for instance tweaked configurations. 
+ """ + + def __init__(self, j_pubsub_subscriber_factory: Union[Tuple, JavaObject]): + self._j_pubsub_subscriber_factory = j_pubsub_subscriber_factory + + @staticmethod + def default(max_messages_per_pull: int, timeout: Duration, retries: int) \ + -> 'PubSubSubscriberFactory': + """ + A default PubSubSubscriberFactory configuration. + """ + return PubSubSubscriberFactory((max_messages_per_pull, timeout, retries)) + + @staticmethod + def emulator(host_and_port: str, project: str, subscription: str, retries: int, + timeout: Duration, max_messages_per_pull: int) -> 'PubSubSubscriberFactory': + """ + A convenience PubSubSubscriberFactory that can be used to connect to a PubSub emulator. + The PubSub emulators do not support SSL or Credentials and as such this SubscriberStub does + not require or provide this. + """ + JPubSubSubscriberFactoryForEmulator = get_gateway().jvm.org.apache.flink. \ + streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator + return PubSubSubscriberFactory( + JPubSubSubscriberFactoryForEmulator(host_and_port, project, subscription, retries, + timeout._j_duration, max_messages_per_pull)) + + +class PubSubSource(Source): + """ + PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge + them on the next checkpoint. This ensures every message will get acknowledged at least once. + """ + + def __init__(self, j_pubsub_source: JavaObject): + super(PubSubSource, self).__init__(j_pubsub_source) + + @staticmethod + def new_builder() -> 'DeserializationSchemaBuilder': + """ + Create a builder for a new PubSubSource. + """ + JPubSubSource = get_gateway().jvm \ + .org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource + return PubSubSource.DeserializationSchemaBuilder(JPubSubSource.newBuilder()) + + class DeserializationSchemaBuilder(object): + """ + Part of PubSubSource.PubSubSourceBuilder to set required fields. 
+ """ + + def __init__(self, j_deserialization_schema_builder): + self._j_deserialization_schema_builder = j_deserialization_schema_builder + + def with_deserialization_schema(self, deserialization_schema: DeserializationSchema) \ + -> 'PubSubSource.ProjectNameBuilder': + """ + Set the DeserializationSchema used to deserialize incoming PubSubMessages. + """ + _j_deserialization_schema = self._j_deserialization_schema_builder \ + .withDeserializationSchema(deserialization_schema._j_deserialization_schema) + return PubSubSource.ProjectNameBuilder(_j_deserialization_schema) + + class ProjectNameBuilder(object): + """ + Part of PubSubSource.PubSubSourceBuilder to set required fields. + """ + + def __init__(self, j_project_name_builder): + self._j_project_name_builder = j_project_name_builder + + def with_project_name(self, project_name: str) -> 'PubSubSource.SubscriptionNameBuilder': + """ + Set the project name of the subscription to pull messages from. + """ + _j_project_name = self._j_project_name_builder.withProjectName(project_name) + return PubSubSource.SubscriptionNameBuilder(_j_project_name) + + class SubscriptionNameBuilder(object): + """ + Part of PubSubSource.PubSubSourceBuilder to set required fields. + """ + + def __init__(self, j_subscription_name_builder): + self._j_subscription_name_builder = j_subscription_name_builder + + def with_subscription_name(self, subscription_name: str) \ + -> 'PubSubSource.PubSubSourceBuilder': + """ + Set the subscription name of the subscription to pull messages from. + """ + _j_subscription_name = self._j_subscription_name_builder.withSubscriptionName( + subscription_name) + return PubSubSource.PubSubSourceBuilder(_j_subscription_name) + + class PubSubSourceBuilder(object): + """ + Builder to create PubSubSource. 
+ """ + + def __init__(self, j_pubsub_source_builder): + self._j_pubsub_source_builder = j_pubsub_source_builder + + def with_credentials(self, credentials: Credentials) \ + -> 'PubSubSource.PubSubSourceBuilder': + """ + Set the credentials. If this is not used then the credentials are picked up from the + environment variables. + """ + self._j_pubsub_source_builder.withCredentials(credentials._j_credentials) + return self + + def with_pubsub_subscriber_factory(self, factory: 'PubSubSubscriberFactory') \ + -> 'PubSubSource.PubSubSourceBuilder': + j_factory = factory._j_pubsub_subscriber_factory + if isinstance(j_factory, tuple): + self._j_pubsub_source_builder.withPubSubSubscriberFactory( + j_factory[0], + j_factory[1]._j_duration, + j_factory[2]) + else: + self._j_pubsub_source_builder.withPubSubSubscriberFactory(j_factory) + return self + + def with_message_rate_limit(self, message_per_second_rate_limit: int) \ + -> 'PubSubSource.PubSubSourceBuilder': + """ + Set a limit on the rate of messages per second received. This limit is per parallel + instance of the source function. Default is set to 100000 messages per second. + """ + self._j_pubsub_source_builder.withMessageRateLimit(message_per_second_rate_limit) + return self + + def build(self) -> 'PubSubSource': + """ + Actually build the desired instance of the PubSubSource. + """ + return PubSubSource(self._j_pubsub_source_builder.build()) + + +class PubSubSink(Sink): + """ + A sink function that outputs to PubSub. + """ + + def __init__(self, j_pub_sub_sink: JavaObject): + super(PubSubSink, self).__init__(sink=j_pub_sub_sink) + + @staticmethod + def new_builder() -> 'SerializationSchemaBuilder': + """ + Create a builder for a new PubSubSink. + """ + JPubSubSink = get_gateway().jvm. 
\ + org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink + return PubSubSink.SerializationSchemaBuilder(JPubSubSink.newBuilder()) + + class SerializationSchemaBuilder(object): + """ + Part of PubSubSink.PubSubSinkBuilder to set required fields. + """ + + def __init__(self, j_serialization_schema_builder): + self._j_serialization_schema_builder = j_serialization_schema_builder + + def with_serialization_schema(self, deserialization_schema: SerializationSchema) \ + -> 'PubSubSink.ProjectNameBuilder': + """ + Set the SerializationSchema used to serialize objects to be added as payloads of + PubSubMessages. + """ + _j_serialization_schema = self._j_serialization_schema_builder.withSerializationSchema( + deserialization_schema._j_serialization_schema) + return PubSubSink.ProjectNameBuilder(_j_serialization_schema) + + class ProjectNameBuilder(object): + """ + Part of PubSubSink.PubSubSinkBuilder to set required fields. + """ + + def __init__(self, j_project_name_builder): + self._j_project_name_builder = j_project_name_builder + + def with_project_name(self, project_name: str) -> 'PubSubSink.TopicNameBuilder': + """ + Set the project name of the topic to write messages to. + """ + _j_project_name = self._j_project_name_builder.withProjectName(project_name) + return PubSubSink.TopicNameBuilder(_j_project_name) + + class TopicNameBuilder(object): + """ + Part of PubSubSink.PubSubSinkBuilder to set required fields. + """ + + def __init__(self, j_topic_name_builder): + self._j_topic_name_builder = j_topic_name_builder + + def with_topic_name(self, topic_name: str) -> 'PubSubSink.PubSubSinkBuilder': + """ + Set the name of the topic to write messages to. + """ + _j_topic_name_builder = self._j_topic_name_builder.withTopicName(topic_name) + return PubSubSink.PubSubSinkBuilder(_j_topic_name_builder) + + class PubSubSinkBuilder(object): + """ + PubSubSinkBuilder to create a PubSubSink. 
+ """ + + def __init__(self, j_pubsub_sink_builder): + self._j_pubsub_sink_builder = j_pubsub_sink_builder + + def with_credentials(self, credentials: Credentials) -> 'PubSubSink.PubSubSinkBuilder': + """ + Set the credentials. If this is not used then the credentials are picked up from the + environment variables. + """ + self._j_pubsub_sink_builder.withCredentials(credentials._j_credentials) + return self + + def with_host_and_port_for_emulator(self, host_and_port: str) \ + -> 'PubSubSink.PubSubSinkBuilder': + """ + Set the custom hostname/port combination of PubSub. The ONLY reason to use this is + during tests with the emulator provided by Google. + """ + self._j_pubsub_sink_builder.withHostAndPortForEmulator(host_and_port) + return self + + def build(self) -> 'PubSubSink': + """ + Actually build the desired instance of the PubSubSink. + """ + return PubSubSink(self._j_pubsub_sink_builder.build()) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_pubsub.py b/flink-python/pyflink/datastream/connectors/tests/test_pubsub.py new file mode 100644 index 0000000000000..fe6b2bfd47e05 --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/tests/test_pubsub.py @@ -0,0 +1,60 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from pyflink.common import SimpleStringSchema, Types, Duration +from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase +from pyflink.util.java_utils import get_field_value, is_instance_of +from pyflink.datastream.connectors.pubsub import PubSubSource, PubSubSink, Credentials, \ + PubSubSubscriberFactory + + +class PubSubTest(PyFlinkStreamingTestCase): + + def test_pubsub_source(self): + pubsub_source = PubSubSource.new_builder() \ + .with_deserialization_schema(SimpleStringSchema()) \ + .with_project_name("project") \ + .with_subscription_name("subscription") \ + .with_credentials(Credentials.emulator_credentials()) \ + .with_pubsub_subscriber_factory( + PubSubSubscriberFactory.default(10, Duration.of_seconds(10), 10)) \ + .build() + ds = self.env.add_source(source_func=pubsub_source, source_name="pubsub source") + ds.print() + plan = eval(self.env.get_execution_plan()) + self.assertEqual('Source: pubsub source', plan['nodes'][0]['type']) + self.assertTrue(is_instance_of( + get_field_value(pubsub_source.get_java_function(), 'credentials'), + 'org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials')) + + def test_pubsub_sink(self): + ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)], + type_info=Types.ROW([Types.STRING(), Types.INT()])) + + pubsub_sink = PubSubSink.new_builder() \ + .with_serialization_schema(SimpleStringSchema()) \ + .with_project_name("project") \ + .with_topic_name("topic") \ + .with_host_and_port_for_emulator("localhost:8080") \ + .build() + + ds.add_sink(pubsub_sink).name('pubsub sink') + plan = eval(self.env.get_execution_plan()) + + self.assertEqual('Sink: pubsub sink', plan['nodes'][1]['type']) + self.assertEqual(get_field_value(pubsub_sink.get_java_function(), 'projectName'), 'project')