
snowflake integration #41

Open
wants to merge 1 commit into master
Conversation

@bgilbert-fetchrewards

@erikbern just kill this PR whenever you see fit

  • create abstract base class (see the sketch after this list)
  • create JSONSchemaToSnowflake class
  • make input params a bit more generic
  • add very basic json schema array handling
  • formatting with black
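
For context, a minimal sketch of the pattern being extended here, based on the existing JSONSchemaToPostgres shorthand in jsonschema2db (constructor bodies elided; only the class names and the database_flavor keyword come from the library, the rest is illustrative):

class JSONSchemaToDatabase:
    """Flavor-agnostic core: parses the JSON schema and emits flavor-specific SQL"""
    def __init__(self, schema, database_flavor, *args, **kwargs):
        ...

class JSONSchemaToPostgres(JSONSchemaToDatabase):
    """Shorthand for JSONSchemaToDatabase(..., database_flavor='postgres')"""
    def __init__(self, *args, **kwargs):
        kwargs["database_flavor"] = "postgres"
        super().__init__(*args, **kwargs)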

@erikbern (Contributor)

the diff so far just looks like it's reformatting the code using black?

@bgilbert-fetchrewards (Author)

> the diff so far just looks like it's reformatting the code using black?

It says "Large diffs are not rendered by default." on jsonschema2db.py for me, but it looks like my changes are there.

This is the Snowflake piece:

import sys  # needed by _execute below; a module-level import in jsonschema2db.py


class JSONSchemaToSnowflake(JSONSchemaToDatabase):
    """Shorthand for JSONSchemaToDatabase(..., database_flavor='snowflake')"""

    def __init__(self, *args, **kwargs):
        kwargs["database_flavor"] = "snowflake"
        super().__init__(*args, **kwargs)

    def _execute(self, cursor, query, args=None, query_ok_to_print=True):
        if self._debug and query_ok_to_print:
            print(query, file=sys.stderr)
        # Workaround: string-formatted None values arrive as the literal 'None'
        query = query.replace("'None'", "NULL")
        cursor.execute(query, args)

    def _get_data_types(self):
        """json schema's data types to database flavor's data types"""
        return {
            "boolean": "boolean",
            "number": "number",
            "string": "varchar",
            "enum": "varchar",
            "integer": "bigint",
            "timestamp": "timestamp",
            "date": "date",
            "link": "integer",
            "array": "variant",
        }
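
    # Arrays are the Snowflake-specific piece of this mapping: they land in a
    # `variant` column, and insert_items() below passes values that already look
    # like PARSE_JSON(...) expressions through unquoted so Snowflake can build
    # the variant (an inference from this diff, not a documented contract).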
    def create_tables(self, con):
        """Creates tables

        :param con: snowflake connection object
        """
        data_types = self._get_data_types()
        with con.cursor() as cursor:
            if self._schema is not None:
                if self._drop_schema:
                    self._execute(cursor, "drop schema if exists %s cascade" % self._schema)
                self._execute(cursor, "create schema if not exists %s" % self._schema)
            for table, columns in self._table_columns.items():
                types = [self._table_definitions[table][column] for column in columns]
                # new: back the id column with a per-table sequence
                id_data_type = "int not null default %s_%s_seq.nextval" % (
                    self._schema,
                    table,
                )
                seq_q = "create or replace sequence %s_%s_seq" % (self._schema, table)
                self._execute(cursor, seq_q)
                create_q = (
                    'create or replace table %s (id %s, "%s" %s not null, "%s" varchar not null, %s)'
                    % (
                        self._get_table_name(table),
                        id_data_type,
                        self._item_col_name,
                        data_types[self._item_col_type],
                        self._prefix_col_name,
                        "".join(
                            '"%s" %s, ' % (c, data_types[t])
                            for c, t in zip(columns, types)
                        )[:-2],
                    )
                )
                self._execute(cursor, create_q)
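
    # For illustration (hypothetical schema "s" and table "root" with one string
    # column "name", assuming the library's default item/prefix column names),
    # create_tables() above would emit roughly:
    #   create or replace sequence s_root_seq
    #   create or replace table s.root (id int not null default s_root_seq.nextval,
    #       "item_id" bigint not null, "prefix" varchar not null, "name" varchar)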
    def insert_items(self, con, items, extra_items={}, mutate=True, count=False):
        """Inserts data into database.

        :param con: snowflake connection object
        :param items: an iterable of tuples `(item id, values)` where `values` is either:
            - A nested dict conforming to the JSON spec
            - A list (or iterator) of pairs where the first item in the pair is a tuple specifying the path, and the second item is the value.
        :param extra_items: A dictionary containing values for extra columns, where each key is an extra column name.
        :param mutate: If set to `False`, nothing is actually inserted. This might be useful if you just want to validate data.
        :param count: If set to `True`, it will count some things. Defaults to `False`.

        Updates `self.failure_count`, a dict counting the number of failures for paths (keys are tuples, values are integers).

        The base class has an optimized strategy for Redshift, where it writes the data to temporary files, copies those to S3, and
        uses the `COPY` command to ingest the data into Redshift; that strategy is only used when an `s3_client` is provided to the
        constructor. This Snowflake implementation follows the Postgres-based method of running batched insertions, which builds up
        huge intermediary data structures and therefore takes a lot more memory.
        """
        rows = self._insert_items_generate_rows(
            items=items, extra_items=extra_items, count=count
        )
        if not mutate:
            # Just exhaust the iterator (so counters still get updated), then bail
            for table, row in rows:
                pass
            return
        with con.cursor() as cursor:
            data_by_table = {}
            for table, row in rows:
                # Note that this flushes the iterator into an in-memory data structure,
                # so it is far less memory efficient than the Redshift strategy
                data_by_table.setdefault(table, []).append(row)
            for table, data in data_by_table.items():
                cols = '("%s","%s"%s)' % (
                    self._item_col_name,
                    self._prefix_col_name,
                    "".join(',"%s"' % c for c in self._table_columns[table]),
                )
                pattern = ",".join(["%s"] * len(data[0]))
                for datum in data:
                    # Quote plain values; pass PARSE_JSON(...) expressions through as-is
                    args = pattern % tuple(
                        "%s" % d if "PARSE_JSON" in str(d) else "'%s'" % d
                        for d in datum
                    )
                    self._execute(
                        cursor,
                        "insert into %s %s select %s"
                        % (self._get_table_name(table), cols, args),
                        query_ok_to_print=False,
                    )
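
    # Continuing the hypothetical "s"/"root" example, insert_items() above would
    # emit one statement per row, roughly:
    #   insert into s.root ("item_id","prefix","name") select '1', '', 'foo'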
    def create_links(self, con):
        """Adds foreign keys between tables."""
        for from_table, cols in self._links.items():
            for ref_col_name, (prefix, to_table) in cols.items():
                if (
                    from_table not in self._table_columns
                    or to_table not in self._table_columns
                ):
                    continue
                args = {
                    "from_table": self._get_table_name(from_table),
                    "to_table": self._get_table_name(to_table),
                    "ref_col": ref_col_name,
                    "item_col": self._item_col_name,
                    "prefix_col": self._prefix_col_name,
                    "prefix": prefix,
                }
                update_q = (
                    'update %(from_table)s set "%(ref_col)s" = to_table.id from '
                    '(select "%(item_col)s", "%(prefix_col)s", id from %(to_table)s) to_table'
                    % args
                )
                if prefix:
                    # Forward reference from table to a definition
                    update_q += (
                        ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s"'
                        ' and %(from_table)s."%(prefix_col)s" || \'/%(prefix)s\' = to_table."%(prefix_col)s"'
                        % args
                    )
                else:
                    # Backward definition from a table to its patternProperty parent.
                    # strpos is Postgres syntax; swapped here for Snowflake's
                    # position(substring, string), which reverses the argument order
                    update_q += (
                        ' where %(from_table)s."%(item_col)s" = to_table."%(item_col)s"'
                        ' and position(to_table."%(prefix_col)s", %(from_table)s."%(prefix_col)s") = 1'
                        % args
                    )
                alter_q = (
                    'alter table %(from_table)s add constraint fk_%(ref_col)s'
                    ' foreign key ("%(ref_col)s") references %(to_table)s (id)'
                    % args
                )
                with con.cursor() as cursor:
                    self._execute(cursor, update_q)
                    self._execute(cursor, alter_q)
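
For reference, a hedged usage sketch of the class above, assuming the snowflake-connector-python client; the credentials, the schema file, and the database_schema keyword name are illustrative guesses, since the renamed constructor params aren't shown in this comment:

import json

import snowflake.connector

schema = json.load(open("schema.json"))  # placeholder JSON schema document
translator = JSONSchemaToSnowflake(schema, database_schema="s")  # kwarg name is a guess

con = snowflake.connector.connect(user="...", password="...", account="...")
translator.create_tables(con)
translator.insert_items(con, [(1, {"name": "foo"})])  # (item id, nested dict) pairs
translator.create_links(con)
con.close()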
