Support for Iceberg from YAML via ManagedIO.
robertwb committed Jan 14, 2025
1 parent badbce0 commit b9ab37c
Showing 2 changed files with 80 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -129,6 +129,8 @@
    'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
    'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
    'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
    'ReadFromIceberg': 'apache_beam.yaml.yaml_io.read_from_iceberg'
    'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg'

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and aligning with possible Java
78 changes: 78 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
@@ -484,6 +484,84 @@ def attributes_extractor(row):
          timestamp_attribute=timestamp_attribute))


def read_from_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
):
  # TODO(robertwb): It'd be nice to derive these from the iceberg (or managed) schemas.

  """Reads an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".
    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For more
      information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties.
      For more information, see CatalogUtil in the Apache Iceberg documentation.
  """
  return beam.managed.Read(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties))
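
A minimal sketch of how this transform could be invoked from a Beam YAML pipeline once the ReadFromIceberg mapping above is registered. The Hadoop catalog type, the local warehouse path, and the LogForTesting sink are illustrative assumptions, not requirements of the transform.

# Hypothetical pipeline.yaml: read an Iceberg table and log its rows.
pipeline:
  transforms:
    - type: ReadFromIceberg
      config:
        table: db.table1
        catalog_name: local
        catalog_properties:
          type: hadoop                      # assumed catalog type
          warehouse: file:///tmp/warehouse  # assumed local warehouse path
    - type: LogForTesting
      input: ReadFromIceberg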


def write_to_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
    triggering_frequency_seconds: Optional[int] = None,
    keep: Optional[Iterable[str]] = None,
    drop: Optional[Iterable[str]] = None,
    only: Optional[str] = None,
):
  # TODO(robertwb): It'd be nice to derive these from the iceberg (or managed) schemas.

  """Writes to an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".
    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For more
      information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties.
      For more information, see CatalogUtil in the Apache Iceberg documentation.
    triggering_frequency_seconds: For streaming write pipelines, the frequency
      at which the sink attempts to produce snapshots, in seconds.
    keep: An optional list of field names to keep when writing to the
      destination. Other fields are dropped. Mutually exclusive with drop and
      only.
    drop: An optional list of field names to drop before writing to the
      destination. Mutually exclusive with keep and only.
    only: The name of exactly one field to keep as the top-level record when
      writing to the destination. All other fields are dropped. This field
      must be of row type. Mutually exclusive with drop and keep.
  """
  return beam.managed.Write(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties,
          triggering_frequency_seconds=triggering_frequency_seconds,
          keep=keep,
          drop=drop,
          only=only))
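
And a corresponding write-side sketch. The Create transform and its elements are placeholder input for illustration, and the same assumed Hadoop catalog settings apply.

# Hypothetical pipeline.yaml: write generated rows to an Iceberg table.
pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - {id: 1, name: "a"}
          - {id: 2, name: "b"}
    - type: WriteToIceberg
      input: Create
      config:
        table: db.table1
        catalog_name: local
        catalog_properties:
          type: hadoop                      # assumed catalog type
          warehouse: file:///tmp/warehouse  # assumed local warehouse path
        keep: [id, name]                    # optional field filtering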


def io_providers():
  with open(os.path.join(os.path.dirname(__file__), 'standard_io.yaml')) as fin:
    return yaml_provider.parse_providers(yaml.load(fin, Loader=yaml.SafeLoader))
