From 40c930d44c0f5a22749df031201194c5946a28c4 Mon Sep 17 00:00:00 2001 From: Arthur Prevot Date: Mon, 30 May 2022 13:59:55 +0200 Subject: [PATCH 1/3] enable flake8 --- .github/workflows/pythonapp.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index 14145516..265bbdd2 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -31,15 +31,15 @@ jobs: run: | python -m pip install --upgrade pip pip install -r yaetos/scripts/requirements_alt.txt - # - name: Lint with flake8 - # run: | - # pip install flake8 - # # stop the build if there are Python syntax errors or undefined names - # flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # TODO: uncomment when fixed - # # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - # flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics # TODO: uncomment when fixed + - name: Lint with flake8 + run: | + pip install flake8 + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # TODO: uncomment when fixed + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics # TODO: uncomment when fixed - name: Test with pytest run: | pip install pytest pytest --ignore=yaetos/scripts/ - # TODO: change pytest cmdline above to "pytest tests/ --extraargs?" and find if extraargs exists that gets test running from work dir (i.e. not changing to 'tests/') + # TODO: change pytest cmdline above to "pytest tests/ --extraargs?" and find if extraargs exists that gets test running from work dir (i.e. not changing to 'tests/') From 7fa691a7028a48c7d65002e5045651e15ff83155 Mon Sep 17 00:00:00 2001 From: Arthur Prevot Date: Mon, 30 May 2022 20:51:36 +0200 Subject: [PATCH 2/3] cleaning all errors coming from "flake8 yaetos --count --select=E9,F63,F7,F82 --show-source --statistics" --- yaetos/db_utils.py | 2 +- yaetos/deploy.py | 3 +- yaetos/env_dispatchers.py | 99 ++++++++++++++++++++------------------- yaetos/etl_utils.py | 12 ++--- yaetos/kafka_utils.py | 2 +- yaetos/mysql_job.py | 2 +- yaetos/oracle.py | 4 +- 7 files changed, 63 insertions(+), 61 deletions(-) diff --git a/yaetos/db_utils.py b/yaetos/db_utils.py index fc4332f4..8636d3a8 100644 --- a/yaetos/db_utils.py +++ b/yaetos/db_utils.py @@ -43,7 +43,7 @@ def cast_value(value, required_type, field_name): elif isinstance(required_type, type(db_types.INT())): return None if pd.isnull(value) else int(float(value)) elif isinstance(required_type, type(db_types.BIGINT())): - return None if pd.isnull(value) else long(value) + return None if pd.isnull(value) else int(value) elif isinstance(required_type, type(db_types.FLOAT())): return None if pd.isnull(value) else float(value) else: diff --git a/yaetos/deploy.py b/yaetos/deploy.py index 43fe24db..b8adf5c1 100644 --- a/yaetos/deploy.py +++ b/yaetos/deploy.py @@ -159,6 +159,7 @@ def run_direct(self): if new_cluster and not self.deploy_args.get('leave_on') and self.app_args.get('clean_post_run'): # TODO: add clean_post_run in input options. 
logger.info("New cluster setup to be deleted after job finishes.") self.describe_status_until_terminated(c) + s3 = self.session.resource('s3') self.remove_temp_files(s3) # TODO: remove tmp files for existing clusters too but only tmp files for the job def s3_ops(self, session): @@ -665,7 +666,7 @@ def deploy_all_scheduled(): ### TODO: also need to remove "dependency" run for the ones with no dependencies. def get_yml(args): meta_file = args.get('job_param_file', 'repo') - if meta_file is 'repo': + if meta_file == 'repo': meta_file = eu.CLUSTER_APP_FOLDER+eu.JOBS_METADATA_FILE if args['storage']=='s3' else eu.JOBS_METADATA_LOCAL_FILE yml = eu.Job_Args_Parser.load_meta(meta_file) logger.info('Loaded job param file: ' + meta_file) diff --git a/yaetos/env_dispatchers.py b/yaetos/env_dispatchers.py index 18811076..d097b18d 100644 --- a/yaetos/env_dispatchers.py +++ b/yaetos/env_dispatchers.py @@ -48,49 +48,52 @@ def save_metadata_cluster(fname, content): s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fake_handle.read()) logger.info("Created file S3: {}".format(fname)) - # --- save_file set of functions ---- - def save_file(self, fname, content): - self.save_file_cluster(fname, content) if self.is_s3_path(fname) else self.save_file_local(fname, content) - - @staticmethod - def save_file_local(fname, content): - folder = os.path.dirname(fname) - if not os.path.exists(folder): - os.makedirs(folder) - joblib.dump(content, fname) - logger.info("Saved content to new file locally: {}".format(fname)) - - def save_file_cluster(self, fname, content): - fname_parts = fname.split('s3://')[1].split('/') - bucket_name = fname_parts[0] - bucket_fname = '/'.join(fname_parts[1:]) - s3c = boto3.Session(profile_name='default').client('s3') - - local_path = CLUSTER_APP_FOLDER+'tmp/local_'+fname_parts[-1] - self.save_file_local(local_path, content) - fh = open(local_path, 'rb') - s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fh) - logger.info("Pushed local file to S3, from '{}' to '{}' ".format(local_path, fname)) - - # --- load_file set of functions ---- - def load_file(self, fname): - return self.load_file_cluster(fname) if self.is_s3_path(fname) else self.load_file_local(fname) - - @staticmethod - def load_file_local(fname): - return joblib.load(fname) - - @staticmethod - def load_file_cluster(fname): - fname_parts = fname.split('s3://')[1].split('/') - bucket_name = fname_parts[0] - bucket_fname = '/'.join(fname_parts[1:]) - local_path = CLUSTER_APP_FOLDER+'tmp/s3_'+fname_parts[-1] - s3c = boto3.Session(profile_name='default').client('s3') - s3c.download_file(bucket_name, bucket_fname, local_path) - logger.info("Copied file from S3 '{}' to local '{}'".format(fname, local_path)) - model = joblib.load(local_path) - return model + ## --- save_file set of functions ---- + ## Disabled until joblib enabled. Will be useful for ML use case. 
+ # def save_file(self, fname, content): + # self.save_file_cluster(fname, content) if self.is_s3_path(fname) else self.save_file_local(fname, content) + # + # @staticmethod + # def save_file_local(fname, content): + # folder = os.path.dirname(fname) + # if not os.path.exists(folder): + # os.makedirs(folder) + # joblib.dump(content, fname) + # logger.info("Saved content to new file locally: {}".format(fname)) + # + # def save_file_cluster(self, fname, content): + # fname_parts = fname.split('s3://')[1].split('/') + # bucket_name = fname_parts[0] + # bucket_fname = '/'.join(fname_parts[1:]) + # s3c = boto3.Session(profile_name='default').client('s3') + # + # # local_path = CLUSTER_APP_FOLDER+'tmp/local_'+fname_parts[-1] + # local_path = 'tmp/local_'+fname_parts[-1] + # self.save_file_local(local_path, content) + # fh = open(local_path, 'rb') + # s3c.put_object(Bucket=bucket_name, Key=bucket_fname, Body=fh) + # logger.info("Pushed local file to S3, from '{}' to '{}' ".format(local_path, fname)) + # + # # --- load_file set of functions ---- + # def load_file(self, fname): + # return self.load_file_cluster(fname) if self.is_s3_path(fname) else self.load_file_local(fname) + # + # @staticmethod + # def load_file_local(fname): + # return joblib.load(fname) + # + # @staticmethod + # def load_file_cluster(fname): + # fname_parts = fname.split('s3://')[1].split('/') + # bucket_name = fname_parts[0] + # bucket_fname = '/'.join(fname_parts[1:]) + # # local_path = CLUSTER_APP_FOLDER+'tmp/s3_'+fname_parts[-1] + # local_path = 'tmp/s3_'+fname_parts[-1] + # s3c = boto3.Session(profile_name='default').client('s3') + # s3c.download_file(bucket_name, bucket_fname, local_path) + # logger.info("Copied file from S3 '{}' to local '{}'".format(fname, local_path)) + # model = joblib.load(local_path) + # return model # --- listdir set of functions ---- def listdir(self, path): @@ -175,16 +178,16 @@ def save_pandas_cluster(self, df, fname, save_method, save_kwargs): class Cred_Ops_Dispatcher(): - def retrieve_secrets(self, storage, creds='conf/connections.cfg'): - creds = self.retrieve_secrets_cluster() if storage=='s3' else self.retrieve_secrets_local(creds) + def retrieve_secrets(self, storage, aws_creds='/yaetos/connections', local_creds='conf/connections.cfg'): + creds = self.retrieve_secrets_cluster(aws_creds) if storage=='s3' else self.retrieve_secrets_local(local_creds) return creds @staticmethod - def retrieve_secrets_cluster(): + def retrieve_secrets_cluster(creds): client = boto3.Session(profile_name='default').client('secretsmanager') - response = client.get_secret_value(SecretId=AWS_SECRET_ID) - logger.info('Read aws secret, secret_id:'+AWS_SECRET_ID) + response = client.get_secret_value(SecretId=creds) + logger.info('Read aws secret, secret_id:'+creds) logger.debug('get_secret_value response: '+str(response)) content = response['SecretString'] diff --git a/yaetos/etl_utils.py b/yaetos/etl_utils.py index aa81fb6b..bded18cf 100644 --- a/yaetos/etl_utils.py +++ b/yaetos/etl_utils.py @@ -412,7 +412,7 @@ def load_data_from_files(self, name, path, type, sc, sc_sql): return sdf def load_mysql(self, input_name): - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) creds_section = self.jargs.inputs[input_name]['creds'] db = creds[creds_section] extra_params = '' # can use '?zeroDateTimeBehavior=CONVERT_TO_NULL' to help solve 
"java.sql.SQLException: Zero date value prohibited" but leads to other error msg. @@ -454,7 +454,7 @@ def load_mysql(self, input_name): return sdf def load_clickhouse(self, input_name): - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) creds_section = self.jargs.inputs[input_name]['creds'] db = creds[creds_section] url = 'jdbc:postgresql://{host}/{service}'.format(host=db['host'], service=db['service']) @@ -585,7 +585,7 @@ def copy_to_redshift_using_pandas(self, output, types): connection_profile = self.jargs.copy_to_redshift['creds'] schema, name_tb = self.jargs.copy_to_redshift['table'].split('.') schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) create_table(df, connection_profile, name_tb, schema, types, creds, self.jargs.is_incremental) del(df) @@ -595,7 +595,7 @@ def copy_to_redshift_using_spark(self, sdf): connection_profile = self.jargs.copy_to_redshift['creds'] schema, name_tb= self.jargs.copy_to_redshift['table'].split('.') schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) create_table(sdf, connection_profile, name_tb, schema, creds, self.jargs.is_incremental, self.jargs.redshift_s3_tmp_dir, self.jargs.merged_args.get('spark_version', '2.4')) def copy_to_clickhouse(self, sdf): @@ -604,7 +604,7 @@ def copy_to_clickhouse(self, sdf): connection_profile = self.jargs.copy_to_clickhouse['creds'] schema, name_tb= self.jargs.copy_to_clickhouse['table'].split('.') schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) create_table(sdf, connection_profile, name_tb, schema, creds, self.jargs.is_incremental) def push_to_kafka(self, output, types): @@ -620,7 +620,7 @@ def send_msg(self, msg, recipients=None): logger.error("Email can't be sent since no recipient set in {}, .\nMessage : \n{}".format(self.jargs.job_param_file, msg)) return None - creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, creds=self.jargs.connection_file) + creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) creds_section = self.jargs.email_cred_section sender_email = creds.get(creds_section, 'sender_email') diff --git a/yaetos/kafka_utils.py b/yaetos/kafka_utils.py index a7e1c162..2e30a92d 100644 --- a/yaetos/kafka_utils.py +++ b/yaetos/kafka_utils.py @@ -55,7 +55,7 @@ def validate(self, message): try: jsonschema.validate(instance=message, schema=self.__schema) return True - except(jsonschema.exceptions.ValidationError, e): + except jsonschema.exceptions.ValidationError as e: logger.error(u"Validation error pre kafka send: {}.".format(str(e))) 
return False diff --git a/yaetos/mysql_job.py b/yaetos/mysql_job.py index 6b33cdf3..4ba89cbe 100644 --- a/yaetos/mysql_job.py +++ b/yaetos/mysql_job.py @@ -20,4 +20,4 @@ def query_mysql(self, query_str): if __name__ == "__main__": - Commandliner(Job) + Commandliner(Mysql_Job) diff --git a/yaetos/oracle.py b/yaetos/oracle.py index bc1ed9fa..2040b467 100644 --- a/yaetos/oracle.py +++ b/yaetos/oracle.py @@ -6,13 +6,11 @@ logger = setup_logging('Oracle') -def create_table(df, connection_profile, name_tb, types, creds_or_file, is_incremental): +def create_table(df, connection_profile, name_tb, schema, types, creds_or_file, is_incremental): """ Creates table in oracle, full drop or incremental drop. types should be of sqlalchemy type. Ex: types.Date(), types.Integer() """ - user = creds.get(connection_profile, 'user') - assert schema == user if_exist = 'replace' if not is_incremental else 'append' connection = connect(db=connection_profile, connection_type='sqlalchemy', creds_or_file=creds_or_file) chunksize = 500000 From f238d328cb387808403bd40af7532c55e5ccf31e Mon Sep 17 00:00:00 2001 From: Arthur Prevot Date: Mon, 30 May 2022 21:28:57 +0200 Subject: [PATCH 3/3] add error codes not to check to avoid changing code too much --- .github/workflows/pythonapp.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index 265bbdd2..8b0c3c7d 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -35,9 +35,9 @@ jobs: run: | pip install flake8 # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # TODO: uncomment when fixed + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics # TODO: uncomment when fixed + flake8 . --count --exit-zero --ignore=E501,C901,E402,W605 --max-complexity=10 --max-line-length=127 --statistics # TODO: remove --exit-zero to enforce, and check to remove --ignore codes. - name: Test with pytest run: | pip install pytest
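
For reference, the flake8 selection enabled in patch 1 (E9,F63,F7,F82) stops the build on Python syntax errors and undefined names, as the workflow comment notes, and also flags identity comparisons against literals. Those are exactly the Python 2 leftovers cleaned up in patch 2. Below is a minimal, standalone Python sketch of the recurring patterns (illustrative only, not code from the repo; variable names are made up):

    # 'long' was removed in Python 3, so long(value) trips the undefined-name check;
    # patch 2 casts BIGINT values with int() instead.
    value = "42"
    as_bigint = int(value)

    # 'is' tests identity, not equality, so comparing against a string literal is unreliable;
    # patch 2 switches "meta_file is 'repo'" to '=='.
    meta_file = 'repo'
    if meta_file == 'repo':
        print("using repo metadata file")

    # The Python 2 style "except (SomeError, e):" leaves 'e' as an undefined name in Python 3;
    # patch 2 binds the exception with 'as' instead.
    try:
        int("not a number")
    except ValueError as e:
        print("validation error: {}".format(e))

The same undefined-name class of fix covers the remaining patch 2 changes: the missing s3 resource in deploy.py, the Mysql_Job name in mysql_job.py, and the unused creds/schema assertion dropped from oracle.py.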