From 35b05617bb0c08ca3726e1bd3e36a31f5372b8dd Mon Sep 17 00:00:00 2001 From: Artem Vovk Date: Tue, 3 Aug 2021 10:44:56 -0600 Subject: [PATCH] DBA-361: Vitess MVP Add basic configuration, parsing, and processing of Vitess's VStream. This basically uses a bunch of debezium code, Vitess's generated code, and hardcore type conversion to wrangle VStream events into whatever maxwell might pretend to consume and produce. --- pom.xml | 23 +- .../java/com/zendesk/maxwell/Maxwell.java | 52 +- .../com/zendesk/maxwell/MaxwellConfig.java | 5 + .../zendesk/maxwell/replication/Vgtid.java | 133 ++++++ .../VitessConnectorReplicator.java | 451 ++++++++++++++++++ .../schema/columndef/DateTimeColumnDef.java | 2 + .../schema/columndef/IntColumnDef.java | 11 +- 7 files changed, 656 insertions(+), 21 deletions(-) create mode 100644 src/main/java/com/zendesk/maxwell/replication/Vgtid.java create mode 100644 src/main/java/com/zendesk/maxwell/replication/VitessConnectorReplicator.java diff --git a/pom.xml b/pom.xml index c7c0d57aa..bce71bb52 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,17 @@ - + + + + com.google.cloud + libraries-bom + 20.8.0 + pom + import + + + com.mchange @@ -220,6 +230,11 @@ mysql-binlog-connector-java 0.25.3 + + io.vitess + vitess-grpc-client + 10.0.0 + net.sf.jopt-simple jopt-simple @@ -263,7 +278,6 @@ com.google.cloud google-cloud-pubsub - 1.108.6 io.nats @@ -373,6 +387,11 @@ opencensus-exporter-stats-stackdriver ${opencensus.version} + + com.google.code.gson + gson + 2.8.6 + diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index 9f9454775..6516d65fc 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -9,6 +9,7 @@ import com.zendesk.maxwell.replication.BinlogConnectorReplicator; import com.zendesk.maxwell.replication.Position; import com.zendesk.maxwell.replication.Replicator; +import com.zendesk.maxwell.replication.VitessConnectorReplicator; import com.zendesk.maxwell.row.HeartbeatRowMap; import com.zendesk.maxwell.schema.*; import com.zendesk.maxwell.schema.columndef.ColumnDefCastException; @@ -194,6 +195,7 @@ public void start() throws Exception { } } + private void startInner() throws Exception { try ( Connection connection = this.context.getReplicationConnection(); Connection rawConnection = this.context.getRawMaxwellConnection() ) { @@ -227,24 +229,37 @@ private void startInner() throws Exception { mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator. - this.replicator = new BinlogConnectorReplicator( - mysqlSchemaStore, - producer, - bootstrapController, - config.replicationMysql, - config.replicaServerID, - config.databaseName, - context.getMetrics(), - initPosition, - false, - config.clientID, - context.getHeartbeatNotifier(), - config.scripting, - context.getFilter(), - config.outputConfig, - config.bufferMemoryUsage - ); - + if (config.useVitess) { + this.replicator = new VitessConnectorReplicator( + mysqlSchemaStore, + config.replicationMysql, + producer, + config.databaseName, + context.getMetrics(), + initPosition, + config.scripting, + config.outputConfig, + config.bufferMemoryUsage + ); + } else { + this.replicator = new BinlogConnectorReplicator( + mysqlSchemaStore, + producer, + bootstrapController, + config.replicationMysql, + config.replicaServerID, + config.databaseName, + context.getMetrics(), + initPosition, + false, + config.clientID, + context.getHeartbeatNotifier(), + config.scripting, + context.getFilter(), + config.outputConfig, + config.bufferMemoryUsage + ); + } context.setReplicator(replicator); this.context.start(); @@ -266,6 +281,7 @@ public static void main(String[] args) { if ( config.log_level != null ) { Logging.setLevel(config.log_level); + } final Maxwell maxwell = new Maxwell(config); diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 8dd2abc58..f4cee3e1d 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -35,6 +35,8 @@ public class MaxwellConfig extends AbstractConfig { public MaxwellMysqlConfig maxwellMysql; public Filter filter; public Boolean gtidMode; + public boolean useVitess; + public String databaseName; @@ -170,6 +172,7 @@ public MaxwellConfig() { // argv is only null in tests this.schemaMysql = new MaxwellMysqlConfig(); this.masterRecovery = false; this.gtidMode = false; + this.useVitess = false; this.bufferedProducerSize = 200; this.outputConfig = new MaxwellOutputConfig(); setup(null, null); // setup defaults @@ -555,6 +558,8 @@ private void setup(OptionSet options, Properties properties) { this.replicationMysql = parseMysqlConfig("replication_", options, properties); this.schemaMysql = parseMysqlConfig("schema_", options, properties); this.gtidMode = fetchBooleanOption("gtid_mode", options, properties, System.getenv(GTID_MODE_ENV) != null); + this.useVitess = fetchBooleanOption("use_vitess", options, properties, false); + this.databaseName = fetchStringOption("schema_database", options, properties, "maxwell"); this.maxwellMysql.database = this.databaseName; diff --git a/src/main/java/com/zendesk/maxwell/replication/Vgtid.java b/src/main/java/com/zendesk/maxwell/replication/Vgtid.java new file mode 100644 index 000000000..1bd43bf78 --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/replication/Vgtid.java @@ -0,0 +1,133 @@ +package com.zendesk.maxwell.replication; + +import binlogdata.Binlogdata; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** Vitess source position coordiates. */ +public class Vgtid { + public static final String CURRENT_GTID = "current"; + private static final Gson gson = new Gson(); + + private final Binlogdata.VGtid rawVgtid; + private final Set shardGtids = new HashSet<>(); + + private Vgtid(Binlogdata.VGtid rawVgtid) { + this.rawVgtid = rawVgtid; + for (Binlogdata.ShardGtid shardGtid : rawVgtid.getShardGtidsList()) { + shardGtids.add(new ShardGtid(shardGtid.getKeyspace(), shardGtid.getShard(), shardGtid.getGtid())); + } + } + + private Vgtid(List shardGtids) { + this.shardGtids.addAll(shardGtids); + + Binlogdata.VGtid.Builder builder = Binlogdata.VGtid.newBuilder(); + for (ShardGtid shardGtid : shardGtids) { + builder.addShardGtids( + Binlogdata.ShardGtid.newBuilder() + .setKeyspace(shardGtid.getKeyspace()) + .setShard(shardGtid.getShard()) + .setGtid(shardGtid.getGtid()) + .build()); + } + this.rawVgtid = builder.build(); + } + + public static Vgtid of(String shardGtidsInJson) { + List shardGtids = gson.fromJson(shardGtidsInJson, new TypeToken>() { + }.getType()); + return of(shardGtids); + } + + public static Vgtid of(Binlogdata.VGtid rawVgtid) { + return new Vgtid(rawVgtid); + } + + public static Vgtid of(List shardGtids) { + return new Vgtid(shardGtids); + } + + public Binlogdata.VGtid getRawVgtid() { + return rawVgtid; + } + + public Set getShardGtids() { + return shardGtids; + } + + public boolean isSingleShard() { + return rawVgtid.getShardGtidsCount() == 1; + } + + @Override + public String toString() { + return gson.toJson(shardGtids); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Vgtid vgtid = (Vgtid) o; + return Objects.equals(rawVgtid, vgtid.rawVgtid) && + Objects.equals(shardGtids, vgtid.shardGtids); + } + + @Override + public int hashCode() { + return Objects.hash(rawVgtid, shardGtids); + } + + public static class ShardGtid { + private final String keyspace; + private final String shard; + private final String gtid; + + public ShardGtid(String keyspace, String shard, String gtid) { + this.keyspace = keyspace; + this.shard = shard; + this.gtid = gtid; + } + + public String getKeyspace() { + return keyspace; + } + + public String getShard() { + return shard; + } + + public String getGtid() { + return gtid; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ShardGtid shardGtid = (ShardGtid) o; + return Objects.equals(keyspace, shardGtid.keyspace) && + Objects.equals(shard, shardGtid.shard) && + Objects.equals(gtid, shardGtid.gtid); + } + + @Override + public int hashCode() { + return Objects.hash(keyspace, shard, gtid); + } + } +} diff --git a/src/main/java/com/zendesk/maxwell/replication/VitessConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/VitessConnectorReplicator.java new file mode 100644 index 000000000..91438d72c --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/replication/VitessConnectorReplicator.java @@ -0,0 +1,451 @@ +package com.zendesk.maxwell.replication; + +import binlogdata.Binlogdata; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.zendesk.maxwell.MaxwellMysqlConfig; +import com.zendesk.maxwell.monitoring.Metrics; +import com.zendesk.maxwell.producer.AbstractProducer; +import com.zendesk.maxwell.producer.MaxwellOutputConfig; +import com.zendesk.maxwell.row.RowMap; +import com.zendesk.maxwell.row.RowMapBuffer; +import com.zendesk.maxwell.schema.Schema; +import com.zendesk.maxwell.schema.SchemaStore; +import com.zendesk.maxwell.schema.SchemaStoreException; +import com.zendesk.maxwell.schema.columndef.ColumnDef; +import com.zendesk.maxwell.schema.columndef.ColumnDefCastException; +import com.zendesk.maxwell.scripting.Scripting; +import com.zendesk.maxwell.util.RunLoopProcess; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.vitess.proto.Query; +import io.vitess.proto.Topodata; +import io.vitess.proto.Vtgate; +import io.vitess.proto.grpc.VitessGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class VitessConnectorReplicator extends RunLoopProcess implements Replicator { + static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorReplicator.class); + private static final long MAX_TX_ELEMENTS = 10000; + + private final float bufferMemoryUsage; + private final AbstractProducer producer; + private final MaxwellOutputConfig outputConfig; + + private final SchemaStore schemaStore; + private final Scripting scripting; + private RowMapBuffer rowBuffer; + + + private Histogram transactionRowCount; + private Histogram transactionExecutionTime; + private Long stopAtHeartbeat; + private final String maxwellSchemaDatabaseName; + + private final Counter rowCounter; + private final Meter rowMeter; + + private Position lastHeartbeatPosition; + private String keyspace; + protected VitessGrpc.VitessStub stub; + + protected StreamObserver observer; + protected Map> tableColumnsMap = new HashMap<>(); + + + public VitessConnectorReplicator( + SchemaStore schemaStore, + MaxwellMysqlConfig mysqlConfig, + AbstractProducer producer, + String maxwellSchemaDatabaseName, + Metrics metrics, + Position start, + Scripting scripting, + MaxwellOutputConfig outputConfig, + float bufferMemoryUsage + ) { + this.bufferMemoryUsage = bufferMemoryUsage; + this.schemaStore = schemaStore; + this.keyspace = mysqlConfig.database; + this.scripting = scripting; + this.outputConfig = outputConfig; + this.maxwellSchemaDatabaseName = maxwellSchemaDatabaseName; + this.lastHeartbeatPosition = start; + this.producer = producer; + + /* setup metrics */ + rowCounter = metrics.getRegistry().counter( + metrics.metricName("row", "count") + ); + + rowMeter = metrics.getRegistry().meter( + metrics.metricName("row", "meter") + ); + + transactionRowCount = metrics.getRegistry().histogram(metrics.metricName("transaction", "row_count")); + transactionExecutionTime = metrics.getRegistry().histogram(metrics.metricName("transaction", "execution_time")); + + AtomicReference channel = new AtomicReference<>(); + ManagedChannel managedChannel = NettyChannelBuilder + .forAddress(mysqlConfig.host, mysqlConfig.port) + .usePlaintext() + .build(); + channel.compareAndSet(null, managedChannel); + this.stub = VitessGrpc.newStub(channel.get()); + } + + private boolean replicatorStarted = false; + @Override + public void startReplicator() throws Exception { + + this.observer = new StreamObserver<>() { + + @Override + public void onNext(Vtgate.VStreamResponse response) { + rowBuffer = new RowMapBuffer(MAX_TX_ELEMENTS, bufferMemoryUsage); + + LOGGER.info("Received {} vEvents in the VStreamResponse:", + response.getEventsCount()); + + Vgtid vgtid = getVgtid(response); + + RowMap rowMap; + Position position = null; + String changeType = ""; + List pkFields = new ArrayList(); + List columnDefs = new ArrayList<>(); + HashMap newData = new HashMap<>(); + HashMap oldData = new HashMap<>(); + String[] schemaTableTuple = new String[2]; + long beginTs = Instant.now().toEpochMilli(); + for (int i = 0; i < response.getEventsCount(); i++) { + + Binlogdata.VEvent vEvent = response.getEvents(i); + LOGGER.info("vEvent: {}", vEvent); + + Binlogdata.VEventType eventType = vEvent.getType(); + + if (eventType == Binlogdata.VEventType.BEGIN) { + beginTs = vEvent.getTimestamp(); + continue; + } else if (eventType == Binlogdata.VEventType.COMMIT) { + rowBuffer.getLast().setTXCommit(); + long timeSpent = vEvent.getTimestamp() - beginTs; + transactionExecutionTime.update(timeSpent); + transactionRowCount.update(rowBuffer.size()); + continue; + } else if (eventType == Binlogdata.VEventType.FIELD) { + // TODO: This needs to be processed via MysqlSchemaCompactor somehow + schemaTableTuple = vEvent.getFieldEvent().getTableName().split("\\."); + columnDefs = handleFieldEvent(vEvent); + tableColumnsMap.put(schemaTableTuple[1], columnDefs); + continue; + } else if (eventType == Binlogdata.VEventType.VGTID) { + position = handleVgtidEvent(rowBuffer, vEvent); + lastHeartbeatPosition = position; + continue; + } else if (eventType == Binlogdata.VEventType.ROW) { + schemaTableTuple = vEvent.getRowEvent().getTableName().split("\\."); + + for (Binlogdata.RowChange rowChange : vEvent.getRowEvent().getRowChangesList() + ) { + Query.Row newRow = rowChange.getAfter(); + Query.Row oldRow = rowChange.getBefore(); + if (oldRow != Query.Row.getDefaultInstance() && newRow != Query.Row.getDefaultInstance()) { + changeType = "update"; + } else if (oldRow != Query.Row.getDefaultInstance() && newRow == Query.Row.getDefaultInstance()) { + changeType = "delete"; + } else if (oldRow == Query.Row.getDefaultInstance() && newRow != Query.Row.getDefaultInstance()) { + changeType = "insert"; + } else { + LOGGER.error("Unexpected RowChange: " + rowChange); + } + + try { + extractData(tableColumnsMap.get(schemaTableTuple[1]), newData, newRow); + extractData(tableColumnsMap.get(schemaTableTuple[1]), oldData, oldRow); + } catch (ColumnDefCastException e) { + LOGGER.error("Unable to add column data: ", e); + } + + LOGGER.info("OldRow : {}", oldRow); + LOGGER.info("NewRow : {}", newRow); + LOGGER.info("OldData : {}", oldData); + LOGGER.info("NewData : {}", newData); + + } + } + // TODO: once schema tracking is working, we'll + // have a better way to handle PKs + columnDefs = tableColumnsMap.get(schemaTableTuple[1]); + if (pkFields.size() < 1 && columnDefs != null && columnDefs.size() > 0) { + pkFields.add(columnDefs.get(0).getName()); + } + rowMap = new RowMap( + changeType, + schemaTableTuple[0], + schemaTableTuple[1], + vEvent.getTimestamp(), + pkFields, + position + ); + for (Map.Entry entry : newData.entrySet()) { + rowMap.putData(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : oldData.entrySet()) { + rowMap.putOldData(entry.getKey(), entry.getValue()); + } + try { + rowBuffer.add(rowMap); + } catch (IOException e) { + LOGGER.error("Unable to add row to buffer: " + e); + } + try { + LOGGER.info("RowMap buffered: {}", rowMap.toJSON()); + } catch (Exception e) { + LOGGER.error("Non-serializable RowMap: " + e); + } + } + try { + work(); + } catch (Exception e) { + LOGGER.error("Unable to process row: " + e); + } + } + + @Override + public void onError(Throwable t) { + LOGGER.info("VStream streaming onError. Status: " + Status.fromThrowable(t), t); + + } + + @Override + public void onCompleted() { + LOGGER.info("VStream streaming completed."); + } + + }; + String shardGtid = Vgtid.CURRENT_GTID; + if (this.lastHeartbeatPosition != null) { + shardGtid = this.lastHeartbeatPosition.getBinlogPosition().getGtidSetStr(); + } + + Vgtid vgtid = Vgtid.of( + Binlogdata.VGtid.newBuilder() + .addShardGtids( + Binlogdata.ShardGtid.newBuilder() + .setGtid(shardGtid) + .setKeyspace(this.keyspace).build()) + .build()); + stub.vStream( + Vtgate.VStreamRequest.newBuilder() + .setVgtid(vgtid.getRawVgtid()) + .setTabletType(Topodata.TabletType.REPLICA) + .build(), this.observer); + replicatorStarted = true; + } + + @Override + public RowMap getRow() throws Exception { + + if ( !replicatorStarted ) { + LOGGER.warn("replicator was not started, calling startReplicator()..."); + startReplicator(); + } + + while (true) { + if (rowBuffer != null && !rowBuffer.isEmpty()) { + RowMap row = rowBuffer.removeFirst(); + return row; + } + } + } + + private List handleFieldEvent(Binlogdata.VEvent vEvent) { + List columnDefs = new ArrayList<>(); + short pos = 0; + for (Query.Field field : vEvent.getFieldEvent().getFieldsList()) { + try { + String columnType = field.getType().toString().toLowerCase(Locale.ROOT); + String[] enumValues = new String[0]; + if (columnType.contains("int")) { + columnType = "int"; + } else if (columnType.contains("float")) { + columnType = "float"; + } else if (columnType.contains("enum")) { + String ct = field.getColumnType(); + String[] split = ct.split("\\'"); + enumValues = Stream.of(split) + .filter(Predicate.not(value -> value.startsWith("enum("))) + .filter(Predicate.not(value -> value.equals(","))) + .filter(Predicate.not(value -> value.equals(")"))) + .collect(Collectors.toSet()).toArray(new String[0]); + } + ColumnDef columnDef = ColumnDef.build( + field.getName(), + "utf8mb4", + columnType, + pos, + false, + enumValues, + Long.valueOf(field.getColumnLength())); + pos++; + columnDefs.add(columnDef); + } catch (IllegalArgumentException iae) { + LOGGER.error("Column Definition build failed: " + iae); + throw(iae); + } + } + LOGGER.info("Column Definitions: {}", columnDefs); + return columnDefs; + } + + private Position handleVgtidEvent(RowMapBuffer buffer, Binlogdata.VEvent vEvent) { + BinlogPosition binlogPosition = null; + if (!buffer.isEmpty()) { + buffer.getLast().setTXCommit(); + } + for (Binlogdata.ShardGtid sgtid : vEvent.getVgtid().getShardGtidsList()) { + binlogPosition = new BinlogPosition( + sgtid.getGtid(), + null, + vEvent.getTimestamp(), + sgtid.getShard() + ); + } + return new Position(binlogPosition, vEvent.getTimestamp()); + } + + private void extractData(List columnDefs, HashMap data, Query.Row row) throws ColumnDefCastException { + int numberOfColumns = columnDefs.size(); + int rawValueIndex = 0; + if ( row != Query.Row.getDefaultInstance() ) { + + for (short j = 0; j < numberOfColumns; j++) { + ColumnDef column = columnDefs.get(j); + final String columnName = column.getName(); + String rawValues = row.getValues().toStringUtf8(); + + final int rawValueLength = (int) row.getLengths(j); + final String rawValue = rawValueLength == -1 + ? null + : new String(Arrays.copyOfRange(rawValues.getBytes(StandardCharsets.UTF_8), rawValueIndex, rawValueIndex + rawValueLength)); + if (rawValueLength != -1) { + // no update to rawValueIndex when no value in the rawValue + rawValueIndex += rawValueLength; + } + if (rawValue == null) { + data.put(columnName, rawValue); + } else { + data.put(columnName, column.asJSON(rawValue, outputConfig)); + } + } + } + } + + @Override + public Long getLastHeartbeatRead() { + return lastHeartbeatPosition.getLastHeartbeatRead(); + } + + @Override + public Schema getSchema() throws SchemaStoreException { + return this.schemaStore.getSchema(); + } + + @Override + public Long getSchemaId() throws SchemaStoreException { + return this.schemaStore.getSchemaID(); + } + + @Override + protected void work() throws Exception { + if (! replicatorStarted ) { + startReplicator(); + } + + RowMap row = null; + try { + row = getRow(); + } catch ( InterruptedException e ) { + LOGGER.error("Unable to getRow: " + e); + } + + if ( row == null ) + return; + + rowCounter.inc(); + rowMeter.mark(); + + if ( scripting != null && !isMaxwellRow(row)) + scripting.invoke(row); + + processRow(row); + } + + public void stopAtHeartbeat(long heartbeat) { + stopAtHeartbeat = heartbeat; + } + + protected void processRow(RowMap row) throws Exception { + producer.push(row); + } + + // We assume there is at most one vgtid event for response. + // Even in case of resharding, there is only one vgtid event that contains multiple shard + // gtids. + private Vgtid getVgtid(Vtgate.VStreamResponse response) { + LinkedList vgtids = new LinkedList<>(); + for (Binlogdata.VEvent vEvent : response.getEventsList()) { + if (vEvent.getType() == Binlogdata.VEventType.VGTID) { + vgtids.addLast(Vgtid.of(vEvent.getVgtid())); + } + } + if (vgtids.size() == 0) { + // The VStreamResponse that contains an VERSION vEvent does not have VGTID. + // We do not update lastReceivedVgtid in this case. + // It can also be null if the 1st grpc response does not have vgtid upon restart + LOGGER.info("No vgtid found in response {}...", response.toString().substring(0, Math.min(100, response.toString().length()))); + LOGGER.info("Full response is {}", response); + return Vgtid.of(Binlogdata.VGtid.newBuilder() + .addShardGtids( + Binlogdata.ShardGtid.newBuilder() + .setKeyspace(this.keyspace) + .setGtid(Vgtid.CURRENT_GTID) + .build()).build()); + } + if (vgtids.size() > 1) { + LOGGER.error( + "Should only have 1 vgtid per VStreamResponse, but found {}. Use the last vgtid {}.", + vgtids.size(), vgtids.getLast()); + } + return vgtids.getLast(); + } + + private int getNumOfRowEvents(Vtgate.VStreamResponse response) { + int num = 0; + for (Binlogdata.VEvent vEvent : response.getEventsList()) { + if (vEvent.getType() == Binlogdata.VEventType.ROW) { + num++; + } + } + return num; + } + + protected boolean isMaxwellRow(RowMap row) { + return row.getDatabase().equals(this.maxwellSchemaDatabaseName); + } +} diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java index 9b48c99e0..566890564 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java @@ -27,6 +27,8 @@ protected String formatValue(Object value, MaxwellOutputConfig config) throws Co else return appendFractionalSeconds("0000-00-00 00:00:00", 0, getColumnLength()); } + } if ( value instanceof String ) { + return (String) value; } try { diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java index c0f30aa90..3ff5bc49e 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java @@ -58,11 +58,20 @@ private Long toLong(Object value) throws ColumnDefCastException { long res = castUnsigned(i, 1L << this.bits); return Long.valueOf(res); + } else if (value instanceof String) { + try { + if (signed) + return Long.parseLong((String) value); + + return Long.parseUnsignedLong((String) value); + } catch (NumberFormatException e) { + throw new ColumnDefCastException(this, value); + } } else { throw new ColumnDefCastException(this, value); } - } + @Override public String toSQL(Object value) throws ColumnDefCastException { return toLong(value).toString();