Commit
Merge pull request #2 from Lavedonio/development
Version 0.1.2
Lavedonio authored Aug 21, 2020
2 parents e9f0588 + 432f349 commit ec8d26d
Showing 7 changed files with 127 additions and 103 deletions.
9 changes: 5 additions & 4 deletions README.md
@@ -1,8 +1,6 @@
# Instackup
This Python library is an open source way to standardize and simplify connections with cloud-based services, databases and other tools commonly used in data manipulation and analysis. It can help BI teams by providing a unified source code for local development and testing as well as for remote production (automated scheduled run) environments.

This package is compatible with Google Cloud Composer image `composer-1.11.1-airflow-1.10.9`.

# Index

- [Current release](#current-release)
@@ -12,7 +10,7 @@ This package is compatible with Google Cloud Composer image `composer-1.11.1-air
- [Version logs](#version-logs)

# Current release
**Version 0.1.1 (beta)**
**Version 0.1.2 (beta)**

# Prerequisites
1. Have [Python 3.6 or higher](https://www.python.org/downloads/) installed;
@@ -185,6 +183,8 @@ Check the documentation by clicking in each topic.
- [close_connection(self)](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#close_connectionself)
- [execute_sql(self, command, fail_silently=False)](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#execute_sqlself-command-fail_silentlyfalse)
- [query(self, sql_query, fetch_through_pandas=True, fail_silently=False)](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#queryself-sql_query-fetch_through_pandastrue-fail_silentlyfalse)
- [describe_table(self, table, schema="public", fetch_through_pandas=True, fail_silently=False)](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#describe_tableself-table-schemapublic-fetch_through_pandastrue-fail_silentlyfalse)
- [get_all_db_info(self, get_json_info=True, fetch_through_pandas=True, fail_silently=False)](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#get_all_db_infoself-get_json_infotrue-fetch_through_pandastrue-fail_silentlyfalse)
- [unload_to_S3(self, redshift_query, s3_path, filename, unload_options="MANIFEST GZIP ALLOWOVERWRITE REGION 'us-east-2'")](https://github.com/Lavedonio/instackup/blob/master/docs/redshift_tools.md#unload_to_s3self-redshift_query-s3_path-filename-unload_optionsmanifest-gzip-allowoverwrite-region-us-east-2)
- [s3_tools](https://github.com/Lavedonio/instackup/blob/master/docs/s3_tools.md#s3_tools)
- [S3Tool](https://github.com/Lavedonio/instackup/blob/master/docs/s3_tools.md#s3tool)
@@ -225,7 +225,8 @@ Check the documentation by clicking in each topic.
See what changed in every version.

- Beta releases
- [Version 0.1.1](https://github.com/Lavedonio/instackup/blob/master/version_logs/v0.1.1-beta-current_release.md#version-011-beta) (current release)
- [Version 0.1.2](https://github.com/Lavedonio/instackup/blob/master/version_logs/v0.1.2-beta-current_release.md#version-012-beta) (current release)
- [Version 0.1.1](https://github.com/Lavedonio/instackup/blob/master/version_logs/v0.1.1-beta.md#version-011-beta)
- [Version 0.1.0](https://github.com/Lavedonio/instackup/blob/master/version_logs/v0.1.0-beta.md#version-010-beta)
- Alpha releases
- [Version 0.0.6](https://github.com/Lavedonio/instackup/blob/master/version_logs/v0.0.6-alpha.md#version-006-alpha)
86 changes: 86 additions & 0 deletions docs/redshift_tools.md
@@ -10,6 +8,8 @@ This is the documentation for the redshift_tools module and all its contents, wi
- [close_connection(self)](#close_connectionself)
- [execute_sql(self, command, fail_silently=False)](#execute_sqlself-command-fail_silentlyfalse)
- [query(self, sql_query, fetch_through_pandas=True, fail_silently=False)](#queryself-sql_query-fetch_through_pandastrue-fail_silentlyfalse)
- [describe_table(self, table, schema="public", fetch_through_pandas=True, fail_silently=False)](#describe_tableself-table-schemapublic-fetch_through_pandastrue-fail_silentlyfalse)
- [get_all_db_info(self, get_json_info=True, fetch_through_pandas=True, fail_silently=False)](#get_all_db_infoself-get_json_infotrue-fetch_through_pandastrue-fail_silentlyfalse)
- [unload_to_S3(self, redshift_query, s3_path, filename, unload_options="MANIFEST GZIP ALLOWOVERWRITE REGION 'us-east-2'")](#unload_to_s3self-redshift_query-s3_path-filename-unload_optionsmanifest-gzip-allowoverwrite-region-us-east-2)

# Module Contents
@@ -264,6 +266,90 @@ with RedShiftTool() as rs:
# other code
```

### describe_table(self, table, schema="public", fetch_through_pandas=True, fail_silently=False)
Special query that returns all metadata from a specific table.

Usage example:
```
from instackup.redshift_tools import RedShiftTool
rs = RedShiftTool()
rs.connect()
try:
# Returns a list of tuples containing the rows of the response (Table: public.users)
table = rs.describe_table("users", fetch_through_pandas=False, fail_silently=True)
# Do something with table variable
except Exception as e:
rs.rollback()
raise e
else:
rs.commit()
finally:
# remember to close the connection later
rs.close_connection()
# or
with RedShiftTool() as rs:
# Already connected, use rs object in this context
# Returns a Pandas dataframe with all schema info of that specific schema.table
    # To do operations with the dataframe, you'll need to import the pandas library
df = rs.describe_table("airflow_logs", schema="another_schema")
# other code
```

### get_all_db_info(self, get_json_info=True, fetch_through_pandas=True, fail_silently=False)
Gets all the database info using an INFORMATION_SCHEMA query.

Ignores the pg_stat_statements table and the tables inside the pg_catalog and information_schema schemas.

If the _get_json_info_ parameter is True, it adds 2 columns with the data types of each key inside the json and jsonb columns.

The _fetch_through_pandas_ and _fail_silently_ parameters are passed directly to the _query_ method if the _get_json_info_ parameter is set to False; otherwise, these 2 parameters are passed with their default values.

Returns a DataFrame if either the _get_json_info_ or the _fetch_through_pandas_ parameter is set to True; otherwise, it returns a list of tuples, each representing a row, with the values in the same order as the columns of the INFORMATION_SCHEMA.COLUMNS table.

Usage example:
```
from instackup.redshift_tools import RedShiftTool
rs = RedShiftTool()
rs.connect()
try:
# Returns a list of tuples containing the rows of the response
schema_info = rs.get_all_db_info(get_json_info=False, fetch_through_pandas=False, fail_silently=True)
    # Do something with the schema_info variable
except Exception as e:
rs.rollback()
raise e
else:
rs.commit()
finally:
# remember to close the connection later
rs.close_connection()
# or
with RedShiftTool() as rs:
# Already connected, use rs object in this context
# Returns a Pandas dataframe with all schema info, including inside JSON and JSONB fields
    # To do operations with the dataframe, you'll need to import the pandas library
df = rs.get_all_db_info()
# other code
```

### unload_to_S3(self, redshift_query, s3_path, filename, unload_options="MANIFEST GZIP ALLOWOVERWRITE REGION 'us-east-2'")
Executes an unload command in the RedShift database to copy data to S3.
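
The rest of this section is collapsed in this diff view. As a minimal usage sketch, assuming configured credentials (the query, S3 path and filename below are hypothetical):
```
from instackup.redshift_tools import RedShiftTool

with RedShiftTool() as rs:
    # Unloads the query results to S3 using the default unload options
    # (MANIFEST GZIP ALLOWOVERWRITE REGION 'us-east-2')
    rs.unload_to_S3(
        redshift_query="SELECT * FROM public.users",  # hypothetical query
        s3_path="s3://some-bucket/subfolder/",        # hypothetical S3 path
        filename="users_extract"                      # hypothetical file prefix
    )
```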

9 changes: 8 additions & 1 deletion instackup/gcloudstorage_tools.py
@@ -39,6 +39,9 @@ def __init__(self, gs_path=None, bucket=None, subfolder="", filename=None, authe
else:
filename = None
bucket, subfolder = parse_remote_uri(gs_path, "gs")
else:
ending_slash = "/" if subfolder[-1:] != '/' and len(subfolder) > 0 else ""
subfolder += ending_slash

if authenticate:
# Getting credentials
@@ -79,9 +82,13 @@ def blob(self):

def set_bucket(self, bucket):
self.bucket_name = bucket
self.subfolder = "" # Resets subfolder
self.filename = None # Resets filename

def set_subfolder(self, subfolder):
self.subfolder = subfolder
ending_slash = "/" if subfolder[-1:] != '/' and len(subfolder) > 0 else ""
self.subfolder = subfolder + ending_slash
self.filename = None # Resets filename

def select_file(self, filename):
self.filename = filename
93 changes: 5 additions & 88 deletions instackup/redshift_tools.py
@@ -2,8 +2,8 @@
import logging
import boto3
import psycopg2
import pandas as pd
from .general_tools import fetch_credentials
from .sql_tools import PostgreSQLTool


# Logging Configuration
@@ -20,7 +20,7 @@
logger.addHandler(file_handler)


class RedShiftTool(object):
class RedShiftTool(PostgreSQLTool):
"""This class handle most of the interaction needed with RedShift,
so the base code becomes more readable and straightforward."""

@@ -62,6 +62,9 @@ def __init__(self, connect_by_cluster=True):
self.connection = None
self.cursor = None

# Not used, but kept for compatibility with the parent PostgreSQLTool class
self.filename = None

def connect(self, fail_silently=False):
"""Create the connection using the __init__ attributes.
If the fail_silently parameter is set to True, any errors will be suppressed and will not stop the code execution."""
@@ -98,71 +101,6 @@ def connect(self, fail_silently=False):
else:
logger.error("ATENTION: Failing Silently")

def commit(self):
"""Commit any pending transaction to the database."""
self.connection.commit()
logger.info("Transaction commited.")

def rollback(self):
"""Roll back to the start of any pending transaction."""
self.connection.rollback()
logger.info("Roll back current transaction.")

def execute_sql(self, command, fail_silently=False):
"""Execute a SQL command (CREATE, UPDATE and DROP).
If the fail_silently parameter is set to True, any errors will be suppressed and will not stop the code execution."""

try:
self.cursor.execute(command)
logger.debug(f"Command Executed: {command}")

except psycopg2.Error as e:
logger.exception("Error running command!")

if not fail_silently:
raise e
else:
logger.error("ATENTION: Failing Silently")

def query(self, sql_query, fetch_through_pandas=True, fail_silently=False):
"""Run a query and return the results.
The fetch_through_pandas parameter tells whether the query should be parsed by the psycopg2 cursor or by pandas.
If the fail_silently parameter is set to True, any errors will be suppressed and will not stop the code execution."""

# Eliminating SQL table quotes that can't be handled by RedShift
sql_query = sql_query.replace("`", "")

if fetch_through_pandas:
try:
result = pd.read_sql_query(sql_query, self.connection)

except (psycopg2.Error, pd.io.sql.DatabaseError) as e:
logger.exception("Error running query!")
result = None

if not fail_silently:
raise e
else:
logger.error("ATENTION: Failing Silently")

else:
try:
self.cursor.execute(sql_query)
logger.debug(f"Query Executed: {sql_query}")

result = self.cursor.fetchall()

except psycopg2.Error as e:
logger.exception("Error running query!")
result = None

if not fail_silently:
raise e
else:
logger.error("ATENTION: Failing Silently")

return result

def unload_to_S3(self, redshift_query, s3_path, filename, unload_options="MANIFEST GZIP ALLOWOVERWRITE REGION 'us-east-2'"):
"""Executes an unload command in RedShift database to copy data to S3.
Takes the parameters redshift_query to grab the data, s3_path to set the location of copied data,
@@ -185,24 +123,3 @@ def unload_to_S3(self, redshift_query, s3_path, filename, unload_options="MANIFE

logger.debug("Unloading Query...")
self.execute_sql(unload_query)

def close_connection(self):
"""Closes Connection with RedShift database"""
self.connection.close()
logger.info("Connection closed.")

# __enter__ and __exit__ functions for with statement.
# With statement docs: https://docs.python.org/2.5/whatsnew/pep-343.html
def __enter__(self):
return self.connect()

def __exit__(self, type, value, traceback):
if traceback is None:
# No exception, so commit
self.commit()
else:
# Exception occurred, so rollback.
self.rollback()
# return False

self.close_connection()
18 changes: 8 additions & 10 deletions setup.py
@@ -20,7 +20,7 @@

setuptools.setup(
name="instackup",
version="0.1.1",
version="0.1.2",
author="Daniel Lavedonio de Lima",
author_email="[email protected]",
description="A package to ease interaction with cloud services, DB connections and commonly used functionalities in data analytics.",
@@ -29,18 +29,16 @@
url="https://github.com/Lavedonio/instackup",
packages=setuptools.find_packages(),
install_requires=[
'PyYAML==5.1',
'PyYAML>=5.1',
'boto3>=1.14.0',
'google-auth==1.12.0',
'google-auth-oauthlib==0.4.1',
'google-cloud-bigquery==1.24.0',
'google-cloud-bigquery-datatransfer==0.4.0',
'google-cloud-storage==1.28.1',
'grpcio==1.23.0',
'google-cloud-bigquery>=1.26.0',
'google-cloud-bigquery-datatransfer>=1.1.0',
'google-cloud-storage>=1.18.0',
'grpcio==1.31.0',
'gcsfs==0.6.2',
'gspread==3.6.0',
'pandas==0.25.3',
'psycopg2-binary==2.8.4',
'pandas==1.1.0',
'psycopg2-binary==2.8.5',
],
license="MIT",
classifiers=[
File renamed without changes.
15 changes: 15 additions & 0 deletions version_logs/v0.1.2-beta-current_release.md
@@ -0,0 +1,15 @@
# Version 0.1.2 (beta)

## Minor additions:
- RedShiftTool now inherits from PostgreSQLTool, so it gains 2 extra methods, describe_table and get_all_db_info (see the sketch after this list).
- The GCloudStorageTool \_\_init\_\_ (when set through the subfolder parameter) and set_subfolder methods now add a trailing slash to the subfolder if it's missing (see the GCloudStorageTool sketch further below).
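
A minimal sketch of the inherited methods in use, assuming configured credentials (the table name is hypothetical):
```
from instackup.redshift_tools import RedShiftTool

with RedShiftTool() as rs:
    # Both methods below are inherited from PostgreSQLTool
    table_info = rs.describe_table("users")  # hypothetical table
    db_info = rs.get_all_db_info()
```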

## Bug fixes:
- The GCloudStorageTool.set_bucket method didn't reset the subfolder and filename attributes.
- The GCloudStorageTool.set_subfolder method didn't reset the filename attribute (both fixes appear in the sketch after this list).
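
A minimal sketch of the new subfolder and reset behavior, assuming configured credentials (bucket, subfolder and file names are hypothetical):
```
from instackup.gcloudstorage_tools import GCloudStorageTool

gs = GCloudStorageTool(bucket="some_bucket", subfolder="raw/2020")  # subfolder is stored as "raw/2020/"
gs.select_file("data.csv")
gs.set_subfolder("raw/2021")   # stored as "raw/2021/"; filename is reset to None
gs.set_bucket("other_bucket")  # subfolder is reset to "" and filename to None
```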

## Removed features:
- The package is no longer compatible with Google Cloud Composer.

## Other solved issues:
- Revised documentation.
