diff --git a/airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py b/airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py index f829873..318ad44 100644 --- a/airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py +++ b/airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py @@ -18,6 +18,7 @@ def build_verify_streaming_dag( destination_dataset_project_id, chain='polygon', notification_emails=None, + extra_streaming_tables=None, start_date=datetime(2018, 7, 1), schedule='*/10 * * * *', max_lag_in_minutes=15): @@ -51,16 +52,24 @@ def build_verify_streaming_dag( dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags') - def add_verify_tasks(task, dependencies=None): + def add_verify_tasks(task, dependencies=None, params=None): # The queries in verify/sqls will fail when the condition is not met # Have to use this trick since the Python 2 version of BigQueryCheckOperator doesn't support standard SQL # and legacy SQL can't be used to query partitioned tables. 
sql_path = os.path.join(dags_folder, 'resources/stages/verify_streaming/sqls/{task}.sql'.format(task=task)) sql = read_file(sql_path) + + combined_params = environment.copy() + task_id = 'verify_{task}'.format(task=task) + if params: + combined_params.update(params) + serialized_params = '_'.join(params.values()).replace('.', '_') + task_id = task_id + '_' + serialized_params + verify_task = BigQueryInsertJobOperator( - task_id=f"verify_{task}", + task_id=task_id, configuration={"query": {"query": sql, "useLegacySql": False}}, - params=environment, + params=combined_params, dag=dag, ) if dependencies is not None and len(dependencies) > 0: @@ -78,6 +87,13 @@ def add_verify_tasks(task, dependencies=None): add_verify_tasks('transactions_count') + # Optionally verify the lag of a streaming job (https://github.com/blockchain-etl/blockchain-etl-streaming) whose Pub/Sub topic is piped into a BigQuery table; one verify task is added per comma-separated table name + # See https://cloud.google.com/blog/products/data-analytics/pub-sub-launches-direct-path-to-bigquery-for-streaming-analytics + if extra_streaming_tables is not None and len(extra_streaming_tables) > 0: + streaming_table_list = [table.strip() for table in extra_streaming_tables.split(',')] + for streaming_table in streaming_table_list: + add_verify_tasks('extra_streaming_tables_have_latest', params={'streaming_table': streaming_table}) + return dag diff --git a/airflow/dags/polygonetl_airflow/variables.py b/airflow/dags/polygonetl_airflow/variables.py index ca98b3e..a3ccf0f 100644 --- a/airflow/dags/polygonetl_airflow/variables.py +++ b/airflow/dags/polygonetl_airflow/variables.py @@ -112,6 +112,11 @@ def read_parse_dag_vars(var_prefix, **kwargs): def read_verify_streaming_dag_vars(var_prefix, **kwargs): vars = { 'destination_dataset_project_id': read_var('destination_dataset_project_id', var_prefix, True, **kwargs), + # Comma-separated list of fully qualified BigQuery table names, e.g. 
myproject.streamers.blocks1,myproject.streamers.blocks2 + # Each table must have a column `data` containing a JSON object whose `timestamp` field holds Unix seconds + # Used in combination with https://cloud.google.com/blog/products/data-analytics/pub-sub-launches-direct-path-to-bigquery-for-streaming-analytics + # to verify the lag of streaming jobs that output to Pub/Sub + 'extra_streaming_tables': read_var('extra_streaming_tables', var_prefix, False, **kwargs), 'notification_emails': read_var('notification_emails', None, False, **kwargs), } diff --git a/airflow/dags/resources/stages/verify_streaming/sqls/extra_streaming_tables_have_latest.sql b/airflow/dags/resources/stages/verify_streaming/sqls/extra_streaming_tables_have_latest.sql new file mode 100644 index 0000000..51d247e --- /dev/null +++ b/airflow/dags/resources/stages/verify_streaming/sqls/extra_streaming_tables_have_latest.sql @@ -0,0 +1,9 @@ +select if( +( +select timestamp_diff( + current_timestamp(), + (select max(timestamp_seconds(cast(json_extract(data, '$.timestamp') AS INTEGER))) + from `{{params.streaming_table}}`), + MINUTE) +) < {{params.max_lag_in_minutes}}, 1, +cast((select 'Streaming table {{params.streaming_table}} is lagging by more than {{params.max_lag_in_minutes}} minutes') as INT64)) \ No newline at end of file