Skip to content

Commit

Permalink
Merge pull request #1 from recurly/DBA-362_vitessMVP
Browse files Browse the repository at this point in the history
DBA-361: Vitess MVP
  • Loading branch information
rpedela-recurly authored Aug 5, 2021
2 parents 222712d + 35b0561 commit c04025f
Show file tree
Hide file tree
Showing 7 changed files with 656 additions and 21 deletions.
23 changes: 21 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,17 @@
</dependencies>
</profile>
</profiles>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>20.8.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.mchange</groupId>
Expand Down Expand Up @@ -220,6 +230,11 @@
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.3</version>
</dependency>
<dependency>
<groupId>io.vitess</groupId>
<artifactId>vitess-grpc-client</artifactId>
<version>10.0.0</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
Expand Down Expand Up @@ -263,7 +278,6 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.108.6</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
Expand Down Expand Up @@ -373,6 +387,11 @@
<artifactId>opencensus-exporter-stats-stackdriver</artifactId>
<version>${opencensus.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
</dependencies>

<build>
Expand Down
52 changes: 34 additions & 18 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.zendesk.maxwell.replication.BinlogConnectorReplicator;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.replication.Replicator;
import com.zendesk.maxwell.replication.VitessConnectorReplicator;
import com.zendesk.maxwell.row.HeartbeatRowMap;
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
Expand Down Expand Up @@ -194,6 +195,7 @@ public void start() throws Exception {
}
}


private void startInner() throws Exception {
try ( Connection connection = this.context.getReplicationConnection();
Connection rawConnection = this.context.getRawMaxwellConnection() ) {
Expand Down Expand Up @@ -227,24 +229,37 @@ private void startInner() throws Exception {

mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.

this.replicator = new BinlogConnectorReplicator(
mysqlSchemaStore,
producer,
bootstrapController,
config.replicationMysql,
config.replicaServerID,
config.databaseName,
context.getMetrics(),
initPosition,
false,
config.clientID,
context.getHeartbeatNotifier(),
config.scripting,
context.getFilter(),
config.outputConfig,
config.bufferMemoryUsage
);

if (config.useVitess) {
this.replicator = new VitessConnectorReplicator(
mysqlSchemaStore,
config.replicationMysql,
producer,
config.databaseName,
context.getMetrics(),
initPosition,
config.scripting,
config.outputConfig,
config.bufferMemoryUsage
);
} else {
this.replicator = new BinlogConnectorReplicator(
mysqlSchemaStore,
producer,
bootstrapController,
config.replicationMysql,
config.replicaServerID,
config.databaseName,
context.getMetrics(),
initPosition,
false,
config.clientID,
context.getHeartbeatNotifier(),
config.scripting,
context.getFilter(),
config.outputConfig,
config.bufferMemoryUsage
);
}
context.setReplicator(replicator);
this.context.start();

Expand All @@ -266,6 +281,7 @@ public static void main(String[] args) {

if ( config.log_level != null ) {
Logging.setLevel(config.log_level);

}

final Maxwell maxwell = new Maxwell(config);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class MaxwellConfig extends AbstractConfig {
public MaxwellMysqlConfig maxwellMysql;
public Filter filter;
public Boolean gtidMode;
public boolean useVitess;


public String databaseName;

Expand Down Expand Up @@ -170,6 +172,7 @@ public MaxwellConfig() { // argv is only null in tests
this.schemaMysql = new MaxwellMysqlConfig();
this.masterRecovery = false;
this.gtidMode = false;
this.useVitess = false;
this.bufferedProducerSize = 200;
this.outputConfig = new MaxwellOutputConfig();
setup(null, null); // setup defaults
Expand Down Expand Up @@ -555,6 +558,8 @@ private void setup(OptionSet options, Properties properties) {
this.replicationMysql = parseMysqlConfig("replication_", options, properties);
this.schemaMysql = parseMysqlConfig("schema_", options, properties);
this.gtidMode = fetchBooleanOption("gtid_mode", options, properties, System.getenv(GTID_MODE_ENV) != null);
this.useVitess = fetchBooleanOption("use_vitess", options, properties, false);


this.databaseName = fetchStringOption("schema_database", options, properties, "maxwell");
this.maxwellMysql.database = this.databaseName;
Expand Down
133 changes: 133 additions & 0 deletions src/main/java/com/zendesk/maxwell/replication/Vgtid.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.zendesk.maxwell.replication;

import binlogdata.Binlogdata;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Vitess source position coordiates. */
public class Vgtid {
public static final String CURRENT_GTID = "current";
private static final Gson gson = new Gson();

private final Binlogdata.VGtid rawVgtid;
private final Set<ShardGtid> shardGtids = new HashSet<>();

private Vgtid(Binlogdata.VGtid rawVgtid) {
this.rawVgtid = rawVgtid;
for (Binlogdata.ShardGtid shardGtid : rawVgtid.getShardGtidsList()) {
shardGtids.add(new ShardGtid(shardGtid.getKeyspace(), shardGtid.getShard(), shardGtid.getGtid()));
}
}

private Vgtid(List<ShardGtid> shardGtids) {
this.shardGtids.addAll(shardGtids);

Binlogdata.VGtid.Builder builder = Binlogdata.VGtid.newBuilder();
for (ShardGtid shardGtid : shardGtids) {
builder.addShardGtids(
Binlogdata.ShardGtid.newBuilder()
.setKeyspace(shardGtid.getKeyspace())
.setShard(shardGtid.getShard())
.setGtid(shardGtid.getGtid())
.build());
}
this.rawVgtid = builder.build();
}

public static Vgtid of(String shardGtidsInJson) {
List<Vgtid.ShardGtid> shardGtids = gson.fromJson(shardGtidsInJson, new TypeToken<List<ShardGtid>>() {
}.getType());
return of(shardGtids);
}

public static Vgtid of(Binlogdata.VGtid rawVgtid) {
return new Vgtid(rawVgtid);
}

public static Vgtid of(List<ShardGtid> shardGtids) {
return new Vgtid(shardGtids);
}

public Binlogdata.VGtid getRawVgtid() {
return rawVgtid;
}

public Set<ShardGtid> getShardGtids() {
return shardGtids;
}

public boolean isSingleShard() {
return rawVgtid.getShardGtidsCount() == 1;
}

@Override
public String toString() {
return gson.toJson(shardGtids);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Vgtid vgtid = (Vgtid) o;
return Objects.equals(rawVgtid, vgtid.rawVgtid) &&
Objects.equals(shardGtids, vgtid.shardGtids);
}

@Override
public int hashCode() {
return Objects.hash(rawVgtid, shardGtids);
}

public static class ShardGtid {
private final String keyspace;
private final String shard;
private final String gtid;

public ShardGtid(String keyspace, String shard, String gtid) {
this.keyspace = keyspace;
this.shard = shard;
this.gtid = gtid;
}

public String getKeyspace() {
return keyspace;
}

public String getShard() {
return shard;
}

public String getGtid() {
return gtid;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ShardGtid shardGtid = (ShardGtid) o;
return Objects.equals(keyspace, shardGtid.keyspace) &&
Objects.equals(shard, shardGtid.shard) &&
Objects.equals(gtid, shardGtid.gtid);
}

@Override
public int hashCode() {
return Objects.hash(keyspace, shard, gtid);
}
}
}
Loading

0 comments on commit c04025f

Please sign in to comment.