Skip to content

conduitio-labs/conduit-connector-bigquery

Repository files navigation

Conduit Connector for BigQuery

Conduit connector for BigQuery.

Source

A source connector pulls data from BigQuery and pushes it to downstream resources via Conduit.

Implementation

The connector pulls data from BigQuery for a dataset or selected tables of users choice. The connector syncs incrementally this means it keeps on looking for new insertion/updation happening every time interval user specified in any of the table the data is pulled for and syncs it. If the Conduit stops or pauses midway the connector will make sure to pull the data which was not pull earlier.

for eg,

  • table A and table B are synced.
  • Pipeline is paused after syncing complete table A and table B till index 5.
  • On resuming the pipeline - Connector sync data from table B index 6 and would not sync table A's already synced rows.

How to build?

Run make build to build the connector.

Configuration

name description required default value
serviceAccount service account with access to project. ref: https://cloud.google.com/docs/authentication/getting-started true -
projectID The Project ID on endpoint true -
datasetID The dataset ID to pull data from. true -
tableID Specify comma separated table IDs. Will pull whole dataset if no Table ID present. false all tables in dataset
datasetLocation Specify location were dataset exist true -
pollingTime Specify time foramtted as a time.Duration string, after which polling of data should be done. For eg, "2s", "500ms" false 5m
incrementingColumnName Specify the column name which provide visibility about newer row or newer updates. It can be either updated_at timestamp which specifies when the table was last updated. It can be a ID of type int or float whose value increases with every new record coming in. User need to provide column name for table in a format - 'columnName' without any spaces Eg: 'created_by' where created_by is column name. Table with no value will be pulled without any ordering. false -
primaryKeyColName Specify the primary key column name. eg, ID of type int or float or any primary key. User need to provide column name for each table in a format - 'columnName' without any spaces Eg: 'created_by' where created_by is column name. true -

How to configure

Create a connector using - POST /v1/connectors API

Sample payload-

{
  "config": {
    "name": "google_bigQuery",
    "settings": {
      "datasetID": "<dataset_name>",
      "datasetLocation": "<location_of_dataset eg, US>",
      "projectID": "<project_name eg, conduit-connectors>",
      "serviceAccount": "<path_to_key eg, /home/username/conduit-connectors-cf3466b16662.json>",
      "tableID": "< table_names_to_sync eg, table_1,table_2>",
      "incrementingColumnName": "<table_name:incrementingColName,table_name2:incrementingColName2... eg, table1:id,table2:updatedat>",
      "primaryKeyColName": "<table_name:primaryColName,table_name2:primaryColName2... eg, table1:id,table2:_id>",
      "pollingTime": "<time in duration eg,1m20s>"
    }
  },
  "type": "TYPE_SOURCE",
  "plugin": "standalone:bigquery", // the connector's binary should be put into a directory that holds all the standalone connectors as described here https://github.com/ConduitIO/conduit/issues/427#issuecomment-1227396725.
  "pipelineId": "<ID of pipeline created eg, 8cb0e678-d797-401c-8d23-f1e9e859b885>"
}

Testing

Run make test to run all the unit tests. To run the test cases export environment variable - GOOGLE_SERVICE_ACCOUNT and GOOGLE_PROJECT_ID where,

Known Issues & Limitations

  • Current implementation handles snapshot and incremental data.
  • The connector is able to send record's Key as sdk.RawData only.

Planned work

  • Handle Delete data

Specification

The spec.go file provides a programmatic representation of the configuration options. This is used by the Conduit server to validate configuration and dynamically display configuration options to end users.