From 834b432386e6bfdf273be55bc6440ccb92911321 Mon Sep 17 00:00:00 2001
From: Huan Chen
Date: Wed, 27 Nov 2024 18:37:35 +0000
Subject: [PATCH 1/2] chore: update comments and tests for label counts.

---
 bigframes/session/_io/bigquery/__init__.py |  15 +-
 bigframes/session/executor.py              |  15 +-
 tests/unit/session/test_io_bigquery.py     | 304 +++++++++++++++++++++
 3 files changed, 327 insertions(+), 7 deletions(-)

diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index b7706d34ca..b86fcfe3a2 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -204,7 +204,11 @@ def format_option(key: str, value: Union[bool, str]) -> str:
     return f"{key}={repr(value)}"
 
 
-def add_labels(job_config, api_name: Optional[str] = None):
+def add_and_trim_labels(job_config, api_name: Optional[str] = None):
+    """
+    Add additional labels to the job configuration and trim the total number of labels
+    to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64 labels per job.
+    """
     api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
     job_config.labels = create_job_configs_labels(
         job_configs_labels=job_config.labels,
@@ -225,7 +229,9 @@ def start_query_with_client(
     """
     Starts query job and waits for results.
     """
-    add_labels(job_config, api_name=api_name)
+    # Note: Do not add any more labels to job_config after this point, because
+    # `add_and_trim_labels` caps the label count at BigQuery's limit of 64.
+    add_and_trim_labels(job_config, api_name=api_name)
 
     try:
         query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
@@ -304,7 +310,10 @@ def create_bq_dataset_reference(
         bigquery.DatasetReference: The constructed reference to the anonymous dataset.
     """
     job_config = google.cloud.bigquery.QueryJobConfig()
-    add_labels(job_config, api_name=api_name)
+
+    # Note: Do not add any more labels to job_config after this point, because
+    # `add_and_trim_labels` caps the label count at BigQuery's limit of 64.
+    add_and_trim_labels(job_config, api_name=api_name)
     query_job = bq_client.query(
         "SELECT 1", location=location, project=project, job_config=job_config
     )
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index d19ec23501..d2a5b1168a 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -348,7 +348,10 @@ def export_gcs(
             export_options=dict(export_options),
         )
         job_config = bigquery.QueryJobConfig()
-        bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}")
+
+        # Note: Do not add any more labels to job_config after this point, because
+        # `add_and_trim_labels` caps the label count at BigQuery's limit of 64.
+        bq_io.add_and_trim_labels(job_config, api_name=f"dataframe-to_{format.lower()}")
         export_job = self.bqclient.query(export_data_statement, job_config=job_config)
         self._wait_on_job(export_job)
         return query_job
@@ -358,7 +361,9 @@ def dry_run(
     ) -> bigquery.QueryJob:
         sql = self.to_sql(array_value, ordered=ordered)
         job_config = bigquery.QueryJobConfig(dry_run=True)
-        bq_io.add_labels(job_config)
+        # Note: Do not add any more labels to job_config after this point, because
+        # `add_and_trim_labels` caps the label count at BigQuery's limit of 64.
+        bq_io.add_and_trim_labels(job_config)
         query_job = self.bqclient.query(sql, job_config=job_config)
         _ = query_job.result()
         return query_job
@@ -486,8 +491,10 @@ def _run_execute_query(
         if not self.strictly_ordered:
             job_config.labels["bigframes-mode"] = "unordered"
 
-        # Note: add_labels is global scope which may have unexpected effects
-        bq_io.add_labels(job_config, api_name=api_name)
+        # Note: add_and_trim_labels reads and resets global state, which may have
+        # unexpected effects. Do not add any more labels to job_config after this
+        # point, because it caps the label count at BigQuery's limit of 64.
+        bq_io.add_and_trim_labels(job_config, api_name=api_name)
         try:
             query_job = self.bqclient.query(sql, job_config=job_config)
             return (
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index 46c3c92036..58de3c2d52 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -17,15 +17,66 @@
 from typing import Iterable
 
 import google.cloud.bigquery as bigquery
+import google.cloud.bigquery_storage_v1
 import pytest
 
 import bigframes
 from bigframes.core import log_adapter
 import bigframes.pandas as bpd
 import bigframes.session._io.bigquery as io_bq
+import bigframes.session.executor as bf_exe
 from tests.unit import resources
 
 
+@pytest.fixture(scope="function")
+def mock_bq_client(mocker):
+    mock_client = mocker.Mock(spec=bigquery.Client)
+    mock_query_job = mocker.Mock(spec=bigquery.QueryJob)
+    mock_row_iterator = mocker.Mock(spec=bigquery.table.RowIterator)
+
+    mock_query_job.result.return_value = mock_row_iterator
+
+    mock_destination = bigquery.DatasetReference(
+        project="mock_project", dataset_id="mock_dataset"
+    )
+    mock_query_job.destination = mock_destination
+
+    mock_client.query.return_value = mock_query_job
+
+    return mock_client
+
+
+@pytest.fixture(scope="function")
+def mock_storage_manager(mocker):
+    return mocker.Mock(spec=bigframes.session.temp_storage.TemporaryGbqStorageManager)
+
+
+@pytest.fixture(scope="function")
+def mock_bq_storage_read_client(mocker):
+    return mocker.Mock(spec=google.cloud.bigquery_storage_v1.BigQueryReadClient)
+
+
+@pytest.fixture(scope="function")
+def mock_array_value(mocker):
+    return mocker.Mock(spec=bigframes.core.ArrayValue)
+
+
+@pytest.fixture(scope="function")
+def patch_bq_caching_executor(mocker):
+    mock_execute_result = mocker.Mock(spec=bf_exe.ExecuteResult)
+    mock_execute_result.query_job.destination.project = "some_project"
+    mock_execute_result.query_job.destination.dataset_id = "some_dataset"
+    mock_execute_result.query_job.destination.table_id = "some_table"
+
+    with mocker.patch.object(
+        bf_exe.BigQueryCachingExecutor, "to_sql", return_value='select * from "abc"'
+    ):
+        with mocker.patch.object(
+            bf_exe.BigQueryCachingExecutor, "execute", return_value=mock_execute_result
+        ):
+            yield
+
+
 def test_create_job_configs_labels_is_none():
     api_methods = ["agg", "series-mode"]
     labels = io_bq.create_job_configs_labels(
@@ -148,6 +199,259 @@ def test_create_job_configs_labels_length_limit_met():
     assert "source" in labels.keys()
 
 
+def test_add_and_trim_labels_length_limit_met():
+    log_adapter.get_and_reset_api_methods()
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(10):
+        key = f"bigframes-api-test-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    job_config = bigquery.job.QueryJobConfig()
+    job_config.labels = cur_labels
+
+    df.max()
+    for _ in range(60):
+        df.head()
+
+    io_bq.add_and_trim_labels(job_config=job_config)
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+    assert "source" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("max_results", "timeout", "api_name"),
+    [(None, None, None), (100, 30.0, "test_api")],
+)
+def test_start_query_with_client_labels_length_limit_met(
+    mock_bq_client, max_results, timeout, api_name
+):
+    sql = "select * from abc"
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(10):
+        key = f"bigframes-api-test-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    job_config = bigquery.job.QueryJobConfig()
+    job_config.labels = cur_labels
+
+    df.max()
+    for _ in range(60):
+        df.head()
+
+    io_bq.start_query_with_client(
+        mock_bq_client,
+        sql,
+        job_config,
+        max_results=max_results,
+        timeout=timeout,
+        api_name=api_name,
+    )
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+    assert "source" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("location", "project", "api_name"),
+    [(None, None, None), ("us", "abc", "test_api")],
+)
+def test_create_bq_dataset_reference_length_limit_met(
+    mock_bq_client, location, project, api_name
+):
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    df.max()
+    for _ in range(64):
+        df.head()
+
+    io_bq.create_bq_dataset_reference(
+        mock_bq_client,
+        location=location,
+        project=project,
+        api_name=api_name,
+    )
+    _, kwargs = mock_bq_client.query.call_args
+    job_config = kwargs["job_config"]
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("strictly_ordered", "format"),
+    [(True, "json"), (False, "csv")],
+)
+def test_export_gcs_length_limit_met(
+    mock_bq_client,
+    mock_storage_manager,
+    mock_bq_storage_read_client,
+    mock_array_value,
+    strictly_ordered,
+    format,
+    patch_bq_caching_executor,
+):
+    bigquery_caching_executor = bf_exe.BigQueryCachingExecutor(
+        mock_bq_client,
+        mock_storage_manager,
+        mock_bq_storage_read_client,
+        strictly_ordered=strictly_ordered,
+    )
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    df.max()
+    for _ in range(63):
+        df.head()
+
+    bigquery_caching_executor.export_gcs(
+        mock_array_value,
+        col_id_overrides={"a": "b", "c": "d"},
+        uri="abc",
+        format=format,
+        export_options={"aa": True, "bb": "cc"},
+    )
+
+    _, kwargs = mock_bq_client.query.call_args
+    job_config = kwargs["job_config"]
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("strictly_ordered", "ordered"),
+    [(True, False), (False, True)],
+)
+def test_dry_run_length_limit_met(
+    mock_bq_client,
+    mock_storage_manager,
+    mock_bq_storage_read_client,
+    mock_array_value,
+    strictly_ordered,
+    ordered,
+    patch_bq_caching_executor,
+):
+    bigquery_caching_executor = bf_exe.BigQueryCachingExecutor(
+        mock_bq_client,
+        mock_storage_manager,
+        mock_bq_storage_read_client,
+        strictly_ordered=strictly_ordered,
+    )
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    df.max()
+    for _ in range(64):
+        df.head()
+
+    bigquery_caching_executor.dry_run(mock_array_value, ordered=ordered)
+
+    _, kwargs = mock_bq_client.query.call_args
+    job_config = kwargs["job_config"]
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("strictly_ordered", "api_name", "page_size", "max_results"),
+    [(True, None, None, None), (False, "test_api", 100, 10)],
+)
+def test__run_execute_query_length_limit_met(
+    mock_bq_client,
+    mock_storage_manager,
+    mock_bq_storage_read_client,
+    strictly_ordered,
+    api_name,
+    page_size,
+    max_results,
+):
+    sql = "select * from abc"
+
+    bigquery_caching_executor = bf_exe.BigQueryCachingExecutor(
+        mock_bq_client,
+        mock_storage_manager,
+        mock_bq_storage_read_client,
+        strictly_ordered=strictly_ordered,
+    )
+
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(10):
+        key = f"bigframes-api-test-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+
+    job_config = bigquery.job.QueryJobConfig()
+    job_config.labels = cur_labels
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    df.max()
+    for _ in range(60):
+        df.head()
+
+    bigquery_caching_executor._run_execute_query(
+        sql,
+        job_config=job_config,
+        api_name=api_name,
+        page_size=page_size,
+        max_results=max_results,
+    )
+
+    _, kwargs = mock_bq_client.query.call_args
+    job_config = kwargs["job_config"]
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 64
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+
+
 def test_create_temp_table_default_expiration():
     """Make sure the created table has an expiration."""
     expiration = datetime.datetime(

From 1b1866f6d5fab0a8c32ddc64bbddacfe7ff991d4 Mon Sep 17 00:00:00 2001
From: Huan Chen
Date: Wed, 27 Nov 2024 18:42:32 +0000
Subject: [PATCH 2/2] update docstring

---
 bigframes/session/_io/bigquery/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index b86fcfe3a2..51c156d658 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -207,7 +207,8 @@ def format_option(key: str, value: Union[bool, str]) -> str:
 def add_and_trim_labels(job_config, api_name: Optional[str] = None):
     """
     Add additional labels to the job configuration and trim the total number of labels
-    to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64 labels per job.
+    to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64
+    labels per job.
     """
     api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
     job_config.labels = create_job_configs_labels(
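
Note for reviewers: the merge-and-trim behavior these patches document and test
can be summarized with a small self-contained sketch. The helper name
make_labels_with_limit and the recent-bigframes-api-{i} key pattern below are
illustrative assumptions for this note, not the actual implementation of
create_job_configs_labels; only the 64-label cap comes from the patches above.

    from typing import Dict, List, Optional

    # BigQuery rejects jobs that carry more than 64 labels.
    MAX_LABELS_COUNT = 64

    def make_labels_with_limit(
        existing: Optional[Dict[str, str]],
        api_methods: List[str],
    ) -> Dict[str, str]:
        # Keep the labels already on the job config (e.g. "source",
        # "bigframes-api"), then add the most recently logged API methods
        # until the 64-label cap is hit, so the oldest calls are dropped.
        labels = dict(existing or {})
        for i, method in enumerate(reversed(api_methods)):
            if len(labels) >= MAX_LABELS_COUNT:
                break
            labels[f"recent-bigframes-api-{i}"] = method
        return labels

    # Mirrors the unit tests: with 2 pre-existing labels and 71 logged
    # methods, the oldest method ("dataframe-max") no longer fits, while
    # the recent "dataframe-head" calls do.
    existing = {"bigframes-api": "read_pandas", "source": "bigquery-dataframes-temp"}
    methods = ["dataframe-max"] + ["dataframe-head"] * 70
    labels = make_labels_with_limit(existing, methods)
    assert len(labels) == MAX_LABELS_COUNT
    assert "dataframe-max" not in labels.values()
    assert "dataframe-head" in labels.values()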