From fab47d704d94e0e1017f47cc867ca8deee2eba4c Mon Sep 17 00:00:00 2001 From: Nicholas Long <1907354+nllong@users.noreply.github.com> Date: Wed, 2 Oct 2024 13:28:14 -0600 Subject: [PATCH] Fix and update syntax (#49) * update github action * fix syntax in readme * update with ruff and manual code cleanup * new pytest raises syntax * fix integration test * fix integration test * Ruff/tox update * chmod --------- Co-authored-by: Alex Swindler --- .github/workflows/ci.yml | 5 + .isort.cfg | 16 -- .pre-commit-config.yaml | 49 ++-- README.rst | 2 +- cspell.json | 82 +++--- pyseed/__init__.py | 3 +- pyseed/apibase.py | 281 ++++++++---------- pyseed/exceptions.py | 39 +-- pyseed/seed_client.py | 276 +++++++----------- pyseed/seed_client_base.py | 322 ++++++++++----------- pyseed/utils.py | 75 ++--- requirements-test.txt | 7 +- requirements.txt | 1 - ruff.toml | 63 ++++ setup.cfg | 39 +-- setup.py | 8 +- tests/test_apibase.py | 507 ++++++++++++--------------------- tests/test_seed_base.py | 121 ++++---- tests/test_seed_client.py | 123 ++++---- tests/test_seed_client_base.py | 318 ++++++++++----------- tests/test_utils.py | 5 +- tox.ini | 28 +- 22 files changed, 1044 insertions(+), 1326 deletions(-) delete mode 100644 .isort.cfg mode change 100755 => 100644 pyseed/apibase.py create mode 100644 ruff.toml mode change 100755 => 100644 tests/test_apibase.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aa97a91..a218a44 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,8 +29,13 @@ jobs: run: | python -m pip install --upgrade pip python -m pip install tox tox-gh-actions + python -m pip install build twine - name: Test with tox run: tox -e ${{ matrix.test_env }} + - name: Test syntax of build package + run: | + python -m build + python -m twine check dist/* integration-tests: name: Run integration tests runs-on: ubuntu-latest diff --git a/.isort.cfg b/.isort.cfg deleted file mode 100644 index e3c918e..0000000 --- a/.isort.cfg +++ /dev/null @@ -1,16 +0,0 @@ -[settings] -line_length=79 -multi_line_output=3 -include_trailing_comma=1 -known_standard_library=typing -known_django=django -known_thirdparty=peewee -import_heading_stdlib=Imports from Standard Library -import_heading_thirdparty=Imports from Third Party Modules -import_heading_django=Imports from Django -import_heading_firstparty=Local Imports -sections=FUTURE,STDLIB,DJANGO,THIRDPARTY,FIRSTPARTY,LOCALFOLDER -not_skip = __init__.py - -# for additional settings see: -# https://github.com/timothycrosley/isort/wiki/isort-Settings diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 73157fe..ed03221 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,51 +1,34 @@ -exclude: | - (?x)( - ^docs/conf.py| - ^docs/license.rst - ) - repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.6.0 hooks: - - id: trailing-whitespace - id: check-added-large-files args: ["--maxkb=50000"] - id: check-ast + - id: check-builtin-literals + - id: check-case-conflict - id: check-json - id: check-merge-conflict - - id: check-xml + - id: check-toml - id: check-yaml - id: debug-statements - id: end-of-file-fixer - - id: requirements-txt-fixer + - id: fix-byte-order-marker - id: mixed-line-ending - args: ["--fix=auto"] - - repo: https://github.com/pre-commit/mirrors-autopep8 - rev: v2.0.4 - hooks: - - id: autopep8 - args: - [ - "--in-place", - "--aggressive", - "--aggressive", - "--recursive", - "--max-line-length=100", - "--ignore=E501,E402,W503,W504,E731", - ] - - repo: https://github.com/pycqa/flake8 - rev: 7.0.0 - hooks: - - id: flake8 - args: ["--ignore=E501,E402,W503,W504,E731,F401"] + - id: pretty-format-json + args: ["--autofix", "--no-sort-keys", "--no-ensure-ascii"] + - id: requirements-txt-fixer + - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-prettier rev: v4.0.0-alpha.8 hooks: - id: prettier - types_or: [css, yaml, markdown, html, scss, javascript] - - repo: https://github.com/pre-commit/mirrors-isort - rev: v5.10.1 + types_or: [css, yaml, markdown, html, scss, javascript, json] + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.6.8 hooks: - - id: isort - args: ["-m=VERTICAL_HANGING_INDENT"] # vertical hanging + # Run the linter + - id: ruff + args: [--fix, --exit-non-zero-on-fix, --output-format=full] + # Run the formatter + - id: ruff-format diff --git a/README.rst b/README.rst index d96ed71..0746e09 100644 --- a/README.rst +++ b/README.rst @@ -21,7 +21,7 @@ More information can be found here: Compatibility Matrix -------------- +-------------------- .. list-table:: :widths: 50 50 diff --git a/cspell.json b/cspell.json index 98107d9..f1891d0 100644 --- a/cspell.json +++ b/cspell.json @@ -1,40 +1,46 @@ { - "version": "0.2", - "ignorePaths": [], - "dictionaryDefinitions": [], - "dictionaries": [], - "words": [ - "apibase", - "buildingsync", - "codeauthor", - "datafile", - "dname", - "durl", - "ECAM", - "EEEJ", - "ESPM", - "excpt", - "geocoded", - "greenbuildingregistry", - "jakejarvis", - "JSONAPI", - "jwalton", - "Munday", - "ndeloof", - "officedocument", - "openxmlformats", - "precommit", - "printenv", - "pyseed", - "pytest", - "sdist", - "SEEDO", - "seedrecords", - "spreadsheetml", - "subclassing", - "taxlot", - "taxlots" - ], - "ignoreWords": [], - "import": [] + "version": "0.2", + "ignorePaths": [], + "dictionaryDefinitions": [], + "dictionaries": [], + "words": [ + "apibase", + "BSYNCR", + "buildingsync", + "codeauthor", + "datafile", + "dname", + "durl", + "ECAM", + "EEEJ", + "ESPM", + "excpt", + "geocoded", + "greenbuildingregistry", + "jakejarvis", + "JSONAPI", + "jwalton", + "kflemin", + "Munday", + "ndeloof", + "nllong", + "NREL", + "officedocument", + "openxmlformats", + "precommit", + "printenv", + "pyseed", + "pytest", + "sdist", + "SEEDO", + "seedplatform", + "seedrecords", + "spreadsheetml", + "subclassing", + "taxlot", + "taxlots", + "Turas" + ], + "ignoreWords": [], + "import": [] } diff --git a/pyseed/__init__.py b/pyseed/__init__.py index ad527fd..3bb0257 100644 --- a/pyseed/__init__.py +++ b/pyseed/__init__.py @@ -1,5 +1,4 @@ -# Local Imports -from pyseed.seed_client_base import ( # noqa +from pyseed.seed_client_base import ( SEEDOAuthReadOnlyClient, SEEDOAuthReadWriteClient, SEEDReadOnlyClient, diff --git a/pyseed/apibase.py b/pyseed/apibase.py old mode 100755 new mode 100644 index 4d88033..7b99001 --- a/pyseed/apibase.py +++ b/pyseed/apibase.py @@ -1,49 +1,38 @@ -#!/usr/bin/env python """ copyright (c) 2016 Earth Advantage. All rights reserved. ..codeauthor::Paul Munday Functionality for calls to external APIs""" -# Imports from Third Party Modules import re -# Imports from External Modules + import requests -# Local Imports -# Public Functions and Classes -# Helper functions for use by BaseAPI subclasses from pyseed.exceptions import APIClientError def add_pk(url, pk, required=True, slash=False): """Add id/primary key to url""" if required and not pk: - raise APIClientError('id/pk must be supplied') + raise APIClientError("id/pk must be supplied") if pk: - if isinstance(pk, str) and not pk.isdigit(): - raise TypeError('id/pk must be a positive integer') - elif not isinstance(pk, (int, str)) or int(pk) < 0: - raise TypeError('id/pk must be a positive integer') - if not url.endswith('/'): - url = "{}/{}".format(url, pk) - else: - url = "{}{}".format(url, pk) - if slash: - # Only add the trailing slash if it's not already there - if not url.endswith('/'): - url = "{}/".format(url) + if isinstance(pk, str) and not pk.isdigit() or (not isinstance(pk, (int, str)) or int(pk) < 0): + raise TypeError("id/pk must be a positive integer") + url = f"{url}/{pk}" if not url.endswith("/") else f"{url}{pk}" + # Only add the trailing slash if it's not already there + if slash and not url.endswith("/"): + url = f"{url}/" return url -class BaseAPI(object): +class BaseAPI: """ Base class for API Calls """ + # pylint: disable=too-few-public-methods, too-many-instance-attributes - def __init__(self, url=None, use_ssl=True, timeout=None, use_json=False, - use_auth=False, auth=None, **kwargs): + def __init__(self, url=None, use_ssl=True, timeout=None, use_json=False, use_auth=False, auth=None, **kwargs): # pylint: disable=too-many-arguments """Set url,api key, auth usage, ssl usage, timeout etc. @@ -79,19 +68,19 @@ def __init__(self, url=None, use_ssl=True, timeout=None, use_json=False, def _construct_payload(self, params): """Construct parameters for an api call. -. - :param params: A dictionary of key-value pairs to include - in the request. - :return: A dictionary of k-v pairs to send to the server - in the request. + . + :param params: A dictionary of key-value pairs to include + in the request. + :return: A dictionary of k-v pairs to send to the server + in the request. """ - compulsory = getattr(self, 'compulsory_params', []) + compulsory = getattr(self, "compulsory_params", []) for param in compulsory: if param not in params: try: params[param] = getattr(self, param) except AttributeError: - msg = "{} is a compulsory field".format(param) + msg = f"{param} is a compulsory field" raise APIClientError(msg) return params @@ -100,31 +89,23 @@ def _construct_url(self, urlstring, use_ssl=None): # self.use_ssl takes priority to enforce ssl use use_ssl = self.use_ssl if self.use_ssl is not None else use_ssl if not urlstring and not self.url: - raise APIClientError('No url set') + raise APIClientError("No url set") elif not urlstring: url = self.url + elif urlstring.startswith("https://") and not use_ssl: + # We strip off url prefix + # raise an error if https is used in url without use_ssl + raise APIClientError("use_ssl is false but url starts with https") + elif urlstring.startswith("http://") and use_ssl: + # We strip off url prefix + # raise an error if http is used in url with use_ssl + raise APIClientError("use_ssl is true but url does not starts with https") else: - if urlstring.startswith('https://') and not use_ssl: - # We strip off url prefix - # raise an error if https is used in url without use_ssl - raise APIClientError( - 'use_ssl is false but url starts with https' - ) - elif urlstring.startswith('http://') and use_ssl: - # We strip off url prefix - # raise an error if http is used in url with use_ssl - raise APIClientError( - 'use_ssl is true but url does not starts with https' - ) - else: - # strip http(s):// off url - regex = re.compile('^https?://') - urlstring = regex.sub('', urlstring) - if use_ssl: - start = 'https://' - else: - start = 'http://' - url = "{}{}".format(start, urlstring) + # strip http(s):// off url + regex = re.compile("^https?://") + urlstring = regex.sub("", urlstring) + start = "https://" if use_ssl else "http://" + url = f"{start}{urlstring}" return url def check_call_success(self, response): @@ -136,15 +117,13 @@ def _get(self, url=None, use_ssl=None, **kwargs): """Internal method to make api calls using GET.""" url = self._construct_url(url, use_ssl=use_ssl) params = self._construct_payload(kwargs) - payload = { - 'timeout': self.timeout, - 'headers': params.pop('headers', None) - } + payload = {"timeout": self.timeout, "headers": params.pop("headers", None)} if params: - payload['params'] = params - if self.auth: # pragma: no cover - payload['auth'] = self.auth - api_call = requests.get(url, **payload) + payload["params"] = params + if self.auth: # pragma: no cover + payload["auth"] = self.auth + # timeout is specified in the payload + api_call = requests.get(url, **payload) # noqa: S113 return api_call def _post(self, url=None, use_ssl=None, params=None, files=None, **kwargs): @@ -153,132 +132,121 @@ def _post(self, url=None, use_ssl=None, params=None, files=None, **kwargs): if not params: params = {} params = self._construct_payload(params) - payload = { - 'timeout': self.timeout, - 'headers': params.pop('headers', None) - } + payload = {"timeout": self.timeout, "headers": params.pop("headers", None)} if params: - payload['params'] = params + payload["params"] = params if files: - payload['files'] = files - if self.auth: # pragma: no cover - payload['auth'] = self.auth + payload["files"] = files + if self.auth: # pragma: no cover + payload["auth"] = self.auth if self.use_json: - data = kwargs.pop('json', None) + data = kwargs.pop("json", None) if data: - payload['json'] = data + payload["json"] = data else: # just put the remaining kwargs into the json field - payload['json'] = kwargs + payload["json"] = kwargs else: - data = kwargs.pop('data', None) + data = kwargs.pop("data", None) if data: - payload['data'] = data + payload["data"] = data else: # just put the remaining kwargs into the data field - payload['data'] = kwargs + payload["data"] = kwargs # if there are any remaining kwargs, then put them into the params - if 'params' not in payload: - payload['params'] = {} - payload['params'].update(**kwargs) + if "params" not in payload: + payload["params"] = {} + payload["params"].update(**kwargs) # now do the actual call to post! - api_call = requests.post(url, **payload) + # timeout is specified in the payload + api_call = requests.post(url, **payload) # noqa: S113 return api_call - def _put(self, url=None, use_ssl=None, params=None, files=None, - **kwargs): + def _put(self, url=None, use_ssl=None, params=None, files=None, **kwargs): """Internal method to make api calls using PUT.""" url = self._construct_url(url, use_ssl=use_ssl) if not params: params = {} params = self._construct_payload(params) - payload = { - 'timeout': self.timeout, - 'headers': params.pop('headers', None) - } + payload = {"timeout": self.timeout, "headers": params.pop("headers", None)} if params: - payload['params'] = params - if files: # pragma: no cover - payload['files'] = files - if self.auth: # pragma: no cover - payload['auth'] = self.auth + payload["params"] = params + if files: # pragma: no cover + payload["files"] = files + if self.auth: # pragma: no cover + payload["auth"] = self.auth if self.use_json: - data = kwargs.pop('json', None) + data = kwargs.pop("json", None) if data: - payload['json'] = data + payload["json"] = data else: # just put the remaining kwargs into the json field - payload['json'] = kwargs + payload["json"] = kwargs else: - data = kwargs.pop('data', None) + data = kwargs.pop("data", None) if data: - payload['data'] = data + payload["data"] = data else: # just put the remaining kwargs into the data field - payload['data'] = kwargs + payload["data"] = kwargs # if there are any remaining kwargs, then put them into the params - if 'params' not in payload: - payload['params'] = {} - payload['params'].update(**kwargs) - - api_call = requests.put(url, **payload) + if "params" not in payload: + payload["params"] = {} + payload["params"].update(**kwargs) + # timeout is specified in the payload + api_call = requests.put(url, **payload) # noqa: S113 return api_call - def _patch(self, url=None, use_ssl=None, params=None, files=None, - **kwargs): + def _patch(self, url=None, use_ssl=None, params=None, files=None, **kwargs): """Internal method to make api calls using PATCH.""" url = self._construct_url(url, use_ssl=use_ssl) if not params: params = {} params = self._construct_payload(params) - payload = { - 'timeout': self.timeout, - 'headers': params.pop('headers', None) - } + payload = {"timeout": self.timeout, "headers": params.pop("headers", None)} if params: - payload['params'] = params + payload["params"] = params if files: - payload['files'] = files - if self.auth: # pragma: no cover - payload['auth'] = self.auth + payload["files"] = files + if self.auth: # pragma: no cover + payload["auth"] = self.auth if self.use_json: - data = kwargs.pop('json', None) + data = kwargs.pop("json", None) if data: - payload['json'] = data + payload["json"] = data else: # just put the remaining kwargs into the json field - payload['json'] = kwargs + payload["json"] = kwargs else: - data = kwargs.pop('data', None) + data = kwargs.pop("data", None) if data: - payload['data'] = data + payload["data"] = data else: # just put the remaining kwargs into the data field - payload['data'] = kwargs + payload["data"] = kwargs # if there are any remaining kwargs, then put them into the params - if 'params' not in payload: - payload['params'] = {} - payload['params'].update(**kwargs) - api_call = requests.patch(url, **payload) + if "params" not in payload: + payload["params"] = {} + payload["params"].update(**kwargs) + # timeout is specified in the payload + api_call = requests.patch(url, **payload) # noqa: S113 return api_call def _delete(self, url=None, use_ssl=None, **kwargs): """Internal method to make api calls using DELETE.""" url = self._construct_url(url, use_ssl=use_ssl) params = self._construct_payload(kwargs) - payload = { - 'timeout': self.timeout, - 'headers': params.pop('headers', None) - } + payload = {"timeout": self.timeout, "headers": params.pop("headers", None)} if params: - payload['params'] = params - if self.auth: # pragma: no cover - payload['auth'] = self.auth - api_call = requests.delete(url, **payload) + payload["params"] = params + if self.auth: # pragma: no cover + payload["auth"] = self.auth + # timeout is specified in the payload + api_call = requests.delete(url, **payload) # noqa: S113 return api_call @@ -286,31 +254,29 @@ class JSONAPI(BaseAPI): """ Base class for Json API Calls. See BaseAPI for documentation. """ + # pylint: disable=too-few-public-methods, too-many-arguments - def __init__(self, url=None, use_ssl=True, timeout=None, - use_auth=False, auth=None, **kwargs): - super(JSONAPI, self).__init__( - url=url, use_ssl=use_ssl, timeout=timeout, use_json=True, - use_auth=use_auth, auth=auth, **kwargs - ) + def __init__(self, url=None, use_ssl=True, timeout=None, use_auth=False, auth=None, **kwargs): + super().__init__(url=url, use_ssl=use_ssl, timeout=timeout, use_json=True, use_auth=use_auth, auth=auth, **kwargs) -class UserAuthMixin(object): +class UserAuthMixin: """ Mixin to provide basic or digest api client authentication via username and password(or api_key).""" + # pylint:disable=too-few-public-methods def _get_auth(self): """Get basic or digest auth by username/password""" - username = getattr(self, 'username', None) - password = getattr(self, 'password', None) + username = getattr(self, "username", None) + password = getattr(self, "password", None) # support using api_key as password in basic auth # as used by SEED (if supplied as api_key not password) if not password: - password = getattr(self, 'api_key', None) - if getattr(self, 'auth_method', None) == 'digest': + password = getattr(self, "api_key", None) + if getattr(self, "auth_method", None) == "digest": auth = requests.auth.HTTPDigestAuth(username, password) else: auth = requests.auth.HTTPBasicAuth(username, password) @@ -318,18 +284,18 @@ def _get_auth(self): def _construct_payload(self, params): """Construct parameters for an api call. -. - :param params: A dictionary of key-value pairs to include - in the request. - :return: A dictionary of k-v pairs to send to the server - in the request. + . + :param params: A dictionary of key-value pairs to include + in the request. + :return: A dictionary of k-v pairs to send to the server + in the request. """ - if getattr(self, 'use_auth', None) and not getattr(self, 'auth', None): + if getattr(self, "use_auth", None) and not getattr(self, "auth", None): self.auth = self._get_auth() - return super(UserAuthMixin, self)._construct_payload(params) + return super()._construct_payload(params) -class OAuthMixin(object): +class OAuthMixin: """ Mixin to provide api client authentication via OAuth access tokens based on the JWTGrantClient found in jwt-oauth2lib. @@ -337,20 +303,17 @@ class OAuthMixin(object): see https://github.com/GreenBuildingRegistry/jwt_oauth2 """ - _token_type = "Bearer" + _token_type = "Bearer" # noqa: S105 oauth_client = None def _get_access_token(self): """Generate OAuth access token""" - private_key_file = getattr(self, 'private_key_location', None) - client_id = getattr(self, 'client_id', None) - username = getattr(self, 'username', None) - with open(private_key_file, 'r') as pk_file: + private_key_file = getattr(self, "private_key_location", None) + client_id = getattr(self, "client_id", None) + username = getattr(self, "username", None) + with open(private_key_file) as pk_file: sig = pk_file.read() - oauth_client = self.oauth_client( - sig, username, client_id, - pvt_key_password=getattr(self, 'pvt_key_password', None) - ) + oauth_client = self.oauth_client(sig, username, client_id, pvt_key_password=getattr(self, "pvt_key_password", None)) return oauth_client.get_access_token() def _construct_payload(self, params): @@ -361,9 +324,7 @@ def _construct_payload(self, params): :return: A dictionary of k-v pairs to send to the server in the request. """ - params = super(OAuthMixin, self)._construct_payload(params) - token = getattr(self, 'token', None) or self._get_access_token() - params['headers'] = { - 'Authorization': '{} {}'.format(self._token_type, token) - } + params = super()._construct_payload(params) + token = getattr(self, "token", None) or self._get_access_token() + params["headers"] = {"Authorization": f"{self._token_type} {token}"} return params diff --git a/pyseed/exceptions.py b/pyseed/exceptions.py index f73c3b7..bc92d8b 100644 --- a/pyseed/exceptions.py +++ b/pyseed/exceptions.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# encoding: utf-8 """ copyright (c) 2016-2017 Earth Advantage. All rights reserved @@ -22,48 +20,37 @@ class APIClientError(Exception): """Indicates errors when calling an API""" - def __init__(self, error, service=None, url=None, caller=None, - verb=None, status_code=None, **kwargs): + def __init__(self, error, service=None, url=None, caller=None, verb=None, status_code=None, **kwargs): self.error = error self.service = service self.url = url self.caller = caller self.verb = verb self.status_code = status_code - args = ( - error, service, url, caller, verb.upper() if verb else None, - status_code - ) + args = (error, service, url, caller, verb.upper() if verb else None, status_code) self.kwargs = kwargs - super(APIClientError, self).__init__(*args) + super().__init__(*args) def __str__(self): - msg = "{}: {}".format(self.__class__.__name__, self.error) + msg = f"{self.__class__.__name__}: {self.error}" if self.service: - msg = "{}, calling service {}".format(msg, self.service) + msg = f"{msg}, calling service {self.service}" if self.caller: - msg = "{} as {}".format(msg, self.caller) + msg = f"{msg} as {self.caller}" if self.url: - msg = "{} with url {}".format(msg, self.url) + msg = f"{msg} with url {self.url}" if self.verb: - msg = "{}, http method: {}".format(msg, self.verb.upper()) + msg = f"{msg}, http method: {self.verb.upper()}" if self.kwargs: - arguments = ", ".join([ - "{}={}".format(str(key), str(val)) - for key, val in self.kwargs.items() - ]) - msg = "{} supplied with {}".format(msg, arguments) + arguments = ", ".join([f"{key!s}={val!s}" for key, val in self.kwargs.items()]) + msg = f"{msg} supplied with {arguments}" if self.status_code: - msg = "{} http status code: {}".format(msg, self.status_code) + msg = f"{msg} http status code: {self.status_code}" return msg class SEEDError(APIClientError): """Indicates Error interacting with SEED API""" - def __init__(self, error, url=None, caller=None, verb=None, - status_code=None, **kwargs): - super(SEEDError, self).__init__( - error, service='SEED', url=url, caller=caller, verb=verb, - status_code=status_code, **kwargs - ) + def __init__(self, error, url=None, caller=None, verb=None, status_code=None, **kwargs): + super().__init__(error, service="SEED", url=url, caller=caller, verb=verb, status_code=status_code, **kwargs) diff --git a/pyseed/seed_client.py b/pyseed/seed_client.py index 6b7b0e4..30995b2 100644 --- a/pyseed/seed_client.py +++ b/pyseed/seed_client.py @@ -3,10 +3,6 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ -# Imports from Standard Library -from typing import Any, Optional, Union - -# Imports from Third Party Modules import json import logging import os @@ -14,17 +10,18 @@ from collections import Counter from csv import DictReader from datetime import date -from openpyxl import Workbook from pathlib import Path +from typing import Any, Optional, Union + +from openpyxl import Workbook -# Local Imports from pyseed.seed_client_base import SEEDReadWriteClient from pyseed.utils import read_map_file logger = logging.getLogger(__name__) -class SeedClientWrapper(object): +class SeedClientWrapper: """This is a wrapper around the SEEDReadWriteClient. If you need access to the READOnly client, or the OAuth client, then you will need to create another class""" @@ -53,9 +50,7 @@ def __init__( Exception: SeedClientWrapper """ if not connection_params and not connection_config_filepath: - raise Exception( - "Must provide either connection_params or connection_config_filepath" - ) + raise Exception("Must provide either connection_params or connection_config_filepath") # favor the connection params over the config file self.payload = {} @@ -63,9 +58,7 @@ def __init__( # the connection params are simply squashed on SEEDReadWriteClient init self.payload = connection_params elif connection_config_filepath: - self.payload = SeedClientWrapper.read_connection_config_file( - connection_config_filepath - ) + self.payload = SeedClientWrapper.read_connection_config_file(connection_config_filepath) # read in from config file self.client = SEEDReadWriteClient(organization_id, **self.payload) @@ -92,9 +85,11 @@ def read_connection_config_file(cls, filepath: Path) -> dict: filepath (str): path to the connection config file """ if not filepath.exists(): - raise Exception(f"Cannot find connection config file: {str(filepath)}") + raise Exception(f"Cannot find connection config file: {filepath!s}") - connection_params = json.load(open(filepath)) + connection_params = None + with open(filepath) as f: + connection_params = json.load(f) return connection_params @@ -111,8 +106,8 @@ def __init__( super().__init__(organization_id, connection_params, connection_config_filepath) # set org if you can - if self.payload and self.payload.get('seed_org_name', None): - self.get_org_by_name(self.payload['seed_org_name'], set_org_id=True) + if self.payload and self.payload.get("seed_org_name", None): + self.get_org_by_name(self.payload["seed_org_name"], set_org_id=True) def get_org_id(self) -> int: """Return the org ID that is set""" @@ -148,7 +143,7 @@ def instance_information(self) -> dict: # http://localhost:8000/api/version/ # add in URL to the SEED instance # add in username (but not the password/api key) - info = self.client.get(None, required_pk=False, endpoint="version", data_name='all') + info = self.client.get(None, required_pk=False, endpoint="version", data_name="all") info["host"] = self.client.base_url info["username"] = self.client.username return info @@ -200,7 +195,7 @@ def get_user_id(self, username: str) -> Union[None, int]: Returns: int: user ID """ - for user in self.get_users()['users']: + for user in self.get_users()["users"]: # compare string case insensitive if user["email"].lower() == username.lower(): return user["user_id"] @@ -257,7 +252,7 @@ def get_buildings(self) -> list[dict]: # step through each page of the results buildings: list[dict] = [] - for i in range(1, total_qry['num_pages'] + 1): + for i in range(1, total_qry["num_pages"] + 1): buildings = buildings + self.client.list( endpoint="properties", data_name="results", @@ -286,9 +281,7 @@ def get_property_view(self, property_view_id: int) -> dict: ... } """ - return self.client.get( - property_view_id, endpoint="property_views", data_name="property_views" - ) + return self.client.get(property_view_id, endpoint="property_views", data_name="property_views") def get_property(self, property_view_id: int) -> dict: """Return a single property by the property view id. @@ -309,9 +302,7 @@ def get_property(self, property_view_id: int) -> dict: } """ # NOTE: this seems to be the call that OEP uses (returns property and labels dictionaries) - return self.client.get( - property_view_id, endpoint="properties", data_name="properties" - ) + return self.client.get(property_view_id, endpoint="properties", data_name="properties") def search_buildings( self, @@ -330,9 +321,7 @@ def search_buildings( if identifier_exact is not None: payload["identifier_exact"] = identifier_exact - properties = self.client.get( - None, required_pk=False, endpoint="properties_search", **payload - ) + properties = self.client.get(None, required_pk=False, endpoint="properties_search", **payload) return properties def get_labels(self, filter_by_name: Optional[list] = None) -> list: @@ -363,9 +352,7 @@ def get_labels(self, filter_by_name: Optional[list] = None) -> list: labels = [label for label in labels if label["name"] in filter_by_name] return labels - def get_or_create_label( - self, label_name: str, color: str = "blue", show_in_list: bool = False - ) -> dict: + def get_or_create_label(self, label_name: str, color: str = "blue", show_in_list: bool = False) -> dict: """_summary_ Args: @@ -436,9 +423,7 @@ def update_label( # remove the org id from the json data current_label.pop("organization_id") - return self.client.put( - current_label["id"], endpoint="labels", json=current_label - ) + return self.client.put(current_label["id"], endpoint="labels", json=current_label) def delete_label(self, label_name: str) -> dict: """Deletes an existing label. This method will look up the ID of the label to delete. @@ -452,9 +437,9 @@ def delete_label(self, label_name: str) -> dict: label = self.get_labels(filter_by_name=[label_name]) if len(label) != 1: raise Exception(f"Could not find label to delete with name {label_name}") - id = label[0]["id"] + label_id = label[0]["id"] - return self.client.delete(id, endpoint="labels") + return self.client.delete(label_id, endpoint="labels") def get_view_ids_with_label(self, label_names: Union[str, list] = []) -> list: """Get the view IDs of the properties with a given label name(s). Can be a single @@ -540,9 +525,7 @@ def update_labels_of_buildings( "add_label_ids": add_label_ids, "remove_label_ids": remove_label_ids, } - result = self.client.put( - None, required_pk=False, endpoint=endpoint, json=payload - ) + result = self.client.put(None, required_pk=False, endpoint=endpoint, json=payload) return result def create_building(self, params: dict) -> list: @@ -554,35 +537,31 @@ def create_building(self, params: dict) -> list: Returns the created property_view id """ # first try matching on custom_id_1 - matching_id = params.get('state', {}).get('custom_id_1', None) + matching_id = params.get("state", {}).get("custom_id_1", None) if not matching_id: # then try on pm_property_id - matching_id = params.get('state', {}).get('pm_property_id', None) + matching_id = params.get("state", {}).get("pm_property_id", None) if not matching_id: - raise Exception( - "This property does not have a pm_property_id or a custom_id_1 for matching...cannot create." - ) + raise Exception("This property does not have a pm_property_id or a custom_id_1 for matching...cannot create.") - cycle_id = params.get('cycle_id', None) + cycle_id = params.get("cycle_id") # include appropriate cycle in search (if not using the default cycle set on the class) buildings = self.search_buildings(identifier_exact=matching_id, cycle_id=cycle_id) if len(buildings) > 0: - raise Exception( - "A property matching the provided matching ID (pm_property_id or custom_id_1) already exists." - ) + raise Exception("A property matching the provided matching ID (pm_property_id or custom_id_1) already exists.") results = self.client.post(endpoint="properties", json=params) return results - def update_building(self, id, params: dict) -> list: + def update_building(self, property_view_id, params: dict) -> list: """ Updates a building's property_view - Expects id and params to contain a state dictionary + Expects property_view_id and params to contain a state dictionary """ - results = self.client.put(id, endpoint="properties", json=params) + results = self.client.put(property_view_id, endpoint="properties", json=params) return results def get_cycles(self) -> list: @@ -643,9 +622,7 @@ def create_cycle(self, cycle_name: str, start_date: date, end_date: date) -> dic existing_cycles = self.get_cycles() for cycle in existing_cycles: if cycle["name"] == cycle_name: - raise Exception( - f"A cycle with this name already exists: '{cycle_name}'" - ) + raise Exception(f"A cycle with this name already exists: '{cycle_name}'") cycles = self.client.post(endpoint="cycles", json=post_data) return cycles["cycles"] @@ -775,14 +752,10 @@ def get_or_create_dataset(self, dataset_name: str) -> dict: dataset = self.client.post(endpoint="datasets", json=post_data) selected = {} if dataset["status"] == "success": - selected = self.client.get( - dataset["id"], endpoint="datasets", data_name="dataset" - ) + selected = self.client.get(dataset["id"], endpoint="datasets", data_name="dataset") return selected - def upload_datafile( - self, dataset_id: int, data_file: str, upload_datatype: str - ) -> dict: + def upload_datafile(self, dataset_id: int, data_file: str, upload_datatype: str) -> dict: """Upload a datafile file Args: @@ -804,7 +777,7 @@ def upload_datafile( } files_params = [ - ("file", (Path(data_file).name, open(Path(data_file).resolve(), "rb"))), + ("file", (Path(data_file).name, open(Path(data_file).resolve(), "rb"))), # noqa: SIM115 ] return self.client.post( @@ -844,7 +817,7 @@ def track_progress_result(self, progress_key) -> dict: endpoint="progress", url_args={"PROGRESS_KEY": progress_key}, ) - except Exception: + except Exception: # noqa: BLE001 logger.error("Other unknown exception caught") progress_result = None @@ -877,17 +850,11 @@ def get_column_mapping_profiles(self, profile_type: str = "All") -> dict: # return only the unmarked indices if indices_to_remove: - result = [ - item - for index, item in enumerate(result) - if index not in indices_to_remove - ] + result = [item for index, item in enumerate(result) if index not in indices_to_remove] return result - def get_column_mapping_profile( - self, column_mapping_profile_name: str - ) -> Optional[dict]: + def get_column_mapping_profile(self, column_mapping_profile_name: str) -> Optional[dict]: """get a specific column mapping profile. Currently, filter does not take an argument by name, so return them all and find the one that matches the column_mapping_profile_name. @@ -906,9 +873,7 @@ def get_column_mapping_profile( # if nothing, then return none return None - def create_or_update_column_mapping_profile( - self, mapping_profile_name: str, mappings: list - ) -> dict: + def create_or_update_column_mapping_profile(self, mapping_profile_name: str, mappings: list) -> dict: """Create or update an existing mapping profile from a list of mappings This only works for 'Normal' column mapping profiles, that is, it does not work for @@ -963,15 +928,11 @@ def create_or_update_column_mapping_profile( payload = { "mappings": mappings, } - result = self.client.put( - profile["id"], endpoint="column_mapping_profiles", json=payload - ) + result = self.client.put(profile["id"], endpoint="column_mapping_profiles", json=payload) return result - def create_or_update_column_mapping_profile_from_file( - self, mapping_profile_name: str, mapping_file: str - ) -> dict: + def create_or_update_column_mapping_profile_from_file(self, mapping_profile_name: str, mapping_file: str) -> dict: """creates or updates a mapping profile. The format of the mapping file is a CSV with the following format: Raw Columns, units, SEED Table, SEED Columns, Omit\n @@ -1001,13 +962,9 @@ def create_or_update_column_mapping_profile_from_file( if not Path(mapping_file).exists(): raise Exception(f"Could not find mapping file: {mapping_file}") - return self.create_or_update_column_mapping_profile( - mapping_profile_name, read_map_file(mapping_file) - ) + return self.create_or_update_column_mapping_profile(mapping_profile_name, read_map_file(mapping_file)) - def set_import_file_column_mappings( - self, import_file_id: int, mappings: list - ) -> dict: + def set_import_file_column_mappings(self, import_file_id: int, mappings: list) -> dict: """Sets the column mappings onto the import file record. Args: @@ -1036,7 +993,14 @@ def get_columns(self) -> dict: result = self.client.list(endpoint="columns") return result - def create_extra_data_column(self, column_name: str, display_name: str, inventory_type: str, column_description: str, data_type: str) -> dict: + def create_extra_data_column( + self, + column_name: str, + display_name: str, + inventory_type: str, + column_description: str, + data_type: str, + ) -> dict: """Create an extra data column. If column exists, skip Args: @@ -1058,11 +1022,11 @@ def create_extra_data_column(self, column_name: str, display_name: str, inventor """ # get extra data columns (only) result = self.client.list(endpoint="columns") - columns = result['columns'] - extra_data_cols = [item for item in columns if item['is_extra_data']] + columns = result["columns"] + extra_data_cols = [item for item in columns if item["is_extra_data"]] # see if extra data column already exists (for now don't update it, just skip it) - res = list(filter(lambda extra_data_cols: extra_data_cols['column_name'] == column_name, extra_data_cols)) + res = list(filter(lambda extra_data_cols: extra_data_cols["column_name"] == column_name, extra_data_cols)) if res: # column already exists result = {"status": "noop", "message": "column already exists"} @@ -1074,7 +1038,7 @@ def create_extra_data_column(self, column_name: str, display_name: str, inventor "table_name": "PropertyState" if inventory_type == "Property" else "TaxlotState", "column_description": column_description, "data_type": data_type, - "organization_id": self.get_org_id() + "organization_id": self.get_org_id(), } result = self.client.post(endpoint="columns", json=payload) @@ -1100,7 +1064,7 @@ def create_extra_data_columns_from_file(self, columns_csv_filepath: str) -> list }] """ # open file in read mode - with open(columns_csv_filepath, 'r') as f: + with open(columns_csv_filepath) as f: dict_reader = DictReader(f) columns = list(dict_reader) @@ -1131,8 +1095,7 @@ def get_meters(self, property_id: int) -> list: ... ] """ - meters = self.client.get(None, required_pk=False, endpoint='properties_meters', - url_args={"PK": property_id}) + meters = self.client.get(None, required_pk=False, endpoint="properties_meters", url_args={"PK": property_id}) return meters def get_meter(self, property_view_id: int, meter_type: str, source: str, source_id: str) -> Union[dict, None]: @@ -1150,10 +1113,9 @@ def get_meter(self, property_view_id: int, meter_type: str, source: str, source_ # return all the meters for the property and see if the meter exists, if so, return it meters = self.get_meters(property_view_id) for meter in meters: - if meter['type'] == meter_type and meter['source'] == source and meter['source_id'] == source_id: + if meter["type"] == meter_type and meter["source"] == source and meter["source_id"] == source_id: return meter - else: - return None + return None def get_or_create_meter(self, property_view_id: int, meter_type: str, source: str, source_id: str) -> Optional[dict[Any, Any]]: """get or create a meter for a property view. @@ -1174,14 +1136,12 @@ def get_or_create_meter(self, property_view_id: int, meter_type: str, source: st else: # create the meter payload = { - 'type': meter_type, - 'source': source, - 'source_id': source_id, + "type": meter_type, + "source": source, + "source_id": source_id, } - meter = self.client.post( - endpoint='properties_meters', url_args={"PK": property_view_id}, json=payload - ) + meter = self.client.post(endpoint="properties_meters", url_args={"PK": property_view_id}, json=payload) return meter @@ -1195,9 +1155,7 @@ def delete_meter(self, property_view_id: int, meter_id: int) -> dict: Returns: dict: status of the deletion """ - return self.client.delete( - meter_id, endpoint='properties_meters', url_args={"PK": property_view_id} - ) + return self.client.delete(meter_id, endpoint="properties_meters", url_args={"PK": property_view_id}) def upsert_meter_readings_bulk(self, property_view_id: int, meter_id: int, data: list) -> dict: """Upsert meter readings for a property's meter with the bulk method. @@ -1212,11 +1170,13 @@ def upsert_meter_readings_bulk(self, property_view_id: int, meter_id: int, data: """ # get the meter data for the property readings = self.client.post( - endpoint='properties_meters_reading', url_args={"PK": property_view_id, "METER_PK": meter_id}, json=data + endpoint="properties_meters_reading", + url_args={"PK": property_view_id, "METER_PK": meter_id}, + json=data, ) return readings - def get_meter_data(self, property_id, interval: str = 'Exact', excluded_meter_ids: list = []): + def get_meter_data(self, property_id, interval: str = "Exact", excluded_meter_ids: list = []): """Return the meter data from the property. Args: @@ -1228,7 +1188,7 @@ def get_meter_data(self, property_id, interval: str = 'Exact', excluded_meter_id "interval": interval, "excluded_meter_ids": excluded_meter_ids, } - meter_data = self.client.post(endpoint='properties_meter_usage', url_args={"PK": property_id}, json=payload) + meter_data = self.client.post(endpoint="properties_meter_usage", url_args={"PK": property_id}, json=payload) return meter_data def start_save_data(self, import_file_id: int, multiple_cycle_upload: bool = False) -> dict: @@ -1252,8 +1212,7 @@ def start_save_data(self, import_file_id: int, multiple_cycle_upload: bool = Fal return self.client.post( "import_files_start_save_data_pk", url_args={"PK": import_file_id}, - json={"cycle_id": self.cycle_id, - "multiple_cycle_upload": multiple_cycle_upload}, + json={"cycle_id": self.cycle_id, "multiple_cycle_upload": multiple_cycle_upload}, ) def start_map_data(self, import_file_id: int) -> dict: @@ -1313,9 +1272,7 @@ def start_system_matching_and_geocoding(self, import_file_id: int) -> dict: } } """ - return self.client.post( - "import_files_start_matching_pk", url_args={"PK": import_file_id} - ) + return self.client.post("import_files_start_matching_pk", url_args={"PK": import_file_id}) def get_matching_results(self, import_file_id: int) -> dict: """matching results summary @@ -1415,9 +1372,7 @@ def download_pm_report(self, pm_username: str, pm_password: str, pm_template: di """ response = self.client.post( endpoint="portfolio_manager_report", - json={"username": pm_username, - "password": pm_password, - "template": pm_template}, + json={"username": pm_username, "password": pm_password, "template": pm_template}, ) # Get the "properties" key from the dictionary. @@ -1431,8 +1386,8 @@ def download_pm_report(self, pm_username: str, pm_password: str, pm_template: di # Get the header row from the API response. header_row = [] - for property in properties: - for key in property: + for prop in properties: + for key in prop: if key not in header_row: header_row.append(key) @@ -1441,15 +1396,15 @@ def download_pm_report(self, pm_username: str, pm_password: str, pm_template: di sheet.append(header_row) # Loop over the list of dictionaries and write the data to the sheet object. - for property in properties: + for prop in properties: row = [] for key in header_row: - row.append(property[key]) + row.append(prop[key]) if sheet: sheet.append(row) # Report Template name - report_template_name = pm_template['name'] + report_template_name = pm_template["name"] # Filename file_name = f"{pm_username}_{report_template_name}.xlsx" @@ -1491,9 +1446,7 @@ def import_files_reuse_inventory_file_for_meters(self, import_file_id: int) -> d } """ payload = {"import_file_id": import_file_id} - response = self.client.post( - endpoint="import_files_reuse_inventory_file_for_meters", json=payload - ) + response = self.client.post(endpoint="import_files_reuse_inventory_file_for_meters", json=payload) return response def upload_and_match_datafile( @@ -1539,14 +1492,10 @@ def upload_and_match_datafile( result = self.track_progress_result(progress_key) # create/retrieve the column mappings - result = self.create_or_update_column_mapping_profile_from_file( - column_mapping_profile_name, column_mappings_file - ) + result = self.create_or_update_column_mapping_profile_from_file(column_mapping_profile_name, column_mappings_file) # set the column mappings for the dataset - result = self.set_import_file_column_mappings( - import_file_id, result["mappings"] - ) + result = self.set_import_file_column_mappings(import_file_id, result["mappings"]) # now start the mapping result = self.start_map_data(import_file_id) @@ -1568,9 +1517,7 @@ def upload_and_match_datafile( # check if we need to import meters and if they exist if import_meters_if_exist and self.check_meters_tab_exist(import_file_id): - reuse_file = self.import_files_reuse_inventory_file_for_meters( - import_file_id - ) + reuse_file = self.import_files_reuse_inventory_file_for_meters(import_file_id) meter_import_file_id = reuse_file["import_file_id"] @@ -1599,17 +1546,14 @@ def retrieve_at_building_and_update(self, audit_template_building_id: int, cycle None, required_pk=False, endpoint="audit_template_building_xml", - url_args={"PK": audit_template_building_id} + url_args={"PK": audit_template_building_id}, ) - if response['status'] == 'success': + if response["status"] == "success": # now post to api/v3/properties/PK/update_with_buildingsync - xml_file = response['content'] - filename = 'at_' + str(int(time.time() * 1000)) + '.xml' - files = [ - ('file', (filename, xml_file)), - ('file_type', (None, 1)) - ] + xml_file = response["content"] + filename = "at_" + str(int(time.time() * 1000)) + ".xml" + files = [("file", (filename, xml_file)), ("file_type", (None, 1))] response = self.client.put( None, @@ -1617,7 +1561,7 @@ def retrieve_at_building_and_update(self, audit_template_building_id: int, cycle endpoint="properties_update_with_buildingsync", url_args={"PK": seed_id}, files=files, - cycle_id=cycle_id + cycle_id=cycle_id, ) return response @@ -1627,7 +1571,7 @@ def retrieve_at_submission_and_update( audit_template_submission_id: int, cycle_id: int, seed_id: int, - report_format: str = 'pdf', + report_format: str = "pdf", filename: Optional[str] = None, ) -> dict: """Connect to audit template and retrieve audit report by submission ID @@ -1652,40 +1596,32 @@ def retrieve_at_submission_and_update( required_pk=False, endpoint="audit_template_submission", url_args={"PK": audit_template_submission_id}, - report_format=report_format + report_format=report_format, ) - if response['status'] == 'success': - if report_format.lower() == 'pdf': - + if response["status"] == "success": + if report_format.lower() == "pdf": # for PDF, store pdf report as inventory document - pdf_file = response['content'] + pdf_file = response["content"] if not filename: - filename = 'at_submission_report_' + str(audit_template_submission_id) + '.pdf' - files = [ - ('file', (filename, pdf_file)), - ('file_type', (None, 1)) - ] + filename = "at_submission_report_" + str(audit_template_submission_id) + ".pdf" + files = [("file", (filename, pdf_file)), ("file_type", (None, 1))] response2 = self.client.put( None, required_pk=False, endpoint="properties_upload_inventory_document", url_args={"PK": seed_id}, - files=files + files=files, ) - response2['pdf_report'] = pdf_file + response2["pdf_report"] = pdf_file else: - # assume XML. for XML, update property with BuildingSync # now post to api/v3/properties/PK/update_with_buildingsync - xml_file = response['content'] + xml_file = response["content"] if not filename: - filename = 'at_' + str(int(time.time() * 1000)) + '.xml' + filename = "at_" + str(int(time.time() * 1000)) + ".xml" - files = [ - ('file', (filename, xml_file)), - ('file_type', (None, 1)) - ] + files = [("file", (filename, xml_file)), ("file_type", (None, 1))] response2 = self.client.put( None, @@ -1693,7 +1629,7 @@ def retrieve_at_submission_and_update( endpoint="properties_update_with_buildingsync", url_args={"PK": seed_id}, files=files, - cycle_id=cycle_id + cycle_id=cycle_id, ) return response2 @@ -1716,15 +1652,15 @@ def retrieve_portfolio_manager_property(self, username: str, password: str, pm_p response = self.client.post( "portfolio_manager_property_download", json={"username": username, "password": password}, - url_args={"PK": pm_property_id} + url_args={"PK": pm_property_id}, ) - result = {'status': 'error'} + result = {"status": "error"} # save the file to the location that was passed # note that the data are returned directly (the ESPM URL directly downloads the file) if isinstance(response, bytes): - with open(save_file_name, 'wb') as f: + with open(save_file_name, "wb") as f: f.write(response) - result['status'] = 'success' + result["status"] = "success" return result def import_portfolio_manager_property(self, seed_id: int, cycle_id: int, mapping_profile_id: int, file_path: str) -> dict: @@ -1738,7 +1674,7 @@ def import_portfolio_manager_property(self, seed_id: int, cycle_id: int, mapping in the 'Meter Entries' tab""" files_params = [ - ("file", (Path(file_path).name, open(Path(file_path).resolve(), "rb"))), + ("file", (Path(file_path).name, open(Path(file_path).resolve(), "rb"))), # noqa: SIM115 ] response = self.client.put( @@ -1748,7 +1684,7 @@ def import_portfolio_manager_property(self, seed_id: int, cycle_id: int, mapping url_args={"PK": seed_id}, files=files_params, cycle_id=cycle_id, - mapping_profile_id=mapping_profile_id + mapping_profile_id=mapping_profile_id, ) return response diff --git a/pyseed/seed_client_base.py b/pyseed/seed_client_base.py index 107f28d..1d898e2 100644 --- a/pyseed/seed_client_base.py +++ b/pyseed/seed_client_base.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# encoding: utf-8 """ copyright (c) 2016 Earth Advantage. All rights reserved. ..codeauthor::Paul Munday @@ -22,79 +20,76 @@ """ -# Imports from Third Party Modules import inspect + import requests -# Local Imports from pyseed.apibase import JSONAPI, OAuthMixin, UserAuthMixin, add_pk from pyseed.exceptions import SEEDError # Constants (Should end with a slash) URLS = { - 'v3': { - 'column_mapping_profiles': '/api/v3/column_mapping_profiles/', - 'column_mapping_profiles_filter': '/api/v3/column_mapping_profiles/filter/', - 'columns': '/api/v3/columns/', - 'cycles': '/api/v3/cycles/', - 'datasets': '/api/v3/datasets/', - 'gbr_properties': '/api/v3/gbr_properties/', - 'green_assessment': '/api/v3/green_assessments/', - 'green_assessment_property': '/api/v3/green_assessment_properties/', - 'green_assessment_url': '/api/v3/green_assessment_urls/', - 'import_files': '/api/v3/import_files/', - 'import_files_reuse_inventory_file_for_meters': '/api/v3/import_files/reuse_inventory_file_for_meters/', - 'labels': '/api/v3/labels/', - 'labels_property': '/api/v3/labels_property/', - 'labels_taxlot': '/api/v3/labels_taxlot/', - 'organizations': '/api/v3/organizations/', - 'portfolio_manager_report': '/api/v3/portfolio_manager/report/', - 'portfolio_manager_report_templates': '/api/v3/portfolio_manager/template_list/', - 'properties': '/api/v3/properties/', - 'properties_labels': '/api/v3/properties/labels/', - 'properties_search': '/api/v3/properties/search/', - 'property_views': '/api/v3/property_views/', - 'taxlots': '/api/v3/taxlots/', - 'upload': '/api/v3/upload/', - 'users': '/api/v3/users/', + "v3": { + "column_mapping_profiles": "/api/v3/column_mapping_profiles/", + "column_mapping_profiles_filter": "/api/v3/column_mapping_profiles/filter/", + "columns": "/api/v3/columns/", + "cycles": "/api/v3/cycles/", + "datasets": "/api/v3/datasets/", + "gbr_properties": "/api/v3/gbr_properties/", + "green_assessment": "/api/v3/green_assessments/", + "green_assessment_property": "/api/v3/green_assessment_properties/", + "green_assessment_url": "/api/v3/green_assessment_urls/", + "import_files": "/api/v3/import_files/", + "import_files_reuse_inventory_file_for_meters": "/api/v3/import_files/reuse_inventory_file_for_meters/", + "labels": "/api/v3/labels/", + "labels_property": "/api/v3/labels_property/", + "labels_taxlot": "/api/v3/labels_taxlot/", + "organizations": "/api/v3/organizations/", + "portfolio_manager_report": "/api/v3/portfolio_manager/report/", + "portfolio_manager_report_templates": "/api/v3/portfolio_manager/template_list/", + "properties": "/api/v3/properties/", + "properties_labels": "/api/v3/properties/labels/", + "properties_search": "/api/v3/properties/search/", + "property_views": "/api/v3/property_views/", + "taxlots": "/api/v3/taxlots/", + "upload": "/api/v3/upload/", + "users": "/api/v3/users/", # No versioning endpoints - 'version': '/api/version/', + "version": "/api/version/", # POSTs with replaceable keys - 'import_files_check_meters_tab_exists_pk': '/api/v3/import_files/PK/check_meters_tab_exists/', - 'import_files_start_map_data_pk': '/api/v3/import_files/PK/map/', - 'import_files_start_matching_pk': '/api/v3/import_files/PK/start_system_matching_and_geocoding/', - 'import_files_start_save_data_pk': '/api/v3/import_files/PK/start_save_data/', - 'org_column_mapping_import_file': 'api/v3/organizations/ORG_ID/column_mappings/', - 'portfolio_manager_property_download': '/api/v3/portfolio_manager/PK/download/', + "import_files_check_meters_tab_exists_pk": "/api/v3/import_files/PK/check_meters_tab_exists/", + "import_files_start_map_data_pk": "/api/v3/import_files/PK/map/", + "import_files_start_matching_pk": "/api/v3/import_files/PK/start_system_matching_and_geocoding/", + "import_files_start_save_data_pk": "/api/v3/import_files/PK/start_save_data/", + "org_column_mapping_import_file": "api/v3/organizations/ORG_ID/column_mappings/", + "portfolio_manager_property_download": "/api/v3/portfolio_manager/PK/download/", # PUTs with replaceable keys: - 'properties_update_with_buildingsync': 'api/v3/properties/PK/update_with_building_sync/', - 'properties_upload_inventory_document': 'api/v3/properties/PK/upload_inventory_document', - 'property_update_with_espm': 'api/v3/properties/PK/update_with_espm/', + "properties_update_with_buildingsync": "api/v3/properties/PK/update_with_building_sync/", + "properties_upload_inventory_document": "api/v3/properties/PK/upload_inventory_document", + "property_update_with_espm": "api/v3/properties/PK/update_with_espm/", # GETs with replaceable keys - 'analyses_views': '/api/v3/analyses/PK/views/ANALYSIS_VIEW_PK/', - 'audit_template_building_xml': '/api/v3/audit_template/PK/get_building_xml', - 'audit_template_submission': '/api/v3/audit_template/PK/get_submission', - 'import_files_matching_results': '/api/v3/import_files/PK/matching_and_geocoding_results/', - 'properties_cross_cycle_data': '/api/v3/properties/PK/links/', - 'progress': '/api/v3/progress/PROGRESS_KEY/', - 'properties_analyses': '/api/v3/properties/PK/analyses/', - 'properties_meter_usage': '/api/v3/properties/PK/meter_usage/', - 'properties_meters': '/api/v3/properties/PK/meters/', + "analyses_views": "/api/v3/analyses/PK/views/ANALYSIS_VIEW_PK/", + "audit_template_building_xml": "/api/v3/audit_template/PK/get_building_xml", + "audit_template_submission": "/api/v3/audit_template/PK/get_submission", + "import_files_matching_results": "/api/v3/import_files/PK/matching_and_geocoding_results/", + "properties_cross_cycle_data": "/api/v3/properties/PK/links/", + "progress": "/api/v3/progress/PROGRESS_KEY/", + "properties_analyses": "/api/v3/properties/PK/analyses/", + "properties_meter_usage": "/api/v3/properties/PK/meter_usage/", + "properties_meters": "/api/v3/properties/PK/meters/", # GET & POST with replaceable keys - 'properties_meters_reading': '/api/v3/properties/PK/meters/METER_PK/readings/', - } + "properties_meters_reading": "/api/v3/properties/PK/meters/METER_PK/readings/", + }, } # Private Classes and Functions def _get_urls(base_url, url_map=None, version=None): """Populate URL""" - version = version if version else 'v3' + version = version if version else "v3" if not url_map: url_map = URLS[version] - return { - key: '{}/{}'.format(base_url.rstrip('/'), val.lstrip('/')) for key, val in url_map.items() - } + return {key: "{}/{}".format(base_url.rstrip("/"), val.lstrip("/")) for key, val in url_map.items()} def _set_default(obj, key, val, required=True): @@ -107,7 +102,7 @@ def _set_default(obj, key, val, required=True): if not val: val = getattr(obj, key, None) if not val and required: - msg = '{} is not set'.format(key) + msg = f"{key} is not set" raise AttributeError(msg) return val @@ -169,29 +164,38 @@ class SEEDBaseClient(JSONAPI): # pylint:disable=too-few-public-methods,too-many-arguments # pylint:disable=too-many-instance-attributes - def __init__(self, org_id, username=None, password=None, access_token=None, - endpoint=None, data_name=None, use_ssl=None, base_url=None, - port=None, url_map=None, version=None, **kwargs): + def __init__( + self, + org_id, + username=None, + password=None, + access_token=None, + endpoint=None, + data_name=None, + use_ssl=None, + base_url=None, + port=None, + url_map=None, + version=None, + **kwargs, + ): use_ssl = use_ssl if use_ssl is not None else True - super(SEEDBaseClient, self).__init__( - username=username, password=password, use_ssl=use_ssl, - use_auth=True, access_token=access_token, **kwargs - ) + super().__init__(username=username, password=password, use_ssl=use_ssl, use_auth=True, access_token=access_token, **kwargs) self.org_id = org_id self.token = access_token # prevent overriding if set in subclass as class attr - if not getattr(self, 'endpoint', None): + if not getattr(self, "endpoint", None): self.endpoint = endpoint - if not getattr(self, 'data_name', None): + if not getattr(self, "data_name", None): self.data_name = data_name - if not getattr(self, 'base_url', None): - self.base_url = base_url if base_url else 'localhost' - if not getattr(self, 'port', None): + if not getattr(self, "base_url", None): + self.base_url = base_url if base_url else "localhost" + if not getattr(self, "port", None): self.port = port if port else None if self.port: - self.base_url = '{}:{}'.format(self.base_url, self.port) - if not self.base_url.endswith('/'): - self.base_url = self.base_url + '/' + self.base_url = f"{self.base_url}:{self.port}" + if not self.base_url.endswith("/"): + self.base_url = self.base_url + "/" self.username = username self.urls = _get_urls(self.base_url, url_map=url_map, version=version) self.endpoints = self.urls.keys() @@ -204,54 +208,56 @@ def _check_response(self, response, *args, **kwargs): be reported correctly. """ error = False - error_msg = 'Unknown error from SEED API' + error_msg = "Unknown error from SEED API" # grab the response content type to determine json, spreadsheet, or text - response_content_types = response.headers.get('Content-Type', []) + response_content_types = response.headers.get("Content-Type", []) # OK, Created, Accepted, No-Content if response.status_code not in [200, 201, 202, 204]: error = True - error_msg = 'SEED returned status code: {}'.format(response.status_code) + error_msg = f"SEED returned status code: {response.status_code}" # SEED adds a status key to the response to indicate success/error # This is superfluous as status codes should be used to indicate an # error, but they are not always set correctly. elif response.status_code == 204: # there will not be response content with a 204 error = False - elif 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' in response_content_types: + elif "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" in response_content_types: # spreadsheet response error = False - elif 'application/pdf' in response_content_types: + elif "application/pdf" in response_content_types: # PDF report response error = False - elif 'application/json' not in response_content_types: + elif "application/json" not in response_content_types: # get as text if not response.content: error = True elif isinstance(response.json(), dict): - status_field = response.json().get('status', None) - has_progress_key = 'progress_key' in response.json().keys() + status_field = response.json().get("status", None) + has_progress_key = "progress_key" in response.json() if status_field: if has_progress_key: # For the delete cycles, the data returned have a status and a progress_key, # but no progress_data. In lieu of updating SEED, this check is added # specifically for this case - error = status_field not in ['not-started', 'success', 'parsing'] - elif status_field == 'error': + error = status_field not in ["not-started", "success", "parsing"] + elif status_field == "error": error = True - elif status_field == 'success': + elif status_field == "success": # continue error = False - elif 'success' in response.json(): - success_flag = response.json().get('success', None) + elif "success" in response.json(): + success_flag = response.json().get("success", None) # For file uploads the response key is 'success' error = not success_flag - elif 'progress_data' in response.json(): + elif "progress_data" in response.json(): # this is a system matching response, which is okay. return the success flag of this - status_flag = response.json()['progress_data'].get('status', None) - error = status_flag not in ['not-started', 'success', 'parsing'] - elif not any(key in ['results', 'readings', 'data', 'status', 'id', 'organizations', 'sha', 'users'] for key in response.json().keys()): + status_flag = response.json()["progress_data"].get("status", None) + error = status_flag not in ["not-started", "success", "parsing"] + elif not any( + key in ["results", "readings", "data", "status", "id", "organizations", "sha", "users"] for key in response.json() + ): # In some cases there is not a 'status' field, so check if there are # any other keys in the response that depict a success: # readings - this comes from meters @@ -269,15 +275,13 @@ def _check_response(self, response, *args, **kwargs): if response.content: try: if getattr(response.json(), "get", None): - error_msg = response.json().get( - 'message', f"Unknown SEED Error {response.status_code}: {response.json()}" - ) + error_msg = response.json().get("message", f"Unknown SEED Error {response.status_code}: {response.json()}") else: error_msg = f"Unknown SEED Error {response.status_code}: {response.json()}" except ValueError: - error_msg = 'Unknown SEED Error: No response returned' + error_msg = "Unknown SEED Error: No response returned" if args: - kwargs['args'] = args + kwargs["args"] = args self._raise_error(response, error_msg, stack_pos=1, **kwargs) def _get_result(self, response, data_name=None, **kwargs): @@ -287,38 +291,30 @@ def _get_result(self, response, data_name=None, **kwargs): you want to get the entire response back.""" # grab the response content type to determine json, spreadsheet, or text - response_content_types = response.headers.get('Content-Type', []) + response_content_types = response.headers.get("Content-Type", []) # pass through for spreadsheet (?) - if 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' in response_content_types: + if "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" in response_content_types: return response.content - if 'application/json' not in response_content_types: - return {'status': 'success', 'content': response.content} + if "application/json" not in response_content_types: + return {"status": "success", "content": response.content} if not data_name: url = response.request.url # take the last part of the url unless it's a digit # in which case take the previous part - durl = url.lstrip(self.base_url).rstrip('/').rsplit('/', 1) - if durl[1].isdigit(): - data_name = durl[0].rsplit('/', 2)[1] - else: - data_name = durl[1] + durl = url.lstrip(self.base_url).rstrip("/").rsplit("/", 1) + data_name = durl[0].rsplit("/", 2)[1] if durl[1].isdigit() else durl[1] # actual results should be under data_name or the fallbacks # handle a 204 result = None - if response.status_code == 204: - result = {'status': 'success'} - else: - result = response.json() + result = {"status": "success"} if response.status_code == 204 else response.json() if result is None: - error_msg = 'No results returned' + error_msg = "No results returned" self._raise_error(response, error_msg, stack_pos=2, **kwargs) constrained_result = None - if data_name == 'all': - result = result - else: - for dname in [data_name, 'data', 'detail']: + if data_name != "all": + for dname in [data_name, "data", "detail"]: try: # allow a list to be valid (this is the case with labels) if isinstance(result, dict): @@ -330,8 +326,9 @@ def _get_result(self, response, data_name=None, **kwargs): pass if result is None: - error_msg = 'Could not find result using data_name {}.'.format(data_name) + error_msg = f"Could not find result using data_name {data_name}." self._raise_error(response, error_msg, stack_pos=2, **kwargs) + return result def _raise_error(self, response, error_msg, stack_pos=0, *args, **kwargs): @@ -361,19 +358,14 @@ def _raise_error(self, response, error_msg, stack_pos=0, *args, **kwargs): url = response.request.url verb = response.request.method # e.g., MyClass.method - caller = caller = '{}.{}'.format( - self.__class__.__name__, inspect.stack()[stack_pos + 1][3] - ) + caller = f"{self.__class__.__name__}.{inspect.stack()[stack_pos + 1][3]}" if args: - kwargs['args'] = args - raise SEEDError( - error_msg, url=url, caller=caller, verb=verb, - status_code=status_code, **kwargs - ) + kwargs["args"] = args + raise SEEDError(error_msg, url=url, caller=caller, verb=verb, status_code=status_code, **kwargs) def _set_params(self, params): """Add org_id""" - params['organization_id'] = self.org_id + params["organization_id"] = self.org_id return params @@ -381,7 +373,7 @@ def _set_params(self, params): # These should be used with SEEDClient as they rely on its methods -class CreateMixin(object): +class CreateMixin: """Add _post methods""" # pylint:disable=too-few-public-methods @@ -398,26 +390,26 @@ def post(self, endpoint=None, data_name=None, **kwargs): """ # for a post, if the user has sent some url args, then pop them for later # parsing. - url_args = kwargs.pop('url_args', None) + url_args = kwargs.pop("url_args", None) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) # check if the endpoint is to be looked up or is a fully qualified url - if '/' in endpoint: + if "/" in endpoint: url = endpoint elif endpoint in self.urls: url = self.urls[endpoint] else: - raise Exception(f'Unknown endpoint: {endpoint}') - if not url.endswith('/'): - url = url + '/' + raise Exception(f"Unknown endpoint: {endpoint}") + if not url.endswith("/"): + url = url + "/" url = _replace_url_args(url, url_args) - response = super(CreateMixin, self)._post(url=url, **kwargs) + response = super()._post(url=url, **kwargs) self._check_response(response, **kwargs) return self._get_result(response, data_name=data_name, **kwargs) -class ReadMixin(object): +class ReadMixin: """Add get & list method""" # pylint:disable=too-few-public-methods @@ -433,16 +425,16 @@ def get(self, pk, endpoint=None, data_name=None, **kwargs): :returns: dict (from response.json()[data_name]) """ - url_args = kwargs.pop('url_args', None) - org_id_qp = kwargs.pop('include_org_id_query_param', False) + url_args = kwargs.pop("url_args", None) + org_id_qp = kwargs.pop("include_org_id_query_param", False) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) - url = add_pk(self.urls[endpoint], pk, required=kwargs.pop('required_pk', True), slash=True) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) + url = add_pk(self.urls[endpoint], pk, required=kwargs.pop("required_pk", True), slash=True) url = _replace_url_args(url, url_args) if org_id_qp: url += f"?organization_id={self.org_id}" - response = super(ReadMixin, self)._get(url=url, **kwargs) + response = super()._get(url=url, **kwargs) self._check_response(response, **kwargs) return self._get_result(response, data_name=data_name, **kwargs) @@ -455,20 +447,20 @@ def list(self, endpoint=None, data_name=None, **kwargs): :returns: dict (from response.json()[data_name]) """ - url_args = kwargs.pop('url_args', None) + url_args = kwargs.pop("url_args", None) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) url = self.urls[endpoint] - if not url.endswith('/'): - url = url + '/' + if not url.endswith("/"): + url = url + "/" url = _replace_url_args(url, url_args) - response = super(ReadMixin, self)._get(url=url, **kwargs) + response = super()._get(url=url, **kwargs) self._check_response(response, **kwargs) return self._get_result(response, data_name=data_name, **kwargs) -class UpdateMixin(object): +class UpdateMixin: """Add _put & _patch methods""" # pylint:disable=too-few-public-methods,redefined-builtin @@ -483,14 +475,14 @@ def put(self, pk, endpoint=None, data_name=None, **kwargs): :returns: dict (from response.json()[data_name]) """ - url_args = kwargs.pop('url_args', None) + url_args = kwargs.pop("url_args", None) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) - url = add_pk(self.urls[endpoint], pk, required=kwargs.pop('required_pk', True), slash=True) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) + url = add_pk(self.urls[endpoint], pk, required=kwargs.pop("required_pk", True), slash=True) url = _replace_url_args(url, url_args) - response = super(UpdateMixin, self)._put(url=url, **kwargs) + response = super()._put(url=url, **kwargs) self._check_response(response, **kwargs) return self._get_result(response, data_name=data_name, **kwargs) @@ -504,18 +496,18 @@ def patch(self, pk, endpoint=None, data_name=None, **kwargs): :returns: dict (from response.json()[data_name]) """ - url_args = kwargs.pop('url_args', None) + url_args = kwargs.pop("url_args", None) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) - url = add_pk(self.urls[endpoint], pk, required=kwargs.pop('required_pk', True), slash=True) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) + url = add_pk(self.urls[endpoint], pk, required=kwargs.pop("required_pk", True), slash=True) url = _replace_url_args(url, url_args) - response = super(UpdateMixin, self)._patch(url=url, **kwargs) + response = super()._patch(url=url, **kwargs) self._check_response(response, **kwargs) return self._get_result(response, data_name=data_name, **kwargs) -class DeleteMixin(object): +class DeleteMixin: """Add _delete methods""" # pylint:disable=too-few-public-methods,redefined-builtin @@ -531,13 +523,13 @@ def delete(self, pk, endpoint=None, data_name=None, **kwargs): :returns: None """ # pylint:disable=no-member - url_args = kwargs.pop('url_args', None) + url_args = kwargs.pop("url_args", None) kwargs = self._set_params(kwargs) - endpoint = _set_default(self, 'endpoint', endpoint) - data_name = _set_default(self, 'data_name', data_name, required=False) - url = add_pk(self.urls[endpoint], pk, required=kwargs.pop('required_pk', True), slash=True) + endpoint = _set_default(self, "endpoint", endpoint) + data_name = _set_default(self, "data_name", data_name, required=False) + url = add_pk(self.urls[endpoint], pk, required=kwargs.pop("required_pk", True), slash=True) url = _replace_url_args(url, url_args) - response = super(DeleteMixin, self)._delete(url=url, **kwargs) + response = super()._delete(url=url, **kwargs) # delete should return 204 and no content, unless it is a background task if response.status_code != requests.codes.no_content: self._check_response(response, **kwargs) @@ -546,23 +538,19 @@ def delete(self, pk, endpoint=None, data_name=None, **kwargs): class SEEDReadOnlyClient(ReadMixin, UserAuthMixin, SEEDBaseClient): """Read Only Client""" - pass -class SEEDReadWriteClient(CreateMixin, ReadMixin, UpdateMixin, DeleteMixin, - UserAuthMixin, SEEDBaseClient): +class SEEDReadWriteClient(CreateMixin, ReadMixin, UpdateMixin, DeleteMixin, UserAuthMixin, SEEDBaseClient): """Client with full CRUD Methods""" + # pylint:disable=too-many-ancestors - pass class SEEDOAuthReadOnlyClient(ReadMixin, OAuthMixin, SEEDBaseClient): """Read Ony Client""" - pass -class SEEDOAuthReadWriteClient(CreateMixin, ReadMixin, UpdateMixin, - DeleteMixin, OAuthMixin, SEEDBaseClient): +class SEEDOAuthReadWriteClient(CreateMixin, ReadMixin, UpdateMixin, DeleteMixin, OAuthMixin, SEEDBaseClient): """Client with full CRUD Methods""" + # pylint:disable=too-many-ancestors - pass diff --git a/pyseed/utils.py b/pyseed/utils.py index ee09ef4..f045284 100644 --- a/pyseed/utils.py +++ b/pyseed/utils.py @@ -3,7 +3,6 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ -# Imports from Third Party Modules import csv import json from math import pi, sin @@ -30,14 +29,14 @@ def _ring_area(coordinates): {float} The approximate signed geodesic total_area of the polygon in square meters. """ - - assert isinstance(coordinates, (list, tuple)) + if not isinstance(coordinates, (list, tuple)): + raise ValueError("coordinates must be a list or tuple") total_area = 0 coordinates_length = len(coordinates) if coordinates_length > 2: - for i in range(0, coordinates_length): + for i in range(coordinates_length): if i == (coordinates_length - 2): lower_index = coordinates_length - 2 middle_index = coordinates_length - 1 @@ -63,8 +62,8 @@ def _ring_area(coordinates): def _polygon_area(coordinates): - - assert isinstance(coordinates, (list, tuple)) + if not isinstance(coordinates, (list, tuple)): + raise ValueError("coordinates must be a list or tuple") total_area = 0 if len(coordinates) > 0: @@ -86,18 +85,19 @@ def geojson_area(geometry): if isinstance(geometry, str): geometry = json.loads(geometry) - assert isinstance(geometry, dict) + if not isinstance(geometry, dict): + raise ValueError("geometry must be a GeoJSON dict") total_area = 0 - if geometry['type'] == 'Polygon': - return _polygon_area(geometry['coordinates']) - elif geometry['type'] == 'MultiPolygon': - for i in range(0, len(geometry['coordinates'])): - total_area += _polygon_area(geometry['coordinates'][i]) - elif geometry['type'] == 'GeometryCollection': - for i in range(0, len(geometry['geometries'])): - total_area += geojson_area(geometry['geometries'][i]) + if geometry["type"] == "Polygon": + return _polygon_area(geometry["coordinates"]) + elif geometry["type"] == "MultiPolygon": + for i in range(len(geometry["coordinates"])): + total_area += _polygon_area(geometry["coordinates"][i]) + elif geometry["type"] == "GeometryCollection": + for i in range(len(geometry["geometries"])): + total_area += geojson_area(geometry["geometries"][i]) return total_area @@ -106,25 +106,30 @@ def read_map_file(mapfile_path): """Read in the mapping file""" mapfile_path = Path(mapfile_path) - assert mapfile_path.exists(), f"Cannot find file: {str(mapfile_path)}" - - map_reader = csv.reader(open(mapfile_path, 'r')) - map_reader.__next__() # Skip the header - - # Open the mapping file and fill list - maplist = list() - for rowitem in map_reader: - data = { - "from_field": rowitem[0], - "from_units": rowitem[1], - "to_table_name": rowitem[2], - "to_field": rowitem[3], - } - try: - data["is_omitted"] = True if rowitem[4].lower().strip() == "true" else False - except IndexError: - data["is_omitted"] = False - - maplist.append(data) + if not mapfile_path.exists(): + raise ValueError(f"Mapping file {mapfile_path} does not exist") + + with open(mapfile_path) as f: + map_reader = csv.reader(f) + map_reader.__next__() # Skip the header + + # Open the mapping file and fill list + maplist = [] + for rowitem in map_reader: + data = { + "from_field": rowitem[0], + "from_units": rowitem[1], + "to_table_name": rowitem[2], + "to_field": rowitem[3], + } + try: + if rowitem[4].lower().strip() == "true": + data["is_omitted"] = True + else: + False + except IndexError: + data["is_omitted"] = False + + maplist.append(data) return maplist diff --git a/requirements-test.txt b/requirements-test.txt index 5d19027..b0dd64d 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,11 +1,10 @@ -r requirements.txt -flake8==7.0.0 mock==5.1.0 -mypy==1.10.0 -pre-commit==3.7.1 +mypy==1.11.2 +pre-commit==3.8.0 pytest==8.2.2 pytest-cov==5.0.0 pytest-order==1.2.1 pytest-xdist==3.6.1 testfixtures>=8.3.0 -tox==4.15.1 +tox==4.21.0 diff --git a/requirements.txt b/requirements.txt index 931bce5..abeb26e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ openpyxl==3.1.2 requests>=2.28.0 -typing==3.7.4.3 diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..2a3260e --- /dev/null +++ b/ruff.toml @@ -0,0 +1,63 @@ +fix = true +line-length = 140 + +[format] +# preview = true +docstring-code-format = true + +# https://docs.astral.sh/ruff/linter/#rule-selection +[lint] +# preview = true +# Enable these rules +extend-select = [ + "A", # flake8-builtins + "ARG", # flake8-unused-arguments + "BLE", # flake8-blind-except + "C4", # flake8-comprehensions + "COM", # flake8-commas + # "DTZ", # flake8-datetimez + "E", # Error + "EXE", # flake8-executable + "F", # Pyflakes + "I", # isort + "ICN", # flake8-import-conventions + "ISC", # flake8-implicit-str-concat + "N", # pep8-naming + "PD", # pandas-vet + "PGH", # pygrep-hooks + "PIE", # flake8-pie + "PLC", # Pylint Convention + "PLE", # Pylint Error + "PLR", # Pylint Refactor + "PLW", # Pylint Warning + "PT", # flake8-pytest-style + "Q", # flake8-quotes + "RUF", # Ruff-specific rules + "S", # flake8-bandit + "SIM", # flake8-simplify + "T10", # flake8-debugger + "TID", # flake8-tidy-imports + "UP", # pyupgrade +] +# except for these specific errors +ignore = [ + "E501", # line-too-long + "PLR0913", # too-many-arguments + "PLR2004", # magic-value-comparison +] + +[lint.per-file-ignores] +"**/__init__.py" = [ + "F401", # unused-import +] +"tests/test_*" = [ + "S101", # assert + "S105", # hardcoded-password-string + "S106", # hardcoded-password-func-arg + "S307", # suspicious-eval-usage +] + +[lint.pylint] +# Raise the allowed limits the least possible amount https://docs.astral.sh/ruff/settings/#pylint-max-branches +max-statements = 58 +max-branches = 24 diff --git a/setup.cfg b/setup.cfg index c77b0b7..7f3a7ec 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,29 +1,32 @@ [metadata] -name=py-seed -version=0.5.0 -description=A Python API client for the SEED Platform -author=Nicholas Long, Katherine Fleming, Fable Turas, Paul Munday -author_email=nicholas.long@nrel.gov, fable@raintechpdx.com, paul@paulmunday.net -maintainer=NREL -maintainer_email=nicholas.long@nrel.gov -keywords= seed, api -url=https://github.com/seed-platform/py-seed +name = py-seed +version = 0.5.0 +description = A Python API client for the SEED Platform +author = Nicholas Long, Katherine Fleming, Fable Turas, Paul Munday +author_email = nicholas.long@nrel.gov, katherine.fleming@nrel.gov, fable@raintechpdx.com, paul@paulmunday.net +maintainer = NREL +maintainer_email = nicholas.long@nrel.gov +keywords = seed, api +url = https://github.com/SEED-platform/py-seed classifiers = - Development Status :: 4 - Beta - Intended Audience :: Developers - Operating System :: OS Independent - Programming Language :: Python :: 3.9 - Programming Language :: Python :: 3.10 - Programming Language :: Python :: 3.11 - Programming Language :: Python :: 3.12 + Development Status :: 4 - Beta + Intended Audience :: Developers + Operating System :: OS Independent + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 + Programming Language :: Python :: 3.12 + +[project_urls] +Source Code = https://github.com/SEED-platform/py-seed +Changelog = https://github.com/SEED-platform/py-seed/blob/develop/CHANGELOG.rst [options] packages = find: include_package_data = True zip_safe = False install_requires = - requests>=2.28.0 - typing==3.6.1 + requests>=2.28.0 [bdist_wheel] universal = 1 diff --git a/setup.py b/setup.py index 0114110..0dcc775 100644 --- a/setup.py +++ b/setup.py @@ -1,12 +1,8 @@ -# Imports from Third Party Modules from pathlib import Path + from setuptools import setup this_directory = Path(__file__).parent long_description = (this_directory / "README.rst").read_text() -setup( - name='py-SEED', - long_description=long_description, - long_description_content_type='text/x-rst' -) +setup(name="py-SEED", long_description=long_description, long_description_content_type="text/x-rst") diff --git a/tests/test_apibase.py b/tests/test_apibase.py old mode 100755 new mode 100644 index 8a58b5d..e003996 --- a/tests/test_apibase.py +++ b/tests/test_apibase.py @@ -1,177 +1,137 @@ -#!/usr/bin/env python -# encoding: utf-8 """ copyright (c) 2016-2016 Earth Advantage. All rights reserved. ..codeauthor::Paul Munday Unit tests for pyseed/apibase """ -# Imports from Third Party Modules -import sys + import unittest +from unittest import mock + +import pytest -# Local Imports from pyseed.apibase import JSONAPI, BaseAPI, add_pk from pyseed.exceptions import APIClientError from pyseed.seed_client_base import _get_urls, _set_default -NO_URL_ERROR = "APIClientError: No url set" -SSL_ERROR = "APIClientError: use_ssl is true but url does not starts with https" -SSL_ERROR2 = "APIClientError: use_ssl is false but url starts with https" +NO_URL_ERROR = "No url set" +SSL_ERROR = "use_ssl is true but url does not starts with https" +SSL_ERROR2 = "use_ssl is false but url starts with https" # Constants -SERVICES_DICT = { - 'urls': {'test1': 'test1', 'test2': '/test2'} -} - - -PY3 = sys.version_info[0] == 3 -if PY3: - # Imports from Third Party Modules - from unittest import mock -else: - # Imports from Third Party Modules - import mock +SERVICES_DICT = {"urls": {"test1": "test1", "test2": "/test2"}} -class MockConfig(object): +class MockConfig: """Mock config object""" + # pylint:disable=too-few-public-methods, no-self-use def __init__(self, conf): self.conf = conf def get(self, var, section=None, default=None): - if section: # pragma: no cover - cdict = self.conf.get(section, {}) - else: - cdict = self.conf + cdict = self.conf.get(section, {}) if section else self.conf # pragma: no cover return cdict.get(var, default) SERVICES = MockConfig(SERVICES_DICT) -@mock.patch('pyseed.apibase.requests') +@mock.patch("pyseed.apibase.requests") class APITests(unittest.TestCase): """Tests for API base classes""" + # pylint: disable=protected-access, no-self-use, unused-argument def setUp(self): - self.url = 'example.org' + self.url = "example.org" self.api = JSONAPI(self.url) def test_ssl_verification(self, mock_requests): """Test ssl usage""" # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - JSONAPI('http://example.org') - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + JSONAPI("http://example.org") + assert conm.value.error == SSL_ERROR # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - JSONAPI('https://example.org', use_ssl=False) - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + JSONAPI("https://example.org", use_ssl=False) + assert conm.value.error == SSL_ERROR2 # test defaults to https - api = JSONAPI('example.org') + api = JSONAPI("example.org") api._get() - mock_requests.get.assert_called_with( - 'https://example.org', timeout=None, headers=None - ) + mock_requests.get.assert_called_with("https://example.org", timeout=None, headers=None) # use_ssl is False - api = JSONAPI('example.org', use_ssl=False) + api = JSONAPI("example.org", use_ssl=False) api._get() - mock_requests.get.assert_called_with( - 'http://example.org', timeout=None, headers=None - ) + mock_requests.get.assert_called_with("http://example.org", timeout=None, headers=None) def test_get(self, mock_requests): """Test _get method.""" - self.api._get(id=1, foo='bar') - mock_requests.get.assert_called_with( - 'https://example.org', params={'id': 1, 'foo': 'bar'}, - timeout=None, headers=None - ) + self.api._get(id=1, foo="bar") + mock_requests.get.assert_called_with("https://example.org", params={"id": 1, "foo": "bar"}, timeout=None, headers=None) def test_post(self, mock_requests): """Test _get_post.""" - params = {'id': 1} - files = {'file': 'mock_file'} - data = {'foo': 'bar', 'test': 'test'} - self.api._post(params=params, files=files, foo='bar', test='test') - mock_requests.post.assert_called_with( - 'https://example.org', params=params, files=files, - json=data, timeout=None, headers=None - ) + params = {"id": 1} + files = {"file": "mock_file"} + data = {"foo": "bar", "test": "test"} + self.api._post(params=params, files=files, foo="bar", test="test") + mock_requests.post.assert_called_with("https://example.org", params=params, files=files, json=data, timeout=None, headers=None) # Not json - api = BaseAPI('example.org') - api._post(params=params, files=files, foo='bar', test='test') - mock_requests.post.assert_called_with( - 'https://example.org', params=params, files=files, - data=data, timeout=None, headers=None - ) + api = BaseAPI("example.org") + api._post(params=params, files=files, foo="bar", test="test") + mock_requests.post.assert_called_with("https://example.org", params=params, files=files, data=data, timeout=None, headers=None) def test_patch(self, mock_requests): """Test _get_patch.""" - params = {'id': 1} - files = {'file': 'mock_file'} - data = {'foo': 'bar', 'test': 'test'} - self.api._patch(params=params, files=files, foo='bar', test='test') - mock_requests.patch.assert_called_with( - 'https://example.org', params=params, files=files, - json=data, timeout=None, headers=None - ) + params = {"id": 1} + files = {"file": "mock_file"} + data = {"foo": "bar", "test": "test"} + self.api._patch(params=params, files=files, foo="bar", test="test") + mock_requests.patch.assert_called_with("https://example.org", params=params, files=files, json=data, timeout=None, headers=None) # Not json - api = BaseAPI('example.org') - api._patch(params=params, files=files, foo='bar', test='test') - mock_requests.patch.assert_called_with( - 'https://example.org', params=params, files=files, - data=data, timeout=None, headers=None - ) + api = BaseAPI("example.org") + api._patch(params=params, files=files, foo="bar", test="test") + mock_requests.patch.assert_called_with("https://example.org", params=params, files=files, data=data, timeout=None, headers=None) def test_delete(self, mock_requests): """Test _delete method.""" - self.api._delete(id=1, foo='bar') - mock_requests.delete.assert_called_with( - 'https://example.org', params={'id': 1, 'foo': 'bar'}, - timeout=None, headers=None - ) + self.api._delete(id=1, foo="bar") + mock_requests.delete.assert_called_with("https://example.org", params={"id": 1, "foo": "bar"}, timeout=None, headers=None) def test_construct_payload(self, mock_requests): """Test construct_payload method.""" - with self.assertRaises(APIClientError): - api = BaseAPI('example.org', compulsory_params=['id']) - api._get(foo='bar') + api = BaseAPI("example.org", compulsory_params=["id"]) + with pytest.raises(APIClientError): + api._get(foo="bar") api._get(id=1) - self.assertTrue(mock_requests.get.called) + assert mock_requests.get.called url = self.url class TestAPI(BaseAPI): """test class""" + # pylint: disable=too-few-public-methods def __init__(self): - self.compulsory_params = ['id', 'comp'] - super(TestAPI, self).__init__(url) + self.compulsory_params = ["id", "comp"] + super().__init__(url) self.comp = 1 - with self.assertRaises(APIClientError) as conm: - api = TestAPI() + + api = TestAPI() + with pytest.raises(APIClientError) as conm: api._get() - exception = conm.exception - self.assertEqual( - 'APIClientError: id is a compulsory field', str(exception) - ) + assert conm.value.error == "id is a compulsory field" api._get(id=1) - self.assertTrue(mock_requests.get.called) + assert mock_requests.get.called def test_check_call_success(self, mock_requests): """Test check_call_success method.""" @@ -180,70 +140,59 @@ def test_check_call_success(self, mock_requests): mock_requests.get.return_value = mock_response mock_requests.codes.ok = 200 response = self.api._get(id=1) - self.assertTrue(self.api.check_call_success(response)) + assert self.api.check_call_success(response) - def test_construct_url(self, mock_requests): + def test_construct_url(self, mock_requests): # noqa: ARG002 """Test _construct_url method.""" api = BaseAPI(use_ssl=False) # ensure error is raised if no url is supplied - with self.assertRaises(APIClientError) as conm: + with pytest.raises(APIClientError) as conm: api._construct_url(None) - exception = conm.exception - expected = NO_URL_ERROR - self.assertEqual(expected, str(exception)) + assert conm.value.error == NO_URL_ERROR # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - api._construct_url('https://www.example.org', use_ssl=False) - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + api._construct_url("https://www.example.org", use_ssl=False) + assert conm.value.error == SSL_ERROR2 - def test_construct_url_ssl_explicit(self, mock_requests): + def test_construct_url_ssl_explicit(self, mock_requests): # noqa: ARG002 """Test _construct_url method.""" api = BaseAPI(use_ssl=True) # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - api._construct_url('http://example.org', use_ssl=True) - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + api._construct_url("http://example.org", use_ssl=True) + assert conm.value.error == SSL_ERROR # ensure error is raised if http is supplied and use_ssl is default - with self.assertRaises(APIClientError) as conm: - api._construct_url('http://example.org') - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + api._construct_url("http://example.org") + assert conm.value.error == SSL_ERROR - def test_construct_url_ssl_implicit(self, mock_requests): + def test_construct_url_ssl_implicit(self, mock_requests): # noqa: ARG002 """Test _construct_url method.""" api = BaseAPI() # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - api._construct_url('http://example.org', use_ssl=True) - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + api._construct_url("http://example.org", use_ssl=True) + assert conm.value.error == SSL_ERROR # ensure error is raised if http is supplied and use_ssl is default - with self.assertRaises(APIClientError) as conm: - api._construct_url('http://example.org') - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + api._construct_url("http://example.org") + assert conm.value.error == SSL_ERROR -@mock.patch('pyseed.apibase.requests') +@mock.patch("pyseed.apibase.requests") class APITestsNoURL(unittest.TestCase): """Tests for API base classes with no self.url set""" + # pylint: disable=protected-access, no-self-use, unused-argument def setUp(self): - self.url = 'example.org' + self.url = "example.org" self.api = JSONAPI() def test_no_url(self, mock_requests): @@ -251,279 +200,195 @@ def test_no_url(self, mock_requests): def test_get(self, mock_requests): """Test _get method.""" - self.api._get(self.url, id=1, foo='bar') - mock_requests.get.assert_called_with( - 'https://example.org', params={'id': 1, 'foo': 'bar'}, - timeout=None, headers=None - ) + self.api._get(self.url, id=1, foo="bar") + mock_requests.get.assert_called_with("https://example.org", params={"id": 1, "foo": "bar"}, timeout=None, headers=None) # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - api = BaseAPI('example.org', use_ssl=False) - api._get(url='https://www.example.org', use_ssl=False) - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + api = BaseAPI("example.org", use_ssl=False) + with pytest.raises(APIClientError) as conm: + api._get(url="https://www.example.org", use_ssl=False) + assert conm.value.error == SSL_ERROR2 # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - self.api._get(url='http://example.org') - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._get(url="http://example.org") + assert conm.value.error == SSL_ERROR # ensure error is raised if no url is supplied - with self.assertRaises(APIClientError) as conm: + with pytest.raises(APIClientError) as conm: self.api._get() - exception = conm.exception - expected = NO_URL_ERROR - self.assertEqual(expected, str(exception)) + assert conm.value.error == NO_URL_ERROR # test defaults to http self.api._get(url=self.url) - mock_requests.get.assert_called_with( - 'https://example.org', timeout=None, headers=None - ) + mock_requests.get.assert_called_with("https://example.org", timeout=None, headers=None) # use_ssl is False - api = BaseAPI('example.org', use_ssl=False) + api = BaseAPI("example.org", use_ssl=False) api._get(url=self.url, use_ssl=False) - mock_requests.get.assert_called_with( - 'http://example.org', timeout=None, headers=None - ) + mock_requests.get.assert_called_with("http://example.org", timeout=None, headers=None) def test_post(self, mock_requests): """Test _get_post.""" - params = {'id': 1} - files = {'file': 'mock_file'} - data = {'foo': 'bar', 'test': 'test'} - self.api._post( - url=self.url, params=params, files=files, foo='bar', test='test' - ) - mock_requests.post.assert_called_with( - 'https://example.org', params=params, files=files, - json=data, timeout=None, headers=None - ) + params = {"id": 1} + files = {"file": "mock_file"} + data = {"foo": "bar", "test": "test"} + self.api._post(url=self.url, params=params, files=files, foo="bar", test="test") + mock_requests.post.assert_called_with("https://example.org", params=params, files=files, json=data, timeout=None, headers=None) # Not json api = BaseAPI() - api._post(url=self.url, params=params, files=files, - foo='bar', test='test') - mock_requests.post.assert_called_with( - 'https://example.org', params=params, files=files, - data=data, timeout=None, headers=None - ) + api._post(url=self.url, params=params, files=files, foo="bar", test="test") + mock_requests.post.assert_called_with("https://example.org", params=params, files=files, data=data, timeout=None, headers=None) # ensure error is raised if no url is supplied - with self.assertRaises(APIClientError) as conm: - self.api._post( - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = NO_URL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._post(params=params, files=files, foo="bar", test="test") + assert conm.value.error == NO_URL_ERROR # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - api = BaseAPI('example.org', use_ssl=False) - api._post( - url='https://example.org', use_ssl=False, - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + api = BaseAPI("example.org", use_ssl=False) + with pytest.raises(APIClientError) as conm: + api._post(url="https://example.org", use_ssl=False, params=params, files=files, foo="bar", test="test") + assert conm.value.error == SSL_ERROR2 # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - self.api._post( - url='http://example.org', use_ssl=True, - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._post(url="http://example.org", use_ssl=True, params=params, files=files, foo="bar", test="test") + assert conm.value.error == SSL_ERROR def test_patch(self, mock_requests): """Test _get_patch.""" - params = {'id': 1} - files = {'file': 'mock_file'} - data = {'foo': 'bar', 'test': 'test'} - self.api._patch( - url=self.url, params=params, files=files, foo='bar', test='test' - ) - mock_requests.patch.assert_called_with( - 'https://example.org', params=params, files=files, - json=data, timeout=None, headers=None - ) + params = {"id": 1} + files = {"file": "mock_file"} + data = {"foo": "bar", "test": "test"} + self.api._patch(url=self.url, params=params, files=files, foo="bar", test="test") + mock_requests.patch.assert_called_with("https://example.org", params=params, files=files, json=data, timeout=None, headers=None) # Not json - api = BaseAPI('example.org') - api._patch( - url=self.url, params=params, files=files, foo='bar', test='test' - ) - mock_requests.patch.assert_called_with( - 'https://example.org', params=params, files=files, - data=data, timeout=None, headers=None - ) + api = BaseAPI("example.org") + api._patch(url=self.url, params=params, files=files, foo="bar", test="test") + mock_requests.patch.assert_called_with("https://example.org", params=params, files=files, data=data, timeout=None, headers=None) # ensure error is raised if no url is supplied - with self.assertRaises(APIClientError) as conm: - self.api._patch( - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = NO_URL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._patch(params=params, files=files, foo="bar", test="test") + assert conm.value.error == NO_URL_ERROR # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - api = BaseAPI('example.org', use_ssl=False) - api._patch( - url='https://example.org', use_ssl=False, - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + api = BaseAPI("example.org", use_ssl=False) + with pytest.raises(APIClientError) as conm: + api._patch(url="https://example.org", use_ssl=False, params=params, files=files, foo="bar", test="test") + assert conm.value.error == SSL_ERROR2 # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - self.api._patch( - url='http://example.org', use_ssl=True, - params=params, files=files, foo='bar', test='test' - ) - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._patch(url="http://example.org", use_ssl=True, params=params, files=files, foo="bar", test="test") + assert conm.value.error == SSL_ERROR def test_delete(self, mock_requests): """Test _delete method.""" - self.api._delete(url=self.url, id=1, foo='bar') - mock_requests.delete.assert_called_with( - 'https://example.org', params={'id': 1, 'foo': 'bar'}, - timeout=None, headers=None - ) + self.api._delete(url=self.url, id=1, foo="bar") + mock_requests.delete.assert_called_with("https://example.org", params={"id": 1, "foo": "bar"}, timeout=None, headers=None) # ensure error is raised if https is supplied and use_ssl is false - with self.assertRaises(APIClientError) as conm: - api = BaseAPI('example.org', use_ssl=False) - api._delete(url='https://www.example.org') - exception = conm.exception - expected = SSL_ERROR2 - self.assertEqual(expected, str(exception)) + api = BaseAPI("example.org", use_ssl=False) + with pytest.raises(APIClientError) as conm: + api._delete(url="https://www.example.org") + assert conm.value.error == SSL_ERROR2 # ensure error is raised if http is supplied and use_ssl is true - with self.assertRaises(APIClientError) as conm: - self.api._delete(url='http://example.org') - exception = conm.exception - expected = SSL_ERROR - self.assertEqual(expected, str(exception)) + with pytest.raises(APIClientError) as conm: + self.api._delete(url="http://example.org") + assert conm.value.error == SSL_ERROR # ensure error is raised if no url is supplied - with self.assertRaises(APIClientError) as conm: + with pytest.raises(APIClientError) as conm: self.api._delete() - exception = conm.exception - expected = NO_URL_ERROR - self.assertEqual(expected, str(exception)) + assert conm.value.error == NO_URL_ERROR # test defaults to http self.api._delete(url=self.url) - mock_requests.delete.assert_called_with( - 'https://example.org', timeout=None, headers=None - ) + mock_requests.delete.assert_called_with("https://example.org", timeout=None, headers=None) # use_ssl is False - api = BaseAPI('example.org', use_ssl=False) + api = BaseAPI("example.org", use_ssl=False) api._delete(url=self.url, use_ssl=False) - mock_requests.delete.assert_called_with( - 'http://example.org', timeout=None, headers=None - ) + mock_requests.delete.assert_called_with("http://example.org", timeout=None, headers=None) class APIFunctionTest(unittest.TestCase): - def testadd_pk(self): """Test add_pk helper function.""" # Error checks - with self.assertRaises(APIClientError) as conm: - add_pk('url', None) - self.assertEqual( - 'APIClientError: id/pk must be supplied', str(conm.exception) - ) - - with self.assertRaises(TypeError) as conm: - add_pk('url', 'a') - self.assertEqual( - 'id/pk must be a positive integer', str(conm.exception) - ) - - with self.assertRaises(TypeError) as conm: - add_pk('url', 1.2) - self.assertEqual( - 'id/pk must be a positive integer', str(conm.exception) - ) - - with self.assertRaises(TypeError) as conm: - add_pk('url', -1) - self.assertEqual( - 'id/pk must be a positive integer', str(conm.exception) - ) + with pytest.raises(APIClientError) as conm: + add_pk("url", None) + assert conm.value.error == "id/pk must be supplied" + + with pytest.raises(TypeError) as conm: + add_pk("url", "a") + assert conm.value.args[0] == "id/pk must be a positive integer" + + with pytest.raises(TypeError) as conm: + add_pk("url", 1.2) + assert conm.value.args[0] == "id/pk must be a positive integer" + + with pytest.raises(TypeError) as conm: + add_pk("url", -1) + assert conm.value.args[0] == "id/pk must be a positive integer" # adds ints - result = add_pk('url', 1) - self.assertEqual('url/1', result) + result = add_pk("url", 1) + assert result == "url/1" # converts strings if digit - result = add_pk('url', '1') - self.assertEqual('url/1', result) + result = add_pk("url", "1") + assert result == "url/1" # id not required - result = add_pk('url', None, required=False) - self.assertEqual('url', result) + result = add_pk("url", None, required=False) + assert result == "url" # adds_slash - result = add_pk('url', 1, slash=True) - self.assertEqual('url/1/', result) + result = add_pk("url", 1, slash=True) + assert result == "url/1/" # does not repeat / - result = add_pk('url/', 1) - self.assertEqual('url/1', result) + result = add_pk("url/", 1) + assert result == "url/1" def test_set_default(self): """Test _set_default helper method""" obj = mock.MagicMock() - obj.key = 'val' + obj.key = "val" # make sure nokey is not set on mock del obj.nokey # raises error if attribute not set and val is none - with self.assertRaises(AttributeError) as conm: - _set_default(obj, 'nokey', None) - self.assertEqual('nokey is not set', str(conm.exception)) + with pytest.raises(AttributeError) as conm: + _set_default(obj, "nokey", None) + assert conm.value.args[0] == "nokey is not set" - result = _set_default(obj, 'key', None) - self.assertNotEqual(result, None) - self.assertEqual(result, 'val') + result = _set_default(obj, "key", None) + assert result is not None + assert result == "val" # returns obj.key if not value is supplied - result = _set_default(obj, 'key', None) - self.assertNotEqual(result, None) - self.assertEqual(result, 'val') + result = _set_default(obj, "key", None) + assert result is not None + assert result == "val" # return value if supplied - result = _set_default(obj, 'key', 'other') - self.assertNotEqual(result, None) - self.assertEqual(result, 'other') + result = _set_default(obj, "key", "other") + assert result is not None + assert result == "other" # Return None if val and attr not set and required = False - result = _set_default(obj, 'nokey', None, required=False) - self.assertEqual(result, None) + result = _set_default(obj, "nokey", None, required=False) + assert result is None def test_get_urls(self): """test _get_urls correctly formats urls""" - expected = {'test1': 'base_url/test1', 'test2': 'base_url/test2'} - result = _get_urls( - 'base_url/', {'test1': 'test1', 'test2': 'test2'} - ) - self.assertDictEqual(expected, result) + expected = {"test1": "base_url/test1", "test2": "base_url/test2"} + result = _get_urls("base_url/", {"test1": "test1", "test2": "test2"}) + assert expected == result diff --git a/tests/test_seed_base.py b/tests/test_seed_base.py index 80c8d75..d254183 100644 --- a/tests/test_seed_base.py +++ b/tests/test_seed_base.py @@ -3,14 +3,13 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ -# Imports from Third Party Modules -import pytest import unittest import uuid from datetime import date from pathlib import Path -# Local Imports +import pytest + from pyseed.seed_client import SeedClient @@ -24,7 +23,7 @@ def setup_class(cls): # The seed-config.json file needs to be added to the project root directory # If running SEED locally for testing, then you can run the following from your SEED root directory: # ./manage.py create_test_user_json --username user@seed-platform.org --file ../py-seed/seed-config.json --pyseed - config_file = Path('seed-config.json') + config_file = Path("seed-config.json") cls.seed_client = SeedClient(cls.organization_id, connection_config_filepath=config_file) cls.organization_id = 1 @@ -45,18 +44,14 @@ def test_get_create_delete_cycle(self): # create a new unique cycle unique_id = str(uuid.uuid4())[:8] - cycle = self.seed_client.get_or_create_cycle( - f'test cycle {unique_id}', date(2021, 1, 1), date(2022, 1, 1) - ) - assert cycle['name'] == f'test cycle {unique_id}' - cycle_id = cycle['id'] + cycle = self.seed_client.get_or_create_cycle(f"test cycle {unique_id}", date(2021, 1, 1), date(2022, 1, 1)) + assert cycle["name"] == f"test cycle {unique_id}" + cycle_id = cycle["id"] all_cycles = self.seed_client.get_cycles() assert len(all_cycles) == cycle_count + 1 # verify that it won't be created again - cycle = self.seed_client.get_or_create_cycle( - f'test cycle {unique_id}', date(2021, 1, 1), date(2022, 1, 1) - ) - assert cycle_id == cycle['id'] + cycle = self.seed_client.get_or_create_cycle(f"test cycle {unique_id}", date(2021, 1, 1), date(2022, 1, 1)) + assert cycle_id == cycle["id"] all_cycles = self.seed_client.get_cycles() assert len(all_cycles) == cycle_count + 1 @@ -67,13 +62,13 @@ def test_get_create_delete_cycle(self): assert len(all_cycles) == cycle_count def test_create_cycle(self): - new_cycle_name = 'test cycle for test_create_cycle' + new_cycle_name = "test cycle for test_create_cycle" cycle = self.seed_client.create_cycle(new_cycle_name, date(2021, 6, 1), date(2022, 6, 1)) - cycle_id = cycle['id'] + cycle_id = cycle["id"] assert cycle is not None # verify that trying to create the same name will fail - with pytest.raises(Exception) as exc_info: + with pytest.raises(Exception) as exc_info: # noqa: PT011 self.seed_client.create_cycle(new_cycle_name, date(2021, 6, 1), date(2022, 6, 1)) assert exc_info.value.args[0] == f"A cycle with this name already exists: '{new_cycle_name}'" @@ -85,127 +80,119 @@ def test_create_cycle(self): self.seed_client.delete_cycle(cycle_id) def test_get_cycle_by_name(self): - cycle = self.seed_client.create_cycle('test cycle for test_get_cycle_by_name', date(2021, 6, 1), date(2022, 6, 1)) - cycle_id = cycle['id'] + cycle = self.seed_client.create_cycle("test cycle for test_get_cycle_by_name", date(2021, 6, 1), date(2022, 6, 1)) + cycle_id = cycle["id"] assert cycle is not None - cycle = self.seed_client.get_cycle_by_name('test cycle for test_get_cycle_by_name', set_cycle_id=True) + cycle = self.seed_client.get_cycle_by_name("test cycle for test_get_cycle_by_name", set_cycle_id=True) assert cycle is not None - assert cycle['name'] == 'test cycle for test_get_cycle_by_name' + assert cycle["name"] == "test cycle for test_get_cycle_by_name" assert self.seed_client.cycle_id == cycle_id # cleanup self.seed_client.delete_cycle(cycle_id) def test_get_or_create_dataset(self): - dataset_name = 'seed-salesforce-test-data' + dataset_name = "seed-salesforce-test-data" dataset = self.seed_client.get_or_create_dataset(dataset_name) - assert dataset['name'] == dataset_name - assert dataset['super_organization'] == self.seed_client.client.org_id + assert dataset["name"] == dataset_name + assert dataset["super_organization"] == self.seed_client.client.org_id assert dataset is not None def test_get_columns(self): result = self.seed_client.get_columns() - assert result['status'] == 'success' - assert len(result['columns']) >= 1 + assert result["status"] == "success" + assert len(result["columns"]) >= 1 def test_create_column(self): result = self.seed_client.create_extra_data_column( - column_name='test_col', - display_name='A Test Column', + column_name="test_col", + display_name="A Test Column", inventory_type="Property", column_description="this is a test column", - data_type="string") - assert result['status'] == 'success' - assert 'id' in result['column'] + data_type="string", + ) + assert result["status"] == "success" + assert "id" in result["column"] def test_create_columns_from_file(self): - cols_filepath = 'tests/data/test-seed-create-columns.csv' + cols_filepath = "tests/data/test-seed-create-columns.csv" result = self.seed_client.create_extra_data_columns_from_file(cols_filepath) assert len(result) - assert result[0]['status'] + assert result[0]["status"] def test_get_column_mapping_profiles(self): result = self.seed_client.get_column_mapping_profiles() assert len(result) >= 1 # There should only be one default BuildingSync mapping profile - result = self.seed_client.get_column_mapping_profiles('BuildingSync Default') + result = self.seed_client.get_column_mapping_profiles("BuildingSync Default") assert len(result) == 1 def test_get_column_mapping_profile(self): - result = self.seed_client.get_column_mapping_profile('does not exist') + result = self.seed_client.get_column_mapping_profile("does not exist") assert result is None # There should always be a portfolio manager default unless the # user removed it. - result = self.seed_client.get_column_mapping_profile('Portfolio Manager Defaults') + result = self.seed_client.get_column_mapping_profile("Portfolio Manager Defaults") assert isinstance(result, dict) - assert len(result['mappings']) > 0 + assert len(result["mappings"]) > 0 def test_create_column_mapping_profile_with_file(self): - profile_name = 'new profile' - result = self.seed_client.create_or_update_column_mapping_profile_from_file( - profile_name, - 'tests/data/test-seed-data-mappings.csv' - ) + profile_name = "new profile" + result = self.seed_client.create_or_update_column_mapping_profile_from_file(profile_name, "tests/data/test-seed-data-mappings.csv") assert result is not None - assert len(result['mappings']) == 14 + assert len(result["mappings"]) == 14 # delete some of the mappings and update - mappings = result['mappings'] + mappings = result["mappings"] for index in range(5, 0, -1): mappings.pop(index) - result = self.seed_client.create_or_update_column_mapping_profile( - profile_name, - mappings - ) - assert len(result['mappings']) == 9 + result = self.seed_client.create_or_update_column_mapping_profile(profile_name, mappings) + assert len(result["mappings"]) == 9 # restore with the original call - result = self.seed_client.create_or_update_column_mapping_profile_from_file( - profile_name, - 'tests/data/test-seed-data-mappings.csv' - ) - assert len(result['mappings']) == 14 + result = self.seed_client.create_or_update_column_mapping_profile_from_file(profile_name, "tests/data/test-seed-data-mappings.csv") + assert len(result["mappings"]) == 14 def test_get_labels(self): result = self.seed_client.get_labels() assert len(result) > 10 # find a set of two labels - result = self.seed_client.get_labels(filter_by_name=['Compliant', 'Violation']) + result = self.seed_client.get_labels(filter_by_name=["Compliant", "Violation"]) assert len(result) == 2 # find single field - result = self.seed_client.get_labels(filter_by_name=['Call']) + result = self.seed_client.get_labels(filter_by_name=["Call"]) assert len(result) == 1 - assert result[0]['name'] == 'Call' - assert not result[0]['show_in_list'] + assert result[0]["name"] == "Call" + assert not result[0]["show_in_list"] # find nothing field - result = self.seed_client.get_labels(filter_by_name=['Does not Exist']) + result = self.seed_client.get_labels(filter_by_name=["Does not Exist"]) assert len(result) == 0 def test_get_or_create_label(self): - label_name = 'something borrowed' - label = self.seed_client.get_or_create_label(label_name, 'green', show_in_list=True) - label_id = label['id'] + label_name = "something borrowed" + label = self.seed_client.get_or_create_label(label_name, "green", show_in_list=True) + label_id = label["id"] assert label is not None - assert label['name'] == label_name + assert label["name"] == label_name # try running it again and make sure it doesn't create a new label (ID should be the same0) label = self.seed_client.get_or_create_label(label_name) - assert label_id == label['id'] + assert label_id == label["id"] # now update the color - label = self.seed_client.update_label(label_name, new_color='blue') - assert label['color'] == 'blue' + label = self.seed_client.update_label(label_name, new_color="blue") + assert label["color"] == "blue" # now update the name and show in list = False - new_label_name = 'something blue' + new_label_name = "something blue" label = self.seed_client.update_label(label_name, new_label_name=new_label_name, new_show_in_list=False) - assert label['name'] == new_label_name + assert label["name"] == new_label_name # cleanup by deleting label label = self.seed_client.delete_label(new_label_name) diff --git a/tests/test_seed_client.py b/tests/test_seed_client.py index 4a31f78..b76bc7c 100644 --- a/tests/test_seed_client.py +++ b/tests/test_seed_client.py @@ -3,14 +3,13 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ -# Imports from Third Party Modules import os -import pytest import unittest from datetime import date from pathlib import Path -# Local Imports +import pytest + from pyseed.seed_client import SeedClient # For CI the test org is 1, but for local testing it may be different @@ -32,15 +31,11 @@ def setup_class(cls): # If running SEED locally for testing, then you can run the following from your SEED root directory: # ./manage.py create_test_user_json --username user@seed-platform.org --file ../py-seed/seed-config.json --pyseed config_file = Path("seed-config.json") - cls.seed_client = SeedClient( - cls.organization_id, connection_config_filepath=config_file - ) + cls.seed_client = SeedClient(cls.organization_id, connection_config_filepath=config_file) # Get/create the new cycle and upload the data. Make sure to set the cycle ID so that the # data end up in the correct cycle - cls.seed_client.get_or_create_cycle( - "pyseed-api-test", date(2021, 6, 1), date(2022, 6, 1), set_cycle_id=True - ) + cls.seed_client.get_or_create_cycle("pyseed-api-test", date(2021, 6, 1), date(2022, 6, 1), set_cycle_id=True) cls.seed_client.upload_and_match_datafile( "pyseed-properties-test", @@ -60,7 +55,7 @@ def test_seed_orgs(self): def test_seed_client_info(self): info = self.seed_client.instance_information() - assert set(("version", "sha")).issubset(info.keys()) + assert set(("version", "sha")).issubset(info.keys()) # noqa: C405 def test_create_organization(self): # create a new organization. This test requires that the @@ -69,20 +64,20 @@ def test_create_organization(self): assert org["organization"]["id"] is not None # try to create again and it should raise an error - with self.assertRaises(Exception) as excpt: + with pytest.raises(Exception) as excpt: # noqa: PT011 self.seed_client.create_organization("NEW ORG") - assert "already exists" in str(excpt.exception) + assert "already exists" in excpt.value.args[0] def test_seed_buildings(self): # set cycle before retrieving (just in case) - self.seed_client.get_cycle_by_name('pyseed-api-test', set_cycle_id=True) + self.seed_client.get_cycle_by_name("pyseed-api-test", set_cycle_id=True) buildings = self.seed_client.get_buildings() # ESPM test creates a building now too, assert building count is 10 or 11? assert len(buildings) == 10 def test_get_pm_report_template_names(self): - pm_un = os.environ.get('SEED_PM_UN', False) - pm_pw = os.environ.get('SEED_PM_PW', False) + pm_un = os.environ.get("SEED_PM_UN", False) + pm_pw = os.environ.get("SEED_PM_PW", False) if not pm_un or not pm_pw: self.fail(f"Somehow PM test was initiated without {pm_un} or {pm_pw} in the environment") response = self.seed_client.get_pm_report_template_names(pm_un, pm_pw) @@ -100,7 +95,7 @@ def test_get_pm_report_template_names(self): def test_search_buildings(self): # set cycle - self.seed_client.get_cycle_by_name('pyseed-api-test', set_cycle_id=True) + self.seed_client.get_cycle_by_name("pyseed-api-test", set_cycle_id=True) properties = self.seed_client.search_buildings(identifier_exact="B-1") assert len(properties) == 1 @@ -122,7 +117,7 @@ def test_create_update_building(self): # create a new building (property, propertyState, propertyView) # Update the building completion_date = "02/02/2023" - year = '2023' + year = "2023" cycle = self.seed_client.get_or_create_cycle( "pyseed-api-integration-test", date(int(year), 1, 1), @@ -155,13 +150,10 @@ def test_create_update_building(self): "generation_date": None, "recent_sale_date": None, "release_date": None, - "extra_data": { - "pathway": "new", - "completion_date": completion_date - } + "extra_data": {"pathway": "new", "completion_date": completion_date}, } - params = {'state': state, 'cycle_id': cycle["id"]} + params = {"state": state, "cycle_id": cycle["id"]} result = self.seed_client.create_building(params=params) assert result["status"] == "success" @@ -169,12 +161,12 @@ def test_create_update_building(self): view_id = result["view"]["id"] # update that property (by ID) - state['property_name'] = 'New Name Building' + state["property_name"] = "New Name Building" - properties = self.seed_client.search_buildings(identifier_exact=state['custom_id_1']) + properties = self.seed_client.search_buildings(identifier_exact=state["custom_id_1"]) assert len(properties) == 1 - params2 = {'state': state} + params2 = {"state": state} result2 = self.seed_client.update_building(view_id, params=params2) assert result2["status"] == "success" @@ -186,9 +178,7 @@ def test_add_label_to_buildings(self): assert len(properties) == 1 prop_ids.append(properties[0]["id"]) - result = self.seed_client.update_labels_of_buildings( - ["Violation"], [], prop_ids - ) + result = self.seed_client.update_labels_of_buildings(["Violation"], [], prop_ids) assert result["status"] == "success" assert result["num_updated"] == 3 # verify that the 3 buildings have the Violation label @@ -196,9 +186,7 @@ def test_add_label_to_buildings(self): assert all(item in properties[0]["is_applied"] for item in prop_ids) # now remove the violation label and add compliant - result = self.seed_client.update_labels_of_buildings( - ["Compliant"], ["Violation"], prop_ids - ) + result = self.seed_client.update_labels_of_buildings(["Compliant"], ["Violation"], prop_ids) assert result["status"] == "success" assert result["num_updated"] == 3 properties = self.seed_client.get_view_ids_with_label(label_names=["Violation"]) @@ -209,15 +197,11 @@ def test_add_label_to_buildings(self): assert all(item in properties[0]["is_applied"] for item in prop_ids) # now remove all - result = self.seed_client.update_labels_of_buildings( - [], ["Violation", "Compliant"], prop_ids - ) + result = self.seed_client.update_labels_of_buildings([], ["Violation", "Compliant"], prop_ids) assert result["status"] == "success" assert result["num_updated"] == 3 # no labels on the properties - properties = self.seed_client.get_view_ids_with_label( - label_names=["Compliant", "Violation"] - ) + properties = self.seed_client.get_view_ids_with_label(label_names=["Compliant", "Violation"]) assert not all(item in properties[0]["is_applied"] for item in prop_ids) assert not all(item in properties[1]["is_applied"] for item in prop_ids) @@ -234,9 +218,7 @@ def test_upload_datafile(self): # Need to get the dataset id, again. Maybe need to clean up eventually. dataset = self.seed_client.get_or_create_dataset("pyseed-uploader-test-data") - result = self.seed_client.upload_datafile( - dataset["id"], "tests/data/test-seed-data.xlsx", "Assessed Raw" - ) + result = self.seed_client.upload_datafile(dataset["id"], "tests/data/test-seed-data.xlsx", "Assessed Raw") import_file_id = result["import_file_id"] assert result["success"] is True assert import_file_id is not None @@ -254,15 +236,11 @@ def test_upload_datafile(self): assert result["progress"] == 100 # create/retrieve the column mappings - result = self.seed_client.create_or_update_column_mapping_profile_from_file( - "new profile", "tests/data/test-seed-data-mappings.csv" - ) + result = self.seed_client.create_or_update_column_mapping_profile_from_file("new profile", "tests/data/test-seed-data-mappings.csv") assert len(result["mappings"]) > 0 # set the column mappings for the dataset - result = self.seed_client.set_import_file_column_mappings( - import_file_id, result["mappings"] - ) + result = self.seed_client.set_import_file_column_mappings(import_file_id, result["mappings"]) # now start the mapping result = self.seed_client.start_map_data(import_file_id) @@ -347,7 +325,7 @@ def test_upload_single_method_with_meters(self): meters = self.seed_client.get_meters(building[0]["id"]) assert len(meters) == 4 # elec, elec cost, gas, gas cost meter_data = self.seed_client.get_meter_data(building[0]["id"]) - assert len(meter_data['readings']) == 24 + assert len(meter_data["readings"]) == 24 def test_download_espm_property(self): # For testing, read in the ESPM username and password from @@ -358,30 +336,26 @@ def test_download_espm_property(self): save_file.unlink() self.seed_client.retrieve_portfolio_manager_property( - username=os.environ.get('SEED_PM_UN'), - password=os.environ.get('SEED_PM_PW'), + username=os.environ.get("SEED_PM_UN"), + password=os.environ.get("SEED_PM_PW"), pm_property_id=22178850, - save_file_name=save_file + save_file_name=save_file, ) - self.assertTrue(save_file.exists()) + assert save_file.exists() # redownload and show an error - with self.assertRaises(Exception) as excpt: + with pytest.raises(Exception) as excpt: # noqa: PT011 self.seed_client.retrieve_portfolio_manager_property( - username=os.environ.get('SEED_PM_UN'), - password=os.environ.get('SEED_PM_PW'), + username=os.environ.get("SEED_PM_UN"), + password=os.environ.get("SEED_PM_PW"), pm_property_id=22178850, - save_file_name=save_file + save_file_name=save_file, ) - self.assertEqual( - str(excpt.exception), - f'Save filename already exists, save to a new file name: {str(save_file)}' - ) + assert excpt.value.args[0] == f"Save filename already exists, save to a new file name: {save_file!s}" def test_upload_espm_property_to_seed(self): - file = Path("tests/data/portfolio-manager-single-22482007.xlsx") # need a building @@ -389,15 +363,20 @@ def test_upload_espm_property_to_seed(self): building = None if buildings: building = buildings[0] - self.assertTrue(building) + assert building # need a column mapping profile mapping_file = Path("tests/data/test-seed-data-mappings.csv") - mapping_profile = self.seed_client.create_or_update_column_mapping_profile_from_file('ESPM Test', mapping_file) - self.assertTrue('id' in mapping_profile) - - response = self.seed_client.import_portfolio_manager_property(building['id'], self.seed_client.cycle_id, mapping_profile['id'], file) - self.assertTrue(response['status'] == 'success') + mapping_profile = self.seed_client.create_or_update_column_mapping_profile_from_file("ESPM Test", mapping_file) + assert "id" in mapping_profile + + response = self.seed_client.import_portfolio_manager_property( + building["id"], + self.seed_client.cycle_id, + mapping_profile["id"], + file, + ) + assert response["status"] == "success" # def test_retrieve_at_building_and_update(self): # # NOTE: commenting this out as we cannot set the AT credentials in SEED from py-seed @@ -433,9 +412,7 @@ def setup_class(cls): # If running SEED locally for testing, then you can run the following from your SEED root directory: # ./manage.py create_test_user_json --username user@seed-platform.org --file ../py-seed/seed-config.json --pyseed config_file = Path("seed-config.json") - cls.seed_client = SeedClient( - cls.organization_id, connection_config_filepath=config_file - ) + cls.seed_client = SeedClient(cls.organization_id, connection_config_filepath=config_file) @classmethod def teardown_class(cls): @@ -464,7 +441,7 @@ def test_upload_multiple_cycles_and_read_back(self): "Single Step Column Mappings", "tests/data/test-seed-data-mappings.csv", import_meters_if_exist=False, - multiple_cycle_upload=True + multiple_cycle_upload=True, ) assert result is not None @@ -479,6 +456,6 @@ def test_upload_multiple_cycles_and_read_back(self): assert len(building_cycles) == 3 # check that the site_euis are correct - assert building_cycles[0]['site_eui'] == 95 - assert building_cycles[1]['site_eui'] == 181 - assert building_cycles[2]['site_eui'] == 129 + assert building_cycles[0]["site_eui"] == 95 + assert building_cycles[1]["site_eui"] == 181 + assert building_cycles[2]["site_eui"] == 129 diff --git a/tests/test_seed_client_base.py b/tests/test_seed_client_base.py index 276bcf3..cae2d62 100644 --- a/tests/test_seed_client_base.py +++ b/tests/test_seed_client_base.py @@ -3,13 +3,13 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ -# Imports from Third Party Modules import json -import requests import unittest from unittest import mock -# Local Imports +import pytest +import requests + from pyseed.exceptions import SEEDError from pyseed.seed_client_base import ( ReadMixin, @@ -20,37 +20,30 @@ # Constants URLS = { - 'test1': 'api/v3/test', - 'test2': 'api/v3/test2', - 'test3': 'api/v3/test3', + "test1": "api/v3/test", + "test2": "api/v3/test2", + "test3": "api/v3/test3", } -CONFIG_DICT = { - 'port': 1337, - 'urls_key': 'urls', - 'base_url': 'example.org' -} +CONFIG_DICT = {"port": 1337, "urls_key": "urls", "base_url": "example.org"} SERVICES_DICT = { - 'seed': { - 'urls': URLS, - - } + "seed": { + "urls": URLS, + }, } -class MockConfig(object): +class MockConfig: """Mock config object""" + # pylint:disable=too-few-public-methods, no-self-use def __init__(self, conf): self.conf = conf def get(self, var, section=None, default=None): - if section: - cdict = self.conf.get(section, {}) - else: - cdict = self.conf + cdict = self.conf.get(section, {}) if section else self.conf return cdict.get(var, default) @@ -61,46 +54,48 @@ def get(self, var, section=None, default=None): # Helper Functions & Classes class MySeedClient(ReadMixin, SEEDBaseClient): # pylint:disable=too-few-public-methods - endpoint = 'test1' - + endpoint = "test1" -class MockOAuthClient(object): +class MockOAuthClient: def __init__(self, sig, username, client_id): pass def get_access_token(self): - return 'dfghjk' - - -def get_mock_response(data=None, data_name='data', error=False, - status_code=200, method='get', - base_url=CONFIG_DICT['base_url'], - endpoint='test1', extra=None, https=False, - content=True): + return "dfghjk" + + +def get_mock_response( + data=None, + data_name="data", + error=False, + status_code=200, + method="get", + base_url=CONFIG_DICT["base_url"], + endpoint="test1", + extra=None, + https=False, + content=True, +): """Create mock response in the style of SEED""" # pylint:disable=too-many-arguments - status = 'error' if error else 'success' + status = "error" if error else "success" mock_request = mock.MagicMock() - url = "{}://{}/{}/".format( - 'https' if https else 'http', - base_url, - URLS[endpoint] - ) - if extra: # pragma: no cover + url = "{}://{}/{}/".format("https" if https else "http", base_url, URLS[endpoint]) + if extra: # pragma: no cover url = url + extra mock_request.url = url mock_request.method = method mock_response = mock.MagicMock() mock_response.status_code = status_code mock_response.request = mock_request - mock_response.headers = {'Content-Type': 'application/json'} + mock_response.headers = {"Content-Type": "application/json"} # SEED old style if content: if error: - data_name = 'message' - content_dict = {'status': status, data_name: data} + data_name = "message" + content_dict = {"status": status, data_name: data} mock_response.content = json.dumps(content_dict) mock_response.json.return_value = content_dict else: @@ -109,7 +104,7 @@ def get_mock_response(data=None, data_name='data', error=False, # Tests -@mock.patch('pyseed.apibase.requests') +@mock.patch("pyseed.apibase.requests") class SEEDClientErrorHandlingTests(unittest.TestCase): """ The error handling uses the inspect module to examine the stack @@ -122,11 +117,15 @@ class SEEDClientErrorHandlingTests(unittest.TestCase): def setUp(self): self.port = 1137 self.urls_map = URLS - self.base_url = 'example.org' + self.base_url = "example.org" print(self.urls_map) self.client = MySeedClient( - 1, username='test@example.org', access_token='dfghj', - base_url=self.base_url, port=self.port, url_map=self.urls_map + 1, + username="test@example.org", + access_token="dfghj", + base_url=self.base_url, + port=self.port, + url_map=self.urls_map, ) def test_check_response_inheritance(self, mock_requests): @@ -139,218 +138,207 @@ def test_check_response_inheritance(self, mock_requests): Error called in _check_response(), this also tests that method as well as _raise_error(). """ - url = 'http://example.org/api/v3/test/' + url = "http://example.org/api/v3/test/" # Old SEED Style 200 (sic) with error message - mock_requests.get.return_value = get_mock_response( - data="No llama!", error=True - ) - with self.assertRaises(SEEDError) as conm: + mock_requests.get.return_value = get_mock_response(data="No llama!", error=True) + with pytest.raises(SEEDError) as conm: self.client.get(1) - self.assertEqual(conm.exception.error, 'No llama!') - self.assertEqual(conm.exception.service, 'SEED') - self.assertEqual(conm.exception.url, url) - self.assertEqual(conm.exception.caller, 'MySeedClient.get') - self.assertEqual(conm.exception.verb.upper(), 'GET') - self.assertEqual(conm.exception.status_code, 200) + assert conm.value.error == "No llama!" + assert conm.value.service == "SEED" + assert conm.value.url == url + assert conm.value.caller == "MySeedClient.get" + assert conm.value.verb.upper() == "GET" + assert conm.value.status_code == 200 # newer/correct using status codes (no message) - mock_requests.get.return_value = get_mock_response( - status_code=404, data="No llama!", error=True, content=False - ) - with self.assertRaises(SEEDError) as conm: + mock_requests.get.return_value = get_mock_response(status_code=404, data="No llama!", error=True, content=False) + with pytest.raises(SEEDError) as conm: self.client.get(1) - self.assertEqual( - conm.exception.error, 'SEED returned status code: 404' - ) - self.assertEqual(conm.exception.service, 'SEED') - self.assertEqual(conm.exception.url, url) - self.assertEqual(conm.exception.caller, 'MySeedClient.get') - self.assertEqual(conm.exception.verb.upper(), 'GET') - self.assertEqual(conm.exception.status_code, 404) + assert conm.value.error == "SEED returned status code: 404" + assert conm.value.service == "SEED" + assert conm.value.url == url + assert conm.value.caller == "MySeedClient.get" + assert conm.value.verb.upper() == "GET" + assert conm.value.status_code == 404 # newer/correct using status codes (with message) - mock_requests.get.return_value = get_mock_response( - status_code=404, data="No llama!", error=True, content=True - ) - with self.assertRaises(SEEDError) as conm: + mock_requests.get.return_value = get_mock_response(status_code=404, data="No llama!", error=True, content=True) + with pytest.raises(SEEDError) as conm: self.client.get(1) - self.assertEqual( - conm.exception.error, 'No llama!' - ) - self.assertEqual(conm.exception.service, 'SEED') - self.assertEqual(conm.exception.url, url) - self.assertEqual(conm.exception.caller, 'MySeedClient.get') - self.assertEqual(conm.exception.verb.upper(), 'GET') - self.assertEqual(conm.exception.status_code, 404) + assert conm.value.error == "No llama!" + assert conm.value.service == "SEED" + assert conm.value.url == url + assert conm.value.caller == "MySeedClient.get" + assert conm.value.verb.upper() == "GET" + assert conm.value.status_code == 404 class SEEDClientMethodTests(unittest.TestCase): - def setUp(self): self.port = 1137 self.urls_map = URLS - self.base_url = 'example.org' + self.base_url = "example.org" self.client = MySeedClient( - 1, username='test@example.org', access_token='dfghj', - base_url=self.base_url, port=self.port, url_map=self.urls_map + 1, + username="test@example.org", + access_token="dfghj", + base_url=self.base_url, + port=self.port, + url_map=self.urls_map, ) def test_init(self): """Test init sets params correctly""" - urls = { - key: "{}:{}/{}".format( - self.base_url, self.port, val - ) for key, val in URLS.items() - } - self.assertTrue(self.client.use_ssl) - self.assertTrue(self.client.use_json) - self.assertEqual(1, self.client.org_id) - self.assertEqual( - "{}:{}/".format( - self.base_url, self.port - ), - self.client.base_url - ) - self.assertEqual('test1', self.client.endpoint) - self.assertEqual(None, self.client.data_name) - self.assertEqual(urls, self.client.urls) - self.assertEqual(URLS.keys(), self.client.endpoints) - self.assertEqual('test1', self.client.endpoint) + urls = {key: f"{self.base_url}:{self.port}/{val}" for key, val in URLS.items()} + assert self.client.use_ssl + assert self.client.use_json + assert self.client.org_id == 1 + assert f"{self.base_url}:{self.port}/" == self.client.base_url + assert self.client.endpoint == "test1" + assert None is self.client.data_name + assert urls == self.client.urls + assert URLS.keys() == self.client.endpoints + assert self.client.endpoint == "test1" def test_get_result(self): """Test _get_result method.""" - response = get_mock_response(data='test') + response = get_mock_response(data="test") result = self.client._get_result(response) - self.assertEqual('test', result) + assert result == "test" -@mock.patch('pyseed.apibase.requests') +@mock.patch("pyseed.apibase.requests") class MixinTests(unittest.TestCase): """Test Mixins via SEEDOAuthReadWriteClient""" def setUp(self): self.port = 1337 self.urls_map = URLS - self.base_url = 'example.org' + self.base_url = "example.org" self.client = SEEDOAuthReadWriteClient( - 1, username='test@example.org', - access_token='dfghjk', base_url=self.base_url, - port=self.port, url_map=self.urls_map, oauth_client=MockOAuthClient + 1, + username="test@example.org", + access_token="dfghjk", + base_url=self.base_url, + port=self.port, + url_map=self.urls_map, + oauth_client=MockOAuthClient, ) self.call_dict = { - 'headers': {'Authorization': 'Bearer dfghjk'}, - 'params': { - 'organization_id': 1, + "headers": {"Authorization": "Bearer dfghjk"}, + "params": { + "organization_id": 1, }, - 'timeout': None + "timeout": None, } def test_delete(self, mock_requests): # pylint:disable=no-member - url = 'https://example.org:1337/api/v3/test/1/' - mock_requests.delete.return_value = get_mock_response( - status_code=requests.codes.no_content - ) - result = self.client.delete(1, endpoint='test1') - self.assertEqual(None, result) + url = "https://example.org:1337/api/v3/test/1/" + mock_requests.delete.return_value = get_mock_response(status_code=requests.codes.no_content) + result = self.client.delete(1, endpoint="test1") + assert None is result mock_requests.delete.assert_called_with(url, **self.call_dict) def test_get(self, mock_requests): - url = 'https://example.org:1337/api/v3/test/1/' + url = "https://example.org:1337/api/v3/test/1/" mock_requests.get.return_value = get_mock_response(data="Llama!") - result = self.client.get(1, endpoint='test1') - self.assertEqual('Llama!', result) + result = self.client.get(1, endpoint="test1") + assert result == "Llama!" mock_requests.get.assert_called_with(url, **self.call_dict) def test_list(self, mock_requests): - url = 'https://example.org:1337/api/v3/test/' + url = "https://example.org:1337/api/v3/test/" mock_requests.get.return_value = get_mock_response(data=["Llama!"]) - result = self.client.list(endpoint='test1') - self.assertEqual(['Llama!'], result) + result = self.client.list(endpoint="test1") + assert result == ["Llama!"] mock_requests.get.assert_called_with(url, **self.call_dict) def test_patch(self, mock_requests): - url = 'https://example.org:1337/api/v3/test/1/' + url = "https://example.org:1337/api/v3/test/1/" mock_requests.patch.return_value = get_mock_response(data="Llama!") - result = self.client.patch(1, endpoint='test1', foo='bar', json={'more': 'data'}) - self.assertEqual('Llama!', result) + result = self.client.patch(1, endpoint="test1", foo="bar", json={"more": "data"}) + assert result == "Llama!" expected = { - 'headers': {'Authorization': 'Bearer dfghjk'}, - 'params': { - 'organization_id': 1, - 'foo': 'bar', + "headers": {"Authorization": "Bearer dfghjk"}, + "params": { + "organization_id": 1, + "foo": "bar", }, - 'json': {'more': 'data'}, - 'timeout': None + "json": {"more": "data"}, + "timeout": None, } mock_requests.patch.assert_called_with(url, **expected) def test_put(self, mock_requests): - url = 'https://example.org:1337/api/v3/test/1/' + url = "https://example.org:1337/api/v3/test/1/" mock_requests.put.return_value = get_mock_response(data="Llama!") - result = self.client.put(1, endpoint='test1', foo='bar', json={'more': 'data'}) - self.assertEqual('Llama!', result) + result = self.client.put(1, endpoint="test1", foo="bar", json={"more": "data"}) + assert result == "Llama!" expected = { - 'headers': {'Authorization': 'Bearer dfghjk'}, - 'params': { - 'organization_id': 1, - 'foo': 'bar', + "headers": {"Authorization": "Bearer dfghjk"}, + "params": { + "organization_id": 1, + "foo": "bar", }, - 'json': {'more': 'data'}, - 'timeout': None + "json": {"more": "data"}, + "timeout": None, } mock_requests.put.assert_called_with(url, **expected) def test_post(self, mock_requests): - url = 'https://example.org:1337/api/v3/test/' + url = "https://example.org:1337/api/v3/test/" mock_requests.post.return_value = get_mock_response(data="Llama!") - result = self.client.post(endpoint='test1', json={'foo': 'bar', 'not_org': 1}) - self.assertEqual('Llama!', result) + result = self.client.post(endpoint="test1", json={"foo": "bar", "not_org": 1}) + assert result == "Llama!" expected = { - 'headers': {'Authorization': 'Bearer dfghjk'}, - 'params': { - 'organization_id': 1, + "headers": {"Authorization": "Bearer dfghjk"}, + "params": { + "organization_id": 1, }, - 'json': {'not_org': 1, 'foo': 'bar'}, - 'timeout': None + "json": {"not_org": 1, "foo": "bar"}, + "timeout": None, } mock_requests.post.assert_called_with(url, **expected) -@mock.patch('pyseed.apibase.requests') +@mock.patch("pyseed.apibase.requests") class SEEDReadWriteClientTests(unittest.TestCase): """Test SEEDReadWriteClient""" def setUp(self): self.port = 1337 self.urls_map = URLS - self.base_url = 'example.org' + self.base_url = "example.org" self.client = SEEDReadWriteClient( - 1, username='test@example.org', - api_key='dfghjk', base_url=self.base_url, - port=self.port, url_map=self.urls_map + 1, + username="test@example.org", + api_key="dfghjk", + base_url=self.base_url, + port=self.port, + url_map=self.urls_map, ) self.call_dict = { - 'headers': {'Authorization': 'Basic dfghjk'}, - 'params': { - 'organization_id': 1, + "headers": {"Authorization": "Basic dfghjk"}, + "params": { + "organization_id": 1, }, - 'timeout': None + "timeout": None, } def test_get(self, mock_requests): # url = 'https://example.org:1337/api/v3/test/1/' mock_requests.get.return_value = get_mock_response(data="Llama!") - result = self.client.get(1, endpoint='test1') - self.assertEqual('Llama!', result) + result = self.client.get(1, endpoint="test1") + assert result == "Llama!" def test_list(self, mock_requests): # url = 'https://example.org:1337/api/v3/test/' mock_requests.get.return_value = get_mock_response(data=["Llama!"]) - result = self.client.list(endpoint='test1') - self.assertEqual(['Llama!'], result) + result = self.client.list(endpoint="test1") + assert result == ["Llama!"] diff --git a/tests/test_utils.py b/tests/test_utils.py index 7e1fd71..797a07a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,12 +3,9 @@ See also https://github.com/seed-platform/py-seed/main/LICENSE """ - -# Imports from Third Party Modules import unittest from pathlib import Path -# Local Imports from pyseed.utils import read_map_file @@ -22,6 +19,6 @@ def test_mapping_file(self): "from_units": "ft**2", "to_field": "gross_floor_area", "to_table_name": "PropertyState", - "is_omitted": False + "is_omitted": False, } assert mappings[5] == expected diff --git a/tox.ini b/tox.ini index 5116d63..c25dc8d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,30 +1,20 @@ -[flake8] -ignore=E402,E501,E731,W503,W504 -;exclude=... -max-line-length=100 - [tox] -envlist= +env_list = python precommit mypy recreate = True -skipsdist=True +no_package = True + +[testenv] +base_python = python +deps = -r requirements-test.txt [testenv:python] -basepython=python -deps=-rrequirements-test.txt -commands=pytest --cov=. --cov-report= --cov-append -s -m 'not integration' +commands = pytest --cov=. --cov-report= --cov-append -s -m 'not integration' [testenv:precommit] -basepython=python -deps= - -r{toxinidir}/requirements-test.txt -commands= - pre-commit run --all-files +commands = pre-commit run --all-files [testenv:mypy] -basepython=python -deps= - -r{toxinidir}/requirements-test.txt -commands=mypy --install-types --non-interactive --show-error-codes {toxinidir}/pyseed +commands = mypy --install-types --non-interactive --show-error-codes pyseed