- Reusable Plugins
- Putting it all together into a Pipeline
- Building the CDF/CDAP Plugin (JAR file / JSON file) and deploying into CDF/CDAP
The CDF/CDAP plugins detailed below can be reused in the context of data pipelines.
Let's say you run your incremental pipeline once every 5 minutes. When running an incremental pipeline, you have to filter the records by a specific field (e.g., lastUpdateDateTime of records > latest watermark value - buffer time) so it syncs only the records that were updated since your last incremental sync. Subsequently, a merge and dedupe step is performed to make sure only new/updated records end up in the destination table.
Plugin Description
Creates, reads, and updates checkpoints in incremental pull pipelines.
- CheckPointReadAction - reads checkpoints in Firestore DB and provides the data during runtime as a runtime argument
- CheckPointUpdateAction - updates checkpoints in Firestore DB (i.e., creates a new document and stores the maximum update date/time from BQ so the next run can use this checkpoint value to filter records that were added since then)
For now these plugins only support timestamp values - in the future, integer values can potentially be added.
Setup Firestore DB
- Firestore is used to store/read checkpoints, which are used in your incremental pipelines.
- Create a collection with a document from the parent path /:
  - Collection ID: PIPELINE_CHECKPOINTS
  - Document ID: INCREMENTAL_DEMO
- Create a collection under the parent path /PIPELINE_CHECKPOINTS/INCREMENTAL_DEMO:
  - Collection ID: CHECKPOINT
  - Document ID: just accept what was provided initially (auto-generated ID)
  - Field #1
    - Note: set to the maximum timestamp from the destination (BQ table), or to the minimum timestamp from the source (e.g., SQL Server table) if running for the first time
    - Field name: CHECKPOINT_VALUE
    - Field type: string
    - Date and time: 2020-05-08 17:21:01
  - Field #2
    - Note: enter the current time in timestamp format
    - Field name: CREATED_TIMESTAMP
    - Field type: timestamp
    - Date and time: 25/08/2020, 15:49
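If you prefer to create this structure programmatically instead of through the Firestore console, the sketch below shows one way to do it with the google-cloud-firestore Java client. It is only an illustration: it assumes Application Default Credentials and reuses the collection, document, and field names from the setup above; the class name CreateCheckpointStructure is hypothetical.

```java
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;

import java.util.HashMap;
import java.util.Map;

public class CreateCheckpointStructure {
  public static void main(String[] args) throws Exception {
    // Uses Application Default Credentials; set the project ID explicitly if needed.
    Firestore db = FirestoreOptions.newBuilder()
        .setProjectId("pso-cdf-plugins-287518")
        .build()
        .getService();

    // Document path: /PIPELINE_CHECKPOINTS/INCREMENTAL_DEMO/CHECKPOINT/<auto-id>
    Map<String, Object> checkpoint = new HashMap<>();
    checkpoint.put("CHECKPOINT_VALUE", "2020-05-08 17:21:01"); // initial watermark (string)
    checkpoint.put("CREATED_TIMESTAMP", Timestamp.now());      // when this checkpoint was written

    db.collection("PIPELINE_CHECKPOINTS")
        .document("INCREMENTAL_DEMO")
        .collection("CHECKPOINT")
        .add(checkpoint)   // add() generates the document ID ("accept what was provided initially")
        .get();            // wait for the write to complete

    db.close();
  }
}
```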
Before running the pipeline, add the latestWatermarkValue variable as a runtime argument (in the Pipeline Studio view, click the drop-down arrow on the Run button) and set the value to 0:
CheckpointReadAction will populate latestWatermarkValue with the CHECKPOINT_VALUE from Firestore. The latestWatermarkValue runtime argument will be used as a parameter of the import query of the Database Source in a subsequent step:
SELECT * FROM test WHERE last_update_datetime > '${latestWatermarkValue}'
BigQuery - the actual destination table name (this is where the max checkpoint, i.e., the max timestamp, is taken from)
Use Case
This plugin can be used at the beginning of an incremental CDAP data pipeline to read the checkpoint value from the last sync.
Let's say you run your pipeline once every 5 minutes. When running an incremental pipeline, you have to filter the records by a specific field (e.g., a timestamp greater than the last checkpoint minus a buffer). Because the buffer means some already-processed records are read again, a merge and dedupe step is performed so that duplicate records do not end up in the destination table.
The CheckpointReadAction plugin requires the following config properties:
- Label: plugin label name.
- Specify the collection name in firestore DB: name of the Firestore collection.
- Specify the document name to read the checkpoint details: the document name specified in the collection.
- Buffer time to add to checkpoint value. (Note: in Minutes): number of minutes to subtract from the checkpoint value read from Firestore.
- Project: project ID.
- Key path: service account key file path used to communicate with the Firestore DB.
Please see the following screenshot for an example.
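To make the read-and-buffer behavior concrete, the following sketch shows the conceptual steps: read the most recent checkpoint document, subtract the buffer minutes, and expose the result as the latestWatermarkValue runtime argument. This is not the plugin's actual source code; it assumes the google-cloud-firestore Java client, the CHECKPOINT_VALUE / CREATED_TIMESTAMP field names from the setup above, and the yyyy-MM-dd HH:mm:ss format used in the import query.

```java
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;
import com.google.cloud.firestore.Query;
import com.google.cloud.firestore.QueryDocumentSnapshot;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class CheckpointReadSketch {
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  public static void main(String[] args) throws Exception {
    long bufferMinutes = 1; // "Buffer time to add to checkpoint value" config property

    Firestore db = FirestoreOptions.getDefaultInstance().getService();

    // Read the most recent checkpoint document from /PIPELINE_CHECKPOINTS/INCREMENTAL_DEMO/CHECKPOINT.
    List<QueryDocumentSnapshot> docs = db.collection("PIPELINE_CHECKPOINTS")
        .document("INCREMENTAL_DEMO")
        .collection("CHECKPOINT")
        .orderBy("CREATED_TIMESTAMP", Query.Direction.DESCENDING)
        .limit(1)
        .get()
        .get()
        .getDocuments();

    String checkpoint = docs.get(0).getString("CHECKPOINT_VALUE"); // e.g. "2020-05-08 17:21:01"

    // Subtract the buffer so records updated right around the last sync are picked up again.
    String latestWatermarkValue =
        LocalDateTime.parse(checkpoint, FMT).minusMinutes(bufferMinutes).format(FMT);

    // The real plugin would set this as the "latestWatermarkValue" runtime argument
    // so the Database Source can substitute it into its import query.
    System.out.println(latestWatermarkValue);

    db.close();
  }
}
```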
The CheckpointUpdateAction plugin requires the following configuration:
- Label: plugin label name.
- Specify the collection name in firestore DB: name of the Firestore collection.
- Specify the document name to read the checkpoint details: the document name specified in the collection.
- Dataset name where incremental pull table exists: BigQuery dataset name.
- Table name that needs incremental pull: BigQuery table name.
- Specify the checkpoint column from incremental pull table: column used as the incremental checkpoint (e.g., the last update timestamp column).
- Project: project ID.
- Key path: service account key file path used to communicate with the Firestore DB.
Please see the screenshot below for an example:
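Conceptually, the update action reads the maximum value of the checkpoint column from the BigQuery table and writes it to Firestore as a new checkpoint document. The sketch below illustrates this with the BigQuery and Firestore Java clients, using the example dataset, table, and field names from this guide; it is an assumption about the approach, and the real plugin may format or store the checkpoint value differently.

```java
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;

import java.util.HashMap;
import java.util.Map;

public class CheckpointUpdateSketch {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Maximum checkpoint value currently in the incremental pull table.
    String sql = "SELECT CAST(MAX(last_update_datetime) AS STRING) AS max_ts "
        + "FROM `bq_dataset.bq_table`";
    FieldValueList row = bigquery.query(QueryJobConfiguration.newBuilder(sql).build())
        .iterateAll().iterator().next();
    String maxCheckpoint = row.get("max_ts").getStringValue();

    // Store it as a new checkpoint document so the next run filters from this point.
    Firestore db = FirestoreOptions.getDefaultInstance().getService();
    Map<String, Object> checkpoint = new HashMap<>();
    checkpoint.put("CHECKPOINT_VALUE", maxCheckpoint);
    checkpoint.put("CREATED_TIMESTAMP", Timestamp.now());

    db.collection("PIPELINE_CHECKPOINTS")
        .document("INCREMENTAL_DEMO")
        .collection("CHECKPOINT")
        .add(checkpoint)
        .get();

    db.close();
  }
}
```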
Plugin Description
Copies the BigQuery table from staging to destination at the end of the pipeline run. A new table is created if it doesn't exist; otherwise, the plugin replaces the existing BigQuery destination table with the data from staging.
Use Case
This is applicable in CDAP data pipelines that do a full import/scan of the data from the source system to BigQuery.
Dependencies
- Destination dataset: bq_dataset
- Destination table: bq_table
- Source dataset: bq_dataset_batch_staging
- Source table: bq_table
The CopyTableAction plugin requires the following configuration:
- Label: plugin label name.
- Key path: service account key file path to call the BigQuery API.
- Project ID: GCP project ID.
- Dataset: BigQuery dataset name.
- Table Name: BigQuery table name.
Please see the following screenshot for an example:
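Under the hood, this kind of copy can be expressed as a BigQuery copy job. The sketch below uses the BigQuery Java client with the staging and destination names from the Dependencies section above; it illustrates the approach and is not the plugin's actual implementation.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;

public class CopyTableSketch {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    TableId source = TableId.of("bq_dataset_batch_staging", "bq_table"); // staging table
    TableId destination = TableId.of("bq_dataset", "bq_table");          // destination table

    // Create the destination table if it doesn't exist, otherwise replace its contents.
    CopyJobConfiguration copyConfig = CopyJobConfiguration.newBuilder(destination, source)
        .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
        .build();

    Job job = bigquery.create(JobInfo.of(copyConfig)).waitFor();
    if (job == null || job.getStatus().getError() != null) {
      throw new RuntimeException("Copy job failed: "
          + (job == null ? "job no longer exists" : job.getStatus().getError()));
    }
  }
}
```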
Plugin Description
Drops a BigQuery table at the beginning of a pipeline run.
Use Case
Useful to drop staging tables.
Dependencies
The BigQuery table to be dropped must already exist.
The drop table action plugin requires the following configuration:
- Label: plugin label name.
- Key path: service account key file path to call the BigQuery API.
- Project ID: GCP project ID.
- Dataset: BigQuery dataset name.
- Table Name: BigQuery table name.
Please see the following screenshot for an example configuration:
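The drop itself amounts to a single BigQuery client call. A minimal sketch (illustration only, assuming the example staging dataset/table names used elsewhere in this guide):

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;

public class DropTableSketch {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Drop a staging table; delete() returns false if the table was not found.
    boolean deleted = bigquery.delete(TableId.of("bq_dataset_batch_staging", "bq_table"));
    if (!deleted) {
      throw new RuntimeException("Table to drop does not exist");
    }
  }
}
```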
Plugin Description
Truncates a BigQuery table; used when a pipeline is set up to restore the data from the source.
Use Case
Applicable in pipelines that restore data from the source system.
The TruncateTable action plugin requires the following configuration:
- Label: plugin label name.
- Key path: service account key file path to call the BigQuery API.
- Project ID: GCP project ID.
- Dataset: BigQuery dataset name.
- Table Name: BigQuery table name.
Please see the following screenshot for an example configuration:
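Truncation can be done with a single TRUNCATE TABLE statement submitted through the BigQuery client. A minimal sketch, assuming the example project, dataset, and log table names used later in this guide:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class TruncateTableSketch {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Removes all rows but keeps the table and its schema in place.
    String sql = "TRUNCATE TABLE `pso-cdf-plugins-287518.bq_dataset.bq_table_LOG`";
    bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
  }
}
```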
CheckPointReadAction → TruncateTableAction → Database → BigQuery → MergeLastUpdateTSAction → CheckPointUpdateAction
What does the pipeline do?
- CheckPointReadAction - reads the latest checkpoint from Firestore
- TruncateTableAction - truncates the records in the log table
- Database Source - imports data from the source
- BigQuery Sink - exports data into BigQuery from the previous step (Database Source)
- MergeLastUpdateTSAction - merges based on the timestamp and the update column list (columns to keep in the merge). Note: alternatively, you can use a BigQueryExecute action to do the merge (see the sketch after this list).
- CheckPointUpdateAction - updates the checkpoint in Firestore from the max record lastUpdateTimestamp in BigQuery
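If you go the BigQueryExecute route mentioned in the list above, the merge and dedupe can be expressed as one MERGE statement. The sketch below is an assumption of what an equivalent statement could look like, built from the example configuration (primary key id, update columns id, name, last_update_datetime, staging table bq_table_LOG); it submits the SQL through the BigQuery Java client, but the same statement could be pasted into a BigQueryExecute action.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class MergeLastUpdateTsSketch {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Dedupe the staged rows by primary key (keeping the latest update),
    // then upsert them into the destination table.
    String merge =
        "MERGE `bq_dataset.bq_table` T\n"
        + "USING (\n"
        + "  SELECT * EXCEPT(row_num) FROM (\n"
        + "    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_update_datetime DESC) AS row_num\n"
        + "    FROM `bq_dataset.bq_table_LOG`)\n"
        + "  WHERE row_num = 1\n"
        + ") S\n"
        + "ON T.id = S.id\n"
        + "WHEN MATCHED THEN UPDATE SET\n"
        + "  name = S.name, last_update_datetime = S.last_update_datetime\n"
        + "WHEN NOT MATCHED THEN INSERT (id, name, last_update_datetime)\n"
        + "  VALUES (S.id, S.name, S.last_update_datetime)";

    bigquery.query(QueryJobConfiguration.newBuilder(merge).build());
  }
}
```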
- Label: CheckPointReadAction
- Specify the document name to read the checkpoint details*: INCREMENTAL_DEMO
- Buffer time to add to checkpoint value. (Note: in Minutes): 1
- project: pso-cdf-plugins-287518
- serviceFilePath: auto-detect
Screenshot:
- Label: TruncateTableAction
- Key path*: auto-detect
- ProjectId*: pso-cdf-plugins-287518
- Dataset*: bq_dataset
- Table name*: bq_table_LOG
- Label*: Database
- Reference Name*: test
- Plugin Name*: sqlserver42
- Plugin Type: jdbc
- Connection String: jdbc:sqlserver://:;databaseName=main;user=;password=;
- Import Query: SELECT * FROM test WHERE last_update_datetime > '${latestWatermarkValue}'
- Label*: BigQuery
- Reference Name*: bq_table_sink
- Project ID: pso-cdf-plugins-287518
- Dataset*: bq_dataset
- Table* (write to a temporary table, e.g., bq_table_LOG): bq_table_LOG
- Service Account File Path: auto-detect
- Schema
- Label*: MergeLastUpdateTSAction
- Key path*: auto-detect
- Project ID*: pso-cdf-plugins-287518
- Dataset name: bq_dataset
- Table name*: bq_table
- Primary key list*: id
- Update columns list*: id,name,last_update_datetime
- Label*: CheckPointUpdateAction
- Specify the collection name in firestore DB*: PIPELINE_CHECKPOINTS
- Specify the document name to read the checkpoint details*: INCREMENTAL_DEMO
- Dataset name where incremental pull table exists*: bq_dataset
- Table name that needs incremental pull*: bq_table
- Specify the checkpoint column from incremental pull table*: last_update_datetime
- serviceFilePath: auto-detect
- project: pso-cdf-plugins-287518
Building the plugin requires Java JDK 1.8 and Maven.
- To build the CDAP/CDF plugin JAR, execute the following command from the root of the project:
mvn clean compile package
- You will find the generated JAR and JSON files under the target folder:
  - GoogleFunctions-1.6.jar
  - GoogleFunctions-1.6.json
- Deploy GoogleFunctions-1.6.jar and GoogleFunctions-1.6.json into CDF/CDAP (note that if you have the same version already deployed, you'll get an error that it already exists):
  - Go to Control Center.
  - Delete the GoogleFunctions artifact if the same version already exists.
  - Upload the plugin by clicking on the circled green + button.
  - Pick the JAR file / JSON file created under the target folder.
  - You'll see a confirmation of the successful plugin upload.