Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Saff committed Oct 22, 2020
0 parents commit ceded06
Show file tree
Hide file tree
Showing 17 changed files with 728 additions and 0 deletions.
7 changes: 7 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Copyright © 2020 Colgate-Palmolive

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Data Engineering Container Tools

This is a package containing tools for data engineering containers. It provides
simplified command line argument handling, Google Cloud Storage file I/O helpers,
and a stdout wrapper that masks secret values in program output.
4 changes: 4 additions & 0 deletions build/lib/dataEng-container-tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Package initializer for dataEng-container-tools.
from .safe_stdout import setup_stdout
from .cla import default_gcs_secret_location

# Install the secret-masking stdout wrapper as soon as the package is
# imported, using the default GCS secret path (injected by Vault; the
# path constant is defined in cla.py).
setup_stdout(default_gcs_secret_location)
171 changes: 171 additions & 0 deletions build/lib/dataEng-container-tools/cla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import argparse
import json
import sys
from enum import Enum
from .safe_stdout import setup_stdout

default_gcs_secret_location = '/vault/secrets/gcp-sa-storage.json'

class custom_command_line_argument:
    """Container for the options needed to register one custom command line
    argument with `command_line_arguments`.

    Each attribute mirrors a keyword of `argparse.ArgumentParser.add_argument`
    (`data_type` maps to `type` and `help_message` to `help`); fields left as
    `None` fall back to the argparse defaults when the argument is registered.
    """
    # Class-level defaults kept for backward compatibility with any code that
    # reads these attributes off the class itself.
    name = None
    action = None
    nargs = None
    const = None
    default = None
    data_type = None
    choices = None
    required = None
    help_message = None
    metavar = None
    dest = None

    def __init__(self, name, action = None, nargs = None, const = None, default = None,
                 data_type = None, choices = None, required = None, help_message = None,
                 metavar = None, dest = None):
        self.name = name
        self.action = action
        self.nargs = nargs
        self.const = const
        self.default = default
        self.data_type = data_type
        self.choices = choices
        self.required = required
        self.help_message = help_message
        self.metavar = metavar
        self.dest = dest

    def __str__(self):
        # str() each field: most fields default to None, and concatenating
        # None with '+' (as the previous implementation did) raises TypeError.
        parts = [
            ("name", self.name), ("action", self.action), ("nargs", self.nargs),
            ("const", self.const), ("default", self.default),
            ("data_type", self.data_type), ("choices", self.choices),
            ("required", self.required), ("help_message", self.help_message),
            ("metavar", self.metavar), ("dest", self.dest),
        ]
        return ", ".join(label + ": " + str(value) for label, value in parts)

class command_line_argument_type(Enum):
    """Marks a `command_line_arguments` field as OPTIONAL or REQUIRED.

    The member's boolean value is passed straight through as the
    `required=` flag of `argparse.ArgumentParser.add_argument`.
    """
    OPTIONAL = False
    REQUIRED = True

