deps: use pandas-gbq to determine schema in load_table_from_dataframe #2095

Open · wants to merge 18 commits into base: main · Changes from all commits
35 changes: 34 additions & 1 deletion google/cloud/bigquery/_pandas_helpers.py
@@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for connecting BigQuery and pandas."""
"""Shared helper functions for connecting BigQuery and pandas.

NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
instead. See: go/pandas-gbq-and-bigframes-redundancy and
https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py
"""

import concurrent.futures
from datetime import datetime
@@ -39,6 +44,16 @@
else:
import numpy


try:
import pandas_gbq.schema.pandas_to_bigquery # type: ignore

pandas_gbq_import_exception = None
except ImportError as exc:
pandas_gbq = None
pandas_gbq_import_exception = exc


try:
import db_dtypes # type: ignore

@@ -429,6 +444,10 @@ def _first_array_valid(series):
def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.

DEPRECATED: Use
pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(),
instead. See: go/pandas-gbq-and-bigframes-redundancy.

Args:
dataframe (pandas.DataFrame):
DataFrame for which the client determines the BigQuery schema.
@@ -444,6 +463,20 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
The automatically determined schema. Returns None if the type of
any column cannot be determined.
"""
if pandas_gbq is None:
warnings.warn(
"Loading pandas DataFrame into BigQuery will require pandas-gbq "
"package version 0.26.1 or greater in the future. "
f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}",
category=FutureWarning,
)
else:
return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
dataframe,
override_bigquery_fields=bq_schema,
index=True,
)

if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
bq_schema_index = {field.name: field for field in bq_schema}
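For reference, a minimal sketch of the new delegation path in isolation (assuming pandas-gbq >= 0.26.1 is installed; the DataFrame and the empty override list are illustrative, and the call mirrors the one added to dataframe_to_bq_schema above):

```python
import pandas
import pandas_gbq.schema.pandas_to_bigquery

dataframe = pandas.DataFrame({"name": ["foo", "bar"], "score": [1.5, 2.5]})

# pandas-gbq is now the source of truth for schema inference; the
# legacy in-repo logic only runs (with a FutureWarning) when the
# pandas_gbq import fails.
fields = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
    dataframe,
    override_bigquery_fields=[],  # the user-supplied bq_schema, if any
    index=True,  # also consider the DataFrame index for schema fields
)
print(fields)
```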
7 changes: 6 additions & 1 deletion google/cloud/bigquery/_pyarrow_helpers.py
@@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for connecting BigQuery and pyarrow."""
"""Shared helper functions for connecting BigQuery and pyarrow.

NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
instead. See: go/pandas-gbq-and-bigframes-redundancy and
https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pyarrow_to_bigquery.py
"""

from typing import Any

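As a rough illustration of what this module (and its pandas-gbq replacement) does, here is a hand-written subset of a pyarrow-to-BigQuery type mapping; it is not the library's actual lookup table:

```python
import pyarrow

# pyarrow DataType instances are hashable, so a plain dict lookup works.
_ARROW_TO_BQ = {
    pyarrow.int64(): "INT64",
    pyarrow.float64(): "FLOAT64",
    pyarrow.bool_(): "BOOL",
    pyarrow.string(): "STRING",
    pyarrow.timestamp("us", tz="UTC"): "TIMESTAMP",
}

def arrow_type_to_bq(arrow_type):
    """Return the BigQuery type name for a known pyarrow type, else None."""
    return _ARROW_TO_BQ.get(arrow_type)

print(arrow_type_to_bq(pyarrow.string()))  # STRING
```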
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -74,6 +74,9 @@ bqstorage = [
]
pandas = [
"pandas >= 1.1.0",
"pandas-gbq >= 0.26.1; python_version >= '3.8'",
Author comment: Note: this means we are testing without pandas-gbq on Python 3.7.

"grpcio >= 1.47.0, < 2.0dev",
"grpcio >= 1.49.1, < 2.0dev; python_version >= '3.11'",
"pyarrow >= 3.0.0",
"db-dtypes >= 0.3.0, < 2.0.0dev",
"importlib_metadata >= 1.0.0; python_version < '3.8'",
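Because pandas-gbq is only required on Python 3.8+, downstream code may want to detect it at runtime. A hedged sketch (the helper name is ours, not part of google-cloud-bigquery; the 0.26.1 floor mirrors the pyproject change above):

```python
import importlib.metadata

def has_min_pandas_gbq(minimum=(0, 26, 1)):
    """Return True if pandas-gbq is installed at or above `minimum`."""
    try:
        version = importlib.metadata.version("pandas-gbq")
    except importlib.metadata.PackageNotFoundError:
        return False
    # Naive parse for illustration; real code should use packaging.version.
    parts = tuple(int(part) for part in version.split(".")[:3])
    return parts >= minimum

print(has_min_pandas_gbq())
```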
9 changes: 9 additions & 0 deletions testing/constraints-3.8.txt
@@ -1,2 +1,11 @@
grpcio==1.47.0
pandas==1.2.0

# This constraints file is used to check that lower bounds
# are correct in setup.py
#
# Pin the version to the lower bound.
#
# e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
# Then this file should have foo==1.14.0
pandas-gbq==0.26.1
2 changes: 1 addition & 1 deletion tests/system/test_pandas.py
@@ -1259,7 +1259,7 @@ def test_upload_time_and_datetime_56(bigquery_client, dataset_id):
df = pandas.DataFrame(
dict(
dt=[
datetime.datetime(2020, 1, 8, 8, 0, 0),
datetime.datetime(2020, 1, 8, 8, 0, 0, tzinfo=datetime.timezone.utc),
Author comment: Not sure if we should have defined what happens when you mix and match naive datetime (DATETIME in BQ) with datetime + timezone (TIMESTAMP in BQ). Since the test expects TIMESTAMP, I updated this to be a datetime + timezone.

We still mix and match timezones, so I think that part is important to make sure we end up returning UTC values.

datetime.datetime(
2020,
1,
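To illustrate the comment above: schema inference maps naive datetimes to DATETIME and timezone-aware datetimes to TIMESTAMP, so a test that expects a TIMESTAMP column must use aware values:

```python
import datetime

naive = datetime.datetime(2020, 1, 8, 8, 0, 0)  # inferred as DATETIME
aware = datetime.datetime(
    2020, 1, 8, 8, 0, 0, tzinfo=datetime.timezone.utc
)  # inferred as TIMESTAMP

print(naive.tzinfo, aware.tzinfo)  # None UTC
```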
65 changes: 54 additions & 11 deletions tests/unit/test__pandas_helpers.py
@@ -34,6 +34,11 @@
except ImportError:
pandas = None

try:
import pandas_gbq.schema.pandas_to_bigquery
except ImportError:
pandas_gbq = None

try:
import geopandas
except ImportError:
@@ -1280,7 +1285,21 @@ def test_dataframe_to_parquet_compression_method(module_under_test):


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_named_index(module_under_test):
@pytest.mark.skipif(pandas_gbq is None, reason="Requires `pandas-gbq`")
def test_dataframe_to_bq_schema_returns_schema_with_pandas_gbq(module_under_test):
dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})
got = module_under_test.dataframe_to_bq_schema(dataframe, [])
# Don't assert beyond this, since pandas-gbq is now source of truth.
assert got is not None


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_named_index(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

df_data = collections.OrderedDict(
[
("str_column", ["hello", "world"]),
@@ -1291,7 +1310,8 @@ def test_dataframe_to_bq_schema_w_named_index(module_under_test):
index = pandas.Index(["a", "b"], name="str_index")
dataframe = pandas.DataFrame(df_data, index=index)

returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
with pytest.warns(FutureWarning, match="pandas-gbq"):
returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])

expected_schema = (
schema.SchemaField("str_index", "STRING", "NULLABLE"),
@@ -1303,7 +1323,9 @@


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_multiindex(module_under_test):
def test_dataframe_to_bq_schema_w_multiindex(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

df_data = collections.OrderedDict(
[
("str_column", ["hello", "world"]),
@@ -1320,7 +1342,8 @@ def test_dataframe_to_bq_schema_w_multiindex(module_under_test):
)
dataframe = pandas.DataFrame(df_data, index=index)

returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
with pytest.warns(FutureWarning, match="pandas-gbq"):
returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])

expected_schema = (
schema.SchemaField("str_index", "STRING", "NULLABLE"),
@@ -1334,7 +1357,9 @@


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_w_bq_schema(module_under_test):
def test_dataframe_to_bq_schema_w_bq_schema(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

df_data = collections.OrderedDict(
[
("str_column", ["hello", "world"]),
@@ -1349,7 +1374,10 @@
{"name": "bool_column", "type": "BOOL", "mode": "REQUIRED"},
]

returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, dict_schema)
with pytest.warns(FutureWarning, match="pandas-gbq"):
returned_schema = module_under_test.dataframe_to_bq_schema(
dataframe, dict_schema
)

expected_schema = (
schema.SchemaField("str_column", "STRING", "NULLABLE"),
@@ -1360,7 +1388,11 @@


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):
def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(
module_under_test, monkeypatch
):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

dataframe = pandas.DataFrame(
data=[
{"id": 10, "status": "FOO", "execution_date": datetime.date(2019, 5, 10)},
@@ -1388,7 +1420,11 @@ def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):

@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test):
def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(
module_under_test, monkeypatch
):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

dataframe = pandas.DataFrame(
data=[
{"id": 10, "status": "FOO", "created_at": datetime.date(2019, 5, 10)},
@@ -1418,7 +1454,9 @@ def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test):

@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test):
def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "pandas_gbq", None)

dataframe = pandas.DataFrame(
data=[
{"struct_field": {"one": 2}, "status": "FOO"},
@@ -1442,9 +1480,11 @@ def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test):


@pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`")
def test_dataframe_to_bq_schema_geography(module_under_test):
def test_dataframe_to_bq_schema_geography(module_under_test, monkeypatch):
from shapely import wkt

monkeypatch.setattr(module_under_test, "pandas_gbq", None)

df = geopandas.GeoDataFrame(
pandas.DataFrame(
dict(
@@ -1455,7 +1495,10 @@
),
geometry="geo1",
)
bq_schema = module_under_test.dataframe_to_bq_schema(df, [])

with pytest.warns(FutureWarning, match="pandas-gbq"):
bq_schema = module_under_test.dataframe_to_bq_schema(df, [])

assert bq_schema == (
schema.SchemaField("name", "STRING"),
schema.SchemaField("geo1", "GEOGRAPHY"),
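The recurring pattern in the updated tests, shown in isolation: null out the module-level pandas_gbq reference to force the legacy fallback, then assert that the FutureWarning fires (`module_under_test` is the fixture already used throughout this file):

```python
import pandas
import pytest

def test_legacy_fallback_warns(module_under_test, monkeypatch):
    # Simulate pandas-gbq being unavailable.
    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
    dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})
    with pytest.warns(FutureWarning, match="pandas-gbq"):
        module_under_test.dataframe_to_bq_schema(dataframe, [])
```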
33 changes: 25 additions & 8 deletions tests/unit/test_client.py
@@ -8391,8 +8391,12 @@ def test_load_table_from_dataframe_w_automatic_schema_detection_fails(self):
autospec=True,
side_effect=google.api_core.exceptions.NotFound("Table not found"),
)
pandas_gbq_patch = mock.patch(
"google.cloud.bigquery._pandas_helpers.pandas_gbq",
new=None,
)

with load_patch as load_table_from_file, get_table_patch:
with load_patch as load_table_from_file, get_table_patch, pandas_gbq_patch:
with warnings.catch_warnings(record=True) as warned:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
@@ -8448,7 +8452,6 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self):
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
Expand All @@ -8460,6 +8463,7 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self):
]
),
)

with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
@@ -8580,10 +8584,10 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se

client = self._make_client()
dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64")

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
@@ -8612,8 +8616,11 @@

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == (
SchemaField("x", "INT64", "NULLABLE", None),
assert (
# Accept either the GoogleSQL or legacy SQL type name from pandas-gbq.
tuple(sent_config.schema) == (SchemaField("x", "INT64", "NULLABLE", None),)
or tuple(sent_config.schema)
== (SchemaField("x", "INTEGER", "NULLABLE", None),)
)

def test_load_table_from_dataframe_struct_fields(self):
@@ -8759,14 +8766,22 @@ def test_load_table_from_dataframe_array_fields_w_auto_schema(self):
data=records, columns=["float_column", "array_column"]
)

expected_schema = [
expected_schema_googlesql = [
SchemaField("float_column", "FLOAT"),
SchemaField(
"array_column",
"INT64",
mode="REPEATED",
),
]
expected_schema_legacy_sql = [
SchemaField("float_column", "FLOAT"),
SchemaField(
"array_column",
"INTEGER",
mode="REPEATED",
),
]

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
@@ -8802,7 +8817,10 @@

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert sent_config.schema == expected_schema
assert (
sent_config.schema == expected_schema_googlesql
or sent_config.schema == expected_schema_legacy_sql
)

def test_load_table_from_dataframe_w_partial_schema(self):
pandas = pytest.importorskip("pandas")
@@ -8922,7 +8940,6 @@ def test_load_table_from_dataframe_w_partial_schema_extra_types(self):

load_table_from_file.assert_not_called()
message = str(exc_context.value)
assert "bq_schema contains fields not present in dataframe" in message
assert "unknown_col" in message

def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
Expand Down