This repo contains an example Cloud Composer workflow that triggers Cloud Dataflow to transform, enrich, and load a delimited text file into BigQuery.
The goal of this example is to provide a common pattern to automatically trigger, via a Google Cloud Function, a Dataflow job when a file arrives in Google Cloud Storage, process the data, and load it into BigQuery.
A Cloud Function with a Cloud Storage trigger is used to initiate the workflow when a file is uploaded for processing.
At a high level, the Cloud Composer workflow performs the following steps:

- Extracts the location of the input file that triggered the workflow.
- Executes a Cloud Dataflow job that:
  - Parses the delimited input file and adds some useful 'metadata':
    - 'filename': The name of the file processed by the Cloud Dataflow job
    - 'load_dt': The date in YYYY-MM-DD format when the file is processed
  - Loads the data into an existing BigQuery table (any existing data is truncated)
- Moves the input file to a Cloud Storage bucket that is set up for storing processed files.
When a file is uploaded to the Cloud Storage bucket, a Cloud Function is invoked. The function wraps the event information (bucket and object details) and passes it to the Cloud Composer workflow it triggers. The workflow extracts this information and passes it to the Cloud Dataflow job:
```python
job_args = {
    'input': 'gs://{{ dag_run.conf.get("bucket") }}/{{ dag_run.conf.get("name") }}',
    'output': models.Variable.get('bq_output_table'),
    'fields': models.Variable.get('input_field_names'),
    'load_dt': ds_tag
}
```
The workflow then executes a Cloud Dataflow job to process the delimited file, adds the filename and load_dt fields, and loads the data into the BigQuery table.
Based on the status of the Cloud Dataflow job, the workflow then moves the processed files to a Cloud Storage bucket set up to store processed data. Within this bucket, the files are placed in a separate folder named with the processed date.
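A successful run can be spot-checked from the command line. This is only a sketch; the region and the completion bucket name (output-gcs-bucket) are placeholders to replace with your own values:

```bash
# List recent Dataflow jobs launched by the workflow (region is an assumption).
gcloud dataflow jobs list --region=us-central1 --limit=5

# After a successful run, the input file is moved into the completion bucket,
# under a folder that includes the processed date.
gsutil ls -r gs://output-gcs-bucket/
```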
Ready to dive deeper? Check out the complete code here
It is recommended that virtualenv be used to keep everything tidy. The requirements.txt describes the dependencies needed for the code used in this repo.
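For example, the environment can be created and the dependencies installed as follows (the environment name venv is only illustrative):

```bash
# Create and activate an isolated Python environment.
virtualenv venv
source venv/bin/activate

# Install the dependencies used by the DAG and Dataflow code in this repo.
pip install -r requirements.txt
```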
The following high-level steps describe the setup needed to run this example:
- Create a Cloud Storage (GCS) bucket for receiving input files (input-gcs-bucket).
- Create a GCS bucket for storing processed files (output-gcs-bucket).
- Create a Cloud Composer environment (cloud-composer-env) - follow these steps if you do not already have one.
- Create a BigQuery table for the processed output using the following schema (example commands for creating these resources follow the table):
Column Name | Column Type |
---|---|
state | STRING |
gender | STRING |
year | STRING |
name | STRING |
number | STRING |
created_date | STRING |
filename | STRING |
load_dt | DATE |
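As a sketch, these resources can be created with the gcloud, gsutil, and bq command-line tools; the bucket names, region, and dataset name below are placeholders/assumptions to replace with your own values:

```bash
# Buckets for incoming files and for processed (completed) files.
gsutil mb -l us-central1 gs://input-gcs-bucket
gsutil mb -l us-central1 gs://output-gcs-bucket

# Cloud Composer environment (provisioning can take a while).
gcloud composer environments create cloud-composer-env --location us-central1

# BigQuery dataset and output table matching the schema above.
bq mk my_dataset
bq mk --table my_dataset.usa_names \
  state:STRING,gender:STRING,year:STRING,name:STRING,number:STRING,created_date:STRING,filename:STRING,load_dt:DATE
```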
- Set the following Airflow variables needed for this example:
Key | Value | Example |
---|---|---|
gcp_project | your-gcp-project-id | cloud-comp-df-demo |
gcp_temp_location | gcs-bucket-for-dataflow-temp-files | gs://my-comp-df-demo-temp/tmp |
gcs_completion_bucket | output-gcs-bucket | my-comp-df-demo-output |
input_field_names | comma-separated-field-names-for-delimited-file | state,gender,year,name,number,created_date |
bq_output_table | bigquery-output-table | my_dataset.usa_names |
email | email-address-to-send-failure-notifications | [email protected] |
The variables can be set as follows:
```bash
gcloud composer environments run cloud-composer-env-name variables -- --set key val
```
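For example, to set the gcp_project variable using the example values above (the --location flag and region are assumptions and should match your environment):

```bash
# Repeat for each key in the variables table above.
gcloud composer environments run cloud-composer-env \
  --location us-central1 \
  variables -- --set gcp_project cloud-comp-df-demo
```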
- Browse to the Cloud Composer widget in the Cloud Console and click on the DAG folder icon.
- The DAG folder is essentially a Cloud Storage bucket. Upload the simple_load_dag.py file into this folder.
- Upload the Python Dataflow code process_delimited.py into a dataflow folder created in the base DAG folder (example gsutil commands are shown after this list).
- Finally, follow these instructions to create a Cloud Function.
  - Ensure that the DAG_NAME property is set to GcsToBigQueryTriggered, i.e. the DAG name defined in simple_load_dag.py.
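The DAG and Dataflow files can also be uploaded from the command line; this is a sketch in which the DAG bucket name and region are placeholders to replace with the values for your environment:

```bash
# Look up the DAG folder (a Cloud Storage path) for the Composer environment.
gcloud composer environments describe cloud-composer-env \
  --location us-central1 \
  --format="get(config.dagGcsPrefix)"

# Upload the DAG definition into the DAG folder reported above.
gsutil cp simple_load_dag.py gs://your-composer-dag-bucket/dags/

# Upload the Dataflow pipeline code into a dataflow folder under the DAG folder.
gsutil cp process_delimited.py gs://your-composer-dag-bucket/dags/dataflow/
```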
The workflow is automatically triggered by the Cloud Function, which is invoked when a new file is uploaded into the input-gcs-bucket. For this example workflow, the usa_names.csv file can be uploaded into the input-gcs-bucket:
```bash
gsutil cp resources/usa_names.csv gs://input-gcs-bucket
```
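Once the DAG run completes, the load can be verified with the bq tool; this assumes the example bq_output_table value (my_dataset.usa_names) from the variables table above:

```bash
# Confirm that rows were loaded, along with the filename and load_dt metadata columns.
bq query --use_legacy_sql=false \
  'SELECT filename, load_dt, COUNT(*) AS row_count
   FROM `my_dataset.usa_names`
   GROUP BY filename, load_dt'
```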