class command_line_arguments:
    """Declarative helper for a container's standard command line interface.

    Depending on which constructor flags are supplied
    (`command_line_argument_type.OPTIONAL` / `REQUIRED`), registers the
    standard GCS input/output file arguments, a dtype argument, the secret
    location argument, the default file type argument, and any
    `custom_command_line_argument` entries — then parses `sys.argv`
    immediately in the constructor.
    """
    __args = None
    __input_files = None
    __output_files = None
    __secret_location = None
    __default_file_type = None
    __custom_inputs = None
    __description = None
    __input_dtypes = None
    __default_secret_location = default_gcs_secret_location

    def __init__(self, input_files = None, output_files = None, secret_location = None,
                 default_file_type = None, custom_inputs = None, description = None,
                 input_dtypes = None, parser = None):
        self.__input_files = input_files
        self.__output_files = output_files
        self.__secret_location = secret_location
        self.__default_file_type = default_file_type
        self.__custom_inputs = custom_inputs
        self.__description = description
        self.__input_dtypes = input_dtypes
        parser = parser if parser else argparse.ArgumentParser(description=description)
        if input_files:
            parser.add_argument("--input_bucket_names", type=str, required=input_files.value,
                                nargs = '+', help="GCS Buckets to read from.")

            parser.add_argument("--input_paths", type=str, required=input_files.value,
                                nargs = '+', help="GCS folders in bucket to read file from.")

            parser.add_argument("--input_filenames", type=str, required=input_files.value,
                                nargs = '+', help="Filenames to read file from.")
        if input_dtypes:
            parser.add_argument("--input_dtypes", type=json.loads, required=input_dtypes.value,
                                nargs ='+', help = "JSON dictionaries of (column: type) pairs to cast columns to")

        if output_files:
            parser.add_argument("--output_bucket_names", type=str, required=output_files.value,
                                nargs = '+', help="GCS Bucket to write to.")

            parser.add_argument("--output_paths", type=str, required=output_files.value,
                                nargs = '+', help="GCS folder in bucket to write file to.")

            parser.add_argument("--output_filenames", type=str, required=output_files.value,
                                nargs = '+', help="Filename to write file to.")
        if secret_location:
            parser.add_argument("--gcs_secret_location", type = str, required=secret_location.value,
                                default = self.__default_secret_location,
                                help = "Location of GCS secret injected by Vault. Default: '" + self.__default_secret_location + "'.")
        if default_file_type:
            parser.add_argument("--default_file_type", type = str,required=default_file_type.value,
                                choices = ["parquet", "csv", "pkl", "json"], default = "parquet",
                                help = "How to handle input/output files if no file extension found. Choice of 'parquet', 'csv', 'pkl', and 'json'. Default 'parquet'.")

        if custom_inputs:
            for item in custom_inputs:
                # add_argument takes the flag name positionally; the previous
                # `name=` keyword raised TypeError, so custom arguments never
                # registered. Only forward options the user actually set, so
                # actions such as 'store_true' (which reject nargs/type/const)
                # also work.
                options = {'action': item.action, 'nargs': item.nargs,
                           'const': item.const, 'default': item.default,
                           'type': item.data_type, 'choices': item.choices,
                           'required': item.required, 'help': item.help_message,
                           'metavar': item.metavar, 'dest': item.dest}
                parser.add_argument("--" + item.name,
                                    **{key: value for key, value in options.items()
                                       if value is not None})
        self.__args = parser.parse_args()
        print("CLA Input:", self)
        self.check_args()
        # Re-arm the stdout secret masker if a non-default secret file was given.
        if self.__secret_location and (self.__args.gcs_secret_location != self.__default_secret_location):
            setup_stdout(self.__args.gcs_secret_location)

    def __str__(self):
        return self.__args.__str__()

    def get_arguments(self):
        """Return the parsed `argparse.Namespace`."""
        return self.__args

    def get_input_dtypes(self):
        """Return the parsed input dtype dictionaries, or None if not enabled."""
        if not self.__input_dtypes:
            return None
        return self.__args.input_dtypes

    def get_input_uris(self):
        """Build `gs://bucket/path/filename` URIs from the input arguments.

        A single bucket name is reused for every file; otherwise buckets are
        paired with filenames positionally. Returns [] if input files were
        not enabled.
        """
        if not self.__input_files:
            return []
        constant_bucket = False
        bucket_name = ''
        output = []
        if len(self.__args.input_bucket_names) == 1:
            constant_bucket = True
            bucket_name = self.__args.input_bucket_names[0]
        for pos, filename in enumerate(self.__args.input_filenames):
            if not constant_bucket:
                bucket_name = self.__args.input_bucket_names[pos]
            output.append("gs://"+bucket_name+"/"+self.__args.input_paths[pos]+"/"+filename)
        return output

    def get_output_uris(self):
        """Build `gs://bucket/path/filename` URIs from the output arguments.

        Same single-bucket broadcasting rule as `get_input_uris`. Returns []
        if output files were not enabled.
        """
        if not self.__output_files:
            return []
        constant_bucket = False
        bucket_name = ''
        output = []
        if len(self.__args.output_bucket_names) == 1:
            constant_bucket = True
            bucket_name = self.__args.output_bucket_names[0]
        for pos, filename in enumerate(self.__args.output_filenames):
            if not constant_bucket:
                bucket_name = self.__args.output_bucket_names[pos]
            output.append("gs://"+bucket_name+"/"+self.__args.output_paths[pos]+"/"+filename)
        return output

    def get_secret_location(self):
        """Return the GCS secret file path, or None if not enabled."""
        if not self.__secret_location:
            return None
        return self.__args.gcs_secret_location

    def check_args(self):
        #TODO: Implement this (e.g. verify bucket/path/filename lists have
        # compatible lengths before URIs are built).
        return
120 changes: 120 additions & 0 deletions build/lib/dataEng-container-tools/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import pandas as pd
import io
from google.cloud import storage
import pickle

