Updating docs for callbacks #375

Open · wants to merge 1 commit into base: main
44 changes: 0 additions & 44 deletions README.md
@@ -65,50 +65,6 @@ If you want to split your DAG configuration into multiple files, you can do so b

![custom_operators.png](img/custom_operators.png)

### Callbacks

**dag-factory** also supports using "callbacks" at the DAG, Task, and TaskGroup level. These callbacks can be defined in
a few different ways. The first points directly to a Python function that has been defined in the `include/callbacks.py`
file.

```yaml
example_dag1:
on_failure_callback: include.callbacks.example_callback1
...
```

Here, the `on_success_callback` points first to a file, and then to a function name within that file. Notice that this
callback is defined using `default_args`, meaning it will be applied to all tasks.

```yaml
example_dag1:
...
default_args:
on_success_callback_file: /usr/local/airflow/include/callbacks.py
on_success_callback_name: example_callback1
```

**dag-factory** users can also leverage provider-built tools when configuring callbacks. In this example, the
`send_slack_notification` function from the Slack provider is used to dispatch a message when a DAG failure occurs. This
function is passed to the `callback` key under `on_failure_callback`. This pattern allows for callback definitions to
take parameters (such as `text`, `channel`, and `username`, as shown here).

**Note that this functionality is currently only supported for `on_failure_callback`s defined at the DAG-level, or in
`default_args`. Support for other callback types and Task/TaskGroup-level definitions is coming soon.**

```yaml
example_dag1:
on_failure_callback:
callback: airflow.providers.slack.notifications.slack.send_slack_notification
slack_conn_id: example_slack_id
text: |
:red_circle: Task Failed.
This task has failed and needs to be addressed.
Please remediate this issue ASAP.
channel: analytics-alerts
username: Airflow
...
```

## Notes

100 changes: 100 additions & 0 deletions docs/features/callbacks.md
@@ -0,0 +1,100 @@
# Callbacks
DAG Factory supports the use of callbacks. These callbacks can be set at the DAG, TaskGroup, or Task level. The way
callbacks can be configured for DAGs, TaskGroups, and Tasks differs slightly, and the details are covered in the
[Apache Airflow documentation](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html#).

Within DAG Factory itself, there are three approaches to defining callbacks. The goal is to make this process
intuitive and provide parity with the traditional DAG authoring experience. These approaches are outlined below, each
with an example implementation. While the following examples all define callbacks on individual Tasks, callbacks can
also be defined using `default_args`, or at the DAG and TaskGroup level.

* [Passing a string that points to a callable](#passing-a-string-that-points-to-a-callable)
* [Specifying a user-defined `.py` and the function within that file to be executed](#specifying-a-user-defined-py-file-and-function)
* [Configuring callbacks from providers](#provider-callbacks)


## Passing a string that points to a callable

The most traditional way of configuring callbacks is to define a custom function within the Airflow project and
assign it to the desired Task. This can be implemented in DAG Factory using the syntax below. In this case,
`output_standard_message` is a user-defined function stored in the `include/custom_callbacks.py` file. The function
requires no parameters, and the YAML takes the form below.

For this example to work, the `include/custom_callbacks.py` file must be on the Python `sys.path`. If it is not, the
full path to the `.py` file and the function name can be specified instead, as described in
[Specifying a user-defined `.py` file and function](#specifying-a-user-defined-py-file-and-function).

```yaml
...

task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo task_1"
on_failure_callback: include.custom_callbacks.output_standard_message
...
```
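
For reference, here is a minimal, hypothetical sketch of what `include/custom_callbacks.py` might contain for this
example. The function body is an assumption; the only requirement is that the callable accepts the context dictionary
Airflow passes to callbacks.

```python
# include/custom_callbacks.py -- hypothetical contents, assumed for this example
def output_standard_message(context):
    # Airflow invokes the callback with the task instance's context dict.
    ti = context["task_instance"]
    print(f"Callback fired for task {ti.task_id} in DAG {ti.dag_id}.")
```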

Sometimes, a function may have parameters that need to be defined within the Task itself. Here, the
`output_custom_message` callback takes two keyword arguments: `param1` and `param2`. These values are defined in the
YAML itself, offering DAG Factory authors an additional degree of flexibility.

```yaml
...

task_2:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo task_2"
on_success_callback:
callback: include.custom_callbacks.output_custom_message
param1: "Task status"
param2: "Successful!"
...
```
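
A possible shape for `output_custom_message` is sketched below. It is again hypothetical, and it assumes DAG Factory
forwards the extra YAML keys to the callable as keyword arguments alongside the Airflow context.

```python
# include/custom_callbacks.py -- hypothetical, continued
def output_custom_message(context, param1, param2):
    # param1 and param2 come from the YAML; context is supplied by Airflow.
    print(f"{param1}: {param2} (task {context['task_instance'].task_id})")
```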


## Specifying a user-defined `.py` file and function

In addition to passing a string that points to a callable, the full path to the file and the name of the callback can
be specified for a DAG, TaskGroup, or Task. This provides a viable option for defining a callback when the directory
the `.py` file is stored in is not on the Python path.

```yaml
...

task_3:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo task_3"
on_retry_callback_name: output_standard_message
on_retry_callback_file: /usr/local/airflow/include/custom_callbacks.py
...
```

Note that this method for defining callbacks in DAG Factory does not allow for parameters to be passed to the callable
within the YAML itself.


## Provider callbacks

In addition to custom-built callbacks, a number of provider-built callbacks can be used when defining a DAG. With
DAG Factory, these callbacks can be configured much as they would be when authoring a traditional DAG. First, the type
of callback is specified (`on_success_callback`, `on_failure_callback`, etc.). The `callback` key then specifies the
provider-built function to be executed, and the keyword arguments that function takes are specified alongside it, as
shown below.

Note that the provider package being used must be available on the Python `sys.path`, meaning it may need to be
installed with `pip` (e.g. `pip install apache-airflow-providers-slack` for the Slack provider used here).

```yaml
...
task_4:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo task_4"
on_failure_callback:
callback: airflow.providers.slack.notifications.slack.send_slack_notification
slack_conn_id: slack_conn_id
text: |
:red_circle: Task Failed.
This task has failed and needs to be addressed.
Please remediate this issue ASAP.
channel: "#channel"
...
```
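
For comparison, the same provider callback could be built in a traditional Python DAG file with the snippet below.
This is a rough sketch; the connection ID and channel are the placeholder values from the YAML above.

```python
# Rough traditional-DAG equivalent of the YAML above, for comparison only.
from airflow.providers.slack.notifications.slack import send_slack_notification

on_failure_callback = send_slack_notification(
    slack_conn_id="slack_conn_id",  # placeholder connection ID from the YAML example
    text=(
        ":red_circle: Task Failed.\n"
        "This task has failed and needs to be addressed.\n"
        "Please remediate this issue ASAP."
    ),
    channel="#channel",  # placeholder channel from the YAML example
)
# The resulting notifier is passed as `on_failure_callback` on the DAG or task.
```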
1 change: 1 addition & 0 deletions docs/index.md
@@ -23,6 +23,7 @@ Are you new to DAG Factory? This is the place to start!

* [Dynamic tasks](features/dynamic_tasks.md)
* [Datasets scheduling](features/datasets.md)
* [Callbacks](features/callbacks.md)

## Getting help

1 change: 1 addition & 0 deletions mkdocs.yml
@@ -54,6 +54,7 @@ nav:
- Features:
- features/dynamic_tasks.md
- features/datasets.md
- features/callbacks.md
- Comparison:
- comparison/index.md
- Traditional Airflow Operators: comparison/traditional_operators.md