diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..195e4a1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.vscode +setup-airflow/logs +setup-airflow/plugins +setup-pentaho/logs +__pycache__ +.meta +.env +# jdbc.properties \ No newline at end of file diff --git a/README.md b/README.md index 116f506..6870320 100644 --- a/README.md +++ b/README.md @@ -1 +1,99 @@ -docker-airflow-pdi-01 +# Description + +Step by step approach to easily dockerize Airflow and Pentaho Data Integration **IN SEPARATE CONTAINERS**. +Below is the high level architecture of the setup: +- Airflow: + - Orchestrator container + - Sends transformation/job metadata as task to Pentaho container + +- Pentaho: + - Container receives transformation/job details as task to be done + - Performs (runs) the actual task (transformation/job) + + +# Pre-requisites +- [Docker Engine](https://docs.docker.com/engine/install/) +- [Docker Compose](https://docs.docker.com/compose/install/) + +# Versions +- Airflow 2.0 +- PDI 9.1 + + # Setup +Change directory to the project folder before performing below steps. + +### Environment variables, files & folders for containers +- Create a .env file and add the user and group Ids for the respective containers. +This is required for the containers to have same access privileges as that of the host user during docker compose. + + echo -e "PENTAHO_UID=$(id -u)\nPENTAHO_GID=0\nAIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env + +- If needed, append the below optional variables to the above .env file. + + echo -e "=" >> .env + - HOST_ENV --> run containers as localhost/dev/qa/prod. This will copy corresponding kettle.properties into the PDI container. Also enables PDI transformations to pick environment specific DB JNDI connections during execution. Can be used by Airflow to connect to corresponding resources. + - CARTE_USER --> Default: cluster + - CARTE_PASSWORD --> Default: cluster + - AIRFLOW_ADMIN_USER --> Create Web UI user. Default: airflow + - AIRFLOW_ADMIN_PASSWORD --> Default: airflow + - AIRFLOW_ADMIN_EMAIL --> Required if new user to be created + - PENTAHO_DI_JAVA_OPTIONS --> Allocate JVM memory to PDI container, based on host machine RAM. Increase if container crashes due to GC Out of memory. Ex: for Min. 1G and Max 4G, set this to "-Xms1g -Xmx4g" + - CARTE_HOST_PORT --> Default: 8181 + - AIRFLOW_HOST_PORT --> Default: 8080 + + - Create below folders for the container volumes to bind + + mkdir ./setup-airflow/logs ./setup-airflow/plugins ./setup-pentaho/logs + + +- Source Code +Since the DAGs/PDI source code files might undergo frequent updates, they are not copied into the container during image build, instead mounted via docker compose. Any update to these source code files on host will automatically get visible inside the container. + + - Airflow: + - Default folder for DAGs on host is ./source-code/dags + - Replace the above default folder in the docker compose file, with the desired folder location on host. + - Place all the DAG files in the above host dags folder. + + - Pentaho: + - Default folder for ktr/kjb files on host is ./source-code/ktrs + - Replace the above default folder in the docker compose file, with the desired folder location on host. + - Place all the PDI files in the above host ktrs folder. + - Update repositories.xml file accordingly, to make them visible to Carte. + +### Build & Deploy +Below command will build (if first time) and start all the services. + + docker-compose up +To run as daemon, add -d option. + +# Web UI +- If not localhost, replace with server endpoint Url +- If not below default ports, replace with the ones used during CARTE_HOST_PORT & AIRFLOW_HOST_PORT setup. + +Airflow Webserver + + localhost:8080/home + +Carte Webserver + + localhost:8181/kettle/status + +# Best practices +- ```jdbc.properties``` file, which contains database access credentials, has been included in this repo for reference purpose only. In actual development, this should be avoided and needs to be added to gitignore instead. After first code pull to a server, update it with all JNDI details before docker compose. + +- ```.env``` file also may contain sensitive information, like environment dependent access keys. This also should be added to .gitignore file. Instead create this file with necessary parameters during image build. + +- ```HOST_ENV``` setting this parameter gives us a flexibility to choose appropriate ```kettle.properties``` file. For example, QA and PROD mailing server SMTP details may differ. This can be included in separate kettle properties file, to be selected dynamically based on the host environment. Not only this, if one uses the ```jdbc.properties``` file, we can enable PDI container dynamically select the correct JNDI from ```jdbc.properties``` file. For ex: if one needs to test a transformation in QA environemnt using Postgres JNDI connection encoded as ```db-${HOST_ENV}```, running PDI service with ```HOST_ENV=qa```, will render ```db-qa``` database JNDI, thus using QA data for testing. + +- ```PENTAHO_DI_JAVA_OPTIONS``` Having this option lets the user tweak the amount of memory PDI gets inside the container, to run a task. Depending on the host machine memory and average task complexity, this can be modified to avoid PDI container crash due to "GC Out of Memory" errors. If host machine has ample RAM and PDI container is crashing due to the default memory limits, we can increase it by setting ```PENTAHO_DI_JAVA_OPTIONS=-Xms1g -Xmx4g``` 1GB and 4GB being the lower and upper limits respectively. + +# References & Credits +- [What is Carte Server ?](https://wiki.pentaho.com/display/EAI/Carte+User+Documentation) + +- [Configure Carte Server](https://help.pentaho.com/Documentation/8.0/Products/Data_Integration/Carte_Clusters/060) + +- [Set Repository on the Carte Server](https://help.pentaho.com/Documentation/9.1/Products/Use_Carte_Clusters) + +- [Carte APIs to trigger kettle transformation/jobs](https://help.pentaho.com/Documentation/9.1/Developer_center/REST_API_Reference/Carte) + +- [Scheduling a PDI job using Dockerized Airflow](https://diethardsteiner.github.io/pdi/2020/04/01/Scheduling-a-PDI-Job-on-Apache-Airflow.html) \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100755 index 0000000..462af27 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,151 @@ +version: '3' +x-pdi-common: + &pdi-common + build: + context: ./setup-pentaho + dockerfile: Dockerfile + args: + PENTAHO_UID: ${PENTAHO_UID} + PENTAHO_GID: ${PENTAHO_GID} + image: pdi + environment: + &pdi-common-env + PENTAHO_DI_JAVA_OPTIONS: ${PENTAHO_DI_JAVA_OPTIONS} + CARTE_USER: ${CARTE_USER} + CARTE_PASSWORD: ${CARTE_PASSWORD} + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./source-code/ktrs:/home/pentaho/repositories + - ./setup-pentaho/logs:/opt/data-integration/logs + - ./setup-pentaho/repositories.xml:/opt/data-integration/.kettle/repositories.xml + - ./setup-pentaho/kettle-properties/${HOST_ENV:-localhost}-kettle.properties:/opt/data-integration/.kettle/kettle.properties + - ./setup-pentaho/simple-jndi:/opt/data-integration/simple-jndi + deploy: + restart_policy: + condition: on-failure + max_attempts: 3 + +x-airflow-common: + &airflow-common + build: ./setup-airflow + image: airflow + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@airflow-database/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-database/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@airflow-broker:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'false' + PDI_CONN_STR: http://${CARTE_USER:-cluster}:${CARTE_PASSWORD:-cluster}@pdi-master:${CARTE_HOST_PORT:-8181} + volumes: + - ./source-code/dags:/opt/airflow/dags + - ./setup-airflow/plugins:/opt/airflow/plugins + - ./setup-airflow/logs:/opt/airflow/logs + - ./setup-airflow/execute-carte.sh:/opt/airflow/execute-carte.sh + - ./setup-airflow/airflow.cfg:/opt/airflow/airflow.cfg + user: "${AIRFLOW_UID}:${AIRFLOW_GID}" + depends_on: + airflow-broker: + condition: service_healthy + airflow-database: + condition: service_healthy + + +services: +# Airflow-DB + airflow-database: + image: postgres:13 + container_name: airflow-database + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 5s + retries: 5 + restart: always + +# Airflow-messenger + airflow-broker: + image: redis:latest + container_name: airflow-broker + ports: + - 6379:6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 30s + retries: 50 + restart: always + +# Airflow-webserver + airflow-webserver: + <<: *airflow-common + container_name: airflow-webserver + command: webserver + ports: + - ${AIRFLOW_HOST_PORT:-8080}:8080 + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:${AIRFLOW_HOST_PORT:-8080}/health"] + interval: 10s + timeout: 10s + retries: 5 + restart: always + +# Airflow-scheduler + airflow-scheduler: + <<: *airflow-common + container_name: airflow-scheduler + command: scheduler + restart: always + +# Airflow-worker + airflow-worker: + <<: *airflow-common + command: celery worker + restart: always + +# Airflow-DB-initialize + airflow-init: + <<: *airflow-common + container_name: airflow-init + command: version + environment: + <<: *airflow-common-env + _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${AIRFLOW_ADMIN_USER:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${AIRFLOW_ADMIN_PASSWORD:-airflow} + _AIRFLOW_WWW_USER_EMAIL: ${AIRFLOW_ADMIN_EMAIL:-admin@admin.com} + +# Pentaho + pdi-master: + << : *pdi-common + container_name: pdi-master + environment: + <<: *pdi-common-env + ports: + - ${CARTE_HOST_PORT:-8181}:8181 + + # pdi-child: + # << : *pdi-common + # container_name: pdi-child + # ports: + # - 8182 + # depends_on: + # - pdi-master + # environment: + # <<: *pdi-common-env + # CARTE_PORT: 8182 + # CARTE_IS_MASTER: 'N' + # CARTE_INCLUDE_MASTERS: 'Y' + # CARTE_MASTER_HOSTNAME: 'pdi-master' + # CARTE_MASTER_PORT: ${CARTE_HOST_PORT:-8181} + +volumes: + postgres-db-volume: \ No newline at end of file diff --git a/setup-airflow/Dockerfile b/setup-airflow/Dockerfile new file mode 100644 index 0000000..450f67a --- /dev/null +++ b/setup-airflow/Dockerfile @@ -0,0 +1,14 @@ +FROM apache/airflow:2.0.1 + +USER root + +# Install environment dependencies +RUN apt-get update \ +# xmlstarlet package is required by Airflow to read XML log generated by Carte server running in separate container + && apt-get install xmlstarlet -y \ +# Upgrade PIP + && pip install --upgrade pip \ +# Install project specific packages + && pip install 'apache-airflow[postgres]' + +USER airflow \ No newline at end of file diff --git a/setup-airflow/airflow.cfg b/setup-airflow/airflow.cfg new file mode 100644 index 0000000..dd59b93 --- /dev/null +++ b/setup-airflow/airflow.cfg @@ -0,0 +1,1000 @@ +[core] +# The folder where your airflow pipelines live, most likely a +# subfolder in a code repository. This path must be absolute. +dags_folder = /opt/airflow/dags + +# Hostname by providing a path to a callable, which will resolve the hostname. +# The format is "package.function". +# +# For example, default value "socket.getfqdn" means that result from getfqdn() of "socket" +# package will be used as hostname. +# +# No argument should be required in the function specified. +# If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address`` +hostname_callable = socket.getfqdn + +# Default timezone in case supplied date times are naive +# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) +default_timezone = utc + +# The executor class that airflow should use. Choices include +# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``, +# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the +# full import path to the class when using a custom executor. +executor = SequentialExecutor + +# The SqlAlchemy connection string to the metadata database. +# SqlAlchemy supports many different database engine, more information +# their website +sql_alchemy_conn = sqlite:////opt/airflow/airflow.db + +# The encoding for the databases +sql_engine_encoding = utf-8 + +# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. +# This is particularly useful in case of mysql with utf8mb4 encoding because +# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should +# be set to ``utf8mb3_general_ci``. +# sql_engine_collation_for_ids = + +# If SqlAlchemy should pool database connections. +sql_alchemy_pool_enabled = True + +# The SqlAlchemy pool size is the maximum number of database connections +# in the pool. 0 indicates no limit. +sql_alchemy_pool_size = 5 + +# The maximum overflow size of the pool. +# When the number of checked-out connections reaches the size set in pool_size, +# additional connections will be returned up to this limit. +# When those additional connections are returned to the pool, they are disconnected and discarded. +# It follows then that the total number of simultaneous connections the pool will allow +# is pool_size + max_overflow, +# and the total number of "sleeping" connections the pool will allow is pool_size. +# max_overflow can be set to ``-1`` to indicate no overflow limit; +# no limit will be placed on the total number of concurrent connections. Defaults to ``10``. +sql_alchemy_max_overflow = 10 + +# The SqlAlchemy pool recycle is the number of seconds a connection +# can be idle in the pool before it is invalidated. This config does +# not apply to sqlite. If the number of DB connections is ever exceeded, +# a lower config value will allow the system to recover faster. +sql_alchemy_pool_recycle = 1800 + +# Check connection at the start of each connection pool checkout. +# Typically, this is a simple statement like "SELECT 1". +# More information here: +# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic +sql_alchemy_pool_pre_ping = True + +# The schema to use for the metadata database. +# SqlAlchemy supports databases with the concept of multiple schemas. +sql_alchemy_schema = + +# Import path for connect args in SqlAlchemy. Defaults to an empty dict. +# This is useful when you want to configure db engine args that SqlAlchemy won't parse +# in connection string. +# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args +# sql_alchemy_connect_args = + +# The amount of parallelism as a setting to the executor. This defines +# the max number of task instances that should run simultaneously +# on this airflow installation +parallelism = 32 + +# The number of task instances allowed to run concurrently by the scheduler +# in one DAG. Can be overridden by ``concurrency`` on DAG level. +dag_concurrency = 16 + +# Are DAGs paused by default at creation +dags_are_paused_at_creation = True + +# The maximum number of active DAG runs per DAG +max_active_runs_per_dag = 16 + +# Whether to load the DAG examples that ship with Airflow. It's good to +# get started, but you probably want to set this to ``False`` in a production +# environment +load_examples = False + +# Whether to load the default connections that ship with Airflow. It's good to +# get started, but you probably want to set this to ``False`` in a production +# environment +load_default_connections = True + +# Path to the folder containing Airflow plugins +plugins_folder = /opt/airflow/plugins + +# Should tasks be executed via forking of the parent process ("False", +# the speedier option) or by spawning a new python process ("True" slow, +# but means plugin changes picked up by tasks straight away) +execute_tasks_new_python_interpreter = False + +# Secret key to save connection passwords in the db +fernet_key = hjIFXCPQL6ZZx-dN7Kpr5yULTMFmLK-skgH9KdKeA1I= + +# Whether to disable pickling dags +donot_pickle = True + +# How long before timing out a python file import +dagbag_import_timeout = 30.0 + +# Should a traceback be shown in the UI for dagbag import errors, +# instead of just the exception message +dagbag_import_error_tracebacks = True + +# If tracebacks are shown, how many entries from the traceback should be shown +dagbag_import_error_traceback_depth = 2 + +# How long before timing out a DagFileProcessor, which processes a dag file +dag_file_processor_timeout = 50 + +# The class to use for running task instances in a subprocess. +# Choices include StandardTaskRunner, CgroupTaskRunner or the full import path to the class +# when using a custom task runner. +task_runner = StandardTaskRunner + +# If set, tasks without a ``run_as_user`` argument will be run with this user +# Can be used to de-elevate a sudo user running Airflow when executing tasks +default_impersonation = + +# What security module to use (for example kerberos) +security = + +# Turn unit test mode on (overwrites many configuration options with test +# values at runtime) +unit_test_mode = False + +# Whether to enable pickling for xcom (note that this is insecure and allows for +# RCE exploits). +enable_xcom_pickling = False + +# When a task is killed forcefully, this is the amount of time in seconds that +# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED +killed_task_cleanup_time = 60 + +# Whether to override params with dag_run.conf. If you pass some key-value pairs +# through ``airflow dags backfill -c`` or +# ``airflow dags trigger -c``, the key-value pairs will override the existing ones in params. +dag_run_conf_overrides_params = True + +# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``. +dag_discovery_safe_mode = True + +# The number of retries each task is going to have by default. Can be overridden at dag or task level. +default_task_retries = 0 + +# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. +min_serialized_dag_update_interval = 30 + +# Fetching serialized DAG can not be faster than a minimum interval to reduce database +# read rate. This config controls when your DAGs are updated in the Webserver +min_serialized_dag_fetch_interval = 10 + +# Whether to persist DAG files code in DB. +# If set to True, Webserver reads file contents from DB instead of +# trying to access files in a DAG folder. +# Example: store_dag_code = False +# store_dag_code = + +# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store +# in the Database. +# All the template_fields for each of Task Instance are stored in the Database. +# Keeping this number small may cause an error when you try to view ``Rendered`` tab in +# TaskInstance view for older tasks. +max_num_rendered_ti_fields_per_task = 30 + +# On each dagrun check against defined SLAs +check_slas = True + +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + +# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to ``False``, +# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module. +lazy_load_plugins = True + +# By default Airflow providers are lazily-discovered (discovery and imports happen only when required). +# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or +# loaded from module. +lazy_discover_providers = True + +# Number of times the code should be retried in case of DB Operational Errors. +# Not all transactions will be retried as it can cause undesired state. +# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. +max_db_retries = 3 + +[logging] +# The folder where airflow should store its log files +# This path must be absolute +base_log_folder = /opt/airflow/logs + +# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. +# Set this to True if you want to enable remote logging. +remote_logging = False + +# Users must supply an Airflow connection id that provides access to the storage +# location. +remote_log_conn_id = + +# Path to Google Credential JSON file. If omitted, authorization based on `the Application Default +# Credentials +# `__ will +# be used. +google_key_path = + +# Storage bucket URL for remote logging +# S3 buckets should start with "s3://" +# Cloudwatch log groups should start with "cloudwatch://" +# GCS buckets should start with "gs://" +# WASB buckets should start with "wasb" just to help Airflow select correct handler +# Stackdriver logs should start with "stackdriver://" +remote_base_log_folder = + +# Use server-side encryption for logs stored in S3 +encrypt_s3_logs = False + +# Logging level +logging_level = INFO + +# Logging level for Flask-appbuilder UI +fab_logging_level = WARN + +# Logging class +# Specify the class that will specify the logging configuration +# This class has to be on the python classpath +# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG +logging_config_class = + +# Flag to enable/disable Colored logs in Console +# Colour the logs when the controlling terminal is a TTY. +colored_console_log = True + +# Log format for when Colored logs is enabled +colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s +colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter + +# Format of Log line +log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s +simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s + +# Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter +# Example: task_log_prefix_template = {ti.dag_id}-{ti.task_id}-{execution_date}-{try_number} +task_log_prefix_template = + +# Formatting for how airflow generates file names/paths for each task run. +log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log + +# Formatting for how airflow generates file names for log +log_processor_filename_template = {{ filename }}.log + +# full path of dag_processor_manager logfile +dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log + +# Name of handler to read task instance logs. +# Defaults to use ``task`` handler. +task_log_reader = task + +# A comma\-separated list of third-party logger names that will be configured to print messages to +# consoles\. +# Example: extra_loggers = connexion,sqlalchemy +extra_loggers = + +[metrics] + +# StatsD (https://github.com/etsy/statsd) integration settings. +# Enables sending metrics to StatsD. +statsd_on = False +statsd_host = localhost +statsd_port = 8125 +statsd_prefix = airflow + +# If you want to avoid sending all the available metrics to StatsD, +# you can configure an allow list of prefixes (comma separated) to send only the metrics that +# start with the elements of the list (e.g: "scheduler,executor,dagrun") +statsd_allow_list = + +# A function that validate the statsd stat name, apply changes to the stat name if necessary and return +# the transformed stat name. +# +# The function should have the following signature: +# def func_name(stat_name: str) -> str: +stat_name_handler = + +# To enable datadog integration to send airflow metrics. +statsd_datadog_enabled = False + +# List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2) +statsd_datadog_tags = + +# If you want to utilise your own custom Statsd client set the relevant +# module path below. +# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up +# statsd_custom_client_path = + +[secrets] +# Full class name of secrets backend to enable (will precede env vars and metastore in search path) +# Example: backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend +backend = + +# The backend_kwargs param is loaded into a dictionary and passed to __init__ of secrets backend class. +# See documentation for the secrets backend you are using. JSON is expected. +# Example for AWS Systems Manager ParameterStore: +# ``{"connections_prefix": "/airflow/connections", "profile_name": "default"}`` +backend_kwargs = + +[cli] +# In what way should the cli access the API. The LocalClient will use the +# database directly, while the json_client will use the api running on the +# webserver +api_client = airflow.api.client.local_client + +# If you set web_server_url_prefix, do NOT forget to append it here, ex: +# ``endpoint_url = http://localhost:8080/myroot`` +# So api will look like: ``http://localhost:8080/myroot/api/experimental/...`` +endpoint_url = http://localhost:8080 + +[debug] +# Used only with ``DebugExecutor``. If set to ``True`` DAG will fail with first +# failed task. Helpful for debugging purposes. +fail_fast = False + +[api] +# Enables the deprecated experimental API. Please note that these APIs do not have access control. +# The authenticated user has full access. +# +# .. warning:: +# +# This `Experimental REST API `__ is +# deprecated since version 2.0. Please consider using +# `the Stable REST API `__. +# For more information on migration, see +# `UPDATING.md `_ +enable_experimental_api = False + +# How to authenticate users of the API. See +# https://airflow.apache.org/docs/stable/security.html for possible values. +# ("airflow.api.auth.backend.default" allows all requests for historic reasons) +auth_backend = airflow.api.auth.backend.deny_all + +# Used to set the maximum page limit for API requests +maximum_page_limit = 100 + +# Used to set the default page limit when limit is zero. A default limit +# of 100 is set on OpenApi spec. However, this particular default limit +# only work when limit is set equal to zero(0) from API requests. +# If no limit is supplied, the OpenApi spec default is used. +fallback_page_limit = 100 + +# The intended audience for JWT token credentials used for authorization. This value must match on the client and server sides. If empty, audience will not be tested. +# Example: google_oauth2_audience = project-id-random-value.apps.googleusercontent.com +google_oauth2_audience = + +# Path to Google Cloud Service Account key file (JSON). If omitted, authorization based on +# `the Application Default Credentials +# `__ will +# be used. +# Example: google_key_path = /files/service-account-json +google_key_path = + +[lineage] +# what lineage backend to use +backend = + +[atlas] +sasl_enabled = False +host = +port = 21000 +username = +password = + +[operators] +# The default owner assigned to each new operator, unless +# provided explicitly or passed via ``default_args`` +default_owner = airflow +default_cpus = 1 +default_ram = 512 +default_disk = 512 +default_gpus = 0 + +# Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator. +# If set to False, an exception will be thrown, otherwise only the console message will be displayed. +allow_illegal_arguments = False + +[hive] +# Default mapreduce queue for HiveOperator tasks +default_hive_mapred_queue = + +# Template for mapred_job_name in HiveOperator, supports the following named parameters +# hostname, dag_id, task_id, execution_date +# mapred_job_name_template = + +[webserver] +# The base url of your website as airflow cannot guess what domain or +# cname you are using. This is used in automated emails that +# airflow sends to point links to the right web server +base_url = http://localhost:8080 + +# Default timezone to display all dates in the UI, can be UTC, system, or +# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the +# default value of core/default_timezone will be used +# Example: default_ui_timezone = America/New_York +default_ui_timezone = UTC + +# The ip specified when starting the web server +web_server_host = 0.0.0.0 + +# The port on which to run the web server +web_server_port = 8080 + +# Paths to the SSL certificate and key for the web server. When both are +# provided SSL will be enabled. This does not change the web server port. +web_server_ssl_cert = + +# Paths to the SSL certificate and key for the web server. When both are +# provided SSL will be enabled. This does not change the web server port. +web_server_ssl_key = + +# Number of seconds the webserver waits before killing gunicorn master that doesn't respond +web_server_master_timeout = 120 + +# Number of seconds the gunicorn webserver waits before timing out on a worker +web_server_worker_timeout = 120 + +# Number of workers to refresh at a time. When set to 0, worker refresh is +# disabled. When nonzero, airflow periodically refreshes webserver workers by +# bringing up new ones and killing old ones. +worker_refresh_batch_size = 1 + +# Number of seconds to wait before refreshing a batch of workers. +worker_refresh_interval = 30 + +# If set to True, Airflow will track files in plugins_folder directory. When it detects changes, +# then reload the gunicorn. +reload_on_plugin_change = False + +# Secret key used to run your flask app +# It should be as random as possible +secret_key = JK3PU6syfBItlK8mgHrYnA== + +# Number of workers to run the Gunicorn web server +workers = 4 + +# The worker class gunicorn should use. Choices include +# sync (default), eventlet, gevent +worker_class = sync + +# Log files for the gunicorn webserver. '-' means log to stderr. +access_logfile = - + +# Log files for the gunicorn webserver. '-' means log to stderr. +error_logfile = - + +# Access log format for gunicorn webserver. +# default format is %%(h)s %%(l)s %%(u)s %%(t)s "%%(r)s" %%(s)s %%(b)s "%%(f)s" "%%(a)s" +# documentation - https://docs.gunicorn.org/en/stable/settings.html#access-log-format +access_logformat = + +# Expose the configuration file in the web server +expose_config = False + +# Expose hostname in the web server +expose_hostname = True + +# Expose stacktrace in the web server +expose_stacktrace = True + +# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times`` +dag_default_view = graph + +# Default DAG orientation. Valid values are: +# ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top) +dag_orientation = LR + +# Puts the webserver in demonstration mode; blurs the names of Operators for +# privacy. +demo_mode = False + +# The amount of time (in secs) webserver will wait for initial handshake +# while fetching logs from other worker machine +log_fetch_timeout_sec = 5 + +# Time interval (in secs) to wait before next log fetching. +log_fetch_delay_sec = 2 + +# Distance away from page bottom to enable auto tailing. +log_auto_tailing_offset = 30 + +# Animation speed for auto tailing log display. +log_animation_speed = 1000 + +# By default, the webserver shows paused DAGs. Flip this to hide paused +# DAGs by default +hide_paused_dags_by_default = False + +# Consistent page size across all listing views in the UI +page_size = 100 + +# Define the color of navigation bar +navbar_color = #fff + +# Default dagrun to show in UI +default_dag_run_display_number = 25 + +# Enable werkzeug ``ProxyFix`` middleware for reverse proxy +enable_proxy_fix = False + +# Number of values to trust for ``X-Forwarded-For``. +# More info: https://werkzeug.palletsprojects.com/en/0.16.x/middleware/proxy_fix/ +proxy_fix_x_for = 1 + +# Number of values to trust for ``X-Forwarded-Proto`` +proxy_fix_x_proto = 1 + +# Number of values to trust for ``X-Forwarded-Host`` +proxy_fix_x_host = 1 + +# Number of values to trust for ``X-Forwarded-Port`` +proxy_fix_x_port = 1 + +# Number of values to trust for ``X-Forwarded-Prefix`` +proxy_fix_x_prefix = 1 + +# Set secure flag on session cookie +cookie_secure = False + +# Set samesite policy on session cookie +cookie_samesite = Lax + +# Default setting for wrap toggle on DAG code and TI log views. +default_wrap = False + +# Allow the UI to be rendered in a frame +x_frame_enabled = True + +# Send anonymous user activity to your analytics tool +# choose from google_analytics, segment, or metarouter +# analytics_tool = + +# Unique ID of your account in the analytics tool +# analytics_id = + +# 'Recent Tasks' stats will show for old DagRuns if set +show_recent_stats_for_completed_runs = True + +# Update FAB permissions and sync security manager roles +# on webserver startup +update_fab_perms = True + +# The UI cookie lifetime in minutes. User will be logged out from UI after +# ``session_lifetime_minutes`` of non-activity +session_lifetime_minutes = 43200 + +[email] + +# Configuration email backend and whether to +# send email alerts on retry or failure +# Email backend to use +email_backend = airflow.utils.email.send_email_smtp + +# Whether email alerts should be sent when a task is retried +default_email_on_retry = True + +# Whether email alerts should be sent when a task failed +default_email_on_failure = True + +[smtp] + +# If you want airflow to send emails on retries, failure, and you want to use +# the airflow.utils.email.send_email_smtp function, you have to configure an +# smtp server here +smtp_host = localhost +smtp_starttls = True +smtp_ssl = False +# Example: smtp_user = airflow +# smtp_user = +# Example: smtp_password = airflow +# smtp_password = +smtp_port = 25 +smtp_mail_from = airflow@example.com +smtp_timeout = 30 +smtp_retry_limit = 5 + +[sentry] + +# Sentry (https://docs.sentry.io) integration. Here you can supply +# additional configuration options based on the Python platform. See: +# https://docs.sentry.io/error-reporting/configuration/?platform=python. +# Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``, +# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``. +# Enable error reporting to Sentry +sentry_on = false +sentry_dsn = + +[celery_kubernetes_executor] + +# This section only applies if you are using the ``CeleryKubernetesExecutor`` in +# ``[core]`` section above +# Define when to send a task to ``KubernetesExecutor`` when using ``CeleryKubernetesExecutor``. +# When the queue of a task is ``kubernetes_queue``, the task is executed via ``KubernetesExecutor``, +# otherwise via ``CeleryExecutor`` +kubernetes_queue = kubernetes + +[celery] + +# This section only applies if you are using the CeleryExecutor in +# ``[core]`` section above +# The app name that will be used by celery +celery_app_name = airflow.executors.celery_executor + +# The concurrency that will be used when starting workers with the +# ``airflow celery worker`` command. This defines the number of task instances that +# a worker will take, so size up your workers based on the resources on +# your worker box and the nature of your tasks +worker_concurrency = 8 + +# The maximum and minimum concurrency that will be used when starting workers with the +# ``airflow celery worker`` command (always keep minimum processes, but grow +# to maximum if necessary). Note the value should be max_concurrency,min_concurrency +# Pick these numbers based on resources on worker box and the nature of the task. +# If autoscale option is available, worker_concurrency will be ignored. +# http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale +# Example: worker_autoscale = 16,12 +# worker_autoscale = + +# Used to increase the number of tasks that a worker prefetches which can improve performance. +# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks +# that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily +# blocked if there are multiple workers and one worker prefetches tasks that sit behind long +# running tasks while another worker has unutilized processes that are unable to process the already +# claimed blocked tasks. +# https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits +# Example: worker_prefetch_multiplier = 1 +# worker_prefetch_multiplier = + +# When you start an airflow worker, airflow starts a tiny web server +# subprocess to serve the workers local log files to the airflow main +# web server, who then builds pages and sends them to users. This defines +# the port on which the logs are served. It needs to be unused, and open +# visible from the main web server to connect into the workers. +worker_log_server_port = 8793 + +# Umask that will be used when starting workers with the ``airflow celery worker`` +# in daemon mode. This control the file-creation mode mask which determines the initial +# value of file permission bits for newly created files. +worker_umask = 0o077 + +# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally +# a sqlalchemy database. Refer to the Celery documentation for more information. +broker_url = redis://redis:6379/0 + +# The Celery result_backend. When a job finishes, it needs to update the +# metadata of the job. Therefore it will post a message on a message bus, +# or insert it into a database (depending of the backend) +# This status is used by the scheduler to update the state of the task +# The use of a database is highly recommended +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings +result_backend = db+postgresql://postgres:airflow@postgres/airflow + +# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start +# it ``airflow celery flower``. This defines the IP that Celery Flower runs on +flower_host = 0.0.0.0 + +# The root URL for Flower +# Example: flower_url_prefix = /flower +flower_url_prefix = + +# This defines the port that Celery Flower runs on +flower_port = 5555 + +# Securing Flower with Basic Authentication +# Accepts user:password pairs separated by a comma +# Example: flower_basic_auth = user1:password1,user2:password2 +flower_basic_auth = + +# Default queue that tasks get assigned to and that worker listen on. +default_queue = default + +# How many processes CeleryExecutor uses to sync task state. +# 0 means to use max(1, number of cores - 1) processes. +sync_parallelism = 0 + +# Import path for celery configuration options +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = + +# Celery Pool implementation. +# Choices include: ``prefork`` (default), ``eventlet``, ``gevent`` or ``solo``. +# See: +# https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency +# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html +pool = prefork + +# The number of seconds to wait before timing out ``send_task_to_executor`` or +# ``fetch_celery_task_state`` operations. +operation_timeout = 1.0 + +# Celery task will report its status as 'started' when the task is executed by a worker. +# This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted +# or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob. +task_track_started = True + +# Time in seconds after which Adopted tasks are cleared by CeleryExecutor. This is helpful to clear +# stalled tasks. +task_adoption_timeout = 600 + +# The Maximum number of retries for publishing task messages to the broker when failing +# due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed. +task_publish_max_retries = 3 + +# Worker initialisation check to validate Metadata Database connection +worker_precheck = False + +[celery_broker_transport_options] + +# This section is for specifying options which can be passed to the +# underlying celery broker transport. See: +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options +# The visibility timeout defines the number of seconds to wait for the worker +# to acknowledge the task before the message is redelivered to another worker. +# Make sure to increase the visibility timeout to match the time of the longest +# ETA you're planning to use. +# visibility_timeout is only supported for Redis and SQS celery brokers. +# See: +# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options +# Example: visibility_timeout = 21600 +# visibility_timeout = + +[dask] + +# This section only applies if you are using the DaskExecutor in +# [core] section above +# The IP address and port of the Dask cluster's scheduler. +cluster_address = 127.0.0.1:8786 + +# TLS/ SSL settings to access a secured Dask scheduler. +tls_ca = +tls_cert = +tls_key = + +[scheduler] +# Task instances listen for external kill signal (when you clear tasks +# from the CLI or the UI), this defines the frequency at which they should +# listen (in seconds). +job_heartbeat_sec = 5 + +# How often (in seconds) to check and tidy up 'running' TaskInstancess +# that no longer have a matching DagRun +clean_tis_without_dagrun_interval = 15.0 + +# The scheduler constantly tries to trigger new tasks (look at the +# scheduler section in the docs for more information). This defines +# how often the scheduler should run (in seconds). +scheduler_heartbeat_sec = 5 + +# The number of times to try to schedule each DAG file +# -1 indicates unlimited number +num_runs = -1 + +# The number of seconds to wait between consecutive DAG file processing +processor_poll_interval = 1 + +# after how much time (seconds) a new DAGs should be picked up from the filesystem +min_file_process_interval = 30 + +# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. +dag_dir_list_interval = 300 + +# How often should stats be printed to the logs. Setting to 0 will disable printing stats +print_stats_interval = 30 + +# How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled) +pool_metrics_interval = 5.0 + +# If the last scheduler heartbeat happened more than scheduler_health_check_threshold +# ago (in seconds), scheduler is considered unhealthy. +# This is used by the health check in the "/health" endpoint +scheduler_health_check_threshold = 30 + +# How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs +orphaned_tasks_check_interval = 300.0 +child_process_log_directory = /opt/airflow/logs/scheduler + +# Local task jobs periodically heartbeat to the DB. If the job has +# not heartbeat in this many seconds, the scheduler will mark the +# associated task instance as failed and will re-schedule the task. +scheduler_zombie_task_threshold = 300 + +# Turn off scheduler catchup by setting this to ``False``. +# Default behavior is unchanged and +# Command Line Backfills still work, but the scheduler +# will not do scheduler catchup if this is ``False``, +# however it can be set on a per DAG basis in the +# DAG definition (catchup) +catchup_by_default = True + +# This changes the batch size of queries in the scheduling main loop. +# If this is too high, SQL query performance may be impacted by one +# or more of the following: +# - reversion to full table scan +# - complexity of query predicate +# - excessive locking +# Additionally, you may hit the maximum allowable query length for your db. +# Set this to 0 for no limit (not advised) +max_tis_per_query = 512 + +# Should the scheduler issue ``SELECT ... FOR UPDATE`` in relevant queries. +# If this is set to False then you should not run more than a single +# scheduler at once +use_row_level_locking = True + +# Max number of DAGs to create DagRuns for per scheduler loop +# +# Default: 10 +# max_dagruns_to_create_per_loop = + +# How many DagRuns should a scheduler examine (and lock) when scheduling +# and queuing tasks. +# +# Default: 20 +# max_dagruns_per_loop_to_schedule = + +# Should the Task supervisor process perform a "mini scheduler" to attempt to schedule more tasks of the +# same DAG. Leaving this on will mean tasks in the same DAG execute quicker, but might starve out other +# dags in some circumstances +# +# Default: True +# schedule_after_task_execution = + +# The scheduler can run multiple processes in parallel to parse dags. +# This defines how many processes will run. +parsing_processes = 2 + +# Turn off scheduler use of cron intervals by setting this to False. +# DAGs submitted manually in the web UI or with trigger_dag will still run. +use_job_schedule = True + +# Allow externally triggered DagRuns for Execution Dates in the future +# Only has effect if schedule_interval is set to None in DAG +allow_trigger_in_future = False + +[kerberos] +ccache = /tmp/airflow_krb5_ccache + +# gets augmented with fqdn +principal = airflow +reinit_frequency = 3600 +kinit_path = kinit +keytab = airflow.keytab + +[github_enterprise] +api_rev = v3 + +[admin] +# UI to hide sensitive variable fields when set to True +hide_sensitive_variable_fields = True + +# A comma-separated list of sensitive keywords to look for in variables names. +sensitive_variable_fields = + +[elasticsearch] +# Elasticsearch host +host = + +# Format of the log_id, which is used to query for a given tasks logs +log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} + +# Used to mark the end of a log stream for a task +end_of_log_mark = end_of_log + +# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id +# Code will construct log_id using the log_id template from the argument above. +# NOTE: The code will prefix the https:// automatically, don't include that here. +frontend = + +# Write the task logs to the stdout of the worker, rather than the default files +write_stdout = False + +# Instead of the default log formatter, write the log lines as JSON +json_format = False + +# Log fields to also attach to the json output, if enabled +json_fields = asctime, filename, lineno, levelname, message + +[elasticsearch_configs] +use_ssl = False +verify_certs = True + +[kubernetes] +# Path to the YAML pod file. If set, all other kubernetes-related fields are ignored. +pod_template_file = + +# The repository of the Kubernetes Image for the Worker to Run +worker_container_repository = + +# The tag of the Kubernetes Image for the Worker to Run +worker_container_tag = + +# The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` +namespace = default + +# If True, all worker pods will be deleted upon termination +delete_worker_pods = True + +# If False (and delete_worker_pods is True), +# failed worker pods will not be deleted so users can investigate them. +delete_worker_pods_on_failure = False + +# Number of Kubernetes Worker Pod creation calls per scheduler loop. +# Note that the current default of "1" will only launch a single pod +# per-heartbeat. It is HIGHLY recommended that users increase this +# number to match the tolerance of their kubernetes cluster for +# better performance. +worker_pods_creation_batch_size = 1 + +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + +# Use the service account kubernetes gives to pods to connect to kubernetes cluster. +# It's intended for clients that expect to be running inside a pod running on kubernetes. +# It will raise an exception if called from a process not running in a kubernetes environment. +in_cluster = True + +# When running with in_cluster=False change the default cluster_context or config_file +# options to Kubernetes client. Leave blank these to use default behaviour like ``kubectl`` has. +# cluster_context = + +# Path to the kubernetes configfile to be used when ``in_cluster`` is set to False +# config_file = + +# Keyword parameters to pass while calling a kubernetes client core_v1_api methods +# from Kubernetes Executor provided as a single line formatted JSON dictionary string. +# List of supported params are similar for all core_v1_apis, hence a single config +# variable for all apis. See: +# https://raw.githubusercontent.com/kubernetes-client/python/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/api/core_v1_api.py +kube_client_request_args = + +# Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +# ``core_v1_api`` method when using the Kubernetes Executor. +# This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +# class defined here: +# https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 +# Example: delete_option_kwargs = {"grace_period_seconds": 10} +delete_option_kwargs = + +# Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely +# when idle connection is time-outed on services like cloud load balancers or firewalls. +enable_tcp_keepalive = False + +# When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has +# been idle for `tcp_keep_idle` seconds. +tcp_keep_idle = 120 + +# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond +# to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds. +tcp_keep_intvl = 30 + +# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond +# to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before +# a connection is considered to be broken. +tcp_keep_cnt = 6 + +[smart_sensor] +# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to +# smart sensor task. +use_smart_sensor = False + +# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated +# by `hashcode % shard_code_upper_limit`. +shard_code_upper_limit = 10000 + +# The number of running smart sensor processes for each service. +shards = 5 + +# comma separated sensor classes support in smart_sensor. +sensors_enabled = NamedHivePartitionSensor \ No newline at end of file diff --git a/setup-airflow/execute-carte.sh b/setup-airflow/execute-carte.sh new file mode 100755 index 0000000..a9de75d --- /dev/null +++ b/setup-airflow/execute-carte.sh @@ -0,0 +1,71 @@ +#!/bin/bash +# Derived from and modified the logging mechanism discussed in the below article. This mechanism enables Airflow +# container to trigger PDI tasks synchronously: +# https://diethardsteiner.github.io/pdi/2020/04/01/Scheduling-a-PDI-Job-on-Apache-Airflow.html + + +CARTE_SERVER_URL=$PDI_CONN_STR +PDI_LOG_LEVEL=Basic +SLEEP_INTERVAL_SECONDS=5 +PDI_TASK=$1 +PDI_TASK_CMD=$2 + + +if [[ $PDI_TASK_CMD == *"rep="* ]] && [[ $PDI_TASK_CMD == *"job="* || $PDI_TASK_CMD == *"trans="* ]]; then + + set PDI_TASK_ID + set PDI_TASK_STATUS + + + # Execute task and get its Task ID + if [[ $PDI_TASK_CMD == *"executeJob"* ]]; then + PDI_TASK_ID=$(curl -s "${CARTE_SERVER_URL}/kettle/${PDI_TASK_CMD}&level=${PDI_LOG_LEVEL}" | xmlstarlet sel -t -m '/webresult/id' -v . -n) + echo "The PDI Task ID is: " ${PDI_TASK_ID} + else + PDI_TASK_ID=$(curl -s "${CARTE_SERVER_URL}/kettle/${PDI_TASK_CMD}&level=${PDI_LOG_LEVEL}") + fi + + getPDITaskStatus() { + if [[ $PDI_TASK_CMD == *"executeTrans"* ]]; then + curl -s "${CARTE_SERVER_URL}/kettle/transStatus/?name=${PDI_TASK}&id=${PDI_TASK_ID}&xml=Y" | xmlstarlet sel -t -m '/transstatus/status_desc' -v . -n + else + curl -s "${CARTE_SERVER_URL}/kettle/jobStatus/?name=${PDI_TASK}&id=${PDI_TASK_ID}&xml=Y" | xmlstarlet sel -t -m '/jobstatus/status_desc' -v . -n + fi + } + + getPDITaskFullLog() { + if [[ $PDI_TASK_CMD == *"executeTrans"* ]]; then + echo "Check carte server for transformation log!!!" + else + curl -s "${CARTE_SERVER_URL}/kettle/jobStatus/?name=${PDI_TASK}&id=${PDI_TASK_ID}&xml=Y" | xmlstarlet sel -t -m 'jobstatus/result/log_text' -v . -n + fi + } + + PDI_TASK_STATUS=$(getPDITaskStatus) + + # loop as long as the job is running + while [ ${PDI_TASK_STATUS} == "Running" ] + do + PDI_TASK_STATUS=$(getPDITaskStatus) + echo "The PDI task status is: " ${PDI_TASK_STATUS} + echo "I'll check in ${SLEEP_INTERVAL_SECONDS} seconds again" + # check every x seconds + sleep ${SLEEP_INTERVAL_SECONDS} + done + + # get and print full pdi task log + echo "The PDI task status is: " ${PDI_TASK_STATUS} + echo "Printing full log ..." + echo $(getPDITaskFullLog) + + # Check if any error. Send exit 1 if so. + if [[ ${PDI_TASK_STATUS} == "Finished" ]]; then + exit 0 + else + exit 1 + fi + +else + echo "Error executing: ${PDI_TASK_CMD}\n File or directory not found." + exit 1 +fi \ No newline at end of file diff --git a/setup-pentaho/Dockerfile b/setup-pentaho/Dockerfile new file mode 100644 index 0000000..4ae4245 --- /dev/null +++ b/setup-pentaho/Dockerfile @@ -0,0 +1,73 @@ +# Get Base image +FROM openjdk:8-jre + +LABEL maintainer="saritkumarsi@gmail.com" \ + version="1.0" \ + description="Docker file builds container with Pentaho Data Integration & Carte Server" + +# Set PDI user with permissions same as the Host machine. +ARG PENTAHO_GID +ARG PENTAHO_UID + +# Set required environment vars +ENV PDI_RELEASE=9.1 \ + PDI_VERSION=9.1.0.0-324 \ + PENTAHO_JAVA_HOME=/usr/local/openjdk-8 \ + PENTAHO_HOME=/home/pentaho \ + PENTAHO_UID=${PENTAHO_UID} \ + PENTAHO_GID=${PENTAHO_GID} \ + KETTLE_HOME=/opt/data-integration \ + PATH=${KETTLE_HOME}:${PATH} + +# Create Pentaho user home directory and required sub-folders +RUN mkdir -p ${PENTAHO_HOME}/templates ${PENTAHO_HOME}/scripts + +# Copy all required configs and entrypoint files +COPY carte-*-config.xml ${PENTAHO_HOME}/templates/ +COPY docker-entrypoint.sh ${PENTAHO_HOME}/scripts/ + +# Create Pentaho group passed as PENTAHO_GID arg +RUN command groupadd -r ${PENTAHO_GID} \ +# Create Pentaho user with PENTAHO_UID same as Host UID, and assign to new group + && useradd -s /bin/bash -d ${PENTAHO_HOME} -r -g ${PENTAHO_GID} -u ${PENTAHO_UID} pentaho + +# Pentaho download and setup +# Download PDI and save in PENTAHO_HOME +RUN /usr/bin/wget --progress=dot:giga \ + https://sourceforge.net/projects/pentaho/files/Pentaho%20${PDI_RELEASE}/client-tools/pdi-ce-${PDI_VERSION}.zip \ + -P ${PENTAHO_HOME} \ +# Unzip PDI to /opt/ directory + && /usr/bin/unzip -q ${PENTAHO_HOME}/pdi-ce-${PDI_VERSION}.zip -d /opt/ \ +# Clean up downloaded files + && rm -R ${PENTAHO_HOME}/pdi-ce-${PDI_VERSION}.zip \ +# Create directory for the kettle.properties file + && mkdir ${KETTLE_HOME}/.kettle \ +# Make pentaho user owner of both PENTAHO_HOME and KETTLE_HOME directories + && chown -R pentaho ${PENTAHO_HOME} ${KETTLE_HOME} \ +# Make entrypoint as executable + && chmod +x ${PENTAHO_HOME}/scripts/docker-entrypoint.sh + +# Install App dependent packages, if any +# Download MySQL JDBC Connector to PENTAHO_HOME +RUN /usr/bin/wget --progress=dot:giga \ + https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.49.zip -P ${PENTAHO_HOME} \ +# Unzip MySQL jar file + && /usr/bin/unzip -q ${PENTAHO_HOME}/mysql-connector-java-5.1.49.zip -d ${PENTAHO_HOME} \ +# Copy unzipped jar file to /opt/data-integration + && cp ${PENTAHO_HOME}/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar /opt/data-integration/lib \ +# Clean up downloaded files + && rm -R ${PENTAHO_HOME}/mysql-connector* + +USER pentaho + +# Expose Carte Server +EXPOSE ${CARTE_PORT} + +# Set working directory +WORKDIR ${KETTLE_HOME} + +# Set container entrypoint. Sets all required configs for carte server. +ENTRYPOINT ["/home/pentaho/scripts/docker-entrypoint.sh"] + +# Start Carte Server - the entry point sets configs in carte.config.xml which is passed to carte.sh +CMD ["carte.sh", "carte.config.xml"] \ No newline at end of file diff --git a/setup-pentaho/carte-master-config.xml b/setup-pentaho/carte-master-config.xml new file mode 100644 index 0000000..1704611 --- /dev/null +++ b/setup-pentaho/carte-master-config.xml @@ -0,0 +1,12 @@ + + + + CARTE_NAME + CARTE_NETWORK_INTERFACE + CARTE_PORT + CARTE_USER + CARTE_PASSWORD + CARTE_IS_MASTER + + + \ No newline at end of file diff --git a/setup-pentaho/carte-slave-config.xml b/setup-pentaho/carte-slave-config.xml new file mode 100644 index 0000000..bfe8024 --- /dev/null +++ b/setup-pentaho/carte-slave-config.xml @@ -0,0 +1,27 @@ + + + + + + CARTE_MASTER_NAME + CARTE_MASTER_HOSTNAME + CARTE_MASTER_PORT + CARTE_MASTER_USER + CARTE_MASTER_PASSWORD + CARTE_MASTER_IS_MASTER + + + + + CARTE_REPORT_TO_MASTERS + + + CARTE_NAME + CARTE_NETWORK_INTERFACE + CARTE_PORT + CARTE_USER + CARTE_PASSWORD + CARTE_IS_MASTER + + + \ No newline at end of file diff --git a/setup-pentaho/docker-entrypoint.sh b/setup-pentaho/docker-entrypoint.sh new file mode 100644 index 0000000..6c3a6a7 --- /dev/null +++ b/setup-pentaho/docker-entrypoint.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +set -e + +if [ "$1" = 'carte.sh' ]; then + if [ ! -f "$KETTLE_HOME/carte.config.xml" ]; then + # Set variables to default if not explicitly provided + : ${CARTE_NAME:=carte-server} + : ${CARTE_NETWORK_INTERFACE:=eth0} + : ${CARTE_PORT:=8181} + : ${CARTE_USER:=cluster} + : ${CARTE_PASSWORD:=cluster} + : ${CARTE_IS_MASTER:=Y} + + : ${CARTE_INCLUDE_MASTERS:=N} + + : ${CARTE_REPORT_TO_MASTERS:=Y} + : ${CARTE_MASTER_NAME:=carte-master} + : ${CARTE_MASTER_HOSTNAME:=localhost} + : ${CARTE_MASTER_PORT:=8181} + : ${CARTE_MASTER_USER:=cluster} + : ${CARTE_MASTER_PASSWORD:=cluster} + : ${CARTE_MASTER_IS_MASTER:=Y} + + # Copy master or slave config file based on the CARTE_INCLUDE_MASTERS flag + if [ "$CARTE_INCLUDE_MASTERS" = "Y" ]; then + cp $PENTAHO_HOME/templates/carte-slave-config.xml "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_REPORT_TO_MASTERS/$CARTE_REPORT_TO_MASTERS/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_NAME/$CARTE_MASTER_NAME/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_HOSTNAME/$CARTE_MASTER_HOSTNAME/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_PORT/$CARTE_MASTER_PORT/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_USER/$CARTE_MASTER_USER/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_PASSWORD/$CARTE_MASTER_PASSWORD/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_MASTER_IS_MASTER/$CARTE_MASTER_IS_MASTER/" "$KETTLE_HOME/carte.config.xml" + else + cp $PENTAHO_HOME/templates/carte-master-config.xml "$KETTLE_HOME/carte.config.xml" + fi + sed -i "s/CARTE_NAME/$CARTE_NAME/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_NETWORK_INTERFACE/$CARTE_NETWORK_INTERFACE/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_PORT/$CARTE_PORT/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_USER/$CARTE_USER/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_PASSWORD/$CARTE_PASSWORD/" "$KETTLE_HOME/carte.config.xml" + sed -i "s/CARTE_IS_MASTER/$CARTE_IS_MASTER/" "$KETTLE_HOME/carte.config.xml" + fi +fi + +exec "$@" \ No newline at end of file diff --git a/setup-pentaho/kettle-properties/dev-kettle.properties b/setup-pentaho/kettle-properties/dev-kettle.properties new file mode 100644 index 0000000..a41c87a --- /dev/null +++ b/setup-pentaho/kettle-properties/dev-kettle.properties @@ -0,0 +1,2 @@ +HOST_ENV=dev +PDI_TEST_WELCOME_MESSAGE="Hi from Dev PDI!!!" diff --git a/setup-pentaho/kettle-properties/localhost-kettle.properties b/setup-pentaho/kettle-properties/localhost-kettle.properties new file mode 100644 index 0000000..9328d77 --- /dev/null +++ b/setup-pentaho/kettle-properties/localhost-kettle.properties @@ -0,0 +1,2 @@ +HOST_ENV=localhost +PDI_TEST_WELCOME_MESSAGE="Hi from Localhost PDI!!!" diff --git a/setup-pentaho/kettle-properties/prod-kettle.properties b/setup-pentaho/kettle-properties/prod-kettle.properties new file mode 100644 index 0000000..3103bbf --- /dev/null +++ b/setup-pentaho/kettle-properties/prod-kettle.properties @@ -0,0 +1,2 @@ +HOST_ENV=prod +PDI_TEST_WELCOME_MESSAGE="Hi from Prod PDI!!!" diff --git a/setup-pentaho/kettle-properties/qa-kettle.properties b/setup-pentaho/kettle-properties/qa-kettle.properties new file mode 100644 index 0000000..3a1d8ce --- /dev/null +++ b/setup-pentaho/kettle-properties/qa-kettle.properties @@ -0,0 +1,2 @@ +HOST_ENV=qa +PDI_TEST_WELCOME_MESSAGE="Hi from QA PDI!!!" diff --git a/setup-pentaho/repositories.xml b/setup-pentaho/repositories.xml new file mode 100644 index 0000000..3d4dce0 --- /dev/null +++ b/setup-pentaho/repositories.xml @@ -0,0 +1,11 @@ + + + + KettleFileRepository + test-repo + .kjb or .ktr files + /home/pentaho/repositories + N + N + + \ No newline at end of file diff --git a/setup-pentaho/simple-jndi/jdbc.properties b/setup-pentaho/simple-jndi/jdbc.properties new file mode 100644 index 0000000..728c3f3 --- /dev/null +++ b/setup-pentaho/simple-jndi/jdbc.properties @@ -0,0 +1,31 @@ +# Reference: https://help.pentaho.com/Documentation/9.1/Setup/JDBC_drivers_reference +# Caution: this file stores DB credentials, hence it should be added to .gitignore. Instead create this file in the server and mount it to PDI container using docker compose. + + +# Localhost PostGres DB connection string +db-localhost/type=javax.sql.DataSource +db-localhost/driver=org.postgresql.Driver +db-localhost/url=jdbc:postgresql://host.docker.internal/database +db-localhost/user=postgres +db-localhost/password=postgres + +# Dev PostGres DB connection string +db-dev/type=javax.sql.DataSource +db-dev/driver=org.postgresql.Driver +db-dev/url=jdbc:postgresql://[:]/ +db-dev/user=postgres +db-dev/password=postgres + +# QA PostGres DB connection string +db-qa/type=javax.sql.DataSource +db-qa/driver=org.postgresql.Driver +db-qa/url=jdbc:postgresql://[:]/ +db-qa/user=postgres +db-qa/password=postgres + +# PROD PostGres DB connection string +db-prod/type=javax.sql.DataSource +db-prod/driver=org.postgresql.Driver +db-prod/url=jdbc:postgresql://[:]/ +db-prod/user=postgres +db-prod/password=postgres \ No newline at end of file diff --git a/source-code/dags/async-trigger.py b/source-code/dags/async-trigger.py new file mode 100644 index 0000000..9eabed0 --- /dev/null +++ b/source-code/dags/async-trigger.py @@ -0,0 +1,43 @@ +# To illustrate asynchronous data processing triggers from Airflow container to PDI container + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy import DummyOperator + +args = { + "owner": "airflow", + "start_date": days_ago(1), + "depends_on_past": False, + "wait_for_downstream": False, + "catchup": False, +} + + +with DAG( + dag_id="async-trigger", + default_args=args, + schedule_interval=None, + catchup=False, + description=f"To illustrate asynchronous task triggers from Airflow container to PDI container", +) as dag: + + t1 = DummyOperator( + task_id='Start', + ) + + t2 = BashOperator( + task_id='Task_1', + bash_command='curl "${PDI_CONN_STR}/kettle/executeTrans/?rep=test-repo&d&trans=/process1/task1"' + ) + + t3 = BashOperator( + task_id='Task_2', + bash_command='curl "${PDI_CONN_STR}/kettle/executeTrans/?rep=test-repo&trans=/process1/task2"' + ) + + t4 = DummyOperator( + task_id='Stop', + ) + + t1 >> t2 >> t3 >> t4 \ No newline at end of file diff --git a/source-code/dags/hello-world.py b/source-code/dags/hello-world.py new file mode 100644 index 0000000..35050bf --- /dev/null +++ b/source-code/dags/hello-world.py @@ -0,0 +1,44 @@ +# To illustrate how we can trigger a job/transformation in the PDI container via Carte APIs +# Reference: https://help.pentaho.com/Documentation/9.1/Developer_center/REST_API_Reference/Carte + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy import DummyOperator + +args = { + "owner": "airflow", + "start_date": days_ago(1), + "depends_on_past": False, + "wait_for_downstream": False, + "catchup": False, +} + + +with DAG( + dag_id="hello-world", + default_args=args, + schedule_interval=None, + catchup=False, + description=f"Hello World!!!", +) as dag: + + t1 = DummyOperator( + task_id='Start', + ) + + t2 = BashOperator( + task_id='Trigger_Job', + bash_command='curl "${PDI_CONN_STR}/kettle/executeJob/?rep=test-repo&job=/helloworld/helloworld-job"' + ) + + t3 = BashOperator( + task_id='Trigger_Transformation', + bash_command='curl "${PDI_CONN_STR}/kettle/executeTrans/?rep=test-repo&trans=/helloworld/helloworld-trans"' + ) + + t4 = DummyOperator( + task_id='Stop', + ) + + t1 >> [t2, t3] >> t4 \ No newline at end of file diff --git a/source-code/dags/sync-trigger.py b/source-code/dags/sync-trigger.py new file mode 100644 index 0000000..91212de --- /dev/null +++ b/source-code/dags/sync-trigger.py @@ -0,0 +1,54 @@ +# To illustrate synchronous task triggers from Airflow container to PDI container + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy import DummyOperator +from utils.execute_pdi import execute_trans + +args = { + "owner": "airflow", + "start_date": days_ago(1), + "depends_on_past": False, + "wait_for_downstream": False, + "catchup": False, +} + + +with DAG( + dag_id="sync-trigger", + default_args=args, + schedule_interval=None, + catchup=False, + description=f"To illustrate synchronous task triggers from Airflow container to PDI container", +) as dag: + + t1 = DummyOperator( + task_id='Start', + ) + + t2 = BashOperator( + task_id='Task_1', + bash_command=execute_trans( + rep="test-repo", + task="task1", + dir="/process1/", + param="" + ) + ) + + t3 = BashOperator( + task_id='Task_2', + bash_command=execute_trans( + rep="test-repo", + task="task2", + dir="/process1/", + param="" + ) + ) + + t4 = DummyOperator( + task_id='Stop', + ) + + t1 >> t2 >> t3 >> t4 \ No newline at end of file diff --git a/source-code/dags/utils/execute_pdi.py b/source-code/dags/utils/execute_pdi.py new file mode 100644 index 0000000..f7c84cf --- /dev/null +++ b/source-code/dags/utils/execute_pdi.py @@ -0,0 +1,56 @@ +""" +Summary: This module contains helper functions required to build + the Carte API URL trigger for PDI Jobs/ Transformations. +Params: + - rep: PDI repository containing the job/trans; value must exist in the tag of repositories.xml + - task: job/trans file name present in the folder; do not include .ktr/.kjb extension + - dir: '/' if job/trans file is in root directory inside . Else '/subfolder1/subfolder2/.../'. Do not include job/trans file name + - param: pass parameters for the job/trans. Add '&' if multiple,ex: param1=value¶m2=value +""" + + +def execute_command(executionType, task_type, rep, task, dir, param): + + command = "bash /opt/airflow/execute-carte.sh " + command += f'''{task} "{executionType}/?rep={rep}&{task_type}={dir}{task}&{param}"''' + + return command + +def execute_trans(rep, task, dir, param=''): + """Summary: Build executeTrans Carte API URL + + Args: + rep (string): [PDI repository containing the transformation .ktr file. Value must exist in the tag of repositories.xml] + task ([string]): [transformation file name present in the folder. Do not include .ktr extension] + dir ([string]): ['/' if file is in root directory inside . Else '/subfolder1/subfolder2/.../'. Do not include file name] + param ([string]): [If required to pass parameters to the transformation. Add '&' if multiple, ex: param1=value¶m2=value] + + Returns: + [string]: [Carte executeTrans API URL for the transformation] + """ + + command = execute_command( + executionType="executeTrans", task_type="trans", + rep=rep, task=task, dir=dir,param=param + ) + + return command + +def execute_job(rep, task, dir, param=''): + """Summary: Build executeJob Carte API URL + + Args: + rep (string): [PDI repository containing the job .kjb file. Value must exist in the tag of repositories.xml] + task ([string]): [job file name present in the folder. Do not include .kjb extension] + dir ([string]): ['/' if file is in root directory inside . Else '/subfolder1/subfolder2/.../'. Do not include file name] + param ([string]): [If required to pass parameters to the job. Add '&' if multiple, ex: param1=value¶m2=value] + + Returns: + [string]: [Carte executeJob API URL for the job] + """ + command = execute_command( + executionType="executeJob", task_type="job", + rep=rep, task=task, dir=dir,param=param + ) + + return command \ No newline at end of file diff --git a/source-code/ktrs/helloworld/helloworld-job.kjb b/source-code/ktrs/helloworld/helloworld-job.kjb new file mode 100644 index 0000000..de03c33 --- /dev/null +++ b/source-code/ktrs/helloworld/helloworld-job.kjb @@ -0,0 +1,385 @@ + + + helloworld-job + + + + 0 + / + - + 2015/06/01 15:33:25.423 + - + 2015/06/01 15:34:07.453 + + + + + + + + + + + + + ID_JOB + Y + ID_JOB + + + CHANNEL_ID + Y + CHANNEL_ID + + + JOBNAME + Y + JOBNAME + + + STATUS + Y + STATUS + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + STARTDATE + Y + STARTDATE + + + ENDDATE + Y + ENDDATE + + + LOGDATE + Y + LOGDATE + + + DEPDATE + Y + DEPDATE + + + REPLAYDATE + Y + REPLAYDATE + + + LOG_FIELD + Y + LOG_FIELD + + + EXECUTING_SERVER + N + EXECUTING_SERVER + + + EXECUTING_USER + N + EXECUTING_USER + + + START_JOB_ENTRY + N + START_JOB_ENTRY + + + CLIENT + N + CLIENT + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + JOBNAME + Y + TRANSNAME + + + JOBENTRYNAME + Y + STEPNAME + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + RESULT + Y + RESULT + + + NR_RESULT_ROWS + Y + NR_RESULT_ROWS + + + NR_RESULT_FILES + Y + NR_RESULT_FILES + + + LOG_FIELD + N + LOG_FIELD + + + COPY_NR + N + COPY_NR + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + LOGGING_OBJECT_TYPE + Y + LOGGING_OBJECT_TYPE + + + OBJECT_NAME + Y + OBJECT_NAME + + + OBJECT_COPY + Y + OBJECT_COPY + + + REPOSITORY_DIRECTORY + Y + REPOSITORY_DIRECTORY + + + FILENAME + Y + FILENAME + + + OBJECT_ID + Y + OBJECT_ID + + + OBJECT_REVISION + Y + OBJECT_REVISION + + + PARENT_CHANNEL_ID + Y + PARENT_CHANNEL_ID + + + ROOT_CHANNEL_ID + Y + ROOT_CHANNEL_ID + + + N + + + + START + + SPECIAL + + Y + N + N + 0 + 0 + 60 + 12 + 0 + 1 + 1 + N + Y + 0 + 128 + 64 + + + + trans + + TRANS + + filename + + ${Internal.Entry.Current.Directory}/helloworld/helloworld-trans + + N + N + N + N + N + N + + + N + N + Basic + N + + N + Y + N + N + N + Pentaho local + + Y + + MY_MESSAGE + + ${PDI_TEST_WELCOME_MESSAGE} + + + N + Y + 0 + 288 + 64 + + + + + + START + trans + 0 + 0 + Y + Y + Y + + + + + + + METASTORE.pentaho + + Default Run Configuration + {"namespace":"pentaho","id":"Default Run Configuration","name":"Default Run Configuration","description":"Defines a default run configuration","metaStoreName":null} + + + + {"_":"Embedded MetaStore Elements","namespace":"pentaho","type":"Default Run Configuration"} + + Pentaho local + {"children":[{"children":[],"id":"server","value":null},{"children":[],"id":"clustered","value":"N"},{"children":[],"id":"name","value":"Pentaho local"},{"children":[],"id":"description","value":null},{"children":[],"id":"pentaho","value":"N"},{"children":[],"id":"readOnly","value":"Y"},{"children":[],"id":"sendResources","value":"N"},{"children":[],"id":"logRemoteExecutionLocally","value":"N"},{"children":[],"id":"remote","value":"N"},{"children":[],"id":"local","value":"Y"},{"children":[],"id":"showTransformations","value":"N"}],"id":"Pentaho local","value":null,"name":"Pentaho local","owner":null,"ownerPermissionsList":[]} + + + + diff --git a/source-code/ktrs/helloworld/helloworld-trans.ktr b/source-code/ktrs/helloworld/helloworld-trans.ktr new file mode 100644 index 0000000..43714ae --- /dev/null +++ b/source-code/ktrs/helloworld/helloworld-trans.ktr @@ -0,0 +1,526 @@ + + + + helloworld-trans + + + + Normal + 0 + / + + + + + + +
+ + + + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + TRANSNAME + Y + TRANSNAME + + + STATUS + Y + STATUS + + + LINES_READ + Y + LINES_READ + + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + + LINES_UPDATED + Y + LINES_UPDATED + + + + LINES_INPUT + Y + LINES_INPUT + + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + + LINES_REJECTED + Y + LINES_REJECTED + + + + ERRORS + Y + ERRORS + + + STARTDATE + Y + STARTDATE + + + ENDDATE + Y + ENDDATE + + + LOGDATE + Y + LOGDATE + + + DEPDATE + Y + DEPDATE + + + REPLAYDATE + Y + REPLAYDATE + + + LOG_FIELD + Y + LOG_FIELD + + + EXECUTING_SERVER + N + EXECUTING_SERVER + + + EXECUTING_USER + N + EXECUTING_USER + + + CLIENT + N + CLIENT + + + + + +
+ + + + ID_BATCH + Y + ID_BATCH + + + SEQ_NR + Y + SEQ_NR + + + LOGDATE + Y + LOGDATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + INPUT_BUFFER_ROWS + Y + INPUT_BUFFER_ROWS + + + OUTPUT_BUFFER_ROWS + Y + OUTPUT_BUFFER_ROWS + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + LOGGING_OBJECT_TYPE + Y + LOGGING_OBJECT_TYPE + + + OBJECT_NAME + Y + OBJECT_NAME + + + OBJECT_COPY + Y + OBJECT_COPY + + + REPOSITORY_DIRECTORY + Y + REPOSITORY_DIRECTORY + + + FILENAME + Y + FILENAME + + + OBJECT_ID + Y + OBJECT_ID + + + OBJECT_REVISION + Y + OBJECT_REVISION + + + PARENT_CHANNEL_ID + Y + PARENT_CHANNEL_ID + + + ROOT_CHANNEL_ID + Y + ROOT_CHANNEL_ID + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + LOG_FIELD + N + LOG_FIELD + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + METRICS_DATE + Y + METRICS_DATE + + + METRICS_CODE + Y + METRICS_CODE + + + METRICS_DESCRIPTION + Y + METRICS_DESCRIPTION + + + METRICS_SUBJECT + Y + METRICS_SUBJECT + + + METRICS_TYPE + Y + METRICS_TYPE + + + METRICS_VALUE + Y + METRICS_VALUE + + + + + +
+ + 0.0 + 0.0 + + 10000 + 50 + 50 + N + Y + 50000 + Y + + N + 1000 + 100 + + + + + + + + + - + 2015/06/01 15:33:04.607 + - + 2015/06/01 15:50:19.484 + H4sIAAAAAAAAAAMAAAAAAAAAAAA= + N + + + + + + Get Variables + Write to log + Y + + + + Get Variables + GetVariable + + Y + + 1 + + none + + + + + PDI_TEST_MESSAGE + ${PDI_TEST_MESSAGE} + String + + + + + -1 + -1 + none + + + + + + + + + + + + 128 + 64 + Y + + + + Write to log + WriteToLog + + Y + + 1 + + none + + + log_level_basic + Y + N + 0 + + + + PDI_TEST_MESSAGE + + + + + + + + + + + + 288 + 64 + Y + + + + + + + N + + diff --git a/source-code/ktrs/process1/task1.ktr b/source-code/ktrs/process1/task1.ktr new file mode 100644 index 0000000..6414b78 --- /dev/null +++ b/source-code/ktrs/process1/task1.ktr @@ -0,0 +1,552 @@ + + + + task1 + + + + Normal + 0 + / + + + + + + +
+ + + + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + TRANSNAME + Y + TRANSNAME + + + STATUS + Y + STATUS + + + LINES_READ + Y + LINES_READ + + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + + LINES_UPDATED + Y + LINES_UPDATED + + + + LINES_INPUT + Y + LINES_INPUT + + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + + LINES_REJECTED + Y + LINES_REJECTED + + + + ERRORS + Y + ERRORS + + + STARTDATE + Y + STARTDATE + + + ENDDATE + Y + ENDDATE + + + LOGDATE + Y + LOGDATE + + + DEPDATE + Y + DEPDATE + + + REPLAYDATE + Y + REPLAYDATE + + + LOG_FIELD + Y + LOG_FIELD + + + EXECUTING_SERVER + N + EXECUTING_SERVER + + + EXECUTING_USER + N + EXECUTING_USER + + + CLIENT + N + CLIENT + + + + + +
+ + + + ID_BATCH + Y + ID_BATCH + + + SEQ_NR + Y + SEQ_NR + + + LOGDATE + Y + LOGDATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + INPUT_BUFFER_ROWS + Y + INPUT_BUFFER_ROWS + + + OUTPUT_BUFFER_ROWS + Y + OUTPUT_BUFFER_ROWS + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + LOGGING_OBJECT_TYPE + Y + LOGGING_OBJECT_TYPE + + + OBJECT_NAME + Y + OBJECT_NAME + + + OBJECT_COPY + Y + OBJECT_COPY + + + REPOSITORY_DIRECTORY + Y + REPOSITORY_DIRECTORY + + + FILENAME + Y + FILENAME + + + OBJECT_ID + Y + OBJECT_ID + + + OBJECT_REVISION + Y + OBJECT_REVISION + + + PARENT_CHANNEL_ID + Y + PARENT_CHANNEL_ID + + + ROOT_CHANNEL_ID + Y + ROOT_CHANNEL_ID + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + LOG_FIELD + N + LOG_FIELD + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + METRICS_DATE + Y + METRICS_DATE + + + METRICS_CODE + Y + METRICS_CODE + + + METRICS_DESCRIPTION + Y + METRICS_DESCRIPTION + + + METRICS_SUBJECT + Y + METRICS_SUBJECT + + + METRICS_TYPE + Y + METRICS_TYPE + + + METRICS_VALUE + Y + METRICS_VALUE + + + + + +
+ + 0.0 + 0.0 + + 10000 + 50 + 50 + N + Y + 50000 + Y + + N + 1000 + 100 + + + + + + + + + - + 2021/04/19 10:30:54.841 + - + 2021/04/19 10:30:54.841 + H4sIAAAAAAAAAAMAAAAAAAAAAAA= + N + + + + + + Get Variable + Delay row + Y + + + Delay row + Abort + Y + + + + Abort + Abort + + Y + + 1 + + none + + + 0 + + Y + ABORT_WITH_ERROR + + + + + + + + + + 320 + 128 + Y + + + + Delay row + Delay + + Y + + 1 + + none + + + 5 + seconds + + + + + + + + + + 208 + 128 + Y + + + + Get Variable + GetVariable + + Y + + 1 + + none + + + + + PDI_TEST_WELCOME_MESSAGE + ${PDI_TEST_WELCOME_MESSAGE} + String + + + + + -1 + -1 + none + + + + + + + + + + + + 80 + 128 + Y + + + + + + + N + + diff --git a/source-code/ktrs/process1/task2.ktr b/source-code/ktrs/process1/task2.ktr new file mode 100644 index 0000000..0be547e --- /dev/null +++ b/source-code/ktrs/process1/task2.ktr @@ -0,0 +1,516 @@ + + + + task2 + + + + Normal + 0 + / + + + + + + +
+ + + + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + TRANSNAME + Y + TRANSNAME + + + STATUS + Y + STATUS + + + LINES_READ + Y + LINES_READ + + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + + LINES_UPDATED + Y + LINES_UPDATED + + + + LINES_INPUT + Y + LINES_INPUT + + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + + LINES_REJECTED + Y + LINES_REJECTED + + + + ERRORS + Y + ERRORS + + + STARTDATE + Y + STARTDATE + + + ENDDATE + Y + ENDDATE + + + LOGDATE + Y + LOGDATE + + + DEPDATE + Y + DEPDATE + + + REPLAYDATE + Y + REPLAYDATE + + + LOG_FIELD + Y + LOG_FIELD + + + EXECUTING_SERVER + N + EXECUTING_SERVER + + + EXECUTING_USER + N + EXECUTING_USER + + + CLIENT + N + CLIENT + + + + + +
+ + + + ID_BATCH + Y + ID_BATCH + + + SEQ_NR + Y + SEQ_NR + + + LOGDATE + Y + LOGDATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + INPUT_BUFFER_ROWS + Y + INPUT_BUFFER_ROWS + + + OUTPUT_BUFFER_ROWS + Y + OUTPUT_BUFFER_ROWS + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + LOGGING_OBJECT_TYPE + Y + LOGGING_OBJECT_TYPE + + + OBJECT_NAME + Y + OBJECT_NAME + + + OBJECT_COPY + Y + OBJECT_COPY + + + REPOSITORY_DIRECTORY + Y + REPOSITORY_DIRECTORY + + + FILENAME + Y + FILENAME + + + OBJECT_ID + Y + OBJECT_ID + + + OBJECT_REVISION + Y + OBJECT_REVISION + + + PARENT_CHANNEL_ID + Y + PARENT_CHANNEL_ID + + + ROOT_CHANNEL_ID + Y + ROOT_CHANNEL_ID + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + TRANSNAME + Y + TRANSNAME + + + STEPNAME + Y + STEPNAME + + + STEP_COPY + Y + STEP_COPY + + + LINES_READ + Y + LINES_READ + + + LINES_WRITTEN + Y + LINES_WRITTEN + + + LINES_UPDATED + Y + LINES_UPDATED + + + LINES_INPUT + Y + LINES_INPUT + + + LINES_OUTPUT + Y + LINES_OUTPUT + + + LINES_REJECTED + Y + LINES_REJECTED + + + ERRORS + Y + ERRORS + + + LOG_FIELD + N + LOG_FIELD + + + + + +
+ + + ID_BATCH + Y + ID_BATCH + + + CHANNEL_ID + Y + CHANNEL_ID + + + LOG_DATE + Y + LOG_DATE + + + METRICS_DATE + Y + METRICS_DATE + + + METRICS_CODE + Y + METRICS_CODE + + + METRICS_DESCRIPTION + Y + METRICS_DESCRIPTION + + + METRICS_SUBJECT + Y + METRICS_SUBJECT + + + METRICS_TYPE + Y + METRICS_TYPE + + + METRICS_VALUE + Y + METRICS_VALUE + + + + + +
+ + 0.0 + 0.0 + + 10000 + 50 + 50 + N + Y + 50000 + Y + + N + 1000 + 100 + + + + + + + + + - + 2021/04/19 10:30:54.841 + - + 2021/04/19 10:30:54.841 + H4sIAAAAAAAAAAMAAAAAAAAAAAA= + N + + + + + + Get Variable + Dummy (do nothing) + Y + + + + Dummy (do nothing) + Dummy + + Y + + 1 + + none + + + + + + + + + + + + 208 + 128 + Y + + + + Get Variable + GetVariable + + Y + + 1 + + none + + + + + PDI_TEST_WELCOME_MESSAGE + ${PDI_TEST_WELCOME_MESSAGE} + String + + + + + -1 + -1 + none + + + + + + + + + + + + 80 + 128 + Y + + + + + + + N + +