class gcs_file_io:
    """Download/upload helpers between Google Cloud Storage and local disk
    or in-memory objects (pandas DataFrames, pickled objects, JSON).

    File format is chosen by the URI's extension ('.parquet', '.csv', '.pkl',
    '.json'); when there is no recognized extension, `default_file_type`
    decides, and otherwise raw bytes / pickle are used.
    """
    gcs_client = None
    gcs_secret_location = None

    def __init__(self, gcs_secret_location):
        """Create a storage client from the service-account JSON secret file."""
        self.gcs_secret_location = gcs_secret_location
        # Round-trip the secret through a local file so the client can load it
        # with from_service_account_json.
        with open(gcs_secret_location,'r') as f:
            gcs_sa = json.load(f)
        with open('gcs-sa.json', 'w') as json_file:
            json.dump(gcs_sa, json_file)
        self.gcs_client = storage.Client.from_service_account_json('gcs-sa.json')

    def __get_parts(self, gcs_uri):
        """Split a GCS URI into (bucket, object_path).

        Accepts URIs with or without the 'gs://' scheme; previously a URI
        missing the prefix returned None and crashed callers on unpacking.
        """
        uri = gcs_uri[5:] if gcs_uri.startswith('gs://') else gcs_uri
        bucket, _, file_path = uri.partition("/")
        return bucket, file_path

    def download_file_to_object(self, gcs_uri, default_file_type = None, dtype = None):
        """Download a blob and parse it into an object based on file type.

        Returns a DataFrame (parquet/csv), unpickled object (pkl), parsed JSON
        (json), or a BytesIO of the raw content for unknown types.
        """
        bucket_name, file_path = self.__get_parts(gcs_uri)
        bucket = self.gcs_client.bucket(bucket_name)
        binary_object = bucket.get_blob(file_path).download_as_string()
        file_like_object = io.BytesIO(binary_object)
        # '.json' is included so a JSON file is never mis-parsed when a
        # different default_file_type is configured.
        has_ending = file_path.endswith(('.parquet', '.csv', '.pkl', '.json'))
        if file_path.endswith('.parquet') or ((not has_ending) and (default_file_type == 'parquet')):
            # pd.read_parquet has no dtype parameter; cast after reading.
            result = pd.read_parquet(file_like_object)
            return result.astype(dtype) if dtype else result
        if file_path.endswith('.csv') or ((not has_ending) and (default_file_type == 'csv')):
            return pd.read_csv(file_like_object, dtype = dtype) if dtype else pd.read_csv(file_like_object)
        if file_path.endswith('.pkl') or ((not has_ending) and (default_file_type == 'pkl')):
            # dtype is not applicable to arbitrary pickled objects.
            return pd.read_pickle(file_like_object)
        if file_path.endswith('.json') or ((not has_ending) and (default_file_type == 'json')):
            return json.load(file_like_object)
        return file_like_object

    def download_files_to_objects(self, gcs_uris, default_file_type = None, dtypes = None):
        """Download many blobs; a single dtype dict is broadcast to all files,
        otherwise dtypes pair positionally with gcs_uris."""
        return_objects = []
        for pos, gcs_uri in enumerate(gcs_uris):
            dtype = None
            if dtypes:
                dtype = dtypes[0] if len(dtypes) == 1 else dtypes[pos]
            return_objects.append(self.download_file_to_object(gcs_uri,
                                                               default_file_type = default_file_type,
                                                               dtype = dtype))
        return return_objects

    def download_file_to_disk(self, gcs_uri, local_location = None):
        """Download a blob to local_location (defaults to the object path);
        returns the local path written."""
        bucket_name, file_path = self.__get_parts(gcs_uri)
        bucket = self.gcs_client.bucket(bucket_name)
        local_location = local_location if local_location else file_path
        bucket.get_blob(file_path).download_to_filename(local_location)
        return local_location

    def download_files_to_disk(self, gcs_uris, local_locations = None):
        """Download many blobs to disk; local_locations may be None (each file
        then defaults to its object path — previously this raised TypeError)."""
        return_locations = []
        for pos, gcs_uri in enumerate(gcs_uris):
            target = local_locations[pos] if local_locations else None
            return_locations.append(self.download_file_to_disk(gcs_uri = gcs_uri, local_location = target))
        return return_locations

    def upload_file_from_disk(self, gcs_uri, local_location):
        """Upload the file at local_location to gcs_uri."""
        bucket_name, file_path = self.__get_parts(gcs_uri)
        bucket = self.gcs_client.bucket(bucket_name)
        blob = bucket.blob(file_path)
        return blob.upload_from_filename(local_location)

    def upload_files_from_disk(self, gcs_uris, local_locations):
        """Upload many local files, paired positionally with gcs_uris."""
        return_objects = []
        for pos, gcs_uri in enumerate(gcs_uris):
            return_objects.append(self.upload_file_from_disk(gcs_uri, local_locations[pos]))
        return return_objects

    def upload_file_from_object(self, gcs_uri, object_to_upload, default_file_type = None):
        """Serialize object_to_upload per the target file type and upload it.

        Unknown types fall back to pickle.
        """
        bucket_name, file_path = self.__get_parts(gcs_uri)
        bucket = self.gcs_client.bucket(bucket_name)
        blob = bucket.blob(file_path)
        has_ending = file_path.endswith(('.parquet', '.csv', '.pkl', '.json'))
        if file_path.endswith('.parquet') or ((not has_ending) and (default_file_type == 'parquet')):
            file_object = io.BytesIO()
            object_to_upload.to_parquet(file_object)
            file_object.seek(0)
            return blob.upload_from_file(file_object)
        if file_path.endswith('.csv') or ((not has_ending) and (default_file_type == 'csv')):
            csv_string = object_to_upload.to_csv(encoding='utf-8')
            return blob.upload_from_string(csv_string)
        if file_path.endswith('.pkl') or ((not has_ending) and (default_file_type == 'pkl')):
            file_object = io.BytesIO(pickle.dumps(object_to_upload))
            file_object.seek(0)
            return blob.upload_from_file(file_object)
        if file_path.endswith('.json') or ((not has_ending) and (default_file_type == 'json')):
            # Previously serialized an undefined variable (NameError); the
            # object being uploaded is what must be JSON-encoded.
            json_string = json.dumps(object_to_upload)
            return blob.upload_from_string(json_string)
        file_object = io.BytesIO(pickle.dumps(object_to_upload))
        file_object.seek(0)
        return blob.upload_from_file(file_object)

    def upload_files_from_objects(self, gcs_uris, objects_to_upload, default_file_type = None):
        """Upload many objects, paired positionally with gcs_uris."""
        return_objects = []
        for pos, gcs_uri in enumerate(gcs_uris):
            return_objects.append(self.upload_file_from_object(gcs_uri, objects_to_upload[pos], default_file_type = default_file_type))
        return return_objects
