From 2dcc1ef7c537e1346f4cf71209d5722a84190a2b Mon Sep 17 00:00:00 2001 From: bgilbert-fetchrewards <61198425+bgilbert-fetchrewards@users.noreply.github.com> Date: Mon, 14 Sep 2020 15:13:16 -0500 Subject: [PATCH] snowflake integration; basic json schema array handling; formatting with black --- .dockerignore | 6 + .gitignore | 129 ++++++++ docs/index.rst | 2 +- jsonschema2db.py | 798 +++++++++++++++++++++++++++++++++++------------ test/test.py | 265 ++++++++++------ 5 files changed, 917 insertions(+), 283 deletions(-) create mode 100644 .dockerignore create mode 100644 .gitignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..aa9c8f4 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +# ignore .git and .cache folders +.git +.cache + +# ignore readmes +*.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea6d83e --- /dev/null +++ b/.gitignore @@ -0,0 +1,129 @@ +.ignore/ +.vscode/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +# lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. 
+#Pipfile.lock
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
diff --git a/docs/index.rst b/docs/index.rst
index e304476..f3b5100 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -86,7 +86,7 @@ The first step is to instantiate a :class:`jsonschema2db.JSONSchemaToPostgres` o
     schema = json.load(open('test/test_schema.json'))
     translator = JSONSchemaToPostgres(
         schema,
-        postgres_schema='schm',
+        schema_namespace='schm',
         item_col_name='loan_file_id',
         item_col_type='string',
         abbreviations={
diff --git a/jsonschema2db.py b/jsonschema2db.py
index 71a792d..1ba6494 100644
--- a/jsonschema2db.py
+++ b/jsonschema2db.py
@@ -1,21 +1,24 @@
 import change_case
-import csv
 import datetime
 import iso8601
 import json
 import os
-import random
+import re
+import csv
 import sys
 import tempfile
 import warnings
+import random
+from abc import ABC, abstractmethod


-class JSONSchemaToDatabase:
-    '''JSONSchemaToDatabase is the mother class for everything
+class JSONSchemaToDatabase(ABC):
+    """JSONSchemaToDatabase is the mother class for everything

     :param schema: The JSON schema, as a native Python dict
-    :param database_flavor: Either "postgres" or "redshift"
-    :param postgres_schema: (optional) A string denoting a postgres schema (namespace) under which all tables will be created
+    :param database_flavor: One of ["postgres", "redshift", "snowflake"]
+    :param drop_schema: (optional) Whether or not to drop (with CASCADE) the target schema
+    :param schema_namespace: (optional) A string denoting a schema (namespace) under which all tables will be created
     :param debug: (optional) Set this to True if you want all queries to be printed to stderr
     :param item_col_name: (optional) The name of the main object key (default is 'item_id')
     :param item_col_type: (optional) Type of the main object key (uses the type identifiers from JSON Schema). Default is 'integer'
@@ -28,18 +30,34 @@ class JSONSchemaToDatabase:
     :param s3_prefix: (optional, Redshift only) Optional subdirectory within the S3 bucket
     :param s3_iam_arn: (optional, Redshift only) Extra IAM argument

-    Typically you want to instantiate a `JSONSchemaToPostgres` object, and run :func:`create_tables` to create all the tables. After that, insert all data using :func:`insert_items`. Once you're done inserting, run :func:`create_links` to populate all references properly and add foreign keys between tables. Optionally you can run :func:`analyze` finally which optimizes the tables.
-    '''
-    def __init__(self, schema, database_flavor, postgres_schema=None, debug=False,
-                 item_col_name='item_id', item_col_type='integer', prefix_col_name='prefix',
-                 abbreviations={}, extra_columns=[], root_table='root',
-                 s3_client=None, s3_bucket=None, s3_prefix='jsonschema2db', s3_iam_arn=None):
+    Typically you want to instantiate one of the `JSONSchemaToDatabase` subclasses (`JSONSchemaToPostgres`, `JSONSchemaToRedshift` or `JSONSchemaToSnowflake`), and run :func:`create_tables` to create all the tables. After that, insert all data using :func:`insert_items`. Once you're done inserting, run :func:`create_links` to populate all references properly and add foreign keys between tables. Optionally you can run :func:`analyze` at the end, which optimizes the tables.
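+
+    A minimal end-to-end sketch of that workflow (the connection parameters and the schema file below are illustrative, borrowed from the test suite)::
+
+        import json
+        import psycopg2
+        from jsonschema2db import JSONSchemaToPostgres
+
+        schema = json.load(open('test/test_schema.json'))
+        translator = JSONSchemaToPostgres(
+            schema,
+            schema_namespace='schm',
+            item_col_name='loan_file_id',
+            item_col_type='string',
+        )
+        con = psycopg2.connect('host=localhost dbname=jsonschema2db-test')
+        translator.create_tables(con)
+        translator.insert_items(con, [('loan_file_abc123', {'Loan': {'Amount': 500000}})])
+        translator.create_links(con)
+        translator.analyze(con)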
+ """ + + def __init__( + self, + schema, + database_flavor, + drop_schema=False, + schema_namespace=None, + debug=False, + item_col_name="item_id", + item_col_type="integer", + prefix_col_name="prefix", + abbreviations={}, + extra_columns=[], + root_table="root", + s3_client=None, + s3_bucket=None, + s3_prefix="jsonschema2db", + s3_iam_arn=None, + ): self._database_flavor = database_flavor self._debug = debug self._table_definitions = {} self._links = {} self._backlinks = {} - self._postgres_schema = postgres_schema + self._schema = schema_namespace + self._drop_schema = drop_schema self._item_col_name = item_col_name self._item_col_type = item_col_type self._prefix_col_name = prefix_col_name @@ -60,7 +78,9 @@ def __init__(self, schema, database_flavor, postgres_schema=None, debug=False, self.json_path_count = {} # json path -> count # Walk the schema and build up the translation tables - self._translation_tree = self._traverse(schema, schema, table=self._root_table, comment=schema.get('comment')) + self._translation_tree = self._traverse( + schema, schema, table=self._root_table, comment=schema.get("comment") + ) # Need to compile all the backlinks that uniquely identify a parent and add columns for them for child_table in self._backlinks: @@ -69,12 +89,14 @@ def __init__(self, schema, database_flavor, postgres_schema=None, debug=False, continue parent_table, ref_col_name, _ = list(self._backlinks[child_table])[0] self._backlinks[child_table] = (parent_table, ref_col_name) - self._table_definitions[child_table][ref_col_name] = 'link' + self._table_definitions[child_table][ref_col_name] = "link" self._links.setdefault(child_table, {})[ref_col_name] = (None, parent_table) # Construct tables and columns self._table_columns = {} - max_column_length = {'postgres': 63, 'redshift': 127}[self._database_flavor] + max_column_length = {"postgres": 63, "redshift": 127, "snowflake": 251}[ + self._database_flavor + ] for col, type in self._extra_columns: if 0 < len(col) <= max_column_length: @@ -83,28 +105,48 @@ def __init__(self, schema, database_flavor, postgres_schema=None, debug=False, for table, column_types in self._table_definitions.items(): for column in column_types.keys(): if len(column) > max_column_length: - warnings.warn('Ignoring_column because it is too long: %s.%s' % (table, column)) - columns = sorted(col for col in column_types.keys() if 0 < len(col) <= max_column_length) + warnings.warn( + "Ignoring_column because it is too long: %s.%s" + % (table, column) + ) + columns = sorted( + col for col in column_types.keys() if 0 < len(col) <= max_column_length + ) self._table_columns[table] = columns def _table_name(self, path): - return '__'.join(change_case.ChangeCase.camel_to_snake(self._abbreviations.get(p, p)) for p in path) + return "__".join( + change_case.ChangeCase.camel_to_snake(self._abbreviations.get(p, p)) + for p in path if p + ) def _column_name(self, path): return self._table_name(path) # same + @abstractmethod def _execute(self, cursor, query, args=None, query_ok_to_print=True): - if self._debug and query_ok_to_print: - print(query, file=sys.stderr) - cursor.execute(query, args) - - def _traverse(self, schema, tree, path=tuple(), table='root', parent=None, comment=None, json_path=tuple()): + pass + + @abstractmethod + def _get_data_types(self): + pass + + def _traverse( + self, + schema, + tree, + path=tuple(), + table="root", + parent=None, + comment=None, + json_path=tuple(), + ): # Computes a bunch of stuff # 1. 
A list of tables and columns (used to create tables dynamically) # 2. A tree (dicts of dicts) with a mapping for each fact into tables (used to map data) # 3. Links between entities if type(tree) != dict: - warnings.warn('%s.%s: Broken subtree' % (table, self._column_name(path))) + warnings.warn("%s.%s: Broken subtree" % (table, self._column_name(path))) return if parent is not None: @@ -117,106 +159,171 @@ def _traverse(self, schema, tree, path=tuple(), table='root', parent=None, comme definition = None new_json_path = json_path - while '$ref' in tree: - ref = tree['$ref'] - p = ref.lstrip('#').lstrip('/').split('/') + while "$ref" in tree: + ref = tree["$ref"] + p = ref.lstrip("#").lstrip("/").split("/") tree = schema for elem in p: if elem not in tree: - warnings.warn('%s.%s: Broken definition: %s' % (table, self._column_name(path), ref)) + warnings.warn( + "%s.%s: Broken definition: %s" + % (table, self._column_name(path), ref) + ) return tree = tree[elem] - new_json_path = ('#',) + tuple(p) - definition = p[-1] # TODO(erikbern): we should just make this a boolean variable + new_json_path = ("#",) + tuple(p) + definition = p[ + -1 + ] # TODO(erikbern): we should just make this a boolean variable - special_keys = set(tree.keys()).intersection(['oneOf', 'allOf', 'anyOf']) + special_keys = set(tree.keys()).intersection(["oneOf", "allOf", "anyOf"]) if special_keys: res = {} for p in special_keys: for q in tree[p]: - res.update(self._traverse(schema, q, path, table, json_path=new_json_path)) + res.update( + self._traverse(schema, q, path, table, json_path=new_json_path) + ) return res # This is a special node, don't store any more information - elif 'enum' in tree: - self._table_definitions[table][self._column_name(path)] = 'enum' - if 'comment' in tree: - self._column_comments.setdefault(table, {})[self._column_name(path)] = tree['comment'] - res = {'_column': self._column_name(path), '_type': 'enum'} - elif 'type' not in tree: + elif "enum" in tree: + self._table_definitions[table][self._column_name(path)] = "enum" + if "comment" in tree: + self._column_comments.setdefault(table, {})[ + self._column_name(path) + ] = tree["comment"] + res = {"_column": self._column_name(path), "_type": "enum"} + elif "type" not in tree: res = {} - warnings.warn('%s.%s: Type info missing' % (table, self._column_name(path))) - elif tree['type'] == 'object': - print('object:', tree) + warnings.warn("%s.%s: Type info missing" % (table, self._column_name(path))) + elif tree["type"] == "object": + if self._debug: + print("object:", tree) res = {} - if 'patternProperties' in tree: + if "patternProperties" in tree: # Always create a new table for the pattern properties - if len(tree['patternProperties']) > 1: - warnings.warn('%s.%s: Multiple patternProperties, will ignore all except first' % (table, self._column_name(path))) - for p in tree['patternProperties']: - ref_col_name = table + '_id' - res['*'] = self._traverse(schema, tree['patternProperties'][p], tuple(), self._table_name(path), (table, ref_col_name, self._column_name(path)), tree.get('comment'), new_json_path + (p,)) + if len(tree["patternProperties"]) > 1: + warnings.warn( + "%s.%s: Multiple patternProperties, will ignore all except first" + % (table, self._column_name(path)) + ) + for p in tree["patternProperties"]: + ref_col_name = table + "_id" + res["*"] = self._traverse( + schema, + tree["patternProperties"][p], + tuple(), + self._table_name(path), + (table, ref_col_name, self._column_name(path)), + tree.get("comment"), + new_json_path + (p,), + 
) break - elif 'properties' in tree: + elif "properties" in tree: if definition: # This is a shared definition, so create a new table (if not already exists) if path == tuple(): - ref_col_name = self._table_name([definition]) + '_id' + ref_col_name = self._table_name([definition]) + "_id" else: - ref_col_name = self._column_name(path) + '_id' - for p in tree['properties']: - res[p] = self._traverse(schema, tree['properties'][p], (p, ), self._table_name([definition]), (table, ref_col_name, self._column_name(path)), tree.get('comment'), new_json_path + (p,)) - self._table_definitions[table][ref_col_name] = 'link' - self._links.setdefault(table, {})[ref_col_name] = ('/'.join(path), self._table_name([definition])) + ref_col_name = self._column_name(path) + "_id" + for p in tree["properties"]: + res[p] = self._traverse( + schema, + tree["properties"][p], + (p,), + self._table_name([definition]), + (table, ref_col_name, self._column_name(path)), + tree.get("comment"), + new_json_path + (p,), + ) + self._table_definitions[table][ref_col_name] = "link" + self._links.setdefault(table, {})[ref_col_name] = ( + "/".join(path), + self._table_name([definition]), + ) else: # Standard object, just traverse recursively - for p in tree['properties']: - res[p] = self._traverse(schema, tree['properties'][p], path + (p,), table, parent, tree.get('comment'), new_json_path + (p,)) + for p in tree["properties"]: + res[p] = self._traverse( + schema, + tree["properties"][p], + path + (p,), + table, + parent, + tree.get("comment"), + new_json_path + (p,), + ) else: - warnings.warn('%s.%s: Object with neither properties nor patternProperties' % (table, self._column_name(path))) + warnings.warn( + "%s.%s: Object with neither properties nor patternProperties" + % (table, self._column_name(path)) + ) + + elif tree["type"] == "array": + self._table_definitions[table][self._column_name(path)] = "array" + if "comment" in tree: + self._column_comments.setdefault(table, {})[ + self._column_name(path) + ] = tree["comment"] + res = {"_column": self._column_name(path), "_type": "array"} else: - if tree['type'] == 'null': + if tree["type"] == "null": res = {} - elif tree['type'] not in ['string', 'boolean', 'number', 'integer']: - warnings.warn('%s.%s: Type error: %s' % (table, self._column_name(path), tree['type'])) + elif tree["type"] not in ["string", "boolean", "number", "integer"]: + warnings.warn( + "%s.%s: Type error: %s" + % (table, self._column_name(path), tree["type"]) + ) res = {} else: - if definition in ['date', 'timestamp']: + if definition in ["date", "timestamp"]: t = definition else: - t = tree['type'] + t = tree["type"] self._table_definitions[table][self._column_name(path)] = t - if 'comment' in tree: - self._column_comments.setdefault(table, {})[self._column_name(path)] = tree['comment'] - res = {'_column': self._column_name(path), '_type': t} + if "comment" in tree: + self._column_comments.setdefault(table, {})[ + self._column_name(path) + ] = tree["comment"] + res = {"_column": self._column_name(path), "_type": t} - res['_table'] = table - res['_suffix'] = '/'.join(path) - res['_json_path'] = '/'.join(json_path) - self.json_path_count['/'.join(json_path)] = 0 + res["_table"] = table + res["_suffix"] = "/".join(path) + res["_json_path"] = "/".join(json_path) + self.json_path_count["/".join(json_path)] = 0 return res def _coerce_type(self, t, value): - ''' Returns a two-tuple (is_valid, new_value) where new_value is properly coerced. 
''' + """ Returns a two-tuple (is_valid, new_value) where new_value is properly coerced. """ try: - if t == 'number': + if t == "number": return type(value) != bool, float(value) - elif t == 'integer': + elif t == "integer": return type(value) != bool, int(value) - elif t == 'boolean': + elif t == "boolean": return type(value) == bool, value - elif t == 'timestamp': + elif t == "timestamp": if type(value) == datetime.datetime: return True, value return True, iso8601.parse_date(value) - elif t == 'date': + elif t == "date": if type(value) == datetime.date: return True, value - return True, datetime.date(*(int(z) for z in value.split('-'))) - elif t == 'string': + return True, datetime.date(*(int(z) for z in value.split("-"))) + elif t == "string": # Allow coercing ints/floats, but nothing else return type(value) in [str, int, float], str(value) - elif t == 'enum': + elif t == "enum": return type(value) == str, str(value) + elif t == "array": + if self._database_flavor == 'snowflake': + return True, "PARSE_JSON(' %s ')" % json.dumps(value).replace('\'', '\\\'') + else: + warnings.warn( + "Ignoring unsupported column with type: %s" + % (t) + ) + pass except: pass return False, None @@ -226,41 +333,20 @@ def _flatten_dict(self, data, res=None, path=tuple()): res = [] if type(data) == dict: for k, v in data.items(): - self._flatten_dict(v, res, path+(k,)) + self._flatten_dict(v, res, path + (k,)) else: res.append((path, data)) return res - def _postgres_table_name(self, table): - if self._postgres_schema is None: + def _get_table_name(self, table): + if self._schema is None: return '"%s"' % table else: - return '"%s"."%s"' % (self._postgres_schema, table) + return '"%s"."%s"' % (self._schema, table) + @abstractmethod def create_tables(self, con): - '''Creates tables - - :param con: psycopg2 connection object - ''' - postgres_types = {'boolean': 'bool', 'number': 'float', 'string': 'text', 'enum': 'text', 'integer': 'bigint', 'timestamp': 'timestamptz', 'date': 'date', 'link': 'integer'} - with con.cursor() as cursor: - if self._postgres_schema is not None: - self._execute(cursor, 'drop schema if exists %s cascade' % self._postgres_schema) - self._execute(cursor, 'create schema %s' % self._postgres_schema) - for table, columns in self._table_columns.items(): - types = [self._table_definitions[table][column] for column in columns] - id_data_type = {'postgres': 'serial', 'redshift': 'int identity(1, 1) not null'}[self._database_flavor] - - create_q = 'create table %s (id %s, "%s" %s not null, "%s" text not null, %s unique ("%s", "%s"), unique (id))' % \ - (self._postgres_table_name(table), id_data_type, self._item_col_name, postgres_types[self._item_col_type], self._prefix_col_name, - ''.join('"%s" %s, ' % (c, postgres_types[t]) for c, t in zip(columns, types)), - self._item_col_name, self._prefix_col_name) - self._execute(cursor, create_q) - if table in self._table_comments: - self._execute(cursor, 'comment on table %s is %%s' % self._postgres_table_name(table), (self._table_comments[table],)) - for c in columns: - if c in self._column_comments.get(table, {}): - self._execute(cursor, 'comment on column %s."%s" is %%s' % (self._postgres_table_name(table), c), (self._column_comments[table][c],)) + pass def _insert_items_generate_rows(self, items, extra_items, count): # Helper function to generate data row by row for insertion @@ -272,36 +358,44 @@ def _insert_items_generate_rows(self, items, extra_items, count): if value is None: continue - subtree = self._translation_tree - 
res.setdefault(subtree['_table'], {}).setdefault('', {}) + # if type(value) == list: + # for v in value: + # self._insert_items_generate_rows(v, extra_items, count) + + subtree = self._translation_tree + res.setdefault(subtree["_table"], {}).setdefault("", {}) if count: - self.json_path_count[subtree['_json_path']] += 1 + self.json_path_count[subtree["_json_path"]] += 1 for index, path_part in enumerate(path): - if '*' in subtree: - subtree = subtree['*'] + if "*" in subtree: + subtree = subtree["*"] elif not subtree.get(path_part): if count: - self.failure_count[path] = self.failure_count.get(path, 0) + 1 + self.failure_count[path] = ( + self.failure_count.get(path, 0) + 1 + ) break else: subtree = subtree[path_part] # Compute the prefix, add an empty entry (TODO: should make the prefix customizeable) - table, suffix = subtree['_table'], subtree['_suffix'] - prefix_suffix = '/' + '/'.join(path[:(index+1)]) + table, suffix = subtree["_table"], subtree["_suffix"] + prefix_suffix = "/" + "/".join(path[: (index + 1)]) assert prefix_suffix.endswith(suffix) - prefix = prefix_suffix[:len(prefix_suffix)-len(suffix)].rstrip('/') + prefix = prefix_suffix[: len(prefix_suffix) - len(suffix)].rstrip( + "/" + ) res.setdefault(table, {}).setdefault(prefix, {}) if count: - self.json_path_count[subtree['_json_path']] += 1 + self.json_path_count[subtree["_json_path"]] += 1 # Leaf node with value, validate and prepare for insertion - if '_column' not in subtree: + if "_column" not in subtree: if count: self.failure_count[path] = self.failure_count.get(path, 0) + 1 continue - col, t = subtree['_column'], subtree['_type'] + col, t = subtree["_column"], subtree["_type"] if table not in self._table_columns: if count: self.failure_count[path] = self.failure_count.get(path, 0) + 1 @@ -316,16 +410,101 @@ def _insert_items_generate_rows(self, items, extra_items, count): for table, table_values in res.items(): if table == self._root_table and item_id in extra_items: - res[table][''].update(extra_items[item_id]) + res[table][""].update(extra_items[item_id]) # Compile table rows for this item for table, table_values in res.items(): for prefix, row_values in table_values.items(): - row_array = [item_id, prefix] + [row_values.get(t) for t in self._table_columns[table]] + row_array = [item_id, prefix] + [ + row_values.get(t) for t in self._table_columns[table] + ] yield (table, row_array) + @abstractmethod def insert_items(self, con, items, extra_items={}, mutate=True, count=False): - ''' Inserts data into database. 
+ pass + + @abstractmethod + def create_links(self, con): + """Adds foreign keys between tables.""" + pass + + +class JSONSchemaToPostgres(JSONSchemaToDatabase): + """Shorthand for JSONSchemaToDatabase(..., database_flavor='postgres')""" + + def __init__(self, *args, **kwargs): + kwargs["database_flavor"] = "postgres" + return super(JSONSchemaToPostgres, self).__init__(*args, **kwargs) + + def _execute(self, cursor, query, args=None, query_ok_to_print=True): + if self._debug and query_ok_to_print: + print(query, file=sys.stderr) + + cursor.execute(query, args) + + def _get_data_types(self): + """json schema's data types to database flavor's data types""" + return { + "boolean": "bool", + "number": "float", + "string": "text", + "enum": "text", + "integer": "bigint", + "timestamp": "timestamptz", + "date": "date", + "link": "integer", + } + + def create_tables(self, con): + """Creates tables + :param con: psycopg2 connection object + """ + data_types = self._get_data_types() + with con.cursor() as cursor: + if self._schema is not None: + if self._drop_schema: + self._execute(cursor, "drop schema if exists %s cascade" % self._schema) + + self._execute(cursor, "create schema if not exists %s" % self._schema) + for table, columns in self._table_columns.items(): + types = [self._table_definitions[table][column] for column in columns] + id_data_type = "serial" + + create_q = ( + 'create table %s (id %s, "%s" %s not null, "%s" text not null, %s unique ("%s", "%s"), unique (id))' + % ( + self._get_table_name(table), + id_data_type, + self._item_col_name, + data_types[self._item_col_type], + self._prefix_col_name, + "".join( + '"%s" %s, ' % (c, data_types[t]) + for c, t in zip(columns, types) + ), + self._item_col_name, + self._prefix_col_name, + ) + ) + self._execute(cursor, create_q) + if table in self._table_comments: + self._execute( + cursor, + "comment on table %s is %%s" % self._get_table_name(table), + (self._table_comments[table],), + ) + for c in columns: + if c in self._column_comments.get(table, {}): + self._execute( + cursor, + 'comment on column %s."%s" is %%s' + % (self._get_table_name(table), c), + (self._column_comments[table][c],), + ) + + def insert_items(self, con, items, extra_items={}, mutate=True, count=False): + """ Inserts data into database. :param con: psycopg2 connection object :param items: is an iterable of tuples `(item id, values)` where `values` is either: @@ -344,106 +523,337 @@ def insert_items(self, con, items, extra_items={}, mutate=True, count=False): Otherwise, it will fall back to the Postgres-based method of running batched insertions. Note that the Postgres-based insertion builds up huge intermediary datastructures, so it will take a lot more memory. 
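+
+        For example, these two calls describe the same single value; the item id, path and value are illustrative, borrowed from the test suite::
+
+            translator.insert_items(con, [(33, {'aBunchOfDocuments': {'xyz': {'url': 'http://baz.bar'}}})])
+            translator.insert_items(con, [(33, [(('aBunchOfDocuments', 'xyz', 'url'), 'http://baz.bar')])])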
- ''' - rows = self._insert_items_generate_rows(items=items, extra_items=extra_items, count=count) + """ + rows = self._insert_items_generate_rows( + items=items, extra_items=extra_items, count=count + ) if not mutate: for table, row in rows: # Just exhaust the iterator pass - elif self._database_flavor == 'redshift' and self._s3_client: - with tempfile.TemporaryDirectory() as tmpdirname, con.cursor() as cursor: - # Flush the iterator to temporary files on disk - temp_files, writers, file_objs = {}, {}, [] - for table, row in rows: - if table not in temp_files: - fn = temp_files[table] = os.path.join(tmpdirname, table + '.csv') - f = open(fn, 'wt') - writer = csv.writer(f) - if self._debug: - print('Creating temp file for table', table, 'at', fn, file=sys.stderr) - writers[table] = writer - file_objs.append(f) - - writers[table].writerow(row) - - # Close local temp files so all data gets flushed to disk - for f in file_objs: - f.close() - - # Upload all files to S3 and load into Redshift - # TODO: might want to use a thread pool for this - batch_random = '%012d' % random.randint(0, 999999999999) - for table, fn in temp_files.items(): - s3_path = '/%s/%s/%s.csv' % (self._s3_prefix, batch_random, table) - if self._debug: - print('Uploading data for table %s from %s (%d bytes) to %s' % (table, fn, os.path.getsize(fn), s3_path), file=sys.stderr) - self._s3_client.upload_file(Filename=fn, Bucket=self._s3_bucket, Key=s3_path) - - query = 'copy %s from \'s3://%s/%s\' csv %s truncatecolumns compupdate off statupdate off' % ( - self._postgres_table_name(table), - self._s3_bucket, s3_path, self._s3_iam_arn and 'iam_role \'%s\'' % self._s3_iam_arn or '') - self._execute(cursor, query) - else: - # Postgres-based insertion - with con.cursor() as cursor: - data_by_table = {} - for table, row in rows: - # Note that this flushes the iterator into an in-memory datastructure, so it will be far less memory efficient than the Redshift strategy - data_by_table.setdefault(table, []).append(row) - for table, data in data_by_table.items(): - cols = '("%s","%s"%s)' % (self._item_col_name, self._prefix_col_name, ''.join(',"%s"' % c for c in self._table_columns[table])) - pattern = '(' + ','.join(['%s'] * len(data[0])) + ')' - args = b','.join(cursor.mogrify(pattern, tup) for tup in data) - self._execute(cursor, b'insert into %s %s values %s' % (self._postgres_table_name(table).encode(), cols.encode(), args), query_ok_to_print=False) + # Postgres-based insertion + with con.cursor() as cursor: + data_by_table = {} + for table, row in rows: + # Note that this flushes the iterator into an in-memory datastructure, so it will be far less memory efficient than the Redshift strategy + data_by_table.setdefault(table, []).append(row) + for table, data in data_by_table.items(): + cols = '("%s","%s"%s)' % ( + self._item_col_name, + self._prefix_col_name, + "".join(',"%s"' % c for c in self._table_columns[table]), + ) + pattern = "(" + ",".join(["%s"] * len(data[0])) + ")" + args = b",".join(cursor.mogrify(pattern, tup) for tup in data) + self._execute( + cursor, + b"insert into %s %s values %s" + % (self._get_table_name(table).encode(), cols.encode(), args), + query_ok_to_print=False, + ) def create_links(self, con): - '''Adds foreign keys between tables.''' + """Adds foreign keys between tables.""" for from_table, cols in self._links.items(): for ref_col_name, (prefix, to_table) in cols.items(): - if from_table not in self._table_columns or to_table not in self._table_columns: + if ( + from_table not in self._table_columns + 
or to_table not in self._table_columns + ): continue args = { - 'from_table': self._postgres_table_name(from_table), - 'to_table': self._postgres_table_name(to_table), - 'ref_col': ref_col_name, - 'item_col': self._item_col_name, - 'prefix_col': self._prefix_col_name, - 'prefix': prefix, - } - update_q = 'update %(from_table)s set "%(ref_col)s" = to_table.id from (select "%(item_col)s", "%(prefix_col)s", id from %(to_table)s) to_table' % args + "from_table": self._get_table_name(from_table), + "to_table": self._get_table_name(to_table), + "ref_col": ref_col_name, + "item_col": self._item_col_name, + "prefix_col": self._prefix_col_name, + "prefix": prefix, + } + + update_q = ( + 'update %(from_table)s set "%(ref_col)s" = to_table.id from (select "%(item_col)s", "%(prefix_col)s", id from %(to_table)s) to_table' + % args + ) + if prefix: # Forward reference from table to a definition - update_q += ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and %(from_table)s."%(prefix_col)s" || \'/%(prefix)s\' = to_table."%(prefix_col)s"' % args + update_q += ( + ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and %(from_table)s."%(prefix_col)s" || \'/%(prefix)s\' = to_table."%(prefix_col)s"' + % args + ) else: # Backward definition from a table to its patternProperty parent - update_q += ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and strpos(%(from_table)s."%(prefix_col)s", to_table."%(prefix_col)s") = 1' % args - - alter_q = 'alter table %(from_table)s add constraint fk_%(ref_col)s foreign key ("%(ref_col)s") references %(to_table)s (id)' % args + update_q += ( + ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and strpos(%(from_table)s."%(prefix_col)s", to_table."%(prefix_col)s") = 1' + % args + ) + + alter_q = ( + 'alter table %(from_table)s add constraint fk_%(ref_col)s foreign key ("%(ref_col)s") references %(to_table)s (id)' + % args + ) with con.cursor() as cursor: self._execute(cursor, update_q) self._execute(cursor, alter_q) def analyze(self, con): - '''Runs `analyze` on each table. This improves performance. + """Runs `analyze` on each table. This improves performance. See the `Postgres documentation for Analyze `_ - ''' + """ with con.cursor() as cursor: for table in self._table_columns.keys(): - self._execute(cursor, 'analyze %s' % self._postgres_table_name(table)) + self._execute(cursor, "analyze %s" % self._get_table_name(table)) -class JSONSchemaToPostgres(JSONSchemaToDatabase): - '''Shorthand for JSONSchemaToDatabase(..., database_flavor='postgres')''' +class JSONSchemaToRedshift(JSONSchemaToPostgres): + """Shorthand for JSONSchemaToDatabase(..., database_flavor='redshift')""" + def __init__(self, *args, **kwargs): - kwargs['database_flavor'] = 'postgres' - return super(JSONSchemaToPostgres, self).__init__(*args, **kwargs) + kwargs["database_flavor"] = "redshift" + return super(JSONSchemaToRedshift, self).__init__(*args, **kwargs) + def insert_items(self, con, items, extra_items={}, mutate=True, count=False): + """ Inserts data into database. + :param con: redshift connection object + :param items: is an iterable of tuples `(item id, values)` where `values` is either: + - A nested dict conforming to the JSON spec + - A list (or iterator) of pairs where the first item in the pair is a tuple specifying the path, and the second value in the pair is the value. + :param extra_items: A dictionary containing values for extra columns, where key is an extra column name. 
+ :param mutate: If this is set to `False`, nothing is actually inserted. This might be useful if you just want to validate data. + :param count: if set to `True`, it will count some things. Defaults to `False`. + Updates `self.failure_count`, a dict counting the number of failures for paths (keys are tuples, values are integers). + This function has an optimized strategy for Redshift, where it writes the data to temporary files, copies those to S3, and uses the `COPY` + command to ingest the data into Redshift. However this strategy is only used if the `s3_client` is provided to the constructor. + Otherwise, it will fall back to the Postgres-based method of running batched insertions. + Note that the Postgres-based insertion builds up huge intermediary datastructures, so it will take a lot more memory. + """ + rows = self._insert_items_generate_rows( + items=items, extra_items=extra_items, count=count + ) + + if not mutate: + for table, row in rows: + # Just exhaust the iterator + pass + + with tempfile.TemporaryDirectory() as tmpdirname, con.cursor() as cursor: + # Flush the iterator to temporary files on disk + temp_files, writers, file_objs = {}, {}, [] + for table, row in rows: + if table not in temp_files: + fn = temp_files[table] = os.path.join(tmpdirname, table + ".csv") + f = open(fn, "wt") + writer = csv.writer(f) + if self._debug: + print( + "Creating temp file for table", + table, + "at", + fn, + file=sys.stderr, + ) + writers[table] = writer + file_objs.append(f) + + writers[table].writerow(row) + + # Close local temp files so all data gets flushed to disk + for f in file_objs: + f.close() + + # Upload all files to S3 and load into Redshift + # TODO: might want to use a thread pool for this + batch_random = "%012d" % random.randint(0, 999999999999) + for table, fn in temp_files.items(): + s3_path = "/%s/%s/%s.csv" % (self._s3_prefix, batch_random, table) + if self._debug: + print( + "Uploading data for table %s from %s (%d bytes) to %s" + % (table, fn, os.path.getsize(fn), s3_path), + file=sys.stderr, + ) + self._s3_client.upload_file( + Filename=fn, Bucket=self._s3_bucket, Key=s3_path + ) + + query = ( + "copy %s from 's3://%s/%s' csv %s truncatecolumns compupdate off statupdate off" + % ( + self._get_table_name(table), + self._s3_bucket, + s3_path, + self._s3_iam_arn and "iam_role '%s'" % self._s3_iam_arn or "", + ) + ) + self._execute(cursor, query) + + +class JSONSchemaToSnowflake(JSONSchemaToDatabase): + """Shorthand for JSONSchemaToDatabase(..., database_flavor='snowflake')""" -class JSONSchemaToRedshift(JSONSchemaToDatabase): - '''Shorthand for JSONSchemaToDatabase(..., database_flavor='redshift')''' def __init__(self, *args, **kwargs): - kwargs['database_flavor'] = 'redshift' - return super(JSONSchemaToRedshift, self).__init__(*args, **kwargs) + kwargs["database_flavor"] = "snowflake" + return super(JSONSchemaToSnowflake, self).__init__(*args, **kwargs) + + def _execute(self, cursor, query, args=None, query_ok_to_print=True): + if self._debug and query_ok_to_print: + print(query, file=sys.stderr) + + query = query.replace("'None'", "NULL") + print(query) + cursor.execute(query, args) + + def _get_data_types(self): + """json schema's data types to database flavor's data types""" + return { + "boolean": "boolean", + "number": "number", + "string": "varchar", + "enum": "varchar", + "integer": "bigint", + "timestamp": "timestamp", + "date": "date", + "link": "integer", + "array": "variant", + } + + def create_tables(self, con): + """Creates tables + + :param con: snowflake 
connection object
+        """
+        data_types = self._get_data_types()
+
+        with con.cursor() as cursor:
+            if self._schema is not None:
+                if self._drop_schema:
+                    self._execute(cursor, "drop schema if exists %s cascade" % self._schema)
+
+                self._execute(cursor, "create schema if not exists %s" % self._schema)
+            for table, columns in self._table_columns.items():
+                types = [self._table_definitions[table][column] for column in columns]
+                # id values are drawn from a per-table sequence (created just below) via a column default
+                id_data_type = "int not null default %s_%s_seq.nextval" % (
+                    self._schema,
+                    table,
+                )
+                seq_q = "create or replace sequence %s_%s_seq" % (self._schema, table)
+
+                self._execute(cursor, seq_q)
+
+                create_q = (
+                    'create or replace table %s (id %s, "%s" %s not null, "%s" varchar not null, %s)'
+                    % (
+                        self._get_table_name(table),
+                        id_data_type,
+                        self._item_col_name,
+                        data_types[self._item_col_type],
+                        self._prefix_col_name,
+                        (
+                            "".join(
+                                '"%s" %s, ' % (c, data_types[t])
+                                for c, t in zip(columns, types)
+                            )
+                        )[:-2],
+                    )
+                )
+
+                self._execute(cursor, create_q)
+
+    def insert_items(self, con, items, extra_items={}, mutate=True, count=False):
+        """ Inserts data into database.
+
+        :param con: snowflake connection object
+        :param items: is an iterable of tuples `(item id, values)` where `values` is either:
+
+        - A nested dict conforming to the JSON spec
+        - A list (or iterator) of pairs where the first item in the pair is a tuple specifying the path, and the second value in the pair is the value.
+
+        :param extra_items: A dictionary containing values for extra columns, where key is an extra column name.
+        :param mutate: If this is set to `False`, nothing is actually inserted. This might be useful if you just want to validate data.
+        :param count: if set to `True`, it will count some things. Defaults to `False`.
+
+        Updates `self.failure_count`, a dict counting the number of failures for paths (keys are tuples, values are integers).
+
+        Unlike the Redshift flavor, there is no S3-based bulk loading here: rows are collected into an in-memory
+        structure and then inserted one at a time with `insert ... select` statements, so large batches will use
+        more memory and take longer than the Redshift path.
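+
+        Values whose schema type is `array` are serialized with `json.dumps` and wrapped in `PARSE_JSON(...)`
+        so that they land in a `variant` column. A rough sketch, assuming the schema declares an array
+        property called `tags`::
+
+            translator.insert_items(con, [(1, {'tags': ['a', 'b']})])
+            # generates one statement per row, along the lines of
+            #   insert into <root table> (...) select '1', '', PARSE_JSON(' ["a", "b"] ')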
+ """ + rows = self._insert_items_generate_rows( + items=items, extra_items=extra_items, count=count + ) + + if not mutate: + for table, row in rows: + # Just exhaust the iterator + pass + + with con.cursor() as cursor: + data_by_table = {} + for table, row in rows: + # Note that this flushes the iterator into an in-memory datastructure, so it will be far less memory efficient than the Redshift strategy + data_by_table.setdefault(table, []).append(row) + for table, data in data_by_table.items(): + cols = '("%s","%s"%s)' % ( + self._item_col_name, + self._prefix_col_name, + "".join(',"%s"' % c for c in self._table_columns[table]), + ) + pattern = "" + ",".join(["%s"] * len(data[0])) + "" + for datum in data: + args = pattern % tuple(["%s" % d if "PARSE_JSON" in str(d) else "'%s'" % d for d in datum]) + self._execute( + cursor, + "insert into %s %s select %s" + % (self._get_table_name(table), cols, args), + query_ok_to_print=False, + ) + + def create_links(self, con): + """Adds foreign keys between tables.""" + for from_table, cols in self._links.items(): + for ref_col_name, (prefix, to_table) in cols.items(): + if ( + from_table not in self._table_columns + or to_table not in self._table_columns + ): + continue + args = { + "from_table": self._get_table_name(from_table), + "to_table": self._get_table_name(to_table), + "ref_col": ref_col_name, + "item_col": self._item_col_name, + "prefix_col": self._prefix_col_name, + "prefix": prefix, + } + + update_q = ( + 'update %(from_table)s set "%(ref_col)s" = to_table.id from (select "%(item_col)s", "%(prefix_col)s", id from %(to_table)s) to_table' + % args + ) + + if prefix: + # Forward reference from table to a definition + update_q += ( + ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and %(from_table)s."%(prefix_col)s" || \'/%(prefix)s\' = to_table."%(prefix_col)s"' + % args + ) + else: + # Backward definition from a table to its patternProperty parent + update_q += ( + ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s" and strpos(%(from_table)s."%(prefix_col)s", to_table."%(prefix_col)s") = 1' + % args + ) + + alter_q = ( + 'alter table %(from_table)s add constraint fk_%(ref_col)s foreign key ("%(ref_col)s") references %(to_table)s (id)' + % args + ) + with con.cursor() as cursor: + self._execute(cursor, update_q) + self._execute(cursor, alter_q) diff --git a/test/test.py b/test/test.py index 6dba8e9..3ba2210 100644 --- a/test/test.py +++ b/test/test.py @@ -12,139 +12,228 @@ def query(con, q): def test_lff(): - schema = json.load(open('test/test_schema.json')) + schema = json.load(open("test/test_schema.json")) translator = JSONSchemaToPostgres( schema, - postgres_schema='schm', - item_col_name='loan_file_id', - item_col_type='string', - abbreviations={ - 'AbbreviateThisReallyLongColumn': 'AbbTRLC', - }, + schema_namespace="schm", + item_col_name="loan_file_id", + item_col_type="string", + abbreviations={"AbbreviateThisReallyLongColumn": "AbbTRLC",}, debug=True, ) - con = psycopg2.connect('host=localhost dbname=jsonschema2db-test') + con = psycopg2.connect("host=localhost dbname=jsonschema2db-test") translator.create_tables(con) - translator.insert_items(con, [ - ('loan_file_abc123', { - 'Loan': {'Amount': 500000}, - 'SubjectProperty': {'Address': {'City': 'New York', 'ZipCode': '12345', 'Latitude': 43}, 'Acreage': 42}, - 'RealEstateOwned': {'1': {'Address': {'City': 'Brooklyn', 'ZipCode': '65432'}, 'RentalIncome': 1000}, - '2': {'Address': {'City': 'Queens', 'ZipCode': '54321'}}}, - }) - ]) + 
translator.insert_items( + con, + [ + ( + "loan_file_abc123", + { + "Loan": {"Amount": 500000}, + "SubjectProperty": { + "Address": { + "City": "New York", + "ZipCode": "12345", + "Latitude": 43, + }, + "Acreage": 42, + }, + "RealEstateOwned": { + "1": { + "Address": {"City": "Brooklyn", "ZipCode": "65432"}, + "RentalIncome": 1000, + }, + "2": {"Address": {"City": "Queens", "ZipCode": "54321"}}, + }, + }, + ) + ], + ) translator.create_links(con) translator.analyze(con) - assert list(query(con, 'select count(1) from schm.root')) == [(1,)] - assert list(query(con, 'select count(1) from schm.basic_address')) == [(3,)] - assert list(query(con, 'select loan_file_id, prefix, loan__amount, subject_property__acreage, subject_property__address__latitude, loan__abb_trlc from schm.root')) == \ - [('loan_file_abc123', '', 500000, 42.0, 43.0, None)] - assert set(query(con, 'select loan_file_id, prefix, city, zip_code from schm.basic_address')) == \ - set([('loan_file_abc123', '/SubjectProperty/Address', 'New York', '12345'), - ('loan_file_abc123', '/RealEstateOwned/1/Address', 'Brooklyn', '65432'), - ('loan_file_abc123', '/RealEstateOwned/2/Address', 'Queens', '54321')]) - assert set(query(con, 'select loan_file_id, prefix, rental_income from schm.real_estate_owned')) == \ - set([('loan_file_abc123', '/RealEstateOwned/1', 1000), - ('loan_file_abc123', '/RealEstateOwned/2', None)]) - assert set(query(con, 'select subject_property__address_id from schm.root union select address_id from schm.real_estate_owned')) == \ - set(query(con, 'select id from schm.basic_address')) - assert set(query(con, 'select root_id from schm.real_estate_owned')) == \ - set(query(con, 'select id from schm.root')) + assert list(query(con, "select count(1) from schm.root")) == [(1,)] + assert list(query(con, "select count(1) from schm.basic_address")) == [(3,)] + assert list( + query( + con, + "select loan_file_id, prefix, loan__amount, subject_property__acreage, subject_property__address__latitude, loan__abb_trlc from schm.root", + ) + ) == [("loan_file_abc123", "", 500000, 42.0, 43.0, None)] + assert set( + query( + con, "select loan_file_id, prefix, city, zip_code from schm.basic_address" + ) + ) == set( + [ + ("loan_file_abc123", "/SubjectProperty/Address", "New York", "12345"), + ("loan_file_abc123", "/RealEstateOwned/1/Address", "Brooklyn", "65432"), + ("loan_file_abc123", "/RealEstateOwned/2/Address", "Queens", "54321"), + ] + ) + assert set( + query( + con, + "select loan_file_id, prefix, rental_income from schm.real_estate_owned", + ) + ) == set( + [ + ("loan_file_abc123", "/RealEstateOwned/1", 1000), + ("loan_file_abc123", "/RealEstateOwned/2", None), + ] + ) + assert set( + query( + con, + "select subject_property__address_id from schm.root union select address_id from schm.real_estate_owned", + ) + ) == set(query(con, "select id from schm.basic_address")) + assert set(query(con, "select root_id from schm.real_estate_owned")) == set( + query(con, "select id from schm.root") + ) def test_pp_to_def(): - schema = json.load(open('test/test_pp_to_def.json')) + schema = json.load(open("test/test_pp_to_def.json")) translator = JSONSchemaToPostgres(schema, debug=True) - con = psycopg2.connect('host=localhost dbname=jsonschema2db-test') + con = psycopg2.connect("host=localhost dbname=jsonschema2db-test") translator.create_tables(con) - translator.insert_items(con, - [(33, [(('aBunchOfDocuments', 'xyz', 'url'), 'http://baz.bar'), - (('moreDocuments', 'abc', 'url'), 'https://banana'), - (('moreDocuments', 'abc', 'url'), 
['wrong-type']), - (('moreDocuments', 'abc'), 'broken-value-ignore')])], - count=True) + translator.insert_items( + con, + [ + ( + 33, + [ + (("aBunchOfDocuments", "xyz", "url"), "http://baz.bar"), + (("moreDocuments", "abc", "url"), "https://banana"), + (("moreDocuments", "abc", "url"), ["wrong-type"]), + (("moreDocuments", "abc"), "broken-value-ignore"), + ], + ) + ], + count=True, + ) translator.create_links(con) translator.analyze(con) - assert translator.failure_count == {('moreDocuments', 'abc'): 1, ('moreDocuments', 'abc', 'url'): 1} - - assert list(query(con, 'select count(1) from root')) == [(1,)] - assert list(query(con, 'select count(1) from file')) == [(2,)] - - assert list(query(con, 'select id, prefix, item_id from root')) == [(1, '', 33)] - assert list(query(con, 'select id, prefix, item_id, root_id from a_bunch_of_documents')) == \ - [(1, '/aBunchOfDocuments/xyz', 33, 1)] - assert set(query(con, 'select prefix, url, item_id from file')) == \ - set([('/aBunchOfDocuments/xyz', 'http://baz.bar', 33), - ('/moreDocuments/abc', 'https://banana', 33)]) - assert set(list(query(con, 'select file_id from a_bunch_of_documents')) + - list(query(con, 'select file_id from more_documents'))) == set([(1,), (2,)]) + assert translator.failure_count == { + ("moreDocuments", "abc"): 1, + ("moreDocuments", "abc", "url"): 1, + } + + assert list(query(con, "select count(1) from root")) == [(1,)] + assert list(query(con, "select count(1) from file")) == [(2,)] + + assert list(query(con, "select id, prefix, item_id from root")) == [(1, "", 33)] + assert list( + query(con, "select id, prefix, item_id, root_id from a_bunch_of_documents") + ) == [(1, "/aBunchOfDocuments/xyz", 33, 1)] + assert set(query(con, "select prefix, url, item_id from file")) == set( + [ + ("/aBunchOfDocuments/xyz", "http://baz.bar", 33), + ("/moreDocuments/abc", "https://banana", 33), + ] + ) + assert set( + list(query(con, "select file_id from a_bunch_of_documents")) + + list(query(con, "select file_id from more_documents")) + ) == set([(1,), (2,)]) def test_comments(): - schema = json.load(open('test/test_pp_to_def.json')) + schema = json.load(open("test/test_pp_to_def.json")) translator = JSONSchemaToPostgres(schema, debug=True) # A bit ugly to look at private members, but pulling comments out of postgres is a pain - assert translator._table_comments == {'root': 'the root of everything', - 'file': 'this is a file', - 'a_bunch_of_documents': 'this is a bunch of documents'} - assert translator._column_comments == {'file': {'url': 'the url of the file'}} + assert translator._table_comments == { + "root": "the root of everything", + "file": "this is a file", + "a_bunch_of_documents": "this is a bunch of documents", + } + assert translator._column_comments == {"file": {"url": "the url of the file"}} def test_time_types(): - schema = json.load(open('test/test_time_schema.json')) + schema = json.load(open("test/test_time_schema.json")) translator = JSONSchemaToPostgres(schema, debug=True) - con = psycopg2.connect('host=localhost dbname=jsonschema2db-test') + con = psycopg2.connect("host=localhost dbname=jsonschema2db-test") translator.create_tables(con) - translator.insert_items(con, [ - (1, {'ts': datetime.datetime(2018, 2, 3, 12, 45, 56), 'd': datetime.date(2018, 7, 8)}), - (2, {'ts': '2017-02-03T01:23:45Z', 'd': '2013-03-02'}), - ]) + translator.insert_items( + con, + [ + ( + 1, + { + "ts": datetime.datetime(2018, 2, 3, 12, 45, 56), + "d": datetime.date(2018, 7, 8), + }, + ), + (2, {"ts": "2017-02-03T01:23:45Z", "d": 
"2013-03-02"}), + ], + ) - assert list(query(con, 'select id, d from root')) == \ - [(1, datetime.date(2018, 7, 8)), (2, datetime.date(2013, 3, 2))] - assert list((id, ts.isoformat()) for id, ts in query(con, 'select id, ts from root')) == \ - [(1, '2018-02-03T12:45:56+00:00'), (2, '2017-02-03T01:23:45+00:00')] + assert list(query(con, "select id, d from root")) == [ + (1, datetime.date(2018, 7, 8)), + (2, datetime.date(2013, 3, 2)), + ] + assert list( + (id, ts.isoformat()) for id, ts in query(con, "select id, ts from root") + ) == [(1, "2018-02-03T12:45:56+00:00"), (2, "2017-02-03T01:23:45+00:00")] def test_refs(): - schema = json.load(open('test/test_refs.json')) + schema = json.load(open("test/test_refs.json")) translator = JSONSchemaToPostgres(schema, debug=True) - con = psycopg2.connect('host=localhost dbname=jsonschema2db-test') + con = psycopg2.connect("host=localhost dbname=jsonschema2db-test") translator.create_tables(con) - assert list(query(con, 'select col from c')) == [] # Just make sure table exists + assert list(query(con, "select col from c")) == [] # Just make sure table exists def test_extra_columns(): - schema = json.load(open('test/test_schema.json')) + schema = json.load(open("test/test_schema.json")) translator = JSONSchemaToPostgres( schema, - postgres_schema='schm', - item_col_name='loan_file_id', - item_col_type='string', - abbreviations={ - 'AbbreviateThisReallyLongColumn': 'AbbTRLC', - }, + schema_namespace="schm", + item_col_name="loan_file_id", + item_col_type="string", + abbreviations={"AbbreviateThisReallyLongColumn": "AbbTRLC",}, debug=True, - extra_columns=[('loan_period', 'integer')] + extra_columns=[("loan_period", "integer")], ) - con = psycopg2.connect('host=localhost dbname=jsonschema2db-test') + con = psycopg2.connect("host=localhost dbname=jsonschema2db-test") translator.create_tables(con) - translator.insert_items(con, [ - ('loan_file_abc123', { - 'Loan': {'Amount': 500000}, - 'SubjectProperty': {'Address': {'City': 'New York', 'ZipCode': '12345', 'Latitude': 43}, 'Acreage': 42}, - 'RealEstateOwned': {'1': {'Address': {'City': 'Brooklyn', 'ZipCode': '65432'}, 'RentalIncome': 1000}, - '2': {'Address': {'City': 'Queens', 'ZipCode': '54321'}}}, - }) - ], {'loan_file_abc123': {'loan_period': 30}}) + translator.insert_items( + con, + [ + ( + "loan_file_abc123", + { + "Loan": {"Amount": 500000}, + "SubjectProperty": { + "Address": { + "City": "New York", + "ZipCode": "12345", + "Latitude": 43, + }, + "Acreage": 42, + }, + "RealEstateOwned": { + "1": { + "Address": {"City": "Brooklyn", "ZipCode": "65432"}, + "RentalIncome": 1000, + }, + "2": {"Address": {"City": "Queens", "ZipCode": "54321"}}, + }, + }, + ) + ], + {"loan_file_abc123": {"loan_period": 30}}, + ) translator.create_links(con) translator.analyze(con) - assert list(query(con, 'select loan_period from schm.root')) == [(30,)] + assert list(query(con, "select loan_period from schm.root")) == [(30,)]