diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index b5704c67ef1c..86bf1193abd9 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 6 + "revision": 7 } diff --git a/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml b/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml index 5793fd111a6c..1c0e3fc6adaa 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml @@ -50,33 +50,33 @@ pipelines: # Locally runs fine. # Kafka read pipeline # Need a separate read pipeline to make sure the write pipeline is flushed - # - pipeline: - # type: chain - # transforms: - # - type: ReadFromKafka - # config: - # format: "RAW" - # topic: "silly_topic" - # bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" - # consumer_config: - # auto.offset.reset: "earliest" - # group.id: "yaml-kafka-test-group" - # max_read_time_seconds: 10 # will read forever if not set - # - type: MapToFields - # config: - # language: python - # fields: - # value: - # callable: | - # # Kafka RAW format reads messages as bytes in the 'payload' field of a Row - # lambda row: row.payload.decode('utf-8') - # output_type: string - # - type: AssertEqual - # config: - # elements: - # - {value: "123"} - # - {value: "456"} - # - {value: "789"} + - pipeline: + type: chain + transforms: + - type: ReadFromKafka + config: + format: "RAW" + topic: "silly_topic" + bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" + consumer_config: + auto.offset.reset: "earliest" + group.id: "yaml-kafka-test-group" + max_read_time_seconds: 10 # will read forever if not set + - type: MapToFields + config: + language: python + fields: + value: + callable: | + 
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row + lambda row: row.payload.decode('utf-8') + output_type: string + - type: AssertEqual + config: + elements: + - {value: "123"} + - {value: "456"} + - {value: "789"} options: streaming: true diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index ddc3c7662a65..87655484f4b3 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -74,6 +74,7 @@ 'error_handling': 'error_handling' 'file_descriptor_path': 'file_descriptor_path' 'message_name': 'message_name' + 'max_read_time_seconds': 'max_read_time_seconds' 'WriteToKafka': 'format': 'format' 'topic': 'topic' diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 0b47cbf2e686..ae02a37edc8b 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -1509,6 +1509,8 @@ def create_transform( """Creates a PTransform instance for the given transform type and arguments. """ mappings = self._mappings[typ] + # NOTE: If the `key` is not found in the mappings (e.g. standard_io.yaml), the + # `key` is passed down as is to the underlying transform. remapped_args = { mappings.get(key, key): value for key, value in args.items()