feat: updates to allow users to set max_stream_count (#2039)
Adds a function `determine_requested_streams()` that compares `preserve_order` with the new `max_stream_count` argument to determine how many streams to request.

```
preserve_order (bool): Whether to preserve the order of streams. If True,
    this limits the number of streams to one (more than one cannot guarantee order).
max_stream_count (Union[int, None]): The maximum number of streams
    allowed. Must be a non-negative number or None, where None indicates
    the value is unset. If `preserve_order` is True, it takes precedence
    and the number of requested streams is limited to one regardless of
    `max_stream_count`.
```
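
For illustration, a minimal sketch of the helper's behavior, mirroring the unit tests added in this commit:

```python
from google.cloud.bigquery._pandas_helpers import determine_requested_streams

# preserve_order wins: only a single stream can guarantee row ordering.
assert determine_requested_streams(preserve_order=True, max_stream_count=10) == 1

# Otherwise max_stream_count is used as-is.
assert determine_requested_streams(preserve_order=False, max_stream_count=10) == 10

# Both unset -> 0, i.e. let the Storage API pick the number of streams.
assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0

# Negative values raise ValueError.
```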

Fixes #2030 🦕
chalmerlowe authored Oct 10, 2024
1 parent 1d8d0a0 commit 7372ad6
Showing 2 changed files with 130 additions and 19 deletions.
118 changes: 99 additions & 19 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -21,13 +21,14 @@
import logging
import queue
import warnings
from typing import Any, Union
from typing import Any, Union, Optional, Callable, Generator, List


from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema


try:
import pandas # type: ignore

@@ -75,7 +76,7 @@ def _to_wkb(v):
_to_wkb = _to_wkb()

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
except ImportError:
_ARROW_COMPRESSION_SUPPORT = False
else:
@@ -816,18 +817,54 @@ def _nowait(futures):


def _download_table_bqstorage(
project_id,
table,
bqstorage_client,
preserve_order=False,
selected_fields=None,
page_to_item=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
project_id: str,
table: Any,
bqstorage_client: Any,
preserve_order: bool = False,
selected_fields: Optional[List[Any]] = None,
page_to_item: Optional[Callable] = None,
max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
max_stream_count: Optional[int] = None,
) -> Generator[Any, None, None]:
"""Downloads a BigQuery table using the BigQuery Storage API.
This method uses the faster, but potentially more expensive, BigQuery
Storage API to download a table as a Pandas DataFrame. It supports
parallel downloads and optional data transformations.
Args:
project_id (str): The ID of the Google Cloud project containing
the table.
table (Any): The BigQuery table to download.
bqstorage_client (Any): An
authenticated BigQuery Storage API client.
preserve_order (bool, optional): Whether to preserve the order
of the rows as they are read from BigQuery. If True this limits
the number of streams to one and overrides `max_stream_count`.
Defaults to False.
selected_fields (Optional[List[SchemaField]]):
A list of BigQuery schema fields to select for download. If None,
all fields are downloaded. Defaults to None.
page_to_item (Optional[Callable]): An optional callable
function that takes a page of data from the BigQuery Storage API
and converts it into the item placed on the worker queue (for
example, a pandas DataFrame or an Arrow record batch).
max_stream_count (Optional[int]): The maximum number of
concurrent streams to use for downloading data. If `preserve_order`
is True, the requested streams are limited to 1 regardless of the
`max_stream_count` value. If 0 or None, then the number of
requested streams will be unbounded. Defaults to None.
Yields:
pandas.DataFrame: Pandas DataFrames, one for each chunk of data
downloaded from BigQuery.
Raises:
ValueError: If attempting to read from a specific partition or snapshot.
Note:
This method requires the `google-cloud-bigquery-storage` library
to be installed.
"""

# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
from google.cloud import bigquery_storage

if "$" in table.table_id:
@@ -837,18 +874,20 @@ def _download_table_bqstorage(
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

requested_streams = 1 if preserve_order else 0
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

requested_session = bigquery_storage.types.ReadSession(
table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
requested_session = bigquery_storage.types.stream.ReadSession(
table=table.to_bqstorage(),
data_format=bigquery_storage.types.stream.DataFormat.ARROW,
)
if selected_fields is not None:
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

if _ARROW_COMPRESSION_SUPPORT:
requested_session.read_options.arrow_serialization_options.buffer_compression = (
ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
# CompressionCodec(1) -> LZ4_FRAME
ArrowSerializationOptions.CompressionCodec(1)
)

session = bqstorage_client.create_read_session(
@@ -884,7 +923,7 @@ def _download_table_bqstorage(
elif max_queue_size is None:
max_queue_size = 0 # unbounded

worker_queue = queue.Queue(maxsize=max_queue_size)
worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)

with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
@@ -910,7 +949,7 @@ def _download_table_bqstorage(
# we want to block on the queue's get method, instead. This
# prevents the queue from filling up, because the main thread
# has smaller gaps in time between calls to the queue's get
# method. For a detailed explaination, see:
# method. For a detailed explanation, see:
# https://friendliness.dev/2019/06/18/python-nowait/
done, not_done = _nowait(not_done)
for future in done:
@@ -949,6 +988,7 @@ def download_arrow_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
return _download_table_bqstorage(
project_id,
@@ -958,6 +998,7 @@ def download_dataframe_bqstorage(
selected_fields=selected_fields,
page_to_item=_bqstorage_page_to_arrow,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)


@@ -970,6 +1011,7 @@ def download_dataframe_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
return _download_table_bqstorage(
@@ -980,6 +1022,7 @@ def download_dataframe_bqstorage(
selected_fields=selected_fields,
page_to_item=page_to_item,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)


@@ -1024,3 +1067,40 @@ def verify_pandas_imports():
raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
if db_dtypes is None:
raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception


def determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
) -> int:
"""Determines the value of requested_streams based on the values of
`preserve_order` and `max_stream_count`.
Args:
preserve_order (bool): Whether to preserve the order of streams. If True,
this limits the number of streams to one. `preserve_order` takes
precedence over `max_stream_count`.
max_stream_count (Union[int, None]): The maximum number of streams
allowed. Must be a non-negative number or None, where None indicates
the value is unset. NOTE: if `preserve_order` is also set, it takes
precedence over `max_stream_count`; thus, to ensure that `max_stream_count`
is used, make sure `preserve_order` is False.
Returns:
(int) The appropriate value for requested_streams.
"""

if preserve_order:
# If preserve_order is set, it takes precedence.
# Limit the requested streams to 1 to ensure that order
# is preserved.
return 1

elif max_stream_count is not None:
# If preserve_order is not set, only then do we consider max_stream_count
if max_stream_count <= -1:
raise ValueError("max_stream_count must be non-negative OR None")
return max_stream_count

# Default to zero requested streams (unbounded).
return 0
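
For context (not part of this diff): the helper's return value is passed as `max_stream_count` to the Storage API's `create_read_session` call, where 0 lets the service choose how many streams to open. A minimal sketch, assuming `requested_session` and `bqstorage_client` are built as in `_download_table_bqstorage` above:

```python
# Sketch only: mirrors the create_read_session call collapsed in the hunk above.
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

session = bqstorage_client.create_read_session(
    parent=f"projects/{project_id}",
    read_session=requested_session,
    # 0 means "unbounded": the Storage API decides the number of streams.
    max_stream_count=requested_streams,
)
total_streams = len(session.streams)  # actual streams granted by the service
```
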
31 changes: 31 additions & 0 deletions tests/unit/test__pandas_helpers.py
Expand Up @@ -18,6 +18,7 @@
import functools
import operator
import queue
from typing import Union
from unittest import mock
import warnings

@@ -46,6 +47,7 @@
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema
from google.cloud.bigquery._pandas_helpers import determine_requested_streams

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()

@@ -2053,3 +2055,32 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "db_dtypes", None)
with pytest.raises(ValueError, match="Please install the 'db-dtypes' package"):
module_under_test.verify_pandas_imports()


@pytest.mark.parametrize(
"preserve_order, max_stream_count, expected_requested_streams",
[
# If preserve_order is set/True, it takes precedence:
(True, 10, 1), # use 1
(True, None, 1), # use 1
# If preserve_order is not set, check max_stream_count:
(False, 10, 10), # max_stream_count takes precedence
(False, None, 0), # Unbounded (0) when both are unset
],
)
def test_determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
expected_requested_streams: int,
):
"""Tests various combinations of preserve_order and max_stream_count."""
actual_requested_streams = determine_requested_streams(
preserve_order, max_stream_count
)
assert actual_requested_streams == expected_requested_streams


def test_determine_requested_streams_invalid_max_stream_count():
"""Tests that a ValueError is raised if max_stream_count is negative."""
with pytest.raises(ValueError):
determine_requested_streams(preserve_order=False, max_stream_count=-1)
