diff --git a/.github/workflows/test-linux-windows.yml b/.github/workflows/test-linux-windows.yml
index c2ec10d354..1ae62c8640 100644
--- a/.github/workflows/test-linux-windows.yml
+++ b/.github/workflows/test-linux-windows.yml
@@ -2,9 +2,9 @@ name: tests
on:
pull_request:
- branches: ["main"]
+ branches: ["main", "major-release"]
push:
- branches: ["main"]
+ branches: ["main", "major-release"]
env:
TESTING: 1
diff --git a/.github/workflows/tests-mac.yml b/.github/workflows/tests-mac.yml
index 65c5478e55..012025a520 100644
--- a/.github/workflows/tests-mac.yml
+++ b/.github/workflows/tests-mac.yml
@@ -3,9 +3,9 @@ name: tests for mac
on:
pull_request:
- branches: ["main"]
+ branches: ["main", "major-release"]
push:
- branches: ["main"]
+ branches: ["main", "major-release"]
env:
TESTING: 1
diff --git a/.gitignore b/.gitignore
index 99a87cf379..8d6520e18e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,6 +110,7 @@ venv.bak/
# scratch
scratch*
old!_*
+test.ipynb
# vscode
.vscode/
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e34f0872c2..68c8866f0b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -12,4 +12,157 @@ You can contribute by:
* [teaching and mentoring](https://www.parsonsproject.org/pub/contributing-guide#teaching-and-mentoring)
* [helping "triage" issues and review pull requests](https://www.parsonsproject.org/pub/contributing-guide#maintainer-tasks)
-If you're not sure how to get started, please ask for help! We're happy to chat and help you find the best way to get involved.
\ No newline at end of file
+We encourage folks to review existing issues before starting a new issue.
+
+* If the issue you want to work on already exists, feel free to use the *thumbs up* emoji to upvote it.
+* If you have additional documentation or context that would be helpful, please add it in a comment.
+* If you have code snippets but don’t have time to do the full write-up, please add them to the issue!
+
+We use labels to help us classify issues. They include:
+* **bug** - something in Parsons isn’t working the way it should
+* **enhancement** - new feature or request (e.g. a new API connector)
+* **good first issue** - an issue that would be good for someone who is new to Parsons
+
+## Contributing Code to Parsons
+
+Generally, code contributions to Parsons will be either enhancements or bug fixes (or contributions of [sample code](#sample-code), discussed below). All changes to the repository are made [via pull requests](#submitting-a-pull-request).
+
+If you would like to contribute code to Parsons, please review the issues in the repository and find one you would like to work on. If you are new to Parsons or to open source projects, look for issues with the [**good first issue**](https://github.com/move-coop/parsons/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) label. Once you have found your issue, please add a comment to the issue that lets others know that you are interested in working on it. If you're having trouble finding something to work on, please ask us for help on Slack.
+
+The bulk of Parsons is made up of Connector classes, which are Python classes that help move data in and out of third party services. When you feel ready, you may want to contribute by [adding a new Connector class](https://move-coop.github.io/parsons/html/build_a_connector.html).
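+
+For example, using the Airtable connector looks roughly like this (a sketch only; the base key, table name, token, and record ID below are placeholders):
+
+```python
+from parsons import Airtable
+
+# Placeholder credentials and identifiers -- substitute your own
+at = Airtable("appXXXXXXXXXXXXXX", "table01", personal_access_token="MYFAKETOKEN")
+
+# Connector methods move data in and out of the third party service
+record = at.get_record("recXXXXXXXXXXXXXX")
+```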
+
+### Making Changes to Parsons
+
+To make code changes to Parsons, you'll need to set up your development environment, make your changes, and then submit a pull request.
+
+To set up your development environment:
+
+* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/)
+* Clone your fork to your local computer
+* Set up a [virtual environment](#virtual-environments)
+* Install the [dependencies](#installing-dependencies)
+* Check that everything's working by [running the unit tests](#unit-tests) and the [linter](#linting)
+
+Now it's time to make your changes. We suggest taking a quick look at our [coding conventions](#coding-conventions) - it'll make the review process easier down the line. In addition to any code changes, make sure to update the documentation and the unit tests if necessary. Not sure if your changes require test or documentation updates? Just ask in Slack or through a comment on the relevant issue. When you're done, make sure to run the [unit tests](#unit-tests) and the [linter](#linting) again.
+
+Finally, you'll want to [submit a pull request](#submitting-a-pull-request). And that's it!
+
+#### Virtual Environments
+
+If required dependencies conflict with packages or modules you need for other projects, you can create and use a [virtual environment](https://docs.python.org/3/library/venv.html).
+
+```
+python3 -m venv .venv # Creates a virtual environment in the .venv folder
+source .venv/bin/activate # Activate in Unix or MacOS
+.venv/Scripts/activate.bat # Activate in Windows
+```
+
+#### Installing Dependencies
+
+Before running or testing your code changes, be sure to install all of the required Python libraries that Parsons depends on.
+
+From the root of the Parsons repository, run the following command:
+
+```bash
+> pip install -r requirements.txt
+```
+
+#### Unit Tests
+
+When contributing code, we ask you to add tests that verify the code is working as expected. All of our unit tests are located in the `test/` folder at the root of the repository.
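+
+As a rough sketch, a new test might look something like the following (the file name, table contents, and assertion are illustrative only, not an existing Parsons test):
+
+```python
+# test/test_example.py (illustrative only)
+from parsons import Table
+
+
+def test_add_column():
+    tbl = Table([{"name": "Bob"}, {"name": "Jane"}])
+    tbl.add_column("party", "D")
+    assert "party" in tbl.columns
+```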
+
+We use the pytest tool to run our suite of automated unit tests. The pytest command line tool is installed as part of the Parsons dependencies.
+
+To run the entire suite of unit tests, execute the following command:
+
+```bash
+> pytest -rf test/
+```
+
+Once the pytest tool has finished running all of the tests, it will output details around any errors or test failures it encountered. If no failures are identified, then you are good to go!
+
+**Note:** Some tests are written to call out to external APIs and will be skipped as part of standard unit testing. This is expected.
+
+See the [pytest documentation](https://docs.pytest.org/en/latest/contents.html) for more info and many more options.
+
+#### Linting
+
+We use the [black](https://github.com/psf/black) and [flake8](http://flake8.pycqa.org/en/latest/) tools to [lint](https://en.wikipedia.org/wiki/Lint_(software)) the code in the repository to make sure it matches our preferred style. Both tools are installed as part of the Parsons dependencies.
+
+Run the following commands from the root of the Parsons repository to lint your code changes:
+
+```bash
+> flake8 --max-line-length=100 --extend-ignore=E203,W503 parsons
+> black parsons
+```
+
+Pre-commit hooks are available to enforce black and isort formatting on
+commit. You can also set up your IDE to reformat using black and/or isort on
+save.
+
+To set up the pre-commit hooks, install pre-commit with `pip install
+pre-commit`, and then run `pre-commit install`.
+
+#### Coding Conventions
+
+The following is a list of best practices to consider when writing code for the Parsons project:
+
+* Each tool connector should be its own unique class (e.g. ActionKit, VAN) in its own Python package. Use existing connectors as examples when deciding how to lay out your code.
+
+* Methods should be named using a verb_noun structure, such as `get_activist()` or `update_event()`.
+
+* Methods should reflect the vocabulary used by the original tool where possible to maintain transparency. For example, Google Cloud Storage refers to file-like objects as blobs, so the corresponding methods are named `get_blob()` rather than `get_file()`.
+
+* Methods that can work with arbitrarily large data (e.g. database or API queries) should use Parsons Tables to hold the data instead of standard Python collections (e.g. lists, dicts). See the sketch after this list.
+
+* You should avoid abbreviations for method names and variable names where possible.
+
+* Inline comments explaining complex code and methods are appreciated.
+
+* Capitalize the word Parsons for consistency where possible, especially in documentation.
+
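+Putting a few of these conventions together, a toy connector method might look roughly like this (the service name and its fake response are invented purely for illustration):
+
+```python
+from parsons import Table
+
+
+class ExampleService:
+    """A toy connector illustrating the naming and return-type conventions."""
+
+    def __init__(self, api_key=None):
+        self.api_key = api_key
+
+    def get_members(self, group_id):
+        # verb_noun method name; results come back as a Parsons Table
+        fake_response = [{"group_id": group_id, "name": "Bob"}]
+        return Table(fake_response)
+```
+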
+If you are building a new connector or extending an existing connector, there are more best practices in the [How to Build a Connector](https://move-coop.github.io/parsons/html/build_a_connector.html) documentation.
+
+## Documentation
+
+Parsons documentation is built using the Python Sphinx tool. Sphinx uses the `docs/*.rst` files in the repository to create the documentation.
+
+We have a [documentation label](https://github.com/move-coop/parsons/issues?q=is%3Aissue+is%3Aopen+label%3Adocumentation) that may help you find good docs issues to work on. If you are adding a new connector, you will need to add a reference to the connector to one of the .rst files. Please use the existing documentation as an example.
+
+When editing documentation, make sure you are editing the source files (with .md or .rst extension) and not the build files (.html extension).
+
+The workflow for documentation changes is a bit simpler than for code changes:
+
+* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/)
+* Clone your fork to your local computer
+* Change into the `docs` folder and install the requirements with `pip install -r requirements.txt` (you may want to set up a [virtual environment](#virtual-environments) first)
+* Make your changes and re-build the docs by running `make html`. (Note: this builds only a single version of the docs, from the current files. To create docs with multiple versions like our publicly hosted docs, run `make deploy_docs`.)
+* Open the built HTML files in your web browser to check that they look as you expect.
+* [Submit a pull request](#submitting-a-pull-request)
+
+When you make documentation changes, you only need to track the source files with git. The built docs in the html folder should not be included.
+
+You should not need to worry about the unit tests or the linter if you are making documentation changes only.
+
+## Contributing Sample Code
+
+One important way to contribute to the Parsons project is to submit sample code that provides recipes and patterns for how to use the Parsons library.
+
+We have a folder called `useful_resources/` in the root of the repository. If you have scripts that incorporate Parsons, we encourage you to add them there!
+
+The workflow for adding sample code is:
+
+* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/)
+* Clone your fork to your local computer
+* Add your sample code into the `useful_resources/` folder
+* [Submit a pull request](#submitting-a-pull-request)
+
+You should not need to worry about the unit tests or the linter if you are only adding sample code.
+
+## Submitting a Pull Request
+
+To submit a pull request, follow [these instructions to create a Pull Request from your fork](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request-from-a-fork) back to the original Parsons repository.
+
+The Parsons team will review your pull request and provide feedback. Please feel free to ping us if no one's responded to your Pull Request after a few days. We may not be able to review it right away, but we should be able to tell you when we'll get to it.
+
+Once your pull request has been approved, the Parsons team will merge your changes into the Parsons repository.
diff --git a/Dockerfile b/Dockerfile
index 7fdd250950..3abd763c4e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -42,4 +42,4 @@ RUN python setup.py develop
RUN mkdir /app
WORKDIR /app
# Useful for importing modules that are associated with your python scripts:
-env PYTHONPATH=.:/app
+ENV PYTHONPATH=.:/app
diff --git a/docs/airtable.rst b/docs/airtable.rst
index 5046ff7770..fd02dca8b8 100644
--- a/docs/airtable.rst
+++ b/docs/airtable.rst
@@ -6,7 +6,7 @@ Overview
********
The Airtable class allows you to interact with an `Airtable `_ base. In order to use this class
-you must generate an Airtable API Key which can be found in your Airtable `account settings `_.
+you must generate an Airtable personal access token which can be found in your Airtable `settings `_.
.. note::
Finding The Base Key
@@ -18,20 +18,20 @@ you must generate an Airtable API Key which can be found in your Airtable `accou
**********
QuickStart
**********
-To instantiate the Airtable class, you can either store your Airtable API
-``AIRTABLE_API_KEY`` as an environmental variable or pass in your api key
+To instantiate the Airtable class, you can either store your Airtable personal access token
+``AIRTABLE_PERSONAL_ACCESS_TOKEN`` as an environmental variable or pass in your personal access token
as an argument. You also need to pass in the base key and table name.
.. code-block:: python
from parsons import Airtable
- # First approach: Use API credentials via environmental variables and pass
+ # First approach: Use personal access token via environmental variable and pass
# the base key and the table as arguments.
at = Airtable(base_key, 'table01')
- # Second approach: Pass API credentials, base key and table name as arguments.
- at = Airtable(base_key, 'table01', api_key='MYFAKEKEY')
+ # Second approach: Pass personal access token, base key and table name as arguments.
+ at = Airtable(base_key, 'table01', personal_access_token='MYFAKETOKEN')
You can then call various endpoints:
diff --git a/docs/google.rst b/docs/google.rst
index 84b4060934..a1428875ef 100644
--- a/docs/google.rst
+++ b/docs/google.rst
@@ -68,7 +68,7 @@ Google Cloud projects.
Quickstart
==========
-To instantiate the GoogleBigQuery class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument.
+To instantiate the `GoogleBigQuery` class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument.
.. code-block:: python
@@ -78,7 +78,7 @@ To instantiate the GoogleBigQuery class, you can pass the constructor a string c
# be the file name or a JSON encoding of the credentials.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'google_credentials_file.json'
- big_query = GoogleBigQuery()
+ bigquery = GoogleBigQuery()
Alternatively, you can pass the credentials in as an argument. In the example below, we also specify the project.
@@ -86,8 +86,10 @@ Alternatively, you can pass the credentials in as an argument. In the example be
# Project in which we're working
project = 'parsons-test'
- big_query = GoogleBigQuery(app_creds='google_credentials_file.json',
- project=project)
+ bigquery = GoogleBigQuery(
+ app_creds='google_credentials_file.json',
+ project=project
+ )
We can now upload/query data.
@@ -98,7 +100,7 @@ We can now upload/query data.
# Table name should be project.dataset.table, or dataset.table, if
# working with the default project
- table_name = project + '.' + dataset + '.' + table
+ table_name = f"`{project}.{dataset}.{table}`"
# Must be pre-existing bucket. Create via GoogleCloudStorage() or
# at https://console.cloud.google.com/storage/create-bucket. May be
@@ -107,7 +109,7 @@ We can now upload/query data.
gcs_temp_bucket = 'parsons_bucket'
# Create dataset if it doesn't already exist
- big_query.client.create_dataset(dataset=dataset, exists_ok=True)
+ bigquery.client.create_dataset(dataset=dataset, exists_ok=True)
parsons_table = Table([{'name':'Bob', 'party':'D'},
{'name':'Jane', 'party':'D'},
@@ -115,15 +117,23 @@ We can now upload/query data.
{'name':'Bill', 'party':'I'}])
# Copy table in to create new BigQuery table
- big_query.copy(table_obj=parsons_table,
- table_name=table_name,
- tmp_gcs_bucket=gcs_temp_bucket)
+ bigquery.copy(
+ table_obj=parsons_table,
+ table_name=table_name,
+ tmp_gcs_bucket=gcs_temp_bucket
+ )
# Select from project.dataset.table
- big_query.query(f'select name from {table_name} where party = "D"')
+ bigquery.query(f'select name from {table_name} where party = "D"')
+
+ # Query with parameters
+ bigquery.query(
+ f"select name from {table_name} where party = %s",
+ parameters=["D"]
+ )
# Delete the table when we're done
- big_query.client.delete_table(table=table_name)
+ bigquery.client.delete_table(table=table_name)
===
API
diff --git a/parsons/airtable/airtable.py b/parsons/airtable/airtable.py
index 299391033f..d6d7baf12c 100644
--- a/parsons/airtable/airtable.py
+++ b/parsons/airtable/airtable.py
@@ -15,14 +15,17 @@ class Airtable(object):
table_name: str
The name of the table in the base. The table name is the equivilant of the sheet name
in Excel or GoogleDocs.
- api_key: str
- The Airtable provided api key. Not required if ``AIRTABLE_API_KEY`` env variable set.
+ personal_access_token: str
+ The Airtable personal access token. Not required if ``AIRTABLE_PERSONAL_ACCESS_TOKEN``
+ env variable set.
"""
- def __init__(self, base_key, table_name, api_key=None):
+ def __init__(self, base_key, table_name, personal_access_token=None):
- self.api_key = check_env.check("AIRTABLE_API_KEY", api_key)
- self.client = client(base_key, table_name, self.api_key)
+ self.personal_access_token = check_env.check(
+ "AIRTABLE_PERSONAL_ACCESS_TOKEN", personal_access_token
+ )
+ self.client = client(base_key, table_name, self.personal_access_token)
def get_record(self, record_id):
"""
diff --git a/parsons/databases/database/constants.py b/parsons/databases/database/constants.py
index 8935a8734d..1b78ffb0af 100644
--- a/parsons/databases/database/constants.py
+++ b/parsons/databases/database/constants.py
@@ -153,11 +153,7 @@
VARCHAR = "varchar"
FLOAT = "float"
-
-DO_PARSE_BOOLS = False
BOOL = "bool"
-TRUE_VALS = ("TRUE", "T", "YES", "Y", "1", 1)
-FALSE_VALS = ("FALSE", "F", "NO", "N", "0", 0)
# The following values are the minimum and maximum values for MySQL int
# types. https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
diff --git a/parsons/databases/database/database.py b/parsons/databases/database/database.py
index 9f369d43c9..bfec754241 100644
--- a/parsons/databases/database/database.py
+++ b/parsons/databases/database/database.py
@@ -1,5 +1,7 @@
import parsons.databases.database.constants as consts
-import ast
+import logging
+
+logger = logging.getLogger(__name__)
class DatabaseCreateStatement:
@@ -17,11 +19,7 @@ def __init__(self):
self.BIGINT = consts.BIGINT
self.FLOAT = consts.FLOAT
- # Added for backwards compatability
- self.DO_PARSE_BOOLS = consts.DO_PARSE_BOOLS
self.BOOL = consts.BOOL
- self.TRUE_VALS = consts.TRUE_VALS
- self.FALSE_VALS = consts.FALSE_VALS
self.VARCHAR = consts.VARCHAR
self.RESERVED_WORDS = consts.RESERVED_WORDS
@@ -117,29 +115,6 @@ def is_valid_sql_num(self, val):
except (TypeError, ValueError):
return False
- def is_sql_bool(self, val):
- """Check whether val is a valid sql boolean.
-
- When inserting data into databases, different values can be accepted
- as boolean types. For excample, ``False``, ``'FALSE'``, ``1``.
-
- `Args`:
- val: any
- The value to check.
- `Returns`:
- bool
- Whether or not the value is a valid sql boolean.
- """
- if not self.DO_PARSE_BOOLS:
- return
-
- if isinstance(val, bool) or (
- type(val) in (int, str)
- and str(val).upper() in self.TRUE_VALS + self.FALSE_VALS
- ):
- return True
- return False
-
def detect_data_type(self, value, cmp_type=None):
"""Detect the higher of value's type cmp_type.
@@ -161,64 +136,45 @@ def detect_data_type(self, value, cmp_type=None):
# Stop if the compare type is already a varchar
# varchar is the highest data type.
if cmp_type == self.VARCHAR:
- return cmp_type
-
- # Attempt to evaluate value as a literal (e.g. '1' => 1, ) If the value
- # is just a string, is None, or is empty, it will raise an error. These
- # should be considered varchars.
- # E.g.
- # "" => SyntaxError
- # "anystring" => ValueError
- try:
- val_lit = ast.literal_eval(str(value))
- except (SyntaxError, ValueError):
- if self.is_sql_bool(value):
- return self.BOOL
- return self.VARCHAR
-
- # Exit early if it's None
- # is_valid_sql_num(None) == False
- # instead of defaulting to varchar (which is the next test)
- # return the compare type
- if val_lit is None:
- return cmp_type
+ result = cmp_type
+
+ elif isinstance(value, bool):
+ result = self.BOOL
+
+ elif value is None:
+ result = cmp_type
# Make sure that it is a valid integer
# Python accepts 100_000 as a valid form of 100000,
# however a sql engine may throw an error
- if not self.is_valid_sql_num(value):
- if self.is_sql_bool(val_lit) and cmp_type != self.VARCHAR:
- return self.BOOL
- else:
- return self.VARCHAR
+ elif not self.is_valid_sql_num(value):
+ result = self.VARCHAR
- if self.is_sql_bool(val_lit) and cmp_type not in self.INT_TYPES + [self.FLOAT]:
- return self.BOOL
-
- type_lit = type(val_lit)
-
- # If a float, stop here
- # float is highest after varchar
- if type_lit == float or cmp_type == self.FLOAT:
- return self.FLOAT
+ elif isinstance(value, float) or cmp_type == self.FLOAT:
+ result = self.FLOAT
# The value is very likely an int
# let's get its size
# If the compare types are empty and use the types of the current value
- if type_lit == int and cmp_type in (self.INT_TYPES + [None, "", self.BOOL]):
-
+ elif isinstance(value, int) and cmp_type in (
+ self.INT_TYPES + [None, "", self.BOOL]
+ ):
# Use smallest possible int type above TINYINT
- if self.SMALLINT_MIN < val_lit < self.SMALLINT_MAX:
- return self.get_bigger_int(self.SMALLINT, cmp_type)
- elif self.MEDIUMINT_MIN < val_lit < self.MEDIUMINT_MAX:
- return self.get_bigger_int(self.MEDIUMINT, cmp_type)
- elif self.INT_MIN < val_lit < self.INT_MAX:
- return self.get_bigger_int(self.INT, cmp_type)
+ if self.SMALLINT_MIN < value < self.SMALLINT_MAX:
+ result = self.get_bigger_int(self.SMALLINT, cmp_type)
+ elif self.MEDIUMINT_MIN < value < self.MEDIUMINT_MAX:
+ result = self.get_bigger_int(self.MEDIUMINT, cmp_type)
+ elif self.INT_MIN < value < self.INT_MAX:
+ result = self.get_bigger_int(self.INT, cmp_type)
else:
- return self.BIGINT
+ result = self.BIGINT
+
+ else:
+ # Need to determine who makes it all the way down here
+ logger.debug(f"Unexpected object type: {type(value)}")
+ result = cmp_type
- # Need to determine who makes it all the way down here
- return cmp_type
+ return result
def format_column(self, col, index="", replace_chars=None, col_prefix="_"):
"""Format the column to meet database contraints.
diff --git a/parsons/databases/discover_database.py b/parsons/databases/discover_database.py
index 1d51a37112..729afa5cb0 100644
--- a/parsons/databases/discover_database.py
+++ b/parsons/databases/discover_database.py
@@ -5,7 +5,7 @@
from parsons.databases.redshift import Redshift
from parsons.databases.mysql import MySQL
from parsons.databases.postgres import Postgres
-from parsons.google.google_bigquery import GoogleBigQuery
+from parsons.google.google_bigquery import GoogleBigQuery as BigQuery
def discover_database(
@@ -40,7 +40,7 @@ def discover_database(
"Redshift": Redshift,
"MySQL": MySQL,
"Postgres": Postgres,
- "GoogleBigQuery": GoogleBigQuery,
+ "GoogleBigQuery": BigQuery,
}
password_vars = {
diff --git a/parsons/databases/redshift/redshift.py b/parsons/databases/redshift/redshift.py
index 77ed0460c9..a68498579d 100644
--- a/parsons/databases/redshift/redshift.py
+++ b/parsons/databases/redshift/redshift.py
@@ -724,7 +724,7 @@ def unload(
sql: str
The SQL string to execute to generate the data to unload.
- buckey: str
+ bucket: str
The destination S3 bucket
key_prefix: str
The prefix of the key names that will be written
diff --git a/parsons/etl/etl.py b/parsons/etl/etl.py
index e4eef99228..f2b20e94fe 100644
--- a/parsons/etl/etl.py
+++ b/parsons/etl/etl.py
@@ -7,7 +7,6 @@
class ETL(object):
def __init__(self):
-
pass
def add_column(self, column, value=None, index=None, if_exists="fail"):
@@ -206,7 +205,6 @@ def get_column_max_width(self, column):
max_width = 0
for v in petl.values(self.table, column):
-
if len(str(v).encode("utf-8")) > max_width:
max_width = len(str(v).encode("utf-8"))
@@ -314,7 +312,6 @@ def map_columns(self, column_map, exact_match=True):
"""
for col in self.columns:
-
if not exact_match:
cleaned_col = col.lower().replace("_", "").replace(" ", "")
else:
@@ -830,7 +827,6 @@ def _prepend_dict(self, dict_obj, prepend):
new_dict = {}
for k, v in dict_obj.items():
-
new_dict[prepend + "_" + k] = v
return new_dict
diff --git a/parsons/etl/table.py b/parsons/etl/table.py
index b0a0fd42fe..1bf6acd943 100644
--- a/parsons/etl/table.py
+++ b/parsons/etl/table.py
@@ -28,13 +28,11 @@ class Table(ETL, ToFrom):
"""
def __init__(self, lst=[]):
-
self.table = None
lst_type = type(lst)
if lst_type in [list, tuple]:
-
# Check for empty list
if not len(lst):
self.table = petl.fromdicts([])
@@ -59,21 +57,16 @@ def __init__(self, lst=[]):
self._index_count = 0
def __repr__(self):
-
return repr(petl.dicts(self.table))
def __iter__(self):
-
return iter(petl.dicts(self.table))
def __getitem__(self, index):
-
if isinstance(index, int):
-
return self.row_data(index)
elif isinstance(index, str):
-
return self.column_data(index)
elif isinstance(index, slice):
@@ -81,11 +74,9 @@ def __getitem__(self, index):
return [row for row in tblslice]
else:
-
raise TypeError("You must pass a string or an index as a value.")
def __bool__(self):
-
# Try to get a single row from our table
head_one = petl.head(self.table)
diff --git a/parsons/etl/tofrom.py b/parsons/etl/tofrom.py
index 4d564659bf..c87a3cdb12 100644
--- a/parsons/etl/tofrom.py
+++ b/parsons/etl/tofrom.py
@@ -2,6 +2,7 @@
import json
import io
import gzip
+from typing import Optional
from parsons.utilities import files, zip_archive
@@ -619,8 +620,39 @@ def to_postgres(
pg = Postgres(username=username, password=password, host=host, db=db, port=port)
pg.copy(self, table_name, **copy_args)
- def to_petl(self):
+ def to_bigquery(
+ self,
+ table_name: str,
+ app_creds: Optional[str] = None,
+ project: Optional[str] = None,
+ **kwargs,
+ ):
+ """
+ Write a table to BigQuery
+ `Args`:
+ table_name: str
+ Table name to write to in BigQuery; this should be in `schema.table` format
+ app_creds: str
+ A credentials json string or a path to a json file. Not required
+ if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
+ project: str
+                The project which the client is acting on behalf of. If not passed,
+                the default inferred from the environment is used.
+ **kwargs: kwargs
+ Additional keyword arguments passed into the `.copy()` function (`if_exists`,
+ `max_errors`, etc.)
+
+ `Returns`:
+ ``None``
+ """
+
+ from parsons import GoogleBigQuery as BigQuery
+
+ bq = BigQuery(app_creds=app_creds, project=project)
+ bq.copy(self, table_name=table_name, **kwargs)
+
+ def to_petl(self):
return self.table
def to_civis(
@@ -898,6 +930,35 @@ def from_s3_csv(
return cls(petl.cat(*tbls))
+ @classmethod
+ def from_bigquery(cls, sql: str, app_creds: str = None, project: str = None):
+ """
+ Create a ``parsons table`` from a BigQuery statement.
+
+ To pull an entire BigQuery table, use a query like ``SELECT * FROM {{ table }}``.
+
+ `Args`:
+ sql: str
+ A valid SQL statement
+ app_creds: str
+ A credentials json string or a path to a json file. Not required
+ if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set.
+ project: str
+                The project which the client is acting on behalf of. If not passed,
+                the default inferred from the environment is used.
+ TODO - Should users be able to pass in kwargs here? For parameters?
+
+ `Returns`:
+ Parsons Table
+ See :ref:`parsons-table` for output options.
+ """
+
+ from parsons import GoogleBigQuery as BigQuery
+
+ bq = BigQuery(app_creds=app_creds, project=project)
+
+ return bq.query(sql=sql)
+
@classmethod
def from_dataframe(cls, dataframe, include_index=False):
"""
diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index f10641c466..2ae3aa8337 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -1,12 +1,16 @@
import pickle
-from typing import Optional, Union
+from typing import Optional, Union, List
import uuid
+import logging
+import datetime
+import random
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud import exceptions
import petl
+from contextlib import contextmanager
from parsons.databases.table import BaseTable
from parsons.databases.database_connector import DatabaseConnector
@@ -16,6 +20,8 @@
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file
+logger = logging.getLogger(__name__)
+
BIGQUERY_TYPE_MAP = {
"str": "STRING",
"float": "FLOAT",
@@ -26,6 +32,8 @@
"datetime.time": "TIME",
"dict": "RECORD",
"NoneType": "STRING",
+ "UUID": "STRING",
+ "datetime": "DATETIME",
}
# Max number of rows that we query at a time, so we can avoid loading huge
@@ -59,6 +67,55 @@ def parse_table_name(table_name):
return parsed
+def ends_with_semicolon(query: str) -> str:
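+    """Strip surrounding whitespace and ensure the query ends with a semicolon."""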
+ query = query.strip()
+ if query[-1] == ";":
+ return query
+ return query + ";"
+
+
+def map_column_headers_to_schema_field(schema_definition: list) -> list:
+ """
+ Loops through a list of dictionaries and instantiates
+ google.cloud.bigquery.SchemaField objects. Useful docs
+ from Google's API can be found here:
+ https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField
+
+ `Args`:
+ schema_definition: list
+ This function expects a list of dictionaries in the following format:
+
+ ```
+ schema_definition = [
+ {
+ "name": column_name,
+ "field_type": [INTEGER, STRING, FLOAT, etc.]
+ },
+ {
+ "name": column_name,
+ "field_type": [INTEGER, STRING, FLOAT, etc.],
+ "mode": "REQUIRED"
+ },
+ {
+ "name": column_name,
+ "field_type": [INTEGER, STRING, FLOAT, etc.],
+ "default_value_expression": CURRENT_TIMESTAMP()
+ }
+ ]
+ ```
+
+ `Returns`:
+ List of instantiated `SchemaField` objects
+ """
+
+ # TODO - Better way to test for this
+ if isinstance(schema_definition[0], bigquery.SchemaField):
+ logger.debug("User supplied list of SchemaField objects")
+ return schema_definition
+
+ return [bigquery.SchemaField(**x) for x in schema_definition]
+
+
class GoogleBigQuery(DatabaseConnector):
"""
Class for querying BigQuery table and returning the data as Parsons tables.
@@ -103,11 +160,568 @@ def __init__(self, app_creds=None, project=None, location=None):
self.dialect = "bigquery"
+ @property
+ def client(self):
+ """
+ Get the Google BigQuery client to use for making queries.
+
+ `Returns:`
+ `google.cloud.bigquery.client.Client`
+ """
+ if not self._client:
+ # Create a BigQuery client to use to make the query
+ self._client = bigquery.Client(project=self.project, location=self.location)
+
+ return self._client
+
+ @contextmanager
+ def connection(self):
+ """
+ Generate a BigQuery connection.
+ The connection is set up as a python "context manager", so it will be closed
+ automatically when the connection goes out of scope. Note that the BigQuery
+ API uses jobs to run database operations and, as such, simply has a no-op for
+ a "commit" function.
+
+ If you would like to manage transactions, please use multi-statement queries
+ as [outlined here](https://cloud.google.com/bigquery/docs/transactions)
+ or utilize the `query_with_transaction` method on this class.
+
+ When using the connection, make sure to put it in a ``with`` block (necessary for
+ any context manager):
+ ``with bq.connection() as conn:``
+
+ `Returns:`
+ Google BigQuery ``connection`` object
+ """
+ conn = self._dbapi.connect(self.client)
+ try:
+ yield conn
+ finally:
+ conn.close()
+
+ @contextmanager
+ def cursor(self, connection):
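+        """Yield a cursor for the given connection, closing it when the block exits."""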
+ cur = connection.cursor()
+ try:
+ yield cur
+ finally:
+ cur.close()
+
+ def query(
+ self,
+ sql: str,
+ parameters: Optional[Union[list, dict]] = None,
+ return_values: bool = True,
+ ) -> Optional[Table]:
+ """
+ Run a BigQuery query and return the results as a Parsons table.
+
+ To include python variables in your query, it is recommended to pass them as parameters,
+ following the BigQuery style where parameters are prefixed with `@`s.
+ Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL
+ injection attacks.
+
+ **Parameter Examples**
+
+ .. code-block:: python
+
+ name = "Beatrice O'Brady"
+ sql = 'SELECT * FROM my_table WHERE name = %s'
+ rs.query(sql, parameters=[name])
+
+ .. code-block:: python
+
+ name = "Beatrice O'Brady"
+ sql = "SELECT * FROM my_table WHERE name = %(name)s"
+ rs.query(sql, parameters={'name': name})
+
+ `Args:`
+ sql: str
+                A valid BigQuery statement
+ parameters: dict
+ A dictionary of query parameters for BigQuery.
+
+ `Returns:`
+ Parsons Table
+ See :ref:`parsons-table` for output options.
+ """
+
+ with self.connection() as connection:
+ return self.query_with_connection(
+ sql, connection, parameters=parameters, return_values=return_values
+ )
+
+ def query_with_connection(
+ self, sql, connection, parameters=None, commit=True, return_values: bool = True
+ ):
+ """
+ Execute a query against the BigQuery database, with an existing connection.
+ Useful for batching queries together. Will return ``None`` if the query
+ returns zero rows.
+
+ `Args:`
+ sql: str
+ A valid SQL statement
+ connection: obj
+                A connection object obtained from ``bigquery.connection()``
+ parameters: list
+ A list of python variables to be converted into SQL values in your query
+ commit: boolean
+                Must be ``True``; the BigQuery API client always auto-commits. To wrap
+                multiple queries in a transaction, use ``query_with_transaction``.
+
+ `Returns:`
+ Parsons Table
+ See :ref:`parsons-table` for output options.
+ """
+
+ if not commit:
+ raise ValueError(
+ """
+ BigQuery implementation uses an API client which always auto-commits.
+ If you wish to wrap multiple queries in a transaction, use
+                    Multi-Statement transactions within a single query as outlined
+ here: https://cloud.google.com/bigquery/docs/transactions or use the
+ `query_with_transaction` method on the BigQuery connector.
+ """
+ )
+
+ # get our connection and cursor
+ with self.cursor(connection) as cursor:
+ # Run the query
+ cursor.execute(sql, parameters)
+
+ if not return_values:
+ return None
+
+ final_table = self._fetch_query_results(cursor=cursor)
+
+ return final_table
+
+ def query_with_transaction(self, queries, parameters=None):
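+        """
+        Run multiple SQL statements in a single BigQuery multi-statement transaction.
+
+        The statements are joined with semicolons and wrapped in a
+        ``BEGIN TRANSACTION; ... COMMIT TRANSACTION;`` block, so they are
+        committed together or not at all.
+
+        `Args:`
+            queries: list
+                A list of SQL statements to execute in order.
+            parameters: list
+                Query parameters passed through to ``query()``.
+        """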
+ queries_with_semicolons = [ends_with_semicolon(q) for q in queries]
+ queries_on_newlines = "\n".join(queries_with_semicolons)
+ queries_wrapped = f"""
+ BEGIN
+ BEGIN TRANSACTION;
+ {queries_on_newlines}
+ COMMIT TRANSACTION;
+ END;
+ """
+ self.query(sql=queries_wrapped, parameters=parameters, return_values=False)
+
+ def copy_from_gcs(
+ self,
+ gcs_blob_uri: str,
+ table_name: str,
+ if_exists: str = "fail",
+ max_errors: int = 0,
+ data_type: str = "csv",
+ csv_delimiter: str = ",",
+ ignoreheader: int = 1,
+ nullas: Optional[str] = None,
+ allow_quoted_newlines: bool = True,
+ allow_jagged_rows: bool = True,
+ quote: Optional[str] = None,
+ schema: Optional[List[dict]] = None,
+ job_config: Optional[LoadJobConfig] = None,
+ force_unzip_blobs: bool = False,
+ compression_type: str = "gzip",
+ new_file_extension: str = "csv",
+ **load_kwargs,
+ ):
+ """
+ Copy a csv saved in Google Cloud Storage into Google BigQuery.
+
+ `Args:`
+ gcs_blob_uri: str
+ The GoogleCloudStorage URI referencing the file to be copied.
+ table_name: str
+ The table name to load the data into.
+ if_exists: str
+ If the table already exists, either ``fail``, ``append``, ``drop``
+ or ``truncate`` the table. This maps to `write_disposition` in the
+ `LoadJobConfig` class.
+ max_errors: int
+ The maximum number of rows that can error and be skipped before
+ the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
+ data_type: str
+ Denotes whether target file is a JSON or CSV
+ csv_delimiter: str
+ Character used to separate values in the target file
+ ignoreheader: int
+ Treats the specified number_rows as a file header and doesn't load them
+ nullas: str
+ Loads fields that match null_string as NULL, where null_string can be any string
+ allow_quoted_newlines: bool
+ If True, detects quoted new line characters within a CSV field and does
+ not interpret the quoted new line character as a row boundary
+ allow_jagged_rows: bool
+ Allow missing trailing optional columns (CSV only).
+ quote: str
+ The value that is used to quote data sections in a CSV file.
+ BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of
+ the encoded string to split the data in its raw, binary state.
+ schema: list
+ BigQuery expects a list of dictionaries in the following format
+ ```
+ schema = [
+ {"name": "column_name", "type": STRING},
+ {"name": "another_column_name", "type": INT}
+ ]
+ ```
+ job_config: object
+ A LoadJobConfig object to provide to the underlying call to load_table_from_uri
+ on the BigQuery client. The function will create its own if not provided. Note
+ if there are any conflicts between the job_config and other parameters, the
+ job_config values are preferred.
+ force_unzip_blobs: bool
+ If True, target blobs will be unzipped before being loaded to BigQuery.
+ compression_type: str
+ Accepts `zip` or `gzip` values to differentially unzip a compressed
+ blob in cloud storage.
+ new_file_extension: str
+ Provides a file extension if a blob is decompressed and rewritten
+ to cloud storage.
+ **load_kwargs: kwargs
+ Other arguments to pass to the underlying load_table_from_uri
+ call on the BigQuery client.
+ """
+ if if_exists not in ["fail", "truncate", "append", "drop"]:
+ raise ValueError(
+ f"Unexpected value for if_exists: {if_exists}, must be one of "
+ '"append", "drop", "truncate", or "fail"'
+ )
+ if data_type not in ["csv", "json"]:
+ raise ValueError(
+ f"Only supports csv or json files [data_type = {data_type}]"
+ )
+
+ table_exists = self.table_exists(table_name)
+
+ job_config = self._process_job_config(
+ job_config=job_config,
+ table_exists=table_exists,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
+ data_type=data_type,
+ csv_delimiter=csv_delimiter,
+ ignoreheader=ignoreheader,
+ nullas=nullas,
+ allow_quoted_newlines=allow_quoted_newlines,
+ allow_jagged_rows=allow_jagged_rows,
+ quote=quote,
+ schema=schema,
+ )
+
+ # load CSV from Cloud Storage into BigQuery
+ table_ref = get_table_ref(self.client, table_name)
+
+ try:
+ if force_unzip_blobs:
+ self.copy_large_compressed_file_from_gcs(
+ gcs_blob_uri=gcs_blob_uri,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
+ data_type=data_type,
+ csv_delimiter=csv_delimiter,
+ ignoreheader=ignoreheader,
+ nullas=nullas,
+ allow_quoted_newlines=allow_quoted_newlines,
+ quote=quote,
+ schema=schema,
+ job_config=job_config,
+ compression_type=compression_type,
+ new_file_extension=new_file_extension,
+ )
+ else:
+ load_job = self.client.load_table_from_uri(
+ source_uris=gcs_blob_uri,
+ destination=table_ref,
+ job_config=job_config,
+ **load_kwargs,
+ )
+ load_job.result()
+ except exceptions.BadRequest as e:
+ if "one of the files is larger than the maximum allowed size." in str(e):
+ logger.debug(
+ f"{gcs_blob_uri.split('/')[-1]} exceeds max size ... \
+ running decompression function..."
+ )
+
+ self.copy_large_compressed_file_from_gcs(
+ gcs_blob_uri=gcs_blob_uri,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
+ data_type=data_type,
+ csv_delimiter=csv_delimiter,
+ ignoreheader=ignoreheader,
+ nullas=nullas,
+ allow_quoted_newlines=allow_quoted_newlines,
+ quote=quote,
+ schema=schema,
+ job_config=job_config,
+ compression_type=compression_type,
+ new_file_extension=new_file_extension,
+ )
+ elif "Schema has no field" in str(e):
+ logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file")
+ return "Empty file"
+
+ elif "encountered too many errors, giving up" in str(e):
+ # TODO - Is this TOO verbose?
+ logger.error(f"Max errors exceeded for {gcs_blob_uri.split('/')[-1]}")
+
+ for error_ in load_job.errors:
+ logger.error(error_)
+
+ raise e
+
+ else:
+ raise e
+
+ def copy_large_compressed_file_from_gcs(
+ self,
+ gcs_blob_uri: str,
+ table_name: str,
+ if_exists: str = "fail",
+ max_errors: int = 0,
+ data_type: str = "csv",
+ csv_delimiter: str = ",",
+ ignoreheader: int = 1,
+ nullas: Optional[str] = None,
+ allow_quoted_newlines: bool = True,
+ allow_jagged_rows: bool = True,
+ quote: Optional[str] = None,
+ schema: Optional[List[dict]] = None,
+ job_config: Optional[LoadJobConfig] = None,
+ compression_type: str = "gzip",
+ new_file_extension: str = "csv",
+ **load_kwargs,
+ ):
+ """
+ Copy a compressed CSV file that exceeds the maximum size in Google Cloud Storage
+ into Google BigQuery.
+
+ `Args:`
+ gcs_blob_uri: str
+ The GoogleCloudStorage URI referencing the file to be copied.
+ table_name: str
+ The table name to load the data into.
+ if_exists: str
+ If the table already exists, either ``fail``, ``append``, ``drop``
+ or ``truncate`` the table. This maps to `write_disposition` in the
+ `LoadJobConfig` class.
+ max_errors: int
+ The maximum number of rows that can error and be skipped before
+ the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class.
+ data_type: str
+ Denotes whether target file is a JSON or CSV
+ csv_delimiter: str
+ Character used to separate values in the target file
+ ignoreheader: int
+ Treats the specified number_rows as a file header and doesn't load them
+ nullas: str
+ Loads fields that match null_string as NULL, where null_string can be any string
+ allow_quoted_newlines: bool
+ If True, detects quoted new line characters within a CSV field
+ and does not interpret the quoted new line character as a row boundary
+ allow_jagged_rows: bool
+ Allow missing trailing optional columns (CSV only).
+ quote: str
+ The value that is used to quote data sections in a CSV file.
+ BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of
+ the encoded string to split the data in its raw, binary state.
+ schema: list
+ BigQuery expects a list of dictionaries in the following format
+ ```
+ schema = [
+ {"name": "column_name", "type": STRING},
+ {"name": "another_column_name", "type": INT}
+ ]
+ ```
+ job_config: object
+ A LoadJobConfig object to provide to the underlying call to load_table_from_uri
+ on the BigQuery client. The function will create its own if not provided. Note
+ if there are any conflicts between the job_config and other parameters, the
+ job_config values are preferred.
+ compression_type: str
+ Accepts `zip` or `gzip` values to differentially unzip a compressed
+ blob in cloud storage.
+ new_file_extension: str
+ Provides a file extension if a blob is decompressed and rewritten to cloud storage.
+ **load_kwargs: kwargs
+ Other arguments to pass to the underlying load_table_from_uri call on the BigQuery
+ client.
+ """
+
+ if if_exists not in ["fail", "truncate", "append", "drop"]:
+ raise ValueError(
+ f"Unexpected value for if_exists: {if_exists}, must be one of "
+ '"append", "drop", "truncate", or "fail"'
+ )
+ if data_type not in ["csv", "json"]:
+ raise ValueError(
+ f"Only supports csv or json files [data_type = {data_type}]"
+ )
+
+ table_exists = self.table_exists(table_name)
+
+ job_config = self._process_job_config(
+ job_config=job_config,
+ table_exists=table_exists,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
+ data_type=data_type,
+ csv_delimiter=csv_delimiter,
+ ignoreheader=ignoreheader,
+ nullas=nullas,
+ allow_quoted_newlines=allow_quoted_newlines,
+ allow_jagged_rows=allow_jagged_rows,
+ quote=quote,
+ schema=schema,
+ )
+
+ # TODO - See if this inheritance is happening in other places
+ gcs = GoogleCloudStorage(app_creds=self.app_creds, project=self.project)
+ old_bucket_name, old_blob_name = gcs.split_uri(gcs_uri=gcs_blob_uri)
+
+ uncompressed_gcs_uri = None
+
+ try:
+ logger.debug("Unzipping large file")
+ uncompressed_gcs_uri = gcs.unzip_blob(
+ bucket_name=old_bucket_name,
+ blob_name=old_blob_name,
+ new_file_extension=new_file_extension,
+ compression_type=compression_type,
+ )
+
+ logger.debug(
+ f"Loading uncompressed uri into BigQuery {uncompressed_gcs_uri}..."
+ )
+ table_ref = get_table_ref(self.client, table_name)
+ load_job = self.client.load_table_from_uri(
+ source_uris=uncompressed_gcs_uri,
+ destination=table_ref,
+ job_config=job_config,
+ **load_kwargs,
+ )
+ load_job.result()
+ finally:
+ if uncompressed_gcs_uri:
+ new_bucket_name, new_blob_name = gcs.split_uri(
+ gcs_uri=uncompressed_gcs_uri
+ )
+ gcs.delete_blob(new_bucket_name, new_blob_name)
+ logger.debug("Successfully dropped uncompressed blob")
+
+ def copy_s3(
+ self,
+ table_name,
+ bucket,
+ key,
+ if_exists: str = "fail",
+ max_errors: int = 0,
+ data_type: str = "csv",
+ csv_delimiter: str = ",",
+ ignoreheader: int = 1,
+ nullas: Optional[str] = None,
+ aws_access_key_id: Optional[str] = None,
+ aws_secret_access_key: Optional[str] = None,
+ gcs_client: Optional[GoogleCloudStorage] = None,
+ tmp_gcs_bucket: Optional[str] = None,
+ job_config: Optional[LoadJobConfig] = None,
+ **load_kwargs,
+ ):
+ """
+ Copy a file from s3 to BigQuery.
+
+ `Args:`
+ table_name: str
+ The table name and schema (``tmc.cool_table``) to point the file.
+ bucket: str
+ The s3 bucket where the file or manifest is located.
+ key: str
+ The key of the file or manifest in the s3 bucket.
+ if_exists: str
+ If the table already exists, either ``fail``, ``append``, ``drop``
+ or ``truncate`` the table.
+ max_errors: int
+ The maximum number of rows that can error and be skipped before
+ the job fails.
+ data_type: str
+ The data type of the file. Only ``csv`` supported currently.
+ csv_delimiter: str
+ The delimiter of the ``csv``. Only relevant if data_type is ``csv``.
+ ignoreheader: int
+ The number of header rows to skip. Ignored if data_type is ``json``.
+ nullas: str
+ Loads fields that match string as NULL
+ aws_access_key_id:
+ An AWS access key granted to the bucket where the file is located. Not required
+ if keys are stored as environmental variables.
+ aws_secret_access_key:
+ An AWS secret access key granted to the bucket where the file is located. Not
+ required if keys are stored as environmental variables.
+ gcs_client: object
+ The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
+ tmp_gcs_bucket: str
+ The name of the Google Cloud Storage bucket to use to stage the data to load
+ into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
+ job_config: object
+ A LoadJobConfig object to provide to the underlying call to load_table_from_uri
+ on the BigQuery client. The function will create its own if not provided. Note
+ if there are any conflicts between the job_config and other parameters, the
+ job_config values are preferred.
+
+ `Returns`
+ Parsons Table or ``None``
+ See :ref:`parsons-table` for output options.
+ """
+
+ # copy from S3 to GCS
+ tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+ gcs_client = gcs_client or GoogleCloudStorage()
+ temp_blob_uri = gcs_client.copy_s3_to_gcs(
+ aws_source_bucket=bucket,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ gcs_sink_bucket=tmp_gcs_bucket,
+ aws_s3_key=key,
+ )
+ temp_blob_name = key
+ temp_blob_uri = gcs_client.format_uri(
+ bucket=tmp_gcs_bucket, name=temp_blob_name
+ )
+
+ # load CSV from Cloud Storage into BigQuery
+ try:
+ self.copy_from_gcs(
+ gcs_blob_uri=temp_blob_uri,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
+ data_type=data_type,
+ csv_delimiter=csv_delimiter,
+ ignoreheader=ignoreheader,
+ nullas=nullas,
+ job_config=job_config,
+ **load_kwargs,
+ )
+ finally:
+ gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
+
def copy(
self,
tbl: Table,
table_name: str,
if_exists: str = "fail",
+ max_errors: int = 0,
tmp_gcs_bucket: Optional[str] = None,
gcs_client: Optional[GoogleCloudStorage] = None,
job_config: Optional[LoadJobConfig] = None,
@@ -117,13 +731,16 @@ def copy(
Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.
`Args:`
- table_obj: obj
+ tbl: obj
The Parsons Table to copy into BigQuery.
table_name: str
The table name to load the data into.
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
+ max_errors: int
+ The maximum number of rows that can error and be skipped before
+ the job fails.
tmp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
@@ -138,53 +755,183 @@ def copy(
"""
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
- if if_exists not in ["fail", "truncate", "append", "drop"]:
- raise ValueError(
- f"Unexpected value for if_exists: {if_exists}, must be one of "
- '"append", "drop", "truncate", or "fail"'
- )
-
- table_exists = self.table_exists(table_name)
-
- if not job_config:
- job_config = bigquery.LoadJobConfig()
-
+ # if not job_config:
+ job_config = bigquery.LoadJobConfig()
if not job_config.schema:
- job_config.schema = self._generate_schema(tbl)
-
- if not job_config.create_disposition:
- job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
- job_config.skip_leading_rows = 1
- job_config.source_format = bigquery.SourceFormat.CSV
- job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
-
- if table_exists:
- if if_exists == "fail":
- raise ValueError("Table already exists.")
- elif if_exists == "drop":
- self.delete_table(table_name)
- elif if_exists == "append":
- job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
- elif if_exists == "truncate":
- job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+ job_config.schema = self._generate_schema_from_parsons_table(tbl)
gcs_client = gcs_client or GoogleCloudStorage()
temp_blob_name = f"{uuid.uuid4()}.csv"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)
# load CSV from Cloud Storage into BigQuery
- table_ref = get_table_ref(self.client, table_name)
try:
- load_job = self.client.load_table_from_uri(
- temp_blob_uri,
- table_ref,
+ self.copy_from_gcs(
+ gcs_blob_uri=temp_blob_uri,
+ table_name=table_name,
+ if_exists=if_exists,
+ max_errors=max_errors,
job_config=job_config,
**load_kwargs,
)
- load_job.result()
finally:
gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)
+ def duplicate_table(
+ self,
+ source_table,
+ destination_table,
+ if_exists="fail",
+ drop_source_table=False,
+ ):
+ """
+ Create a copy of an existing table (or subset of rows) in a new
+ table.
+
+ `Args:`
+ source_table: str
+ Name of existing schema and table (e.g. ``myschema.oldtable``)
+ destination_table: str
+ Name of destination schema and table (e.g. ``myschema.newtable``)
+ if_exists: str
+ If the table already exists, either ``fail``, ``replace``, or
+ ``ignore`` the operation.
+ drop_source_table: boolean
+ Drop the source table
+ """
+ if if_exists not in ["fail", "replace", "ignore"]:
+ raise ValueError("Invalid value for `if_exists` argument")
+ if if_exists == "fail" and self.table_exists(destination_table):
+ raise ValueError("Table already exists.")
+
+ table__replace_clause = "OR REPLACE " if if_exists == "replace" else ""
+ table__exists_clause = " IF NOT EXISTS" if if_exists == "ignore" else ""
+
+ query = f"""
+ CREATE {table__replace_clause}TABLE{table__exists_clause}
+ {destination_table}
+ CLONE {source_table}
+ """
+ self.query(sql=query, return_values=False)
+ if drop_source_table:
+ self.delete_table(table_name=source_table)
+
+ def upsert(
+ self,
+ table_obj,
+ target_table,
+ primary_key,
+ distinct_check=True,
+ cleanup_temp_table=True,
+ from_s3=False,
+ **copy_args,
+ ):
+ """
+        Perform an upsert on an existing table. An upsert is an operation in which rows
+        in a table are updated or inserted at the same time.
+
+ `Args:`
+ table_obj: obj
+ A Parsons table object
+ target_table: str
+ The schema and table name to upsert
+ primary_key: str or list
+ The primary key column(s) of the target table
+ distinct_check: boolean
+ Check if the primary key column is distinct. Raise error if not.
+ cleanup_temp_table: boolean
+ A temp table is dropped by default on cleanup. You can set to False for debugging.
+ from_s3: boolean
+ Instead of specifying a table_obj (set the first argument to None),
+ set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3`
+ arguments to upsert a pre-existing s3 file into the target_table
+ \**copy_args: kwargs
+ See :func:`~parsons.databases.bigquery.BigQuery.copy` for options.
+ """ # noqa: W605
+ if not self.table_exists(target_table):
+ logger.info(
+ "Target table does not exist. Copying into newly \
+ created target table."
+ )
+
+ self.copy(table_obj, target_table)
+ return None
+
+ if isinstance(primary_key, str):
+ primary_keys = [primary_key]
+ else:
+ primary_keys = primary_key
+
+ if distinct_check:
+ primary_keys_statement = ", ".join(primary_keys)
+ diff = self.query(
+ f"""
+ select (
+ select count(*)
+ from {target_table}
+ ) - (
+ SELECT COUNT(*) from (
+ select distinct {primary_keys_statement}
+ from {target_table}
+ )
+ ) as total_count
+ """
+ ).first
+ if diff > 0:
+ raise ValueError("Primary key column contains duplicate values.")
+
+ noise = f"{random.randrange(0, 10000):04}"[:4]
+ date_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
+        # Generate a staging table name like "table_stg_20200210_1230_1421"
+ staging_tbl = f"{target_table}_stg_{date_stamp}_{noise}"
+
+ # Copy to a staging table
+ logger.info(f"Building staging table: {staging_tbl}")
+
+ if from_s3:
+ if table_obj is not None:
+ raise ValueError(
+ "upsert(... from_s3=True) requires the first argument (table_obj)"
+ " to be None. from_s3 and table_obj are mutually exclusive."
+ )
+ self.copy_s3(staging_tbl, template_table=target_table, **copy_args)
+
+ else:
+ self.copy(
+ tbl=table_obj,
+ table_name=staging_tbl,
+ template_table=target_table,
+ **copy_args,
+ )
+
+ staging_table_name = staging_tbl.split(".")[1]
+ target_table_name = target_table.split(".")[1]
+
+ # Delete rows
+ comparisons = [
+ f"{staging_table_name}.{primary_key} = {target_table_name}.{primary_key}"
+ for primary_key in primary_keys
+ ]
+ where_clause = " and ".join(comparisons)
+
+ queries = [
+ f"""
+ DELETE FROM {target_table}
+ USING {staging_tbl}
+ WHERE {where_clause}
+ """,
+ f"""
+ INSERT INTO {target_table}
+ SELECT * FROM {staging_tbl}
+ """,
+ ]
+
+ if cleanup_temp_table:
+ # Drop the staging table
+ queries.append(f"DROP TABLE IF EXISTS {staging_tbl}")
+
+ return self.query_with_transaction(queries=queries)
+
def delete_table(self, table_name):
"""
Delete a BigQuery table.
@@ -196,47 +943,234 @@ def delete_table(self, table_name):
table_ref = get_table_ref(self.client, table_name)
self.client.delete_table(table_ref)
- def query(
- self, sql: str, parameters: Optional[Union[list, dict]] = None
- ) -> Optional[Table]:
+ def table_exists(self, table_name: str) -> bool:
"""
- Run a BigQuery query and return the results as a Parsons table.
-
- To include python variables in your query, it is recommended to pass them as parameters,
- following the BigQuery style where parameters are prefixed with `@`s.
- Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL
- injection attacks.
+ Check whether or not the Google BigQuery table exists in the specified dataset.
- **Parameter Examples**
+ `Args:`
+ table_name: str
+ The name of the BigQuery table to check for
+ `Returns:`
+ bool
+ True if the table exists in the specified dataset, false otherwise
+ """
+ table_ref = get_table_ref(self.client, table_name)
+ try:
+ self.client.get_table(table_ref)
+ except exceptions.NotFound:
+ return False
- .. code-block:: python
+ return True
- name = "Beatrice O'Brady"
- sql = 'SELECT * FROM my_table WHERE name = %s'
- rs.query(sql, parameters=[name])
+ def get_tables(self, schema, table_name: Optional[str] = None):
+ """
+ List the tables in a schema including metadata.
- .. code-block:: python
+ Args:
+ schema: str
+ Filter by a schema
+ table_name: str
+ Filter by a table name
+ `Returns:`
+ Parsons Table
+ See :ref:`parsons-table` for output options.
+ """
- name = "Beatrice O'Brady"
- sql = "SELECT * FROM my_table WHERE name = %(name)s"
- rs.query(sql, parameters={'name': name})
+ logger.debug("Retrieving tables info.")
+ sql = f"select * from {schema}.INFORMATION_SCHEMA.TABLES"
+ if table_name:
+ sql += f" where table_name = '{table_name}'"
+ return self.query(sql)
- `Args:`
- sql: str
- A valid BigTable statement
- parameters: dict
- A dictionary of query parameters for BigQuery.
+ def get_views(self, schema, view: Optional[str] = None):
+ """
+ List views.
+ `Args:`
+ schema: str
+ Filter by a schema
+ view: str
+ Filter by a table name
`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""
- # get our connection and cursor
- cursor = self._dbapi.connect(self.client).cursor()
- # Run the query
- cursor.execute(sql, parameters)
+ logger.debug("Retrieving views info.")
+ sql = f"""
+ select
+ table_schema as schema_name,
+ table_name as view_name,
+ view_definition
+ from {schema}.INFORMATION_SCHEMA.VIEWS
+ """
+ if view:
+ sql += f" where table_name = '{view}'"
+ return self.query(sql)
+
+ def get_columns(self, schema: str, table_name: str):
+ """
+ Gets the column names (and other column metadata) for a table. If you
+ need just the column names run ``get_columns_list()``, as it is faster.
+
+ `Args:`
+ schema: str
+ The schema name
+ table_name: str
+ The table name
+
+ `Returns:`
+ A dictionary mapping column name to a dictionary with extra info. The
+ keys of the dictionary are ordered just like the columns in the table.
+ The extra info is a dict with keys ``data_type``, ``is_nullable``,
+ ``is_updatable``, ``is_partitioning_column``, and ``rounding_mode``.
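+
+ **Example** (a minimal sketch; assumes an instantiated connector ``bq``; names are hypothetical):
+
+ .. code-block:: python
+
+ cols = bq.get_columns(schema="my_dataset", table_name="my_table")
+ cols["id"]["data_type"]  # e.g. "INT64"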
+ """
+
+ base_query = f"""
+ SELECT
+ *
+ FROM `{self.project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
+ WHERE
+ table_name = '{table_name}'
+ """
+
+ logger.debug(base_query)
+
+ return {
+ row["column_name"]: {
+ "data_type": row["data_type"],
+ "is_nullable": row["is_nullable"],
+ "is_updatable": row["is_updatable"],
+ "is_partioning_column": row["is_partitioning_column"],
+ "rounding_mode": row["rounding_mode"],
+ }
+ for row in self.query(base_query)
+ }
+
+ def get_columns_list(self, schema: str, table_name: str) -> list:
+ """
+ Gets the column names for a table.
+
+ `Args:`
+ schema: str
+ The schema name
+ table_name: str
+ The table name
+
+ `Returns:`
+ A list of column names
+ """
+
+ first_row = self.query(f"SELECT * FROM {schema}.{table_name} LIMIT 1;")
+
+ return [x for x in first_row.columns]
+
+ def get_row_count(self, schema: str, table_name: str) -> int:
+ """
+ Gets the row count for a BigQuery materialization.
+
+ `Args`:
+ schema: str
+ The schema name
+ table_name: str
+ The table name
+
+ `Returns:`
+ Row count of the target table
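+
+ **Example** (a minimal sketch; assumes an instantiated connector ``bq``; names are hypothetical):
+
+ .. code-block:: python
+
+ bq.get_row_count(schema="my_dataset", table_name="my_table")  # e.g. 42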
+ """
+
+ sql = f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`"
+ result = self.query(sql=sql)
+
+ return result["row_count"][0]
+
+ def _generate_schema_from_parsons_table(self, tbl):
+ stats = tbl.get_columns_type_stats()
+ fields = []
+ for stat in stats:
+ petl_types = stat["type"]
+ best_type = "str" if "str" in petl_types else petl_types[0]
+ field_type = self._bigquery_type(best_type)
+ field = bigquery.schema.SchemaField(stat["name"], field_type)
+ fields.append(field)
+ return fields
+
+ def _process_job_config(
+ self, job_config: Optional[LoadJobConfig] = None, **kwargs
+ ) -> LoadJobConfig:
+ """
+ Internal function to neatly process a user-supplied job configuration object.
+ As a convention, if both the job_config and keyword arguments specify a value,
+ we defer to the job_config.
+
+ `Args`:
+ job_config: `LoadJobConfig`
+ Optionally supplied GCS `LoadJobConfig` object
+
+ `Returns`:
+ A `LoadJobConfig` object
+ """
+
+ if not job_config:
+ job_config = bigquery.LoadJobConfig()
+
+ if not job_config.schema:
+ if kwargs["schema"]:
+ logger.debug("Using user-supplied schema definition...")
+ job_config.schema = map_column_headers_to_schema_field(kwargs["schema"])
+ job_config.autodetect = False
+ else:
+ logger.debug("Autodetecting schema definition...")
+ job_config.autodetect = True
+
+ if not job_config.create_disposition:
+ job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
+
+ if not job_config.max_bad_records:
+ job_config.max_bad_records = kwargs["max_errors"]
+
+ if not job_config.skip_leading_rows and kwargs["data_type"] == "csv":
+ job_config.skip_leading_rows = kwargs["ignoreheader"]
+
+ if not job_config.source_format:
+ job_config.source_format = (
+ bigquery.SourceFormat.CSV
+ if kwargs["data_type"] == "csv"
+ else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
+ )
+
+ if not job_config.field_delimiter:
+ if kwargs["data_type"] == "csv":
+ job_config.field_delimiter = kwargs["csv_delimiter"]
+ if kwargs["nullas"]:
+ job_config.null_marker = kwargs["nullas"]
+
+ if not job_config.write_disposition:
+ if kwargs["if_exists"] == "append":
+ job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
+ elif kwargs["if_exists"] == "truncate":
+ job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+ elif kwargs["table_exists"] and kwargs["if_exists"] == "fail":
+ raise Exception("Table already exists.")
+ elif kwargs["if_exists"] == "drop" and kwargs["table_exists"]:
+ self.delete_table(kwargs["table_name"])
+ job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
+ else:
+ job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
+
+ if not job_config.allow_quoted_newlines:
+ job_config.allow_quoted_newlines = kwargs["allow_quoted_newlines"]
+
+ if kwargs["data_type"] == "csv" and kwargs["allow_jagged_rows"]:
+ job_config.allow_jagged_rows = kwargs["allow_jagged_rows"]
+ else:
+ job_config.allow_jagged_rows = True
+
+ if not job_config.quote_character and kwargs["quote"]:
+ job_config.quote_character = kwargs["quote"]
+
+ return job_config
+
+ def _fetch_query_results(self, cursor) -> Table:
# We will use a temp file to cache the results so that they are not all living
# in memory. We'll use pickle to serialize the results to file in order to maintain
# the proper data types (e.g. integer).
@@ -267,53 +1201,7 @@ def query(
return None
ptable = petl.frompickle(temp_filename)
- final_table = Table(ptable)
-
- return final_table
-
- def table_exists(self, table_name: str) -> bool:
- """
- Check whether or not the Google BigQuery table exists in the specified dataset.
-
- `Args:`
- table_name: str
- The name of the BigQuery table to check for
- `Returns:`
- bool
- True if the table exists in the specified dataset, false otherwise
- """
- table_ref = get_table_ref(self.client, table_name)
- try:
- self.client.get_table(table_ref)
- except exceptions.NotFound:
- return False
-
- return True
-
- @property
- def client(self):
- """
- Get the Google BigQuery client to use for making queries.
-
- `Returns:`
- `google.cloud.bigquery.client.Client`
- """
- if not self._client:
- # Create a BigQuery client to use to make the query
- self._client = bigquery.Client(project=self.project, location=self.location)
-
- return self._client
-
- def _generate_schema(self, tbl):
- stats = tbl.get_columns_type_stats()
- fields = []
- for stat in stats:
- petl_types = stat["type"]
- best_type = "str" if "str" in petl_types else petl_types[0]
- field_type = self._bigquery_type(best_type)
- field = bigquery.schema.SchemaField(stat["name"], field_type)
- fields.append(field)
- return fields
+ return Table(ptable)
@staticmethod
def _bigquery_type(tp):
@@ -326,7 +1214,7 @@ def table(self, table_name):
class BigQueryTable(BaseTable):
- # BigQuery table object.
+ """BigQuery table object."""
def drop(self, cascade=False):
"""
@@ -339,16 +1227,5 @@ def truncate(self):
"""
Truncate the table.
"""
- # BigQuery does not support truncate natively, so we will "load" an empty dataset
- # with write disposition of "truncate"
- table_ref = get_table_ref(self.db.client, self.table)
- bq_table = self.db.client.get_table(table_ref)
- # BigQuery wants the schema when we load the data, so we will grab it from the table
- job_config = bigquery.LoadJobConfig()
- job_config.schema = bq_table.schema
-
- empty_table = Table([])
- self.db.copy(
- empty_table, self.table, if_exists="truncate", job_config=job_config
- )
+ self.db.query(f"TRUNCATE TABLE {self.table}")
diff --git a/parsons/google/google_cloud_storage.py b/parsons/google/google_cloud_storage.py
index 0b6065ef2b..19993c5768 100644
--- a/parsons/google/google_cloud_storage.py
+++ b/parsons/google/google_cloud_storage.py
@@ -1,9 +1,15 @@
import google
from google.cloud import storage
+from google.cloud import storage_transfer
from parsons.google.utitities import setup_google_application_credentials
from parsons.utilities import files
import datetime
import logging
+import time
+import uuid
+import gzip
+import zipfile
+from typing import Optional
logger = logging.getLogger(__name__)
@@ -35,17 +41,17 @@ class GoogleCloudStorage(object):
"""
def __init__(self, app_creds=None, project=None):
-
setup_google_application_credentials(app_creds)
+ self.project = project
# Throws an error if you pass project=None, so adding if/else statement.
- if not project:
+ if not self.project:
self.client = storage.Client()
"""
Access all methods of `google.cloud` package
"""
else:
- self.client = storage.Client(project=project)
+ self.client = storage.Client(project=self.project)
def list_buckets(self):
"""
@@ -71,10 +77,10 @@ def bucket_exists(self, bucket_name):
"""
if bucket_name in self.list_buckets():
- logger.info(f"{bucket_name} exists.")
+ logger.debug(f"{bucket_name} exists.")
return True
else:
- logger.info(f"{bucket_name} does not exist.")
+ logger.debug(f"{bucket_name} does not exist.")
return False
def get_bucket(self, bucket_name):
@@ -93,7 +99,7 @@ def get_bucket(self, bucket_name):
else:
raise google.cloud.exceptions.NotFound("Bucket not found")
- logger.info(f"Returning {bucket_name} object")
+ logger.debug(f"Returning {bucket_name} object")
return bucket
def create_bucket(self, bucket_name):
@@ -107,7 +113,7 @@ def create_bucket(self, bucket_name):
``None``
"""
- # To Do: Allow user to set all of the bucket parameters
+ # TODO: Allow user to set all of the bucket parameters
self.client.create_bucket(bucket_name)
logger.info(f"Created {bucket_name} bucket.")
@@ -130,7 +136,7 @@ def delete_bucket(self, bucket_name, delete_blobs=False):
bucket.delete(force=delete_blobs)
logger.info(f"{bucket_name} bucket deleted.")
- def list_blobs(self, bucket_name, max_results=None, prefix=None):
+ def list_blobs(self, bucket_name, max_results=None, prefix=None, match_glob=None):
"""
List all of the blobs in a bucket
@@ -138,15 +144,19 @@ def list_blobs(self, bucket_name, max_results=None, prefix=None):
bucket_name: str
The name of the bucket
max_results: int
- TBD
- prefix_filter: str
+ Maximum number of blobs to return
+ prefix: str
A prefix to filter files
+ match_glob: str
+ Filters files based on a glob string. Note that the match_glob
+ parameter is applied to the full blob URI; include a preceding
+ wildcard to account for nested files (*/ for one level, **/ for n levels)
`Returns:`
A list of blob names
"""
blobs = self.client.list_blobs(
- bucket_name, max_results=max_results, prefix=prefix
+ bucket_name, max_results=max_results, prefix=prefix, match_glob=match_glob
)
lst = [b.name for b in blobs]
logger.info(f"Found {len(lst)} in {bucket_name} bucket.")
@@ -167,10 +177,10 @@ def blob_exists(self, bucket_name, blob_name):
"""
if blob_name in self.list_blobs(bucket_name):
- logger.info(f"{blob_name} exists.")
+ logger.debug(f"{blob_name} exists.")
return True
else:
- logger.info(f"{blob_name} does not exist.")
+ logger.debug(f"{blob_name} does not exist.")
return False
def get_blob(self, bucket_name, blob_name):
@@ -191,15 +201,15 @@ def get_blob(self, bucket_name, blob_name):
logger.debug(f"Got {blob_name} object from {bucket_name} bucket.")
return blob
- def put_blob(self, bucket_name, blob_name, local_path):
+ def put_blob(self, bucket_name, blob_name, local_path, **kwargs):
"""
Puts a blob (aka file) in a bucket
`Args:`
- blob_name:
- The name of blob to be stored in the bucket
bucket_name:
The name of the bucket to store the blob
+ blob_name:
+ The name of blob to be stored in the bucket
local_path: str
The local path of the file to upload
`Returns:`
@@ -210,7 +220,7 @@ def put_blob(self, bucket_name, blob_name, local_path):
blob = storage.Blob(blob_name, bucket)
with open(local_path, "rb") as f:
- blob.upload_from_file(f)
+ blob.upload_from_file(f, **kwargs)
logger.info(f"{blob_name} put in {bucket_name} bucket.")
@@ -238,10 +248,10 @@ def download_blob(self, bucket_name, blob_name, local_path=None):
bucket = storage.Bucket(self.client, name=bucket_name)
blob = storage.Blob(blob_name, bucket)
- logger.info(f"Downloading {blob_name} from {bucket_name} bucket.")
+ logger.debug(f"Downloading {blob_name} from {bucket_name} bucket.")
with open(local_path, "wb") as f:
blob.download_to_file(f, client=self.client)
- logger.info(f"{blob_name} saved to {local_path}.")
+ logger.debug(f"{blob_name} saved to {local_path}.")
return local_path
@@ -277,6 +287,11 @@ def upload_table(
The name of the blob to upload the data into.
data_type: str
The file format to use when writing the data. One of: `csv` or `json`
+ default_acl:
+ ACL desired for newly uploaded table
+
+ `Returns`:
+ String representation of file URI in GCS
"""
bucket = storage.Bucket(self.client, name=bucket_name)
blob = storage.Blob(blob_name, bucket)
@@ -328,3 +343,290 @@ def get_url(self, bucket_name, blob_name, expires_in=60):
method="GET",
)
return url
+
+ def copy_bucket_to_gcs(
+ self,
+ gcs_sink_bucket: str,
+ source: str,
+ source_bucket: str,
+ destination_path: str = "",
+ source_path: str = "",
+ aws_access_key_id: Optional[str] = None,
+ aws_secret_access_key: Optional[str] = None,
+ ):
+ """
+ Creates a one-time transfer job from Amazon S3 or Google Cloud
+ Storage into a Google Cloud Storage bucket. Copies all blobs within
+ the source bucket unless a key or prefix is passed.
+
+ `Args`:
+ gcs_sink_bucket (str):
+ Destination for the data transfer (located in GCS)
+ source (str):
+ File storage vendor [gcs or s3]
+ source_bucket (str):
+ Source bucket name
+ destination_path (str):
+ Path in the GCS sink bucket where transferred files will land
+ source_path (str):
+ Path in the source system pointing to the relevant keys
+ / files to sync. Must end in a '/'
+ aws_access_key_id (str):
+ Access key to authenticate storage transfer
+ aws_secret_access_key (str):
+ Secret key to authenticate storage transfer
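+
+ **Example** (a minimal sketch; assumes an instantiated ``GoogleCloudStorage`` client ``gcs``; bucket names and credentials are hypothetical placeholders):
+
+ .. code-block:: python
+
+ gcs.copy_bucket_to_gcs(
+ gcs_sink_bucket="my-gcs-bucket",
+ source="s3",
+ source_bucket="my-s3-bucket",
+ source_path="exports/",
+ aws_access_key_id="MY_ACCESS_KEY_ID",
+ aws_secret_access_key="MY_SECRET_ACCESS_KEY",
+ )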
+ """
+ if source not in ["gcs", "s3"]:
+ raise ValueError(
+ f"Blob transfer only supports gcs and s3 sources [source={source}]"
+ )
+ if source_path and source_path[-1] != "/":
+ raise ValueError("Source path much end in a '/'")
+
+ client = storage_transfer.StorageTransferServiceClient()
+
+ now = datetime.datetime.utcnow()
+ # Setting the start date and the end date as
+ # the same time creates a one-time transfer
+ one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
+
+ if source == "gcs":
+ description = f"""One time GCS to GCS Transfer
+ [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}"""
+ elif source == "s3":
+ description = f"""One time S3 to GCS Transfer
+ [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}"""
+
+ transfer_job_config = {
+ "project_id": self.project,
+ "description": description,
+ "status": storage_transfer.TransferJob.Status.ENABLED,
+ "schedule": {
+ "schedule_start_date": one_time_schedule,
+ "schedule_end_date": one_time_schedule,
+ },
+ }
+
+ # Set up the transfer job configuration based on user input
+ if source == "s3":
+ blob_storage = "S3"
+ transfer_job_config["transfer_spec"] = {
+ "aws_s3_data_source": {
+ "bucket_name": source_bucket,
+ "path": source_path,
+ "aws_access_key": {
+ "access_key_id": aws_access_key_id,
+ "secret_access_key": aws_secret_access_key,
+ },
+ },
+ "gcs_data_sink": {
+ "bucket_name": gcs_sink_bucket,
+ "path": destination_path,
+ },
+ }
+ elif source == "gcs":
+ blob_storage = "GCS"
+ transfer_job_config["transfer_spec"] = {
+ "gcs_data_source": {
+ "bucket_name": source_bucket,
+ "path": source_path,
+ },
+ "gcs_data_sink": {
+ "bucket_name": gcs_sink_bucket,
+ "path": destination_path,
+ },
+ }
+
+ create_transfer_job_request = storage_transfer.CreateTransferJobRequest(
+ {"transfer_job": transfer_job_config}
+ )
+
+ # Create the transfer job
+ create_result = client.create_transfer_job(create_transfer_job_request)
+
+ polling = True
+ wait_time = 0
+ wait_between_attempts_in_sec = 10
+
+ # NOTE: This value defaults to an empty string until GCP
+ # triggers the job internally; we'll use it to determine
+ # whether or not the transfer has kicked off
+ latest_operation_name = create_result.latest_operation_name
+
+ while polling:
+ if latest_operation_name:
+ operation = client.get_operation({"name": latest_operation_name})
+
+ if not operation.done:
+ logger.debug("Operation still running...")
+
+ else:
+ operation_metadata = storage_transfer.TransferOperation.deserialize(
+ operation.metadata.value
+ )
+ error_output = operation_metadata.error_breakdowns
+ if len(error_output) != 0:
+ raise Exception(
+ f"""{blob_storage} to GCS Transfer Job
+ {create_result.name} failed with error: {error_output}"""
+ )
+ else:
+ logger.info(f"TransferJob: {create_result.name} succeeded.")
+ return
+
+ else:
+ logger.info("Waiting to kickoff operation...")
+ get_transfer_job_request = storage_transfer.GetTransferJobRequest(
+ {"job_name": create_result.name, "project_id": self.project}
+ )
+ get_result = client.get_transfer_job(request=get_transfer_job_request)
+ latest_operation_name = get_result.latest_operation_name
+
+ wait_time += wait_between_attempts_in_sec
+ time.sleep(wait_between_attempts_in_sec)
+
+ def format_uri(self, bucket: str, name: str):
+ """
+ Represent a GCS URI as a string
+
+ `Args`:
+ bucket: str
+ GCS bucket name
+ name: str
+ Filename in bucket
+
+ `Returns`:
+ String representation of the URI
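+
+ **Example** (a minimal sketch; assumes an instantiated client ``gcs``; names are hypothetical):
+
+ .. code-block:: python
+
+ gcs.format_uri(bucket="my-bucket", name="exports/file.csv")
+ # 'gs://my-bucket/exports/file.csv'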
+ """
+ return f"gs://{bucket}/{name}"
+
+ def split_uri(self, gcs_uri: str):
+ """
+ Split a GCS URI into a bucket and blob name
+
+ `Args`:
+ gcs_uri: str
+ GCS URI
+
+ `Returns`:
+ Tuple of strings with bucket_name and blob_name
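+
+ **Example** (a minimal sketch; assumes an instantiated client ``gcs``; the URI is hypothetical):
+
+ .. code-block:: python
+
+ gcs.split_uri("gs://my-bucket/exports/file.csv")
+ # ('my-bucket', 'exports/file.csv')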
+ """
+ # TODO: make this more robust with regex?
+ remove_protocol = gcs_uri.replace("gs://", "")
+ uri_parts = remove_protocol.split("/")
+ bucket_name = uri_parts[0]
+ blob_name = "/".join(uri_parts[1:])
+ return bucket_name, blob_name
+
+ def unzip_blob(
+ self,
+ bucket_name: str,
+ blob_name: str,
+ compression_type: str = "gzip",
+ new_filename: Optional[str] = None,
+ new_file_extension: Optional[str] = None,
+ ) -> str:
+ """
+ Downloads and decompresses a blob. The decompressed blob
+ is re-uploaded with the same filename if no `new_filename`
+ parameter is provided.
+
+ `Args`:
+ bucket_name: str
+ GCS bucket name
+
+ blob_name: str
+ Blob name in GCS bucket
+
+ compression_type: str
+ Either `zip` or `gzip`
+
+ new_filename: str
+ If provided, replaces the existing blob name
+ when the decompressed file is uploaded
+
+ new_file_extension: str
+ If provided, replaces the file extension
+ when the decompressed file is uploaded
+
+ `Returns`:
+ String representation of decompressed GCS URI
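+
+ **Example** (a minimal sketch; assumes an instantiated client ``gcs``; bucket and blob names are hypothetical):
+
+ .. code-block:: python
+
+ uri = gcs.unzip_blob(
+ bucket_name="my-bucket",
+ blob_name="exports/file.gz",
+ compression_type="gzip",
+ new_file_extension="csv",
+ )
+ # uri == 'gs://my-bucket/exports/file.csv'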
+ """
+
+ compression_params = {
+ "zip": {
+ "file_extension": ".zip",
+ "compression_function": self.__zip_decompress_and_write_to_gcs,
+ "read": "r",
+ },
+ "gzip": {
+ "file_extension": ".gz",
+ "compression_function": self.__gzip_decompress_and_write_to_gcs,
+ },
+ }
+
+ file_extension = compression_params[compression_type]["file_extension"]
+ compression_function = compression_params[compression_type][
+ "compression_function"
+ ]
+
+ compressed_filepath = self.download_blob(
+ bucket_name=bucket_name, blob_name=blob_name
+ )
+
+ decompressed_filepath = compressed_filepath.replace(file_extension, "")
+ decompressed_blob_name = (
+ new_filename if new_filename else blob_name.replace(file_extension, "")
+ )
+ if new_file_extension:
+ decompressed_filepath += f".{new_file_extension}"
+ decompressed_blob_name += f".{new_file_extension}"
+
+ logger.debug("Decompressing file...")
+ compression_function(
+ compressed_filepath=compressed_filepath,
+ decompressed_filepath=decompressed_filepath,
+ decompressed_blob_name=decompressed_blob_name,
+ bucket_name=bucket_name,
+ new_file_extension=new_file_extension,
+ )
+
+ return self.format_uri(bucket=bucket_name, name=decompressed_blob_name)
+
+ def __gzip_decompress_and_write_to_gcs(self, **kwargs):
+ """
+ Handles gzip (`.gz`) decompression and streams blob contents
+ to a decompressed storage object
+ """
+
+ compressed_filepath = kwargs.pop("compressed_filepath")
+ decompressed_blob_name = kwargs.pop("decompressed_blob_name")
+ bucket_name = kwargs.pop("bucket_name")
+
+ with gzip.open(compressed_filepath, "rb") as f_in:
+ logger.debug(
+ f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
+ )
+ bucket = self.get_bucket(bucket_name=bucket_name)
+ blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
+ blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
+
+ def __zip_decompress_and_write_to_gcs(self, **kwargs):
+ """
+ Handles `.zip` decompression and streams blob contents
+ to a decompressed storage object
+ """
+
+ compressed_filepath = kwargs.pop("compressed_filepath")
+ decompressed_blob_name = kwargs.pop("decompressed_blob_name")
+ decompressed_blob_in_archive = decompressed_blob_name.split("/")[-1]
+ bucket_name = kwargs.pop("bucket_name")
+
+ # Unzip the archive
+ with zipfile.ZipFile(compressed_filepath) as path_:
+ # Open the underlying file
+ with path_.open(decompressed_blob_in_archive) as f_in:
+ logger.debug(
+ f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
+ )
+ bucket = self.get_bucket(bucket_name=bucket_name)
+ blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
+ blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
diff --git a/requirements.txt b/requirements.txt
index 3031efb451..869a574886 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,7 +20,8 @@ xmltodict==0.11.0
joblib==1.2.0
censusgeocode==0.4.3.post1
airtable-python-wrapper==0.13.0
-google-cloud-storage==2.2.0
+google-cloud-storage==2.10.0
+google-cloud-storage-transfer==1.9.1
google-cloud-bigquery==3.4.0
docutils<0.18,>=0.14
urllib3==1.26.18
@@ -44,3 +45,4 @@ bs4==0.0.1
# TODO Remove when we have a TMC-specific Docker image
selenium==3.141.0
jinja2==3.0.2
+us==3.1.1
\ No newline at end of file
diff --git a/setup.py b/setup.py
index c75e8cfe43..9d35042565 100644
--- a/setup.py
+++ b/setup.py
@@ -29,6 +29,7 @@ def main():
"google-api-python-client",
"google-cloud-bigquery",
"google-cloud-storage",
+ "google-cloud-storage-transfer",
"gspread",
"httplib2",
"oauth2client",
@@ -61,7 +62,7 @@ def main():
setup(
name="parsons",
- version="2.1.0",
+ version="3.0.0",
author="The Movement Cooperative",
author_email="info@movementcooperative.org",
url="https://github.com/move-coop/parsons",
diff --git a/test/test_airtable/test_airtable.py b/test/test_airtable/test_airtable.py
index ff326c43b9..0459283abb 100644
--- a/test/test_airtable/test_airtable.py
+++ b/test/test_airtable/test_airtable.py
@@ -11,7 +11,7 @@
)
-os.environ["AIRTABLE_API_KEY"] = "SOME_KEY"
+os.environ["AIRTABLE_PERSONAL_ACCESS_TOKEN"] = "SOME_TOKEN"
BASE_KEY = "BASEKEY"
TABLE_NAME = "TABLENAME"
diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py
new file mode 100644
index 0000000000..e77baee8b9
--- /dev/null
+++ b/test/test_databases/test_bigquery.py
@@ -0,0 +1,539 @@
+import json
+import os
+import unittest.mock as mock
+
+from google.cloud import bigquery
+from google.cloud import exceptions
+
+from parsons import GoogleBigQuery as BigQuery, Table
+from parsons.google.google_cloud_storage import GoogleCloudStorage
+from test.test_google.test_utilities import FakeCredentialTest
+
+
+class FakeClient:
+ """A Fake Storage Client used for monkey-patching."""
+
+ def __init__(self, project=None):
+ self.project = project
+
+
+class FakeGoogleCloudStorage(GoogleCloudStorage):
+ """A Fake GoogleCloudStorage object used to test setting up credentials."""
+
+ @mock.patch("google.cloud.storage.Client", FakeClient)
+ def __init__(self):
+ super().__init__(None, None)
+
+ def upload_table(
+ self, table, bucket_name, blob_name, data_type="csv", default_acl=None
+ ):
+ pass
+
+ def delete_blob(self, bucket_name, blob_name):
+ pass
+
+
+class TestGoogleBigQuery(FakeCredentialTest):
+ def setUp(self):
+ super().setUp()
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cred_path
+ self.tmp_gcs_bucket = "tmp"
+
+ def tearDown(self) -> None:
+ super().tearDown()
+ del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+
+ def test_query(self):
+ query_string = "select * from table"
+
+ # Pass the mock class into our GoogleBigQuery constructor
+ bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}])
+
+ # Run a query against our parsons GoogleBigQuery class
+ result = bq.query(query_string)
+
+ # Check our return value
+ self.assertEqual(result.num_rows, 1)
+ self.assertEqual(result.columns, ["one", "two"])
+ self.assertEqual(result[0], {"one": 1, "two": 2})
+
+ def test_query__no_results(self):
+ query_string = "select * from table"
+
+ # Pass the mock class into our GoogleBigQuery constructor
+ bq = self._build_mock_client_for_querying([])
+
+ # Run a query against our parsons GoogleBigQuery class
+ result = bq.query(query_string)
+
+ # Check our return value
+ self.assertEqual(result, None)
+
+ @mock.patch("parsons.utilities.files.create_temp_file")
+ def test_query__no_return(self, create_temp_file_mock):
+ query_string = "select * from table"
+
+ # Pass the mock class into our GoogleBigQuery constructor
+ bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}])
+ bq._fetch_query_results = mock.MagicMock()
+
+ # Run a query against our parsons GoogleBigQuery class
+ result = bq.query(query_string, return_values=False)
+
+ # Check our return value
+ self.assertEqual(result, None)
+
+ # Check that query results were not fetched
+ bq._fetch_query_results.assert_not_called()
+
+ @mock.patch("parsons.utilities.files.create_temp_file")
+ def test_query_with_transaction(self, create_temp_file_mock):
+ queries = ["select * from table", "select foo from bar"]
+ parameters = ["baz"]
+
+ # Pass the mock class into our GoogleBigQuery constructor
+ bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}])
+ bq.query = mock.MagicMock()
+
+ # Run a query against our parsons GoogleBigQuery class
+ result = bq.query_with_transaction(queries=queries, parameters=parameters)
+ keyword_args = bq.query.call_args[1]
+
+ # Check our return value
+ self.assertEqual(result, None)
+
+ # Check that queries and transaction keywords are included in sql
+ self.assertTrue(
+ all(
+ [
+ text in keyword_args["sql"]
+ for text in queries + ["BEGIN TRANSACTION", "COMMIT"]
+ ]
+ )
+ )
+ self.assertEqual(keyword_args["parameters"], parameters)
+ self.assertFalse(keyword_args["return_values"])
+
+ def test_copy_gcs(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # call the method being tested
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ )
+
+ # check that the method did the right things
+ self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
+ load_call_args = bq.client.load_table_from_uri.call_args
+ self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri)
+
+ job_config = load_call_args[1]["job_config"]
+ self.assertEqual(
+ job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
+ )
+
+ def test_copy_gcs__if_exists_truncate(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # call the method being tested
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ if_exists="truncate",
+ )
+
+ # check that the method did the right things
+ self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
+ load_call_args = bq.client.load_table_from_uri.call_args
+ self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri)
+
+ job_config = load_call_args[1]["job_config"]
+ self.assertEqual(
+ job_config.write_disposition, bigquery.WriteDisposition.WRITE_TRUNCATE
+ )
+
+ def test_copy_gcs__if_exists_append(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # call the method being tested
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ if_exists="append",
+ )
+
+ # check that the method did the right things
+ self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
+ load_call_args = bq.client.load_table_from_uri.call_args
+ self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri)
+
+ job_config = load_call_args[1]["job_config"]
+ self.assertEqual(
+ job_config.write_disposition, bigquery.WriteDisposition.WRITE_APPEND
+ )
+
+ def test_copy_gcs__if_exists_fail(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # call the method being tested
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ if_exists="truncate",
+ )
+ bq.table_exists = mock.MagicMock()
+ bq.table_exists.return_value = True
+
+ # call the method being tested
+ with self.assertRaises(Exception):
+ bq.copy_from_gcs(
+ self.default_table,
+ "dataset.table",
+ tmp_gcs_bucket=self.tmp_gcs_bucket,
+ gcs_client=self._build_mock_cloud_storage_client(),
+ )
+
+ def test_copy_gcs__if_exists_drop(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+ bq.table_exists = mock.MagicMock()
+ bq.table_exists.return_value = True
+
+ # call the method being tested
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ if_exists="drop",
+ )
+
+ # check that we tried to delete the table
+ self.assertEqual(bq.client.delete_table.call_count, 1)
+
+ def test_copy_gcs__bad_if_exists(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+ bq.table_exists = mock.MagicMock()
+ bq.table_exists.return_value = True
+
+ # call the method being tested
+ with self.assertRaises(ValueError):
+ bq.copy_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ if_exists="foobar",
+ )
+
+ @mock.patch("google.cloud.storage.Client")
+ @mock.patch.object(
+ GoogleCloudStorage, "split_uri", return_value=("tmp", "file.gzip")
+ )
+ @mock.patch.object(
+ GoogleCloudStorage, "unzip_blob", return_value="gs://tmp/file.csv"
+ )
+ def test_copy_large_compressed_file_from_gcs(
+ self, unzip_mock: mock.MagicMock, split_mock: mock.MagicMock, *_
+ ):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file.gzip"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # call the method being tested
+ bq.copy_large_compressed_file_from_gcs(
+ gcs_blob_uri=tmp_blob_uri,
+ table_name="dataset.table",
+ )
+
+ # check that the method did the right things
+ split_mock.assert_has_calls(
+ [
+ mock.call(gcs_uri="gs://tmp/file.gzip"),
+ mock.call(gcs_uri="gs://tmp/file.csv"),
+ ]
+ )
+ unzip_mock.assert_called_once_with(
+ bucket_name="tmp",
+ blob_name="file.gzip",
+ new_file_extension="csv",
+ compression_type="gzip",
+ )
+ self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
+ load_call_args = bq.client.load_table_from_uri.call_args
+ self.assertEqual(load_call_args[1]["source_uris"], "gs://tmp/file.csv")
+
+ job_config = load_call_args[1]["job_config"]
+ self.assertEqual(
+ job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
+ )
+
+ def test_copy_s3(self):
+ # setup dependencies / inputs
+ table_name = "table_name"
+ bucket = "aws_bucket"
+ key = "file.gzip"
+ aws_access_key_id = "AAAAAA"
+ aws_secret_access_key = "BBBBB"
+ tmp_gcs_bucket = "tmp"
+
+ # set up object under test
+ bq = self._build_mock_client_for_copying(table_exists=False)
+ gcs_client = self._build_mock_cloud_storage_client()
+ bq.copy_from_gcs = mock.MagicMock()
+
+ # call the method being tested
+ bq.copy_s3(
+ table_name=table_name,
+ bucket=bucket,
+ key=key,
+ gcs_client=gcs_client,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ tmp_gcs_bucket=tmp_gcs_bucket,
+ )
+
+ # check that the method did the right things
+ gcs_client.copy_s3_to_gcs.assert_called_once_with(
+ aws_source_bucket=bucket,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ gcs_sink_bucket=tmp_gcs_bucket,
+ aws_s3_key=key,
+ )
+ bq.copy_from_gcs.assert_called_once()
+ gcs_client.delete_blob.assert_called_once()
+
+ def test_copy(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
+ tbl = self.default_table
+ bq = self._build_mock_client_for_copying(table_exists=False)
+ bq.copy_from_gcs = mock.MagicMock()
+ table_name = "dataset.table"
+
+ # call the method being tested
+ bq.copy(
+ tbl,
+ table_name,
+ tmp_gcs_bucket=self.tmp_gcs_bucket,
+ gcs_client=gcs_client,
+ )
+
+ # check that the method did the right things
+ self.assertEqual(gcs_client.upload_table.call_count, 1)
+ upload_call_args = gcs_client.upload_table.call_args
+ self.assertEqual(upload_call_args[0][0], tbl)
+ self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket)
+ tmp_blob_name = upload_call_args[0][2]
+
+ self.assertEqual(bq.copy_from_gcs.call_count, 1)
+ load_call_args = bq.copy_from_gcs.call_args
+ self.assertEqual(load_call_args[1]["gcs_blob_uri"], tmp_blob_uri)
+ self.assertEqual(load_call_args[1]["table_name"], table_name)
+
+ # make sure we cleaned up the temp file
+ self.assertEqual(gcs_client.delete_blob.call_count, 1)
+ delete_call_args = gcs_client.delete_blob.call_args
+ self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket)
+ self.assertEqual(delete_call_args[0][1], tmp_blob_name)
+
+ def test_copy__credentials_are_correctly_set(self):
+ tbl = self.default_table
+ bq = self._build_mock_client_for_copying(table_exists=False)
+
+ # Pass in our fake GCS Client.
+ bq.copy(
+ tbl,
+ "dataset.table",
+ tmp_gcs_bucket=self.tmp_gcs_bucket,
+ gcs_client=FakeGoogleCloudStorage(),
+ )
+
+ actual = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+
+ with open(actual, "r") as factual:
+ with open(self.cred_path, "r") as fexpected:
+ actual_str = factual.read()
+ self.assertEqual(actual_str, fexpected.read())
+ self.assertEqual(self.cred_contents, json.loads(actual_str))
+
+ def test_copy__if_exists_passed_through(self):
+ # setup dependencies / inputs
+ tmp_blob_uri = "gs://tmp/file"
+
+ # set up object under test
+ gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
+ tbl = self.default_table
+ bq = self._build_mock_client_for_copying(table_exists=False)
+ bq.copy_from_gcs = mock.MagicMock()
+ table_name = "dataset.table"
+ if_exists = "append"
+
+ # call the method being tested
+ bq.copy(
+ tbl,
+ table_name,
+ tmp_gcs_bucket=self.tmp_gcs_bucket,
+ gcs_client=gcs_client,
+ if_exists=if_exists,
+ )
+
+ self.assertEqual(bq.copy_from_gcs.call_count, 1)
+ load_call_args = bq.copy_from_gcs.call_args
+ self.assertEqual(load_call_args[1]["if_exists"], if_exists)
+
+ @mock.patch.object(BigQuery, "table_exists", return_value=False)
+ @mock.patch.object(BigQuery, "query", return_value=None)
+ def test_duplicate_table(self, query_mock, table_exists_mock):
+ source_table = "vendor_table"
+ destination_table = "raw_table"
+ expected_query = f"""
+ CREATE TABLE
+ {destination_table}
+ CLONE {source_table}
+ """
+ bq = self._build_mock_client_for_querying(results=None)
+
+ bq.duplicate_table(
+ source_table=source_table,
+ destination_table=destination_table,
+ )
+
+ query_mock.assert_called_once()
+ actual_query = query_mock.call_args[1]["sql"]
+ self.assertEqual(actual_query, expected_query)
+
+ @mock.patch.object(BigQuery, "table_exists", return_value=False)
+ @mock.patch.object(BigQuery, "delete_table", return_value=None)
+ @mock.patch.object(BigQuery, "query", return_value=None)
+ def test_duplicate_table_with_drop(
+ self, query_mock: mock.MagicMock, delete_mock: mock.MagicMock, table_exists_mock
+ ):
+ source_table = "vendor_table"
+ destination_table = "raw_table"
+ bq = self._build_mock_client_for_querying(results=None)
+
+ bq.duplicate_table(
+ source_table=source_table,
+ destination_table=destination_table,
+ drop_source_table=True,
+ )
+
+ delete_mock.assert_called_once_with(table_name=source_table)
+
+ @mock.patch.object(BigQuery, "table_exists", return_value=True)
+ @mock.patch.object(BigQuery, "query_with_transaction", return_value=None)
+ @mock.patch.object(BigQuery, "copy", return_value=None)
+ def test_upsert(self, copy_mock, query_mock, *_):
+ upsert_tbl = Table([["id", "name"], [1, "Jane"]])
+ target_table = "my_dataset.my_target_table"
+ primary_key = "id"
+ bq = self._build_mock_client_for_querying(results=[])
+
+ bq.upsert(
+ table_obj=upsert_tbl,
+ target_table=target_table,
+ primary_key=primary_key,
+ distinct_check=False,
+ )
+
+ # stages the table -> calls copy
+ copy_mock.assert_called_once()
+ self.assertEqual(copy_mock.call_args[1]["tbl"], upsert_tbl)
+ self.assertEqual(copy_mock.call_args[1]["template_table"], target_table)
+
+ # runs a delete insert within a transaction
+ query_mock.assert_called_once()
+ actual_queries = query_mock.call_args[1]["queries"]
+ self.assertIn("DELETE", actual_queries[0])
+ self.assertIn("INSERT", actual_queries[1])
+
+ @mock.patch.object(BigQuery, "query")
+ def test_get_row_count(self, query_mock):
+ # Arrange
+ schema = "foo"
+ table_name = "bar"
+ expected_num_rows = 2
+
+ query_mock.return_value = Table([{"row_count": expected_num_rows}])
+ expected_query = f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`"
+ bq = self._build_mock_client_for_querying(results=Table([{"row_count": 2}]))
+
+ # Act
+ row_count = bq.get_row_count(schema=schema, table_name=table_name)
+
+ # Assert
+ query_mock.assert_called_once()
+ actual_query = query_mock.call_args[1]["sql"]
+ self.assertEqual(row_count, expected_num_rows)
+ self.assertEqual(actual_query, expected_query)
+
+ def _build_mock_client_for_querying(self, results):
+ # Create a mock that will play the role of the cursor
+ cursor = mock.MagicMock()
+ cursor.execute.return_value = None
+ cursor.fetchmany.side_effect = [results, []]
+
+ # Create a mock that will play the role of the connection
+ connection = mock.MagicMock()
+ connection.cursor.return_value = cursor
+
+ # Create a mock that will play the role of the Google BigQuery dbapi module
+ dbapi = mock.MagicMock()
+ dbapi.connect.return_value = connection
+
+ # Create a mock that will play the role of our GoogleBigQuery client
+ client = mock.MagicMock()
+
+ bq = BigQuery()
+ bq._client = client
+ bq._dbapi = dbapi
+ return bq
+
+ def _build_mock_client_for_copying(self, table_exists=True):
+ bq_client = mock.MagicMock()
+ if not table_exists:
+ bq_client.get_table.side_effect = exceptions.NotFound("not found")
+ bq = BigQuery()
+ bq._client = bq_client
+ return bq
+
+ def _build_mock_cloud_storage_client(self, tmp_blob_uri=""):
+ gcs_client = mock.MagicMock()
+ gcs_client.upload_table.return_value = tmp_blob_uri
+ return gcs_client
+
+ @property
+ def default_table(self):
+ return Table(
+ [
+ {"num": 1, "ltr": "a"},
+ {"num": 2, "ltr": "b"},
+ ]
+ )
diff --git a/test/test_databases/test_database.py b/test/test_databases/test_database.py
index b49491e687..c56116e3d1 100644
--- a/test/test_databases/test_database.py
+++ b/test/test_databases/test_database.py
@@ -3,7 +3,6 @@
MEDIUMINT,
INT,
BIGINT,
- FLOAT,
BOOL,
VARCHAR,
)
@@ -19,13 +18,6 @@ def dcs():
return db
-@pytest.fixture
-def dcs_bool():
- db = DatabaseCreateStatement()
- db.DO_PARSE_BOOLS = True
- return db
-
-
@pytest.mark.parametrize(
("int1", "int2", "higher"),
(
@@ -95,34 +87,6 @@ def test_is_valid_sql_num(dcs, val, is_valid):
assert dcs.is_valid_sql_num(val) == is_valid
-@pytest.mark.parametrize(
- ("val", "cmp_type", "detected_type"),
- (
- (1, None, SMALLINT),
- (1, "", SMALLINT),
- (1, MEDIUMINT, MEDIUMINT),
- (32769, None, MEDIUMINT),
- (32769, BIGINT, BIGINT),
- (2147483648, None, BIGINT),
- (2147483648, FLOAT, FLOAT),
- (5.001, None, FLOAT),
- (5.001, "", FLOAT),
- ("FALSE", VARCHAR, VARCHAR),
- ("word", "", VARCHAR),
- ("word", INT, VARCHAR),
- ("1_2", BIGINT, VARCHAR),
- ("01", FLOAT, VARCHAR),
- ("00001", None, VARCHAR),
- ("word", None, VARCHAR),
- ("1_2", None, VARCHAR),
- ("01", None, VARCHAR),
- ("{}", None, VARCHAR),
- ),
-)
-def test_detect_data_type(dcs, val, cmp_type, detected_type):
- assert dcs.detect_data_type(val, cmp_type) == detected_type
-
-
@pytest.mark.parametrize(
("val", "cmp_type", "detected_type"),
(
@@ -131,16 +95,16 @@ def test_detect_data_type(dcs, val, cmp_type, detected_type):
(1, MEDIUMINT, MEDIUMINT),
(2, BOOL, SMALLINT),
(True, None, BOOL),
- (0, None, BOOL),
- (1, None, BOOL),
- (1, BOOL, BOOL),
- ("F", None, BOOL),
- ("FALSE", None, BOOL),
- ("Yes", None, BOOL),
+ (0, None, SMALLINT),
+ (1, None, SMALLINT),
+ (1, BOOL, SMALLINT),
+ ("F", None, VARCHAR),
+ ("FALSE", None, VARCHAR),
+ ("Yes", None, VARCHAR),
),
)
-def test_detect_data_type_bools(dcs_bool, val, cmp_type, detected_type):
- assert dcs_bool.detect_data_type(val, cmp_type) == detected_type
+def test_detect_data_type_bools(dcs, val, cmp_type, detected_type):
+ assert dcs.detect_data_type(val, cmp_type) == detected_type
@pytest.mark.parametrize(
diff --git a/test/test_databases/test_discover_database.py b/test/test_databases/test_discover_database.py
index b946629e10..4f8fbf647e 100644
--- a/test/test_databases/test_discover_database.py
+++ b/test/test_databases/test_discover_database.py
@@ -3,12 +3,12 @@
from parsons.databases.redshift import Redshift
from parsons.databases.mysql import MySQL
from parsons.databases.postgres import Postgres
-from parsons.google.google_bigquery import GoogleBigQuery
+from parsons import GoogleBigQuery as BigQuery
from parsons.databases.discover_database import discover_database
class TestDiscoverDatabase(unittest.TestCase):
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -18,7 +18,7 @@ def test_no_database_detected(self, mock_getenv, *_):
with self.assertRaises(EnvironmentError):
discover_database()
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -29,7 +29,7 @@ def test_single_database_detected(self, mock_getenv, *_):
)
self.assertIsInstance(discover_database(), Redshift)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -40,7 +40,7 @@ def test_single_database_detected_with_other_default(self, mock_getenv, *_):
)
self.assertIsInstance(discover_database(default_connector=Postgres), Redshift)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -53,7 +53,7 @@ def test_single_database_detected_with_other_default_list(self, mock_getenv, *_)
discover_database(default_connector=[Postgres, MySQL]), Redshift
)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -63,7 +63,7 @@ def test_multiple_databases_no_default(self, mock_getenv, *_):
with self.assertRaises(EnvironmentError):
discover_database()
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -72,7 +72,7 @@ def test_multiple_databases_with_default(self, mock_getenv, *_):
mock_getenv.return_value = "password"
self.assertIsInstance(discover_database(default_connector=Redshift), Redshift)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -83,7 +83,7 @@ def test_multiple_databases_with_default_list(self, mock_getenv, *_):
discover_database(default_connector=[MySQL, Redshift]), MySQL
)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -97,7 +97,7 @@ def test_multiple_databases_invalid_default(self, mock_getenv, *_):
with self.assertRaises(EnvironmentError):
discover_database(default_connector=Postgres)
- @patch.object(GoogleBigQuery, "__init__", return_value=None)
+ @patch.object(BigQuery, "__init__", return_value=None)
@patch.object(Postgres, "__init__", return_value=None)
@patch.object(MySQL, "__init__", return_value=None)
@patch.object(Redshift, "__init__", return_value=None)
@@ -109,7 +109,7 @@ def test_multiple_databases_invalid_default_list(self, mock_getenv, *_):
else None
)
with self.assertRaises(EnvironmentError):
- discover_database(default_connector=[Postgres, GoogleBigQuery])
+ discover_database(default_connector=[Postgres, BigQuery])
if __name__ == "__main__":
diff --git a/test/test_databases/test_mysql.py b/test/test_databases/test_mysql.py
index 01c156dfe4..323b4ffbf6 100644
--- a/test/test_databases/test_mysql.py
+++ b/test/test_databases/test_mysql.py
@@ -156,11 +156,9 @@ def setUp(self):
def test_data_type(self):
# Test bool
- self.mysql.DO_PARSE_BOOLS = True
- self.assertEqual(self.mysql.data_type(1, ""), "bool")
self.assertEqual(self.mysql.data_type(False, ""), "bool")
+ self.assertEqual(self.mysql.data_type(True, ""), "bool")
- self.mysql.DO_PARSE_BOOLS = False
# Test smallint
self.assertEqual(self.mysql.data_type(1, ""), "smallint")
self.assertEqual(self.mysql.data_type(2, ""), "smallint")
@@ -170,14 +168,14 @@ def test_data_type(self):
self.assertEqual(self.mysql.data_type(2147483648, ""), "bigint")
# Test varchar that looks like an int
self.assertEqual(self.mysql.data_type("00001", ""), "varchar")
- # Test varchar that looks like a bool
- self.assertEqual(self.mysql.data_type(False, ""), "varchar")
# Test a float as a decimal
self.assertEqual(self.mysql.data_type(5.001, ""), "float")
# Test varchar
self.assertEqual(self.mysql.data_type("word", ""), "varchar")
- # Test int with underscore
+ # Test int with underscore as string
self.assertEqual(self.mysql.data_type("1_2", ""), "varchar")
+ # Test int with underscore
+ self.assertEqual(self.mysql.data_type(1_2, ""), "smallint")
# Test int with leading zero
self.assertEqual(self.mysql.data_type("01", ""), "varchar")
diff --git a/test/test_databases/test_postgres.py b/test/test_databases/test_postgres.py
index 08956c672d..5279c94ccf 100644
--- a/test/test_databases/test_postgres.py
+++ b/test/test_databases/test_postgres.py
@@ -30,11 +30,8 @@ def setUp(self):
["g", "", 9, "NA", 1.4, 1, 2],
]
)
-
self.mapping = self.pg.generate_data_types(self.tbl)
self.mapping2 = self.pg.generate_data_types(self.tbl2)
- self.pg.DO_PARSE_BOOLS = True
- self.mapping3 = self.pg.generate_data_types(self.tbl2)
def test_connection(self):
@@ -56,7 +53,6 @@ def test_connection(self):
self.assertEqual(pg_env.port, 5432)
def test_data_type(self):
- self.pg.DO_PARSE_BOOLS = False
# Test smallint
self.assertEqual(self.pg.data_type(1, ""), "smallint")
self.assertEqual(self.pg.data_type(2, ""), "smallint")
@@ -66,20 +62,18 @@ def test_data_type(self):
self.assertEqual(self.pg.data_type(2147483648, ""), "bigint")
# Test varchar that looks like an int
self.assertEqual(self.pg.data_type("00001", ""), "varchar")
- # Test varchar that looks like a bool
- self.assertEqual(self.pg.data_type(True, ""), "varchar")
# Test a float as a decimal
self.assertEqual(self.pg.data_type(5.001, ""), "decimal")
# Test varchar
self.assertEqual(self.pg.data_type("word", ""), "varchar")
- # Test int with underscore
+ # Test int with underscore as string
self.assertEqual(self.pg.data_type("1_2", ""), "varchar")
- # Test int with leading zero
+ # Test int with leading zero as string
self.assertEqual(self.pg.data_type("01", ""), "varchar")
+ # Test int with underscore
+ self.assertEqual(self.pg.data_type(1_2, ""), "smallint")
# Test bool
- self.pg.DO_PARSE_BOOLS = True
- self.assertEqual(self.pg.data_type(1, ""), "bool")
self.assertEqual(self.pg.data_type(True, ""), "bool")
def test_generate_data_types(self):
@@ -100,10 +94,6 @@ def test_generate_data_types(self):
"varchar",
],
)
- self.assertEqual(
- self.mapping3["type_list"],
- ["varchar", "varchar", "decimal", "varchar", "decimal", "bool", "varchar"],
- )
# Test correct lengths
self.assertEqual(self.mapping["longest"], [1, 5])
diff --git a/test/test_redshift.py b/test/test_databases/test_redshift.py
similarity index 98%
rename from test/test_redshift.py
rename to test/test_databases/test_redshift.py
index fd664fede5..b811e98857 100644
--- a/test/test_redshift.py
+++ b/test/test_databases/test_redshift.py
@@ -35,10 +35,7 @@ def setUp(self):
)
self.mapping = self.rs.generate_data_types(self.tbl)
- self.rs.DO_PARSE_BOOLS = True
self.mapping2 = self.rs.generate_data_types(self.tbl2)
- self.rs.DO_PARSE_BOOLS = False
- self.mapping3 = self.rs.generate_data_types(self.tbl2)
def test_split_full_table_name(self):
schema, table = Redshift.split_full_table_name("some_schema.some_table")
@@ -60,14 +57,9 @@ def test_combine_schema_and_table_name(self):
self.assertEqual(full_table_name, "some_schema.some_table")
def test_data_type(self):
-
# Test bool
- self.rs.DO_PARSE_BOOLS = True
- self.assertEqual(self.rs.data_type(1, ""), "bool")
self.assertEqual(self.rs.data_type(True, ""), "bool")
- self.rs.DO_PARSE_BOOLS = False
self.assertEqual(self.rs.data_type(1, ""), "int")
- self.assertEqual(self.rs.data_type(True, ""), "varchar")
# Test smallint
# Currently smallints are coded as ints
self.assertEqual(self.rs.data_type(2, ""), "int")
@@ -81,13 +73,14 @@ def test_data_type(self):
self.assertEqual(self.rs.data_type(5.001, ""), "float")
# Test varchar
self.assertEqual(self.rs.data_type("word", ""), "varchar")
- # Test int with underscore
+ # Test int with underscore as varchar
self.assertEqual(self.rs.data_type("1_2", ""), "varchar")
+ # Test int with underscore
+ self.assertEqual(self.rs.data_type(1_2, ""), "int")
# Test int with leading zero
self.assertEqual(self.rs.data_type("01", ""), "varchar")
def test_generate_data_types(self):
-
# Test correct header labels
self.assertEqual(self.mapping["headers"], ["ID", "Name"])
# Test correct data types
@@ -95,13 +88,9 @@ def test_generate_data_types(self):
self.assertEqual(
self.mapping2["type_list"],
- ["varchar", "varchar", "float", "varchar", "float", "bool", "varchar"],
- )
-
- self.assertEqual(
- self.mapping3["type_list"],
["varchar", "varchar", "float", "varchar", "float", "int", "varchar"],
)
+
# Test correct lengths
self.assertEqual(self.mapping["longest"], [1, 5])
diff --git a/test/test_google/test_google_bigquery.py b/test/test_google/test_google_bigquery.py
deleted file mode 100644
index 4fa3f0ac20..0000000000
--- a/test/test_google/test_google_bigquery.py
+++ /dev/null
@@ -1,270 +0,0 @@
-import json
-import os
-import unittest.mock as mock
-
-from google.cloud import bigquery
-from google.cloud import exceptions
-
-from parsons import GoogleBigQuery, Table
-from parsons.google.google_cloud_storage import GoogleCloudStorage
-from test.test_google.test_utilities import FakeCredentialTest
-
-
-class FakeClient:
- """A Fake Storage Client used for monkey-patching."""
-
- def __init__(self, project=None):
- self.project = project
-
-
-class FakeGoogleCloudStorage(GoogleCloudStorage):
- """A Fake GoogleCloudStorage object used to test setting up credentials."""
-
- @mock.patch("google.cloud.storage.Client", FakeClient)
- def __init__(self):
- super().__init__(None, None)
-
- def upload_table(
- self, table, bucket_name, blob_name, data_type="csv", default_acl=None
- ):
- pass
-
- def delete_blob(self, bucket_name, blob_name):
- pass
-
-
-class TestGoogleBigQuery(FakeCredentialTest):
- def setUp(self):
- super().setUp()
- os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cred_path
- self.tmp_gcs_bucket = "tmp"
-
- def tearDown(self) -> None:
- super().tearDown()
- del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
-
- def test_query(self):
- query_string = "select * from table"
-
- # Pass the mock class into our GoogleBigQuery constructor
- bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}])
-
- # Run a query against our parsons GoogleBigQuery class
- result = bq.query(query_string)
-
- # Check our return value
- self.assertEqual(result.num_rows, 1)
- self.assertEqual(result.columns, ["one", "two"])
- self.assertEqual(result[0], {"one": 1, "two": 2})
-
- def test_query__no_results(self):
- query_string = "select * from table"
-
- # Pass the mock class into our GoogleBigQuery constructor
- bq = self._build_mock_client_for_querying([])
-
- # Run a query against our parsons GoogleBigQuery class
- result = bq.query(query_string)
-
- # Check our return value
- self.assertEqual(result, None)
-
- def test_copy(self):
- # setup dependencies / inputs
- tmp_blob_uri = "gs://tmp/file"
-
- # set up object under test
- gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
- tbl = self.default_table
- bq = self._build_mock_client_for_copying(table_exists=False)
-
- # call the method being tested
- bq.copy(
- tbl,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- gcs_client=gcs_client,
- )
-
- # check that the method did the right things
- self.assertEqual(gcs_client.upload_table.call_count, 1)
- upload_call_args = gcs_client.upload_table.call_args
- self.assertEqual(upload_call_args[0][0], tbl)
- self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket)
- tmp_blob_name = upload_call_args[0][2]
-
- self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
- load_call_args = bq.client.load_table_from_uri.call_args
- self.assertEqual(load_call_args[0][0], tmp_blob_uri)
-
- job_config = load_call_args[1]["job_config"]
- self.assertEqual(
- job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
- )
-
- # make sure we cleaned up the temp file
- self.assertEqual(gcs_client.delete_blob.call_count, 1)
- delete_call_args = gcs_client.delete_blob.call_args
- self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket)
- self.assertEqual(delete_call_args[0][1], tmp_blob_name)
-
- def test_copy__if_exists_truncate(self):
- gcs_client = self._build_mock_cloud_storage_client()
- # set up object under test
- bq = self._build_mock_client_for_copying()
-
- # call the method being tested
- bq.copy(
- self.default_table,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- if_exists="truncate",
- gcs_client=gcs_client,
- )
-
- # check that the method did the right things
- call_args = bq.client.load_table_from_uri.call_args
- job_config = call_args[1]["job_config"]
- self.assertEqual(
- job_config.write_disposition, bigquery.WriteDisposition.WRITE_TRUNCATE
- )
-
- # make sure we cleaned up the temp file
- self.assertEqual(gcs_client.delete_blob.call_count, 1)
-
- def test_copy__if_exists_append(self):
- gcs_client = self._build_mock_cloud_storage_client()
- # set up object under test
- bq = self._build_mock_client_for_copying()
-
- # call the method being tested
- bq.copy(
- self.default_table,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- if_exists="append",
- gcs_client=gcs_client,
- )
-
- # check that the method did the right things
- call_args = bq.client.load_table_from_uri.call_args
- job_config = call_args[1]["job_config"]
- self.assertEqual(
- job_config.write_disposition, bigquery.WriteDisposition.WRITE_APPEND
- )
-
- # make sure we cleaned up the temp file
- self.assertEqual(gcs_client.delete_blob.call_count, 1)
-
- def test_copy__if_exists_fail(self):
- # set up object under test
- bq = self._build_mock_client_for_copying()
-
- # call the method being tested
- with self.assertRaises(Exception):
- bq.copy(
- self.default_table,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- gcs_client=self._build_mock_cloud_storage_client(),
- )
-
- def test_copy__if_exists_drop(self):
- gcs_client = self._build_mock_cloud_storage_client()
- # set up object under test
- bq = self._build_mock_client_for_copying()
-
- # call the method being tested
- bq.copy(
- self.default_table,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- if_exists="drop",
- gcs_client=gcs_client,
- )
-
- # check that we tried to delete the table
- self.assertEqual(bq.client.delete_table.call_count, 1)
-
- # make sure we cleaned up the temp file
- self.assertEqual(gcs_client.delete_blob.call_count, 1)
-
- def test_copy__bad_if_exists(self):
- gcs_client = self._build_mock_cloud_storage_client()
-
- # set up object under test
- bq = self._build_mock_client_for_copying()
-
- # call the method being tested
- with self.assertRaises(ValueError):
- bq.copy(
- self.default_table,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- if_exists="foo",
- gcs_client=gcs_client,
- )
-
- def test_copy__credentials_are_correctly_set(self):
- tbl = self.default_table
- bq = self._build_mock_client_for_copying(table_exists=False)
-
- # Pass in our fake GCS Client.
- bq.copy(
- tbl,
- "dataset.table",
- tmp_gcs_bucket=self.tmp_gcs_bucket,
- gcs_client=FakeGoogleCloudStorage(),
- )
-
- actual = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
-
- with open(actual, "r") as factual:
- with open(self.cred_path, "r") as fexpected:
- actual_str = factual.read()
- self.assertEqual(actual_str, fexpected.read())
- self.assertEqual(self.cred_contents, json.loads(actual_str))
-
- def _build_mock_client_for_querying(self, results):
- # Create a mock that will play the role of the cursor
- cursor = mock.MagicMock()
- cursor.execute.return_value = None
- cursor.fetchmany.side_effect = [results, []]
-
- # Create a mock that will play the role of the connection
- connection = mock.MagicMock()
- connection.cursor.return_value = cursor
-
- # Create a mock that will play the role of the Google BigQuery dbapi module
- dbapi = mock.MagicMock()
- dbapi.connect.return_value = connection
-
- # Create a mock that will play the role of our GoogleBigQuery client
- client = mock.MagicMock()
-
- bq = GoogleBigQuery()
- bq._client = client
- bq._dbapi = dbapi
- return bq
-
- def _build_mock_client_for_copying(self, table_exists=True):
- bq_client = mock.MagicMock()
- if not table_exists:
- bq_client.get_table.side_effect = exceptions.NotFound("not found")
- bq = GoogleBigQuery()
- bq._client = bq_client
- return bq
-
- def _build_mock_cloud_storage_client(self, tmp_blob_uri=""):
- gcs_client = mock.MagicMock()
- gcs_client.upload_table.return_value = tmp_blob_uri
- return gcs_client
-
- @property
- def default_table(self):
- return Table(
- [
- {"num": 1, "ltr": "a"},
- {"num": 2, "ltr": "b"},
- ]
- )