Support for Iceberg from YAML via ManagedIO. #33579

Open · wants to merge 3 commits into base: master
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -129,6 +129,8 @@
'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
'ReadFromIceberg': 'apache_beam.yaml.yaml_io.read_from_iceberg'
'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg'

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and aligning with possible Java
55 changes: 55 additions & 0 deletions sdks/python/apache_beam/yaml/tests/iceberg.yaml
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
  - name: TEMP_DIR
    type: "tempfile.TemporaryDirectory"

pipelines:
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {label: "11a", rank: 0}
              - {label: "37a", rank: 1}
              - {label: "389a", rank: 2}
        - type: WriteToIceberg
          config:
            table: "default.table"
            catalog_name: "some_catalog"
            catalog_properties:
              type: "hadoop"
              warehouse: "{TEMP_DIR}/dir"

  - pipeline:
      type: chain
      transforms:
        - type: ReadFromIceberg
          config:
            table: "default.table"
            catalog_name: "some_catalog"
            catalog_properties:
              type: "hadoop"
              warehouse: "{TEMP_DIR}/dir"
        - type: AssertEqual
          config:
            elements:
              - {label: "11a", rank: 0}
              - {label: "37a", rank: 1}
              - {label: "389a", rank: 2}
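
For reference, the write half of this test can also be run standalone through Beam's YAML entry point (python -m apache_beam.yaml.main). A minimal sketch, not part of this PR, assuming a writable local warehouse path; the table name, catalog name, and paths below are illustrative:

# Sketch: run a YAML pipeline that writes to Iceberg via the new transform.
import subprocess
import tempfile
import textwrap

PIPELINE_YAML = textwrap.dedent("""\
    pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {label: "11a", rank: 0}
        - type: WriteToIceberg
          config:
            table: "default.table"
            catalog_name: "some_catalog"
            catalog_properties:
              type: "hadoop"
              warehouse: "/tmp/iceberg-demo"
    """)

# Write the pipeline spec to a temporary file and hand it to the YAML runner.
with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as f:
    f.write(PIPELINE_YAML)

subprocess.run(
    ["python", "-m", "apache_beam.yaml.main",
     f"--yaml_pipeline_file={f.name}"],
    check=True)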
90 changes: 90 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
@@ -484,6 +484,96 @@ def attributes_extractor(row):
          timestamp_attribute=timestamp_attribute))


def read_from_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
):
  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
  # their types and docs, programmatically from the iceberg (or managed)
  # schemas.

  """Reads an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](
  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".
    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For
      more information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties.
      For more information, see CatalogUtil in the Apache Iceberg
      documentation.
  """
  return beam.managed.Read(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties))
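
Usage note (not part of the diff): since the helper returns a beam.managed.Read transform, it can also be applied directly in a Python pipeline. A minimal sketch, assuming a local Hadoop catalog; all table, catalog, and warehouse values are illustrative:

import apache_beam as beam
from apache_beam.yaml.yaml_io import read_from_iceberg

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadTable" >> read_from_iceberg(
            table="default.table",
            catalog_name="some_catalog",
            catalog_properties={
                "type": "hadoop",
                "warehouse": "/tmp/iceberg-demo",
            })
        # Rows come back as Beam schema'd rows; just print them here.
        | "Print" >> beam.Map(print))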


def write_to_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
    triggering_frequency_seconds: Optional[int] = None,
    keep: Optional[Iterable[str]] = None,
    drop: Optional[Iterable[str]] = None,
    only: Optional[str] = None,
):
  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
  # their types and docs, programmatically from the iceberg (or managed)
  # schemas.

  """Writes to an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](
  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg)
  and the [dynamic destinations documentation](
  https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
  for use of the keep, drop, and only parameters.

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".

Review comment (Contributor): Might be good to hint that dynamic destinations is supported and link to https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations

Reply (Contributor Author): Done.

    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For
      more information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties.
      For more information, see CatalogUtil in the Apache Iceberg
      documentation.
    triggering_frequency_seconds: For streaming write pipelines, the frequency
      at which the sink attempts to produce snapshots, in seconds.
    keep: An optional list of field names to keep when writing to the
      destination. Other fields are dropped. Mutually exclusive with drop
      and only.
    drop: An optional list of field names to drop before writing to the
      destination. Mutually exclusive with keep and only.
    only: The name of exactly one field to keep as the top-level record when
      writing to the destination. All other fields are dropped. This field
      must be of row type. Mutually exclusive with drop and keep.
  """
  return beam.managed.Write(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties,
          triggering_frequency_seconds=triggering_frequency_seconds,
          keep=keep,
          drop=drop,
          only=only))
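
A matching write-side sketch (again not part of the diff), illustrating the keep option documented above: the extra note field is dropped so that only label and rank reach the table. Names and paths are illustrative:

import apache_beam as beam
from apache_beam.yaml.yaml_io import write_to_iceberg

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            beam.Row(label="11a", rank=0, note="scratch"),
            beam.Row(label="37a", rank=1, note="scratch"),
        ])
        # keep=["label", "rank"] filters fields before the Iceberg write.
        | write_to_iceberg(
            table="default.table",
            catalog_name="some_catalog",
            catalog_properties={
                "type": "hadoop",
                "warehouse": "/tmp/iceberg-demo",
            },
            keep=["label", "rank"]))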


def io_providers():
  with open(os.path.join(os.path.dirname(__file__), 'standard_io.yaml')) as fin:
    return yaml_provider.parse_providers(yaml.load(fin, Loader=yaml.SafeLoader))
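
A quick registration check, as a sketch: this assumes io_providers() returns a dict-like mapping keyed by transform name, which is an assumption about parse_providers rather than something shown in this diff.

from apache_beam.yaml.yaml_io import io_providers

# Assumption: io_providers() supports "name in providers" lookups.
providers = io_providers()
for name in ("ReadFromIceberg", "WriteToIceberg"):
    assert name in providers, f"{name} not registered"
print("Iceberg transforms are registered.")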