The most popular Apache Airflow plugin for ClickHouse, ranked in the top 1% of downloads on PyPI. Based on the awesome mymarilyn/clickhouse-driver.
This plugin provides two families of operators: a richer one based on `clickhouse_driver.Client.execute` and a standardized one compatible with Python DB API 2.0.
Both operator families are fully supported and covered with tests for different versions of Airflow and Python.
- `ClickHouseOperator`
- `ClickHouseHook`
- `ClickHouseSensor`
These operators are based on mymarilyn/clickhouse-driver's `Client.execute` method and arguments. They offer the full functionality of clickhouse-driver and are recommended if you are starting fresh with ClickHouse in Airflow.
- SQL Templating: SQL queries and other parameters are templated.
- Multiple SQL Queries: run multiple SQL queries within a single `ClickHouseOperator`. The result of the last query is pushed to XCom (configurable by `do_xcom_push`).
- Logging: executed queries are logged in a visually pleasing format, making it easier to track and debug.
- Efficient Native ClickHouse Protocol: utilizes the efficient native ClickHouse TCP protocol, thanks to clickhouse-driver. Does not support the HTTP protocol.
- Custom Connection Parameters: supports additional ClickHouse connection parameters, such as various timeouts, `compression`, `secure`, through the Airflow `Connection.extra` property. See reference and examples below.
`pip install -U airflow-clickhouse-plugin`
Dependencies: only `apache-airflow` and `clickhouse-driver`.
- Operators:
  - `ClickHouseSQLExecuteQueryOperator`
  - `ClickHouseSQLColumnCheckOperator`
  - `ClickHouseSQLTableCheckOperator`
  - `ClickHouseSQLCheckOperator`
  - `ClickHouseSQLValueCheckOperator`
  - `ClickHouseSQLIntervalCheckOperator`
  - `ClickHouseSQLThresholdCheckOperator`
  - `ClickHouseBranchSQLOperator`
- `ClickHouseDbApiHook`
- `ClickHouseSqlSensor`
These operators combine `clickhouse_driver.dbapi` with apache-airflow-providers-common-sql. While they have limited functionality compared to `Client.execute` (not all arguments are supported), they provide a standardized interface. This is useful when porting Airflow pipelines to ClickHouse from another SQL provider backed by the common.sql Airflow package, such as MySQL, Postgres, BigQuery, and others.
The feature set of this family is fully based on the common.sql Airflow provider: refer to its reference and examples for details. An example is also available below.
Add the `common.sql` extra when installing the plugin to enable the DB API 2.0 operators: `pip install -U airflow-clickhouse-plugin[common.sql]`
Dependencies: `apache-airflow-providers-common-sql` (usually pre-packed with Airflow) in addition to `apache-airflow` and `clickhouse-driver`.
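For instance, a minimal sketch of using the DB API 2.0 hook through the standard common.sql interface; the hook module path and the reliance on the default connection are assumptions inferred from the other `*_dbapi` imports shown in this document:

```python
# Hedged sketch: the hook module path is assumed by analogy with the DB API
# sensor/operator imports used in the example below.
from airflow_clickhouse_plugin.hooks.clickhouse_dbapi import ClickHouseDbApiHook

hook = ClickHouseDbApiHook()  # relies on the default Airflow connection
# Standard common.sql DbApiHook methods are available, e.g. get_records:
records = hook.get_records('SELECT version()')
print(records)
```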
Different versions of the plugin support different combinations of Python and Airflow versions. We primarily support Airflow 2.0+ and Python 3.8+. If you need to use the plugin with older Python-Airflow combinations, pick a suitable plugin version:
| airflow-clickhouse-plugin version | Airflow version | Python version |
|---|---|---|
| 1.0.0 | >=2.0.0,<2.7.0 | ~=3.8 |
| 0.11.0 | ~=2.0.0,>=2.2.0,<2.7.0 | ~=3.7 |
| 0.10.0, 0.10.1 | ~=2.0.0,>=2.2.0,<2.6.0 | ~=3.7 |
| 0.9.0, 0.9.1 | ~=2.0.0,>=2.2.0,<2.5.0 | ~=3.7 |
| 0.8.2 | >=2.0.0,<2.4.0 | ~=3.7 |
| 0.8.0, 0.8.1 | >=2.0.0,<2.3.0 | ~=3.6 |
| 0.7.0 | >=2.0.0,<2.2.0 | ~=3.6 |
| 0.6.0 | ~=2.0.1 | ~=3.6 |
| >=0.5.4,<0.6.0 | ~=1.10.6 | >=2.7 or >=3.5.* |
| >=0.5.0,<0.5.4 | ==1.10.6 | >=2.7 or >=3.5.* |
`~=` means a compatible release; see PEP 440 for an explanation.
Previous versions of the plugin might require the `pandas` extra: `pip install airflow-clickhouse-plugin[pandas]==0.11.0`. Check out earlier versions of `README.md` for details.
To see examples, scroll down. To run them, create an Airflow connection to ClickHouse.
To import `ClickHouseOperator`, use: `from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator`.
Supported arguments:
- `sql` (templated, required): query (if the argument is a single `str`) or multiple queries (an iterable of `str`). Supports files with the `.sql` extension.
- `clickhouse_conn_id`: Airflow connection id. The connection schema is described below. The default connection id is `clickhouse_default`.
- Arguments of the `clickhouse_driver.Client.execute` method:
  - `parameters` (templated): passed as `params` of the `execute` method. (Renamed to avoid a name conflict with Airflow tasks' `params` argument.)
    - `dict` for `SELECT` queries.
    - `list`/`tuple`/generator for `INSERT` queries.
    - If multiple queries are provided via `sql`, then the `parameters` are passed to all of them.
  - `with_column_types` (not templated).
  - `external_tables` (templated).
  - `query_id` (templated).
  - `settings` (templated).
  - `types_check` (not templated).
  - `columnar` (not templated).
  - For the documentation of these arguments, refer to the `clickhouse_driver.Client.execute` API reference.
- `database` (templated): if present, overrides the `schema` of the Airflow connection.
- Other arguments (including the required `task_id`) are inherited from Airflow `BaseOperator`.
The result of the last query is pushed to XCom (disable using the `do_xcom_push=False` argument).
In other words, the operator simply wraps the `ClickHouseHook.execute` method.
See example below.
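In addition to the full DAG example below, here is a minimal sketch of passing `parameters` to a `SELECT` query; the table, column, and task id are made up for illustration:

```python
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

# Inside a `with DAG(...)` block; clickhouse-driver substitutes %(day)s
# from the parameters dict for SELECT queries.
fetch_sales = ClickHouseOperator(
    task_id='fetch_sales_for_day',
    sql='SELECT * FROM sales WHERE eventDt = %(day)s',
    parameters={'day': '{{ ds }}'},  # templated, like sql
)
```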
To import `ClickHouseHook`, use: `from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook`.
Supported kwargs of the constructor (`__init__` method):
- `clickhouse_conn_id`: Airflow connection id. The connection schema is described below. The default connection id is `clickhouse_default`.
- `database`: if present, overrides the `schema` of the Airflow connection.
Defines the `ClickHouseHook.execute` method, which simply wraps `clickhouse_driver.Client.execute`. It has all the same arguments, except:
- `sql` (instead of `execute`'s `query`): query (if the argument is a single `str`) or multiple queries (an iterable of `str`).

`ClickHouseHook.execute` returns the result of the last query.
Also, the hook defines the `get_conn()` method, which returns the underlying `clickhouse_driver.Client` instance.
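For instance, a minimal sketch of dropping down to the driver client directly; the connection id below is hypothetical:

```python
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

hook = ClickHouseHook(clickhouse_conn_id='clickhouse_test')  # hypothetical connection id
client = hook.get_conn()  # plain clickhouse_driver.Client
# Any clickhouse-driver API is available, e.g. streaming rows one by one:
for row in client.execute_iter('SELECT number FROM system.numbers LIMIT 3'):
    print(row)
```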
See example below.
To import `ClickHouseSensor`, use: `from airflow_clickhouse_plugin.sensors.clickhouse import ClickHouseSensor`.
This class wraps the `ClickHouseHook.execute` method into an Airflow sensor. It supports all the arguments of `ClickHouseOperator` and additionally:
- `is_success`: a callable which accepts a single argument, the return value of `ClickHouseHook.execute`. If the return value of `is_success` is truthy, the sensor succeeds. By default, the callable is `bool`: i.e. if the return value of `ClickHouseHook.execute` is truthy, the sensor succeeds. Usually, the return value of `execute` is a list of records returned by the query: thus, by default, it is falsy if no records are returned.
- `is_failure`: a callable which accepts a single argument, the return value of `ClickHouseHook.execute`. If the return value of `is_failure` is truthy, the sensor raises `AirflowException`. By default, `is_failure` is `None` and no failure check is performed.
See example below.
As the type of a new connection, choose SQLite or any other SQL database. There is no special ClickHouse connection type yet, so any SQL type is used as the closest one.
All the connection attributes are optional: the default host is `localhost` and the other credentials have defaults defined by `clickhouse-driver`. If you use non-default values, set them according to the connection schema.
If you use a secure connection to ClickHouse (this requires additional configuration on the ClickHouse side), set `extra` to `{"secure": true}`. All `extra` connection parameters are passed to `clickhouse_driver.Client` as-is.
`clickhouse_driver.Client` is initialized with the attributes of the Airflow Connection:
| Airflow Connection attribute | `Client.__init__` argument |
|---|---|
| `host` | `host` |
| `port` (`int`) | `port` |
| `schema` | `database` |
| `login` | `user` |
| `password` | `password` |
| `extra` | `**kwargs` |
The `database` argument of `ClickHouseOperator`, `ClickHouseHook`, `ClickHouseSensor`, and others overrides the `schema` attribute of the Airflow connection.
You may set non-standard arguments of `clickhouse_driver.Client`, such as timeouts, `compression`, `secure`, etc., using Airflow's `Connection.extra` attribute. The attribute should contain a JSON object which will be deserialized, and all of its properties will be passed as-is to the `Client`.
For example, if the Airflow connection contains `extra='{"secure": true}'`, then `Client.__init__` will receive the `secure=True` keyword argument in addition to the other connection attributes.
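Conceptually, the client ends up being constructed roughly like the sketch below; the host, credentials, and extra values are placeholders, not defaults of the plugin:

```python
import json

from clickhouse_driver import Client

extra = '{"secure": true, "connect_timeout": 20}'  # Connection.extra (placeholder values)
client = Client(
    host='clickhouse.example.com',  # Connection.host
    port=9440,                      # Connection.port
    user='admin',                   # Connection.login
    password='secret',              # Connection.password
    database='default',             # Connection.schema
    **json.loads(extra),            # extra properties are passed as-is
)
```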
To support compression, install the corresponding packages; for example, for lz4: `pip3 install clickhouse-cityhash lz4`. Then include the `compression` parameter in the Airflow connection: `extra='{"compression":"lz4"}'`. You can find additional information about extra options in the official documentation of clickhouse-driver.
A connection URI with compression looks like `clickhouse://login:password@host:port/?compression=lz4`.
See the official documentation to learn more about connection management in Airflow.
If some Airflow connection attribute is not set, it is not passed to `clickhouse_driver.Client`. In such cases, the plugin uses the default value of the corresponding `clickhouse_driver.Connection` argument. For instance, `user` defaults to `'default'`.
This means that the plugin itself does not define any default values for the ClickHouse connection. You may fully rely on default values of the clickhouse-driver version you use.
The only exception is `host`: if the attribute of the Airflow connection is not set, then `'localhost'` is used.
By default, the plugin uses the Airflow connection with id `'clickhouse_default'`.
from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='update_income_aggregate',
    start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            '''
                INSERT INTO aggregate
                SELECT eventDt, sum(price * qty) AS income FROM sales
                WHERE eventDt = '{{ ds }}' GROUP BY eventDt
            ''', '''
                OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }}
                PARTITION toDate('{{ execution_date.format('%Y-%m-01') }}')
            ''', '''
                SELECT sum(income) FROM aggregate
                WHERE eventDt BETWEEN
                    '{{ execution_date.start_of('month').to_date_string() }}'
                    AND '{{ execution_date.end_of('month').to_date_string() }}'
            ''',
            # result of the last query is pushed to XCom
        ),
        # query_id is templated and helps to quickly identify the query in ClickHouse logs
        query_id='{{ ti.dag_id }}-{{ ti.task_id }}-{{ ti.run_id }}-{{ ti.try_number }}',
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        python_callable=lambda task_instance:
            # pulling the XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def sqlite_to_clickhouse():
    sqlite_hook = SqliteHook()
    ch_hook = ClickHouseHook()
    records = sqlite_hook.get_records('SELECT * FROM some_sqlite_table')
    ch_hook.execute('INSERT INTO some_ch_table VALUES', records)


with DAG(
    dag_id='sqlite_to_clickhouse',
    start_date=days_ago(2),
) as dag:
    PythonOperator(
        task_id='sqlite_to_clickhouse',
        python_callable=sqlite_to_clickhouse,
    )
Important note: do not try to insert values using the literal form `ch_hook.execute("INSERT INTO some_ch_table VALUES (1)")`. `clickhouse-driver` requires values for an `INSERT` query to be provided via `parameters`, due to specifics of the native ClickHouse protocol.
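A minimal sketch of the correct approach; the table and values are made up:

```python
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

ch_hook = ClickHouseHook()
# Correct: rows go into `parameters`, the query ends with a bare VALUES clause.
ch_hook.execute('INSERT INTO some_ch_table VALUES', [(1, 'a'), (2, 'b')])
# Incorrect with the native protocol: literal values inside the query text.
# ch_hook.execute("INSERT INTO some_ch_table VALUES (1, 'a')")
```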
from airflow import DAG
from airflow_clickhouse_plugin.sensors.clickhouse import ClickHouseSensor
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='listen_warnings',
    start_date=days_ago(2),
) as dag:
    ClickHouseSensor(
        task_id='poke_events_count',
        database='monitor',
        sql="SELECT count() FROM warnings WHERE eventDate = '{{ ds }}'",
        # is_success receives the list of records returned by ClickHouseHook.execute
        is_success=lambda result: result[0][0] > 10000,
    ) >> ClickHouseOperator(
        task_id='create_alert',
        database='alerts',
        sql='''
            INSERT INTO events SELECT eventDate, count()
            FROM monitor.warnings WHERE eventDate = '{{ ds }}'
        ''',
    )
from airflow import DAG
from airflow_clickhouse_plugin.sensors.clickhouse_dbapi import ClickHouseSqlSensor
from airflow_clickhouse_plugin.operators.clickhouse_dbapi import ClickHouseSQLExecuteQueryOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='listen_warnings',
    start_date=days_ago(2),
) as dag:
    ClickHouseSqlSensor(
        task_id='poke_events_count',
        hook_params=dict(schema='monitor'),
        sql="SELECT count() FROM warnings WHERE eventDate = '{{ ds }}'",
        success=lambda cnt: cnt > 10000,
        conn_id=None,  # required by common.sql SqlSensor; use None for the default connection
    ) >> ClickHouseSQLExecuteQueryOperator(
        task_id='create_alert',
        database='alerts',
        sql='''
            INSERT INTO events SELECT eventDate, count()
            FROM monitor.warnings WHERE eventDate = '{{ ds }}'
        ''',
    )
Unit tests: `python3 -m unittest discover -t tests -s unit`
Integration tests require access to a ClickHouse server. Here is how to set up a local test environment using Docker:
- Run a ClickHouse server in a local Docker container: `docker run -p 9000:9000 --ulimit nofile=262144:262144 -it clickhouse/clickhouse-server`
- Run the tests with the Airflow connection details set via an environment variable: `PYTHONPATH=src AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://localhost python3 -m unittest discover -t tests -s integration`
- Stop the container after running the tests to deallocate its resources.
Run all (unit and integration) tests with the ClickHouse connection defined: `PYTHONPATH=src AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://localhost python3 -m unittest discover -s tests`
A GitHub Action is configured for this project.
Start a ClickHouse server inside Docker: `docker exec -it $(docker run --rm -d clickhouse/clickhouse-server) bash`
The above command will open `bash` inside the container.
Install the dependencies into the container and run the tests (execute inside the container):
apt-get update
apt-get install -y python3 python3-pip git make
git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git
cd airflow-clickhouse-plugin
python3 -m pip install -r requirements.txt
PYTHONPATH=src AIRFLOW_CONN_CLICKHOUSE_DEFAULT=clickhouse://localhost python3 -m unittest discover -s tests
Stop the container.
- Created by Anton Bryzgalov, @bryzgaloff, originally at Whisk, Samsung
- Inspired by Viktor Taranenko, @viktortnk (Whisk, Samsung)
Community contributors:
- Danila Ganchar, @d-ganchar
- Mikhail, @glader
- Alexander Chashnikov, @ne1r0n
- Simone Brundu, @saimon46
- @gkarg
- Stanislav Morozov, @r3b-fish
- Sergey Bychkov, @SergeyBychkov
- @was-av
- Maxim Tarasov, @MaximTar
- @dvnrvn
- Giovanni Corsetti, @CorsettiS
- Dmytro Zhyzniev, @1ng4lipt