Linting, fixing broken code #72

Merged (3 commits, May 30, 2022)
Changes from all commits
16 changes: 8 additions & 8 deletions .github/workflows/pythonapp.yml
@@ -31,15 +31,15 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r yaetos/scripts/requirements_alt.txt
# - name: Lint with flake8
# run: |
# pip install flake8
# # stop the build if there are Python syntax errors or undefined names
# flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # TODO: uncomment when fixed
# # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
# flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics # TODO: uncomment when fixed
- name: Lint with flake8
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --ignore=E501,C901,E402,W605 --max-complexity=10 --max-line-length=127 --statistics # TODO: remove --exit-zero to enforce, and check to remove --ignore codes.
- name: Test with pytest
run: |
pip install pytest
pytest --ignore=yaetos/scripts/
# TODO: change pytest cmdline above to "pytest tests/ --extraargs?" and find if extraargs exists that gets test running from work dir (i.e. not changing to 'tests/')
# TODO: change pytest cmdline above to "pytest tests/ --extraargs?" and find if extraargs exists that gets test running from work dir (i.e. not changing to 'tests/')
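
Note on the re-enabled lint step: only the first flake8 pass (--select=E9,F63,F7,F82, i.e. syntax errors, suspicious literal comparisons, misplaced statements and undefined names) can fail the build; the second pass stays advisory because of --exit-zero. A minimal, hypothetical file showing the kind of error the blocking pass catches:

# lint_demo.py (illustrative only, not part of the repo)
import os

def list_files(path):
    return os.listdir(pathh)  # typo: 'pathh' is undefined, reported as F821 by the blocking pass

Running `flake8 lint_demo.py --count --select=E9,F63,F7,F82 --show-source --statistics` reports the F821 and exits non-zero, while style issues such as long lines only surface in the second, non-blocking pass.
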
2 changes: 1 addition & 1 deletion yaetos/db_utils.py
@@ -43,7 +43,7 @@ def cast_value(value, required_type, field_name):
elif isinstance(required_type, type(db_types.INT())):
return None if pd.isnull(value) else int(float(value))
elif isinstance(required_type, type(db_types.BIGINT())):
return None if pd.isnull(value) else long(value)
return None if pd.isnull(value) else int(value)
elif isinstance(required_type, type(db_types.FLOAT())):
return None if pd.isnull(value) else float(value)
else:
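
The BIGINT branch used `long`, which no longer exists in Python 3; `int` is arbitrary precision, so it covers BIGINT-sized values. A small sketch of the corrected behaviour (illustrative, mirroring the changed branch):

import pandas as pd

def cast_bigint(value):
    # Python 3 has no `long`; int() handles arbitrarily large integers.
    return None if pd.isnull(value) else int(value)

assert cast_bigint(None) is None
assert cast_bigint(9223372036854775807) == 9223372036854775807  # max signed 64-bit BIGINT
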
3 changes: 2 additions & 1 deletion yaetos/deploy.py
@@ -159,6 +159,7 @@ def run_direct(self):
if new_cluster and not self.deploy_args.get('leave_on') and self.app_args.get('clean_post_run'): # TODO: add clean_post_run in input options.
logger.info("New cluster setup to be deleted after job finishes.")
self.describe_status_until_terminated(c)
s3 = self.session.resource('s3')
self.remove_temp_files(s3) # TODO: remove tmp files for existing clusters too but only tmp files for the job

def s3_ops(self, session):
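
Before this change, `s3` was never assigned in run_direct, so the clean_post_run branch raised a NameError; the added line builds the S3 resource from the existing boto3 session before handing it to remove_temp_files. A sketch of the boto3 calls involved (assuming self.session is a boto3.Session, with a placeholder bucket and prefix):

import boto3

session = boto3.Session(profile_name='default')
s3 = session.resource('s3')  # what the added line produces
# remove_temp_files() would then delete temporary job files, e.g. (illustrative only):
s3.Bucket('some-bucket').objects.filter(Prefix='tmp/').delete()
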
@@ -665,7 +666,7 @@ def deploy_all_scheduled():
### TODO: also need to remove "dependency" run for the ones with no dependencies.
def get_yml(args):
meta_file = args.get('job_param_file', 'repo')
if meta_file is 'repo':
if meta_file == 'repo':
meta_file = eu.CLUSTER_APP_FOLDER+eu.JOBS_METADATA_FILE if args['storage']=='s3' else eu.JOBS_METADATA_LOCAL_FILE
yml = eu.Job_Args_Parser.load_meta(meta_file)
logger.info('Loaded job param file: ' + meta_file)
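
The `is 'repo'` comparison checks object identity rather than value; it only passed when CPython happened to reuse the interned literal, and Python 3.8+ emits a SyntaxWarning for it. A quick illustration of why `==` is the right operator here:

meta_file = ''.join(['re', 'po'])   # builds 'repo' at runtime, usually a distinct object
print(meta_file == 'repo')          # True: value comparison, always correct
print(meta_file is 'repo')          # typically False, plus SyntaxWarning on Python 3.8+
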
99 changes: 51 additions & 48 deletions yaetos/env_dispatchers.py
@@ -48,49 +48,52 @@ def save_metadata_cluster(fname, content):
s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fake_handle.read())
logger.info("Created file S3: {}".format(fname))

# --- save_file set of functions ----
def save_file(self, fname, content):
self.save_file_cluster(fname, content) if self.is_s3_path(fname) else self.save_file_local(fname, content)

@staticmethod
def save_file_local(fname, content):
folder = os.path.dirname(fname)
if not os.path.exists(folder):
os.makedirs(folder)
joblib.dump(content, fname)
logger.info("Saved content to new file locally: {}".format(fname))

def save_file_cluster(self, fname, content):
fname_parts = fname.split('s3://')[1].split('/')
bucket_name = fname_parts[0]
bucket_fname = '/'.join(fname_parts[1:])
s3c = boto3.Session(profile_name='default').client('s3')

local_path = CLUSTER_APP_FOLDER+'tmp/local_'+fname_parts[-1]
self.save_file_local(local_path, content)
fh = open(local_path, 'rb')
s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fh)
logger.info("Pushed local file to S3, from '{}' to '{}' ".format(local_path, fname))

# --- load_file set of functions ----
def load_file(self, fname):
return self.load_file_cluster(fname) if self.is_s3_path(fname) else self.load_file_local(fname)

@staticmethod
def load_file_local(fname):
return joblib.load(fname)

@staticmethod
def load_file_cluster(fname):
fname_parts = fname.split('s3://')[1].split('/')
bucket_name = fname_parts[0]
bucket_fname = '/'.join(fname_parts[1:])
local_path = CLUSTER_APP_FOLDER+'tmp/s3_'+fname_parts[-1]
s3c = boto3.Session(profile_name='default').client('s3')
s3c.download_file(bucket_name, bucket_fname, local_path)
logger.info("Copied file from S3 '{}' to local '{}'".format(fname, local_path))
model = joblib.load(local_path)
return model
## --- save_file set of functions ----
## Disabled until joblib enabled. Will be useful for ML use case.
# def save_file(self, fname, content):
# self.save_file_cluster(fname, content) if self.is_s3_path(fname) else self.save_file_local(fname, content)
#
# @staticmethod
# def save_file_local(fname, content):
# folder = os.path.dirname(fname)
# if not os.path.exists(folder):
# os.makedirs(folder)
# joblib.dump(content, fname)
# logger.info("Saved content to new file locally: {}".format(fname))
#
# def save_file_cluster(self, fname, content):
# fname_parts = fname.split('s3://')[1].split('/')
# bucket_name = fname_parts[0]
# bucket_fname = '/'.join(fname_parts[1:])
# s3c = boto3.Session(profile_name='default').client('s3')
#
# # local_path = CLUSTER_APP_FOLDER+'tmp/local_'+fname_parts[-1]
# local_path = 'tmp/local_'+fname_parts[-1]
# self.save_file_local(local_path, content)
# fh = open(local_path, 'rb')
# s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fh)
# logger.info("Pushed local file to S3, from '{}' to '{}' ".format(local_path, fname))
#
# # --- load_file set of functions ----
# def load_file(self, fname):
# return self.load_file_cluster(fname) if self.is_s3_path(fname) else self.load_file_local(fname)
#
# @staticmethod
# def load_file_local(fname):
# return joblib.load(fname)
#
# @staticmethod
# def load_file_cluster(fname):
# fname_parts = fname.split('s3://')[1].split('/')
# bucket_name = fname_parts[0]
# bucket_fname = '/'.join(fname_parts[1:])
# # local_path = CLUSTER_APP_FOLDER+'tmp/s3_'+fname_parts[-1]
# local_path = 'tmp/s3_'+fname_parts[-1]
# s3c = boto3.Session(profile_name='default').client('s3')
# s3c.download_file(bucket_name, bucket_fname, local_path)
# logger.info("Copied file from S3 '{}' to local '{}'".format(fname, local_path))
# model = joblib.load(local_path)
# return model
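
The save_file/load_file helpers are kept as comments rather than deleted because they depend on joblib, which is not in the current requirements; the note above flags them as useful for a future ML use case (e.g. persisting fitted models). A hedged sketch of what re-enabling them amounts to, not the repo's exact API:

import os
import joblib  # assumption: added back to requirements when this path is re-enabled

def save_local(obj, fname):
    # joblib serializes arbitrary Python objects (e.g. trained sklearn models) to disk.
    os.makedirs(os.path.dirname(fname) or '.', exist_ok=True)
    joblib.dump(obj, fname)

def load_local(fname):
    return joblib.load(fname)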

# --- listdir set of functions ----
def listdir(self, path):
@@ -175,16 +178,16 @@ def save_pandas_cluster(self, df, fname, save_method, save_kwargs):


class Cred_Ops_Dispatcher():
def retrieve_secrets(self, storage, creds='conf/connections.cfg'):
creds = self.retrieve_secrets_cluster() if storage=='s3' else self.retrieve_secrets_local(creds)
def retrieve_secrets(self, storage, aws_creds='/yaetos/connections', local_creds='conf/connections.cfg'):
creds = self.retrieve_secrets_cluster(aws_creds) if storage=='s3' else self.retrieve_secrets_local(local_creds)
return creds

@staticmethod
def retrieve_secrets_cluster():
def retrieve_secrets_cluster(creds):
client = boto3.Session(profile_name='default').client('secretsmanager')

response = client.get_secret_value(SecretId=AWS_SECRET_ID)
logger.info('Read aws secret, secret_id:'+AWS_SECRET_ID)
response = client.get_secret_value(SecretId=creds)
logger.info('Read aws secret, secret_id:'+creds)
logger.debug('get_secret_value response: '+str(response))
content = response['SecretString']

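
retrieve_secrets now takes the secret id and the local path explicitly instead of hard-coding the module-level AWS_SECRET_ID, so the cluster path can serve other secrets. The underlying Secrets Manager call, shown with the new '/yaetos/connections' default:

import boto3

client = boto3.Session(profile_name='default').client('secretsmanager')
response = client.get_secret_value(SecretId='/yaetos/connections')
content = response['SecretString']  # the stored credentials payload
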
12 changes: 6 additions & 6 deletions yaetos/etl_utils.py
@@ -412,7 +412,7 @@ def load_data_from_files(self, name, path, type, sc, sc_sql):
return sdf

def load_mysql(self, input_name):
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
creds_section = self.jargs.inputs[input_name]['creds']
db = creds[creds_section]
extra_params = '' # can use '?zeroDateTimeBehavior=CONVERT_TO_NULL' to help solve "java.sql.SQLException: Zero date value prohibited" but leads to other error msg.
@@ -454,7 +454,7 @@ def load_mysql(self, input_name):
return sdf

def load_clickhouse(self, input_name):
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
creds_section = self.jargs.inputs[input_name]['creds']
db = creds[creds_section]
url = 'jdbc:postgresql://{host}/{service}'.format(host=db['host'], service=db['service'])
@@ -585,7 +585,7 @@ def copy_to_redshift_using_pandas(self, output, types):
connection_profile = self.jargs.copy_to_redshift['creds']
schema, name_tb = self.jargs.copy_to_redshift['table'].split('.')
schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
create_table(df, connection_profile, name_tb, schema, types, creds, self.jargs.is_incremental)
del(df)

@@ -595,7 +595,7 @@ def copy_to_redshift_using_spark(self, sdf):
connection_profile = self.jargs.copy_to_redshift['creds']
schema, name_tb= self.jargs.copy_to_redshift['table'].split('.')
schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
create_table(sdf, connection_profile, name_tb, schema, creds, self.jargs.is_incremental, self.jargs.redshift_s3_tmp_dir, self.jargs.merged_args.get('spark_version', '2.4'))

def copy_to_clickhouse(self, sdf):
@@ -604,7 +604,7 @@ def copy_to_clickhouse(self, sdf):
connection_profile = self.jargs.copy_to_clickhouse['creds']
schema, name_tb= self.jargs.copy_to_clickhouse['table'].split('.')
schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
create_table(sdf, connection_profile, name_tb, schema, creds, self.jargs.is_incremental)

def push_to_kafka(self, output, types):
@@ -620,7 +620,7 @@ def send_msg(self, msg, recipients=None):
logger.error("Email can't be sent since no recipient set in {}, .\nMessage : \n{}".format(self.jargs.job_param_file, msg))
return None

creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file)
creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file)
creds_section = self.jargs.email_cred_section

sender_email = creds.get(creds_section, 'sender_email')
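
All retrieve_secrets callers in etl_utils now pass both aws_creds and local_creds, and the dispatcher picks one based on storage. When storage is not 's3', the `creds.get(section, key)` and `creds[section]` usages above suggest the local file is an ini-style config read with configparser (an assumption, the local loader is not shown in this diff):

import configparser

# Hypothetical conf/connections.cfg with one section per connection, e.g. [some_db].
creds = configparser.ConfigParser()
creds.read('conf/connections.cfg')
db = creds['some_db']                # section proxy, like creds[creds_section] above
user = creds.get('some_db', 'user')  # like creds.get(creds_section, 'sender_email') above
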
2 changes: 1 addition & 1 deletion yaetos/kafka_utils.py
@@ -55,7 +55,7 @@ def validate(self, message):
try:
jsonschema.validate(instance=message, schema=self.__schema)
return True
except(jsonschema.exceptions.ValidationError, e):
except jsonschema.exceptions.ValidationError as e:
logger.error(u"Validation error pre kafka send: {}.".format(str(e)))
return False

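
The old `except(jsonschema.exceptions.ValidationError, e):` form treats the parenthesized pair as a tuple of exception types, so it raised a NameError on the undefined `e` instead of catching anything; Python 3 binds the exception with `as`. The corrected pattern, self-contained:

import jsonschema

schema = {"type": "object", "properties": {"id": {"type": "integer"}}, "required": ["id"]}
message = {"id": "not-an-int"}  # violates the schema

try:
    jsonschema.validate(instance=message, schema=schema)
except jsonschema.exceptions.ValidationError as e:
    print("Validation error pre kafka send: {}.".format(str(e)))
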
2 changes: 1 addition & 1 deletion yaetos/mysql_job.py
@@ -20,4 +20,4 @@ def query_mysql(self, query_str):


if __name__ == "__main__":
Commandliner(Job)
Commandliner(Mysql_Job)
4 changes: 1 addition & 3 deletions yaetos/oracle.py
@@ -6,13 +6,11 @@
logger = setup_logging('Oracle')


def create_table(df, connection_profile, name_tb, types, creds_or_file, is_incremental):
def create_table(df, connection_profile, name_tb, schema, types, creds_or_file, is_incremental):
"""
Creates table in oracle, full drop or incremental drop.
types should be of sqlalchemy type. Ex: types.Date(), types.Integer()
"""
user = creds.get(connection_profile, 'user')
assert schema == user
if_exist = 'replace' if not is_incremental else 'append'
connection = connect(db=connection_profile, connection_type='sqlalchemy', creds_or_file=creds_or_file)
chunksize = 500000
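
create_table gains an explicit schema parameter, and the deleted user/assert lines referenced a `creds` name that is not among the function's parameters (the parameter is creds_or_file), so dropping them removes a likely NameError. The body is truncated in this diff; the variables shown (if_exist, chunksize, sqlalchemy types) presumably feed a pandas to_sql call along the lines of this hedged sketch:

# Illustrative only: placeholder DSN, table and column names.
import pandas as pd
from sqlalchemy import create_engine, types

df = pd.DataFrame({'id': [1, 2], 'created': pd.to_datetime(['2022-05-30', '2022-05-31'])})
engine = create_engine('oracle://user:password@host:1521/service')
df.to_sql('name_tb', con=engine, schema='schema_name',
          if_exists='replace',      # 'replace' for full drop, 'append' when incremental
          index=False, chunksize=500000,
          dtype={'id': types.Integer(), 'created': types.Date()})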