diff --git a/conf/aws_config.cfg.example b/conf/aws_config.cfg.example index 2da7b8fe..9257cf68 100644 --- a/conf/aws_config.cfg.example +++ b/conf/aws_config.cfg.example @@ -4,7 +4,9 @@ ec2_key_name : to_be_filled user : to_be_filled s3_region : to_be_filled ec2_subnet_id : to_be_filled -extra_security_gp : None +extra_security_gp : to_be_filled # optional +emr_ec2_role : to_be_filled # optional +emr_role : to_be_filled # optional [prod] profile_name : to_be_filled @@ -12,4 +14,6 @@ ec2_key_name : to_be_filled user : to_be_filled s3_region : to_be_filled ec2_subnet_id : to_be_filled -extra_security_gp : None +extra_security_gp : to_be_filled # optional +emr_ec2_role : to_be_filled # optional +emr_role : to_be_filled # optional diff --git a/conf/jobs_metadata.yml b/conf/jobs_metadata.yml index 943cf0f6..d02d1866 100644 --- a/conf/jobs_metadata.yml +++ b/conf/jobs_metadata.yml @@ -336,6 +336,32 @@ jobs: output: {'path':'{base_path}/load_example/glob_filtering/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas'} spark_boot: False + examples/ex17_multimode_params_job: + description: | + Showing how to use multiple modes, useful for a single-tenant setup, where base_path needs to change per tenant. See --mode=dev_local,your_extra_tenant below. + usage: python jobs/generic/launcher.py --job_name=examples/ex17_multimode_params_job --mode=dev_local,your_extra_tenant + py_job: jobs/examples/ex7_pandas_job.py + inputs: + some_events: {'path':"{base_path}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas'} + other_events: {'path':"{base_path}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas', 'read_kwargs':{}} + output: {'path':'{base_path}/wiki_example/output_ex7_pandas/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas', 'save_kwargs':{'sep':'|'}} + spark_boot: False + + examples/ex18_base_path_in_out_job: + description: | + Showing use of name_base_in_param and name_base_out_param, to set base_path differently in input and output. 
+ usage: python jobs/generic/launcher.py --job_name=examples/ex18_base_path_in_out_job #--mode=dev_local,your_extra_tenant + py_job: jobs/examples/ex7_pandas_job.py + inputs: + some_events: {'path':"{base_path_in}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas'} + other_events: {'path':"{base_path_in}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas', 'read_kwargs':{}} + output: {'path':'{base_path_out}/wiki_example/output_ex7_pandas/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas', 'save_kwargs':{'sep':'|'}} + name_base_in_param: base_path_in + name_base_out_param: base_path_out + base_path_in: './data' + base_path_out: './data_other' + spark_boot: False + # wordcount_raw_job: #Job exists but doesn't rely on jobs_metadata entries # ----- Marketing Jobs -------- @@ -448,3 +474,6 @@ common_params: enable_db_push: False save_schemas: True manage_git_info: False + your_extra_tenant: # useful for single tenant architecture, with data in separate locations + save_schemas: False + other_param: 'some_value' diff --git a/docs/conf.py b/docs/conf.py index 585651f8..24108b2b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,7 +7,7 @@ author = 'Arthur Prevot' release = '0.11' -version = '0.11.7' +version = '0.11.8' # -- General configuration diff --git a/jobs/generic/copy_raw_job.py b/jobs/generic/copy_raw_job.py index 0e08a53a..6e42f6e3 100644 --- a/jobs/generic/copy_raw_job.py +++ b/jobs/generic/copy_raw_job.py @@ -1,3 +1,6 @@ +""" +Job meant to run locally to get data from AWS S3 to local. Updates required to run in cluster. 
+""" from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup import os from cloudpathlib import CloudPath as CPt diff --git a/setup.py b/setup.py index c729b35c..a9b8d67f 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ # For a discussion on single-sourcing the version across setup.py and the # project code, see # https://packaging.python.org/guides/single-sourcing-package-version/ - version='0.11.7', # Required + version='0.11.8', # Required # This is a one-line description or tagline of what your project does. This # corresponds to the "Summary" metadata field: diff --git a/tests/yaetos/deploy_test.py b/tests/yaetos/deploy_test.py index 07f00994..8cce8275 100644 --- a/tests/yaetos/deploy_test.py +++ b/tests/yaetos/deploy_test.py @@ -78,11 +78,26 @@ def test_get_spark_submit_args(self, app_args): expected.insert(8, '--sql_file=/home/hadoop/app/some_file.sql') assert actual == expected + def test_get_spark_submit_args_with_launcher(self, app_args): + app_args['job_name'] = 'some_job_name' + app_file = 'jobs/generic/launcher.py' + actual = Dep.get_spark_submit_args(app_file, app_args) + expected = [ + 'spark-submit', + '--verbose', + '--py-files=/home/hadoop/app/scripts.zip', + '/home/hadoop/app/jobs/generic/launcher.py', + '--mode=None', + '--deploy=none', + '--storage=s3', + '--job_name=some_job_name'] + assert actual == expected + def test_get_spark_submit_args_jar(self): app_args = { 'jar_job': 'some/job.jar', 'spark_app_args': 'some_arg'} - app_file = app_args['jar_job'] + app_file = 'jobs/generic/launcher.py' actual = Dep.get_spark_submit_args(app_file, app_args) expected = [ 'spark-submit', diff --git a/tests/yaetos/etl_utils_test.py b/tests/yaetos/etl_utils_test.py index 41979cba..b1b00825 100644 --- a/tests/yaetos/etl_utils_test.py +++ b/tests/yaetos/etl_utils_test.py @@ -100,6 +100,44 @@ def test_set_sql_file_from_name(self): sql_file = Job_Yml_Parser.set_sql_file_from_name('some/job_name') assert sql_file is None + def 
test_set_mode_specific_params(self): + job_name = 'n/a' + job_param_file = 'conf/jobs_metadata.yml' + yml_modes = 'dev_EMR' + skip_job = True + actual_params = Job_Yml_Parser(job_name, job_param_file, yml_modes, skip_job).set_job_yml(job_name, job_param_file, yml_modes, skip_job) + expected_params = { + 'aws_config_file': 'conf/aws_config.cfg', + 'aws_setup': 'dev', + 'base_path': '{root_path}/pipelines_data', + 'connection_file': 'conf/connections.cfg', + 'email_cred_section': 'some_email_cred_section', + 'emr_core_instances': 0, + 'enable_db_push': False, + 'jobs_folder': 'jobs/', + 'load_connectors': 'all', + 'manage_git_info': True, + 'redshift_s3_tmp_dir': 's3a://dev-spark/tmp_spark/', + 'root_path': 's3://mylake-dev', + 's3_dags': '{root_path}/pipelines_metadata/airflow_dags', + 's3_logs': '{root_path}/pipelines_metadata', + 'save_schemas': False, + 'schema': 'sandbox', + 'spark_version': '3.5'} + assert actual_params == expected_params + + def test_set_modes(self): + yml_modes = 'dev_EMR,your_extra_tenant' # i.e. 
using 2 modes + job_name = 'n/a' + job_param_file = 'conf/jobs_metadata.yml' + skip_job = True + expected_params = { + 'save_schemas': False, + 'other_param': 'some_value'} + actual_params = Job_Yml_Parser(job_name, job_param_file, yml_modes, skip_job).set_job_yml(job_name, job_param_file, yml_modes, skip_job) + actual_params = {key: value for (key, value) in actual_params.items() if key in expected_params.keys()} + assert actual_params == expected_params + class Test_Job_Args_Parser(object): def test_no_param_override(self): @@ -173,6 +211,21 @@ def test_create_spark_submit_python_job(self): '--arg2=value2'] assert cmd_lst_real == cmd_lst_expected + def test_create_spark_submit_python_job_with_launcher(self): + job_args = { + 'launcher_file': 'jobs/generic/launcher.py', + 'py_job': 'jobs/examples/ex7_pandas_job.py', + 'py-files': 'some/files.zip', + 'spark_submit_keys': 'py-files', + 'spark_app_keys': ''} + launch_jargs = Job_Args_Parser(defaults_args={}, yml_args={}, job_args=job_args, cmd_args={}, build_yml_args=False, loaded_inputs={}) + cmd_lst_real = Runner.create_spark_submit(jargs=launch_jargs) + cmd_lst_expected = [ + 'spark-submit', + '--py-files=some/files.zip', + 'jobs/examples/ex7_pandas_job.py'] # launcher.py not carried over. may want to change behavior. 
+ assert cmd_lst_real == cmd_lst_expected + def test_create_spark_submit_jar_job(self): job_args = { 'jar_job': 'jobs/examples/ex12_scala_job/target/spark_scala_job_2.13-1.0.jar', diff --git a/yaetos/athena.py b/yaetos/athena.py index c271fbb4..cee899b2 100644 --- a/yaetos/athena.py +++ b/yaetos/athena.py @@ -6,7 +6,7 @@ logger = setup_logging('Athena') -def register_table(types, name_tb, schema, output_info, args): +def register_table_to_athena_catalog(types, name_tb, schema, output_info, args): description_statement = f"""COMMENT "{args['description']}" """ if args.get('description') else '' output_folder = output_info['path_expanded'].replace('s3a', 's3') @@ -41,7 +41,7 @@ def register_table(types, name_tb, schema, output_info, args): raise Exception('Athena table registration not setup for other than csv files.') # Start the query execution - if args.get('mode') == 'dev_local': + if args.get('mode') and 'dev_local' in args['mode'].split(','): config = ConfigParser() assert os.path.isfile(args.get('aws_config_file')) config.read(args.get('aws_config_file')) @@ -55,5 +55,58 @@ def register_table(types, name_tb, schema, output_info, args): QueryExecutionContext={'Database': schema}, ResultConfiguration={'OutputLocation': args.get('athena_out')}, ) - logger.info(f"Registered table to athena '{schema}.{name_tb}', with QueryExecutionId: {response['QueryExecutionId']}.") + logger.info(f"Registered table to athena '{schema}.{name_tb}', pointing to {output_folder}, with QueryExecutionId: {response['QueryExecutionId']}.") # TODO: Check to support "is_incremental" + + +def register_table_to_glue_catalog(schema_list, name_tb, schema, output_info, args): + output_folder = output_info['path_expanded'].replace('s3a', 's3') + + # Start the query execution + if args.get('mode') == 'dev_local': + config = ConfigParser() + assert os.path.isfile(args.get('aws_config_file')) + config.read(args.get('aws_config_file')) + region_name = config.get(args.get('aws_setup'), 's3_region') + 
else: + region_name = boto3.Session().region_name + + glue_client = boto3.client('glue', region_name=region_name) + + database_name = schema + table_name = name_tb + logger.info(f"Registering table to athena '{schema}.{name_tb}', schema {schema_list}.") + + # Define the table input + table_input = { + 'Name': table_name, + 'StorageDescriptor': { + 'Columns': schema_list, + 'Location': output_folder, + 'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat', + 'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat', + 'Compressed': False, + 'SerdeInfo': { + 'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe', + 'Parameters': { + 'serialization.format': '1' + } + }, + }, + 'TableType': 'EXTERNAL_TABLE', + 'Parameters': { + 'classification': 'parquet' + } + } + + try: + glue_client.delete_table(DatabaseName=database_name, Name=table_name) + print("Existing table deleted.") + except glue_client.exceptions.EntityNotFoundException: + print("No table to delete.") + + response = glue_client.create_table( + DatabaseName=database_name, + TableInput=table_input + ) + logger.info(f"Registered table to athena '{schema}.{name_tb}', pointing to {output_folder} with query response: {response}.") diff --git a/yaetos/db_utils.py b/yaetos/db_utils.py index 4cb49232..20dab524 100644 --- a/yaetos/db_utils.py +++ b/yaetos/db_utils.py @@ -111,13 +111,14 @@ def pdf_to_sdf(df, output_types, sc, sc_sql): # TODO: check suspicion that this return sc_sql.createDataFrame(rdd, schema=spark_schema, verifySchema=True) -def pandas_types_to_hive_types(df): +def pandas_types_to_hive_types(df, format='glue'): """ Converts pandas DataFrame dtypes to Hive column types. 
:param df: pandas DataFrame - :return: Dictionary of column names and their Hive data types + :return: Dictionary or list of dictionaries of column names and their Hive data types """ + # TODO: add support for nested fields, just like in spark version type_mapping = { 'object': 'STRING', 'bool': 'BOOLEAN', @@ -144,9 +145,44 @@ def pandas_types_to_hive_types(df): # This is a placeholder; actual handling should consider the specific precision and scale # 'decimal': 'DECIMAL', } - hive_types = {} - for column, dtype in df.dtypes.iteritems(): - dtype_name = dtype.name - hive_type = type_mapping.get(dtype_name, 'STRING') # Default to STRING if no mapping found - hive_types[column] = hive_type - return hive_types + if format == 'athena': + hive_types = {} + for column, dtype in df.dtypes.items(): + dtype_name = dtype.name + hive_type = type_mapping.get(dtype_name, 'STRING') + hive_types[column] = hive_type + return hive_types + elif format == 'glue': + hive_types = [] + for column_name, dtype in df.dtypes.items(): + hive_type = type_mapping.get(str(dtype), 'string') + hive_types.append({'Name': column_name, 'Type': hive_type}) + return hive_types + + +def spark_type_to_hive_type(data_type): + """ Convert Spark data types to a detailed readable string format, handling nested structures. 
""" + from pyspark.sql.types import StructType, ArrayType, FloatType, DecimalType, TimestampType # StructField, IntegerType, StringType, + + if isinstance(data_type, StructType): + # Handle nested struct by recursively processing each field + fields = [f"{field.name}: {spark_types_to_hive_types(field.dataType)}" for field in data_type.fields] + return f"struct<{', '.join(fields)}>" + elif isinstance(data_type, ArrayType): + # Handle arrays by describing element types + element_type = spark_types_to_hive_types(data_type.elementType) + return f"array<{element_type}>" + elif isinstance(data_type, TimestampType): + return "timestamp" + elif isinstance(data_type, FloatType): + return "float" + elif isinstance(data_type, DecimalType): + return f"decimal({data_type.precision},{data_type.scale})" + else: + # Fallback for other types with default string representation + return data_type.simpleString() + + +def spark_types_to_hive_types(sdf): + schema_list = [{"Name": field.name, "Type": spark_type_to_hive_type(field.dataType)} for field in sdf.schema] + return schema_list diff --git a/yaetos/deploy.py b/yaetos/deploy.py index 490113a8..163628ca 100644 --- a/yaetos/deploy.py +++ b/yaetos/deploy.py @@ -50,18 +50,23 @@ def __init__(self, deploy_args, app_args): self.app_args = app_args self.app_file = app_args['py_job'] # TODO: remove all refs to app_file to be consistent. 
self.aws_setup = aws_setup + # From aws_config.cfg: self.ec2_key_name = config.get(aws_setup, 'ec2_key_name') self.s3_region = config.get(aws_setup, 's3_region') self.user = config.get(aws_setup, 'user') self.profile_name = config.get(aws_setup, 'profile_name') self.ec2_subnet_id = config.get(aws_setup, 'ec2_subnet_id') - self.extra_security_gp = config.get(aws_setup, 'extra_security_gp') + self.extra_security_gp = config.get(aws_setup, 'extra_security_gp', fallback=None) + self.emr_ec2_role = config.get(aws_setup, 'emr_ec2_role', fallback='EMR_EC2_DefaultRole') + self.emr_role = config.get(aws_setup, 'emr_role', fallback='EMR_DefaultRole') + # From jobs_metadata.yml: self.emr_core_instances = int(app_args.get('emr_core_instances', 1)) # TODO: make this update EMR_Scheduled mode too. self.deploy_args = deploy_args self.ec2_instance_master = app_args.get('ec2_instance_master', 'm5.xlarge') # 'm5.12xlarge', # used m3.2xlarge (8 vCPU, 30 Gib RAM), and earlier m3.xlarge (4 vCPU, 15 Gib RAM) self.ec2_instance_slaves = app_args.get('ec2_instance_slaves', 'm5.xlarge') - # Paths - self.s3_logs = CPt(app_args.get('s3_logs', 's3://').replace('{root_path}', self.app_args.get('root_path', ''))) + # Computed params: + s3_logs = app_args.get('s3_logs', 's3://').replace('{root_path}', self.app_args.get('root_path', '')) + self.s3_logs = CPt(s3_logs) self.s3_bucket_logs = self.s3_logs.bucket self.metadata_folder = 'pipelines_metadata' # TODO remove self.pipeline_name = self.generate_pipeline_name(self.deploy_args['mode'], self.app_args['job_name'], self.user) # format: some_job.some_user.20181204.153429 @@ -106,7 +111,7 @@ def run(self): raise Exception("Shouldn't get here.") def continue_post_git_check(self): - if self.app_args['mode'] != 'prod_EMR': + if 'prod_EMR' not in self.app_args['mode'].split(','): logger.debug('Not pushing as "prod_EMR", so git check ignored') return True elif self.git_yml is None: @@ -209,7 +214,17 @@ def choose_cluster(self, clusters, cluster_id=None): 
@staticmethod def generate_pipeline_name(mode, job_name, user): """Opposite of get_job_name()""" - mode_label = {'dev_EMR': 'dev', 'prod_EMR': 'prod'}[mode] + + # Get deploy_env (Hacky. TODO: improve) + modes = mode.split(',') + required_mode = ('dev_EMR', 'prod_EMR') + modes = [item for item in modes if item in required_mode] + if len(modes) == 1: + deploy_env = modes[0] + else: + raise Exception(f"mode missing one of the required on {required_mode}") + + mode_label = {'dev_EMR': 'dev', 'prod_EMR': 'prod'}[deploy_env] pname = job_name.replace('.', '_d_').replace('/', '_s_') now = datetime.now().strftime("%Y%m%dT%H%M%S") name = f"yaetos__{mode_label}__{pname}__{now}" @@ -222,7 +237,7 @@ def get_job_name(pipeline_name): return pipeline_name.split('__')[2].replace('_d_', '.').replace('_s_', '/') if '__' in pipeline_name else None def get_job_log_path(self): - if self.deploy_args.get('mode') == 'prod_EMR': + if self.deploy_args.get('mode') and 'prod_EMR' in self.deploy_args.get('mode').split(','): # TODO: check if should be replaced by app_args return '{}/jobs_code/production'.format(self.metadata_folder) else: return '{}/jobs_code/{}'.format(self.metadata_folder, self.pipeline_name) @@ -342,7 +357,7 @@ def get_package_path(self): base = Pt(eu.LOCAL_FRAMEWORK_FOLDER) elif self.app_args['code_source'] == 'dir': base = Pt(self.app_args['code_source_path']) - logger.debug("Source of yaetos code to be shipped: {}".format(base / 'yaetos/')) + logger.info("Source of yaetos code to be shipped: {}".format(base / 'yaetos/')) # TODO: move code_source and code_source_path to deploy_args, involves adding it to DEPLOY_ARGS_LIST return base @@ -426,8 +441,8 @@ def start_spark_cluster(self, c, emr_version): # "Properties": { "spark.jars": ["/home/hadoop/redshift_tbd.jar"], "spark.driver.memory": "40G", "maximizeResourceAllocation": "true"}, # } ], - JobFlowRole='EMR_EC2_DefaultRole', - ServiceRole='EMR_DefaultRole', + JobFlowRole=self.emr_ec2_role, + ServiceRole=self.emr_role, 
VisibleToAllUsers=True, BootstrapActions=[{ 'Name': 'setup_nodes', @@ -519,13 +534,15 @@ def step_spark_submit(self, c, app_file, app_args): @staticmethod def get_spark_submit_args(app_file, app_args): + """ app_file is launcher, might be py_job too, but may also be separate from py_job (ex python launcher.py --job_name=some_job_with_py_job).""" + if app_args.get('py_job'): overridable_args = { 'spark_submit_args': '--verbose', 'spark_submit_keys': 'py-files', 'spark_app_args': '', 'spark_app_keys': 'mode--deploy--storage'} - else: + else: # for jar_job overridable_args = { 'spark_submit_args': '--verbose', 'spark_submit_keys': '', @@ -534,13 +551,29 @@ def get_spark_submit_args(app_file, app_args): overridable_args.update(app_args) args = overridable_args.copy() + + # set py_job + if app_args.get('launcher_file') and app_args.get('py_job'): + py_job = eu.CLUSTER_APP_FOLDER + app_args.get('launcher_file') + elif isinstance(app_file, str) and app_file.endswith('.py'): # TODO: check values app_file can take + py_job = eu.CLUSTER_APP_FOLDER + app_file + else: + py_job = None + + # set jar_job + if (app_args.get('launcher_file') or isinstance(app_file, str)) and app_args.get('jar_job'): # TODO: check to enforce app_args.get('launcher_file') + jar_job = eu.CLUSTER_APP_FOLDER + app_args.get('jar_job') + else: + jar_job = None + # TODO: simplify business of getting application code (2 blocks up) upstream, in etl_utils.py + unoverridable_args = { - 'py-files': f"{eu.CLUSTER_APP_FOLDER}scripts.zip", - 'py_job': eu.CLUSTER_APP_FOLDER + (app_file or app_args.get('py_job') or app_args.get('launcher_file')), # TODO: simplify business of getting application code upstream - 'mode': 'dev_EMR' if app_args.get('mode') == 'dev_local' else app_args.get('mode'), + 'py-files': f"{eu.CLUSTER_APP_FOLDER}scripts.zip" if py_job else None, + 'py_job': py_job, + 'mode': 'dev_EMR' if app_args.get('mode') and 'dev_local' in app_args['mode'].split(',') else app_args.get('mode'), 'deploy': 
'none', 'storage': 's3', - 'jar_job': eu.CLUSTER_APP_FOLDER + (app_file or app_args.get('jar_job') or app_args.get('launcher_file'))} + 'jar_job': jar_job} args.update(unoverridable_args) if app_args.get('load_connectors', '') == 'all': diff --git a/yaetos/env_dispatchers.py b/yaetos/env_dispatchers.py index c50fd099..56911cb6 100644 --- a/yaetos/env_dispatchers.py +++ b/yaetos/env_dispatchers.py @@ -150,24 +150,27 @@ def load_pandas_cluster(self, fname, file_type, globy, read_func, read_kwargs): bucket_name, bucket_fname, fname_parts = self.split_s3_path(fname) uuid_path = str(uuid.uuid4()) - local_path = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] + '/' + fname_parts[-1] # First fname_parts[-1] is to show fname part in folder name for easier debugging in AWS. Second is to isolate fname part in sub folder. - local_folder = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] # to be based on above + local_folder = 'tmp/s3_copy_' + uuid_path + '_' + fname_parts[-1] # fname_parts[-1] put in folder name for easier debugging in AWS. + local_path = local_folder + '/' + fname_parts[-1] os.makedirs(local_folder, exist_ok=True) - cp = CloudPath(fname) # TODO: add way to load it with specific profile_name or client, as in "s3c = boto3.Session(profile_name='default').client('s3')" + cp = CloudPath(fname) # no need to specify profile_name as aws creds taken from cluster env. if globy: - cfiles = cp.glob(globy) - os.makedirs(local_path, exist_ok=True) # assuming it is folder. TODO: confirm in code. - logger.info(f"Copying {len(cfiles)} files from S3 to local '{local_path}'") + cfiles = cp.glob(globy) # careful to loop through cfiles only once as it will be consumed. 
+ os.makedirs(local_path, exist_ok=True) + logger.info(f"Copying files from S3 '{fname}' to local '{local_path}'") for cfile in cfiles: - local_file_path = os.path.join(local_path, cfile.name) + glob_folders = str(cfile.parent).replace(fname, '') # goes from s3://some_bucket/path/folder_from_glob/file.parquet to /folder_from_glob/file.parquet + os.makedirs(os.path.join(local_path, glob_folders), exist_ok=True) + local_file_path = os.path.join(local_path, glob_folders, cfile.name) local_pathlib = cfile.download_to(local_file_path) local_path += '/' else: - logger.info("Copying files from S3 '{}' to local '{}'. May take some time.".format(fname, local_path)) + logger.info(f"Copying files from S3 '{fname}' to local '{local_path}'. May take some time.") local_pathlib = cp.download_to(local_path) local_path = local_path + '/' if local_pathlib.is_dir() else local_path logger.info(f"File copy finished, to {local_path}") df = load_dfs(local_path, file_type, globy, read_func, read_kwargs) + logger.info(f"df loaded, size '{len(df)}'") return df # --- save_pandas set of functions ---- diff --git a/yaetos/etl_utils.py b/yaetos/etl_utils.py index 9d5a7ce6..c46032a2 100644 --- a/yaetos/etl_utils.py +++ b/yaetos/etl_utils.py @@ -83,7 +83,7 @@ def etl(self, sc, sc_sql): else: output = self.etl_multi_pass(sc, sc_sql, self.loaded_inputs) except Exception as err: - if self.jargs.mode in ('prod_EMR') and self.jargs.merged_args.get('owners'): + if self.jargs.merged_args.get('send_emails') and self.jargs.merged_args.get('owners'): self.send_job_failure_email(err) raise Exception("Job failed, error: \n{}".format(err)) self.out_df = output @@ -187,7 +187,6 @@ def etl_one_pass(self, sc, sc_sql, loaded_inputs={}): self.push_to_kafka(output, self.OUTPUT_TYPES) if self.jargs.merged_args.get('register_to_athena') and self.jargs.enable_db_push: self.register_to_athena(output) - # import ipdb; ipdb.set_trace() if self.jargs.output.get('df_type', 'spark') == 'spark': output.unpersist() @@ -231,12 
+230,26 @@ def get_last_run_period_daily(self, sc, sc_sql): def set_jargs(self, pre_jargs, loaded_inputs={}): """ jargs means job args. Function called only if running the job directly, i.e. "python some_job.py""" + # Set job_name if 'job_name' not in pre_jargs['job_args']: py_job = self.set_py_job() job_name = Job_Yml_Parser.set_job_name_from_file(py_job) else: job_name = pre_jargs['job_args']['job_name'] - return Job_Args_Parser(defaults_args=pre_jargs['defaults_args'], yml_args=None, job_args=pre_jargs['job_args'], cmd_args=pre_jargs['cmd_args'], job_name=job_name, build_yml_args=True, loaded_inputs=loaded_inputs) + + # Set jargs + jargs = Job_Args_Parser( + defaults_args=pre_jargs['defaults_args'], + yml_args=None, + job_args=pre_jargs['job_args'], + cmd_args=pre_jargs['cmd_args'], + job_name=job_name, + build_yml_args=True, + loaded_inputs=loaded_inputs) + + # Room for jargs mods at job level. + jargs = self.expand_params(jargs) + return jargs def set_py_job(self): """ Returns the file being executed. For ex, when running "python some_job.py", this functions returns "some_job.py". @@ -335,9 +348,17 @@ def load_input(self, input_name): input_type = self.jargs.inputs[input_name]['type'] if input_type in self.FILE_TYPES: path = self.jargs.inputs[input_name]['path'] - path = path.replace('s3://', 's3a://') if self.jargs.mode == 'dev_local' else path + path = path.replace('s3://', 's3a://') if 'dev_local' in self.jargs.mode.split(',') else path logger.info("Input '{}' to be loaded from files '{}'.".format(input_name, path)) - path = Path_Handler(path, self.jargs.base_path, self.jargs.merged_args.get('root_path')).expand_later() + + # Get base_path. 
TODO: centralize + if self.jargs.merged_args.get('name_base_in_param'): + base_path = self.jargs.merged_args[self.jargs.merged_args.get('name_base_in_param')] + path = path.replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{base_path}') + else: + base_path = self.jargs.merged_args['base_path'] + + path = Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_later() self.jargs.inputs[input_name]['path_expanded'] = path # Unstructured type @@ -379,14 +400,18 @@ def load_input(self, input_name): return pdf # Tabular types, Spark + globy = self.jargs.inputs[input_name].get('glob') if input_type == 'csv': delimiter = self.jargs.merged_args.get('csv_delimiter', ',') + path = path + globy if globy else path sdf = self.sc_sql.read.option("delimiter", delimiter).csv(path, header=True) logger.info("Input '{}' loaded from files '{}'.".format(input_name, path)) elif input_type == 'parquet': + path = path + globy if globy else path sdf = self.sc_sql.read.parquet(path) logger.info("Input '{}' loaded from files '{}'.".format(input_name, path)) elif input_type == 'json': + path = path + globy if globy else path sdf = self.sc_sql.read.json(path) logger.info("Input '{}' loaded from files '{}'.".format(input_name, path)) elif input_type == 'mysql': @@ -408,7 +433,7 @@ def load_data_from_files(self, name, path, type, sc, sc_sql, df_meta, **kwargs): input = df_meta # TODO: get 2 variables below from this one. input_type = type # TODO: remove 'input_' prefix in code below since not specific to input. 
input_name = name - path = path.replace('s3://', 's3a://') if self.jargs.mode == 'dev_local' else path + path = path.replace('s3://', 's3a://') if 'dev_local' in self.jargs.mode.split(',') else path logger.info("Dataset '{}' to be loaded from files '{}'.".format(input_name, path)) path = self.expand_input_path(path, **kwargs) @@ -460,13 +485,31 @@ def load_data_from_files(self, name, path, type, sc, sc_sql, df_meta, **kwargs): logger.info("Dataset data types: {}".format(pformat([(fd.name, fd.dataType) for fd in sdf.schema.fields]))) return sdf + @staticmethod + def expand_params(jargs, **kwargs): + return jargs + def expand_input_path(self, path, **kwargs): # Function call isolated to be overridable. - return Path_Handler(path, self.jargs.base_path, self.jargs.merged_args.get('root_path')).expand_later() + # Get base_path. TODO: centralize + if self.jargs.merged_args.get('name_base_in_param'): + base_path = self.jargs.merged_args[self.jargs.merged_args.get('name_base_in_param')] + path = path.replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{base_path}') + else: + base_path = self.jargs.merged_args['base_path'] + + return Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_later() def expand_output_path(self, path, now_dt, **kwargs): # Function call isolated to be overridable. - return Path_Handler(path, self.jargs.base_path, self.jargs.merged_args.get('root_path')).expand_now(now_dt) + # Get base_path. 
TODO: centralize + if self.jargs.merged_args.get('name_base_out_param'): + base_path = self.jargs.merged_args[self.jargs.merged_args.get('name_base_out_param')] + path = path.replace('{' + self.jargs.merged_args.get('name_base_out_param') + '}', '{base_path}') + else: + base_path = self.jargs.merged_args['base_path'] + + return Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_now(now_dt) def load_mysql(self, input_name): creds = Cred_Ops_Dispatcher().retrieve_secrets(self.jargs.storage, aws_creds=AWS_SECRET_ID, local_creds=self.jargs.connection_file) @@ -572,8 +615,6 @@ def save_output(self, output, now_dt=None): def save(self, output, path, base_path, type, now_dt=None, is_incremental=None, incremental_type=None, partitionby=None, file_tag=None, **kwargs): """Used to save output to disk. Can be used too inside jobs to output 2nd output for testing.""" - # import ipdb; ipdb.set_trace() - # path = Path_Handler(path, base_path, self.jargs.merged_args.get('root_path')).expand_now(now_dt): path = self.expand_output_path(path, now_dt, **kwargs) self.jargs.output['path_expanded'] = path @@ -670,15 +711,19 @@ def copy_to_redshift_using_spark(self, sdf): create_table(sdf, connection_profile, name_tb, schema, creds, self.jargs.is_incremental, self.jargs.redshift_s3_tmp_dir, self.jargs.merged_args.get('spark_version', '3.5')) def register_to_athena(self, df): - from yaetos.athena import register_table - from yaetos.db_utils import pandas_types_to_hive_types + from yaetos.athena import register_table_to_glue_catalog # TODO: add register_table_to_athena_catalog + from yaetos.db_utils import pandas_types_to_hive_types, spark_types_to_hive_types schema, name_tb = self.jargs.register_to_athena['table'].split('.') schema = schema.format(schema=self.jargs.schema) if '{schema}' in schema else schema output_info = self.jargs.output - pdf = df if isinstance(df, pd.DataFrame) else df.toPandas() - hive_types = pandas_types_to_hive_types(pdf) - args = 
self.jargs.merged_args - register_table(hive_types, name_tb, schema, output_info, args) + if isinstance(df, pd.DataFrame): + hive_types = pandas_types_to_hive_types(df) + args = self.jargs.merged_args + register_table_to_glue_catalog(hive_types, name_tb, schema, output_info, args) + else: + hive_types = spark_types_to_hive_types(df) + args = self.jargs.merged_args + register_table_to_glue_catalog(hive_types, name_tb, schema, output_info, args) def copy_to_clickhouse(self, sdf): # import put here below to avoid loading heavy libraries when not needed (optional feature). @@ -831,22 +876,28 @@ def set_sql_file_from_name(job_name): logger.info("sql_file: '{}', from job_name: '{}'".format(sql_file, job_name)) return sql_file - def set_job_yml(self, job_name, job_param_file, yml_mode, skip_job): + def set_job_yml(self, job_name, job_param_file, yml_modes, skip_job): if job_param_file is None: return {} yml = self.load_meta(job_param_file) if job_name not in yml['jobs'] and not skip_job: raise KeyError("Your job '{}' can't be found in jobs_metadata file '{}'. Add it there or make sure the name matches".format(job_name, job_param_file)) - elif job_name not in yml['jobs'] and skip_job: + elif skip_job: job_yml = {} else: job_yml = yml['jobs'][job_name] - if yml_mode not in yml['common_params']['mode_specific_params']: - raise KeyError("Your yml mode '{}' can't be found in jobs_metadata file '{}'. Add it there or make sure the name matches".format(yml_mode, job_param_file)) + yml_modes = yml_modes.split(',') + mode_spec_yml = {} + for yml_mode in yml_modes: + if yml_mode not in yml['common_params']['mode_specific_params']: + raise KeyError("Your yml mode '{}' can't be found in jobs_metadata file '{}'. 
Add it there or make sure the name matches".format(yml_mode, job_param_file)) - mode_spec_yml = yml['common_params']['mode_specific_params'][yml_mode] + mode_spec = yml['common_params']['mode_specific_params'][yml_mode] + mode_spec_yml.update(mode_spec) + + # Stacking params in right order (all_mode_params->mode_specific_params->job_params) out = yml['common_params']['all_mode_params'] out.update(mode_spec_yml) out.update(job_yml) @@ -907,7 +958,7 @@ def __init__(self, defaults_args, yml_args, job_args, cmd_args, job_name=None, b @staticmethod def get_default_mode(args): - if args.get('mode') == 'dev_local' and args.get('deploy') in ('EMR', 'EMR_Scheduled', 'airflow'): + if args.get('mode') and 'dev_local' in args['mode'].split(',') and args.get('deploy') in ('EMR', 'EMR_Scheduled', 'airflow'): return 'dev_EMR' else: return args.get('mode', 'None') @@ -926,7 +977,14 @@ def update_args(self, args, loaded_inputs): if args.get('output'): args['output']['type'] = args.pop('output.type', None) or args['output'].get('type', 'none') if args.get('spark_app_args'): # hack to have scala sample job working. TODO: remove hardcoded case when made more generic - args['spark_app_args'] = Path_Handler(args['spark_app_args'], args.get('base_path'), args.get('root_path')).path + + # Get base_path. TODO: centralize + if args.get('name_base_in_param'): # TODO: check if requires name_base_in_param or name_base_out_param + base_path = args[args.get('name_base_in_param')] + args['spark_app_args'] = args['spark_app_args'].replace('{' + self.jargs.merged_args.get('name_base_in_param') + '}', '{base_path}') + else: + base_path = args.get('base_path') + args['spark_app_args'] = Path_Handler(args['spark_app_args'], base_path, args.get('root_path')).path return args @@ -980,6 +1038,7 @@ def validate(self): class Path_Handler(): def __init__(self, path, base_path=None, root_path=None): + # TODO: rewrite full class. too hacky. 
self.path = path self.base_path = base_path self.root_path = root_path @@ -999,14 +1058,14 @@ def expand_later(self): upstream_path = path.split('{latest}')[0] paths = FS_Ops_Dispatcher().listdir(upstream_path) latest_date = max(paths) - path = path.format(latest=latest_date) + path = path.replace('{latest}', latest_date) return path def expand_now(self, now_dt): path = self.path if '{now}' in path: current_time = now_dt.strftime('date%Y%m%d_time%H%M%S_utc') - path = path.format(now=current_time) + path = path.replace('{now}', current_time) return path def get_base(self): @@ -1074,7 +1133,7 @@ def define_commandline_args(): # Defaults should not be set in parser so they can be set outside of command line functionality. parser = argparse.ArgumentParser() parser.add_argument("-d", "--deploy", choices=set(['none', 'EMR', 'EMR_Scheduled', 'airflow', 'EMR_DataPipeTest', 'code', 'local_spark_submit']), help="Choose where to run the job.") - parser.add_argument("-m", "--mode", choices=set(['dev_local', 'dev_EMR', 'prod_EMR']), help="Choose which set of params to use from jobs_metadata.yml file.") + parser.add_argument("-m", "--mode", help="Choose which set of params to use from jobs_metadata.yml file. Typically from ('dev_local', 'dev_EMR', 'prod_EMR') but could include others.") parser.add_argument("-j", "--job_param_file", help="Identify file to use. 
It can be set to 'False' to not load any file and provide all parameters through job or command line arguments.") parser.add_argument("-n", "--job_name", help="Identify registry job to use.") parser.add_argument("-q", "--sql_file", help="Path to an sql file to execute.") @@ -1217,7 +1276,7 @@ def create_contexts(app_name, jargs): if jargs.merged_args.get('driver-memoryOverhead'): # For extra overhead for python in driver (typically pandas) conf = conf.set("spark.driver.memoryOverhead", jargs.merged_args['driver-memoryOverhead']) - if jargs.mode == 'dev_local' and jargs.load_connectors == 'all': + if 'dev_local' in jargs.mode.split(',') and jargs.load_connectors == 'all': # Setup below not needed when running from EMR because setup there is done through spark-submit. # Env vars for S3 access get_aws_setup(jargs.merged_args) @@ -1250,7 +1309,7 @@ def create_contexts(app_name, jargs): def get_aws_setup(args): if os.environ.get('AWS_ACCESS_KEY_ID') and os.environ.get('AWS_SECRET_ACCESS_KEY'): - session = boto3.Session() + session = boto3.Session() # to check : credentials = session.get_credentials() return session from configparser import ConfigParser diff --git a/yaetos/excel_utils.py b/yaetos/excel_utils.py index 996299a3..57448c60 100644 --- a/yaetos/excel_utils.py +++ b/yaetos/excel_utils.py @@ -14,7 +14,7 @@ def load_excel(jargs, input_name, output_types, sc, sc_sql, **xls_args): def load_excel_pandas(jargs, input_name, **xls_args): path = jargs.inputs[input_name]['path'] - path = path.replace('s3://', 's3a://') if jargs.mode == 'dev_local' else path + path = path.replace('s3://', 's3a://') if 'dev_local' in jargs.mode.split(',') else path logger.info("Input '{}' to be loaded from files '{}'.".format(input_name, path)) path = Path_Handler(path, jargs.base_path).expand_later() logger.info("Input '{}' loaded from files '{}'.".format(input_name, path)) diff --git a/yaetos/git_utils.py b/yaetos/git_utils.py index 46da4895..ff208d1a 100644 --- a/yaetos/git_utils.py +++ 
b/yaetos/git_utils.py @@ -10,13 +10,17 @@ class Git_Config_Manager(): FNAME = 'conf/git_config.yml' def get_config(self, mode, **kwargs): - if mode == 'dev_local': + + # Deal with multiple modes if any (Hacky. TODO: improve) + modes = mode.split(',') + if 'dev_local' in modes: config = self.get_config_from_git(kwargs['local_app_folder']) # For debug: self.save_yaml(config) - elif mode in ('dev_EMR', 'prod_EMR'): + elif 'dev_EMR' in modes or 'prod_EMR' in modes: config = self.get_config_from_file(kwargs['cluster_app_folder']) else: - raise Exception('Wrong mode') + required_mode = ('dev_local', 'dev_EMR', 'prod_EMR') + raise Exception(f'Wrong mode, one of the mode should be in {required_mode}') return config def get_config_from_git(self, local_app_folder): diff --git a/yaetos/libs/generic_jobs/copy_raw_job.py b/yaetos/libs/generic_jobs/copy_raw_job.py index 0e08a53a..3b79db5e 100644 --- a/yaetos/libs/generic_jobs/copy_raw_job.py +++ b/yaetos/libs/generic_jobs/copy_raw_job.py @@ -1,3 +1,6 @@ +""" +Job meant to run locally to get data from AWS S3 to local. Updates required to run in cluster. +""" from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup import os from cloudpathlib import CloudPath as CPt diff --git a/yaetos/scripts/copy/Dockerfile_external b/yaetos/scripts/copy/Dockerfile_external index 12e978dd..9a1a1371 100644 --- a/yaetos/scripts/copy/Dockerfile_external +++ b/yaetos/scripts/copy/Dockerfile_external @@ -9,7 +9,7 @@ RUN apt update \ # 2 lines above for jupyterlab RUN python -m pip install --upgrade pip -RUN pip3 install --no-deps yaetos==0.11.7 +RUN pip3 install --no-deps yaetos==0.11.8 # Force latest version to avoid using previous ones. RUN pip3 install -r /opt/bitnami/python/lib/python3.8/site-packages/yaetos/scripts/requirements_base.txt # Installing libraries required by Yaetos and more. Using this since requirements_base.txt has exact versions. 
diff --git a/yaetos/scripts/copy/aws_config.cfg.example b/yaetos/scripts/copy/aws_config.cfg.example index 2da7b8fe..9257cf68 100644 --- a/yaetos/scripts/copy/aws_config.cfg.example +++ b/yaetos/scripts/copy/aws_config.cfg.example @@ -4,7 +4,9 @@ ec2_key_name : to_be_filled user : to_be_filled s3_region : to_be_filled ec2_subnet_id : to_be_filled -extra_security_gp : None +extra_security_gp : to_be_filled # optional +emr_ec2_role : to_be_filled # optional +emr_role : to_be_filled # optional [prod] profile_name : to_be_filled @@ -12,4 +14,6 @@ ec2_key_name : to_be_filled user : to_be_filled s3_region : to_be_filled ec2_subnet_id : to_be_filled -extra_security_gp : None +extra_security_gp : to_be_filled # optional +emr_ec2_role : to_be_filled # optional +emr_role : to_be_filled # optional