feat!: GCS to BigTable catalog from GCS bucket (#1026)
* feat!: GCS to BigTable catalog from GCS bucket

* fix: provide google cloud storage package dependency

* fix: test case

* fix: test case

* fix: test case

* fix: simplify test cases
rajc242 authored Dec 2, 2024
1 parent 6af6cc4 commit f106b7c
Showing 5 changed files with 88 additions and 112 deletions.
89 changes: 71 additions & 18 deletions python/.ci/Jenkinsfile
@@ -751,7 +751,76 @@ pipeline {
steps{
retry(count: stageRetryCount) {
sh '''
cat > /tmp/cities.json << EOF
{
"table": {
"name": "cities"
},
"rowkey": "key",
"columns": {
"key": {
"cf": "rowkey",
"col": "key",
"type": "string"
},
"LatD": {
"cf": "lat",
"col": "LatD",
"type": "string"
},
"LatM": {
"cf": "lat",
"col": "LatM",
"type": "string"
},
"LatS": {
"cf": "lat",
"col": "LatS",
"type": "string"
},
"NS": {
"cf": "lat",
"col": "NS",
"type": "string"
},
"LonD": {
"cf": "lon",
"col": "LonD",
"type": "string"
},
"LonM": {
"cf": "lon",
"col": "LonM",
"type": "string"
},
"LonS": {
"cf": "lon",
"col": "LonS",
"type": "string"
},
"EW": {
"cf": "lon",
"col": "EW",
"type": "string"
},
"City": {
"cf": "place",
"col": "City",
"type": "string"
},
"State": {
"cf": "place",
"col": "State",
"type": "string"
}
}
}
EOF
cat /tmp/cities.json
gsutil cp /tmp/cities.json gs://dataproc-templates/conf/
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export SKIP_BUILD=true
export JARS="gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"
@@ -765,23 +834,7 @@ pipeline {
--gcs.bigtable.input.location="gs://dataproc-templates/data/csv/cities.csv" \
--spark.bigtable.project.id=$GCP_PROJECT \
--spark.bigtable.instance.id=$ENV_TEST_BIGTABLE_INSTANCE \
--gcs.bigtable.catalog.json=\'\'\'{
"table":{"name":"cities"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
"LatD":{"cf":"lat", "col":"LatD", "type":"string"},
"LatM":{"cf":"lat", "col":"LatM", "type":"string"},
"LatS":{"cf":"lat", "col":"LatS", "type":"string"},
"NS":{"cf":"lat", "col":"NS", "type":"string"},
"LonD":{"cf":"lon", "col":"LonD", "type":"string"},
"LonM":{"cf":"lon", "col":"LonM", "type":"string"},
"LonS":{"cf":"lon", "col":"LonS", "type":"string"},
"EW":{"cf":"lon", "col":"EW", "type":"string"},
"City":{"cf":"place", "col":"City", "type":"string"},
"State":{"cf":"place", "col":"State", "type":"string"}
}
}\'\'\'
--gcs.bigtable.catalog.json="gs://dataproc-templates/conf/cities.json"
'''
}
}
13 changes: 3 additions & 10 deletions python/dataproc_templates/gcs/README.md
@@ -229,7 +229,7 @@ It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.ht
* `gcs.bigtable.input.format`: Input file format (one of: avro,parquet,csv,json,delta)
* `spark.bigtable.project.id`: GCP project where BigTable instance is running
* `spark.bigtable.instance.id`: BigTable instance id
* `gcs.bigtable.catalog.json`: BigTable catalog inline json
* `gcs.bigtable.catalog.json`: GCS path of the BigTable catalog JSON file
#### Optional Arguments
* `gcs.bigtable.input.chartoescapequoteescaping`: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise
* `gcs.bigtable.input.columnnameofcorruptrecord`: Allows renaming the new field having malformed string created by PERMISSIVE mode
@@ -373,7 +373,7 @@ options:
--spark.bigtable.instance.id SPARK.BIGTABLE.INSTANCE.ID
BigTable instance id
--gcs.bigtable.catalog.json GCS.BT.CATALOG.JSON
BigTable catalog inline json
GCS path of the BigTable catalog JSON file
```
## Example submission
@@ -392,14 +392,7 @@ export SPARK_PROPERTIES="spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36"
--gcs.bigtable.input.header="false" \
--spark.bigtable.project.id="<GCP_PROJECT>" \
--spark.bigtable.instance.id="<BIGTABLE_INSTANCE_ID>" \
--gcs.bigtable.catalog.json='''{
"table":{"name":"my_table"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
"name":{"cf":"cf", "col":"name", "type":"string"}
}
}'''
--gcs.bigtable.catalog.json="<gs://bucket/path>"
```
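The `<gs://bucket/path>` value must point at a catalog file that already exists in Cloud Storage. Below is a minimal sketch of building and uploading such a file with the `google-cloud-storage` client this commit adds as a dependency; the bucket name and object path are illustrative assumptions, and the column layout reuses the `my_table` schema from the previous inline example.

```python
# Sketch only: create the catalog file referenced by --gcs.bigtable.catalog.json.
# "my-bucket" and "conf/my_table_catalog.json" are assumed values, not defaults.
import json

from google.cloud import storage

catalog = {
    "table": {"name": "my_table"},
    "rowkey": "key",
    "columns": {
        "key": {"cf": "rowkey", "col": "key", "type": "string"},
        "name": {"cf": "cf", "col": "name", "type": "string"},
    },
}

client = storage.Client()
blob = client.bucket("my-bucket").blob("conf/my_table_catalog.json")
blob.upload_from_string(json.dumps(catalog), content_type="application/json")
# Then pass --gcs.bigtable.catalog.json="gs://my-bucket/conf/my_table_catalog.json"
```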
13 changes: 11 additions & 2 deletions python/dataproc_templates/gcs/gcs_to_bigtable.py
@@ -23,6 +23,7 @@
from dataproc_templates.util.argument_parsing import add_spark_options
from dataproc_templates.util.dataframe_reader_wrappers import ingest_dataframe_from_cloud_storage
import dataproc_templates.util.template_constants as constants
from google.cloud import storage


__all__ = ['GCSToBigTableTemplate']
@@ -87,7 +88,7 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
f'--{constants.GCS_BT_CATALOG_JSON}',
dest=constants.GCS_BT_CATALOG_JSON,
required=True,
help='BigTable catalog inline json'
help='GCS path of the BigTable catalog JSON file'
)

known_args: argparse.Namespace
@@ -102,7 +103,6 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
# Arguments
input_location: str = args[constants.GCS_BT_INPUT_LOCATION]
input_format: str = args[constants.GCS_BT_INPUT_FORMAT]
catalog: str = ''.join(args[constants.GCS_BT_CATALOG_JSON].split())
project_id: str = args[constants.GCS_BT_PROJECT_ID]
instance_id: str = args[constants.GCS_BT_INSTANCE_ID]
create_new_table: bool = args[constants.GCS_BT_CREATE_NEW_TABLE]
@@ -114,6 +114,15 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
f"{pprint.pformat(args)}"
)

# Read Catalog From GCS
storage_client = storage.Client()
bucket = storage_client.bucket(args[constants.GCS_BT_CATALOG_JSON].split('/')[2])
blob = bucket.blob('/'.join(args[constants.GCS_BT_CATALOG_JSON].split('/')[3:]))
catalog = blob.download_as_text()

logger.info(f"Catalog: {catalog}")


# Read
input_data = ingest_dataframe_from_cloud_storage(
spark, args, input_location, input_format, "gcs.bigtable.input."
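The catalog download above takes `split('/')[2]` as the bucket and the remainder as the object path, which assumes a well-formed `gs://bucket/path` value. A hedged sketch of the same logic with a basic validity check follows, using only client calls that appear in this commit; the helper name and error message are assumptions, not part of the template.

```python
# Sketch only: download the BigTable catalog JSON from a gs://bucket/path URI,
# failing fast on malformed input instead of raising an obscure IndexError.
from google.cloud import storage


def read_catalog_from_gcs(catalog_uri: str) -> str:
    """Return the catalog JSON stored at the given gs:// URI as text."""
    if not catalog_uri.startswith("gs://"):
        raise ValueError(f"Expected a gs://bucket/path URI, got: {catalog_uri}")
    bucket_name, _, blob_path = catalog_uri[len("gs://"):].partition("/")
    if not bucket_name or not blob_path:
        raise ValueError(f"Expected a gs://bucket/path URI, got: {catalog_uri}")
    client = storage.Client()
    return client.bucket(bucket_name).blob(blob_path).download_as_text()
```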
1 change: 1 addition & 0 deletions python/requirements.txt
@@ -11,3 +11,4 @@ pyarrow==16.1.0
pandas==2.0.3
numpy==1.24.4
pyspark==3.5.1
google-cloud-storage==2.18.2
84 changes: 2 additions & 82 deletions python/test/gcs/test_gcs_to_bigtable.py
@@ -35,90 +35,10 @@ def test_parse_args(self):
"--gcs.bigtable.input.location=gs://test",
"--spark.bigtable.project.id=GCP_PROJECT",
"--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID",
"--gcs.bigtable.catalog.json={key:value}"])
"--gcs.bigtable.catalog.json=gs://dataproc-templates/conf/employeecatalog.json"])

assert parsed_args["gcs.bigtable.input.format"] == "parquet"
assert parsed_args["gcs.bigtable.input.location"] == "gs://test"
assert parsed_args["spark.bigtable.project.id"] == "GCP_PROJECT"
assert parsed_args["spark.bigtable.instance.id"] == "BIGTABLE_INSTANCE_ID"
assert parsed_args["gcs.bigtable.catalog.json"] == '{key:value}'

@mock.patch.object(pyspark.sql, 'SparkSession')
def test_run(self, mock_spark_session):
"""Tests GCSToBigTableTemplate runs"""

gcs_to_bigtable_template = GCSToBigTableTemplate()
mock_parsed_args = gcs_to_bigtable_template.parse_args(
["--gcs.bigtable.input.format=parquet",
"--gcs.bigtable.input.location=gs://test",
"--spark.bigtable.project.id=GCP_PROJECT",
"--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID",
"--gcs.bigtable.catalog.json={key:value}"])
mock_spark_session.read.parquet.return_value = mock_spark_session.dataframe.DataFrame
gcs_to_bigtable_template.run(mock_spark_session, mock_parsed_args)

mock_spark_session.read.parquet.assert_called_once_with("gs://test")
mock_spark_session.dataframe.DataFrame.write.format. \
assert_called_once_with(constants.FORMAT_BIGTABLE)
mock_spark_session.dataframe.DataFrame.write.format().options. \
assert_called_with(catalog='{key:value}')

@mock.patch.object(pyspark.sql, 'SparkSession')
def test_run_csv1(self, mock_spark_session):
"""Tests GCSToBigTableTemplate runs with csv format"""

gcs_to_bigtable_template = GCSToBigTableTemplate()
mock_parsed_args = gcs_to_bigtable_template.parse_args(
["--gcs.bigtable.input.format=csv",
"--gcs.bigtable.input.location=gs://test",
"--gcs.bigtable.input.header=false",
"--spark.bigtable.project.id=GCP_PROJECT",
"--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID",
"--gcs.bigtable.catalog.json={key:value}"])
mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
gcs_to_bigtable_template.run(mock_spark_session, mock_parsed_args)

mock_spark_session.read.format.assert_called_with(
constants.FORMAT_CSV)
mock_spark_session.read.format().options.assert_called_with(**{
constants.CSV_HEADER: 'false',
constants.CSV_INFER_SCHEMA: 'true',
})
mock_spark_session.read.format().options().load.assert_called_once_with("gs://test")
mock_spark_session.dataframe.DataFrame.write.format. \
assert_called_once_with(constants.FORMAT_BIGTABLE)
mock_spark_session.dataframe.DataFrame.write.format().options. \
assert_called_with(catalog='{key:value}')

@mock.patch.object(pyspark.sql, 'SparkSession')
def test_run_csv2(self, mock_spark_session):
"""Tests GCSToBigTableTemplate runs with csv format and some optional csv options"""

gcs_to_bigtable_template = GCSToBigTableTemplate()
mock_parsed_args = gcs_to_bigtable_template.parse_args(
["--gcs.bigtable.input.format=csv",
"--gcs.bigtable.input.location=gs://test",
"--gcs.bigtable.input.inferschema=false",
"--gcs.bigtable.input.sep=|",
"--gcs.bigtable.input.comment=#",
"--gcs.bigtable.input.timestampntzformat=yyyy-MM-dd'T'HH:mm:ss",
"--spark.bigtable.project.id=GCP_PROJECT",
"--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID",
"--gcs.bigtable.catalog.json={key:value}"])
mock_spark_session.read.format().options().load.return_value = mock_spark_session.dataframe.DataFrame
gcs_to_bigtable_template.run(mock_spark_session, mock_parsed_args)

mock_spark_session.read.format.assert_called_with(
constants.FORMAT_CSV)
mock_spark_session.read.format().options.assert_called_with(**{
constants.CSV_HEADER: 'true',
constants.CSV_INFER_SCHEMA: 'false',
constants.CSV_SEP: "|",
constants.CSV_COMMENT: "#",
constants.CSV_TIMESTAMPNTZFORMAT: "yyyy-MM-dd'T'HH:mm:ss",
})
mock_spark_session.read.format().options().load.assert_called_once_with("gs://test")
mock_spark_session.dataframe.DataFrame.write.format. \
assert_called_once_with(constants.FORMAT_BIGTABLE)
mock_spark_session.dataframe.DataFrame.write.format().options. \
assert_called_with(catalog='{key:value}')
assert parsed_args["gcs.bigtable.catalog.json"] == 'gs://dataproc-templates/conf/employeecatalog.json'
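The removed `run()` tests passed the catalog inline, which no longer exercises the new code path. A comparable test would also have to stub the storage client; one possible shape is sketched below, assuming the patch target `dataproc_templates.gcs.gcs_to_bigtable.storage.Client` and a hypothetical test name (neither is part of this commit). In the repository this would sit as a method on the existing test class.

```python
# Sketch only: a run() test that stubs the GCS catalog download.
from unittest import mock

import pyspark

import dataproc_templates.util.template_constants as constants
from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate


@mock.patch("dataproc_templates.gcs.gcs_to_bigtable.storage.Client")
@mock.patch.object(pyspark.sql, "SparkSession")
def test_run_with_gcs_catalog(mock_spark_session, mock_storage_client):
    """Tests GCSToBigTableTemplate.run with a catalog fetched from GCS."""
    # The blob returned by bucket().blob() yields a fixed catalog string.
    mock_blob = (mock_storage_client.return_value
                 .bucket.return_value
                 .blob.return_value)
    mock_blob.download_as_text.return_value = '{"table":{"name":"cities"}}'

    template = GCSToBigTableTemplate()
    parsed_args = template.parse_args(
        ["--gcs.bigtable.input.format=parquet",
         "--gcs.bigtable.input.location=gs://test",
         "--spark.bigtable.project.id=GCP_PROJECT",
         "--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID",
         "--gcs.bigtable.catalog.json=gs://dataproc-templates/conf/cities.json"])
    mock_spark_session.read.parquet.return_value = mock_spark_session.dataframe.DataFrame

    template.run(mock_spark_session, parsed_args)

    mock_blob.download_as_text.assert_called_once()
    mock_spark_session.dataframe.DataFrame.write.format. \
        assert_called_once_with(constants.FORMAT_BIGTABLE)
    mock_spark_session.dataframe.DataFrame.write.format().options. \
        assert_called_with(catalog='{"table":{"name":"cities"}}')
```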
