Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 6
"revision": 7
}
54 changes: 27 additions & 27 deletions sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,33 @@ pipelines:
# Locally runs fine.
# Kafka read pipeline
# Need a separate read pipeline to make sure the write pipeline is flushed
# - pipeline:
# type: chain
# transforms:
# - type: ReadFromKafka
# config:
# format: "RAW"
# topic: "silly_topic"
# bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
# consumer_config:
# auto.offset.reset: "earliest"
# group.id: "yaml-kafka-test-group"
# max_read_time_seconds: 10 # will read forever if not set
# - type: MapToFields
# config:
# language: python
# fields:
# value:
# callable: |
# # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
# lambda row: row.payload.decode('utf-8')
# output_type: string
# - type: AssertEqual
# config:
# elements:
# - {value: "123"}
# - {value: "456"}
# - {value: "789"}
- pipeline:
type: chain
transforms:
- type: ReadFromKafka
config:
format: "RAW"
topic: "silly_topic"
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
consumer_config:
auto.offset.reset: "earliest"
group.id: "yaml-kafka-test-group"
max_read_time_seconds: 10 # will read forever if not set
- type: MapToFields
config:
language: python
fields:
value:
callable: |
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row
lambda row: row.payload.decode('utf-8')
output_type: string
- type: AssertEqual
config:
elements:
- {value: "123"}
- {value: "456"}
- {value: "789"}

options:
streaming: true
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
'error_handling': 'error_handling'
'file_descriptor_path': 'file_descriptor_path'
'message_name': 'message_name'
'max_read_time_seconds': 'max_read_time_seconds'
'WriteToKafka':
'format': 'format'
'topic': 'topic'
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,8 @@ def create_transform(
"""Creates a PTransform instance for the given transform type and arguments.
"""
mappings = self._mappings[typ]
# NOTE: If the `key` is not found in the mappings (e.g. standard_io.py), the
# `key` is passed down as is to the underlying transform.
remapped_args = {
mappings.get(key, key): value
for key, value in args.items()
Expand Down
Loading