2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
FROM maven:3.6-jdk-11 as builder
ENV MAXWELL_VERSION=1.37.7 KAFKA_VERSION=1.0.0
ENV MAXWELL_VERSION=1.37.3 KAFKA_VERSION=2.7.0

RUN apt-get update \
&& apt-get -y upgrade \
2 changes: 1 addition & 1 deletion bin/maxwell
@@ -18,7 +18,7 @@ fi

CLASSPATH="$CLASSPATH:$lib_dir/*"

KAFKA_VERSION="1.0.0"
KAFKA_VERSION="2.7.0"

function use_kafka() {
wanted="$1"
8 changes: 7 additions & 1 deletion config.properties.example
@@ -11,7 +11,7 @@ password=maxwell


# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis|bigquery
#producer=kafka

# set the log level. note that you can configure things further in log4j2.xml
@@ -218,6 +218,12 @@ kafka.acks=1
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

# *** bigquery ***

#bigquery_project_id=myproject
#bigquery_dataset=mydataset
#bigquery_table=mytable

# *** rabbit-mq ***

#rabbitmq_host=rabbitmq_hostname
9 changes: 9 additions & 0 deletions docs/docs/config.md
@@ -129,6 +129,15 @@ pubsub_total_timeout | LONG | Puts a limit on the value in secon

_See also:_ [PubSub Producer Documentation](/producers#google-cloud-pubsub)

## bigquery producer
option | argument | description | default
-------------------------------|-------------------------------------| --------------------------------------------------- | -------
bigquery_project_id | STRING | Google Cloud BigQuery project id |
bigquery_dataset | STRING | Google Cloud BigQuery dataset id |
bigquery_table | STRING | Google Cloud BigQuery table id |

_See also:_ [BigQuery Producer Documentation](/producers#google-cloud-bigquery)

## rabbitmq producer
option | argument | description | default
-------------------------------|-------------------------------------| --------------------------------------------------- | -------
28 changes: 28 additions & 0 deletions docs/docs/producers.md
@@ -260,6 +260,34 @@ for DDL updates by setting the `ddl_pubsub_topic` property.

The producer uses the [Google Cloud Java Library for Pub/Sub](https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub) and uses its built-in configurations.

# Google Cloud BigQuery
***
To stream data into Google Cloud BigQuery, first create the destination table in BigQuery. The table is identified as
`bigquery_project_id.bigquery_dataset.bigquery_table`. The schema of the table must match the outputConfig. Define the columns with the following types:

- database: string
- table: string
- type: string
- ts: integer
- xid: integer
- xoffset: integer
- commit: boolean
- position: string
- gtid: string
- server_id: integer
- primary_key: string
- data: string
- old: string
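
As one way to create a matching table, the column list above can be turned into a BigQuery DDL statement. The sketch below is only an illustration: the class name `BigQueryTableDdl` and its methods are hypothetical and not part of Maxwell, and it assumes the `integer` and `boolean` types above map to the BigQuery SQL types `INT64` and `BOOL`. Column names are backtick-quoted so that names such as `table` or `commit` cannot clash with SQL keywords.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Maxwell): prints a CREATE TABLE statement
// for the column list documented above.
public class BigQueryTableDdl {
    // Column names and types copied from the list above, in the same order.
    // Assumption: "integer" is written INT64 and "boolean" is written BOOL.
    static Map<String, String> columns() {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("database", "STRING");
        cols.put("table", "STRING");
        cols.put("type", "STRING");
        cols.put("ts", "INT64");
        cols.put("xid", "INT64");
        cols.put("xoffset", "INT64");
        cols.put("commit", "BOOL");
        cols.put("position", "STRING");
        cols.put("gtid", "STRING");
        cols.put("server_id", "INT64");
        cols.put("primary_key", "STRING");
        cols.put("data", "STRING");
        cols.put("old", "STRING");
        return cols;
    }

    // Builds the DDL text for `projectId.dataset.table`.
    static String build(String projectId, String dataset, String table) {
        StringJoiner body = new StringJoiner(",\n  ", "(\n  ", "\n)");
        for (Map.Entry<String, String> e : columns().entrySet()) {
            body.add("`" + e.getKey() + "` " + e.getValue());
        }
        return "CREATE TABLE `" + projectId + "." + dataset + "." + table + "` " + body;
    }

    public static void main(String[] args) {
        // Placeholder identifiers taken from config.properties.example.
        System.out.println(build("myproject", "mydataset", "mytable"));
    }
}
```

The printed statement can be pasted into the BigQuery console or any SQL client; the program itself does not contact BigQuery.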

See the Google Cloud Platform docs for the [latest examples of which permissions are needed](https://cloud.google.com/bigquery/docs/access-control), as well as [how to properly configure service accounts](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances).

Set the output stream in `config.properties` by setting the `bigquery_project_id`, `bigquery_dataset` and `bigquery_table` properties.
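
All three properties are needed to address the table, since they are combined into a single `project.dataset.table` reference. The following is a minimal sketch of that combination with a fail-fast check. It is an assumption for illustration, not Maxwell's actual validation logic: `BigQueryTableRef`, `required` and `tableRef` are hypothetical names.

```java
import java.util.Properties;

// Hypothetical helper (not part of Maxwell): combines the three bigquery_*
// properties into one table reference and rejects missing values early.
public class BigQueryTableRef {
    // Returns the trimmed property value, or throws if it is absent or blank.
    static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(key + " must be set when producer=bigquery");
        }
        return value.trim();
    }

    // Builds "project.dataset.table" from the three configuration properties.
    static String tableRef(Properties props) {
        return required(props, "bigquery_project_id")
            + "." + required(props, "bigquery_dataset")
            + "." + required(props, "bigquery_table");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values taken from config.properties.example.
        props.setProperty("bigquery_project_id", "myproject");
        props.setProperty("bigquery_dataset", "mydataset");
        props.setProperty("bigquery_table", "mytable");
        System.out.println(tableRef(props));
    }
}
```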

The producer uses the [Google Cloud Java BigQuery Storage library](https://github.com/googleapis/java-bigquerystorage); see the [BigQuery Storage Write API documentation](https://cloud.google.com/bigquery/docs/write-api).
To use the Storage Write API, you must have `bigquery.tables.updateData` permissions.

This producer uses the default stream, which has at-least-once semantics, for greater data resiliency and fewer scaling restrictions.

# RabbitMQ
***
To produce messages to RabbitMQ, you will need to specify a host in `config.properties` with `rabbitmq_host`. This is the only required property, everything else falls back to a sane default.
9 changes: 9 additions & 0 deletions docs/docs/quickstart.md
@@ -100,6 +100,15 @@ bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--pubsub_topic='maxwell'
```

## Google Cloud BigQuery

```
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=bigquery --bigquery_project_id='$BIGQUERY_PROJECT_ID' \
--bigquery_dataset='$BIGQUERY_DATASET' \
--bigquery_table='$BIGQUERY_TABLE'
```

## RabbitMQ

```
15 changes: 12 additions & 3 deletions pom.xml
@@ -123,8 +123,17 @@
</dependencies>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
@@ -278,7 +287,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.1</version>
<version>3.20.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
@@ -556,4 +565,4 @@
</plugins>
</pluginManagement>
</build>
</project>
</project>
27 changes: 27 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -203,6 +203,21 @@ public class MaxwellConfig extends AbstractConfig {
*/
public String ddlPubsubTopic;

/**
* {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id
*/
public String bigQueryProjectId;

/**
* {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset
*/
public String bigQueryDataset;

/**
* {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table
*/
public String bigQueryTable;

/**
* {@link com.zendesk.maxwell.producer.MaxwellPubsubProducer} bytes request threshold
*/
@@ -789,6 +804,14 @@ protected MaxwellOptionParser buildOptionParser() {
parser.accepts( "nats_url", "Url(s) of Nats connection (comma separated). Default is localhost:4222" ).withRequiredArg();
parser.accepts( "nats_subject", "Subject Hierarchies of Nats. Default is '%{database}.%{table}'" ).withRequiredArg();

parser.section( "bigquery" );
parser.accepts( "bigquery_project_id", "provide a google cloud platform project id associated with the bigquery table" )
.withRequiredArg();
parser.accepts( "bigquery_dataset", "provide a google cloud platform dataset id associated with the bigquery table" )
.withRequiredArg();
parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" )
.withRequiredArg();

parser.section( "pubsub" );
parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" )
.withRequiredArg();
@@ -994,6 +1017,10 @@ private void setup(OptionSet options, Properties properties) {
this.kafkaPartitionHash = fetchStringOption("kafka_partition_hash", options, properties, "default");
this.ddlKafkaTopic = fetchStringOption("ddl_kafka_topic", options, properties, this.kafkaTopic);

this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null);
this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null);
this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null);

this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null);
this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell");
this.ddlPubsubTopic = fetchStringOption("ddl_pubsub_topic", options, properties, this.pubsubTopic);
3 changes: 3 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
@@ -550,6 +550,9 @@ public AbstractProducer getProducer() throws IOException {
case "redis":
this.producer = new MaxwellRedisProducer(this);
break;
case "bigquery":
this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable);
break;
case "none":
this.producer = new NoneProducer(this);
break;