From 3a9a144c41da472f172ac639e99f5397ebf14f4f Mon Sep 17 00:00:00 2001
From: Anna Levenberg
Date: Thu, 30 May 2024 14:32:19 -0400
Subject: [PATCH] impl(bigquery/read): add arrow bq read example

---
 bigquery/read/CMakeLists.txt      |  30 ++++++
 bigquery/read/README.md           |  91 ++++++++++++++++
 bigquery/read/arrow_read.cc       | 167 ++++++++++++++++++++++++++++++
 bigquery/read/create_query_job.py |  67 ++++++++++++
 bigquery/read/requirements.txt    |  23 ++++
 bigquery/read/vcpkg.json          |  14 +++
 6 files changed, 392 insertions(+)
 create mode 100644 bigquery/read/CMakeLists.txt
 create mode 100644 bigquery/read/README.md
 create mode 100644 bigquery/read/arrow_read.cc
 create mode 100644 bigquery/read/create_query_job.py
 create mode 100644 bigquery/read/requirements.txt
 create mode 100644 bigquery/read/vcpkg.json

diff --git a/bigquery/read/CMakeLists.txt b/bigquery/read/CMakeLists.txt
new file mode 100644
index 0000000..3c32bdb
--- /dev/null
+++ b/bigquery/read/CMakeLists.txt
@@ -0,0 +1,30 @@
+# ~~~
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ~~~
+
+cmake_minimum_required(VERSION 3.20)
+set(CMAKE_CXX_STANDARD 20)
+
+# Define the project name and where to report bugs.
+set(PACKAGE_BUGREPORT
+    "https://github.com/GoogleCloudPlatform/cpp-samples/issues")
+project(cpp-samples-bigquery-read CXX)
+
+find_package(google_cloud_cpp_bigquery REQUIRED)
+find_package(Arrow REQUIRED)
+
+add_executable(arrow_read arrow_read.cc)
+target_link_libraries(arrow_read PRIVATE google-cloud-cpp::bigquery
+                                         Arrow::arrow_static)
diff --git a/bigquery/read/README.md b/bigquery/read/README.md
new file mode 100644
index 0000000..f9b3809
--- /dev/null
+++ b/bigquery/read/README.md
@@ -0,0 +1,91 @@
+# Using the BigQuery Storage Read API
+
+BigQuery is a data platform that allows users to easily create, manage, share,
+and query data using SQL. When you want to access your data, you can read
+directly from a table. However, if you want to transform the data in a table
+by mapping, filtering, or joining, you first need to run a query. When you run
+a query, you can specify a destination table to store the results. Then you
+can start a read session for that table via the BigQuery Storage library and
+read its rows.
+
+This example shows how to create a query job using the BigQuery v2 Python API,
+and then read the data from the resulting table using the BigQuery Storage C++
+API.
+
+If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read
+API, we recommend you first read the [API overview] before starting this guide.
+
+## Prerequisites
+
+You will need a Google Cloud project to host the BigQuery dataset and table
+used in this example, and the BigQuery CLI tool to manage them. Follow the
+[Google Cloud CLI install][install-sdk] instructions, and then the
+[quickstart][bigquery cli tool] for the BigQuery CLI tool.
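+
+The sample programs authenticate with Google Cloud using Application Default
+Credentials. If you have not configured credentials yet, one common way to do
+so for local development is:
+
+```shell
+gcloud auth application-default login
+```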
+
+Verify the CLI is working using a simple command, for example:
+
+```shell
+bq show
+```
+
+### Creating the query job
+
+The following script uses the BigQuery v2 Python client to create a dataset
+(if it does not already exist) and a query job that writes its results to a
+table. Replace the `[PROJECT ID]` placeholder with the id of your project:
+
+```shell
+python3 -m venv env
+source env/bin/activate
+pip3 install -r requirements.txt
+python3 create_query_job.py --project_id [PROJECT ID] --dataset_name usa_names --table_name top10_names
+```
+
+## Compiling the Example
+
+This project uses `vcpkg` to install its dependencies. Clone `vcpkg` in your
+`$HOME` directory:
+
+```shell
+git -C $HOME clone https://github.com/microsoft/vcpkg.git
+```
+
+Install the typical development tools. On Ubuntu you would use:
+
+```shell
+apt update && apt install -y build-essential cmake git ninja-build pkg-config g++ curl tar zip unzip
+```
+
+In this directory, compile the dependencies and the code. This can take as
+long as an hour, depending on the performance of your workstation:
+
+```shell
+cd cpp-samples/bigquery/read
+cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \
+    -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake
+cmake --build .build
+```
+
+## Run the sample
+
+Run the example, replacing the `[PROJECT ID]` placeholder with the id of your
+project:
+
+```shell
+.build/arrow_read [PROJECT ID] [DATASET_NAME] [TABLE_NAME]
+```
+
+For example:
+
+```shell
+.build/arrow_read [PROJECT ID] usa_names top10_names
+```
+
+## Cleanup
+
+Remove the table and dataset:
+
+```shell
+bq rm -f usa_names.top10_names
+bq rm -f usa_names
+```
+
+[api overview]: https://cloud.google.com/bigquery/docs/reference/storage
+[bigquery cli tool]: https://cloud.google.com/bigquery/docs/bq-command-line-tool
+[install-sdk]: https://cloud.google.com/sdk/docs/install-sdk
diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow_read.cc
new file mode 100644
index 0000000..098c1e3
--- /dev/null
+++ b/bigquery/read/arrow_read.cc
@@ -0,0 +1,167 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
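+//
+// This example connects to the BigQuery Storage Read API, creates a read
+// session that returns the rows of an existing table in Arrow format, and
+// then uses the Arrow IPC readers to decode the serialized schema and record
+// batches before printing each row to stdout.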
+
+#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
+#include "google/cloud/project.h"
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <cstdint>
+#include <format>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+
+namespace {
+
+std::shared_ptr<arrow::Schema> GetArrowSchema(
+    ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
+  std::shared_ptr<arrow::Buffer> buffer =
+      std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
+  arrow::io::BufferReader buffer_reader(buffer);
+  arrow::ipc::DictionaryMemo dictionary_memo;
+  auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
+  if (!result.ok()) {
+    std::cout << "Unable to parse schema\n";
+    throw result.status();
+  }
+  std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
+  return schema;
+}
+
+std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch(
+    ::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
+        record_batch_in,
+    std::shared_ptr<arrow::Schema> schema) {
+  std::shared_ptr<arrow::Buffer> buffer = std::make_shared<arrow::Buffer>(
+      record_batch_in.serialized_record_batch());
+  arrow::io::BufferReader buffer_reader(buffer);
+  arrow::ipc::DictionaryMemo dictionary_memo;
+  arrow::ipc::IpcReadOptions read_options;
+  auto result = arrow::ipc::ReadRecordBatch(schema, &dictionary_memo,
+                                            read_options, &buffer_reader);
+  if (!result.ok()) {
+    std::cout << "Unable to parse record batch\n";
+    throw result.status();
+  }
+  std::shared_ptr<arrow::RecordBatch> record_batch = result.ValueOrDie();
+  return record_batch;
+}
+
+void ProcessRowsInArrowFormat(
+    ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in,
+    ::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
+        record_batch_in) {
+  // Get the arrow schema.
+  std::shared_ptr<arrow::Schema> schema = GetArrowSchema(schema_in);
+
+  // Get the record batch buffer.
+  std::shared_ptr<arrow::RecordBatch> record_batch =
+      GetArrowRecordBatch(record_batch_in, schema);
+
+  // Print information about the record batch.
+  std::cout << std::format("Record batch schema is:\n {}\n",
+                           record_batch->schema()->ToString());
+  std::cout << std::format("Record batch has {} cols and {} rows\n",
+                           record_batch->num_columns(),
+                           record_batch->num_rows());
+
+  // Print each row and column in the record batch.
+  std::cout << std::setfill(' ') << std::setw(7) << "";
+  for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
+    std::cout << std::left << std::setw(12) << record_batch->column_name(col);
+  }
+  std::cout << "\n";
+  // If you want to see what the result looks like without parsing the
+  // datatypes, use `record_batch->ToString()` for quick debugging.
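+  // Note: the loop below only decodes INT64 and STRING values, because those
+  // are the only column types produced by the example query (`name` and
+  // `total`). Columns of any other Arrow type are silently skipped; add more
+  // cases here to handle them.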
+  for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) {
+    std::cout << std::format("Row {}: ", row);
+
+    for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
+      std::shared_ptr<arrow::Array> column = record_batch->column(col);
+      arrow::Result<std::shared_ptr<arrow::Scalar>> result =
+          column->GetScalar(row);
+      if (!result.ok()) {
+        std::cout << "Unable to parse scalar\n";
+        throw result.status();
+      }
+
+      std::shared_ptr<arrow::Scalar> scalar = result.ValueOrDie();
+      if (scalar->type->id() == arrow::Type::INT64) {
+        std::shared_ptr<arrow::Int64Scalar> int64_scalar =
+            std::dynamic_pointer_cast<arrow::Int64Scalar>(scalar);
+        std::cout << std::left << std::setw(9) << int64_scalar->value << " ";
+      } else if (scalar->type->id() == arrow::Type::STRING) {
+        std::shared_ptr<arrow::StringScalar> string_scalar =
+            std::dynamic_pointer_cast<arrow::StringScalar>(scalar);
+        std::cout << std::left << std::setw(9) << string_scalar->view() << " ";
+      }
+    }
+    std::cout << "\n";
+  }
+}
+
+}  // namespace
+
+int main(int argc, char* argv[]) try {
+  if (argc != 4) {
+    std::cerr << "Usage: " << argv[0]
+              << " <project-id> <dataset-name> <table-name>\n";
+    return 1;
+  }
+
+  std::string const project_id = argv[1];
+  std::string const dataset_name = argv[2];
+  std::string const table_name = argv[3];
+
+  std::string const table_id = "projects/" + project_id + "/datasets/" +
+                               dataset_name + "/tables/" + table_name;
+
+  // Create a namespace alias to make the code easier to read.
+  namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
+  constexpr int kMaxReadStreams = 1;
+  // Create the ReadSession.
+  auto client = bigquery_storage::BigQueryReadClient(
+      bigquery_storage::MakeBigQueryReadConnection());
+  ::google::cloud::bigquery::storage::v1::ReadSession read_session;
+  read_session.set_data_format(
+      google::cloud::bigquery::storage::v1::DataFormat::ARROW);
+  read_session.set_table(table_id);
+  auto session =
+      client.CreateReadSession(google::cloud::Project(project_id).FullName(),
+                               read_session, kMaxReadStreams);
+  if (!session) throw std::move(session).status();
+
+  // Read rows from the ReadSession.
+  constexpr int kRowOffset = 0;
+  auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);
+
+  std::int64_t num_rows = 0;
+  for (auto const& row : read_rows) {
+    if (row.ok()) {
+      num_rows += row->row_count();
+      ProcessRowsInArrowFormat(session->arrow_schema(),
+                               row->arrow_record_batch());
+    }
+  }
+
+  std::cout << std::format("Read {} rows from table: {}\n", num_rows,
+                           table_id);
+  return 0;
+} catch (google::cloud::Status const& status) {
+  std::cerr << "google::cloud::Status thrown: " << status << "\n";
+  return 1;
+} catch (arrow::Status const& status) {
+  std::cerr << "arrow::Status thrown: " << status << "\n";
+  return 1;
+}
diff --git a/bigquery/read/create_query_job.py b/bigquery/read/create_query_job.py
new file mode 100644
index 0000000..17c190a
--- /dev/null
+++ b/bigquery/read/create_query_job.py
@@ -0,0 +1,67 @@
+import argparse
+
+from google.api_core.exceptions import Conflict, NotFound
+from google.cloud import bigquery
+
+def main(args):
+    # Construct a BigQuery client object.
+    client = bigquery.Client()
+    # Create the dataset that stores the table, if it does not already exist.
+    dataset_id = f"{args.project_id}.{args.dataset_name}"
+    dataset = bigquery.Dataset(dataset_id)
+    dataset.location = args.dataset_location
+    try:
+        dataset = client.create_dataset(dataset, timeout=30)
+        print(f"Created dataset {client.project}.{dataset.dataset_id}")
+    except Conflict:
+        # A 409 Conflict on create_dataset means the dataset already exists.
+        print(f"Dataset {dataset_id} already exists.")
+    except Exception as e:
+        print(f"Unable to create dataset: {e}")
+        return
+
+    # Verify the table does not already exist.
+    table_id = f"{dataset_id}.{args.table_name}"
+    try:
+        client.get_table(table_id)
+        print(f"Table {table_id} already exists. Run the script with a new --table_name argument.")
+        return
+    except NotFound:
+        pass
+    except Exception as e:
+        print(f"Unable to verify if table exists: {e}")
+        return
+
+    # Create a query job that writes the top 10 names to the table.
+    job_config = bigquery.QueryJobConfig(destination=table_id)
+    sql = """
+        SELECT
+            name,
+            SUM(number) AS total
+        FROM
+            `bigquery-public-data.usa_names.usa_1910_2013`
+        GROUP BY
+            name
+        ORDER BY
+            total DESC
+        LIMIT
+            10;
+    """
+
+    # Start the query, passing in the extra configuration.
+    query_job = client.query(sql, job_config=job_config)  # Make an API request.
+    query_job.result()  # Wait for the job to complete.
+
+    print(f"Query results loaded to the table {table_id}")
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="A script to create a BigQuery query job.")
+    parser.add_argument("-p", "--project_id", type=str, help="GCP project id")
+    parser.add_argument("--dataset_name", type=str, help="Dataset name to store the table in")
+    parser.add_argument("--dataset_location", type=str, default="US", help="Dataset location")
+    parser.add_argument("--table_name", type=str, help="Table name to write the query results to")
+    args = parser.parse_args()
+    main(args)
diff --git a/bigquery/read/requirements.txt b/bigquery/read/requirements.txt
new file mode 100644
index 0000000..e750ae8
--- /dev/null
+++ b/bigquery/read/requirements.txt
@@ -0,0 +1,23 @@
+cachetools==5.3.3
+certifi==2024.2.2
+charset-normalizer==3.3.2
+google-api-core==2.19.0
+google-auth==2.29.0
+google-cloud-bigquery==3.23.1
+google-cloud-core==2.4.1
+google-crc32c==1.5.0
+google-resumable-media==2.7.0
+googleapis-common-protos==1.63.0
+grpcio==1.64.0
+grpcio-status==1.62.2
+idna==3.7
+packaging==24.0
+proto-plus==1.23.0
+protobuf==4.25.3
+pyasn1==0.6.0
+pyasn1_modules==0.4.0
+python-dateutil==2.9.0.post0
+requests==2.32.3
+rsa==4.9
+six==1.16.0
+urllib3==2.2.1
diff --git a/bigquery/read/vcpkg.json b/bigquery/read/vcpkg.json
new file mode 100644
index 0000000..eb8ec84
--- /dev/null
+++ b/bigquery/read/vcpkg.json
@@ -0,0 +1,14 @@
+{
+  "name": "gcp-cpp-samples-bigquery-read",
+  "version-string": "unversioned",
+  "homepage": "https://github.com/GoogleCloudPlatform/cpp-samples/",
+  "description": "An example using the BigQuery Storage Read API",
+  "dependencies": [
+    {
+      "name": "google-cloud-cpp",
+      "default-features": false,
+      "features": ["bigquery"]
+    },
+    "arrow"
+  ]
+}