diff --git a/pom.xml b/pom.xml
index 4f875515b..0fe870832 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.zendesk</groupId>
<artifactId>maxwell</artifactId>
- <version>1.37.3</version>
+ <version>1.37.7</version>
<packaging>jar</packaging>
<name>maxwell</name>
@@ -123,8 +123,17 @@
-
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-bigquerystorage</artifactId>
+ <version>2.14.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-bigquery</artifactId>
+ <version>2.13.3</version>
+ </dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
@@ -133,7 +142,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
- <version>1.12.137</version>
+ <version>1.12.217</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -218,7 +227,7 @@
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
- <version>0.25.3</version>
+ <version>0.25.6</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
@@ -278,7 +287,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>3.16.1</version>
+ <version>3.20.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
@@ -313,12 +322,12 @@
<dependency>
<groupId>com.viafoura</groupId>
<artifactId>metrics-datadog</artifactId>
- <version>2.0.0-RC2</version>
+ <version>2.0.0-RC3</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
- <version>1.12.137</version>
+ <version>1.12.191</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -556,4 +565,4 @@
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
index 9be74598b..d900cce8e 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -203,6 +203,21 @@ public class MaxwellConfig extends AbstractConfig {
*/
public String ddlPubsubTopic;
+ /**
+ * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id
+ */
+ public String bigQueryProjectId;
+
+ /**
+ * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset
+ */
+ public String bigQueryDataset;
+
+ /**
+ * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table
+ */
+ public String bigQueryTable;
+
/**
* {@link com.zendesk.maxwell.producer.MaxwellPubsubProducer} bytes request threshold
*/
@@ -789,6 +804,14 @@ protected MaxwellOptionParser buildOptionParser() {
parser.accepts( "nats_url", "Url(s) of Nats connection (comma separated). Default is localhost:4222" ).withRequiredArg();
parser.accepts( "nats_subject", "Subject Hierarchies of Nats. Default is '%{database}.%{table}'" ).withRequiredArg();
+ parser.section( "bigquery" );
+ parser.accepts( "bigquery_project_id", "provide a google cloud platform project id associated with the bigquery table" )
+ .withRequiredArg();
+ parser.accepts( "bigquery_dataset", "provide a google cloud platform dataset id associated with the bigquery table" )
+ .withRequiredArg();
+ parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" )
+ .withRequiredArg();
+
parser.section( "pubsub" );
parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" )
.withRequiredArg();
@@ -994,6 +1017,10 @@ private void setup(OptionSet options, Properties properties) {
this.kafkaPartitionHash = fetchStringOption("kafka_partition_hash", options, properties, "default");
this.ddlKafkaTopic = fetchStringOption("ddl_kafka_topic", options, properties, this.kafkaTopic);
+ this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null);
+ this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null);
+ this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null);
+
this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null);
this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell");
this.ddlPubsubTopic = fetchStringOption("ddl_pubsub_topic", options, properties, this.pubsubTopic);
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
index 484886a7d..2cbbaf217 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
@@ -550,6 +550,9 @@ public AbstractProducer getProducer() throws IOException {
case "redis":
this.producer = new MaxwellRedisProducer(this);
break;
+ case "bigquery":
+ this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable);
+ break;
case "none":
this.producer = new NoneProducer(this);
break;
diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
new file mode 100644
index 000000000..0468c3dc6
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
@@ -0,0 +1,267 @@
+package com.zendesk.maxwell.producer;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.zendesk.maxwell.MaxwellContext;
+import com.zendesk.maxwell.replication.Position;
+import com.zendesk.maxwell.row.RowMap;
+import com.zendesk.maxwell.schema.BqToBqStorageSchemaConverter;
+import com.zendesk.maxwell.util.StoppableTask;
+import com.zendesk.maxwell.util.StoppableTaskState;
+
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeoutException;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
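+/**
+ * Callback attached to each Storage Write API append: on success it bumps the
+ * success metrics and completes the row; on failure it retries transient gRPC
+ * errors up to MAX_RETRY_COUNT, then records the error on the worker.
+ */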
+class BigQueryCallback implements ApiFutureCallback<AppendRowsResponse> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class);
+
+ private final MaxwellBigQueryProducerWorker parent;
+ private final AbstractAsyncProducer.CallbackCompleter cc;
+ private final Position position;
+ private MaxwellContext context;
+ AppendContext appendContext;
+
+ private Counter succeededMessageCount;
+ private Counter failedMessageCount;
+ private Meter succeededMessageMeter;
+ private Meter failedMessageMeter;
+
+ private static final int MAX_RETRY_COUNT = 2;
+ private static final ImmutableList<Code> RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED,
+ Code.CANCELLED);
+
+ public BigQueryCallback(MaxwellBigQueryProducerWorker parent,
+ AppendContext appendContext,
+ AbstractAsyncProducer.CallbackCompleter cc,
+ Position position,
+ Counter producedMessageCount, Counter failedMessageCount,
+ Meter succeededMessageMeter, Meter failedMessageMeter,
+ MaxwellContext context) {
+ this.parent = parent;
+ this.appendContext = appendContext;
+ this.cc = cc;
+ this.position = position;
+ this.succeededMessageCount = producedMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.succeededMessageMeter = succeededMessageMeter;
+ this.failedMessageMeter = failedMessageMeter;
+ this.context = context;
+ }
+
+ @Override
+ public void onSuccess(AppendRowsResponse response) {
+ this.succeededMessageCount.inc();
+ this.succeededMessageMeter.mark();
+
+ if (LOGGER.isDebugEnabled()) {
+ try {
+ LOGGER.debug("-> {}\n" +
+ " {}\n",
+ this.appendContext.r.toJSON(), this.position);
+ } catch (Exception e) {
+ LOGGER.error("Failed to render row for debug logging", e);
+ }
+ }
+ cc.markCompleted();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ this.failedMessageCount.inc();
+ this.failedMessageMeter.mark();
+
+ LOGGER.error(t.getClass().getSimpleName() + " @ " + position);
+ LOGGER.error(t.getLocalizedMessage());
+
+ Status status = Status.fromThrowable(t);
+ if (appendContext.retryCount < MAX_RETRY_COUNT
+ && RETRIABLE_ERROR_CODES.contains(status.getCode())) {
+ appendContext.retryCount++;
+ try {
+ // re-send with the same AppendContext so the retry count is preserved;
+ // the retried append registers its own callback, which completes cc
+ this.parent.sendAsync(appendContext, this.cc);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Failed to retry append", e);
+ }
+ }
+
+ synchronized (this.parent.lock) {
+ if (this.parent.error == null) {
+ StorageException storageException = Exceptions.toStorageException(t);
+ this.parent.error = (storageException != null) ? storageException : new RuntimeException(t);
+ }
+ }
+ cc.markCompleted();
+ }
+}
+
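+/**
+ * Producer that streams rows into an existing BigQuery table via the Storage
+ * Write API. Rows are buffered on a bounded in-memory queue and drained by a
+ * single daemon worker thread.
+ */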
+public class MaxwellBigQueryProducer extends AbstractProducer {
+
+ private final ArrayBlockingQueue<RowMap> queue;
+ private final MaxwellBigQueryProducerWorker worker;
+
+ public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId,
+ String bigQueryDataset, String bigQueryTable)
+ throws IOException {
+ super(context);
+ this.queue = new ArrayBlockingQueue<>(100);
+ this.worker = new MaxwellBigQueryProducerWorker(context, this.queue, bigQueryProjectId, bigQueryDataset, bigQueryTable);
+
+ TableName table = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable);
+ try {
+ this.worker.initialize(table);
+ } catch (DescriptorValidationException e) {
+ // fail fast: without an initialized stream writer the worker can only NPE later
+ throw new IOException("Failed to initialize BigQuery stream writer for table " + table, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while initializing BigQuery stream writer", e);
+ }
+
+ Thread thread = new Thread(this.worker, "maxwell-bigquery-worker");
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ public void push(RowMap r) throws Exception {
+ this.queue.put(r);
+ }
+}
+
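+/** Carries one row's JSON payload and its retry count across append attempts. */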
+class AppendContext {
+ JSONArray data;
+ int retryCount = 0;
+ RowMap r = null;
+
+ AppendContext(JSONArray data, int retryCount, RowMap r) {
+ this.data = data;
+ this.retryCount = retryCount;
+ this.r = r;
+ }
+}
+
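+/**
+ * Background worker: takes rows off the queue and appends them to the table's
+ * default write stream as JSON, completing each row's position asynchronously
+ * via BigQueryCallback.
+ */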
+class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
+ static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class);
+
+ private final ArrayBlockingQueue<RowMap> queue;
+ private StoppableTaskState taskState;
+ private Thread thread;
+
+ public MaxwellBigQueryProducerWorker(MaxwellContext context,
+ ArrayBlockingQueue<RowMap> queue, String bigQueryProjectId,
+ String bigQueryDataset, String bigQueryTable) throws IOException {
+ super(context);
+ this.queue = queue;
+ this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker");
+ }
+
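+ // guards `error`, which is shared with the append callbacks; they record the
+ // first non-retriable failure here so the worker can surface it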
+ public final Object lock = new Object();
+ private JsonStreamWriter streamWriter;
+
+ @GuardedBy("lock")
+ public RuntimeException error = null;
+
+ public void initialize(TableName tName)
+ throws DescriptorValidationException, IOException, InterruptedException {
+
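+ // fetch the live table schema with the BigQuery client, convert it to the
+ // Storage Write API schema type, and open a JSON stream writer on the
+ // table's default stream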
+ BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService();
+ Table table = bigquery.getTable(tName.getDataset(), tName.getTable());
+ Schema schema = table.getDefinition().getSchema();
+ TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
+ streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build();
+ }
+
+ @Override
+ public void requestStop() throws Exception {
+ taskState.requestStop();
+ streamWriter.close();
+ synchronized (this.lock) {
+ if (this.error != null) {
+ throw this.error;
+ }
+ }
+ }
+
+ @Override
+ public void awaitStop(Long timeout) throws TimeoutException {
+ taskState.awaitStop(thread, timeout);
+ }
+
+ @Override
+ public void run() {
+ this.thread = Thread.currentThread();
+ while (true) {
+ try {
+ RowMap row = queue.take();
+ if (!taskState.isRunning()) {
+ taskState.stopped();
+ return;
+ }
+ this.push(row);
+ } catch (Exception e) {
+ taskState.stopped();
+ context.terminate(e);
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception {
+ String json = r.toJSON(outputConfig);
+ LOGGER.debug("maxwell incoming log -> {}", json);
+ JSONObject record = new JSONObject(json);
+ // stringify nested values so they fit the STRING columns of the noon CDC log table schema;
+ // "old" and "primary_key" are absent for some row types, so guard against missing keys
+ if (record.has("data")) {
+ record.put("data", record.getJSONObject("data").toString());
+ }
+ if (record.has("old")) {
+ record.put("old", record.getJSONObject("old").toString());
+ }
+ if (record.has("primary_key")) {
+ record.put("primary_key", record.get("primary_key").toString());
+ }
+ JSONArray jsonArr = new JSONArray();
+ jsonArr.put(record);
+ sendAsync(new AppendContext(jsonArr, 0, r), cc);
+ }
+
+ // shared by first attempts and retries so the retry count in appendContext survives re-sends
+ void sendAsync(AppendContext appendContext, CallbackCompleter cc) throws Exception {
+ synchronized (this.lock) {
+ if (this.error != null) {
+ throw this.error;
+ }
+ }
+ ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
+ ApiFutures.addCallback(
+ future, new BigQueryCallback(this, appendContext, cc, appendContext.r.getNextPosition(),
+ this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter,
+ this.context),
+ MoreExecutors.directExecutor());
+ }
+}
diff --git a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java
new file mode 100644
index 000000000..cfda9609e
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java
@@ -0,0 +1,88 @@
+package com.zendesk.maxwell.schema;
+
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.common.collect.ImmutableMap;
+
+/** Converts structure from BigQuery client to BigQueryStorage client */
+public class BqToBqStorageSchemaConverter {
+ private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
+ ImmutableMap.of(
+ Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
+ Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
+ Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);
+
+ private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
+ new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
+ .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
+ .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
+ .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
+ .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
+ .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
+ .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
+ .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
+ .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
+ .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
+ .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
+ .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
+ .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
+ .build();
+
+ /**
+ * Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
+ *
+ * @param schema the BigQuery client Table Schema
+ * @return the bigquery storage API Table Schema
+ */
+ public static TableSchema convertTableSchema(Schema schema) {
+ TableSchema.Builder result = TableSchema.newBuilder();
+ for (int i = 0; i < schema.getFields().size(); i++) {
+ result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
+ }
+ return result.build();
+ }
+
+ /**
+ * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
+ *
+ * @param field the BigQuery client Field Schema
+ * @return the bigquery storage API Field Schema
+ */
+ public static TableFieldSchema convertFieldSchema(Field field) {
+ TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
+ if (field.getMode() == null) {
+ field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
+ }
+ result.setMode(BQTableSchemaModeMap.get(field.getMode()));
+ result.setName(field.getName());
+ result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
+ if (field.getDescription() != null) {
+ result.setDescription(field.getDescription());
+ }
+ if (field.getSubFields() != null) {
+ for (int i = 0; i < field.getSubFields().size(); i++) {
+ result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
+ }
+ }
+ return result.build();
+ }
+}