diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index edac7efa4b..655b9c9dee 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1590,7 +1590,7 @@ def _start_query_ml_ddl(
         job_config.destination_encryption_configuration = None
 
         return bf_io_bigquery.start_query_with_client(
-            self.bqclient, sql, job_config, metrics=self._metrics
+            self.bqclient, sql, job_config=job_config, metrics=self._metrics
         )
 
     def _create_object_table(self, path: str, connection: str) -> str:
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index b7706d34ca..6a5ba3f4c7 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -40,7 +40,7 @@
 
 IO_ORDERING_ID = "bqdf_row_nums"
-MAX_LABELS_COUNT = 64
+MAX_LABELS_COUNT = 64 - 8
 _LIST_TABLES_LIMIT = 10000  # calls to bqclient.list_tables
 # will be limited to this many tables
 
@@ -204,7 +204,12 @@ def format_option(key: str, value: Union[bool, str]) -> str:
     return f"{key}={repr(value)}"
 
 
-def add_labels(job_config, api_name: Optional[str] = None):
+def add_and_trim_labels(job_config, api_name: Optional[str] = None):
+    """
+    Add additional labels to the job configuration and trim the total number of labels
+    to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64
+    labels per job.
+    """
     api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
     job_config.labels = create_job_configs_labels(
         job_configs_labels=job_config.labels,
@@ -217,7 +222,10 @@ def start_query_with_client(
     bq_client: bigquery.Client,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
+    location: Optional[str] = None,
+    project: Optional[str] = None,
     max_results: Optional[int] = None,
+    page_size: Optional[int] = None,
     timeout: Optional[float] = None,
     api_name: Optional[str] = None,
     metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
@@ -225,10 +233,17 @@ def start_query_with_client(
     """
     Starts query job and waits for results.
     """
-    add_labels(job_config, api_name=api_name)
-
     try:
-        query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
+        # Note: Ensure no additional labels are added to job_config after this point,
+        # as `add_and_trim_labels` ensures the label count does not exceed 64.
+        add_and_trim_labels(job_config, api_name=api_name)
+        query_job = bq_client.query(
+            sql,
+            job_config=job_config,
+            location=location,
+            project=project,
+            timeout=timeout,
+        )
     except google.api_core.exceptions.Forbidden as ex:
         if "Drive credentials" in ex.message:
             ex.message += CHECK_DRIVE_PERMISSIONS
@@ -237,10 +252,15 @@ def start_query_with_client(
     opts = bigframes.options.display
     if opts.progress_bar is not None and not query_job.configuration.dry_run:
         results_iterator = formatting_helpers.wait_for_query_job(
-            query_job, max_results=max_results, progress_bar=opts.progress_bar
+            query_job,
+            max_results=max_results,
+            progress_bar=opts.progress_bar,
+            page_size=page_size,
         )
     else:
-        results_iterator = query_job.result(max_results=max_results)
+        results_iterator = query_job.result(
+            max_results=max_results, page_size=page_size
+        )
 
     if metrics is not None:
         metrics.count_job_stats(query_job)
@@ -304,11 +324,15 @@ def create_bq_dataset_reference(
         bigquery.DatasetReference: The constructed reference to the anonymous dataset.
""" job_config = google.cloud.bigquery.QueryJobConfig() - add_labels(job_config, api_name=api_name) - query_job = bq_client.query( - "SELECT 1", location=location, project=project, job_config=job_config + + _, query_job = start_query_with_client( + bq_client, + "SELECT 1", + location=location, + job_config=job_config, + project=project, + api_name=api_name, ) - query_job.result() # blocks until finished # The anonymous dataset is used by BigQuery to write query results and # session tables. BigQuery DataFrames also writes temp tables directly diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 01476ed113..9ca1fa3117 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -48,7 +48,6 @@ import bigframes.core.schema import bigframes.core.tree_properties as tree_properties import bigframes.features -import bigframes.formatting_helpers as formatting_helpers import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -347,10 +346,14 @@ def export_gcs( format=format, export_options=dict(export_options), ) - job_config = bigquery.QueryJobConfig() - bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}") - export_job = self.bqclient.query(export_data_statement, job_config=job_config) - self._wait_on_job(export_job) + + bq_io.start_query_with_client( + self.bqclient, + export_data_statement, + job_config=bigquery.QueryJobConfig(), + api_name=f"dataframe-to_{format.lower()}", + metrics=self.metrics, + ) return query_job def dry_run( @@ -358,9 +361,7 @@ def dry_run( ) -> bigquery.QueryJob: sql = self.to_sql(array_value, ordered=ordered) job_config = bigquery.QueryJobConfig(dry_run=True) - bq_io.add_labels(job_config) query_job = self.bqclient.query(sql, job_config=job_config) - _ = query_job.result() return query_job def peek( @@ -487,15 +488,19 @@ def _run_execute_query( if not self.strictly_ordered: job_config.labels["bigframes-mode"] = "unordered" - # Note: add_labels is global scope which may have unexpected effects - bq_io.add_labels(job_config, api_name=api_name) + # Note: add_and_trim_labels is global scope which may have unexpected effects + # Ensure no additional labels are added to job_config after this point, + # as `add_and_trim_labels` ensures the label count does not exceed 64. 
+        bq_io.add_and_trim_labels(job_config, api_name=api_name)
         try:
-            query_job = self.bqclient.query(sql, job_config=job_config)
-            return (
-                self._wait_on_job(
-                    query_job, max_results=max_results, page_size=page_size
-                ),
-                query_job,
+            return bq_io.start_query_with_client(
+                self.bqclient,
+                sql,
+                job_config=job_config,
+                api_name=api_name,
+                max_results=max_results,
+                page_size=page_size,
+                metrics=self.metrics,
             )
 
         except google.api_core.exceptions.BadRequest as e:
@@ -506,29 +511,6 @@
             else:
                 raise
 
-    def _wait_on_job(
-        self,
-        query_job: bigquery.QueryJob,
-        page_size: Optional[int] = None,
-        max_results: Optional[int] = None,
-    ) -> bq_table.RowIterator:
-        opts = bigframes.options.display
-        if opts.progress_bar is not None and not query_job.configuration.dry_run:
-            results_iterator = formatting_helpers.wait_for_query_job(
-                query_job,
-                progress_bar=opts.progress_bar,
-                max_results=max_results,
-                page_size=page_size,
-            )
-        else:
-            results_iterator = query_job.result(
-                max_results=max_results, page_size=page_size
-            )
-
-        if self.metrics is not None:
-            self.metrics.count_job_stats(query_job)
-        return results_iterator
-
     def replace_cached_subtrees(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
         return nodes.top_down(
             node, lambda x: self._cached_executions.get(x, x), memoize=True
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index e7579b1138..ec922e286d 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -707,9 +707,9 @@ def _start_query(
         return bf_io_bigquery.start_query_with_client(
             self._bqclient,
             sql,
-            job_config,
-            max_results,
-            timeout,
+            job_config=job_config,
+            max_results=max_results,
+            timeout=timeout,
             api_name=api_name,
         )
 
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index f722ccbe75..960e40465b 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -574,16 +574,10 @@ def test_read_gbq_with_custom_global_labels(
         bigframes.options.compute.extra_query_labels["test3"] = False
 
         query_job = session.read_gbq(scalars_table_id).query_job
-        job_labels = query_job.labels  # type:ignore
-        expected_labels = {"test1": "1", "test2": "abc", "test3": "false"}
-
-        # All jobs should include a bigframes-api key. See internal issue 336521938.
-        assert "bigframes-api" in job_labels
-
-        assert all(
-            job_labels.get(key) == value for key, value in expected_labels.items()
-        )
+        # No real job created from read_gbq, so we should expect 0 labels
+        assert query_job is not None
+        assert query_job.labels == {}
 
     # No labels outside of the option_context.
     assert len(bigframes.options.compute.extra_query_labels) == 0
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index f06578ce03..36caea0c0e 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -26,6 +26,24 @@
 from tests.unit import resources
 
 
+@pytest.fixture(scope="function")
+def mock_bq_client(mocker):
+    mock_client = mocker.Mock(spec=bigquery.Client)
+    mock_query_job = mocker.Mock(spec=bigquery.QueryJob)
+    mock_row_iterator = mocker.Mock(spec=bigquery.table.RowIterator)
+
+    mock_query_job.result.return_value = mock_row_iterator
+
+    mock_destination = bigquery.DatasetReference(
+        project="mock_project", dataset_id="mock_dataset"
+    )
+    mock_query_job.destination = mock_destination
+
+    mock_client.query.return_value = mock_query_job
+
+    return mock_client
+
+
 def test_create_job_configs_labels_is_none():
     api_methods = ["agg", "series-mode"]
     labels = io_bq.create_job_configs_labels(
@@ -124,7 +142,7 @@ def test_create_job_configs_labels_length_limit_met():
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
     }
-    for i in range(61):
+    for i in range(53):
         key = f"bigframes-api-test-{i}"
         value = f"test{i}"
         cur_labels[key] = value
@@ -141,13 +159,89 @@ def test_create_job_configs_labels_length_limit_met():
         job_configs_labels=cur_labels, api_methods=api_methods
     )
     assert labels is not None
-    assert len(labels) == 64
+    assert len(labels) == 56
     assert "dataframe-max" in labels.values()
     assert "dataframe-head" not in labels.values()
     assert "bigframes-api" in labels.keys()
     assert "source" in labels.keys()
 
+
+def test_add_and_trim_labels_length_limit_met():
+    log_adapter.get_and_reset_api_methods()
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(10):
+        key = f"bigframes-api-test-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    job_config = bigquery.job.QueryJobConfig()
+    job_config.labels = cur_labels
+
+    df.max()
+    for _ in range(52):
+        df.head()
+
+    io_bq.add_and_trim_labels(job_config=job_config)
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 56
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
+    assert "source" in job_config.labels.keys()
+
+
+@pytest.mark.parametrize(
+    ("max_results", "timeout", "api_name"),
+    [(None, None, None), (100, 30.0, "test_api")],
+)
+def test_start_query_with_client_labels_length_limit_met(
+    mock_bq_client, max_results, timeout, api_name
+):
+    sql = "select * from abc"
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(10):
+        key = f"bigframes-api-test-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
+
+    job_config = bigquery.job.QueryJobConfig()
+    job_config.labels = cur_labels
+
+    df.max()
+    for _ in range(52):
+        df.head()
+
+    io_bq.start_query_with_client(
+        mock_bq_client,
+        sql,
+        job_config,
+        max_results=max_results,
+        timeout=timeout,
+        api_name=api_name,
+    )
+
+    assert job_config.labels is not None
+    assert len(job_config.labels) == 56
+    assert "dataframe-max" not in job_config.labels.values()
+    assert "dataframe-head" in job_config.labels.values()
+    assert "bigframes-api" in job_config.labels.keys()
"bigframes-api" in job_config.labels.keys() + assert "source" in job_config.labels.keys() + + def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" expiration = datetime.datetime( diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 053ed7b94c..f1565ed536 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -6771,6 +6771,7 @@ def iat(self): **Examples:** >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]], ... columns=['A', 'B', 'C']) >>> bpd.options.display.progress_bar = None @@ -6804,6 +6805,7 @@ def at(self): **Examples:** >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]], ... index=[4, 5, 6], columns=['A', 'B', 'C']) >>> bpd.options.display.progress_bar = None