Merge pull request #116 from arthurprevot/support_signle_tenancy
Support single tenancy & more + move to v0.11.8
arthurprevot authored Jun 12, 2024
2 parents 7841edc + 3f5ef8c commit 6256440
Showing 17 changed files with 372 additions and 73 deletions.
8 changes: 6 additions & 2 deletions conf/aws_config.cfg.example
@@ -4,12 +4,16 @@ ec2_key_name : to_be_filled
user : to_be_filled
s3_region : to_be_filled
ec2_subnet_id : to_be_filled
extra_security_gp : None
extra_security_gp : to_be_filled # optional
emr_ec2_role : to_be_filled # optional
emr_role : to_be_filled # optional

[prod]
profile_name : to_be_filled
ec2_key_name : to_be_filled
user : to_be_filled
s3_region : to_be_filled
ec2_subnet_id : to_be_filled
extra_security_gp : None
extra_security_gp : to_be_filled # optional
emr_ec2_role : to_be_filled # optional
emr_role : to_be_filled # optional
29 changes: 29 additions & 0 deletions conf/jobs_metadata.yml
@@ -336,6 +336,32 @@ jobs:
output: {'path':'{base_path}/load_example/glob_filtering/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas'}
spark_boot: False

examples/ex17_multimode_params_job:
description: |
Showing how to use multiple modes, useful for single-tenant setups where base_path needs to change per tenant. See --mode=dev_local,your_extra_tenant below.
usage: python jobs/generic/launcher.py --job_name=examples/ex17_multimode_params_job --mode=dev_local,your_extra_tenant
py_job: jobs/examples/ex7_pandas_job.py
inputs:
some_events: {'path':"{base_path}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas'}
other_events: {'path':"{base_path}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas', 'read_kwargs':{}}
output: {'path':'{base_path}/wiki_example/output_ex7_pandas/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas', 'save_kwargs':{'sep':'|'}}
spark_boot: False

examples/ex18_base_path_in_out_job:
description: |
Showing use of name_base_in_param and name_base_out_param, to set a different base_path for input and output.
usage: python jobs/generic/launcher.py --job_name=examples/ex18_base_path_in_out_job #--mode=dev_local,your_extra_tenant
py_job: jobs/examples/ex7_pandas_job.py
inputs:
some_events: {'path':"{base_path_in}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas'}
other_events: {'path':"{base_path_in}/wiki_example/input/{latest}/", 'type':'csv', 'df_type':'pandas', 'read_kwargs':{}}
output: {'path':'{base_path_out}/wiki_example/output_ex7_pandas/{now}/dataset.csv', 'type':'csv', 'df_type':'pandas', 'save_kwargs':{'sep':'|'}}
name_base_in_param: base_path_in
name_base_out_param: base_path_out
base_path_in: './data'
base_path_out: './data_other'
spark_boot: False
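
As an aside on how these placeholders resolve: a minimal sketch (not from the repo) of expanding the ex18 templates with plain Python string formatting, where the {latest} and {now} values are stand-ins:

# Hypothetical sketch: expanding the ex18 path templates with str.format.
in_template = "{base_path_in}/wiki_example/input/{latest}/"
out_template = "{base_path_out}/wiki_example/output_ex7_pandas/{now}/dataset.csv"

params = {
    "base_path_in": "./data",           # from the job entry above
    "base_path_out": "./data_other",    # from the job entry above
    "latest": "2024-06-12",             # stand-in for the resolved {latest} folder
    "now": "20240612_120000",           # stand-in for the resolved {now} timestamp
}

print(in_template.format(**params))   # -> ./data/wiki_example/input/2024-06-12/
print(out_template.format(**params))  # -> ./data_other/wiki_example/output_ex7_pandas/20240612_120000/dataset.csv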

# wordcount_raw_job: #Job exists but doesn't rely on jobs_metadata entries

# ----- Marketing Jobs --------
@@ -448,3 +474,6 @@ common_params:
enable_db_push: False
save_schemas: True
manage_git_info: False
your_extra_tenant: # useful for single tenant architecture, with data in separate locations
save_schemas: False
other_param: 'some_value'
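
For context on how a flag like --mode=dev_local,your_extra_tenant resolves, here is a rough sketch, not the actual Job_Yml_Parser logic (see the etl_utils tests below), of merging mode blocks left to right on top of common defaults. The your_extra_tenant values match the block above; the dev_local block is an assumed example.

# Hypothetical sketch of multi-mode merging; the real merging is done by Job_Yml_Parser.
def merge_modes(defaults, per_mode, modes):
    """Apply each mode block listed in --mode, left to right, on top of the defaults."""
    params = dict(defaults)
    for mode in modes.split(","):
        params.update(per_mode.get(mode, {}))
    return params

defaults = {"enable_db_push": False, "save_schemas": True, "manage_git_info": False}
per_mode = {
    "dev_local": {"base_path": "./data"},                                        # assumed example block
    "your_extra_tenant": {"save_schemas": False, "other_param": "some_value"},   # from the yml above
}

print(merge_modes(defaults, per_mode, "dev_local,your_extra_tenant"))
# {'enable_db_push': False, 'save_schemas': False, 'manage_git_info': False,
#  'base_path': './data', 'other_param': 'some_value'}
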
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -7,7 +7,7 @@
author = 'Arthur Prevot'

release = '0.11'
version = '0.11.7'
version = '0.11.8'

# -- General configuration

3 changes: 3 additions & 0 deletions jobs/generic/copy_raw_job.py
@@ -1,3 +1,6 @@
"""
Job meant to run locally to copy data from AWS S3 to the local machine. Updates required to run it on a cluster.
"""
from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup
import os
from cloudpathlib import CloudPath as CPt
2 changes: 1 addition & 1 deletion setup.py
@@ -39,7 +39,7 @@
# For a discussion on single-sourcing the version across setup.py and the
# project code, see
# https://packaging.python.org/guides/single-sourcing-package-version/
version='0.11.7', # Required
version='0.11.8', # Required

# This is a one-line description or tagline of what your project does. This
# corresponds to the "Summary" metadata field:
17 changes: 16 additions & 1 deletion tests/yaetos/deploy_test.py
@@ -78,11 +78,26 @@ def test_get_spark_submit_args(self, app_args):
expected.insert(8, '--sql_file=/home/hadoop/app/some_file.sql')
assert actual == expected

def test_get_spark_submit_args_with_launcher(self, app_args):
app_args['job_name'] = 'some_job_name'
app_file = 'jobs/generic/launcher.py'
actual = Dep.get_spark_submit_args(app_file, app_args)
expected = [
'spark-submit',
'--verbose',
'--py-files=/home/hadoop/app/scripts.zip',
'/home/hadoop/app/jobs/generic/launcher.py',
'--mode=None',
'--deploy=none',
'--storage=s3',
'--job_name=some_job_name']
assert actual == expected

def test_get_spark_submit_args_jar(self):
app_args = {
'jar_job': 'some/job.jar',
'spark_app_args': 'some_arg'}
app_file = app_args['jar_job']
app_file = 'jobs/generic/launcher.py'
actual = Dep.get_spark_submit_args(app_file, app_args)
expected = [
'spark-submit',
53 changes: 53 additions & 0 deletions tests/yaetos/etl_utils_test.py
@@ -100,6 +100,44 @@ def test_set_sql_file_from_name(self):
sql_file = Job_Yml_Parser.set_sql_file_from_name('some/job_name')
assert sql_file is None

def test_set_mode_specific_params(self):
job_name = 'n/a'
job_param_file = 'conf/jobs_metadata.yml'
yml_modes = 'dev_EMR'
skip_job = True
actual_params = Job_Yml_Parser(job_name, job_param_file, yml_modes, skip_job).set_job_yml(job_name, job_param_file, yml_modes, skip_job)
expected_params = {
'aws_config_file': 'conf/aws_config.cfg',
'aws_setup': 'dev',
'base_path': '{root_path}/pipelines_data',
'connection_file': 'conf/connections.cfg',
'email_cred_section': 'some_email_cred_section',
'emr_core_instances': 0,
'enable_db_push': False,
'jobs_folder': 'jobs/',
'load_connectors': 'all',
'manage_git_info': True,
'redshift_s3_tmp_dir': 's3a://dev-spark/tmp_spark/',
'root_path': 's3://mylake-dev',
's3_dags': '{root_path}/pipelines_metadata/airflow_dags',
's3_logs': '{root_path}/pipelines_metadata',
'save_schemas': False,
'schema': 'sandbox',
'spark_version': '3.5'}
assert actual_params == expected_params

def test_set_modes(self):
yml_modes = 'dev_EMR,your_extra_tenant' # i.e. using 2 modes
job_name = 'n/a'
job_param_file = 'conf/jobs_metadata.yml'
skip_job = True
expected_params = {
'save_schemas': False,
'other_param': 'some_value'}
actual_params = Job_Yml_Parser(job_name, job_param_file, yml_modes, skip_job).set_job_yml(job_name, job_param_file, yml_modes, skip_job)
actual_params = {key: value for (key, value) in actual_params.items() if key in expected_params.keys()}
assert actual_params == expected_params


class Test_Job_Args_Parser(object):
def test_no_param_override(self):
@@ -173,6 +211,21 @@ def test_create_spark_submit_python_job(self):
'--arg2=value2']
assert cmd_lst_real == cmd_lst_expected

def test_create_spark_submit_python_job_with_launcher(self):
job_args = {
'launcher_file': 'jobs/generic/launcher.py',
'py_job': 'jobs/examples/ex7_pandas_job.py',
'py-files': 'some/files.zip',
'spark_submit_keys': 'py-files',
'spark_app_keys': ''}
launch_jargs = Job_Args_Parser(defaults_args={}, yml_args={}, job_args=job_args, cmd_args={}, build_yml_args=False, loaded_inputs={})
cmd_lst_real = Runner.create_spark_submit(jargs=launch_jargs)
cmd_lst_expected = [
'spark-submit',
'--py-files=some/files.zip',
'jobs/examples/ex7_pandas_job.py']  # launcher.py not carried over; may want to change this behavior.
assert cmd_lst_real == cmd_lst_expected

def test_create_spark_submit_jar_job(self):
job_args = {
'jar_job': 'jobs/examples/ex12_scala_job/target/spark_scala_job_2.13-1.0.jar',
59 changes: 56 additions & 3 deletions yaetos/athena.py
@@ -6,7 +6,7 @@
logger = setup_logging('Athena')


def register_table(types, name_tb, schema, output_info, args):
def register_table_to_athena_catalog(types, name_tb, schema, output_info, args):
description_statement = f"""COMMENT "{args['description']}" """ if args.get('description') else ''
output_folder = output_info['path_expanded'].replace('s3a', 's3')

@@ -41,7 +41,7 @@ def register_table(types, name_tb, schema, output_info, args):
raise Exception('Athena table registration not setup for other than csv files.')

# Start the query execution
if args.get('mode') == 'dev_local':
if args.get('mode') and 'dev_local' in args['mode'].split(','):
config = ConfigParser()
assert os.path.isfile(args.get('aws_config_file'))
config.read(args.get('aws_config_file'))
@@ -55,5 +55,58 @@
QueryExecutionContext={'Database': schema},
ResultConfiguration={'OutputLocation': args.get('athena_out')},
)
logger.info(f"Registered table to athena '{schema}.{name_tb}', with QueryExecutionId: {response['QueryExecutionId']}.")
logger.info(f"Registered table to athena '{schema}.{name_tb}', pointing to {output_folder}, with QueryExecutionId: {response['QueryExecutionId']}.")
# TODO: Check to support "is_incremental"


def register_table_to_glue_catalog(schema_list, name_tb, schema, output_info, args):
output_folder = output_info['path_expanded'].replace('s3a', 's3')

# Start the query execution
if args.get('mode') and 'dev_local' in args['mode'].split(','):  # same multi-mode check as in register_table_to_athena_catalog
config = ConfigParser()
assert os.path.isfile(args.get('aws_config_file'))
config.read(args.get('aws_config_file'))
region_name = config.get(args.get('aws_setup'), 's3_region')
else:
region_name = boto3.Session().region_name

glue_client = boto3.client('glue', region_name=region_name)

database_name = schema
table_name = name_tb
logger.info(f"Registering table to athena '{schema}.{name_tb}', schema {schema_list}.")

# Define the table input
table_input = {
'Name': table_name,
'StorageDescriptor': {
'Columns': schema_list,
'Location': output_folder,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'Compressed': False,
'SerdeInfo': {
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
'Parameters': {
'serialization.format': '1'
}
},
},
'TableType': 'EXTERNAL_TABLE',
'Parameters': {
'classification': 'parquet'
}
}

try:
glue_client.delete_table(DatabaseName=database_name, Name=table_name)
print("Existing table deleted.")
except glue_client.exceptions.EntityNotFoundException:
print("No table to delete.")

response = glue_client.create_table(
DatabaseName=database_name,
TableInput=table_input
)
logger.info(f"Registered table to athena '{schema}.{name_tb}', pointing to {output_folder} with query response: {response}.")
52 changes: 44 additions & 8 deletions yaetos/db_utils.py
@@ -111,13 +111,14 @@ def pdf_to_sdf(df, output_types, sc, sc_sql): # TODO: check suspicion that this
return sc_sql.createDataFrame(rdd, schema=spark_schema, verifySchema=True)


def pandas_types_to_hive_types(df):
def pandas_types_to_hive_types(df, format='glue'):
"""
Converts pandas DataFrame dtypes to Hive column types.
:param df: pandas DataFrame
:param format: 'athena' for a dict of column name to Hive type, 'glue' for a list of {'Name': ..., 'Type': ...} dicts
:return: Dictionary of column names and their Hive data types
:return: Dictionary or list of dictionaries of column names and their Hive data types
"""
# TODO: add support for nested fields, just like in spark version
type_mapping = {
'object': 'STRING',
'bool': 'BOOLEAN',
@@ -144,9 +145,44 @@ def pandas_types_to_hive_types(df):
# This is a placeholder; actual handling should consider the specific precision and scale
# 'decimal': 'DECIMAL',
}
hive_types = {}
for column, dtype in df.dtypes.iteritems():
dtype_name = dtype.name
hive_type = type_mapping.get(dtype_name, 'STRING') # Default to STRING if no mapping found
hive_types[column] = hive_type
return hive_types
if format == 'athena':
hive_types = {}
for column, dtype in df.dtypes.items():
dtype_name = dtype.name
hive_type = type_mapping.get(dtype_name, 'STRING')
hive_types[column] = hive_type
return hive_types
elif format == 'glue':
hive_types = []
for column_name, dtype in df.dtypes.items():
hive_type = type_mapping.get(str(dtype), 'string')
hive_types.append({'Name': column_name, 'Type': hive_type})
return hive_types
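
Illustration of the two output shapes (assuming the function is importable from yaetos.db_utils; exact Hive types depend on the full type_mapping, which is partially elided in this diff):

# Illustrative only; 'object' and 'bool' dtypes are covered by the mapping shown above.
import pandas as pd
from yaetos.db_utils import pandas_types_to_hive_types

df = pd.DataFrame({"name": ["a", "b"], "flag": [True, False]})

print(pandas_types_to_hive_types(df, format='athena'))
# 'athena' format: dict of column name -> Hive type, e.g. {'name': 'STRING', 'flag': 'BOOLEAN'}

print(pandas_types_to_hive_types(df, format='glue'))
# 'glue' format: list matching the Glue 'Columns' field,
# e.g. [{'Name': 'name', 'Type': 'STRING'}, {'Name': 'flag', 'Type': 'BOOLEAN'}]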


def spark_type_to_hive_type(data_type):
""" Convert Spark data types to a detailed readable string format, handling nested structures. """
from pyspark.sql.types import StructType, ArrayType, FloatType, DecimalType, TimestampType # StructField, IntegerType, StringType,

if isinstance(data_type, StructType):
# Handle nested struct by recursively processing each field
fields = [f"{field.name}: {spark_types_to_hive_types(field.dataType)}" for field in data_type.fields]
return f"struct<{', '.join(fields)}>"
elif isinstance(data_type, ArrayType):
# Handle arrays by describing element types
element_type = spark_type_to_hive_type(data_type.elementType)
return f"array<{element_type}>"
elif isinstance(data_type, TimestampType):
return "timestamp"
elif isinstance(data_type, FloatType):
return "float"
elif isinstance(data_type, DecimalType):
return f"decimal({data_type.precision},{data_type.scale})"
else:
# Fallback for other types with default string representation
return data_type.simpleString()


def spark_types_to_hive_types(sdf):
schema_list = [{"Name": field.name, "Type": spark_type_to_hive_type(field.dataType)} for field in sdf.schema]
return schema_list
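
And a quick check of the nested-type rendering (assuming pyspark is installed and the recursive calls resolve to spark_type_to_hive_type, as corrected above):

# Illustrative only: nested Spark types rendered as Hive-style type strings.
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DecimalType, TimestampType
from yaetos.db_utils import spark_type_to_hive_type

nested = StructType([
    StructField("tags", ArrayType(StringType())),
    StructField("amount", DecimalType(10, 2)),
    StructField("created_at", TimestampType()),
])

print(spark_type_to_hive_type(nested))
# struct<tags: array<string>, amount: decimal(10,2), created_at: timestamp>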