36 changes: 36 additions & 0 deletions build/lib/dataEng-container-tools/safe_stdout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import sys
import json

class safe_stdout:
    """A stdout replacement that masks known secret strings with asterisks
    before forwarding output to the real stdout captured at construction."""

    def __init__(self, bad_words):
        self.__bad_words = bad_words
        self.__bad_word_lengths = [len(bad_word) for bad_word in bad_words]
        # Capture whatever stdout is right now so writes pass through it.
        self.__old_stdout = sys.stdout

    def write(self, message):
        """Replace every occurrence of every secret with '*' of equal length,
        then forward the sanitized message."""
        for location, bad_word in enumerate(self.__bad_words):
            bad_word_length = self.__bad_word_lengths[location]
            bad_word_location = message.find(bad_word)
            while(bad_word_location != -1):
                message = (message[0:bad_word_location] + '*'*bad_word_length +
                           message[bad_word_location + bad_word_length:])
                bad_word_location = message.find(bad_word)
        self.__old_stdout.write(message)

    def flush(self):
        # Previously a no-op, which could leave output stuck in the wrapped
        # stream's buffer; delegate to the real stdout.
        self.__old_stdout.flush()

def setup_stdout(secret_location):
    """Replace sys.stdout with a safe_stdout that masks the secret values
    found in the JSON file at secret_location.

    Both the raw secret values and their JSON-encoded forms are masked, so
    secrets printed inside serialized structures are caught too. If the file
    cannot be opened, stdout is left untouched.
    """
    try:
        # 'with' ensures the secret file handle is closed (the previous
        # bare open() leaked it).
        with open(secret_location, 'r') as secret_file:
            secret = json.load(secret_file)
    except OSError:
        print("No secret file found")
        return
    bad_words = list(secret.values())
    bad_words += [json.dumps(word) for word in bad_words]
    sys.stdout = safe_stdout(bad_words)
4 changes: 4 additions & 0 deletions dataEng-container-tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Package initializer for dataEng-container-tools.
from .safe_stdout import setup_stdout
from .cla import default_gcs_secret_location

# Install the secret-masking stdout wrapper as soon as the package is
# imported, using the default GCS secret path (injected by Vault; the
# path constant is defined in cla.py).
setup_stdout(default_gcs_secret_location)
Loading

0 comments on commit ceded06

Please sign in to comment.