From 626edc783a3f3eb530e8381fde36d9bfef121493 Mon Sep 17 00:00:00 2001 From: Shauna Date: Fri, 8 Dec 2023 15:24:49 -0500 Subject: [PATCH] Merge major-release into main & bump version number in setup.py (#938) * Merge main into major-release (#814) * Use black formatting in addition to flake8 (#796) * Run black formatter on entire repository * Update requirements.txt and CONTRIBUTING.md to reflect black format * Use black linting in circleci test job * Use longer variable name to resolve flake8 E741 * Move noqa comments back to proper lines after black reformat * Standardize S3 Prefix Conventions (#803) This PR catches exception errors when a user does not exhaustive access to keys in an S3 bucket * Add Default Parameter Flexibility (#807) Skips over new `/` logic checks if prefix is `None` (which is true by default) * MoveOn Shopify / AK changes (#801) * Add access_token authentication option for Shopify * Remove unnecessary check The access token will either be None or explicitly set; don't worry about an empty string. * Add get_orders function and test * Add get_transactions function and test * Add function and test to get order * style fixes * style fixes --------- Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen * Catch File Extensions in S3 Prefix (#809) * add exception handling * Shortened logs for flake8 * add logic for default case * added file logic + note to user * restructured prefix logic This change moves the prefix -> prefix/ logic into a try/except block ... this will be more robust to most use cases, while adding flexibility that we desire for split-permission buckets * drop nested try/catch + add verbose error log * Add error message verbosity Co-authored-by: willyraedy --------- Co-authored-by: willyraedy --------- Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy * DatabaseConnector Interface to Major Release (#815) * Create the DatabaseConnector * Implement DatabaseConnector for the DB connectors * Add DatabaseConnector to std imports * Flake8 fix * Remove reference to padding in copy() * Add database_discover and fix inheritance * Remove strict_length from copy() * Put strict_length back in original order * Remove strict_length stub from BQ * Fix discover_database export statement * Add return annotation to mysql table_exists * Black formatter pass * Add more documentation on when you should use * Add developer notes. 
* Fix code block documentation * Enhance discover database * Add unit tests for discover database * Fix unit tests * Add two more tests * Reverse Postgres string_length change --------- Co-authored-by: Jason Walker * Zoom Authentication + Polling API (#873) * Add multiple python versions to CI tests (#858) * Add multiple python versions to CI tests * Remove duplicate key * Combine CI jobs * Update ubuntu image and actually install Python versions * Replace pyenv with apt-get to install python versions * Remove sudo * Remove get from 'apt-get' * Update apt before attempting to install * Add ppa/deadsnakes repository * Add prereq * Fix typo * Add -y to install command * Move -y to correct spot * Add more -ys * Add some echoes to debug * Switch back to pyenv approach * Remove tests from circleci config and move to new github actions config Note: no caching yet, this is more of a proof of concept * Split out Mac tests into seaparate file * Set testing environmental variable separately * First attempt to add depdendency cache * Remove windows tests for now * Fix circleci config * Fix circleci for real this time * Add tests on merging of PRs and update readme to show we do not support for Python 3.7 * Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861) * Enable passing `identifiers` to ActionNetwork upsert_person * Remove unused arguments from method self.get_page method doesn't exist and that method call doesn't return anything. The return statement works fine as-is to return all tags and handles pagination on its own. * Include deprecated per_page argument for backwards compatibility Emit a deprecation warning if this argument is used * Include examples in docstring for `identifiers` argument * Expand documentation on ActionNetwork identifiers * Add pre-commit hook config to run flake8 and black on commit (#864) Notes added to README on how to install and set up * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * Add multiple python versions to CI tests (#858) * Add multiple python versions to CI tests * Remove duplicate key * Combine CI jobs * Update ubuntu image and actually install Python versions * Replace pyenv with apt-get to install python versions * Remove sudo * Remove get from 'apt-get' * Update apt before attempting to install * Add ppa/deadsnakes repository * Add prereq * Fix typo * Add -y to install command * Move -y to correct spot * Add more -ys * Add some echoes to debug * Switch back to pyenv approach * Remove tests from circleci config and move to new github actions config Note: no caching yet, this is more of a proof of concept * Split out Mac tests into seaparate file * Set testing environmental variable separately * First attempt to add depdendency cache * Remove windows tests for now * Fix circleci config * Fix circleci for real this time * Add tests on merging of PRs and update readme to show we do not support for Python 3.7 * Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861) * Enable passing `identifiers` to ActionNetwork upsert_person * Remove unused arguments from method self.get_page method doesn't exist and that method call doesn't return anything. 
The return statement works fine as-is to return all tags and handles pagination on its own. * Include deprecated per_page argument for backwards compatibility Emit a deprecation warning if this argument is used * Include examples in docstring for `identifiers` argument * Expand documentation on ActionNetwork identifiers * Add pre-commit hook config to run flake8 and black on commit (#864) Notes added to README on how to install and set up * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * write unit tests * drop poll endpoints for now --------- Co-authored-by: Shauna Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> * Merging Main Before Release (#880) * Add multiple python versions to CI tests (#858) * Add multiple python versions to CI tests * Remove duplicate key * Combine CI jobs * Update ubuntu image and actually install Python versions * Replace pyenv with apt-get to install python versions * Remove sudo * Remove get from 'apt-get' * Update apt before attempting to install * Add ppa/deadsnakes repository * Add prereq * Fix typo * Add -y to install command * Move -y to correct spot * Add more -ys * Add some echoes to debug * Switch back to pyenv approach * Remove tests from circleci config and move to new github actions config Note: no caching yet, this is more of a proof of concept * Split out Mac tests into seaparate file * Set testing environmental variable separately * First attempt to add depdendency cache * Remove windows tests for now * Fix circleci config * Fix circleci for real this time * Add tests on merging of PRs and update readme to show we do not support for Python 3.7 * Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861) * Enable passing `identifiers` to ActionNetwork upsert_person * Remove unused arguments from method self.get_page method doesn't exist and that method call doesn't return anything. The return statement works fine as-is to return all tags and handles pagination on its own. 
* Include deprecated per_page argument for backwards compatibility Emit a deprecation warning if this argument is used * Include examples in docstring for `identifiers` argument * Expand documentation on ActionNetwork identifiers * Add pre-commit hook config to run flake8 and black on commit (#864) Notes added to README on how to install and set up * Add Events Helpers to PDI Connector (#865) * add helpers to Events object * stage docstring * add docs * linting * fix typo + enforce validation * add return docs * add events tests * use mock pdi * jk * mark live tests * add alias * drop unused imports * change release number (#872) * add release notes yml (#878) --------- Co-authored-by: Shauna Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> * Switch from API key to Personal Access Token (#866) * Wraedy/bigquery db connector (#875) * Create the DatabaseConnector * Implement DatabaseConnector for the DB connectors * Add DatabaseConnector to std imports * Flake8 fix * Remove reference to padding in copy() * Add database_discover and fix inheritance * Remove strict_length from copy() * Put strict_length back in original order * Remove strict_length stub from BQ * Fix discover_database export statement * Add return annotation to mysql table_exists * Black formatter pass * create bigquery folder in databases folde * create query parity between bigquery and redshift * mock up copy functionality for bigquery * fix typo * add duplicate function to bigquery * move transaction to helper function * implement upsert * fix imports and packages * add get tables and views methods * add query return flexibility * match bigquery apis with redshift * make s3 to gcs more generic * add transaction support to bigquery * remove logs * add gcs docs * process job config in function * finish todo's (and add one more lol) * [ wip ] AttributeError * add raw download param * drop raw download * copy from GCS docstring * copy s3 docs * copy docs * docstrings * control flow * add source path to aws transfer spec * add Code object to imports * cleaning up slightly * check status code * nice * pass in required param * add pattern handling * add quote character to LoadJobConfig * add schema to copy from gcs * drop dist and sortkeys No longer input params * add delimiter param * use schema definition * write column mapping helper * pass in formatted schema to load_uri fn * rename new file * move file with jason's changes * move new changes back into file to maintain history * remove extraneous fn and move project job config * get back to test parity * fix bad merge conflict * remove extra params from copy sig * clarify transaction guidance * clean up list blobs * clean up storage transfer polling * upgrade cloud storage package * use list of schema mappings * scaffolded big file function :sunglasses: * add to docs * default to compression we can make this more flexible, just scaffolding * add temp logging we can drop this later just trying to get a handle on cycle time * use decompress * add logging * implement unzipping and reuploading cloud file * logging error * Add destination path * Small fix * add todo's * drop max wait time * add kwargs to put blob Potentially useful for metadata (content type, etc.) 
* add verbosity to description * black formatted * add gcs to/from helpers * write to_bigquery function * update big file logic * allow jagged rows logic * test additional methods * add duplicate table test * test drop flag for duplicate * basic test for upsert * add typing * move non-essential logs to debug * move logs to debug * hey, it works! * add UUID support for bigquery type map * add datetime to bigquery type map * address comments * address comments * drop GCS class function we can pick this up later but it doesn't currently work * move class back to old location with new import * revert to old name * remove transaction error handler * add description conditional block for s3 * change one more conditional to s3 * handle empty source paths * reverting new import path --------- Co-authored-by: Jason Walker Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> Co-authored-by: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> * BigQuery - Add Column Helpers (#911) * add column outlines * optionally log query * flip default params * flip back * Google BigQuery - Clean Up Autodetect Logic (#914) * don't delete * clean up schema autodetect logic * undo comments * Update stale references to parsons.databases.bigquery (#920) * Fix BQ references in discover_database * Update BQ references in tofrom.py * Update BQ refs in test_discover_database.py * Fix gcs hidden error (#930) * logging * edit flake8 max line for testing * change flake8 for testing * comment out unsused var * add print to check branch * change to logging * more logging * try printing * more logging * logging: * more printing * more logging * print transfer job request * change error message * requested changes * remove comment * GoogleCloudStorage - Handle zip / gzip files flexibly (#937) * Update release (#894) * Zoom Polls (#886) * Merge main into major-release (#814) * Use black formatting in addition to flake8 (#796) * Run black formatter on entire repository * Update requirements.txt and CONTRIBUTING.md to reflect black format * Use black linting in circleci test job * Use longer variable name to resolve flake8 E741 * Move noqa comments back to proper lines after black reformat * Standardize S3 Prefix Conventions (#803) This PR catches exception errors when a user does not exhaustive access to keys in an S3 bucket * Add Default Parameter Flexibility (#807) Skips over new `/` logic checks if prefix is `None` (which is true by default) * MoveOn Shopify / AK changes (#801) * Add access_token authentication option for Shopify * Remove unnecessary check The access token will either be None or explicitly set; don't worry about an empty string. * Add get_orders function and test * Add get_transactions function and test * Add function and test to get order * style fixes * style fixes --------- Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen * Catch File Extensions in S3 Prefix (#809) * add exception handling * Shortened logs for flake8 * add logic for default case * added file logic + note to user * restructured prefix logic This change moves the prefix -> prefix/ logic into a try/except block ... 
this will be more robust to most use cases, while adding flexibility that we desire for split-permission buckets * drop nested try/catch + add verbose error log * Add error message verbosity Co-authored-by: willyraedy --------- Co-authored-by: willyraedy --------- Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * DatabaseConnector Interface to Major Release (#815) * Create the DatabaseConnector * Implement DatabaseConnector for the DB connectors * Add DatabaseConnector to std imports * Flake8 fix * Remove reference to padding in copy() * Add database_discover and fix inheritance * Remove strict_length from copy() * Put strict_length back in original order * Remove strict_length stub from BQ * Fix discover_database export statement * Add return annotation to mysql table_exists * Black formatter pass * Add more documentation on when you should use * Add developer notes. * Fix code block documentation * Enhance discover database * Add unit tests for discover database * Fix unit tests * Add two more tests * Reverse Postgres string_length change --------- Co-authored-by: Jason Walker * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * Add multiple python versions to CI tests (#858) * Add multiple python versions to CI tests * Remove duplicate key * Combine CI jobs * Update ubuntu image and actually install Python versions * Replace pyenv with apt-get to install python versions * Remove sudo * Remove get from 'apt-get' * Update apt before attempting to install * Add ppa/deadsnakes repository * Add prereq * Fix typo * Add -y to install command * Move -y to correct spot * Add more -ys * Add some echoes to debug * Switch back to pyenv approach * Remove tests from circleci config and move to new github actions config Note: no caching yet, this is more of a proof of concept * Split out Mac tests into seaparate file * Set testing environmental variable separately * First attempt to add depdendency cache * Remove windows tests for now * Fix circleci config * Fix circleci for real this time * Add tests on merging of PRs and update readme to show we do not support for Python 3.7 * Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861) * Enable passing `identifiers` to ActionNetwork upsert_person * Remove unused arguments from method self.get_page method doesn't exist and that method call doesn't return anything. The return statement works fine as-is to return all tags and handles pagination on its own. 
* Include deprecated per_page argument for backwards compatibility Emit a deprecation warning if this argument is used * Include examples in docstring for `identifiers` argument * Expand documentation on ActionNetwork identifiers * Add pre-commit hook config to run flake8 and black on commit (#864) Notes added to README on how to install and set up * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * write unit tests * added testing * drop typing (for now) * update docstring typing * add tests * write functions * update typing * add poll results * update table output * fix tests * uhhh run it back * add scope requirements * add to docs We can add more here if folks see fit * one for the money two for the show --------- Co-authored-by: Jason Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy Co-authored-by: Jason Walker Co-authored-by: Shauna * Check for empty tables in zoom poll results (#897) Co-authored-by: Jason Walker * Bump urllib3 from 1.26.5 to 1.26.17 (#901) Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.5 to 1.26.17. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/1.26.5...1.26.17) --- updated-dependencies: - dependency-name: urllib3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add MobileCommons Connector (#896) * mobilecommons class * Update __init__.py * get broadcasts * fix get broadcast request * Add mc_get_request method * Add annotation * Incorporate Daniel's suggestions and finish up get_broadcasts * A few more methods Need to figure out page_count issue * small fix * Remove page_count, use page record num instead * Add in page_count again Not all get responses include num param, but do include page_count. wft * Fix logging numbers * Add create_profile * Fix error message for post request * Start tests * Add some tests * Continue testing * Update test_mobilecommons.py * functionalize status_code check * break out parse_get_request function * fix test data * fix documentation typo * Add several tests * Update mobilecommons.py * Fix limit and pagination logic * debug unit testing * better commenting and logic * Documentation * Add MC to init file * Revert "Merge branch 'main' into cormac-mobilecommons-connector" This reverts commit cad250f44d94084e2c1a65b0662117c507b7e7cd, reversing changes made to 493e117fb4921994d4efc534e0a5c195c6170a60. * Revert "Add MC to init file" This reverts commit 493e117fb4921994d4efc534e0a5c195c6170a60. * Revert "Revert "Add MC to init file"" This reverts commit 8f87ec20115cc0ba6eff77fcf7d89fcbe6696504. * Revert "Revert "Merge branch 'main' into cormac-mobilecommons-connector"" This reverts commit 819005272152a6bebe0529fe0606f01102ee482c. 
* Fix init destruction * fix init yet again * Update testing docs with underscores * Lint * Lint tests * break up long responses * Fix more linting issues * Hopefully last linting issue * DGJKSNCHIVBN * Documentation fixes * Remove note to self * date format * remove random notes * Update test_mobilecommons.py --------- Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> * #741 : Deprecate Slack chat.postMessage `as_user` argument and allow for new authorship arguments (#891) * remove the argument and add a warning that the usage is deprecated * remove usage of as_user from sample code * add in the user customization arguments in lieu of the deprecated as_user argument * add comment regarding the permissions required to use these arguments * use kwargs * surface the whole response * allow usage of the deprecated argument but surface the failed response better * add to retry * delete test file * fix linting * formatting to fix tests * fix if style * add warning for using thread_ts * move the documentation to the optional arguments * #816 Airtable.get_records() fields argument can be either str or list (#892) * all fields to be a str object * remove newline * Nir's actionnetwork changes (#900) * working on adding a functio to an and took care of a lint issues * init * working on all get functions * actionnetwork functions batch 1 is ready * linting and black formatted compliance * removed unwanted/unsused lines * merged updated main * did some linting * added some more get functions to support all ActionNetwork objects (Advocacy Campaigns, Attendances, Campaigns, Custom Fields, Donations, Embeds, Event Campaigns, Events, Forms, Fundraising Pages, Items, Lists, Messages, Metadata, Outreaches, People, Petitions, Queries, Signatures, Submissions, Tags, Taggings, Wrappers) * worked on linting again * fix airtable.insert_records table arg (#907) * Add canales s3 functions (#885) * add raw s3 functions to parsons * add selected functions to s3.py * delte redundant functions and move drop_and_save function to redshift.py * create test file * add s3 unit tests * add rs.drop_and_unload unit test * add printing for debugging * remove testing file * unsaved changes * remove unused packages * remove unneeded module * Bump urllib3 from 1.26.17 to 1.26.18 (#904) Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.17 to 1.26.18. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/1.26.17...1.26.18) --- updated-dependencies: - dependency-name: urllib3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> * New connector for working with the Catalist Match API (#912) * Enable api_connector to return error message in `text` attribute Some API error responses contain the error message in the `text` attribute, so this update makes it possible to fetch that message if it exists. 
* New connector to work with the Catalist Match API * Add pytest-mock to requirements to support mocking in pytests * Tests on the catalist match connector * More open ended pytest-mock version for compatibility * Expand docstring documetation based on feedback in PR * More verbose error on match failure * Parameterize template_id variable * Expand docstrings on initial setup * Include Catalist documentation rst file * Enhancement: Action Network Connector: Added unpack_statistics param in get_messages method (#917) * Adds parameter to get_messages This adds the ability to unpack the statistics which are returned as a nested dictionary in the response. * added unpack_statistics to an.get_messages() * added parameters to get_messages and built tests * changes unpack_statistics to False by default. * added tbl variable * formatted with black * fixed docs --------- Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> * Adding rename_columns method to Parsons Table (#923) * added rename_columns for multiple cols * linted * added clarification to docs about dict structure * updated docs --------- Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> * Add http response to update_mailer (#924) Without returning the response, or at least the status code, it's impossible to check for errors. * Enable passing arbitrary additional fields to NGPVAN person match API (#916) * match gcs api to s3 * wip * two different functions * use csv as default * drop unused var * add docs * use temp file * add comments * wip * add docs + replicate in gzip * boy howdy! * set timeout * Revert "Merge branch 'main' into ianferguson/gcs-pathing" This reverts commit 5b1ef6e9fc2eaacd43c15998a81c0e4584401c59, reversing changes made to f0eb3d6d58c31d5138cdce257fcde7c9172b447a. 
* black format --------- Signed-off-by: dependabot[bot] Co-authored-by: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> Co-authored-by: Jason Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy Co-authored-by: Jason Walker Co-authored-by: Shauna Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Cormac Martinez del Rio <66973815+cmdelrio@users.noreply.github.com> Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> Co-authored-by: Angela Gloyna Co-authored-by: NirTatcher <75395024+NirTatcher@users.noreply.github.com> Co-authored-by: justicehaze <69082853+justicehaze@users.noreply.github.com> Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> Co-authored-by: mkrausse-ggtx <131683556+mkrausse-ggtx@users.noreply.github.com> Co-authored-by: Sophie Waldman <62553142+sjwmoveon@users.noreply.github.com> * GoogleCloudStorage - Add GCS Destination Path Param (#936) * Update release (#894) * Zoom Polls (#886) * Merge main into major-release (#814) * Use black formatting in addition to flake8 (#796) * Run black formatter on entire repository * Update requirements.txt and CONTRIBUTING.md to reflect black format * Use black linting in circleci test job * Use longer variable name to resolve flake8 E741 * Move noqa comments back to proper lines after black reformat * Standardize S3 Prefix Conventions (#803) This PR catches exception errors when a user does not exhaustive access to keys in an S3 bucket * Add Default Parameter Flexibility (#807) Skips over new `/` logic checks if prefix is `None` (which is true by default) * MoveOn Shopify / AK changes (#801) * Add access_token authentication option for Shopify * Remove unnecessary check The access token will either be None or explicitly set; don't worry about an empty string. * Add get_orders function and test * Add get_transactions function and test * Add function and test to get order * style fixes * style fixes --------- Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen * Catch File Extensions in S3 Prefix (#809) * add exception handling * Shortened logs for flake8 * add logic for default case * added file logic + note to user * restructured prefix logic This change moves the prefix -> prefix/ logic into a try/except block ... 
this will be more robust to most use cases, while adding flexibility that we desire for split-permission buckets * drop nested try/catch + add verbose error log * Add error message verbosity Co-authored-by: willyraedy --------- Co-authored-by: willyraedy --------- Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * DatabaseConnector Interface to Major Release (#815) * Create the DatabaseConnector * Implement DatabaseConnector for the DB connectors * Add DatabaseConnector to std imports * Flake8 fix * Remove reference to padding in copy() * Add database_discover and fix inheritance * Remove strict_length from copy() * Put strict_length back in original order * Remove strict_length stub from BQ * Fix discover_database export statement * Add return annotation to mysql table_exists * Black formatter pass * Add more documentation on when you should use * Add developer notes. * Fix code block documentation * Enhance discover database * Add unit tests for discover database * Fix unit tests * Add two more tests * Reverse Postgres string_length change --------- Co-authored-by: Jason Walker * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * Add multiple python versions to CI tests (#858) * Add multiple python versions to CI tests * Remove duplicate key * Combine CI jobs * Update ubuntu image and actually install Python versions * Replace pyenv with apt-get to install python versions * Remove sudo * Remove get from 'apt-get' * Update apt before attempting to install * Add ppa/deadsnakes repository * Add prereq * Fix typo * Add -y to install command * Move -y to correct spot * Add more -ys * Add some echoes to debug * Switch back to pyenv approach * Remove tests from circleci config and move to new github actions config Note: no caching yet, this is more of a proof of concept * Split out Mac tests into seaparate file * Set testing environmental variable separately * First attempt to add depdendency cache * Remove windows tests for now * Fix circleci config * Fix circleci for real this time * Add tests on merging of PRs and update readme to show we do not support for Python 3.7 * Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861) * Enable passing `identifiers` to ActionNetwork upsert_person * Remove unused arguments from method self.get_page method doesn't exist and that method call doesn't return anything. The return statement works fine as-is to return all tags and handles pagination on its own. 
* Include deprecated per_page argument for backwards compatibility Emit a deprecation warning if this argument is used * Include examples in docstring for `identifiers` argument * Expand documentation on ActionNetwork identifiers * Add pre-commit hook config to run flake8 and black on commit (#864) Notes added to README on how to install and set up * black format * black format * jwt -> s2s oauth * scaffold new functions * add docs * return * add type handling * pass in updated params * move access token function * ok let's rock!! * make changes * pass access token key only * use temporary client to gen token * mock request in constructor * drop unused imports * add changes * scaffolding tests * write unit tests * added testing * drop typing (for now) * update docstring typing * add tests * write functions * update typing * add poll results * update table output * fix tests * uhhh run it back * add scope requirements * add to docs We can add more here if folks see fit * one for the money two for the show --------- Co-authored-by: Jason Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy Co-authored-by: Jason Walker Co-authored-by: Shauna * Check for empty tables in zoom poll results (#897) Co-authored-by: Jason Walker * Bump urllib3 from 1.26.5 to 1.26.17 (#901) Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.5 to 1.26.17. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/1.26.5...1.26.17) --- updated-dependencies: - dependency-name: urllib3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Add MobileCommons Connector (#896) * mobilecommons class * Update __init__.py * get broadcasts * fix get broadcast request * Add mc_get_request method * Add annotation * Incorporate Daniel's suggestions and finish up get_broadcasts * A few more methods Need to figure out page_count issue * small fix * Remove page_count, use page record num instead * Add in page_count again Not all get responses include num param, but do include page_count. wft * Fix logging numbers * Add create_profile * Fix error message for post request * Start tests * Add some tests * Continue testing * Update test_mobilecommons.py * functionalize status_code check * break out parse_get_request function * fix test data * fix documentation typo * Add several tests * Update mobilecommons.py * Fix limit and pagination logic * debug unit testing * better commenting and logic * Documentation * Add MC to init file * Revert "Merge branch 'main' into cormac-mobilecommons-connector" This reverts commit cad250f44d94084e2c1a65b0662117c507b7e7cd, reversing changes made to 493e117fb4921994d4efc534e0a5c195c6170a60. * Revert "Add MC to init file" This reverts commit 493e117fb4921994d4efc534e0a5c195c6170a60. * Revert "Revert "Add MC to init file"" This reverts commit 8f87ec20115cc0ba6eff77fcf7d89fcbe6696504. * Revert "Revert "Merge branch 'main' into cormac-mobilecommons-connector"" This reverts commit 819005272152a6bebe0529fe0606f01102ee482c. 
* Fix init destruction * fix init yet again * Update testing docs with underscores * Lint * Lint tests * break up long responses * Fix more linting issues * Hopefully last linting issue * DGJKSNCHIVBN * Documentation fixes * Remove note to self * date format * remove random notes * Update test_mobilecommons.py --------- Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> * #741 : Deprecate Slack chat.postMessage `as_user` argument and allow for new authorship arguments (#891) * remove the argument and add a warning that the usage is deprecated * remove usage of as_user from sample code * add in the user customization arguments in lieu of the deprecated as_user argument * add comment regarding the permissions required to use these arguments * use kwargs * surface the whole response * allow usage of the deprecated argument but surface the failed response better * add to retry * delete test file * fix linting * formatting to fix tests * fix if style * add warning for using thread_ts * move the documentation to the optional arguments * #816 Airtable.get_records() fields argument can be either str or list (#892) * all fields to be a str object * remove newline * Nir's actionnetwork changes (#900) * working on adding a functio to an and took care of a lint issues * init * working on all get functions * actionnetwork functions batch 1 is ready * linting and black formatted compliance * removed unwanted/unsused lines * merged updated main * did some linting * added some more get functions to support all ActionNetwork objects (Advocacy Campaigns, Attendances, Campaigns, Custom Fields, Donations, Embeds, Event Campaigns, Events, Forms, Fundraising Pages, Items, Lists, Messages, Metadata, Outreaches, People, Petitions, Queries, Signatures, Submissions, Tags, Taggings, Wrappers) * worked on linting again * fix airtable.insert_records table arg (#907) * Add canales s3 functions (#885) * add raw s3 functions to parsons * add selected functions to s3.py * delte redundant functions and move drop_and_save function to redshift.py * create test file * add s3 unit tests * add rs.drop_and_unload unit test * add printing for debugging * remove testing file * unsaved changes * remove unused packages * remove unneeded module * Bump urllib3 from 1.26.17 to 1.26.18 (#904) Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.17 to 1.26.18. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/1.26.17...1.26.18) --- updated-dependencies: - dependency-name: urllib3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> * New connector for working with the Catalist Match API (#912) * Enable api_connector to return error message in `text` attribute Some API error responses contain the error message in the `text` attribute, so this update makes it possible to fetch that message if it exists. 
* New connector to work with the Catalist Match API * Add pytest-mock to requirements to support mocking in pytests * Tests on the catalist match connector * More open ended pytest-mock version for compatibility * Expand docstring documetation based on feedback in PR * More verbose error on match failure * Parameterize template_id variable * Expand docstrings on initial setup * Include Catalist documentation rst file * Enhancement: Action Network Connector: Added unpack_statistics param in get_messages method (#917) * Adds parameter to get_messages This adds the ability to unpack the statistics which are returned as a nested dictionary in the response. * added unpack_statistics to an.get_messages() * added parameters to get_messages and built tests * changes unpack_statistics to False by default. * added tbl variable * formatted with black * fixed docs --------- Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> * Adding rename_columns method to Parsons Table (#923) * added rename_columns for multiple cols * linted * added clarification to docs about dict structure * updated docs --------- Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> * Add http response to update_mailer (#924) Without returning the response, or at least the status code, it's impossible to check for errors. * Enable passing arbitrary additional fields to NGPVAN person match API (#916) * match gcs api to s3 * Revert "Merge branch 'main' into ianferguson/gcs-pathing" This reverts commit 5b1ef6e9fc2eaacd43c15998a81c0e4584401c59, reversing changes made to f0eb3d6d58c31d5138cdce257fcde7c9172b447a. --------- Signed-off-by: dependabot[bot] Co-authored-by: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> Co-authored-by: Jason Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy Co-authored-by: Jason Walker Co-authored-by: Shauna Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Cormac Martinez del Rio <66973815+cmdelrio@users.noreply.github.com> Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> Co-authored-by: Angela Gloyna Co-authored-by: NirTatcher <75395024+NirTatcher@users.noreply.github.com> Co-authored-by: justicehaze <69082853+justicehaze@users.noreply.github.com> Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> Co-authored-by: mkrausse-ggtx <131683556+mkrausse-ggtx@users.noreply.github.com> Co-authored-by: Sophie Waldman <62553142+sjwmoveon@users.noreply.github.com> * Bump version number to 3.0.0 * Fix import statement in test_bigquery.py * Add Tests to major-release Branch (#949) * add major release branch to gh workflow * add mac tests * null changes (want to trigger test) * remove temp change * Resolve GCP Test Failures For Major Release (#948) * add full import * resolve bigquery unzip test * remove keyword * fix flake8 errors * fix linting * push docs * fix flake8 * too long for flake8 * Install google-cloud-storage-transfer for google extras (#946) This is required for the import of storage_transfer to work Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> * Revert "Enable passing `identifiers` to ActionNetwork `upsert_person()` (#861)" (#945) This reverts commit 77ead6079ee03c399bbc83314351bc719ed457c3. 
Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> * BigQuery - Add row count function to connector (#913) * add row count function * use sql * add unit test * unit test * whoops! * add examples (#952) * Parse Boolean types by default (#943) * Parse Boolean types by default Commit 766cfaedc5652af8c39fde890067a99b6fa58518 created a feature for parsing boolean types but turned it off by default. This commit turns that feature on by default and adds a comment about how to turn it off and what that does. * Fix test expectations after updating boolean parsing behavior * Only ever interpret python bools as SQL booleans No longer coerce by default any of the following as booleans: "yes", "True", "t", 1, 0, "no", "False", "f" * Fix redshift test parsing bools * Move redshift test into test_databases folder * Remove retired TRUE_VALS and FALSE_VALS configuration variables We now only use python booleans --------- Signed-off-by: dependabot[bot] Co-authored-by: Jason Co-authored-by: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Co-authored-by: Ian <47256454+IanRFerguson@users.noreply.github.com> Co-authored-by: Cody Gordon <13374656+codygordon@users.noreply.github.com> Co-authored-by: sjwmoveon Co-authored-by: Alex French Co-authored-by: Kathy Nguyen Co-authored-by: willyraedy Co-authored-by: Jason Walker Co-authored-by: sharinetmc <128429303+sharinetmc@users.noreply.github.com> Co-authored-by: Kathy Nguyen Co-authored-by: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> Co-authored-by: dexchan <2642977+dexchan@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Cormac Martinez del Rio <66973815+cmdelrio@users.noreply.github.com> Co-authored-by: Angela Gloyna Co-authored-by: NirTatcher <75395024+NirTatcher@users.noreply.github.com> Co-authored-by: justicehaze <69082853+justicehaze@users.noreply.github.com> Co-authored-by: mattkrausse <106627640+mattkrausse@users.noreply.github.com> Co-authored-by: mkrausse-ggtx <131683556+mkrausse-ggtx@users.noreply.github.com> Co-authored-by: Sophie Waldman <62553142+sjwmoveon@users.noreply.github.com> --- .github/workflows/test-linux-windows.yml | 4 +- .github/workflows/tests-mac.yml | 4 +- .gitignore | 1 + CONTRIBUTING.md | 155 ++- Dockerfile | 2 +- docs/airtable.rst | 12 +- docs/google.rst | 32 +- parsons/airtable/airtable.py | 13 +- parsons/databases/database/constants.py | 4 - parsons/databases/database/database.py | 104 +- parsons/databases/discover_database.py | 4 +- parsons/databases/redshift/redshift.py | 2 +- parsons/etl/etl.py | 4 - parsons/etl/table.py | 9 - parsons/etl/tofrom.py | 63 +- parsons/google/google_bigquery.py | 1121 +++++++++++++++-- parsons/google/google_cloud_storage.py | 340 ++++- requirements.txt | 4 +- setup.py | 3 +- test/test_airtable/test_airtable.py | 2 +- test/test_databases/test_bigquery.py | 539 ++++++++ test/test_databases/test_database.py | 52 +- test/test_databases/test_discover_database.py | 22 +- test/test_databases/test_mysql.py | 10 +- test/test_databases/test_postgres.py | 18 +- test/{ => test_databases}/test_redshift.py | 19 +- test/test_google/test_google_bigquery.py | 270 ---- 27 files changed, 2186 insertions(+), 627 deletions(-) create mode 100644 test/test_databases/test_bigquery.py rename test/{ => test_databases}/test_redshift.py (98%) delete mode 100644 test/test_google/test_google_bigquery.py diff --git a/.github/workflows/test-linux-windows.yml b/.github/workflows/test-linux-windows.yml 
index c2ec10d354..1ae62c8640 100644
--- a/.github/workflows/test-linux-windows.yml
+++ b/.github/workflows/test-linux-windows.yml
@@ -2,9 +2,9 @@ name: tests
 
 on:
   pull_request:
-    branches: ["main"]
+    branches: ["main", "major-release"]
   push:
-    branches: ["main"]
+    branches: ["main", "major-release"]
 
 env:
   TESTING: 1
diff --git a/.github/workflows/tests-mac.yml b/.github/workflows/tests-mac.yml
index 65c5478e55..012025a520 100644
--- a/.github/workflows/tests-mac.yml
+++ b/.github/workflows/tests-mac.yml
@@ -3,9 +3,9 @@ name: tests for mac
 
 on:
   pull_request:
-    branches: ["main"]
+    branches: ["main", "major-release"]
   push:
-    branches: ["main"]
+    branches: ["main", "major-release"]
 
 env:
   TESTING: 1
diff --git a/.gitignore b/.gitignore
index 99a87cf379..8d6520e18e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,6 +110,7 @@ venv.bak/
 # scratch
 scratch*
 old!_*
+test.ipynb
 
 # vscode
 .vscode/
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e34f0872c2..68c8866f0b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -12,4 +12,157 @@ You can contribute by:
 * [teaching and mentoring](https://www.parsonsproject.org/pub/contributing-guide#teaching-and-mentoring)
 * [helping "triage" issues and review pull requests](https://www.parsonsproject.org/pub/contributing-guide#maintainer-tasks)
 
-If you're not sure how to get started, please ask for help! We're happy to chat and help you find the best way to get involved.
\ No newline at end of file
+We encourage folks to review existing issues before starting a new issue.
+
+* If the issue you want already exists, feel free to use the *thumbs up* emoji to upvote it.
+* If you have additional documentation or context that would be helpful, please add it in the comments.
+* If you have code snippets, but don’t have time to do the full write-up, please add them to the issue!
+
+We use labels to help us classify issues. They include:
+* **bug** - something in Parsons isn’t working the way it should
+* **enhancement** - new feature or request (e.g. a new API connector)
+* **good first issue** - an issue that would be good for someone who is new to Parsons
+
+## Contributing Code to Parsons
+
+Generally, code contributions to Parsons will be either enhancements or bug fixes (or contributions of [sample code](#sample-code), discussed below). All changes to the repository are made [via pull requests](#submitting-a-pull-request).
+
+If you would like to contribute code to Parsons, please review the issues in the repository and find one you would like to work on. If you are new to Parsons or to open source projects, look for issues with the [**good first issue**](https://github.com/move-coop/parsons/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) label. Once you have found your issue, please add a comment to the issue that lets others know that you are interested in working on it. If you're having trouble finding something to work on, please ask us for help on Slack.
+
+The bulk of Parsons is made up of Connector classes, which are Python classes that help move data in and out of third-party services. When you feel ready, you may want to contribute by [adding a new Connector class](https://move-coop.github.io/parsons/html/build_a_connector.html).
+
+### Making Changes to Parsons
+
+To make code changes to Parsons, you'll need to set up your development environment, make your changes, and then submit a pull request.
+
+To set up your development environment:
+
+* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/)
+* Clone your fork to your local computer
+* Set up a [virtual environment](#virtual-environments)
+* Install the [dependencies](#installing-dependencies)
+* Check that everything's working by [running the unit tests](#unit-tests) and the [linter](#linting)
+
+Now it's time to make your changes. We suggest taking a quick look at our [coding conventions](#coding-conventions) - it'll make the review process easier down the line. In addition to any code changes, make sure to update the documentation and the unit tests if necessary. Not sure if your changes require test or documentation updates? Just ask in Slack or through a comment on the relevant issue. When you're done, make sure to run the [unit tests](#unit-tests) and the [linter](#linting) again.
+
+Finally, you'll want to [submit a pull request](#submitting-a-pull-request). And that's it!
+
+#### Virtual Environments
+
+If required dependencies conflict with packages or modules you need for other projects, you can create and use a [virtual environment](https://docs.python.org/3/library/venv.html).
+
+```
+python3 -m venv .venv  # Creates a virtual environment in the .venv folder
+source .venv/bin/activate  # Activate in Unix or MacOS
+.venv/Scripts/activate.bat  # Activate in Windows
+```
+
+#### Installing Dependencies
+
+Before running or testing your code changes, be sure to install all of the required Python libraries that Parsons depends on.
+
+From the root of the parsons repository, run the following command:
+
+```bash
+> pip install -r requirements.txt
+```
+
+#### Unit Tests
+
+When contributing code, we ask you to add tests that can be used to verify that the code is working as expected. All of our unit tests are located in the `test/` folder at the root of the repository.
+
+We use the pytest tool to run our suite of automated unit tests. The pytest command line tool is installed as part of the Parsons dependencies.
+
+To run the entire suite of unit tests, execute the following command:
+
+```bash
+> pytest -rf test/
+```
+
+Once the pytest tool has finished running all of the tests, it will output details around any errors or test failures it encountered. If no failures are identified, then you are good to go!
+
+**Note:** Some tests are written to call out to external APIs, and will be skipped as part of standard unit testing. This is expected.
+
+See the [pytest documentation](https://docs.pytest.org/en/latest/contents.html) for more info and many more options.
+
+#### Linting
+
+We use the [black](https://github.com/psf/black) and [flake8](http://flake8.pycqa.org/en/latest/) tools to [lint](https://en.wikipedia.org/wiki/Lint_(software)) the code in the repository to make sure it matches our preferred style. Both tools are installed as part of the Parsons dependencies.
+
+Run the following commands from the root of the Parsons repository to lint your code changes:
+
+```bash
+> flake8 --max-line-length=100 --extend-ignore=E203,W503 parsons
+> black parsons
+```
+
+Pre-commit hooks are available to enforce black and isort formatting on
+commit. You can also set up your IDE to reformat using black and/or isort on
+save.
+
+To set up the pre-commit hooks, install pre-commit with `pip install
+pre-commit`, and then run `pre-commit install`.
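+
+A minimal sketch of that setup, run from the root of the repository (it assumes the project's `.pre-commit-config.yaml` is present there):
+
+```bash
+# Install the pre-commit tool and register the hooks defined in .pre-commit-config.yaml
+pip install pre-commit
+pre-commit install
+
+# Optional: run every configured hook against the whole codebase once
+pre-commit run --all-files
+```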
+
+#### Coding Conventions
+
+The following is a list of best practices to consider when writing code for the Parsons project:
+
+* Each tool connector should be its own unique class (e.g. ActionKit, VAN) in its own Python package. Use existing connectors as examples when deciding how to lay out your code.
+
+* Methods should be named using a verb_noun structure, such as `get_activist()` or `update_event()`.
+
+* Methods should reflect the vocabulary utilized by the original tool where possible to maintain transparency. For example, Google Cloud Storage refers to file-like objects as blobs. The methods are called `get_blob()` rather than `get_file()`.
+
+* Methods that can work with arbitrarily large data (e.g. database or API queries) should use Parsons Tables to hold the data instead of standard Python collections (e.g. lists, dicts).
+
+* You should avoid abbreviations for method names and variable names where possible.
+
+* Inline comments explaining complex code and methods are appreciated.
+
+* Capitalize the word Parsons for consistency where possible, especially in documentation.
+
+If you are building a new connector or extending an existing connector, there are more best practices in the [How to Build a Connector](https://move-coop.github.io/parsons/html/build_a_connector.html) documentation.
+
+## Documentation
+
+Parsons documentation is built using the Python Sphinx tool. Sphinx uses the `docs/*.rst` files in the repository to create the documentation.
+
+We have a [documentation label](https://github.com/move-coop/parsons/issues?q=is%3Aissue+is%3Aopen+label%3Adocumentation) that may help you find good docs issues to work on. If you are adding a new connector, you will need to add a reference to the connector to one of the .rst files. Please use the existing documentation as an example.
+
+When editing documentation, make sure you are editing the source files (with .md or .rst extension) and not the build files (.html extension).
+
+The workflow for documentation changes is a bit simpler than for code changes:
+
+* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/)
+* Clone your fork to your local computer
+* Change into the `docs` folder and install the requirements with `pip install -r requirements.txt` (you may want to set up a [virtual environment](#virtual-environments) first)
+* Make your changes and re-build the docs by running `make html`. (Note: this builds only a single version of the docs, from the current files. To create docs with multiple versions like our publicly hosted docs, run `make deploy_docs`.)
+* Open the built files in your web browser to check that they look as you expect.
+* [Submit a pull request](#submitting-a-pull-request)
+
+When you make documentation changes, you only need to track the source files with git. The built docs in the html folder should not be included.
+
+You should not need to worry about the unit tests or the linter if you are making documentation changes only.
+
+## Contributing Sample Code
+
+One important way to contribute to the Parsons project is to submit sample code that provides recipes and patterns for how to use the Parsons library.
+
+We have a folder called `useful_resources/` in the root of the repository. If you have scripts that incorporate Parsons, we encourage you to add them there!
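+
+For instance, a sample script can be as small as the sketch below, which builds a Parsons Table and copies it into BigQuery using the calls shown in the Google docs in this repository (the dataset, table, and bucket names are placeholders; credentials come from `GOOGLE_APPLICATION_CREDENTIALS`):
+
+```python
+# Hypothetical recipe: load a small Parsons Table into a BigQuery table.
+from parsons import GoogleBigQuery, Table
+
+people = Table([{"name": "Bob", "party": "D"}, {"name": "Jane", "party": "D"}])
+
+bigquery = GoogleBigQuery()
+bigquery.copy(
+    table_obj=people,
+    table_name="my_dataset.people_sample",   # placeholder dataset.table
+    tmp_gcs_bucket="my-temp-bucket",          # placeholder pre-existing bucket
+)
+```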
+ +The workflow for adding sample code is: + +* Fork the Parsons project using [the “Fork” button in GitHub](https://guides.github.com/activities/forking/) +* Clone your fork to your local computer +* Add your sample code into the `useful_resources/` folder +* [Submit a pull request](#submitting-a-pull-request) + +You should not need to worry about the unit tests or the linter if you are only adding sample code. + +## Submitting a Pull Request + +To submit a pull request, follow [these instructions to create a Pull Request from your fork](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request-from-a-fork) back to the original Parsons repository. + +The Parsons team will review your pull request and provide feedback. Please feel free to ping us if no one's responded to your Pull Request after a few days. We may not be able to review it right away, but we should be able to tell you when we'll get to it. + +Once your pull request has been approved, the Parsons team will merge your changes into the Parsons repository diff --git a/Dockerfile b/Dockerfile index 7fdd250950..3abd763c4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,4 +42,4 @@ RUN python setup.py develop RUN mkdir /app WORKDIR /app # Useful for importing modules that are associated with your python scripts: -env PYTHONPATH=.:/app +ENV PYTHONPATH=.:/app diff --git a/docs/airtable.rst b/docs/airtable.rst index 5046ff7770..fd02dca8b8 100644 --- a/docs/airtable.rst +++ b/docs/airtable.rst @@ -6,7 +6,7 @@ Overview ******** The Airtable class allows you to interact with an `Airtable `_ base. In order to use this class -you must generate an Airtable API Key which can be found in your Airtable `account settings `_. +you must generate an Airtable personal access token which can be found in your Airtable `settings `_. .. note:: Finding The Base Key @@ -18,20 +18,20 @@ you must generate an Airtable API Key which can be found in your Airtable `accou ********** QuickStart ********** -To instantiate the Airtable class, you can either store your Airtable API -``AIRTABLE_API_KEY`` as an environmental variable or pass in your api key +To instantiate the Airtable class, you can either store your Airtable personal access token +``AIRTABLE_PERSONAL_ACCESS_TOKEN`` as an environmental variable or pass in your personal access token as an argument. You also need to pass in the base key and table name. .. code-block:: python from parsons import Airtable - # First approach: Use API credentials via environmental variables and pass + # First approach: Use personal access token via environmental variable and pass # the base key and the table as arguments. at = Airtable(base_key, 'table01') - # Second approach: Pass API credentials, base key and table name as arguments. - at = Airtable(base_key, 'table01', api_key='MYFAKEKEY') + # Second approach: Pass personal access token, base key and table name as arguments. + at = Airtable(base_key, 'table01', personal_access_token='MYFAKETOKEN') You can then call various endpoints: diff --git a/docs/google.rst b/docs/google.rst index 84b4060934..a1428875ef 100644 --- a/docs/google.rst +++ b/docs/google.rst @@ -68,7 +68,7 @@ Google Cloud projects. Quickstart ========== -To instantiate the GoogleBigQuery class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. 
Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument. +To instantiate the `GoogleBigQuery` class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument. .. code-block:: python @@ -78,7 +78,7 @@ To instantiate the GoogleBigQuery class, you can pass the constructor a string c # be the file name or a JSON encoding of the credentials. os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'google_credentials_file.json' - big_query = GoogleBigQuery() + bigquery = GoogleBigQuery() Alternatively, you can pass the credentials in as an argument. In the example below, we also specify the project. @@ -86,8 +86,10 @@ Alternatively, you can pass the credentials in as an argument. In the example be # Project in which we're working project = 'parsons-test' - big_query = GoogleBigQuery(app_creds='google_credentials_file.json', - project=project) + bigquery = GoogleBigQuery( + app_creds='google_credentials_file.json', + project=project + ) We can now upload/query data. @@ -98,7 +100,7 @@ We can now upload/query data. # Table name should be project.dataset.table, or dataset.table, if # working with the default project - table_name = project + '.' + dataset + '.' + table + table_name = f"`{project}.{dataset}.{table}`" # Must be pre-existing bucket. Create via GoogleCloudStorage() or # at https://console.cloud.google.com/storage/create-bucket. May be @@ -107,7 +109,7 @@ We can now upload/query data. gcs_temp_bucket = 'parsons_bucket' # Create dataset if it doesn't already exist - big_query.client.create_dataset(dataset=dataset, exists_ok=True) + bigquery.client.create_dataset(dataset=dataset, exists_ok=True) parsons_table = Table([{'name':'Bob', 'party':'D'}, {'name':'Jane', 'party':'D'}, @@ -115,15 +117,23 @@ We can now upload/query data. {'name':'Bill', 'party':'I'}]) # Copy table in to create new BigQuery table - big_query.copy(table_obj=parsons_table, - table_name=table_name, - tmp_gcs_bucket=gcs_temp_bucket) + bigquery.copy( + table_obj=parsons_table, + table_name=table_name, + tmp_gcs_bucket=gcs_temp_bucket + ) # Select from project.dataset.table - big_query.query(f'select name from {table_name} where party = "D"') + bigquery.query(f'select name from {table_name} where party = "D"') + + # Query with parameters + bigquery.query( + f"select name from {table_name} where party = %s", + parameters=["D"] + ) # Delete the table when we're done - big_query.client.delete_table(table=table_name) + bigquery.client.delete_table(table=table_name) === API diff --git a/parsons/airtable/airtable.py b/parsons/airtable/airtable.py index 299391033f..d6d7baf12c 100644 --- a/parsons/airtable/airtable.py +++ b/parsons/airtable/airtable.py @@ -15,14 +15,17 @@ class Airtable(object): table_name: str The name of the table in the base. The table name is the equivilant of the sheet name in Excel or GoogleDocs. - api_key: str - The Airtable provided api key. Not required if ``AIRTABLE_API_KEY`` env variable set. + personal_access_token: str + The Airtable personal access token. Not required if ``AIRTABLE_PERSONAL_ACCESS_TOKEN`` + env variable set. 
""" - def __init__(self, base_key, table_name, api_key=None): + def __init__(self, base_key, table_name, personal_access_token=None): - self.api_key = check_env.check("AIRTABLE_API_KEY", api_key) - self.client = client(base_key, table_name, self.api_key) + self.personal_access_token = check_env.check( + "AIRTABLE_PERSONAL_ACCESS_TOKEN", personal_access_token + ) + self.client = client(base_key, table_name, self.personal_access_token) def get_record(self, record_id): """ diff --git a/parsons/databases/database/constants.py b/parsons/databases/database/constants.py index 8935a8734d..1b78ffb0af 100644 --- a/parsons/databases/database/constants.py +++ b/parsons/databases/database/constants.py @@ -153,11 +153,7 @@ VARCHAR = "varchar" FLOAT = "float" - -DO_PARSE_BOOLS = False BOOL = "bool" -TRUE_VALS = ("TRUE", "T", "YES", "Y", "1", 1) -FALSE_VALS = ("FALSE", "F", "NO", "N", "0", 0) # The following values are the minimum and maximum values for MySQL int # types. https://dev.mysql.com/doc/refman/8.0/en/integer-types.html diff --git a/parsons/databases/database/database.py b/parsons/databases/database/database.py index 9f369d43c9..bfec754241 100644 --- a/parsons/databases/database/database.py +++ b/parsons/databases/database/database.py @@ -1,5 +1,7 @@ import parsons.databases.database.constants as consts -import ast +import logging + +logger = logging.getLogger(__name__) class DatabaseCreateStatement: @@ -17,11 +19,7 @@ def __init__(self): self.BIGINT = consts.BIGINT self.FLOAT = consts.FLOAT - # Added for backwards compatability - self.DO_PARSE_BOOLS = consts.DO_PARSE_BOOLS self.BOOL = consts.BOOL - self.TRUE_VALS = consts.TRUE_VALS - self.FALSE_VALS = consts.FALSE_VALS self.VARCHAR = consts.VARCHAR self.RESERVED_WORDS = consts.RESERVED_WORDS @@ -117,29 +115,6 @@ def is_valid_sql_num(self, val): except (TypeError, ValueError): return False - def is_sql_bool(self, val): - """Check whether val is a valid sql boolean. - - When inserting data into databases, different values can be accepted - as boolean types. For excample, ``False``, ``'FALSE'``, ``1``. - - `Args`: - val: any - The value to check. - `Returns`: - bool - Whether or not the value is a valid sql boolean. - """ - if not self.DO_PARSE_BOOLS: - return - - if isinstance(val, bool) or ( - type(val) in (int, str) - and str(val).upper() in self.TRUE_VALS + self.FALSE_VALS - ): - return True - return False - def detect_data_type(self, value, cmp_type=None): """Detect the higher of value's type cmp_type. @@ -161,64 +136,45 @@ def detect_data_type(self, value, cmp_type=None): # Stop if the compare type is already a varchar # varchar is the highest data type. if cmp_type == self.VARCHAR: - return cmp_type - - # Attempt to evaluate value as a literal (e.g. '1' => 1, ) If the value - # is just a string, is None, or is empty, it will raise an error. These - # should be considered varchars. - # E.g. 
- # "" => SyntaxError - # "anystring" => ValueError - try: - val_lit = ast.literal_eval(str(value)) - except (SyntaxError, ValueError): - if self.is_sql_bool(value): - return self.BOOL - return self.VARCHAR - - # Exit early if it's None - # is_valid_sql_num(None) == False - # instead of defaulting to varchar (which is the next test) - # return the compare type - if val_lit is None: - return cmp_type + result = cmp_type + + elif isinstance(value, bool): + result = self.BOOL + + elif value is None: + result = cmp_type # Make sure that it is a valid integer # Python accepts 100_000 as a valid form of 100000, # however a sql engine may throw an error - if not self.is_valid_sql_num(value): - if self.is_sql_bool(val_lit) and cmp_type != self.VARCHAR: - return self.BOOL - else: - return self.VARCHAR + elif not self.is_valid_sql_num(value): + result = self.VARCHAR - if self.is_sql_bool(val_lit) and cmp_type not in self.INT_TYPES + [self.FLOAT]: - return self.BOOL - - type_lit = type(val_lit) - - # If a float, stop here - # float is highest after varchar - if type_lit == float or cmp_type == self.FLOAT: - return self.FLOAT + elif isinstance(value, float) or cmp_type == self.FLOAT: + result = self.FLOAT # The value is very likely an int # let's get its size # If the compare types are empty and use the types of the current value - if type_lit == int and cmp_type in (self.INT_TYPES + [None, "", self.BOOL]): - + elif isinstance(value, int) and cmp_type in ( + self.INT_TYPES + [None, "", self.BOOL] + ): # Use smallest possible int type above TINYINT - if self.SMALLINT_MIN < val_lit < self.SMALLINT_MAX: - return self.get_bigger_int(self.SMALLINT, cmp_type) - elif self.MEDIUMINT_MIN < val_lit < self.MEDIUMINT_MAX: - return self.get_bigger_int(self.MEDIUMINT, cmp_type) - elif self.INT_MIN < val_lit < self.INT_MAX: - return self.get_bigger_int(self.INT, cmp_type) + if self.SMALLINT_MIN < value < self.SMALLINT_MAX: + result = self.get_bigger_int(self.SMALLINT, cmp_type) + elif self.MEDIUMINT_MIN < value < self.MEDIUMINT_MAX: + result = self.get_bigger_int(self.MEDIUMINT, cmp_type) + elif self.INT_MIN < value < self.INT_MAX: + result = self.get_bigger_int(self.INT, cmp_type) else: - return self.BIGINT + result = self.BIGINT + + else: + # Need to determine who makes it all the way down here + logger.debug(f"Unexpected object type: {type(value)}") + result = cmp_type - # Need to determine who makes it all the way down here - return cmp_type + return result def format_column(self, col, index="", replace_chars=None, col_prefix="_"): """Format the column to meet database contraints. 
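For context on the `detect_data_type` refactor in the hunk above, the sketch below shows how the rewritten method is expected to classify a handful of Python values. It is illustrative only: the import path and class name are taken from this diff, and the exact integer type name returned for small ints depends on constants not shown here.

```python
# Illustrative only: expected behavior of the refactored detect_data_type,
# based on the logic in the hunk above (not an exhaustive test).
from parsons.databases.database.database import DatabaseCreateStatement

stmt = DatabaseCreateStatement()

stmt.detect_data_type(True)           # booleans short-circuit to "bool"
stmt.detect_data_type(None, "float")  # None keeps the compare type: "float"
stmt.detect_data_type(1.5)            # floats resolve to "float"
stmt.detect_data_type("hello")        # non-numeric strings fall back to "varchar"
stmt.detect_data_type(12)             # small ints resolve to the smallest fitting int type
```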
diff --git a/parsons/databases/discover_database.py b/parsons/databases/discover_database.py index 1d51a37112..729afa5cb0 100644 --- a/parsons/databases/discover_database.py +++ b/parsons/databases/discover_database.py @@ -5,7 +5,7 @@ from parsons.databases.redshift import Redshift from parsons.databases.mysql import MySQL from parsons.databases.postgres import Postgres -from parsons.google.google_bigquery import GoogleBigQuery +from parsons.google.google_bigquery import GoogleBigQuery as BigQuery def discover_database( @@ -40,7 +40,7 @@ def discover_database( "Redshift": Redshift, "MySQL": MySQL, "Postgres": Postgres, - "GoogleBigQuery": GoogleBigQuery, + "GoogleBigQuery": BigQuery, } password_vars = { diff --git a/parsons/databases/redshift/redshift.py b/parsons/databases/redshift/redshift.py index 77ed0460c9..a68498579d 100644 --- a/parsons/databases/redshift/redshift.py +++ b/parsons/databases/redshift/redshift.py @@ -724,7 +724,7 @@ def unload( sql: str The SQL string to execute to generate the data to unload. - buckey: str + bucket: str The destination S3 bucket key_prefix: str The prefix of the key names that will be written diff --git a/parsons/etl/etl.py b/parsons/etl/etl.py index e4eef99228..f2b20e94fe 100644 --- a/parsons/etl/etl.py +++ b/parsons/etl/etl.py @@ -7,7 +7,6 @@ class ETL(object): def __init__(self): - pass def add_column(self, column, value=None, index=None, if_exists="fail"): @@ -206,7 +205,6 @@ def get_column_max_width(self, column): max_width = 0 for v in petl.values(self.table, column): - if len(str(v).encode("utf-8")) > max_width: max_width = len(str(v).encode("utf-8")) @@ -314,7 +312,6 @@ def map_columns(self, column_map, exact_match=True): """ for col in self.columns: - if not exact_match: cleaned_col = col.lower().replace("_", "").replace(" ", "") else: @@ -830,7 +827,6 @@ def _prepend_dict(self, dict_obj, prepend): new_dict = {} for k, v in dict_obj.items(): - new_dict[prepend + "_" + k] = v return new_dict diff --git a/parsons/etl/table.py b/parsons/etl/table.py index b0a0fd42fe..1bf6acd943 100644 --- a/parsons/etl/table.py +++ b/parsons/etl/table.py @@ -28,13 +28,11 @@ class Table(ETL, ToFrom): """ def __init__(self, lst=[]): - self.table = None lst_type = type(lst) if lst_type in [list, tuple]: - # Check for empty list if not len(lst): self.table = petl.fromdicts([]) @@ -59,21 +57,16 @@ def __init__(self, lst=[]): self._index_count = 0 def __repr__(self): - return repr(petl.dicts(self.table)) def __iter__(self): - return iter(petl.dicts(self.table)) def __getitem__(self, index): - if isinstance(index, int): - return self.row_data(index) elif isinstance(index, str): - return self.column_data(index) elif isinstance(index, slice): @@ -81,11 +74,9 @@ def __getitem__(self, index): return [row for row in tblslice] else: - raise TypeError("You must pass a string or an index as a value.") def __bool__(self): - # Try to get a single row from our table head_one = petl.head(self.table) diff --git a/parsons/etl/tofrom.py b/parsons/etl/tofrom.py index 4d564659bf..c87a3cdb12 100644 --- a/parsons/etl/tofrom.py +++ b/parsons/etl/tofrom.py @@ -2,6 +2,7 @@ import json import io import gzip +from typing import Optional from parsons.utilities import files, zip_archive @@ -619,8 +620,39 @@ def to_postgres( pg = Postgres(username=username, password=password, host=host, db=db, port=port) pg.copy(self, table_name, **copy_args) - def to_petl(self): + def to_bigquery( + self, + table_name: str, + app_creds: Optional[str] = None, + project: Optional[str] = None, + **kwargs, + ): + 
""" + Write a table to BigQuery + `Args`: + table_name: str + Table name to write to in BigQuery; this should be in `schema.table` format + app_creds: str + A credentials json string or a path to a json file. Not required + if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set. + project: str + The project which the client is acting on behalf of. If not passed + then will use the default inferred environment. + **kwargs: kwargs + Additional keyword arguments passed into the `.copy()` function (`if_exists`, + `max_errors`, etc.) + + `Returns`: + ``None`` + """ + + from parsons import GoogleBigQuery as BigQuery + + bq = BigQuery(app_creds=app_creds, project=project) + bq.copy(self, table_name=table_name, **kwargs) + + def to_petl(self): return self.table def to_civis( @@ -898,6 +930,35 @@ def from_s3_csv( return cls(petl.cat(*tbls)) + @classmethod + def from_bigquery(cls, sql: str, app_creds: str = None, project: str = None): + """ + Create a ``parsons table`` from a BigQuery statement. + + To pull an entire BigQuery table, use a query like ``SELECT * FROM {{ table }}``. + + `Args`: + sql: str + A valid SQL statement + app_creds: str + A credentials json string or a path to a json file. Not required + if ``GOOGLE_APPLICATION_CREDENTIALS`` env variable set. + project: str + The project which the client is acting on behalf of. If not passed + then will use the default inferred environment. + TODO - Should users be able to pass in kwargs here? For parameters? + + `Returns`: + Parsons Table + See :ref:`parsons-table` for output options. + """ + + from parsons import GoogleBigQuery as BigQuery + + bq = BigQuery(app_creds=app_creds, project=project) + + return bq.query(sql=sql) + @classmethod def from_dataframe(cls, dataframe, include_index=False): """ diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index f10641c466..2ae3aa8337 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -1,12 +1,16 @@ import pickle -from typing import Optional, Union +from typing import Optional, Union, List import uuid +import logging +import datetime +import random from google.cloud import bigquery from google.cloud.bigquery import dbapi from google.cloud.bigquery.job import LoadJobConfig from google.cloud import exceptions import petl +from contextlib import contextmanager from parsons.databases.table import BaseTable from parsons.databases.database_connector import DatabaseConnector @@ -16,6 +20,8 @@ from parsons.utilities import check_env from parsons.utilities.files import create_temp_file +logger = logging.getLogger(__name__) + BIGQUERY_TYPE_MAP = { "str": "STRING", "float": "FLOAT", @@ -26,6 +32,8 @@ "datetime.time": "TIME", "dict": "RECORD", "NoneType": "STRING", + "UUID": "STRING", + "datetime": "DATETIME", } # Max number of rows that we query at a time, so we can avoid loading huge @@ -59,6 +67,55 @@ def parse_table_name(table_name): return parsed +def ends_with_semicolon(query: str) -> str: + query = query.strip() + if query[-1] == ";": + return query + return query + ";" + + +def map_column_headers_to_schema_field(schema_definition: list) -> list: + """ + Loops through a list of dictionaries and instantiates + google.cloud.bigquery.SchemaField objects. 
Useful docs + from Google's API can be found here: + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField + + `Args`: + schema_definition: list + This function expects a list of dictionaries in the following format: + + ``` + schema_definition = [ + { + "name": column_name, + "field_type": [INTEGER, STRING, FLOAT, etc.] + }, + { + "name": column_name, + "field_type": [INTEGER, STRING, FLOAT, etc.], + "mode": "REQUIRED" + }, + { + "name": column_name, + "field_type": [INTEGER, STRING, FLOAT, etc.], + "default_value_expression": CURRENT_TIMESTAMP() + } + ] + ``` + + `Returns`: + List of instantiated `SchemaField` objects + """ + + # TODO - Better way to test for this + if isinstance(schema_definition[0], bigquery.SchemaField): + logger.debug("User supplied list of SchemaField objects") + return schema_definition + + return [bigquery.SchemaField(**x) for x in schema_definition] + + class GoogleBigQuery(DatabaseConnector): """ Class for querying BigQuery table and returning the data as Parsons tables. @@ -103,11 +160,568 @@ def __init__(self, app_creds=None, project=None, location=None): self.dialect = "bigquery" + @property + def client(self): + """ + Get the Google BigQuery client to use for making queries. + + `Returns:` + `google.cloud.bigquery.client.Client` + """ + if not self._client: + # Create a BigQuery client to use to make the query + self._client = bigquery.Client(project=self.project, location=self.location) + + return self._client + + @contextmanager + def connection(self): + """ + Generate a BigQuery connection. + The connection is set up as a python "context manager", so it will be closed + automatically when the connection goes out of scope. Note that the BigQuery + API uses jobs to run database operations and, as such, simply has a no-op for + a "commit" function. + + If you would like to manage transactions, please use multi-statement queries + as [outlined here](https://cloud.google.com/bigquery/docs/transactions) + or utilize the `query_with_transaction` method on this class. + + When using the connection, make sure to put it in a ``with`` block (necessary for + any context manager): + ``with bq.connection() as conn:`` + + `Returns:` + Google BigQuery ``connection`` object + """ + conn = self._dbapi.connect(self.client) + try: + yield conn + finally: + conn.close() + + @contextmanager + def cursor(self, connection): + cur = connection.cursor() + try: + yield cur + finally: + cur.close() + + def query( + self, + sql: str, + parameters: Optional[Union[list, dict]] = None, + return_values: bool = True, + ) -> Optional[Table]: + """ + Run a BigQuery query and return the results as a Parsons table. + + To include python variables in your query, it is recommended to pass them as parameters, + following the BigQuery style where parameters are prefixed with `@`s. + Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL + injection attacks. + + **Parameter Examples** + + .. code-block:: python + + name = "Beatrice O'Brady" + sql = 'SELECT * FROM my_table WHERE name = %s' + rs.query(sql, parameters=[name]) + + .. code-block:: python + + name = "Beatrice O'Brady" + sql = "SELECT * FROM my_table WHERE name = %(name)s" + rs.query(sql, parameters={'name': name}) + + `Args:` + sql: str + A valid BigTable statement + parameters: dict + A dictionary of query parameters for BigQuery. + + `Returns:` + Parsons Table + See :ref:`parsons-table` for output options. 
+ """ + + with self.connection() as connection: + return self.query_with_connection( + sql, connection, parameters=parameters, return_values=return_values + ) + + def query_with_connection( + self, sql, connection, parameters=None, commit=True, return_values: bool = True + ): + """ + Execute a query against the BigQuery database, with an existing connection. + Useful for batching queries together. Will return ``None`` if the query + returns zero rows. + + `Args:` + sql: str + A valid SQL statement + connection: obj + A connection object obtained from ``bigquery.connection()`` + parameters: list + A list of python variables to be converted into SQL values in your query + commit: boolean + Must be true. The BigQuery API client always auto-commits. + + `Returns:` + Parsons Table + See :ref:`parsons-table` for output options. + """ + + if not commit: + raise ValueError( + """ + BigQuery implementation uses an API client which always auto-commits. + If you wish to wrap multiple queries in a transaction, use + Multi-Statement transactions within a single query as outlined + here: https://cloud.google.com/bigquery/docs/transactions or use the + `query_with_transaction` method on the BigQuery connector. + """ + ) + + # get our connection and cursor + with self.cursor(connection) as cursor: + # Run the query + cursor.execute(sql, parameters) + + if not return_values: + return None + + final_table = self._fetch_query_results(cursor=cursor) + + return final_table + + def query_with_transaction(self, queries, parameters=None): + queries_with_semicolons = [ends_with_semicolon(q) for q in queries] + queries_on_newlines = "\n".join(queries_with_semicolons) + queries_wrapped = f""" + BEGIN + BEGIN TRANSACTION; + {queries_on_newlines} + COMMIT TRANSACTION; + END; + """ + self.query(sql=queries_wrapped, parameters=parameters, return_values=False) + + def copy_from_gcs( + self, + gcs_blob_uri: str, + table_name: str, + if_exists: str = "fail", + max_errors: int = 0, + data_type: str = "csv", + csv_delimiter: str = ",", + ignoreheader: int = 1, + nullas: Optional[str] = None, + allow_quoted_newlines: bool = True, + allow_jagged_rows: bool = True, + quote: Optional[str] = None, + schema: Optional[List[dict]] = None, + job_config: Optional[LoadJobConfig] = None, + force_unzip_blobs: bool = False, + compression_type: str = "gzip", + new_file_extension: str = "csv", + **load_kwargs, + ): + """ + Copy a csv saved in Google Cloud Storage into Google BigQuery. + + `Args:` + gcs_blob_uri: str + The GoogleCloudStorage URI referencing the file to be copied. + table_name: str + The table name to load the data into. + if_exists: str + If the table already exists, either ``fail``, ``append``, ``drop`` + or ``truncate`` the table. This maps to `write_disposition` in the + `LoadJobConfig` class. + max_errors: int + The maximum number of rows that can error and be skipped before + the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class. + data_type: str + Denotes whether target file is a JSON or CSV + csv_delimiter: str + Character used to separate values in the target file + ignoreheader: int + Treats the specified number_rows as a file header and doesn't load them + nullas: str + Loads fields that match null_string as NULL, where null_string can be any string + allow_quoted_newlines: bool + If True, detects quoted new line characters within a CSV field and does + not interpret the quoted new line character as a row boundary + allow_jagged_rows: bool + Allow missing trailing optional columns (CSV only). 
+ quote: str + The value that is used to quote data sections in a CSV file. + BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of + the encoded string to split the data in its raw, binary state. + schema: list + BigQuery expects a list of dictionaries in the following format + ``` + schema = [ + {"name": "column_name", "type": STRING}, + {"name": "another_column_name", "type": INT} + ] + ``` + job_config: object + A LoadJobConfig object to provide to the underlying call to load_table_from_uri + on the BigQuery client. The function will create its own if not provided. Note + if there are any conflicts between the job_config and other parameters, the + job_config values are preferred. + force_unzip_blobs: bool + If True, target blobs will be unzipped before being loaded to BigQuery. + compression_type: str + Accepts `zip` or `gzip` values to differentially unzip a compressed + blob in cloud storage. + new_file_extension: str + Provides a file extension if a blob is decompressed and rewritten + to cloud storage. + **load_kwargs: kwargs + Other arguments to pass to the underlying load_table_from_uri + call on the BigQuery client. + """ + if if_exists not in ["fail", "truncate", "append", "drop"]: + raise ValueError( + f"Unexpected value for if_exists: {if_exists}, must be one of " + '"append", "drop", "truncate", or "fail"' + ) + if data_type not in ["csv", "json"]: + raise ValueError( + f"Only supports csv or json files [data_type = {data_type}]" + ) + + table_exists = self.table_exists(table_name) + + job_config = self._process_job_config( + job_config=job_config, + table_exists=table_exists, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + csv_delimiter=csv_delimiter, + ignoreheader=ignoreheader, + nullas=nullas, + allow_quoted_newlines=allow_quoted_newlines, + allow_jagged_rows=allow_jagged_rows, + quote=quote, + schema=schema, + ) + + # load CSV from Cloud Storage into BigQuery + table_ref = get_table_ref(self.client, table_name) + + try: + if force_unzip_blobs: + self.copy_large_compressed_file_from_gcs( + gcs_blob_uri=gcs_blob_uri, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + csv_delimiter=csv_delimiter, + ignoreheader=ignoreheader, + nullas=nullas, + allow_quoted_newlines=allow_quoted_newlines, + quote=quote, + schema=schema, + job_config=job_config, + compression_type=compression_type, + new_file_extension=new_file_extension, + ) + else: + load_job = self.client.load_table_from_uri( + source_uris=gcs_blob_uri, + destination=table_ref, + job_config=job_config, + **load_kwargs, + ) + load_job.result() + except exceptions.BadRequest as e: + if "one of the files is larger than the maximum allowed size." in str(e): + logger.debug( + f"{gcs_blob_uri.split('/')[-1]} exceeds max size ... \ + running decompression function..." 
+ ) + + self.copy_large_compressed_file_from_gcs( + gcs_blob_uri=gcs_blob_uri, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + csv_delimiter=csv_delimiter, + ignoreheader=ignoreheader, + nullas=nullas, + allow_quoted_newlines=allow_quoted_newlines, + quote=quote, + schema=schema, + job_config=job_config, + compression_type=compression_type, + new_file_extension=new_file_extension, + ) + elif "Schema has no field" in str(e): + logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file") + return "Empty file" + + elif "encountered too many errors, giving up" in str(e): + # TODO - Is this TOO verbose? + logger.error(f"Max errors exceeded for {gcs_blob_uri.split('/')[-1]}") + + for error_ in load_job.errors: + logger.error(error_) + + raise e + + else: + raise e + + def copy_large_compressed_file_from_gcs( + self, + gcs_blob_uri: str, + table_name: str, + if_exists: str = "fail", + max_errors: int = 0, + data_type: str = "csv", + csv_delimiter: str = ",", + ignoreheader: int = 1, + nullas: Optional[str] = None, + allow_quoted_newlines: bool = True, + allow_jagged_rows: bool = True, + quote: Optional[str] = None, + schema: Optional[List[dict]] = None, + job_config: Optional[LoadJobConfig] = None, + compression_type: str = "gzip", + new_file_extension: str = "csv", + **load_kwargs, + ): + """ + Copy a compressed CSV file that exceeds the maximum size in Google Cloud Storage + into Google BigQuery. + + `Args:` + gcs_blob_uri: str + The GoogleCloudStorage URI referencing the file to be copied. + table_name: str + The table name to load the data into. + if_exists: str + If the table already exists, either ``fail``, ``append``, ``drop`` + or ``truncate`` the table. This maps to `write_disposition` in the + `LoadJobConfig` class. + max_errors: int + The maximum number of rows that can error and be skipped before + the job fails. This maps to `max_bad_records` in the `LoadJobConfig` class. + data_type: str + Denotes whether target file is a JSON or CSV + csv_delimiter: str + Character used to separate values in the target file + ignoreheader: int + Treats the specified number_rows as a file header and doesn't load them + nullas: str + Loads fields that match null_string as NULL, where null_string can be any string + allow_quoted_newlines: bool + If True, detects quoted new line characters within a CSV field + and does not interpret the quoted new line character as a row boundary + allow_jagged_rows: bool + Allow missing trailing optional columns (CSV only). + quote: str + The value that is used to quote data sections in a CSV file. + BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of + the encoded string to split the data in its raw, binary state. + schema: list + BigQuery expects a list of dictionaries in the following format + ``` + schema = [ + {"name": "column_name", "type": STRING}, + {"name": "another_column_name", "type": INT} + ] + ``` + job_config: object + A LoadJobConfig object to provide to the underlying call to load_table_from_uri + on the BigQuery client. The function will create its own if not provided. Note + if there are any conflicts between the job_config and other parameters, the + job_config values are preferred. + compression_type: str + Accepts `zip` or `gzip` values to differentially unzip a compressed + blob in cloud storage. + new_file_extension: str + Provides a file extension if a blob is decompressed and rewritten to cloud storage. 
+ **load_kwargs: kwargs + Other arguments to pass to the underlying load_table_from_uri call on the BigQuery + client. + """ + + if if_exists not in ["fail", "truncate", "append", "drop"]: + raise ValueError( + f"Unexpected value for if_exists: {if_exists}, must be one of " + '"append", "drop", "truncate", or "fail"' + ) + if data_type not in ["csv", "json"]: + raise ValueError( + f"Only supports csv or json files [data_type = {data_type}]" + ) + + table_exists = self.table_exists(table_name) + + job_config = self._process_job_config( + job_config=job_config, + table_exists=table_exists, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + csv_delimiter=csv_delimiter, + ignoreheader=ignoreheader, + nullas=nullas, + allow_quoted_newlines=allow_quoted_newlines, + allow_jagged_rows=allow_jagged_rows, + quote=quote, + schema=schema, + ) + + # TODO - See if this inheritance is happening in other places + gcs = GoogleCloudStorage(app_creds=self.app_creds, project=self.project) + old_bucket_name, old_blob_name = gcs.split_uri(gcs_uri=gcs_blob_uri) + + uncompressed_gcs_uri = None + + try: + logger.debug("Unzipping large file") + uncompressed_gcs_uri = gcs.unzip_blob( + bucket_name=old_bucket_name, + blob_name=old_blob_name, + new_file_extension=new_file_extension, + compression_type=compression_type, + ) + + logger.debug( + f"Loading uncompressed uri into BigQuery {uncompressed_gcs_uri}..." + ) + table_ref = get_table_ref(self.client, table_name) + load_job = self.client.load_table_from_uri( + source_uris=uncompressed_gcs_uri, + destination=table_ref, + job_config=job_config, + **load_kwargs, + ) + load_job.result() + finally: + if uncompressed_gcs_uri: + new_bucket_name, new_blob_name = gcs.split_uri( + gcs_uri=uncompressed_gcs_uri + ) + gcs.delete_blob(new_bucket_name, new_blob_name) + logger.debug("Successfully dropped uncompressed blob") + + def copy_s3( + self, + table_name, + bucket, + key, + if_exists: str = "fail", + max_errors: int = 0, + data_type: str = "csv", + csv_delimiter: str = ",", + ignoreheader: int = 1, + nullas: Optional[str] = None, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + gcs_client: Optional[GoogleCloudStorage] = None, + tmp_gcs_bucket: Optional[str] = None, + job_config: Optional[LoadJobConfig] = None, + **load_kwargs, + ): + """ + Copy a file from s3 to BigQuery. + + `Args:` + table_name: str + The table name and schema (``tmc.cool_table``) to point the file. + bucket: str + The s3 bucket where the file or manifest is located. + key: str + The key of the file or manifest in the s3 bucket. + if_exists: str + If the table already exists, either ``fail``, ``append``, ``drop`` + or ``truncate`` the table. + max_errors: int + The maximum number of rows that can error and be skipped before + the job fails. + data_type: str + The data type of the file. Only ``csv`` supported currently. + csv_delimiter: str + The delimiter of the ``csv``. Only relevant if data_type is ``csv``. + ignoreheader: int + The number of header rows to skip. Ignored if data_type is ``json``. + nullas: str + Loads fields that match string as NULL + aws_access_key_id: + An AWS access key granted to the bucket where the file is located. Not required + if keys are stored as environmental variables. + aws_secret_access_key: + An AWS secret access key granted to the bucket where the file is located. Not + required if keys are stored as environmental variables. 
+ gcs_client: object + The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage. + tmp_gcs_bucket: str + The name of the Google Cloud Storage bucket to use to stage the data to load + into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified. + job_config: object + A LoadJobConfig object to provide to the underlying call to load_table_from_uri + on the BigQuery client. The function will create its own if not provided. Note + if there are any conflicts between the job_config and other parameters, the + job_config values are preferred. + + `Returns` + Parsons Table or ``None`` + See :ref:`parsons-table` for output options. + """ + + # copy from S3 to GCS + tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket) + gcs_client = gcs_client or GoogleCloudStorage() + temp_blob_uri = gcs_client.copy_s3_to_gcs( + aws_source_bucket=bucket, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + gcs_sink_bucket=tmp_gcs_bucket, + aws_s3_key=key, + ) + temp_blob_name = key + temp_blob_uri = gcs_client.format_uri( + bucket=tmp_gcs_bucket, name=temp_blob_name + ) + + # load CSV from Cloud Storage into BigQuery + try: + self.copy_from_gcs( + gcs_blob_uri=temp_blob_uri, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + csv_delimiter=csv_delimiter, + ignoreheader=ignoreheader, + nullas=nullas, + job_config=job_config, + **load_kwargs, + ) + finally: + gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name) + def copy( self, tbl: Table, table_name: str, if_exists: str = "fail", + max_errors: int = 0, tmp_gcs_bucket: Optional[str] = None, gcs_client: Optional[GoogleCloudStorage] = None, job_config: Optional[LoadJobConfig] = None, @@ -117,13 +731,16 @@ def copy( Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage. `Args:` - table_obj: obj + tbl: obj The Parsons Table to copy into BigQuery. table_name: str The table name to load the data into. if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. + max_errors: int + The maximum number of rows that can error and be skipped before + the job fails. tmp_gcs_bucket: str The name of the Google Cloud Storage bucket to use to stage the data to load into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified. 
@@ -138,53 +755,183 @@ def copy( """ tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket) - if if_exists not in ["fail", "truncate", "append", "drop"]: - raise ValueError( - f"Unexpected value for if_exists: {if_exists}, must be one of " - '"append", "drop", "truncate", or "fail"' - ) - - table_exists = self.table_exists(table_name) - - if not job_config: - job_config = bigquery.LoadJobConfig() - + # if not job_config: + job_config = bigquery.LoadJobConfig() if not job_config.schema: - job_config.schema = self._generate_schema(tbl) - - if not job_config.create_disposition: - job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED - job_config.skip_leading_rows = 1 - job_config.source_format = bigquery.SourceFormat.CSV - job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY - - if table_exists: - if if_exists == "fail": - raise ValueError("Table already exists.") - elif if_exists == "drop": - self.delete_table(table_name) - elif if_exists == "append": - job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND - elif if_exists == "truncate": - job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + job_config.schema = self._generate_schema_from_parsons_table(tbl) gcs_client = gcs_client or GoogleCloudStorage() temp_blob_name = f"{uuid.uuid4()}.csv" temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name) # load CSV from Cloud Storage into BigQuery - table_ref = get_table_ref(self.client, table_name) try: - load_job = self.client.load_table_from_uri( - temp_blob_uri, - table_ref, + self.copy_from_gcs( + gcs_blob_uri=temp_blob_uri, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, job_config=job_config, **load_kwargs, ) - load_job.result() finally: gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name) + def duplicate_table( + self, + source_table, + destination_table, + if_exists="fail", + drop_source_table=False, + ): + """ + Create a copy of an existing table (or subset of rows) in a new + table. + + `Args:` + source_table: str + Name of existing schema and table (e.g. ``myschema.oldtable``) + destination_table: str + Name of destination schema and table (e.g. ``myschema.newtable``) + if_exists: str + If the table already exists, either ``fail``, ``replace``, or + ``ignore`` the operation. + drop_source_table: boolean + Drop the source table + """ + if if_exists not in ["fail", "replace", "ignore"]: + raise ValueError("Invalid value for `if_exists` argument") + if if_exists == "fail" and self.table_exists(destination_table): + raise ValueError("Table already exists.") + + table__replace_clause = "OR REPLACE " if if_exists == "replace" else "" + table__exists_clause = " IF NOT EXISTS" if if_exists == "ignore" else "" + + query = f""" + CREATE {table__replace_clause}TABLE{table__exists_clause} + {destination_table} + CLONE {source_table} + """ + self.query(sql=query, return_values=False) + if drop_source_table: + self.delete_table(table_name=source_table) + + def upsert( + self, + table_obj, + target_table, + primary_key, + distinct_check=True, + cleanup_temp_table=True, + from_s3=False, + **copy_args, + ): + """ + Preform an upsert on an existing table. An upsert is a function in which rows + in a table are updated and inserted at the same time. 
+ + `Args:` + table_obj: obj + A Parsons table object + target_table: str + The schema and table name to upsert + primary_key: str or list + The primary key column(s) of the target table + distinct_check: boolean + Check if the primary key column is distinct. Raise error if not. + cleanup_temp_table: boolean + A temp table is dropped by default on cleanup. You can set to False for debugging. + from_s3: boolean + Instead of specifying a table_obj (set the first argument to None), + set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3` + arguments to upsert a pre-existing s3 file into the target_table + \**copy_args: kwargs + See :func:`~parsons.databases.bigquery.BigQuery.copy` for options. + """ # noqa: W605 + if not self.table_exists(target_table): + logger.info( + "Target table does not exist. Copying into newly \ + created target table." + ) + + self.copy(table_obj, target_table) + return None + + if isinstance(primary_key, str): + primary_keys = [primary_key] + else: + primary_keys = primary_key + + if distinct_check: + primary_keys_statement = ", ".join(primary_keys) + diff = self.query( + f""" + select ( + select count(*) + from {target_table} + ) - ( + SELECT COUNT(*) from ( + select distinct {primary_keys_statement} + from {target_table} + ) + ) as total_count + """ + ).first + if diff > 0: + raise ValueError("Primary key column contains duplicate values.") + + noise = f"{random.randrange(0, 10000):04}"[:4] + date_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M") + # Generate a temp table like "table_tmp_20200210_1230_14212" + staging_tbl = f"{target_table}_stg_{date_stamp}_{noise}" + + # Copy to a staging table + logger.info(f"Building staging table: {staging_tbl}") + + if from_s3: + if table_obj is not None: + raise ValueError( + "upsert(... from_s3=True) requires the first argument (table_obj)" + " to be None. from_s3 and table_obj are mutually exclusive." + ) + self.copy_s3(staging_tbl, template_table=target_table, **copy_args) + + else: + self.copy( + tbl=table_obj, + table_name=staging_tbl, + template_table=target_table, + **copy_args, + ) + + staging_table_name = staging_tbl.split(".")[1] + target_table_name = target_table.split(".")[1] + + # Delete rows + comparisons = [ + f"{staging_table_name}.{primary_key} = {target_table_name}.{primary_key}" + for primary_key in primary_keys + ] + where_clause = " and ".join(comparisons) + + queries = [ + f""" + DELETE FROM {target_table} + USING {staging_tbl} + WHERE {where_clause} + """, + f""" + INSERT INTO {target_table} + SELECT * FROM {staging_tbl} + """, + ] + + if cleanup_temp_table: + # Drop the staging table + queries.append(f"DROP TABLE IF EXISTS {staging_tbl}") + + return self.query_with_transaction(queries=queries) + def delete_table(self, table_name): """ Delete a BigQuery table. @@ -196,47 +943,234 @@ def delete_table(self, table_name): table_ref = get_table_ref(self.client, table_name) self.client.delete_table(table_ref) - def query( - self, sql: str, parameters: Optional[Union[list, dict]] = None - ) -> Optional[Table]: + def table_exists(self, table_name: str) -> bool: """ - Run a BigQuery query and return the results as a Parsons table. - - To include python variables in your query, it is recommended to pass them as parameters, - following the BigQuery style where parameters are prefixed with `@`s. - Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL - injection attacks. 
+ Check whether or not the Google BigQuery table exists in the specified dataset. - **Parameter Examples** + `Args:` + table_name: str + The name of the BigQuery table to check for + `Returns:` + bool + True if the table exists in the specified dataset, false otherwise + """ + table_ref = get_table_ref(self.client, table_name) + try: + self.client.get_table(table_ref) + except exceptions.NotFound: + return False - .. code-block:: python + return True - name = "Beatrice O'Brady" - sql = 'SELECT * FROM my_table WHERE name = %s' - rs.query(sql, parameters=[name]) + def get_tables(self, schema, table_name: Optional[str] = None): + """ + List the tables in a schema including metadata. - .. code-block:: python + Args: + schema: str + Filter by a schema + table_name: str + Filter by a table name + `Returns:` + Parsons Table + See :ref:`parsons-table` for output options. + """ - name = "Beatrice O'Brady" - sql = "SELECT * FROM my_table WHERE name = %(name)s" - rs.query(sql, parameters={'name': name}) + logger.debug("Retrieving tables info.") + sql = f"select * from {schema}.INFORMATION_SCHEMA.TABLES" + if table_name: + sql += f" where table_name = '{table_name}'" + return self.query(sql) - `Args:` - sql: str - A valid BigTable statement - parameters: dict - A dictionary of query parameters for BigQuery. + def get_views(self, schema, view: Optional[str] = None): + """ + List views. + Args: + schema: str + Filter by a schema + view: str + Filter by a table name `Returns:` Parsons Table See :ref:`parsons-table` for output options. """ - # get our connection and cursor - cursor = self._dbapi.connect(self.client).cursor() - # Run the query - cursor.execute(sql, parameters) + logger.debug("Retrieving views info.") + sql = f""" + select + table_schema as schema_name, + table_name as view_name, + view_definition + from {schema}.INFORMATION_SCHEMA.VIEWS + """ + if view: + sql += f" where table_name = '{view}'" + return self.query(sql) + + def get_columns(self, schema: str, table_name: str): + """ + Gets the column names (and other column metadata) for a table. If you + need just the column names run ``get_columns_list()``, as it is faster. + + `Args:` + schema: str + The schema name + table_name: str + The table name + + `Returns:` + A dictionary mapping column name to a dictionary with extra info. The + keys of the dictionary are ordered just liked the columns in the table. + The extra info is a dict with format + """ + + base_query = f""" + SELECT + * + FROM `{self.project}.{schema}.INFORMATION_SCHEMA.COLUMNS` + WHERE + table_name = '{table_name}' + """ + + logger.debug(base_query) + + return { + row["column_name"]: { + "data_type": row["data_type"], + "is_nullable": row["is_nullable"], + "is_updatable": row["is_updatable"], + "is_partioning_column": row["is_partitioning_column"], + "rounding_mode": row["rounding_mode"], + } + for row in self.query(base_query) + } + def get_columns_list(self, schema: str, table_name: str) -> list: + """ + Gets the column names for a table. + + `Args:` + schema: str + The schema name + table_name: str + The table name + + `Returns:` + A list of column names + """ + + first_row = self.query(f"SELECT * FROM {schema}.{table_name} LIMIT 1;") + + return [x for x in first_row.columns] + + def get_row_count(self, schema: str, table_name: str) -> int: + """ + Gets the row count for a BigQuery materialization. 
+ + `Args`: + schema: str + The schema name + table_name: str + The table name + + `Returns:` + Row count of the target table + """ + + sql = f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`" + result = self.query(sql=sql) + + return result["row_count"][0] + + def _generate_schema_from_parsons_table(self, tbl): + stats = tbl.get_columns_type_stats() + fields = [] + for stat in stats: + petl_types = stat["type"] + best_type = "str" if "str" in petl_types else petl_types[0] + field_type = self._bigquery_type(best_type) + field = bigquery.schema.SchemaField(stat["name"], field_type) + fields.append(field) + return fields + + def _process_job_config( + self, job_config: Optional[LoadJobConfig] = None, **kwargs + ) -> LoadJobConfig: + """ + Internal function to neatly process a user-supplied job configuration object. + As a convention, if both the job_config and keyword arguments specify a value, + we defer to the job_config. + + `Args`: + job_config: `LoadJobConfig` + Optionally supplied GCS `LoadJobConfig` object + + `Returns`: + A `LoadJobConfig` object + """ + + if not job_config: + job_config = bigquery.LoadJobConfig() + + if not job_config.schema: + if kwargs["schema"]: + logger.debug("Using user-supplied schema definition...") + job_config.schema = map_column_headers_to_schema_field(kwargs["schema"]) + job_config.autodetect = False + else: + logger.debug("Autodetecting schema definition...") + job_config.autodetect = True + + if not job_config.create_disposition: + job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED + + if not job_config.max_bad_records: + job_config.max_bad_records = kwargs["max_errors"] + + if not job_config.skip_leading_rows and kwargs["data_type"] == "csv": + job_config.skip_leading_rows = kwargs["ignoreheader"] + + if not job_config.source_format: + job_config.source_format = ( + bigquery.SourceFormat.CSV + if kwargs["data_type"] == "csv" + else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON + ) + + if not job_config.field_delimiter: + if kwargs["data_type"] == "csv": + job_config.field_delimiter = kwargs["csv_delimiter"] + if kwargs["nullas"]: + job_config.null_marker = kwargs["nullas"] + + if not job_config.write_disposition: + if kwargs["if_exists"] == "append": + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + elif kwargs["if_exists"] == "truncate": + job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + elif kwargs["table_exists"] and kwargs["if_exists"] == "fail": + raise Exception("Table already exists.") + elif kwargs["if_exists"] == "drop" and kwargs["table_exists"]: + self.delete_table(kwargs["table_name"]) + job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY + else: + job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY + + if not job_config.allow_quoted_newlines: + job_config.allow_quoted_newlines = kwargs["allow_quoted_newlines"] + + if kwargs["data_type"] == "csv" and kwargs["allow_jagged_rows"]: + job_config.allow_jagged_rows = kwargs["allow_jagged_rows"] + else: + job_config.allow_jagged_rows = True + + if not job_config.quote_character and kwargs["quote"]: + job_config.quote_character = kwargs["quote"] + + return job_config + + def _fetch_query_results(self, cursor) -> Table: # We will use a temp file to cache the results so that they are not all living # in memory. We'll use pickle to serialize the results to file in order to maintain # the proper data types (e.g. integer). 
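For orientation, the sketch below shows one plausible way to call the loading and transaction helpers added in this file. It is a hypothetical usage example, not part of the patch: the bucket, dataset, and table names are placeholders, and credentials are assumed to come from the ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.

```python
# Hypothetical usage of the new BigQuery helpers (placeholder names throughout).
from parsons import GoogleBigQuery

bq = GoogleBigQuery()

# Load a CSV already staged in Google Cloud Storage, appending to an existing table.
bq.copy_from_gcs(
    gcs_blob_uri="gs://my-bucket/people.csv",
    table_name="my_dataset.people",
    if_exists="append",
)

# Run two statements atomically via a BigQuery multi-statement transaction.
bq.query_with_transaction(
    queries=[
        "DELETE FROM my_dataset.people WHERE source = 'stale'",
        "INSERT INTO my_dataset.people SELECT * FROM my_dataset.people_staging",
    ]
)
```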
@@ -267,53 +1201,7 @@ def query( return None ptable = petl.frompickle(temp_filename) - final_table = Table(ptable) - - return final_table - - def table_exists(self, table_name: str) -> bool: - """ - Check whether or not the Google BigQuery table exists in the specified dataset. - - `Args:` - table_name: str - The name of the BigQuery table to check for - `Returns:` - bool - True if the table exists in the specified dataset, false otherwise - """ - table_ref = get_table_ref(self.client, table_name) - try: - self.client.get_table(table_ref) - except exceptions.NotFound: - return False - - return True - - @property - def client(self): - """ - Get the Google BigQuery client to use for making queries. - - `Returns:` - `google.cloud.bigquery.client.Client` - """ - if not self._client: - # Create a BigQuery client to use to make the query - self._client = bigquery.Client(project=self.project, location=self.location) - - return self._client - - def _generate_schema(self, tbl): - stats = tbl.get_columns_type_stats() - fields = [] - for stat in stats: - petl_types = stat["type"] - best_type = "str" if "str" in petl_types else petl_types[0] - field_type = self._bigquery_type(best_type) - field = bigquery.schema.SchemaField(stat["name"], field_type) - fields.append(field) - return fields + return Table(ptable) @staticmethod def _bigquery_type(tp): @@ -326,7 +1214,7 @@ def table(self, table_name): class BigQueryTable(BaseTable): - # BigQuery table object. + """BigQuery table object.""" def drop(self, cascade=False): """ @@ -339,16 +1227,5 @@ def truncate(self): """ Truncate the table. """ - # BigQuery does not support truncate natively, so we will "load" an empty dataset - # with write disposition of "truncate" - table_ref = get_table_ref(self.db.client, self.table) - bq_table = self.db.client.get_table(table_ref) - # BigQuery wants the schema when we load the data, so we will grab it from the table - job_config = bigquery.LoadJobConfig() - job_config.schema = bq_table.schema - - empty_table = Table([]) - self.db.copy( - empty_table, self.table, if_exists="truncate", job_config=job_config - ) + self.db.query(f"TRUNCATE TABLE {self.table}") diff --git a/parsons/google/google_cloud_storage.py b/parsons/google/google_cloud_storage.py index 0b6065ef2b..19993c5768 100644 --- a/parsons/google/google_cloud_storage.py +++ b/parsons/google/google_cloud_storage.py @@ -1,9 +1,15 @@ import google from google.cloud import storage +from google.cloud import storage_transfer from parsons.google.utitities import setup_google_application_credentials from parsons.utilities import files import datetime import logging +import time +import uuid +import gzip +import zipfile +from typing import Optional logger = logging.getLogger(__name__) @@ -35,17 +41,17 @@ class GoogleCloudStorage(object): """ def __init__(self, app_creds=None, project=None): - setup_google_application_credentials(app_creds) + self.project = project # Throws an error if you pass project=None, so adding if/else statement. 
- if not project: + if not self.project: self.client = storage.Client() """ Access all methods of `google.cloud` package """ else: - self.client = storage.Client(project=project) + self.client = storage.Client(project=self.project) def list_buckets(self): """ @@ -71,10 +77,10 @@ def bucket_exists(self, bucket_name): """ if bucket_name in self.list_buckets(): - logger.info(f"{bucket_name} exists.") + logger.debug(f"{bucket_name} exists.") return True else: - logger.info(f"{bucket_name} does not exist.") + logger.debug(f"{bucket_name} does not exist.") return False def get_bucket(self, bucket_name): @@ -93,7 +99,7 @@ def get_bucket(self, bucket_name): else: raise google.cloud.exceptions.NotFound("Bucket not found") - logger.info(f"Returning {bucket_name} object") + logger.debug(f"Returning {bucket_name} object") return bucket def create_bucket(self, bucket_name): @@ -107,7 +113,7 @@ def create_bucket(self, bucket_name): ``None`` """ - # To Do: Allow user to set all of the bucket parameters + # TODO: Allow user to set all of the bucket parameters self.client.create_bucket(bucket_name) logger.info(f"Created {bucket_name} bucket.") @@ -130,7 +136,7 @@ def delete_bucket(self, bucket_name, delete_blobs=False): bucket.delete(force=delete_blobs) logger.info(f"{bucket_name} bucket deleted.") - def list_blobs(self, bucket_name, max_results=None, prefix=None): + def list_blobs(self, bucket_name, max_results=None, prefix=None, match_glob=None): """ List all of the blobs in a bucket @@ -138,15 +144,19 @@ def list_blobs(self, bucket_name, max_results=None, prefix=None): bucket_name: str The name of the bucket max_results: int - TBD - prefix_filter: str + Maximum number of blobs to return + prefix: str A prefix to filter files + match_glob: str + Filters files based on glob string. 
NOTE that the match_glob + parameter runs on the full blob URI, include a preceding wildcard + value to account for nested files (*/ for one level, **/ for n levels) `Returns:` A list of blob names """ blobs = self.client.list_blobs( - bucket_name, max_results=max_results, prefix=prefix + bucket_name, max_results=max_results, prefix=prefix, match_glob=match_glob ) lst = [b.name for b in blobs] logger.info(f"Found {len(lst)} in {bucket_name} bucket.") @@ -167,10 +177,10 @@ def blob_exists(self, bucket_name, blob_name): """ if blob_name in self.list_blobs(bucket_name): - logger.info(f"{blob_name} exists.") + logger.debug(f"{blob_name} exists.") return True else: - logger.info(f"{blob_name} does not exist.") + logger.debug(f"{blob_name} does not exist.") return False def get_blob(self, bucket_name, blob_name): @@ -191,15 +201,15 @@ def get_blob(self, bucket_name, blob_name): logger.debug(f"Got {blob_name} object from {bucket_name} bucket.") return blob - def put_blob(self, bucket_name, blob_name, local_path): + def put_blob(self, bucket_name, blob_name, local_path, **kwargs): """ Puts a blob (aka file) in a bucket `Args:` - blob_name: - The name of blob to be stored in the bucket bucket_name: The name of the bucket to store the blob + blob_name: + The name of blob to be stored in the bucket local_path: str The local path of the file to upload `Returns:` @@ -210,7 +220,7 @@ def put_blob(self, bucket_name, blob_name, local_path): blob = storage.Blob(blob_name, bucket) with open(local_path, "rb") as f: - blob.upload_from_file(f) + blob.upload_from_file(f, **kwargs) logger.info(f"{blob_name} put in {bucket_name} bucket.") @@ -238,10 +248,10 @@ def download_blob(self, bucket_name, blob_name, local_path=None): bucket = storage.Bucket(self.client, name=bucket_name) blob = storage.Blob(blob_name, bucket) - logger.info(f"Downloading {blob_name} from {bucket_name} bucket.") + logger.debug(f"Downloading {blob_name} from {bucket_name} bucket.") with open(local_path, "wb") as f: blob.download_to_file(f, client=self.client) - logger.info(f"{blob_name} saved to {local_path}.") + logger.debug(f"{blob_name} saved to {local_path}.") return local_path @@ -277,6 +287,11 @@ def upload_table( The name of the blob to upload the data into. data_type: str The file format to use when writing the data. One of: `csv` or `json` + default_acl: + ACL desired for newly uploaded table + + `Returns`: + String representation of file URI in GCS """ bucket = storage.Bucket(self.client, name=bucket_name) blob = storage.Blob(blob_name, bucket) @@ -328,3 +343,290 @@ def get_url(self, bucket_name, blob_name, expires_in=60): method="GET", ) return url + + def copy_bucket_to_gcs( + self, + gcs_sink_bucket: str, + source: str, + source_bucket: str, + destination_path: str = "", + source_path: str = "", + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + ): + """ + Creates a one-time transfer job from Amazon S3 to Google Cloud + Storage. Copies all blobs within the bucket unless a key or prefix + is passed. + + `Args`: + gcs_sink_bucket (str): + Destination for the data transfer (located in GCS) + source (str): + File storge vendor [gcs or s3] + source_bucket (str): + Source bucket name + source_path (str): + Path in the source system pointing to the relevant keys + / files to sync. 
Must end in a '/' + aws_access_key_id (str): + Access key to authenticate storage transfer + aws_secret_access_key (str): + Secret key to authenticate storage transfer + """ + if source not in ["gcs", "s3"]: + raise ValueError( + f"Blob transfer only supports gcs and s3 sources [source={source}]" + ) + if source_path and source_path[-1] != "/": + raise ValueError("Source path much end in a '/'") + + client = storage_transfer.StorageTransferServiceClient() + + now = datetime.datetime.utcnow() + # Setting the start date and the end date as + # the same time creates a one-time transfer + one_time_schedule = {"day": now.day, "month": now.month, "year": now.year} + + if source == "gcs": + description = f"""One time GCS to GCS Transfer + [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}""" + elif source == "s3": + description = f"""One time S3 to GCS Transfer + [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}""" + + transfer_job_config = { + "project_id": self.project, + "description": description, + "status": storage_transfer.TransferJob.Status.ENABLED, + "schedule": { + "schedule_start_date": one_time_schedule, + "schedule_end_date": one_time_schedule, + }, + } + + # Setup transfer job configuration based on user imput + if source == "s3": + blob_storage = "S3" + transfer_job_config["transfer_spec"] = { + "aws_s3_data_source": { + "bucket_name": source_bucket, + "path": source_path, + "aws_access_key": { + "access_key_id": aws_access_key_id, + "secret_access_key": aws_secret_access_key, + }, + }, + "gcs_data_sink": { + "bucket_name": gcs_sink_bucket, + "path": destination_path, + }, + } + elif source == "gcs": + blob_storage = "GCS" + transfer_job_config["transfer_spec"] = { + "gcs_data_source": { + "bucket_name": source_bucket, + "path": source_path, + }, + "gcs_data_sink": { + "bucket_name": gcs_sink_bucket, + "path": destination_path, + }, + } + + create_transfer_job_request = storage_transfer.CreateTransferJobRequest( + {"transfer_job": transfer_job_config} + ) + + # Create the transfer job + create_result = client.create_transfer_job(create_transfer_job_request) + + polling = True + wait_time = 0 + wait_between_attempts_in_sec = 10 + + # NOTE: This value defaults to an empty string until GCP + # triggers the job internally ... 
we'll use this value to + # determine whether or not the transfer has kicked off + latest_operation_name = create_result.latest_operation_name + + while polling: + if latest_operation_name: + operation = client.get_operation({"name": latest_operation_name}) + + if not operation.done: + logger.debug("Operation still running...") + + else: + operation_metadata = storage_transfer.TransferOperation.deserialize( + operation.metadata.value + ) + error_output = operation_metadata.error_breakdowns + if len(error_output) != 0: + raise Exception( + f"""{blob_storage} to GCS Transfer Job + {create_result.name} failed with error: {error_output}""" + ) + else: + logger.info(f"TransferJob: {create_result.name} succeeded.") + return + + else: + logger.info("Waiting to kick off operation...") + get_transfer_job_request = storage_transfer.GetTransferJobRequest( + {"job_name": create_result.name, "project_id": self.project} + ) + get_result = client.get_transfer_job(request=get_transfer_job_request) + latest_operation_name = get_result.latest_operation_name + + wait_time += wait_between_attempts_in_sec + time.sleep(wait_between_attempts_in_sec) + + def format_uri(self, bucket: str, name: str): + """ + Represent a GCS URI as a string + + `Args`: + bucket: str + GCS bucket name + name: str + Filename in bucket + + `Returns`: + String representation of URI + """ + return f"gs://{bucket}/{name}" + + def split_uri(self, gcs_uri: str): + """ + Split a GCS URI into a bucket and blob name + + `Args`: + gcs_uri: str + GCS URI + + `Returns`: + Tuple of strings with bucket_name and blob_name + """ + # TODO: make this more robust with regex? + remove_protocol = gcs_uri.replace("gs://", "") + uri_parts = remove_protocol.split("/") + bucket_name = uri_parts[0] + blob_name = "/".join(uri_parts[1:]) + return bucket_name, blob_name + + def unzip_blob( + self, + bucket_name: str, + blob_name: str, + compression_type: str = "gzip", + new_filename: Optional[str] = None, + new_file_extension: Optional[str] = None, + ) -> str: + """ + Downloads and decompresses a blob. The decompressed blob + is re-uploaded with the same filename if no `new_filename` + parameter is provided.
+ + `Args`: + bucket_name: str + GCS bucket name + + blob_name: str + Blob name in GCS bucket + + compression_type: str + Either `zip` or `gzip` + + new_filename: str + If provided, replaces the existing blob name + when the decompressed file is uploaded + + new_file_extension: str + If provided, replaces the file extension + when the decompressed file is uploaded + + `Returns`: + String representation of decompressed GCS URI + """ + + compression_params = { + "zip": { + "file_extension": ".zip", + "compression_function": self.__zip_decompress_and_write_to_gcs, + "read": "r", + }, + "gzip": { + "file_extension": ".gz", + "compression_function": self.__gzip_decompress_and_write_to_gcs, + }, + } + + file_extension = compression_params[compression_type]["file_extension"] + compression_function = compression_params[compression_type][ + "compression_function" + ] + + compressed_filepath = self.download_blob( + bucket_name=bucket_name, blob_name=blob_name + ) + + decompressed_filepath = compressed_filepath.replace(file_extension, "") + decompressed_blob_name = ( + new_filename if new_filename else blob_name.replace(file_extension, "") + ) + if new_file_extension: + decompressed_filepath += f".{new_file_extension}" + decompressed_blob_name += f".{new_file_extension}" + + logger.debug("Decompressing file...") + compression_function( + compressed_filepath=compressed_filepath, + decompressed_filepath=decompressed_filepath, + decompressed_blob_name=decompressed_blob_name, + bucket_name=bucket_name, + new_file_extension=new_file_extension, + ) + + return self.format_uri(bucket=bucket_name, name=decompressed_blob_name) + + def __gzip_decompress_and_write_to_gcs(self, **kwargs): + """ + Handles `.gzip` decompression and streams blob contents + to a decompressed storage object + """ + + compressed_filepath = kwargs.pop("compressed_filepath") + decompressed_blob_name = kwargs.pop("decompressed_blob_name") + bucket_name = kwargs.pop("bucket_name") + + with gzip.open(compressed_filepath, "rb") as f_in: + logger.debug( + f"Uploading uncompressed file to GCS: {decompressed_blob_name}" + ) + bucket = self.get_bucket(bucket_name=bucket_name) + blob = storage.Blob(name=decompressed_blob_name, bucket=bucket) + blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600) + + def __zip_decompress_and_write_to_gcs(self, **kwargs): + """ + Handles `.zip` decompression and streams blob contents + to a decompressed storage object + """ + + compressed_filepath = kwargs.pop("compressed_filepath") + decompressed_blob_name = kwargs.pop("decompressed_blob_name") + decompressed_blob_in_archive = decompressed_blob_name.split("/")[-1] + bucket_name = kwargs.pop("bucket_name") + + # Unzip the archive + with zipfile.ZipFile(compressed_filepath) as path_: + # Open the underlying file + with path_.open(decompressed_blob_in_archive) as f_in: + logger.debug( + f"Uploading uncompressed file to GCS: {decompressed_blob_name}" + ) + bucket = self.get_bucket(bucket_name=bucket_name) + blob = storage.Blob(name=decompressed_blob_name, bucket=bucket) + blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600) diff --git a/requirements.txt b/requirements.txt index 3031efb451..869a574886 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,7 +20,8 @@ xmltodict==0.11.0 joblib==1.2.0 censusgeocode==0.4.3.post1 airtable-python-wrapper==0.13.0 -google-cloud-storage==2.2.0 +google-cloud-storage==2.10.0 +google-cloud-storage-transfer==1.9.1 google-cloud-bigquery==3.4.0 docutils<0.18,>=0.14 urllib3==1.26.18 @@ -44,3 +45,4 @@ 
bs4==0.0.1 # TODO Remove when we have a TMC-specific Docker image selenium==3.141.0 jinja2==3.0.2 +us==3.1.1 \ No newline at end of file diff --git a/setup.py b/setup.py index c75e8cfe43..9d35042565 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ def main(): "google-api-python-client", "google-cloud-bigquery", "google-cloud-storage", + "google-cloud-storage-transfer", "gspread", "httplib2", "oauth2client", @@ -61,7 +62,7 @@ def main(): setup( name="parsons", - version="2.1.0", + version="3.0.0", author="The Movement Cooperative", author_email="info@movementcooperative.org", url="https://github.com/move-coop/parsons", diff --git a/test/test_airtable/test_airtable.py b/test/test_airtable/test_airtable.py index ff326c43b9..0459283abb 100644 --- a/test/test_airtable/test_airtable.py +++ b/test/test_airtable/test_airtable.py @@ -11,7 +11,7 @@ ) -os.environ["AIRTABLE_API_KEY"] = "SOME_KEY" +os.environ["AIRTABLE_PERSONAL_ACCESS_TOKEN"] = "SOME_TOKEN" BASE_KEY = "BASEKEY" TABLE_NAME = "TABLENAME" diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py new file mode 100644 index 0000000000..e77baee8b9 --- /dev/null +++ b/test/test_databases/test_bigquery.py @@ -0,0 +1,539 @@ +import json +import os +import unittest.mock as mock + +from google.cloud import bigquery +from google.cloud import exceptions + +from parsons import GoogleBigQuery as BigQuery, Table +from parsons.google.google_cloud_storage import GoogleCloudStorage +from test.test_google.test_utilities import FakeCredentialTest + + +class FakeClient: + """A Fake Storage Client used for monkey-patching.""" + + def __init__(self, project=None): + self.project = project + + +class FakeGoogleCloudStorage(GoogleCloudStorage): + """A Fake GoogleCloudStorage object used to test setting up credentials.""" + + @mock.patch("google.cloud.storage.Client", FakeClient) + def __init__(self): + super().__init__(None, None) + + def upload_table( + self, table, bucket_name, blob_name, data_type="csv", default_acl=None + ): + pass + + def delete_blob(self, bucket_name, blob_name): + pass + + +class TestGoogleBigQuery(FakeCredentialTest): + def setUp(self): + super().setUp() + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cred_path + self.tmp_gcs_bucket = "tmp" + + def tearDown(self) -> None: + super().tearDown() + del os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + + def test_query(self): + query_string = "select * from table" + + # Pass the mock class into our GoogleBigQuery constructor + bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}]) + + # Run a query against our parsons GoogleBigQuery class + result = bq.query(query_string) + + # Check our return value + self.assertEqual(result.num_rows, 1) + self.assertEqual(result.columns, ["one", "two"]) + self.assertEqual(result[0], {"one": 1, "two": 2}) + + def test_query__no_results(self): + query_string = "select * from table" + + # Pass the mock class into our GoogleBigQuery constructor + bq = self._build_mock_client_for_querying([]) + + # Run a query against our parsons GoogleBigQuery class + result = bq.query(query_string) + + # Check our return value + self.assertEqual(result, None) + + @mock.patch("parsons.utilities.files.create_temp_file") + def test_query__no_return(self, create_temp_file_mock): + query_string = "select * from table" + + # Pass the mock class into our GoogleBigQuery constructor + bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}]) + bq._fetch_query_results = mock.MagicMock() + + # Run a query against our parsons 
GoogleBigQuery class + result = bq.query(query_string, return_values=False) + + # Check our return value + self.assertEqual(result, None) + + # Check that query results were not fetched + bq._fetch_query_results.assert_not_called() + + @mock.patch("parsons.utilities.files.create_temp_file") + def test_query_with_transaction(self, create_temp_file_mock): + queries = ["select * from table", "select foo from bar"] + parameters = ["baz"] + + # Pass the mock class into our GoogleBigQuery constructor + bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}]) + bq.query = mock.MagicMock() + + # Run a query against our parsons GoogleBigQuery class + result = bq.query_with_transaction(queries=queries, parameters=parameters) + keyword_args = bq.query.call_args[1] + + # Check our return value + self.assertEqual(result, None) + + # Check that queries and transaction keywords are included in sql + self.assertTrue( + all( + [ + text in keyword_args["sql"] + for text in queries + ["BEGIN TRANSACTION", "COMMIT"] + ] + ) + ) + self.assertEqual(keyword_args["parameters"], parameters) + self.assertFalse(keyword_args["return_values"]) + + def test_copy_gcs(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + + # call the method being tested + bq.copy_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + ) + + # check that the method did the right things + self.assertEqual(bq.client.load_table_from_uri.call_count, 1) + load_call_args = bq.client.load_table_from_uri.call_args + self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri) + + job_config = load_call_args[1]["job_config"] + self.assertEqual( + job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY + ) + + def test_copy_gcs__if_exists_truncate(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + + # call the method being tested + bq.copy_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + if_exists="truncate", + ) + + # check that the method did the right things + self.assertEqual(bq.client.load_table_from_uri.call_count, 1) + load_call_args = bq.client.load_table_from_uri.call_args + self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri) + + job_config = load_call_args[1]["job_config"] + self.assertEqual( + job_config.write_disposition, bigquery.WriteDisposition.WRITE_TRUNCATE + ) + + def test_copy_gcs__if_exists_append(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + + # call the method being tested + bq.copy_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + if_exists="append", + ) + + # check that the method did the right things + self.assertEqual(bq.client.load_table_from_uri.call_count, 1) + load_call_args = bq.client.load_table_from_uri.call_args + self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri) + + job_config = load_call_args[1]["job_config"] + self.assertEqual( + job_config.write_disposition, bigquery.WriteDisposition.WRITE_APPEND + ) + + def test_copy_gcs__if_exists_fail(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + + # call the method being tested + bq.copy_from_gcs( + 
gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + if_exists="truncate", + ) + bq.table_exists = mock.MagicMock() + bq.table_exists.return_value = True + + # call the method being tested + with self.assertRaises(Exception): + bq.copy_from_gcs( + self.default_table, + "dataset.table", + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=self._build_mock_cloud_storage_client(), + ) + + def test_copy_gcs__if_exists_drop(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + bq.table_exists = mock.MagicMock() + bq.table_exists.return_value = True + + # call the method being tested + bq.copy_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + if_exists="drop", + ) + + # check that we tried to delete the table + self.assertEqual(bq.client.delete_table.call_count, 1) + + def test_copy_gcs__bad_if_exists(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + bq.table_exists = mock.MagicMock() + bq.table_exists.return_value = True + + # call the method being tested + with self.assertRaises(ValueError): + bq.copy_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + if_exists="foobar", + ) + + @mock.patch("google.cloud.storage.Client") + @mock.patch.object( + GoogleCloudStorage, "split_uri", return_value=("tmp", "file.gzip") + ) + @mock.patch.object( + GoogleCloudStorage, "unzip_blob", return_value="gs://tmp/file.csv" + ) + def test_copy_large_compressed_file_from_gcs( + self, unzip_mock: mock.MagicMock, split_mock: mock.MagicMock, *_ + ): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file.gzip" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + + # call the method being tested + bq.copy_large_compressed_file_from_gcs( + gcs_blob_uri=tmp_blob_uri, + table_name="dataset.table", + ) + + # check that the method did the right things + split_mock.assert_has_calls( + [ + mock.call(gcs_uri="gs://tmp/file.gzip"), + mock.call(gcs_uri="gs://tmp/file.csv"), + ] + ) + unzip_mock.assert_called_once_with( + bucket_name="tmp", + blob_name="file.gzip", + new_file_extension="csv", + compression_type="gzip", + ) + self.assertEqual(bq.client.load_table_from_uri.call_count, 1) + load_call_args = bq.client.load_table_from_uri.call_args + self.assertEqual(load_call_args[1]["source_uris"], "gs://tmp/file.csv") + + job_config = load_call_args[1]["job_config"] + self.assertEqual( + job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY + ) + + def test_copy_s3(self): + # setup dependencies / inputs + table_name = "table_name" + bucket = "aws_bucket" + key = "file.gzip" + aws_access_key_id = "AAAAAA" + aws_secret_access_key = "BBBBB" + tmp_gcs_bucket = "tmp" + + # set up object under test + bq = self._build_mock_client_for_copying(table_exists=False) + gcs_client = self._build_mock_cloud_storage_client() + bq.copy_from_gcs = mock.MagicMock() + + # call the method being tested + bq.copy_s3( + table_name=table_name, + bucket=bucket, + key=key, + gcs_client=gcs_client, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + tmp_gcs_bucket=tmp_gcs_bucket, + ) + + # check that the method did the right things + gcs_client.copy_s3_to_gcs.assert_called_once_with( + aws_source_bucket=bucket, + aws_access_key_id=aws_access_key_id, + 
aws_secret_access_key=aws_secret_access_key, + gcs_sink_bucket=tmp_gcs_bucket, + aws_s3_key=key, + ) + bq.copy_from_gcs.assert_called_once() + gcs_client.delete_blob.assert_called_once() + + def test_copy(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri) + tbl = self.default_table + bq = self._build_mock_client_for_copying(table_exists=False) + bq.copy_from_gcs = mock.MagicMock() + table_name = ("dataset.table",) + + # call the method being tested + bq.copy( + tbl, + table_name, + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=gcs_client, + ) + + # check that the method did the right things + self.assertEqual(gcs_client.upload_table.call_count, 1) + upload_call_args = gcs_client.upload_table.call_args + self.assertEqual(upload_call_args[0][0], tbl) + self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket) + tmp_blob_name = upload_call_args[0][2] + + self.assertEqual(bq.copy_from_gcs.call_count, 1) + load_call_args = bq.copy_from_gcs.call_args + self.assertEqual(load_call_args[1]["gcs_blob_uri"], tmp_blob_uri) + self.assertEqual(load_call_args[1]["table_name"], table_name) + + # make sure we cleaned up the temp file + self.assertEqual(gcs_client.delete_blob.call_count, 1) + delete_call_args = gcs_client.delete_blob.call_args + self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket) + self.assertEqual(delete_call_args[0][1], tmp_blob_name) + + def test_copy__credentials_are_correctly_set(self): + tbl = self.default_table + bq = self._build_mock_client_for_copying(table_exists=False) + + # Pass in our fake GCS Client. + bq.copy( + tbl, + "dataset.table", + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=FakeGoogleCloudStorage(), + ) + + actual = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + + with open(actual, "r") as factual: + with open(self.cred_path, "r") as fexpected: + actual_str = factual.read() + self.assertEqual(actual_str, fexpected.read()) + self.assertEqual(self.cred_contents, json.loads(actual_str)) + + def test_copy__if_exists_passed_through(self): + # setup dependencies / inputs + tmp_blob_uri = "gs://tmp/file" + + # set up object under test + gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri) + tbl = self.default_table + bq = self._build_mock_client_for_copying(table_exists=False) + bq.copy_from_gcs = mock.MagicMock() + table_name = "dataset.table" + if_exists = "append" + + # call the method being tested + bq.copy( + tbl, + table_name, + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=gcs_client, + if_exists=if_exists, + ) + + self.assertEqual(bq.copy_from_gcs.call_count, 1) + load_call_args = bq.copy_from_gcs.call_args + self.assertEqual(load_call_args[1]["if_exists"], if_exists) + + @mock.patch.object(BigQuery, "table_exists", return_value=False) + @mock.patch.object(BigQuery, "query", return_value=None) + def test_duplicate_table(self, query_mock, table_exists_mock): + source_table = "vendor_table" + destination_table = "raw_table" + expected_query = f""" + CREATE TABLE + {destination_table} + CLONE {source_table} + """ + bq = self._build_mock_client_for_querying(results=None) + + bq.duplicate_table( + source_table=source_table, + destination_table=destination_table, + ) + + query_mock.assert_called_once() + actual_query = query_mock.call_args[1]["sql"] + self.assertEqual(actual_query, expected_query) + + @mock.patch.object(BigQuery, "table_exists", return_value=False) + @mock.patch.object(BigQuery, "delete_table", 
return_value=None) + @mock.patch.object(BigQuery, "query", return_value=None) + def test_duplicate_table_with_drop( + self, query_mock: mock.MagicMock, delete_mock: mock.MagicMock, table_exists_mock + ): + source_table = "vendor_table" + destination_table = "raw_table" + bq = self._build_mock_client_for_querying(results=None) + + bq.duplicate_table( + source_table=source_table, + destination_table=destination_table, + drop_source_table=True, + ) + + delete_mock.assert_called_once_with(table_name=source_table) + + @mock.patch.object(BigQuery, "table_exists", return_value=True) + @mock.patch.object(BigQuery, "query_with_transaction", return_value=None) + @mock.patch.object(BigQuery, "copy", return_value=None) + def test_upsert(self, copy_mock, query_mock, *_): + upsert_tbl = Table([["id", "name"], [1, "Jane"]]) + target_table = "my_dataset.my_target_table" + primary_key = "id" + bq = self._build_mock_client_for_querying(results=[]) + + bq.upsert( + table_obj=upsert_tbl, + target_table=target_table, + primary_key=primary_key, + distinct_check=False, + ) + + # stages the table -> calls copy + copy_mock.assert_called_once() + self.assertEqual(copy_mock.call_args[1]["tbl"], upsert_tbl) + self.assertEqual(copy_mock.call_args[1]["template_table"], target_table) + + # runs a delete insert within a transaction + query_mock.assert_called_once() + actual_queries = query_mock.call_args[1]["queries"] + self.assertIn("DELETE", actual_queries[0]) + self.assertIn("INSERT", actual_queries[1]) + + @mock.patch.object(BigQuery, "query") + def test_get_row_count(self, query_mock): + # Arrange + schema = "foo" + table_name = "bar" + expected_num_rows = 2 + + query_mock.return_value = Table([{"row_count": expected_num_rows}]) + expected_query = f"SELECT COUNT(*) AS row_count FROM `{schema}.{table_name}`" + bq = self._build_mock_client_for_querying(results=Table([{"row_count": 2}])) + + # Act + row_count = bq.get_row_count(schema=schema, table_name=table_name) + + # Assert + query_mock.assert_called_once() + actual_query = query_mock.call_args[1]["sql"] + self.assertEqual(row_count, expected_num_rows) + self.assertEqual(actual_query, expected_query) + + def _build_mock_client_for_querying(self, results): + # Create a mock that will play the role of the cursor + cursor = mock.MagicMock() + cursor.execute.return_value = None + cursor.fetchmany.side_effect = [results, []] + + # Create a mock that will play the role of the connection + connection = mock.MagicMock() + connection.cursor.return_value = cursor + + # Create a mock that will play the role of the Google BigQuery dbapi module + dbapi = mock.MagicMock() + dbapi.connect.return_value = connection + + # Create a mock that will play the role of our GoogleBigQuery client + client = mock.MagicMock() + + bq = BigQuery() + bq._client = client + bq._dbapi = dbapi + return bq + + def _build_mock_client_for_copying(self, table_exists=True): + bq_client = mock.MagicMock() + if not table_exists: + bq_client.get_table.side_effect = exceptions.NotFound("not found") + bq = BigQuery() + bq._client = bq_client + return bq + + def _build_mock_cloud_storage_client(self, tmp_blob_uri=""): + gcs_client = mock.MagicMock() + gcs_client.upload_table.return_value = tmp_blob_uri + return gcs_client + + @property + def default_table(self): + return Table( + [ + {"num": 1, "ltr": "a"}, + {"num": 2, "ltr": "b"}, + ] + ) diff --git a/test/test_databases/test_database.py b/test/test_databases/test_database.py index b49491e687..c56116e3d1 100644 --- a/test/test_databases/test_database.py +++ 
b/test/test_databases/test_database.py @@ -3,7 +3,6 @@ MEDIUMINT, INT, BIGINT, - FLOAT, BOOL, VARCHAR, ) @@ -19,13 +18,6 @@ def dcs(): return db -@pytest.fixture -def dcs_bool(): - db = DatabaseCreateStatement() - db.DO_PARSE_BOOLS = True - return db - - @pytest.mark.parametrize( ("int1", "int2", "higher"), ( @@ -95,34 +87,6 @@ def test_is_valid_sql_num(dcs, val, is_valid): assert dcs.is_valid_sql_num(val) == is_valid -@pytest.mark.parametrize( - ("val", "cmp_type", "detected_type"), - ( - (1, None, SMALLINT), - (1, "", SMALLINT), - (1, MEDIUMINT, MEDIUMINT), - (32769, None, MEDIUMINT), - (32769, BIGINT, BIGINT), - (2147483648, None, BIGINT), - (2147483648, FLOAT, FLOAT), - (5.001, None, FLOAT), - (5.001, "", FLOAT), - ("FALSE", VARCHAR, VARCHAR), - ("word", "", VARCHAR), - ("word", INT, VARCHAR), - ("1_2", BIGINT, VARCHAR), - ("01", FLOAT, VARCHAR), - ("00001", None, VARCHAR), - ("word", None, VARCHAR), - ("1_2", None, VARCHAR), - ("01", None, VARCHAR), - ("{}", None, VARCHAR), - ), -) -def test_detect_data_type(dcs, val, cmp_type, detected_type): - assert dcs.detect_data_type(val, cmp_type) == detected_type - - @pytest.mark.parametrize( ("val", "cmp_type", "detected_type"), ( @@ -131,16 +95,16 @@ def test_detect_data_type(dcs, val, cmp_type, detected_type): (1, MEDIUMINT, MEDIUMINT), (2, BOOL, SMALLINT), (True, None, BOOL), - (0, None, BOOL), - (1, None, BOOL), - (1, BOOL, BOOL), - ("F", None, BOOL), - ("FALSE", None, BOOL), - ("Yes", None, BOOL), + (0, None, SMALLINT), + (1, None, SMALLINT), + (1, BOOL, SMALLINT), + ("F", None, VARCHAR), + ("FALSE", None, VARCHAR), + ("Yes", None, VARCHAR), ), ) -def test_detect_data_type_bools(dcs_bool, val, cmp_type, detected_type): - assert dcs_bool.detect_data_type(val, cmp_type) == detected_type +def test_detect_data_type_bools(dcs, val, cmp_type, detected_type): + assert dcs.detect_data_type(val, cmp_type) == detected_type @pytest.mark.parametrize( diff --git a/test/test_databases/test_discover_database.py b/test/test_databases/test_discover_database.py index b946629e10..4f8fbf647e 100644 --- a/test/test_databases/test_discover_database.py +++ b/test/test_databases/test_discover_database.py @@ -3,12 +3,12 @@ from parsons.databases.redshift import Redshift from parsons.databases.mysql import MySQL from parsons.databases.postgres import Postgres -from parsons.google.google_bigquery import GoogleBigQuery +from parsons import GoogleBigQuery as BigQuery from parsons.databases.discover_database import discover_database class TestDiscoverDatabase(unittest.TestCase): - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -18,7 +18,7 @@ def test_no_database_detected(self, mock_getenv, *_): with self.assertRaises(EnvironmentError): discover_database() - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -29,7 +29,7 @@ def test_single_database_detected(self, mock_getenv, *_): ) self.assertIsInstance(discover_database(), Redshift) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) 
@patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -40,7 +40,7 @@ def test_single_database_detected_with_other_default(self, mock_getenv, *_): ) self.assertIsInstance(discover_database(default_connector=Postgres), Redshift) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -53,7 +53,7 @@ def test_single_database_detected_with_other_default_list(self, mock_getenv, *_) discover_database(default_connector=[Postgres, MySQL]), Redshift ) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -63,7 +63,7 @@ def test_multiple_databases_no_default(self, mock_getenv, *_): with self.assertRaises(EnvironmentError): discover_database() - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -72,7 +72,7 @@ def test_multiple_databases_with_default(self, mock_getenv, *_): mock_getenv.return_value = "password" self.assertIsInstance(discover_database(default_connector=Redshift), Redshift) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -83,7 +83,7 @@ def test_multiple_databases_with_default_list(self, mock_getenv, *_): discover_database(default_connector=[MySQL, Redshift]), MySQL ) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -97,7 +97,7 @@ def test_multiple_databases_invalid_default(self, mock_getenv, *_): with self.assertRaises(EnvironmentError): discover_database(default_connector=Postgres) - @patch.object(GoogleBigQuery, "__init__", return_value=None) + @patch.object(BigQuery, "__init__", return_value=None) @patch.object(Postgres, "__init__", return_value=None) @patch.object(MySQL, "__init__", return_value=None) @patch.object(Redshift, "__init__", return_value=None) @@ -109,7 +109,7 @@ def test_multiple_databases_invalid_default_list(self, mock_getenv, *_): else None ) with self.assertRaises(EnvironmentError): - discover_database(default_connector=[Postgres, GoogleBigQuery]) + discover_database(default_connector=[Postgres, BigQuery]) if __name__ == "__main__": diff --git a/test/test_databases/test_mysql.py b/test/test_databases/test_mysql.py index 01c156dfe4..323b4ffbf6 100644 --- a/test/test_databases/test_mysql.py +++ b/test/test_databases/test_mysql.py @@ -156,11 +156,9 @@ def setUp(self): def test_data_type(self): # Test bool - self.mysql.DO_PARSE_BOOLS = True - self.assertEqual(self.mysql.data_type(1, ""), "bool") self.assertEqual(self.mysql.data_type(False, ""), "bool") + 
self.assertEqual(self.mysql.data_type(True, ""), "bool") - self.mysql.DO_PARSE_BOOLS = False # Test smallint self.assertEqual(self.mysql.data_type(1, ""), "smallint") self.assertEqual(self.mysql.data_type(2, ""), "smallint") @@ -170,14 +168,14 @@ def test_data_type(self): self.assertEqual(self.mysql.data_type(2147483648, ""), "bigint") # Test varchar that looks like an int self.assertEqual(self.mysql.data_type("00001", ""), "varchar") - # Test varchar that looks like a bool - self.assertEqual(self.mysql.data_type(False, ""), "varchar") # Test a float as a decimal self.assertEqual(self.mysql.data_type(5.001, ""), "float") # Test varchar self.assertEqual(self.mysql.data_type("word", ""), "varchar") - # Test int with underscore + # Test int with underscore as string self.assertEqual(self.mysql.data_type("1_2", ""), "varchar") + # Test int with underscore + self.assertEqual(self.mysql.data_type(1_2, ""), "smallint") # Test int with leading zero self.assertEqual(self.mysql.data_type("01", ""), "varchar") diff --git a/test/test_databases/test_postgres.py b/test/test_databases/test_postgres.py index 08956c672d..5279c94ccf 100644 --- a/test/test_databases/test_postgres.py +++ b/test/test_databases/test_postgres.py @@ -30,11 +30,8 @@ def setUp(self): ["g", "", 9, "NA", 1.4, 1, 2], ] ) - self.mapping = self.pg.generate_data_types(self.tbl) self.mapping2 = self.pg.generate_data_types(self.tbl2) - self.pg.DO_PARSE_BOOLS = True - self.mapping3 = self.pg.generate_data_types(self.tbl2) def test_connection(self): @@ -56,7 +53,6 @@ def test_connection(self): self.assertEqual(pg_env.port, 5432) def test_data_type(self): - self.pg.DO_PARSE_BOOLS = False # Test smallint self.assertEqual(self.pg.data_type(1, ""), "smallint") self.assertEqual(self.pg.data_type(2, ""), "smallint") @@ -66,20 +62,18 @@ def test_data_type(self): self.assertEqual(self.pg.data_type(2147483648, ""), "bigint") # Test varchar that looks like an int self.assertEqual(self.pg.data_type("00001", ""), "varchar") - # Test varchar that looks like a bool - self.assertEqual(self.pg.data_type(True, ""), "varchar") # Test a float as a decimal self.assertEqual(self.pg.data_type(5.001, ""), "decimal") # Test varchar self.assertEqual(self.pg.data_type("word", ""), "varchar") - # Test int with underscore + # Test int with underscore as string self.assertEqual(self.pg.data_type("1_2", ""), "varchar") - # Test int with leading zero + # Test int with leading zero as string self.assertEqual(self.pg.data_type("01", ""), "varchar") + # Test int with underscore + self.assertEqual(self.pg.data_type(1_2, ""), "smallint") # Test bool - self.pg.DO_PARSE_BOOLS = True - self.assertEqual(self.pg.data_type(1, ""), "bool") self.assertEqual(self.pg.data_type(True, ""), "bool") def test_generate_data_types(self): @@ -100,10 +94,6 @@ def test_generate_data_types(self): "varchar", ], ) - self.assertEqual( - self.mapping3["type_list"], - ["varchar", "varchar", "decimal", "varchar", "decimal", "bool", "varchar"], - ) # Test correct lengths self.assertEqual(self.mapping["longest"], [1, 5]) diff --git a/test/test_redshift.py b/test/test_databases/test_redshift.py similarity index 98% rename from test/test_redshift.py rename to test/test_databases/test_redshift.py index fd664fede5..b811e98857 100644 --- a/test/test_redshift.py +++ b/test/test_databases/test_redshift.py @@ -35,10 +35,7 @@ def setUp(self): ) self.mapping = self.rs.generate_data_types(self.tbl) - self.rs.DO_PARSE_BOOLS = True self.mapping2 = self.rs.generate_data_types(self.tbl2) - self.rs.DO_PARSE_BOOLS = False 
- self.mapping3 = self.rs.generate_data_types(self.tbl2) def test_split_full_table_name(self): schema, table = Redshift.split_full_table_name("some_schema.some_table") @@ -60,14 +57,9 @@ def test_combine_schema_and_table_name(self): self.assertEqual(full_table_name, "some_schema.some_table") def test_data_type(self): - # Test bool - self.rs.DO_PARSE_BOOLS = True - self.assertEqual(self.rs.data_type(1, ""), "bool") self.assertEqual(self.rs.data_type(True, ""), "bool") - self.rs.DO_PARSE_BOOLS = False self.assertEqual(self.rs.data_type(1, ""), "int") - self.assertEqual(self.rs.data_type(True, ""), "varchar") # Test smallint # Currently smallints are coded as ints self.assertEqual(self.rs.data_type(2, ""), "int") @@ -81,13 +73,14 @@ def test_data_type(self): self.assertEqual(self.rs.data_type(5.001, ""), "float") # Test varchar self.assertEqual(self.rs.data_type("word", ""), "varchar") - # Test int with underscore + # Test int with underscore as varchar self.assertEqual(self.rs.data_type("1_2", ""), "varchar") + # Test int with underscore + self.assertEqual(self.rs.data_type(1_2, ""), "int") # Test int with leading zero self.assertEqual(self.rs.data_type("01", ""), "varchar") def test_generate_data_types(self): - # Test correct header labels self.assertEqual(self.mapping["headers"], ["ID", "Name"]) # Test correct data types @@ -95,13 +88,9 @@ def test_generate_data_types(self): self.assertEqual( self.mapping2["type_list"], - ["varchar", "varchar", "float", "varchar", "float", "bool", "varchar"], - ) - - self.assertEqual( - self.mapping3["type_list"], ["varchar", "varchar", "float", "varchar", "float", "int", "varchar"], ) + # Test correct lengths self.assertEqual(self.mapping["longest"], [1, 5]) diff --git a/test/test_google/test_google_bigquery.py b/test/test_google/test_google_bigquery.py deleted file mode 100644 index 4fa3f0ac20..0000000000 --- a/test/test_google/test_google_bigquery.py +++ /dev/null @@ -1,270 +0,0 @@ -import json -import os -import unittest.mock as mock - -from google.cloud import bigquery -from google.cloud import exceptions - -from parsons import GoogleBigQuery, Table -from parsons.google.google_cloud_storage import GoogleCloudStorage -from test.test_google.test_utilities import FakeCredentialTest - - -class FakeClient: - """A Fake Storage Client used for monkey-patching.""" - - def __init__(self, project=None): - self.project = project - - -class FakeGoogleCloudStorage(GoogleCloudStorage): - """A Fake GoogleCloudStorage object used to test setting up credentials.""" - - @mock.patch("google.cloud.storage.Client", FakeClient) - def __init__(self): - super().__init__(None, None) - - def upload_table( - self, table, bucket_name, blob_name, data_type="csv", default_acl=None - ): - pass - - def delete_blob(self, bucket_name, blob_name): - pass - - -class TestGoogleBigQuery(FakeCredentialTest): - def setUp(self): - super().setUp() - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cred_path - self.tmp_gcs_bucket = "tmp" - - def tearDown(self) -> None: - super().tearDown() - del os.environ["GOOGLE_APPLICATION_CREDENTIALS"] - - def test_query(self): - query_string = "select * from table" - - # Pass the mock class into our GoogleBigQuery constructor - bq = self._build_mock_client_for_querying([{"one": 1, "two": 2}]) - - # Run a query against our parsons GoogleBigQuery class - result = bq.query(query_string) - - # Check our return value - self.assertEqual(result.num_rows, 1) - self.assertEqual(result.columns, ["one", "two"]) - self.assertEqual(result[0], {"one": 1, "two": 2}) 
- - def test_query__no_results(self): - query_string = "select * from table" - - # Pass the mock class into our GoogleBigQuery constructor - bq = self._build_mock_client_for_querying([]) - - # Run a query against our parsons GoogleBigQuery class - result = bq.query(query_string) - - # Check our return value - self.assertEqual(result, None) - - def test_copy(self): - # setup dependencies / inputs - tmp_blob_uri = "gs://tmp/file" - - # set up object under test - gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri) - tbl = self.default_table - bq = self._build_mock_client_for_copying(table_exists=False) - - # call the method being tested - bq.copy( - tbl, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - gcs_client=gcs_client, - ) - - # check that the method did the right things - self.assertEqual(gcs_client.upload_table.call_count, 1) - upload_call_args = gcs_client.upload_table.call_args - self.assertEqual(upload_call_args[0][0], tbl) - self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket) - tmp_blob_name = upload_call_args[0][2] - - self.assertEqual(bq.client.load_table_from_uri.call_count, 1) - load_call_args = bq.client.load_table_from_uri.call_args - self.assertEqual(load_call_args[0][0], tmp_blob_uri) - - job_config = load_call_args[1]["job_config"] - self.assertEqual( - job_config.write_disposition, bigquery.WriteDisposition.WRITE_EMPTY - ) - - # make sure we cleaned up the temp file - self.assertEqual(gcs_client.delete_blob.call_count, 1) - delete_call_args = gcs_client.delete_blob.call_args - self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket) - self.assertEqual(delete_call_args[0][1], tmp_blob_name) - - def test_copy__if_exists_truncate(self): - gcs_client = self._build_mock_cloud_storage_client() - # set up object under test - bq = self._build_mock_client_for_copying() - - # call the method being tested - bq.copy( - self.default_table, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - if_exists="truncate", - gcs_client=gcs_client, - ) - - # check that the method did the right things - call_args = bq.client.load_table_from_uri.call_args - job_config = call_args[1]["job_config"] - self.assertEqual( - job_config.write_disposition, bigquery.WriteDisposition.WRITE_TRUNCATE - ) - - # make sure we cleaned up the temp file - self.assertEqual(gcs_client.delete_blob.call_count, 1) - - def test_copy__if_exists_append(self): - gcs_client = self._build_mock_cloud_storage_client() - # set up object under test - bq = self._build_mock_client_for_copying() - - # call the method being tested - bq.copy( - self.default_table, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - if_exists="append", - gcs_client=gcs_client, - ) - - # check that the method did the right things - call_args = bq.client.load_table_from_uri.call_args - job_config = call_args[1]["job_config"] - self.assertEqual( - job_config.write_disposition, bigquery.WriteDisposition.WRITE_APPEND - ) - - # make sure we cleaned up the temp file - self.assertEqual(gcs_client.delete_blob.call_count, 1) - - def test_copy__if_exists_fail(self): - # set up object under test - bq = self._build_mock_client_for_copying() - - # call the method being tested - with self.assertRaises(Exception): - bq.copy( - self.default_table, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - gcs_client=self._build_mock_cloud_storage_client(), - ) - - def test_copy__if_exists_drop(self): - gcs_client = self._build_mock_cloud_storage_client() - # set up object under test - bq = 
self._build_mock_client_for_copying() - - # call the method being tested - bq.copy( - self.default_table, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - if_exists="drop", - gcs_client=gcs_client, - ) - - # check that we tried to delete the table - self.assertEqual(bq.client.delete_table.call_count, 1) - - # make sure we cleaned up the temp file - self.assertEqual(gcs_client.delete_blob.call_count, 1) - - def test_copy__bad_if_exists(self): - gcs_client = self._build_mock_cloud_storage_client() - - # set up object under test - bq = self._build_mock_client_for_copying() - - # call the method being tested - with self.assertRaises(ValueError): - bq.copy( - self.default_table, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - if_exists="foo", - gcs_client=gcs_client, - ) - - def test_copy__credentials_are_correctly_set(self): - tbl = self.default_table - bq = self._build_mock_client_for_copying(table_exists=False) - - # Pass in our fake GCS Client. - bq.copy( - tbl, - "dataset.table", - tmp_gcs_bucket=self.tmp_gcs_bucket, - gcs_client=FakeGoogleCloudStorage(), - ) - - actual = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] - - with open(actual, "r") as factual: - with open(self.cred_path, "r") as fexpected: - actual_str = factual.read() - self.assertEqual(actual_str, fexpected.read()) - self.assertEqual(self.cred_contents, json.loads(actual_str)) - - def _build_mock_client_for_querying(self, results): - # Create a mock that will play the role of the cursor - cursor = mock.MagicMock() - cursor.execute.return_value = None - cursor.fetchmany.side_effect = [results, []] - - # Create a mock that will play the role of the connection - connection = mock.MagicMock() - connection.cursor.return_value = cursor - - # Create a mock that will play the role of the Google BigQuery dbapi module - dbapi = mock.MagicMock() - dbapi.connect.return_value = connection - - # Create a mock that will play the role of our GoogleBigQuery client - client = mock.MagicMock() - - bq = GoogleBigQuery() - bq._client = client - bq._dbapi = dbapi - return bq - - def _build_mock_client_for_copying(self, table_exists=True): - bq_client = mock.MagicMock() - if not table_exists: - bq_client.get_table.side_effect = exceptions.NotFound("not found") - bq = GoogleBigQuery() - bq._client = bq_client - return bq - - def _build_mock_cloud_storage_client(self, tmp_blob_uri=""): - gcs_client = mock.MagicMock() - gcs_client.upload_table.return_value = tmp_blob_uri - return gcs_client - - @property - def default_table(self): - return Table( - [ - {"num": 1, "ltr": "a"}, - {"num": 2, "ltr": "b"}, - ] - )
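
For reviewers trying out the new GoogleCloudStorage surface in this patch, here is a minimal sketch of how the match_glob listing, the URI helpers, and unzip_blob fit together. The bucket and blob names are placeholders, and it assumes the connector picks up application-default Google credentials when constructed with no arguments; the match_glob keyword name is taken from the hunk above.

from parsons.google.google_cloud_storage import GoogleCloudStorage

# Assumes GOOGLE_APPLICATION_CREDENTIALS (or equivalent) is already configured
gcs = GoogleCloudStorage()

# match_glob runs against the full blob URI, so lead with **/ to reach nested files
csv_blobs = gcs.list_blobs("my-bucket", match_glob="**/*.csv")

# Decompress a gzipped export; the decompressed copy is re-uploaded to the same
# bucket with a .csv extension and the gs:// URI of the new blob is returned
decompressed_uri = gcs.unzip_blob(
    bucket_name="my-bucket",
    blob_name="exports/2023-12-01.gz",
    compression_type="gzip",
    new_file_extension="csv",
)

# Round-trip between URIs and (bucket, blob) pairs with the new helpers
bucket_name, blob_name = gcs.split_uri(gcs_uri=decompressed_uri)
assert gcs.format_uri(bucket=bucket_name, name=blob_name) == decompressed_uri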
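The new copy_bucket_to_gcs method wraps the Storage Transfer Service and polls until the one-time job succeeds or raises. A rough sketch of an S3-to-GCS sync using the parameters added in this patch; all bucket names are placeholders and the AWS credentials are assumed to live in environment variables.

import os

from parsons.google.google_cloud_storage import GoogleCloudStorage

gcs = GoogleCloudStorage()

# Copy everything under raw/2023/ from an S3 bucket into a GCS landing bucket.
# source_path must end in a trailing slash, per the validation added above.
gcs.copy_bucket_to_gcs(
    gcs_sink_bucket="my-gcs-landing-bucket",
    source="s3",
    source_bucket="my-s3-bucket",
    source_path="raw/2023/",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)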
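The new test/test_databases/test_bigquery.py suite exercises the GCS-backed load paths on the BigQuery connector. As a reference point only, a hedged sketch of the call patterns those tests assert against; dataset, table, and URI values are illustrative, and the plain bq.copy() call assumes the connector can build its own GCS client when one is not injected as it is in the tests.

from parsons import GoogleBigQuery as BigQuery, Table

bq = BigQuery()

# Load a CSV already sitting in GCS, truncating the table if it exists
bq.copy_from_gcs(
    gcs_blob_uri="gs://tmp/file",
    table_name="dataset.table",
    if_exists="truncate",
)

# For large gzipped files, decompress via GCS before loading
bq.copy_large_compressed_file_from_gcs(
    gcs_blob_uri="gs://tmp/file.gzip",
    table_name="dataset.table",
)

# Copy a Parsons Table through a temporary GCS bucket
bq.copy(
    Table([{"num": 1, "ltr": "a"}]),
    "dataset.table",
    tmp_gcs_bucket="tmp",
)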