Add support to read iceberg partitioned data with non-identity transforms. #6438

Open
malhotrashivam opened this issue Nov 26, 2024 · 1 comment

@malhotrashivam (Contributor)

In the identity-transform partitioning case (the one we currently support), the partitioning column is a standalone column in the Iceberg table schema, and its values are used directly as the partition values.

In the non-identity transform case, the partition value is derived from another column by applying a transform to it. For example, the data can contain a timestamp column, and the partition value can be the day part of that timestamp.
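As a rough illustration of the day transform: the Iceberg spec defines `day` as the number of days from 1970-01-01. The plain-Python sketch below (no PyIceberg) assumes naive timestamps like those in the `TimestampType` column of the script in the comments. `day_transform` is a hypothetical helper name, not a library API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Days from 1970-01-01, following the Iceberg `day` transform.

    Assumes a naive timestamp (no time zone). Floor division keeps
    pre-epoch timestamps on the correct (earlier) day.
    """
    return (ts - EPOCH) // timedelta(days=1)


# The partition value is computed from the `datetime` column; there is no
# separate `datetime_day` data column to read it from.
for ts in (datetime(2024, 11, 26, 10, 1), datetime(2024, 11, 27, 10, 0)):
    print(ts, "->", day_transform(ts))
```

Rows with different times on the same calendar day map to the same partition value, which is why the partition value cannot simply be read back as a per-row column.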

As part of this issue, we should add support for properly reading partitioning columns that use non-identity transforms.

Reference: https://iceberg.apache.org/spec/#partition-transforms
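The spec linked above defines several other non-identity transforms besides `day`. The following is a minimal plain-Python sketch of a few of them (`year`, `month`, `hour`, and `truncate[W]` for integers and strings), written from the spec's descriptions. The function names are hypothetical, timestamps are assumed to be naive, and `bucket[N]` is left out because it requires a 32-bit Murmur3 hash.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def year_transform(ts: datetime) -> int:
    # Years from 1970.
    return ts.year - 1970


def month_transform(ts: datetime) -> int:
    # Months from 1970-01-01.
    return (ts.year - 1970) * 12 + (ts.month - 1)


def hour_transform(ts: datetime) -> int:
    # Hours from 1970-01-01 00:00:00; floor division handles pre-epoch values.
    return (ts - EPOCH) // timedelta(hours=1)


def truncate_int(value: int, width: int) -> int:
    # Python's % is non-negative for a positive width, so this matches the
    # spec's v - (((v % W) + W) % W) formula, including for negative values.
    return value - (value % width)


def truncate_str(value: str, width: int) -> str:
    # Keep the first `width` code points (Python slices str by code point).
    return value[:width]
```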

I have added a script in the comments that generates such a table using PyIceberg.

malhotrashivam added this to the 0.38.0 milestone Nov 26, 2024
malhotrashivam self-assigned this Nov 26, 2024
@malhotrashivam (Contributor, Author)

This is the Python script for building such a table, using MinIO for storage:

```python
import boto3
import pyarrow as pa
from datetime import datetime

from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
    DoubleType,
    NestedField,
    StringType,
    TimestampType,
)

# Credentials and endpoints below are for a local MinIO + REST catalog setup;
# adjust them for your environment.
session = boto3.Session(
    aws_access_key_id='admin',
    aws_secret_access_key='password',
    region_name='us-east-1'
)

catalog = load_catalog(
    name='minio-iceberg',
    uri='http://rest:8181',
    warehouse='s3a://warehouse/wh',
    session=session,
    **{
        "s3.endpoint": "http://127.0.0.1:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)

# Table schema: `datetime` is the source column for the day partition.
schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
)

# Partition by day(datetime): the partition value is derived from `datetime`
# (source_id=1) rather than stored as its own data column.
partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

# Sort on `symbol`
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

tbl = catalog.create_table(
    identifier="sales.non_identity_spec",
    schema=schema,
    partition_spec=partition_spec,
    sort_order=sort_order,
)

# Sample rows matching the Iceberg schema above; the timestamps fall on
# three different days (2024-11-26, 27 and 28).
data = [
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5},
    {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0},
    {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0},
]

# Create a PyArrow table from the rows
table = pa.Table.from_pylist(data)

# Append the PyArrow table to the Iceberg table
tbl.append(table)
```
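To check a reader against the table the script creates, it helps to know which partitions to expect. The plain-Python sketch below (no catalog or MinIO needed) applies the day transform to the four sample rows from the script and groups the symbols by partition value. It assumes the spec's definition of `day` (days from 1970-01-01); `expected_partitions` is a hypothetical helper name.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

# Same rows as the script (only the columns needed here).
ROWS = [
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL"},
    {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG"},
    {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN"},
    {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT"},
]


def expected_partitions(rows):
    """Map each `datetime_day` partition value to the symbols it holds."""
    partitions = defaultdict(list)
    for row in rows:
        day = (row["datetime"] - EPOCH) // timedelta(days=1)
        partitions[day].append(row["symbol"])
    return dict(sorted(partitions.items()))


for day, symbols in expected_partitions(ROWS).items():
    print((EPOCH + timedelta(days=day)).date(), day, symbols)
```

By this calculation the data lands in three day partitions, with GOOG and AMZN sharing the 2024-11-26 partition.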
