Add kubernetes template for airflow + more #120

Merged
merged 41 commits on Jul 21, 2024
Changes from all commits
Commits
41 commits
93a5467
make deploy to k8s pre-parametrized for k8s_aws or for k8s_docker_des…
arthurprevot Jul 1, 2024
907dbc9
function rename path_handler().expand_later() -> expand_latest()
arthurprevot Jul 4, 2024
5f5cfdb
added list_files() and copy_files() for cross plateform ops
arthurprevot Jul 5, 2024
e53d55d
update setup_master.sh to diagnose update issue in EMR ( with aws s3 …
arthurprevot Jul 5, 2024
04652d8
cleaning
arthurprevot Jul 6, 2024
7b243b1
deal with default_aws_modes from yml file
arthurprevot Jul 6, 2024
f88ee50
get deploy.py to use etl_utils.get_aws_setup() to get aws creds from …
arthurprevot Jul 6, 2024
0f817f7
added --deploy=airflow_k8s that uses a new template (not yet set, pla…
arthurprevot Jul 7, 2024
b97aea0
add template for airflow_k8s, just copy of emr for now. TBUpdated
arthurprevot Jul 7, 2024
ea35706
customize airflow_template_k8s.py for k8s
arthurprevot Jul 7, 2024
e702c27
customize
arthurprevot Jul 7, 2024
8b53d87
template yaml for spark submit for airflow. to be customized later.
arthurprevot Jul 7, 2024
9a00046
set k8s params for template
arthurprevot Jul 7, 2024
37ea13e
added k8s params in jobs_metadata.yml
arthurprevot Jul 7, 2024
8df6167
fix mode selection + fix jargs
arthurprevot Jul 14, 2024
8159b1e
var rename
arthurprevot Jul 14, 2024
abf7fd6
remove code now not needed
arthurprevot Jul 14, 2024
c4eea4d
make dag local folder parametrizable and upload optional + added unit…
arthurprevot Jul 14, 2024
82e4c3b
fix get_aws_setup()
arthurprevot Jul 14, 2024
f95dd20
fix to create multiple directories. Necessary to pass unit-test
arthurprevot Jul 15, 2024
b7cdd81
fix unit-test with new k8s params
arthurprevot Jul 15, 2024
5a3de1f
minor
arthurprevot Jul 15, 2024
bf527f6
update test_create_dags() to check content of file (EMR only for now)…
arthurprevot Jul 15, 2024
cab394a
move fct.
arthurprevot Jul 15, 2024
e88bcd0
cleaning
arthurprevot Jul 15, 2024
89479a5
fix test, tested to work in every cases now.
arthurprevot Jul 16, 2024
eb08a38
added unit-test for airflow_k8s dag
arthurprevot Jul 16, 2024
4a0b621
cleaning
arthurprevot Jul 16, 2024
b5cf543
minor
arthurprevot Jul 16, 2024
ef1e211
fix import
arthurprevot Jul 16, 2024
1101553
comments
arthurprevot Jul 16, 2024
d6c2d20
cleaning
arthurprevot Jul 16, 2024
b7bf64a
lint
arthurprevot Jul 16, 2024
faa3503
fix test
arthurprevot Jul 16, 2024
c02a931
lint
arthurprevot Jul 16, 2024
e63133b
logging
arthurprevot Jul 18, 2024
cfee676
fix list_files_cluster()
arthurprevot Jul 18, 2024
36202b2
improve placeholder unit-test to have case of param inside param
arthurprevot Jul 21, 2024
2580b0d
logs
arthurprevot Jul 21, 2024
9def33f
yaetos version moved to 0.12.1, about to be submitted to pypi
arthurprevot Jul 21, 2024
566e963
update file copy post pypify.sh
arthurprevot Jul 21, 2024
2 changes: 1 addition & 1 deletion Dockerfile_k8s
@@ -2,7 +2,7 @@ FROM docker.io/bitnami/spark:3.5.0
USER root

RUN python -m pip install --upgrade pip
RUN pip3 install --no-deps yaetos==0.12.0
RUN pip3 install --no-deps yaetos==0.12.1
# Force latest version to avoid using previous ones.
RUN pip3 install -r /opt/bitnami/python/lib/python3.11/site-packages/yaetos/scripts/requirements_base.txt
# Installing libraries required by Yaetos and more. Using this since requirements_base.txt has exact versions.
20 changes: 20 additions & 0 deletions conf/jobs_metadata.yml
@@ -378,6 +378,16 @@ jobs:
output: {'path':'{{base_path}}/wiki_example/output_ex20_filelist/{{now}}/dataset.csv', 'type':'csv', 'df_type':'pandas'}
spark_boot: False

examples/ex21_deploy_airflow_job:
description: "Test deployment to airflow."
py_job: jobs/generic/copy_job.py
inputs:
table_to_copy: {'path':"tests/fixtures/data_sample/wiki_example/input/dataset.csv", 'type':'csv'}
output: {'path':'{{base_path}}/load_example/test_files/{{now}}/dataset.csv', 'type':'csv'}
s3_dags: 's3://mylake-dev/pipelines_metadata/airflow_dags'
local_dags: './airflow_dags/'
spark_boot: False

# wordcount_raw_job: #Job exists but doesn't rely on jobs_metadata entries

# ----- Marketing Jobs --------
@@ -458,6 +468,16 @@ common_params:
redshift_s3_tmp_dir: s3a://dev-spark/tmp_spark/
email_cred_section: some_email_cred_section # Section from "connection_file"
spark_version: '3.5' # options: '2.4', '3.0', '3.4' or '3.5'
k8s_url: 'k8s://https://kubernetes.docker.internal:6443'
k8s_name: 'my-pyspark-job'
k8s_executor_instances: '2'
k8s_namespace: 'a_k8s_namespace'
k8s_image_service: 'a_k8s_image_service'
k8s_upload_path: 's3a://a_k8s_upload_path'
k8s_driver_podTemplateFile: conf/k8s_setup_spark_submit_driver.yaml
k8s_executor_podTemplateFile: conf/k8s_setup_spark_submit_executor.yaml
aws_region: eu-west-1 # TODO: remove this.
k8s_podname: a_podname # TODO: make it nullable so jobs can rerun back to back.
default_aws_modes: 'dev_EMR'
default_local_modes: 'dev_local'
aws_modes: ['dev_EMR','prod_EMR']
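
The k8s_* entries above are the knobs for the new Kubernetes deployment path. As a rough illustration of how parameters like these typically map onto a spark-submit call against a Kubernetes master, here is a hedged sketch (the helper name build_spark_submit_args and the exact flag set are assumptions, not the command Yaetos actually builds; the spark.kubernetes.* conf keys are standard Spark-on-Kubernetes settings):

# Hypothetical sketch only: how the k8s_* params above could translate into spark-submit
# arguments for Spark on Kubernetes. Not Yaetos's actual deploy code.
def build_spark_submit_args(params: dict, job_file: str) -> list:
    return [
        'spark-submit',
        '--master', params['k8s_url'],  # e.g. 'k8s://https://kubernetes.docker.internal:6443'
        '--deploy-mode', 'cluster',
        '--name', params['k8s_name'],
        '--conf', f"spark.executor.instances={params['k8s_executor_instances']}",
        '--conf', f"spark.kubernetes.namespace={params['k8s_namespace']}",
        '--conf', f"spark.kubernetes.container.image={params['k8s_image_service']}",
        '--conf', f"spark.kubernetes.driver.podTemplateFile={params['k8s_driver_podTemplateFile']}",
        '--conf', f"spark.kubernetes.executor.podTemplateFile={params['k8s_executor_podTemplateFile']}",
        job_file,
    ]

Something like subprocess.run(build_spark_submit_args(params, 'jobs/generic/copy_job.py')) would then launch the job. The new examples/ex21_deploy_airflow_job entry would presumably be deployed through the new --deploy=airflow_k8s option mentioned in the commit history, with s3_dags/local_dags controlling where the generated DAG file is written.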
2 changes: 1 addition & 1 deletion jobs/examples/ex15_copy_job_multi_path.py
@@ -43,7 +43,7 @@ def expand_input_path(self, path, **kwargs):
base_path = self.jargs.base_path
path_partly_expanded = path.replace('{category}', category) \
.replace('{subcategory}', subcategory)
path = Path_Handler(path_partly_expanded, base_path, self.jargs.merged_args.get('root_path')).expand_later()
path = Path_Handler(path_partly_expanded, base_path, self.jargs.merged_args.get('root_path')).expand_latest()
return path

def expand_output_path(self, path, now_dt, **kwargs):
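
The change above is just the rename of Path_Handler's expand_later() to expand_latest(); the surrounding logic is untouched. For readers unfamiliar with the pattern, the sketch below shows one way such a 'latest' placeholder expansion can work; it is purely illustrative and not Yaetos's Path_Handler implementation.

# Illustrative only: resolving a '{latest}' placeholder to the most recent dated sub-folder.
# Hypothetical helper; names and behaviour are assumptions, not Yaetos's actual code.
import os

def expand_latest(path_template: str, base_dir: str) -> str:
    if '{latest}' not in path_template:
        return path_template
    # Dated folders (e.g. '20240715') sort lexicographically, so the last one is the newest.
    latest = sorted(os.listdir(base_dir))[-1]
    return path_template.replace('{latest}', latest)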
7 changes: 5 additions & 2 deletions jobs/generic/copy_raw_job.py
@@ -1,7 +1,7 @@
"""
Job meant to run locally to get data from AWS S3 to local. Updates required to run in cluster.
"""
from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup
from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup # FS_Ops_Dispatcher
import os
from cloudpathlib import CloudPath as CPt
import fnmatch
@@ -27,14 +27,17 @@ def transform(self, files_to_copy):
pattern = '*'
pattern_type = 'glob'

# TODO: replace code below (and all functions) with the commented code
# FS_Ops_Dispatcher().copy_file(path_in, path_out)

session = get_aws_setup(self.jargs.merged_args)
s3 = session.client('s3')

file_number = self.get_size(s3, path_raw_in.bucket, path_raw_in.key, pattern, pattern_type)
self.logger.info(f"Number of files to be downloaded {file_number}")

self.download_files(s3, path_raw_in.bucket, path_raw_in.key, pattern, pattern_type, path_raw_out)
self.logger.info("Finished downloading all files")
self.logger.info("Finished copying all files")
return None

def download_files(self, s3, bucket_name, prefix, pattern, pattern_type, path_raw_out):
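
The TODO added above earmarks the manual boto3 download loop for replacement with the single dispatcher call shown in the comment. A hedged sketch of what the slimmed-down job could look like after that swap (the class name and the jargs wiring are assumptions; only the FS_Ops_Dispatcher().copy_file(path_in, path_out) call comes from the diff itself):

# Hypothetical end-state sketched from the commented-out call in the diff above.
from yaetos.etl_utils import ETL_Base, Commandliner, FS_Ops_Dispatcher


class Job(ETL_Base):
    def transform(self, files_to_copy):
        # Exact input/output wiring is an assumption; the point is that one dispatcher
        # call replaces get_size()/download_files() and the boto3 client setup.
        path_in = self.jargs.inputs['files_to_copy']['path']
        path_out = self.jargs.output['path']
        FS_Ops_Dispatcher().copy_file(path_in, path_out)
        self.logger.info("Finished copying all files")
        return None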
5 changes: 4 additions & 1 deletion jobs/generic/list_files_job.py
@@ -1,4 +1,4 @@
from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup
from yaetos.etl_utils import ETL_Base, Commandliner, get_aws_setup # FS_Ops_Dispatcher
from cloudpathlib import CloudPath as CPt
import fnmatch
import re
@@ -26,6 +26,9 @@ def transform(self, files):
pattern = '*'
pattern_type = 'glob'

# TODO: replace code below (and all functions) with the commented code
# files = FS_Ops_Dispatcher().list_files(path, regex=None, globy=None)

session = get_aws_setup(self.jargs.merged_args)

s3 = session.client('s3')
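
Same intent here: the added TODO points at FS_Ops_Dispatcher().list_files(path, regex=None, globy=None) as the eventual cross-platform listing call. Until then the job filters S3 keys itself with the fnmatch/re imports shown above; a minimal illustration of that glob-versus-regex filtering (standalone helper, not the job's exact code):

import fnmatch
import re

# Minimal sketch of filtering object keys by pattern, assuming 'glob' patterns
# (e.g. '*.csv') go through fnmatch and anything else is treated as a regex.
def key_matches(key: str, pattern: str, pattern_type: str) -> bool:
    if pattern_type == 'glob':
        return fnmatch.fnmatch(key, pattern)
    return re.search(pattern, key) is not None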
140 changes: 140 additions & 0 deletions tests/fixtures/ref_airflow_emr_job_dag.py
@@ -0,0 +1,140 @@

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator, EmrAddStepsOperator # EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor # EmrJobFlowSensor
from airflow.utils.dates import days_ago # noqa: F401
from datetime import timedelta
import dateutil


DAG_ARGS = {
'dag_id': 'ex-job_x',
'dagrun_timeout': timedelta(hours=2),
'start_date': dateutil.parser.parse("2024-07-15T00:00:00+00:00"), # ignore_in_diff
'schedule': '@once',
'tags': ['emr'],
'default_args': {
'owner': 'me',
'depends_on_past': False,
'email': [],
'email_on_failure': False,
'email_on_retry': False,
},
}


CLUSTER_JOB_FLOW_OVERRIDES = {
'Name': 'yaetos__ex_s_job_x__20240101T000000', # ignore_in_diff
'ReleaseLabel': 'emr-6.1.1',
'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
'Instances': {
'InstanceGroups': [
{
'Name': "Main nodes",
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},

{
'Name': 'Secondary nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': '2',
}

],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2KeyName': 'to_be_filled',
'Ec2SubnetId': 'to_be_filled',
# 'AdditionalMasterSecurityGroups': extra_security_gp, # TODO : make optional in future. "[self.extra_security_gp] if self.extra_security_gp else []" doesn't work.
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'LogUri': "s3://mylake-dev/pipelines_metadata/manual_run_logs/",
'BootstrapActions': [{
'Name': 'setup_nodes',
'ScriptBootstrapAction': {
'Path': 's3n://mylake-dev/pipelines_metadata/jobs_code/yaetos__ex_s_job_x__20240701T000000/code_package/setup_nodes.sh', # ignore_in_diff
'Args': []
}
}],
'Configurations': [
{ # Section to force python3 since emr-5.x uses python2 by default.
"Classification": "spark-env",
"Configurations": [{
"Classification": "export",
"Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}
}]
},
# { # Section to add jars (redshift...), not used for now, since passed in spark-submit args.
# "Classification": "spark-defaults",
# "Properties": { "spark.jars": ["/home/hadoop/redshift_tbd.jar"], "spark.driver.memory": "40G", "maximizeResourceAllocation": "true"},
# }
]
}

EMR_STEPS = [
{
'Name': 'Run Setup',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://to_be_filled.elasticmapreduce/libs/script-runner/script-runner.jar',
'Args': [
"s3://mylake-dev/pipelines_metadata/jobs_code/yaetos__ex_s_job_x__20240701T000000/code_package/setup_master.sh", # ignore_in_diff
"s3://mylake-dev/pipelines_metadata/jobs_code/yaetos__ex_s_job_x__20240701T000000/code_package", # ignore_in_diff
]
}
},
{
'Name': 'Spark Application',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit', '--verbose', '--py-files=/home/hadoop/app/scripts.zip', '/home/hadoop/app/some/job.py', '--mode=None', '--deploy=none', '--storage=s3', '--job_name=ex/job_x'],
},
}
]

with DAG(**DAG_ARGS) as dag:

cluster_creator = EmrCreateJobFlowOperator(
task_id='start_emr_cluster',
aws_conn_id='aws_default',
emr_conn_id='emr_default',
job_flow_overrides=CLUSTER_JOB_FLOW_OVERRIDES
)

step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='start_emr_cluster', key='return_value') }}",
aws_conn_id='aws_default',
steps=EMR_STEPS,
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('start_emr_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}", # [1] to watch 2nd step, the spark application.
aws_conn_id='aws_default',
)

# # not used for now
# cluster_checker = EmrJobFlowSensor(
# task_id='check_cluster',
# job_flow_id="{{ task_instance.xcom_pull('start_emr_cluster', key='return_value') }}",
# aws_conn_id='aws_default',
# )

# # not used for now
# terminate_cluster = EmrTerminateJobFlowOperator(
# task_id='terminate_cluster',
# job_flow_id="{{ task_instance.xcom_pull('start_emr_cluster', key='return_value') }}",
# aws_conn_id='aws_default',
# )

cluster_creator >> step_adder >> step_checker
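
This file is a test fixture: per the commit history, test_create_dags() now compares the DAG generated by the deploy code against this reference, and the '# ignore_in_diff' markers flag lines whose content is run-specific (timestamps, dated S3 paths) and so presumably excluded from the comparison. A hedged sketch of that kind of check (hypothetical helper, not the actual test in the PR):

# Hypothetical line-by-line comparison that skips lines tagged 'ignore_in_diff'.
def files_match_ignoring_marked_lines(generated_path: str, reference_path: str) -> bool:
    with open(generated_path) as f_gen, open(reference_path) as f_ref:
        gen_lines = f_gen.read().splitlines()
        ref_lines = f_ref.read().splitlines()
    if len(gen_lines) != len(ref_lines):
        return False
    for gen, ref in zip(gen_lines, ref_lines):
        if 'ignore_in_diff' in ref:
            continue  # run-specific content (dates, generated paths); skip it
        if gen != ref:
            return False
    return True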
45 changes: 45 additions & 0 deletions tests/fixtures/ref_airflow_k8s_job_dag.py
@@ -0,0 +1,45 @@

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago # noqa: F401
from datetime import timedelta
import dateutil


DAG_ARGS = {
'dag_id': 'ex-job_x',
'dagrun_timeout': timedelta(hours=2),
'start_date': dateutil.parser.parse("2024-07-15T00:00:00+00:00"), # ignore_in_diff
'schedule': '@once',
'tags': ['emr'],
'default_args': {
'owner': 'me',
'depends_on_past': False,
'email': [],
'email_on_failure': False,
'email_on_retry': False,
},
}


with DAG(**DAG_ARGS) as dag:

spark_submit = SparkKubernetesOperator(
task_id='spark_submit_task',
namespace='None',
application_file='None',
kubernetes_conn_id='k8s_default',
do_xcom_push=True,
)

spark_sensor = SparkKubernetesSensor(
task_id='watch_step',
namespace='None',
application_name="{{ task_instance.xcom_pull(task_ids='spark_submit_task')['metadata']['name'] }}",
kubernetes_conn_id='k8s_default',
poke_interval=60,
timeout=600,
)

spark_submit >> spark_sensor
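
Here namespace='None' and application_file='None' are the template placeholders left unfilled in the fixture; a rendered DAG would presumably carry the k8s_* values from jobs_metadata.yml instead. A hedged illustration of a filled-in operator, using the sample config values from above (the application_file name is an assumption):

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Illustrative only: placeholders replaced with the sample k8s_* values.
spark_submit = SparkKubernetesOperator(
    task_id='spark_submit_task',
    namespace='a_k8s_namespace',                    # from k8s_namespace
    application_file='k8s_spark_application.yaml',  # assumed name of the SparkApplication manifest
    kubernetes_conn_id='k8s_default',
    do_xcom_push=True,
)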