impl(bigquery/read): add arrow bq read example
alevenberg committed May 30, 2024
1 parent cbed350 commit 3a9a144
Showing 6 changed files with 392 additions and 0 deletions.
30 changes: 30 additions & 0 deletions bigquery/read/CMakeLists.txt
@@ -0,0 +1,30 @@
# ~~~
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ~~~

cmake_minimum_required(VERSION 3.20)
set(CMAKE_CXX_STANDARD 20)

# Define the project name and where to report bugs.
set(PACKAGE_BUGREPORT
"https://github.com/GoogleCloudPlatform/cpp-samples/issues")
project(cpp-samples-bigquery-read CXX)

find_package(google_cloud_cpp_bigquery REQUIRED)
find_package(Arrow REQUIRED)

add_executable(arrow_read arrow_read.cc)
target_link_libraries(arrow_read PRIVATE google-cloud-cpp::bigquery
Arrow::arrow_static)
91 changes: 91 additions & 0 deletions bigquery/read/README.md
@@ -0,0 +1,91 @@
# Using BigQuery Storage Read

BigQuery is a data platform that allows users to easily create, manage, share,
and query data using SQL. When you want to access your data, you can read
directly from a table. However, if you want to transform the data in a table by
mapping, filtering, or joining it, you first need to run a query. When you run a
query, you can specify a table to store the results. You can then start a read
session for that table via the BigQuery Storage library and read its rows.

This example shows how to create a query job using the BigQuery v2 Python API,
and then read the data from the table using the BigQuery Storage C++ API.

If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read
API, we recommend you first read the [API overview] before starting this guide.
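
At its core, the C++ side of this example creates a `ReadSession` for the table
and then iterates over the row batches returned by `ReadRows`. The snippet below
is a condensed sketch of that read path, using the same client calls as the full
program in `arrow_read.cc`; the project, dataset, and table identifiers are
placeholders, and the Arrow decoding of each batch is omitted.

```cpp
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include "google/cloud/project.h"
#include <iostream>

int main() {
  namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
  auto client = bigquery_storage::BigQueryReadClient(
      bigquery_storage::MakeBigQueryReadConnection());

  // Request the table contents in Arrow format over a single stream.
  ::google::cloud::bigquery::storage::v1::ReadSession read_session;
  read_session.set_data_format(
      ::google::cloud::bigquery::storage::v1::DataFormat::ARROW);
  read_session.set_table(
      "projects/[PROJECT_ID]/datasets/[DATASET_NAME]/tables/[TABLE_NAME]");
  auto session = client.CreateReadSession(
      google::cloud::Project("[PROJECT_ID]").FullName(), read_session,
      /*max_stream_count=*/1);
  if (!session) {
    std::cerr << "Error creating read session: " << session.status() << "\n";
    return 1;
  }

  // Each response carries a serialized Arrow record batch; arrow_read.cc shows
  // how to decode it with the Arrow C++ library.
  for (auto const& rows : client.ReadRows(session->streams(0).name(), 0)) {
    if (!rows) {
      std::cerr << "Error reading rows: " << rows.status() << "\n";
      return 1;
    }
    std::cout << "Received a batch with " << rows->row_count() << " rows\n";
  }
  return 0;
}
```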

## Prerequisites

You will need a Google Cloud project to host the BigQuery dataset and table
used in this example, and you will need to install and configure the BigQuery
CLI tool. Follow the [Google Cloud CLI install][install-sdk] instructions, and
then the [quickstart][bigquery cli tool] for the BigQuery CLI tool.

Verify the CLI is working using a simple command to list the active project:

```shell
bq show
```

### Creating the query job

The following script uses the BigQuery v2 Python client to create a dataset (if
it does not already exist) and a query job. Replace the `[PROJECT_ID]`
placeholder with the id of your project:

```shell
python3 -m venv env
source env/bin/activate
pip3 install -r requirements.txt
python3 create_query_job.py --project_id [PROJECT_ID] --dataset_name usa_names --table_name top10_names
```

## Compiling the Example

This project uses `vcpkg` to install its dependencies. Clone `vcpkg` into your
`$HOME` directory:

```shell
git -C $HOME clone https://github.com/microsoft/vcpkg.git
```

Install the typical development tools. On Ubuntu you would use:

```shell
apt update && apt install -y build-essential cmake git ninja-build pkg-config g++ curl tar zip unzip
```

In this directory, compile the dependencies and the code. This can take as long
as an hour, depending on the performance of your workstation:

```shell
cd cpp-samples/bigquery/read
cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \
-DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build .build
```

## Run the sample

Run the example, replacing the `[PROJECT_ID]`, `[DATASET_NAME]`, and
`[TABLE_NAME]` placeholders with the id of your project and the names of the
dataset and table to read:

```shell
.build/arrow_read [PROJECT_ID] [DATASET_NAME] [TABLE_NAME]
```

For example, using the dataset and table created above:

```shell
.build/arrow_read [PROJECT_ID] usa_names top10_names
```

## Cleanup

Remove the table and dataset:

```shell
bq rm -f usa_names.top10_names
bq rm -f usa_names
```

[api overview]: https://cloud.google.com/bigquery/docs/reference/storage
[bigquery cli tool]: https://cloud.google.com/bigquery/docs/bq-command-line-tool
[install-sdk]: https://cloud.google.com/sdk/docs/install-sdk
167 changes: 167 additions & 0 deletions bigquery/read/arrow_read.cc
@@ -0,0 +1,167 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include "google/cloud/project.h"
#include <arrow/api.h>
#include <arrow/array/data.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <cstdint>
#include <format>
#include <iomanip>
#include <iostream>

namespace {

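// Converts the serialized Arrow IPC schema returned by the BigQuery Storage
// API into an arrow::Schema.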
std::shared_ptr<arrow::Schema> GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
std::shared_ptr<arrow::Buffer> buffer =
std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
if (!result.ok()) {
std::cout << "Unable to parse schema\n";
throw result.status();
}
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
return schema;
}

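// Decodes a serialized Arrow record batch, using the arrow::Schema produced by
// GetArrowSchema().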
std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch(
::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
record_batch_in,
std::shared_ptr<arrow::Schema> schema) {
std::shared_ptr<arrow::Buffer> buffer = std::make_shared<arrow::Buffer>(
record_batch_in.serialized_record_batch());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
arrow::ipc::IpcReadOptions read_options;
auto result = arrow::ipc::ReadRecordBatch(schema, &dictionary_memo,
read_options, &buffer_reader);
if (!result.ok()) {
std::cout << "Unable to parse record batch\n";
throw result.status();
}
std::shared_ptr<arrow::RecordBatch> record_batch = result.ValueOrDie();
return record_batch;
}

void ProcessRowsInArrowFormat(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in,
::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
record_batch_in) {
// Get the arrow schema.
std::shared_ptr<arrow::Schema> schema = GetArrowSchema(schema_in);

// Get the record batch buffer.
std::shared_ptr<arrow::RecordBatch> record_batch =
GetArrowRecordBatch(record_batch_in, schema);

// Print information about the record batch.
std::cout << std::format("Record batch schema is:\n {}\n",
record_batch->schema()->ToString());
std::cout << std::format("Record batch has {} cols and {} rows\n",
record_batch->num_columns(),
record_batch->num_rows());

// Print each row and column in the record batch.
std::cout << std::setfill(' ') << std::setw(7) << "";
for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::cout << std::left << std::setw(12) << record_batch->column_name(col);
}
std::cout << "\n";
// If you want to see what the result looks like without parsing the
// datatypes, use `record_batch->ToString()` for quick debugging.
for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) {
std::cout << std::format("Row {}: ", row);

for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::shared_ptr<arrow::Array> column = record_batch->column(col);
arrow::Result<std::shared_ptr<arrow::Scalar> > result =
column->GetScalar(row);
if (!result.ok()) {
std::cout << "Unable to parse scalar\n";
throw result.status();
}

std::shared_ptr<arrow::Scalar> scalar = result.ValueOrDie();
if (scalar->type->id() == arrow::Type::INT64) {
std::shared_ptr<arrow::Int64Scalar> int64_scalar =
std::dynamic_pointer_cast<arrow::Int64Scalar>(scalar);
std::cout << std::left << std::setw(9) << int64_scalar->value << " ";
} else if (scalar->type->id() == arrow::Type::STRING) {
std::shared_ptr<arrow::StringScalar> string_scalar =
std::dynamic_pointer_cast<arrow::StringScalar>(scalar);
std::cout << std::left << std::setw(9) << string_scalar->view() << " ";
}
}
std::cout << "\n";
}
}

} // namespace

int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <dataset-name> <table-name>\n";
return 1;
}

std::string const project_id = argv[1];
std::string const dataset_name = argv[2];
std::string const table_name = argv[3];

std::string const table_id = "projects/" + project_id + "/datasets/" +
dataset_name + "/tables/" + table_name;

// Create a namespace alias to make the code easier to read.
namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
constexpr int kMaxReadStreams = 1;
// Create the ReadSession.
auto client = bigquery_storage::BigQueryReadClient(
bigquery_storage::MakeBigQueryReadConnection());
::google::cloud::bigquery::storage::v1::ReadSession read_session;
read_session.set_data_format(
google::cloud::bigquery::storage::v1::DataFormat::ARROW);
read_session.set_table(table_id);
auto session =
client.CreateReadSession(google::cloud::Project(project_id).FullName(),
read_session, kMaxReadStreams);
if (!session) throw std::move(session).status();

// Read rows from the ReadSession.
constexpr int kRowOffset = 0;
auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);

std::int64_t num_rows = 0;
for (auto const& row : read_rows) {
if (row.ok()) {
num_rows += row->row_count();
ProcessRowsInArrowFormat(session->arrow_schema(),
row->arrow_record_batch());
}
}

std::cout << std::format("Read {} rows from table: {}\n", num_rows, table_id);
return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
} catch (arrow::Status const& status) {
std::cerr << "arrow::Status thrown: " << status << "\n";
return 1;
}
67 changes: 67 additions & 0 deletions bigquery/read/create_query_job.py
@@ -0,0 +1,67 @@
import argparse

from google.api_core.exceptions import Conflict, NotFound
from google.cloud import bigquery

def main(args):
# Construct a BigQuery client object.
client = bigquery.Client()
# If it does not already exist, create dataset to store table.
dataset_id = f"{args.project_id}.{args.dataset_name}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = args.dataset_location
try:
dataset = client.create_dataset(dataset, timeout=30)
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except Conflict as e:
if ("ALREADY_EXISTS" in e.details[0]['detail']):
print(f"Dataset {dataset_id} already exists.")
else:
print(f"Unable to create dataset. Error with code {e.code} and message {e.message}")
return
    except Exception as e:
        print(f"Unable to create dataset: {e}")
        return

# Verify table exists.
table_id = f"{dataset_id}.{args.table_name}"
    try:
        client.get_table(table_id)
        print(f"Table {table_id} already exists. Run the script with a new --table_name argument.")
        return
except NotFound:
pass
    except Exception as e:
        print(f"Unable to verify if table exists: {e}")
        return

# Create query job that writes the top 10 names to a table.
job_config = bigquery.QueryJobConfig(destination=table_id)
sql = """
SELECT
name,
SUM(number) AS total
FROM
`bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY
name
ORDER BY
total DESC
LIMIT
10;
"""

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config) # Make an API request.
query_job.result() # Wait for the job to complete.

print(f"Query results loaded to the table {table_id}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A script to create a BigQuery query job.")
    parser.add_argument("-p", "--project_id", type=str, help="GCP project id")
    parser.add_argument("--dataset_name", type=str, help="Dataset name to store the table in")
    parser.add_argument("--dataset_location", type=str, default="US", help="Dataset location")
    parser.add_argument("--table_name", type=str, help="Table name to write the query results to")
    args = parser.parse_args()
    main(args)
23 changes: 23 additions & 0 deletions bigquery/read/requirements.txt
@@ -0,0 +1,23 @@
cachetools==5.3.3
certifi==2024.2.2
charset-normalizer==3.3.2
google-api-core==2.19.0
google-auth==2.29.0
google-cloud-bigquery==3.23.1
google-cloud-core==2.4.1
google-crc32c==1.5.0
google-resumable-media==2.7.0
googleapis-common-protos==1.63.0
grpcio==1.64.0
grpcio-status==1.62.2
idna==3.7
packaging==24.0
proto-plus==1.23.0
protobuf==4.25.3
pyasn1==0.6.0
pyasn1_modules==0.4.0
python-dateutil==2.9.0.post0
requests==2.32.3
rsa==4.9
six==1.16.0
urllib3==2.2.1
14 changes: 14 additions & 0 deletions bigquery/read/vcpkg.json
@@ -0,0 +1,14 @@
{
"name": "gcp-cpp-samples-bigquery-read",
"version-string": "unversioned",
"homepage": "https://github.com/GoogleCloudPlatform/cpp-samples/",
"description": "An example using the BigQuery Storage Read API",
"dependencies": [
{
"name": "google-cloud-cpp",
"default-features": false,
"features": ["bigquery"]
},
"arrow"
]
}
