impl(bigquery/read): add arrow bq read example (#335)
* impl(bigquery/read): add arrow bq read example

* address comments

* fix formatting

* fix formatting again

* add copyright to py file
alevenberg authored May 31, 2024
1 parent cbed350 commit 2c5328b
Showing 6 changed files with 444 additions and 0 deletions.
30 changes: 30 additions & 0 deletions bigquery/read/CMakeLists.txt
@@ -0,0 +1,30 @@
# ~~~
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ~~~

cmake_minimum_required(VERSION 3.20)
set(CMAKE_CXX_STANDARD 20)

# Define the project name and where to report bugs.
set(PACKAGE_BUGREPORT
"https://github.com/GoogleCloudPlatform/cpp-samples/issues")
project(cpp-samples-bigquery-read CXX)

find_package(google_cloud_cpp_bigquery REQUIRED)
find_package(Arrow REQUIRED)

add_executable(arrow_read arrow_read.cc)
target_link_libraries(arrow_read PRIVATE google-cloud-cpp::bigquery
Arrow::arrow_static)
113 changes: 113 additions & 0 deletions bigquery/read/README.md
@@ -0,0 +1,113 @@
# Using BigQuery Storage Read

Cloud BigQuery is a data platform that allows users to easily create, manage,
share, and query data using SQL. When you want to access your data, you can
read directly from a table. However, if you want to transform the data in a
table by mapping, filtering, or joining, you first need to run a query. When
you run a query, you can specify a destination table to store the results. You
can then start a read session for that table via the BigQuery Storage library
and read the rows from the table.

This example shows how to create a query job using the BigQuery v2 Python API,
and then read the data from the table using the BigQuery Storage C++ API.

If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read
API, we recommend you first read the [API overview] before starting this guide.

## Prerequisites

You are going to need a Google Cloud project to host the BigQuery dataset and
table used in this example. You will need to install and configure the BigQuery
CLI tool. Follow the [Google Cloud CLI install][install-sdk] instructions, and
then the [quickstart][bigquery cli tool] for the BigQuery CLI tool.

Verify that the CLI is working by running a simple command to list the active
project:

```shell
bq show
```

### Creating the query job

The following script uses the BigQuery v2 Python client to create a dataset (if
it does not already exist) and a query job.

```shell
python3 -m venv env
source env/bin/activate
pip3 install -r requirements.txt
python3 create_query_job.py --project_id [PROJECT-ID] --dataset_name usa_names --table_name top10_names
```
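
The `create_query_job.py` script and `requirements.txt` are part of this commit
but are not rendered in the diff above. The following is a minimal sketch of
what such a script could look like, assuming it aggregates the public
`bigquery-public-data.usa_names.usa_1910_2013` table into the destination
table; it is illustrative only, not the committed script.

```python
# Hypothetical sketch of create_query_job.py; the query, dataset handling, and
# job options are illustrative assumptions, not the exact committed script.
import argparse

from google.cloud import bigquery


def main(project_id: str, dataset_name: str, table_name: str) -> None:
    client = bigquery.Client(project=project_id)

    # Create the dataset if it does not already exist.
    client.create_dataset(f"{project_id}.{dataset_name}", exists_ok=True)

    # Run a query and store its results in the destination table.
    job_config = bigquery.QueryJobConfig(
        destination=f"{project_id}.{dataset_name}.{table_name}",
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    query = """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_2013`
        GROUP BY name
        ORDER BY total DESC
        LIMIT 10
    """
    client.query(query, job_config=job_config).result()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--project_id", required=True)
    parser.add_argument("--dataset_name", default="usa_names")
    parser.add_argument("--table_name", default="top10_names")
    args = parser.parse_args()
    main(args.project_id, args.dataset_name, args.table_name)
```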

## Compiling the Example

This project uses `vcpkg` to install its dependencies. Clone `vcpkg` into your
`$HOME` directory:

```shell
git -C $HOME clone https://github.com/microsoft/vcpkg.git
```

Install the typical development tools. On Ubuntu you would use:

```shell
apt update && apt install -y build-essential cmake git ninja-build pkg-config g++ curl tar zip unzip
```

In this directory, compile the dependencies and the code. This can take as long
as an hour, depending on the performance of your workstation:

```shell
cd cpp-samples/bigquery/read
cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \
-DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build .build
```

## Run the sample

Run the example, replacing the `[PROJECT ID]`, `[DATASET_NAME]`, and
`[TABLE_NAME]` placeholders with your project ID and the dataset and table
names:

```shell
.build/arrow_read [PROJECT ID] [DATASET_NAME] [TABLE_NAME]
```

For example, using the dataset and table created above:

```shell
.build/arrow_read [PROJECT ID] usa_names top10_names
```

## Output

Your output should look like the following:

```
Schema is:
name: string
total: int64
name total
Row 0: James 4942431
Row 1: John 4834422
Row 2: Robert 4718787
Row 3: Michael 4297230
Row 4: William 3822209
Row 5: Mary 3737679
Row 6: David 3549801
Row 7: Richard 2531924
Row 8: Joseph 2472917
Row 9: Charles 2244693
Read 1 record batch(es) and 10 total row(s) from table: projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names
```

## Cleanup

Remove the table and dataset:

```shell
bq rm -f usa_names.top10_names
bq rm -f usa_names
```

[api overview]: https://cloud.google.com/bigquery/docs/reference/storage
[bigquery cli tool]: https://cloud.google.com/bigquery/docs/bq-command-line-tool
[install-sdk]: https://cloud.google.com/sdk/docs/install-sdk
183 changes: 183 additions & 0 deletions bigquery/read/arrow_read.cc
@@ -0,0 +1,183 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include "google/cloud/project.h"
#include <arrow/api.h>
#include <arrow/array/data.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <format>
#include <iomanip>
#include <iostream>

namespace {

std::shared_ptr<arrow::Schema> GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
std::shared_ptr<arrow::Buffer> buffer =
std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
if (!result.ok()) {
std::cout << "Unable to parse schema\n";
throw result.status();
}
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
std::cout << std::format("Schema is:\n {}\n", schema->ToString());
return schema;
}

std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch(
::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
record_batch_in,
std::shared_ptr<arrow::Schema> schema) {
std::shared_ptr<arrow::Buffer> buffer = std::make_shared<arrow::Buffer>(
record_batch_in.serialized_record_batch());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
arrow::ipc::IpcReadOptions read_options;
auto result = arrow::ipc::ReadRecordBatch(schema, &dictionary_memo,
read_options, &buffer_reader);
if (!result.ok()) {
std::cout << "Unable to parse record batch\n";
throw result.status();
}
std::shared_ptr<arrow::RecordBatch> record_batch = result.ValueOrDie();
return record_batch;
}

void PrintColumnNames(std::shared_ptr<arrow::RecordBatch> record_batch) {
// Print each column name for the record batch.
std::cout << std::setfill(' ') << std::setw(7) << "";
for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::cout << std::left << std::setw(16) << record_batch->column_name(col);
}
std::cout << "\n";
}

void ProcessRecordBatch(std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<arrow::RecordBatch> record_batch,
std::int64_t num_rows) {
// If you want to see what the result looks like without parsing the
// datatypes, use `record_batch->ToString()` for quick debugging.
// Note: you might need to adjust the formatting depending on how big the data
// in your table is.
for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) {
std::cout << std::format("Row {}: ", row + num_rows);

for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::shared_ptr<arrow::Array> column = record_batch->column(col);
arrow::Result<std::shared_ptr<arrow::Scalar> > result =
column->GetScalar(row);
if (!result.ok()) {
std::cout << "Unable to parse scalar\n";
throw result.status();
}

std::shared_ptr<arrow::Scalar> scalar = result.ValueOrDie();
switch (scalar->type->id()) {
case arrow::Type::INT64:
std::cout
<< std::left << std::setw(15)
<< std::dynamic_pointer_cast<arrow::Int64Scalar>(scalar)->value
<< " ";
break;
case arrow::Type::STRING:
std::cout
<< std::left << std::setw(15)
<< std::dynamic_pointer_cast<arrow::StringScalar>(scalar)->view()
<< " ";
break;
// Depending on the table you are reading, you might need to add cases
// for other datatypes here. The schema will tell you what datatypes
// need to be handled.
default:
std::cout << std::left << std::setw(15) << "UNDEFINED"
<< " ";
}
}
std::cout << "\n";
}
}

} // namespace

int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <dataset-name> <table-name>\n";
return 1;
}

std::string const project_id = argv[1];
std::string const dataset_name = argv[2];
std::string const table_name = argv[3];

std::string const table_id = "projects/" + project_id + "/datasets/" +
dataset_name + "/tables/" + table_name;

// Create a namespace alias to make the code easier to read.
namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
constexpr int kMaxReadStreams = 1;
// Create the ReadSession.
auto client = bigquery_storage::BigQueryReadClient(
bigquery_storage::MakeBigQueryReadConnection());
::google::cloud::bigquery::storage::v1::ReadSession read_session;
read_session.set_data_format(
google::cloud::bigquery::storage::v1::DataFormat::ARROW);
read_session.set_table(table_id);
auto session =
client.CreateReadSession(google::cloud::Project(project_id).FullName(),
read_session, kMaxReadStreams);
if (!session) throw std::move(session).status();

// Get schema.
std::shared_ptr<arrow::Schema> schema =
GetArrowSchema(session->arrow_schema());

// Read rows from the ReadSession.
constexpr int kRowOffset = 0;
auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);

std::int64_t num_rows = 0;
std::int64_t record_batch_count = 0;
for (auto const& read_rows_response : read_rows) {
if (read_rows_response.ok()) {
std::shared_ptr<arrow::RecordBatch> record_batch =
GetArrowRecordBatch(read_rows_response->arrow_record_batch(), schema);

if (record_batch_count == 0) {
PrintColumnNames(record_batch);
}

ProcessRecordBatch(schema, record_batch, num_rows);
num_rows += read_rows_response->row_count();
++record_batch_count;
}
}

std::cout << std::format(
"Read {} record batch(es) and {} total row(s) from table: {}\n",
record_batch_count, num_rows, table_id);
return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
} catch (arrow::Status const& status) {
std::cerr << "arrow::Status thrown: " << status << "\n";
return 1;
}
