diff --git a/Dockerfile b/Dockerfile
index b0010ce3a..50c33352c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 FROM maven:3.6-jdk-11 as builder
-ENV MAXWELL_VERSION=1.37.7 KAFKA_VERSION=1.0.0
+ENV MAXWELL_VERSION=1.37.3 KAFKA_VERSION=2.7.0
 
 RUN apt-get update \
 	&& apt-get -y upgrade \
diff --git a/bin/maxwell b/bin/maxwell
index 4dd47e9a6..b1987b897 100755
--- a/bin/maxwell
+++ b/bin/maxwell
@@ -18,7 +18,7 @@ fi
 
 CLASSPATH="$CLASSPATH:$lib_dir/*"
 
-KAFKA_VERSION="1.0.0"
+KAFKA_VERSION="2.7.0"
 
 function use_kafka() {
 	wanted="$1"
diff --git a/config.properties.example b/config.properties.example
index a862236bc..c9286251b 100644
--- a/config.properties.example
+++ b/config.properties.example
@@ -11,7 +11,7 @@ password=maxwell
 
 # *** general ***
 
-# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
+# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis|bigquery
 #producer=kafka
 
 # set the log level. note that you can configure things further in log4j2.xml
@@ -218,6 +218,12 @@ kafka.acks=1
 #pubsub_topic=maxwell
 #ddl_pubsub_topic=maxwell_ddl
 
+# *** bigquery ***
+
+#bigquery_project_id=myproject
+#bigquery_dataset=mydataset
+#bigquery_table=mytable
+
 # *** rabbit-mq ***
 
 #rabbitmq_host=rabbitmq_hostname
diff --git a/docs/docs/config.md b/docs/docs/config.md
index 11da42592..9b7185fef 100644
--- a/docs/docs/config.md
+++ b/docs/docs/config.md
@@ -129,6 +129,15 @@ pubsub_total_timeout | LONG | Puts a limit on the value in secon
 
 _See also:_ [PubSub Producer Documentation](/producers#google-cloud-pubsub)
 
+## bigquery producer
+option | argument | description | default
+-------------------------------|-------------------------------------| --------------------------------------------------- | -------
+bigquery_project_id | STRING | Google Cloud BigQuery project id |
+bigquery_dataset | STRING | Google Cloud BigQuery dataset id |
+bigquery_table | STRING | Google Cloud BigQuery table id |
+
+_See also:_ [BigQuery Producer Documentation](/producers#google-cloud-bigquery)
+
 ## rabbitmq producer
 option | argument | description | default
 -------------------------------|-------------------------------------| --------------------------------------------------- | -------
diff --git a/docs/docs/producers.md b/docs/docs/producers.md
index ea30ca63c..845464104 100644
--- a/docs/docs/producers.md
+++ b/docs/docs/producers.md
@@ -260,6 +260,34 @@ for DDL updates by setting the `ddl_pubsub_topic` property.
 
 The producer uses the [Google Cloud Java Library for Pub/Sub](https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub) and uses its built-in configurations.
 
+# Google Cloud BigQuery
+***
+To stream data into Google Cloud BigQuery, you must first create the destination table that the data will be streamed
+into, identified as `bigquery_project_id.bigquery_dataset.bigquery_table`. The table's schema must match Maxwell's output configuration; the column types should be defined as below (a table-creation sketch follows the list):
+
+- database: string
+- table: string
+- type: string
+- ts: integer
+- xid: integer
+- xoffset: integer
+- commit: boolean
+- position: string
+- gtid: string
+- server_id: integer
+- primary_key: string
+- data: string
+- old: string
+
+See the Google Cloud Platform docs for the [latest examples of which permissions are needed](https://cloud.google.com/bigquery/docs/access-control), as well as [how to properly configure service accounts](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances).
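+
+As a minimal sketch (not part of Maxwell itself: it assumes Application Default Credentials, a hypothetical `CreateMaxwellTable` class name, and reuses the placeholder `myproject`/`mydataset`/`mytable` names from `config.properties.example`), such a table could be created with the same `google-cloud-bigquery` client library this producer depends on:
+
+```
+import com.google.cloud.bigquery.*;
+
+public class CreateMaxwellTable {
+	public static void main(String[] args) {
+		// Placeholder names -- substitute your own project, dataset and table.
+		BigQuery bigquery = BigQueryOptions.newBuilder()
+				.setProjectId("myproject").build().getService();
+
+		// Column names and types mirror the list above: "integer" maps to
+		// INT64, "boolean" to BOOL, and everything else to STRING.
+		Schema schema = Schema.of(
+				Field.of("database", StandardSQLTypeName.STRING),
+				Field.of("table", StandardSQLTypeName.STRING),
+				Field.of("type", StandardSQLTypeName.STRING),
+				Field.of("ts", StandardSQLTypeName.INT64),
+				Field.of("xid", StandardSQLTypeName.INT64),
+				Field.of("xoffset", StandardSQLTypeName.INT64),
+				Field.of("commit", StandardSQLTypeName.BOOL),
+				Field.of("position", StandardSQLTypeName.STRING),
+				Field.of("gtid", StandardSQLTypeName.STRING),
+				Field.of("server_id", StandardSQLTypeName.INT64),
+				Field.of("primary_key", StandardSQLTypeName.STRING),
+				Field.of("data", StandardSQLTypeName.STRING),
+				Field.of("old", StandardSQLTypeName.STRING));
+
+		bigquery.create(TableInfo.of(
+				TableId.of("mydataset", "mytable"),
+				StandardTableDefinition.of(schema)));
+	}
+}
+```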
+
+Set the output stream in `config.properties` by setting the `bigquery_project_id`, `bigquery_dataset` and `bigquery_table` properties.
+
+The producer uses the [Google Cloud Java BigQuery Storage Library](https://github.com/googleapis/java-bigquerystorage); see the [BigQuery Storage Write API documentation](https://cloud.google.com/bigquery/docs/write-api).
+To use the Storage Write API, you must have `bigquery.tables.updateData` permissions.
+
+This producer uses the default stream, which provides at-least-once semantics, for greater data resiliency and fewer scaling restrictions.
+
 # RabbitMQ
 ***
 To produce messages to RabbitMQ, you will need to specify a host in `config.properties` with `rabbitmq_host`. This is the only required property, everything else falls back to a sane default.
diff --git a/docs/docs/quickstart.md b/docs/docs/quickstart.md
index 7ddb86a63..e9ae3ac6e 100644
--- a/docs/docs/quickstart.md
+++ b/docs/docs/quickstart.md
@@ -100,6 +100,15 @@ bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
     --pubsub_topic='maxwell'
 ```
 
+## Google Cloud BigQuery
+
+```
+bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
+    --producer=bigquery --bigquery_project_id='$BIGQUERY_PROJECT_ID' \
+    --bigquery_dataset='$BIGQUERY_DATASET' \
+    --bigquery_table='$BIGQUERY_TABLE'
+```
+
 ## RabbitMQ
 
 ```
diff --git a/pom.xml b/pom.xml
index c945dfa16..0fe870832 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,8 +123,17 @@
 		</dependency>
-
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-bigquerystorage</artifactId>
+			<version>2.14.2</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.cloud</groupId>
+			<artifactId>google-cloud-bigquery</artifactId>
+			<version>2.13.3</version>
+		</dependency>
 		<dependency>
 			<groupId>com.mchange</groupId>
 			<artifactId>c3p0</artifactId>
@@ -278,7 +287,7 @@
 		<dependency>
 			<groupId>com.google.protobuf</groupId>
 			<artifactId>protobuf-java</artifactId>
-			<version>3.16.1</version>
+			<version>3.20.0</version>
 		</dependency>
 		<dependency>
 			<groupId>io.dropwizard.metrics</groupId>
@@ -556,4 +565,4 @@
 
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
index b58b15247..189ce6151 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -203,6 +203,21 @@ public class MaxwellConfig extends AbstractConfig {
 	 */
 	public String ddlPubsubTopic;
 
+	/**
+	 * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id
+	 */
+	public String bigQueryProjectId;
+
+	/**
+	 * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset
+	 */
+	public String bigQueryDataset;
+
+	/**
+	 * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table
+	 */
+	public String bigQueryTable;
+
 	/**
 	 * {@link com.zendesk.maxwell.producer.MaxwellPubsubProducer} bytes request threshold
 	 */
@@ -789,6 +804,14 @@ protected MaxwellOptionParser buildOptionParser() {
 		parser.accepts( "nats_url", "Url(s) of Nats connection (comma separated). Default is localhost:4222" ).withRequiredArg();
 		parser.accepts( "nats_subject", "Subject Hierarchies of Nats. Default is '%{database}.%{table}'" ).withRequiredArg();
 
+		parser.section( "bigquery" );
+		parser.accepts( "bigquery_project_id", "provide a google cloud platform project id associated with the bigquery table" )
+			.withRequiredArg();
+		parser.accepts( "bigquery_dataset", "provide a google cloud platform dataset id associated with the bigquery table" )
+			.withRequiredArg();
+		parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" )
+			.withRequiredArg();
+
 		parser.section( "pubsub" );
 		parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" )
 			.withRequiredArg();
@@ -994,6 +1017,10 @@ private void setup(OptionSet options, Properties properties) {
 		this.kafkaPartitionHash = fetchStringOption("kafka_partition_hash", options, properties, "default");
 		this.ddlKafkaTopic = fetchStringOption("ddl_kafka_topic", options, properties, this.kafkaTopic);
 
+		this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null);
+		this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null);
+		this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null);
+
 		this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null);
 		this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell");
 		this.ddlPubsubTopic = fetchStringOption("ddl_pubsub_topic", options, properties, this.pubsubTopic);
diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
index 484886a7d..2cbbaf217 100644
--- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java
+++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java
@@ -550,6 +550,9 @@ public AbstractProducer getProducer() throws IOException {
 			case "redis":
 				this.producer = new MaxwellRedisProducer(this);
 				break;
+			case "bigquery":
+				this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable);
+				break;
 			case "none":
 				this.producer = new NoneProducer(this);
 				break;
diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
new file mode 100644
index 000000000..dbd1f33f4
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
@@ -0,0 +1,238 @@
+package com.zendesk.maxwell.producer;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.zendesk.maxwell.MaxwellContext;
+import com.zendesk.maxwell.monitoring.Metrics;
+import com.zendesk.maxwell.replication.Position;
+import com.zendesk.maxwell.row.RowMap;
+import com.zendesk.maxwell.schema.BqToBqStorageSchemaConverter;
+import com.zendesk.maxwell.util.StoppableTask;
+import com.zendesk.maxwell.util.StoppableTaskState;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BigQueryCallback implements ApiFutureCallback<AppendRowsResponse> {
+	public final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class);
+
+	private final MaxwellBigQueryProducerWorker parent;
+	private final AbstractAsyncProducer.CallbackCompleter cc;
+	private final Position position;
+	private MaxwellContext context;
+	AppendContext appendContext;
+
+	private Counter succeededMessageCount;
+	private Counter failedMessageCount;
+	private Meter succeededMessageMeter;
+	private Meter failedMessageMeter;
+
+	public BigQueryCallback(MaxwellBigQueryProducerWorker parent,
+			AppendContext appendContext,
+			AbstractAsyncProducer.CallbackCompleter cc,
+			Position position,
+			Counter succeededMessageCount, Counter failedMessageCount,
+			Meter succeededMessageMeter, Meter failedMessageMeter,
+			MaxwellContext context) {
+		this.parent = parent;
+		this.appendContext = appendContext;
+		this.cc = cc;
+		this.position = position;
+		this.succeededMessageCount = succeededMessageCount;
+		this.failedMessageCount = failedMessageCount;
+		this.succeededMessageMeter = succeededMessageMeter;
+		this.failedMessageMeter = failedMessageMeter;
+		this.context = context;
+	}
+
+	@Override
+	public void onSuccess(AppendRowsResponse response) {
+		this.succeededMessageCount.inc();
+		this.succeededMessageMeter.mark();
+
+		if (LOGGER.isDebugEnabled()) {
+			try {
+				LOGGER.debug("->  {}\n" +
+						"    {}\n",
+						this.appendContext.r.toJSON(), this.position);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+		cc.markCompleted();
+	}
+
+	@Override
+	public void onFailure(Throwable t) {
+		this.failedMessageCount.inc();
+		this.failedMessageMeter.mark();
+
+		LOGGER.error(t.getClass().getSimpleName() + " @ " + position);
+		LOGGER.error(t.getLocalizedMessage());
+		LOGGER.error("bq insertion error -> " + appendContext.data.toString());
+
+		if (!this.context.getConfig().ignoreProducerError) {
+			// Rows over the Storage Write API's 10 MB request limit can never succeed; skip them rather than abort.
+			if (t.getLocalizedMessage().contains("MessageSize is too large")) {
+				LOGGER.warn("skipping row exceeding 10 MB: " + appendContext.data.toString());
+				cc.markCompleted();
+				return;
+			}
+			this.context.terminate(new RuntimeException(t));
+			return;
+		}
+
+		cc.markCompleted();
+	}
+}
+
+public class MaxwellBigQueryProducer extends AbstractProducer {
+
+	private final ArrayBlockingQueue<RowMap> queue;
+	private final MaxwellBigQueryProducerWorker worker;
+
+	public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId,
+			String bigQueryDataset, String bigQueryTable)
+			throws IOException {
+		super(context);
+		this.queue = new ArrayBlockingQueue<>(100);
+		this.worker = new MaxwellBigQueryProducerWorker(context, this.queue, bigQueryProjectId, bigQueryDataset,
+				bigQueryTable);
+
+		TableName table = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable);
+		try {
+			this.worker.initialize(table);
+		} catch (DescriptorValidationException | InterruptedException e) {
+			// Fail fast: without an initialized stream writer the worker thread cannot produce anything.
+			throw new IOException(e);
+		}
+
+		Thread thread = new Thread(this.worker, "maxwell-bigquery-worker");
+		thread.setDaemon(true);
+		thread.start();
+	}
+
+	@Override
+	public void push(RowMap r) throws Exception {
+		this.queue.put(r);
+	}
+}
+
+class AppendContext {
+	JSONArray data;
+	int retryCount = 0;
+	RowMap r = null;
+
+	AppendContext(JSONArray data, int retryCount, RowMap r) {
+		this.data = data;
+		this.retryCount = retryCount;
+		this.r = r;
+	}
+}
+
+class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
+	static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class);
+
+	private final ArrayBlockingQueue<RowMap> queue;
+	private StoppableTaskState taskState;
+	private Thread thread;
+	private JsonStreamWriter streamWriter;
+
+	public MaxwellBigQueryProducerWorker(MaxwellContext context,
+			ArrayBlockingQueue<RowMap> queue, String bigQueryProjectId,
+			String bigQueryDataset, String bigQueryTable) throws IOException {
+		super(context);
+		this.queue = queue;
+		Metrics metrics = context.getMetrics();
+		this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker");
+	}
+
+	private void convertJSONObjectFieldsToString(JSONObject record) {
+		if (this.context.getConfig().outputConfig.includesPrimaryKeys) {
+			record.put("primary_key", record.get("primary_key").toString());
+		}
+		// The destination columns are STRING, so serialize the nested "data"/"old" objects to JSON text.
+		String data = record.has("data") ? record.get("data").toString() : null;
+		record.put("data", data);
+		String old = record.has("old") ? record.get("old").toString() : null;
+		record.put("old", old);
+	}
+
+	public void initialize(TableName tName)
+			throws DescriptorValidationException, IOException, InterruptedException {
+		BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService();
+		Table table = bigquery.getTable(tName.getDataset(), tName.getTable());
+		Schema schema = table.getDefinition().getSchema();
+		TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
+		streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build();
+	}
+
+	@Override
+	public void requestStop() throws Exception {
+		taskState.requestStop();
+		streamWriter.close();
+	}
+
+	@Override
+	public void awaitStop(Long timeout) throws TimeoutException {
+		taskState.awaitStop(thread, timeout);
+	}
+
+	@Override
+	public void run() {
+		this.thread = Thread.currentThread();
+		while (true) {
+			try {
+				RowMap row = queue.take();
+				if (!taskState.isRunning()) {
+					taskState.stopped();
+					return;
+				}
+				this.push(row);
+			} catch (Exception e) {
+				taskState.stopped();
+				context.terminate(e);
+				return;
+			}
+		}
+	}
+
+	@Override
+	public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception {
+		JSONArray jsonArr = new JSONArray();
+		JSONObject record = new JSONObject(r.toJSON(outputConfig));
+		// convert JSON and array fields to String
+		convertJSONObjectFieldsToString(record);
+		jsonArr.put(record);
+		AppendContext appendContext = new AppendContext(jsonArr, 0, r);
+
+		ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
+		ApiFutures.addCallback(
+				future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(),
+						this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter,
+						this.context),
+				MoreExecutors.directExecutor());
+	}
+}
diff --git a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java
new file mode 100644
index 000000000..cfda9609e
--- /dev/null
+++ b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java
@@ -0,0 +1,88 @@
+package com.zendesk.maxwell.schema;
+
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.common.collect.ImmutableMap;
+
+/** Converts structure from BigQuery client to BigQueryStorage client */
+public class BqToBqStorageSchemaConverter {
+	private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
+			ImmutableMap.of(
+					Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
+					Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
+					Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);
+
+	private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
+			new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
+					.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
+					.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
+					.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
+					.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
+					.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
+					.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
+					.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
+					.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
+					.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
+					.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
+					.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
+					.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
+					.build();
+
+	/**
+	 * Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
+	 *
+	 * @param schema the BigQuery client Table Schema
+	 * @return the bigquery storage API Table Schema
+	 */
+	public static TableSchema convertTableSchema(Schema schema) {
+		TableSchema.Builder result = TableSchema.newBuilder();
+		for (int i = 0; i < schema.getFields().size(); i++) {
+			result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
+		}
+		return result.build();
+	}
+
+	/**
+	 * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
+	 *
+	 * @param field the BigQuery client Field Schema
+	 * @return the bigquery storage API Field Schema
+	 */
+	public static TableFieldSchema convertFieldSchema(Field field) {
+		TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
+		if (field.getMode() == null) {
+			field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
+		}
+		result.setMode(BQTableSchemaModeMap.get(field.getMode()));
+		result.setName(field.getName());
+		result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
+		if (field.getDescription() != null) {
+			result.setDescription(field.getDescription());
+		}
+		if (field.getSubFields() != null) {
+			for (int i = 0; i < field.getSubFields().size(); i++) {
+				result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
+			}
+		}
+		return result.build();
+	}
+}
diff --git a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java
new file mode 100644
index 000000000..ca8aa4475
--- /dev/null
+++ b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java
@@ -0,0 +1,79 @@
+package com.zendesk.maxwell.producer;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.zendesk.maxwell.MaxwellConfig;
+import com.zendesk.maxwell.MaxwellContext;
+import com.zendesk.maxwell.replication.BinlogPosition;
+import com.zendesk.maxwell.replication.Position;
+import com.zendesk.maxwell.row.RowIdentity;
+import com.zendesk.maxwell.row.RowMap;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.Test;
+import com.zendesk.maxwell.monitoring.NoOpMetrics;
+
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class BigQueryCallbackTest {
+
+	@Test
+	public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception {
+		MaxwellContext context = mock(MaxwellContext.class);
+		MaxwellConfig config = new MaxwellConfig();
+		when(context.getConfig()).thenReturn(config);
+		when(context.getMetrics()).thenReturn(new NoOpMetrics());
+		MaxwellOutputConfig outputConfig = new MaxwellOutputConfig();
+		outputConfig.includesServerId = true;
+		RowMap r = new RowMap("insert", "MyDatabase", "MyTable", 1234567890L, new ArrayList<String>(), null);
+		JSONArray jsonArr = new JSONArray();
+		JSONObject record = new JSONObject(r.toJSON(outputConfig));
+		jsonArr.put(record);
+		AbstractAsyncProducer.CallbackCompleter cc = mock(AbstractAsyncProducer.CallbackCompleter.class);
+		AppendContext appendContext = new AppendContext(jsonArr, 0, r);
+		ArrayBlockingQueue<RowMap> queue = new ArrayBlockingQueue<>(100);
+		MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue, "myproject", "mydataset", "mytable");
+		BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc,
+				new Position(new BinlogPosition(1, "binlog-1"), 0L),
+				new Counter(), new Counter(), new Meter(), new Meter(), context);
+		Throwable t = new Throwable("error");
+		callback.onFailure(t);
+		verify(cc).markCompleted();
+	}
+
+	@Test
+	public void shouldTerminateWhenNotIgnoreProducerError() throws JSONException, Exception {
+		MaxwellContext context = mock(MaxwellContext.class);
+		MaxwellConfig config = new MaxwellConfig();
+		config.ignoreProducerError = false;
+		when(context.getConfig()).thenReturn(config);
+		when(context.getMetrics()).thenReturn(new NoOpMetrics());
+		MaxwellOutputConfig outputConfig = new MaxwellOutputConfig();
+		outputConfig.includesServerId = true;
+		RowMap r = new RowMap("insert", "MyDatabase", "MyTable", 1234567890L, new ArrayList<String>(), null);
+		JSONArray jsonArr = new JSONArray();
+		JSONObject record = new JSONObject(r.toJSON(outputConfig));
+		jsonArr.put(record);
+		AbstractAsyncProducer.CallbackCompleter cc = mock(AbstractAsyncProducer.CallbackCompleter.class);
+		AppendContext appendContext = new AppendContext(jsonArr, 0, r);
+		ArrayBlockingQueue<RowMap> queue = new ArrayBlockingQueue<>(100);
+		MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue, "myproject", "mydataset", "mytable");
+		BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc,
+				new Position(new BinlogPosition(1, "binlog-1"), 0L),
+				new Counter(), new Counter(), new Meter(), new Meter(), context);
+		Throwable t = new StatusRuntimeException(Status.DEADLINE_EXCEEDED);
+		callback.onFailure(t);
+		// A non-recoverable error with ignore_producer_error=false should shut Maxwell down rather than ack the row.
+		verify(context).terminate(any(RuntimeException.class));
+	}
+}
\ No newline at end of file