
[Bug]: Incorrect $partition Metadata in Trino for Iceberg Tables Written via IcebergIO.writeRows with Timestamp Partitioning #35417

@mskyrim

Description

What happened?

We are using an Apache Beam pipeline (v2.62.0) to ingest Protobuf-encoded data, transform it dynamically into a Beam schema, and write it to an Iceberg table using IcebergIO.writeRows as the final step of the pipeline.
We noticed an issue when writing to Iceberg tables partitioned by the meta_processing_time field with the following specification:
(PartitionSpec.builderFor(contractIcebergSchema).month("meta_processing_time").build())
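For reference, a table with this layout can be created roughly as follows; the field ids, types, table name, and catalog variable in this sketch are illustrative assumptions, not taken from our production code:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

// Minimal sketch: create a table partitioned by month(meta_processing_time).
Schema contractIcebergSchema =
    new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()),
        Types.NestedField.required(3, "meta_processing_time", Types.TimestampType.withZone()));

PartitionSpec spec =
    PartitionSpec.builderFor(contractIcebergSchema).month("meta_processing_time").build();

// "catalog" is any configured Iceberg Catalog implementation (assumption).
Table table =
    catalog.createTable(
        TableIdentifier.parse("platform_data.my_table"), contractIcebergSchema, spec);
```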
Although the Parquet data files are correctly written under the expected monthly partition folders (e.g., path/2025-06/filename.parquet), when querying the table using Trino:
SELECT "$partition", "$path", * FROM iceberg_table;

The $partition metadata field incorrectly shows a value of "1970-07", while:

  • the $path value correctly references the meta_processing_time month (e.g., 'path/2025-06/filename.parquet')
  • the actual meta_processing_time column in the data contains the correct timestamp (e.g., '2025-06-24 11:06:06.187 +00:00')
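Our reading of the bad value (an interpretation, not confirmed against the writer code): Iceberg's month transform stores the partition value as whole months since the Unix epoch, so 2025-06 should be encoded as 665, while the observed 1970-07 decodes from a stored value of 6, i.e., it looks as if the calendar month of June was written instead of the epoch offset. A trivial sketch of that arithmetic:

```java
// Iceberg's month transform encodes a timestamp as months since 1970-01.
int expected = (2025 - 1970) * 12 + (6 - 1); // 665 -> rendered as "2025-06"
int observed = 6;                            // 6   -> rendered as "1970-07"
```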
After running the following query in Trino:

ALTER TABLE table_name EXECUTE optimize

the $partition value is corrected and shows the expected value '2025-06'.
We have upgraded to the latest Apache Beam version as well as the Iceberg core library, but the issue persists.

Here is a unit test method that reproduces the issue:
```java
@When(
    "I insert data into the iceberg schema {string} partitioned by {string} with the following values:")
public void i_insert_data_into_the_iceberg_schema_with_the_following_values(
    String tableName, String partitionColumn, DataTable dataTable) {
  // Step 1: Get Iceberg table and config
  Namespace namespace = Namespace.of("platform_data");
  TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
  Table table = catalog.catalog().loadTable(tableIdentifier);

  if (table == null) {
    throw new IllegalStateException(
        "Iceberg table '" + tableName + "' does not exist in the catalog.");
  }

  // Build Beam schema
  Schema beamSchema =
      Schema.builder()
          .addInt32Field("id")
          .addStringField("name")
          .addDateTimeField(partitionColumn)
          .build();

  // Convert DataTable to Beam Rows; the partition column is filled with "now"
  List<Row> rows =
      dataTable.asMaps(String.class, String.class).stream()
          .map(
              map -> {
                return Row.withSchema(beamSchema)
                    .addValues(
                        Integer.parseInt(map.get("id")),
                        map.get("name"),
                        org.joda.time.Instant.now())
                    .build();
              })
          .collect(Collectors.toList());

  // Step 2: Create Beam pipeline and write to Iceberg
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setRunner(DirectRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  rows.forEach(row -> log.info("Row created_at = {}", row.getDateTime(partitionColumn)));

  PCollection<Row> input =
      pipeline.apply("CreateRows", Create.of(rows)).setRowSchema(beamSchema);

  IcebergCatalogConfig icebergCatalogConfig = IcebergHiveCatalogPerEnv.getBeamCatalogConfig();

  input.apply("WriteToIceberg", IcebergIO.writeRows(icebergCatalogConfig).to(tableIdentifier));

  pipeline.run().waitUntilFinish();
}
```
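Independently of Trino, the committed partition tuples can be inspected straight from the table's manifests with the Iceberg Java API; the helper below is a hypothetical debugging sketch, not part of the pipeline above:

```java
import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical helper: print the partition tuple recorded for each data file.
static void printFilePartitions(Table table) throws IOException {
  try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask task : tasks) {
      // For a month("meta_processing_time") spec the tuple holds a single
      // integer (months since 1970-01), which Trino surfaces as $partition.
      System.out.printf("path=%s partition=%s%n", task.file().path(), task.file().partition());
    }
  }
}
```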

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
