Conversation

@ahmedabu98 (Contributor) commented Oct 23, 2025

Fixes #36553

@gemini-code-assist (Contributor)

Summary of Changes

Hello @ahmedabu98, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Apache Beam Avro extension by enabling comprehensive support for microsecond-precision timestamps. It ensures that Beam's SqlTypes.TIMESTAMP can be accurately converted to Avro's LogicalTypes.timestampMicros, preserving data fidelity for time-sensitive applications. The changes include updates to schema mapping, value conversion, and a dedicated test case to verify the new functionality.

Highlights

  • Microsecond Timestamp Support: Introduced support for converting Beam's SqlTypes.TIMESTAMP to Avro's LogicalTypes.timestampMicros, allowing for microsecond precision in timestamp data.
  • Schema Mapping Update: Updated the AvroUtils class to correctly map Beam's TIMESTAMP logical type to an Avro timestampMicros logical type, which is backed by a LONG primitive type.
  • Value Conversion Logic: Implemented the necessary conversion logic to transform java.time.Instant values into their microsecond-precision Long representation when serializing to Avro (see the sketch after this list).
  • New Test Case: Added a new test method, testBeamTimestampLogicalTypeToAvro, to validate the one-way conversion of Beam TIMESTAMP logical types and their corresponding java.time.Instant values to Avro timestampMicros.
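
To make the highlights concrete, here is a minimal Java sketch of the two pieces described above: the schema mapping and the Instant-to-micros conversion. It uses only the standard Avro API; the class and method names are illustrative, not the actual AvroUtils code from this PR.

import java.time.Instant;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Illustrative sketch; not the actual AvroUtils implementation.
public class TimestampMicrosSketch {

  // Beam's TIMESTAMP logical type maps to an Avro timestamp-micros
  // logical type backed by a LONG primitive.
  static Schema timestampMicrosSchema() {
    return LogicalTypes.timestampMicros()
        .addToSchema(Schema.create(Schema.Type.LONG));
  }

  // Convert a java.time.Instant to its microsecond-precision Long
  // representation for Avro serialization.
  static long toMicros(Instant instant) {
    return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
  }

  public static void main(String[] args) {
    Instant ts = Instant.parse("2025-01-01T01:00:00.123456Z");
    // Prints {"type":"long","logicalType":"timestamp-micros"}
    System.out.println(timestampMicrosSchema());
    // Prints 1735693200123456
    System.out.println(toMicros(ts));
  }
}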

@ahmedabu98 (Contributor, Author)

R: @claudevdm

@github-actions (Contributor)

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@claudevdm (Collaborator) left a comment

Thanks. Should we also handle millis?

MILLIS_INSTANT = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
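
If millisecond precision were added as well, the counterpart conversion might look like the sketch below, targeting Avro's timestamp-millis logical type. This is a hypothetical sketch in the same style as the micros example above, not code from the PR.

import java.time.Instant;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Hypothetical millis counterpart; not part of the actual change.
class TimestampMillisSketch {

  // timestamp-millis is likewise backed by a LONG primitive.
  static Schema timestampMillisSchema() {
    return LogicalTypes.timestampMillis()
        .addToSchema(Schema.create(Schema.Type.LONG));
  }

  // Instant.toEpochMilli() already yields the millisecond-precision value.
  static long toMillis(Instant instant) {
    return instant.toEpochMilli();
  }
}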

@liferoad (Contributor)

Can we add a test to cover this?

For example:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import os
import secrets
import time
import unittest
import uuid

import pytest
from google.cloud import bigquery

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.bigquery_tools import beam_row_from_dict
from apache_beam.io.gcp.bigquery_tools import (
    get_beam_typehints_from_tableschema)
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import managed
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.utils.timestamp import Timestamp


@pytest.mark.uses_io_java_expansion_service
@unittest.skipUnless(
    os.environ.get('EXPANSION_JARS'),
    "EXPANSION_JARS environment var is not provided, "
    "indicating that jars have not been built")
class ManagedBigQueryIT(unittest.TestCase):
  BIG_QUERY_DATASET_ID = 'python_managed_bq_'

  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.project = self.test_pipeline.get_option('project')
    self.args = self.test_pipeline.get_full_options_as_args()
    self.args.extend([
        '--experiments=enable_managed_transforms',
    ])

    # Create a unique dataset for this test
    self.dataset_id = '%s%d%s' % (
        self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
    self.bigquery_client = BigQueryWrapper()
    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)

  def tearDown(self):
    # Clean up the dataset after test
    try:
      self.bigquery_client._delete_dataset(
          self.project, self.dataset_id, delete_contents=True)
    except Exception:
      # Dataset might not exist or already deleted
      pass

  def test_managed_bigquery_write(self):
    """Test writing data to BigQuery using managed transforms."""
    table_name = 'test_table_' + uuid.uuid4().hex
    table_spec = f"{self.project}.{self.dataset_id}.{table_name}"

    # Define BigQuery table schema
    schema = {
        "fields": [
            {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "details", "type": "RECORD", "mode": "NULLABLE",
             "fields": [
                 {"name": "color", "type": "STRING", "mode": "NULLABLE"},
                 {"name": "size", "type": "FLOAT", "mode": "NULLABLE"}
             ]},
            {"name": "ts", "type": "TIMESTAMP", "mode": "NULLABLE"}
        ]
    }

    # Test data
    test_data = [
        {
            "id": 1,
            "name": "test_row_1",
            "details": {
                "color": "red", "size": 1.5
            },
            "ts": Timestamp.from_utc_datetime(
                datetime.datetime(
                    2025, 1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))
        },
        {
            "id": 2,
            "name": "test_row_2",
            "details": {
                "color": "blue", "size": 2.0
            },
            "ts": Timestamp.from_utc_datetime(
                datetime.datetime(
                    2025, 1, 1, 2, 0, 0, tzinfo=datetime.timezone.utc))
        },
    ]

    with TestPipeline(argv=self.args) as pipeline:
      rows = (
          pipeline
          | "CreateRows" >> beam.Create(test_data)
          | "ToRow" >>
          beam.Map(lambda row_dict: beam_row_from_dict(row_dict, schema)
                   ).with_output_types(
                       RowTypeConstraint.from_fields(
                           get_beam_typehints_from_tableschema(schema))))

      _ = rows | "WriteToBQ" >> managed.Write(
          managed.BIGQUERY,
          config={
              "table": table_spec,
              "write_disposition": "WRITE_APPEND",
          }
      )

    # Verify the data was written by querying the table. Use the
    # google.cloud.bigquery client directly, since BigQueryWrapper's
    # underlying apitools client does not expose a query() method.
    query = f"SELECT COUNT(*) as count FROM `{table_spec}`"
    query_job = bigquery.Client(project=self.project).query(query)
    results = list(query_job.result())
    self.assertEqual(results[0].count, 2)


if __name__ == '__main__':
  unittest.main()

Successfully merging this pull request may close these issues.

[Bug]: Managed I/O with BIGQUERY can not insert TIMESTAMP