diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index a21782bdc603..b2544e773552 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -129,6 +129,8 @@
     'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
     'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
     'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
+    'ReadFromIceberg': 'apache_beam.yaml.yaml_io.read_from_iceberg'
+    'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg'
 
 # Declared as a renaming transform to avoid exposing all
 # (implementation-specific) pandas arguments and aligning with possible Java
diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index a6525aef9877..53cbac8c3b16 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -484,6 +484,84 @@ def attributes_extractor(row):
           timestamp_attribute=timestamp_attribute))
 
 
+def read_from_iceberg(
+    table: str,
+    catalog_name: Optional[str] = None,
+    catalog_properties: Optional[Mapping[str, str]] = None,
+    config_properties: Optional[Mapping[str, str]] = None,
+):
+  # TODO(robertwb): It'd be nice to derive these from the iceberg (or managed)
+  # schemas.
+  """Reads an Apache Iceberg table.
+
+  See also the [Apache Iceberg Beam documentation](https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).
+
+  Args:
+    table: The identifier of the Apache Iceberg table. Example: "db.table1".
+    catalog_name: The name of the catalog. Example: "local".
+    catalog_properties: A map of configuration properties for the Apache
+      Iceberg catalog. The required properties depend on the catalog. For
+      more information, see CatalogUtil in the Apache Iceberg documentation.
+    config_properties: An optional set of Hadoop configuration properties.
+      For more information, see CatalogUtil in the Apache Iceberg
+      documentation.
+  """
+  return beam.managed.Read(
+      "iceberg",
+      config=dict(
+          table=table,
+          catalog_name=catalog_name,
+          catalog_properties=catalog_properties,
+          config_properties=config_properties))
+
+
+def write_to_iceberg(
+    table: str,
+    catalog_name: Optional[str] = None,
+    catalog_properties: Optional[Mapping[str, str]] = None,
+    config_properties: Optional[Mapping[str, str]] = None,
+    triggering_frequency_seconds: Optional[int] = None,
+    keep: Optional[Iterable[str]] = None,
+    drop: Optional[Iterable[str]] = None,
+    only: Optional[str] = None,
+):
+  # TODO(robertwb): It'd be nice to derive these from the iceberg (or managed)
+  # schemas.
+  """Writes to an Apache Iceberg table.
+
+  See also the [Apache Iceberg Beam documentation](https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).
+
+  Args:
+    table: The identifier of the Apache Iceberg table. Example: "db.table1".
+    catalog_name: The name of the catalog. Example: "local".
+    catalog_properties: A map of configuration properties for the Apache
+      Iceberg catalog. The required properties depend on the catalog. For
+      more information, see CatalogUtil in the Apache Iceberg documentation.
+    config_properties: An optional set of Hadoop configuration properties.
+      For more information, see CatalogUtil in the Apache Iceberg
+      documentation.
+    triggering_frequency_seconds: For streaming write pipelines, the frequency
+      at which the sink attempts to produce snapshots, in seconds.
+    keep: An optional list of field names to keep when writing to the
+      destination. Other fields are dropped. Mutually exclusive with drop
+      and only.
+    drop: An optional list of field names to drop before writing to the
+      destination. Mutually exclusive with keep and only.
+    only: The name of exactly one field to keep as the top-level record when
+      writing to the destination. All other fields are dropped. This field
+      must be of row type. Mutually exclusive with drop and keep.
+  """
+  return beam.managed.Write(
+      "iceberg",
+      config=dict(
+          table=table,
+          catalog_name=catalog_name,
+          catalog_properties=catalog_properties,
+          config_properties=config_properties,
+          triggering_frequency_seconds=triggering_frequency_seconds,
+          keep=keep,
+          drop=drop,
+          only=only))
+
+
 def io_providers():
   with open(os.path.join(os.path.dirname(__file__), 'standard_io.yaml')) as fin:
     return yaml_provider.parse_providers(yaml.load(fin, Loader=yaml.SafeLoader))
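For reference, a minimal Beam YAML sketch of how the new ReadFromIceberg transform would be invoked once this lands. The catalog_properties shown (a Hadoop-style catalog with a local warehouse path) are illustrative assumptions, since the required keys depend on the catalog implementation; LogForTesting stands in for a real downstream transform:

pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      config:
        table: "db.table1"
        catalog_name: "local"
        catalog_properties:
          # Assumed Hadoop catalog; required keys vary by catalog
          # implementation (see CatalogUtil in the Iceberg docs).
          type: "hadoop"
          warehouse: "file:///tmp/iceberg-warehouse"
    - type: LogForTesting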
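And a corresponding WriteToIceberg sketch exercising the keep filter, again with an assumed Hadoop catalog; Create supplies a couple of in-memory rows purely for illustration:

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {id: "a", value: 1.0}
          - {id: "b", value: 2.0}
    - type: WriteToIceberg
      config:
        table: "db.table1"
        catalog_name: "local"
        catalog_properties:
          type: "hadoop"
          warehouse: "file:///tmp/iceberg-warehouse"
        # Only these fields are written; mutually exclusive with drop/only.
        keep: ["id", "value"]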