Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions connector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sourceSets {
dependencies {
implementation project(':commons')
implementation("com.github.ben-manes.caffeine:caffeine:${caffeineVersion}")
implementation("org.rocksdb:rocksdbjni:${rocksdbVersion}")
implementation("io.vavr:vavr:${vavrVersion}")
implementation "com.datastax.oss:java-driver-core:${ossDriverVersion}"
implementation "com.datastax.oss:java-driver-query-builder:${ossDriverVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig {
public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest";
public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity";
public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms";
public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory";


public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
Expand Down Expand Up @@ -195,6 +197,13 @@ public class CassandraSourceConnectorConfig {
ConfigDef.Importance.HIGH,
"The maximum number of digest per mutation cache entry, with a default set to 3",
"CQL Read cache", 1, ConfigDef.Width.NONE, "CacheMaxDigest")
.define(CACHE_PERSISTENT_DIRECTORY_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH,
"The directory where the persistent mutation cache will be stored. Formated as `{cache.persistent.directory}/{source-name}-{instance-id}` " +
"If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.",
"CQL Read cache", 1, ConfigDef.Width.NONE, "CachePersistentDirectory")
.define(CACHE_MAX_CAPACITY_CONFIG,
ConfigDef.Type.LONG,
"32767",
Expand Down Expand Up @@ -661,6 +670,10 @@ public long getCacheMaxDigests() {
return globalConfig.getLong(CACHE_MAX_DIGESTS_CONFIG);
}

public String getCachePersistentDirectory() {
return globalConfig.getString(CACHE_PERSISTENT_DIRECTORY_CONFIG);
}

public long getCacheMaxCapacity() {
return globalConfig.getLong(CACHE_MAX_CAPACITY_CONFIG);
}
Expand Down Expand Up @@ -782,6 +795,7 @@ public String toString() {
+ " " + QUERY_BACKOFF_IN_MS_CONFIG + ": %d%n"
+ " " + QUERY_MAX_BACKOFF_IN_SEC_CONFIG + ": %d%n"
+ " " + CACHE_MAX_DIGESTS_CONFIG + ": %d%n"
+ " " + CACHE_PERSISTENT_DIRECTORY_CONFIG + ": %s%n"
+ " " + CACHE_MAX_CAPACITY_CONFIG + ": %d%n"
+ " " + CACHE_EXPIRE_AFTER_MS_CONFIG + ": %d%n"
+ " " + CACHE_ONLY_IF_COORDINATOR_MATCH + ": %s%n"
Expand All @@ -805,6 +819,7 @@ public String toString() {
getQueryBackoffInMs(),
getQueryMaxBackoffInSec(),
getCacheMaxDigests(),
getCachePersistentDirectory(),
getCacheMaxCapacity(),
getCacheExpireAfterMs(),
getCacheOnlyIfCoordinatorMatch(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc;
package com.datastax.oss.cdc.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -24,19 +24,15 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Keep MD5 digests to deduplicate Cassandra mutations
*/
public class MutationCache<K> {

public class InMemoryCache<K> implements MutationCache<K> {
Cache<K, List<String>> mutationCache;

/**
* Max number of cached digest per cached entry.
* Max number of cached digests per cached entry.
*/
long maxDigests;

public MutationCache(long maxDigests, long maxCapacity, Duration expireAfter) {
public InMemoryCache(long maxDigests, long maxCapacity, Duration expireAfter) {
this.maxDigests = maxDigests;
mutationCache = Caffeine.newBuilder()
.expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.cache;

import com.github.benmanes.caffeine.cache.stats.CacheStats;

import java.util.List;

public interface MutationCache<K> {
/**
* Add a mutation MD5 digest to the cache for the given mutation key.
* @param mutationKey the key for the mutation, typically a partition key or a unique identifier
* @param md5Digest the MD5 digest of the mutation to be added
* @return a list of MD5 digests for the given mutation key, which may include the newly added digest
*/
List<String> addMutationMd5(K mutationKey, String md5Digest);

/**
* Retrieve the list of MD5 digests for the given mutation key.
* @param mutationKey the key for the mutation
* @return a list of MD5 digests associated with the mutation key, or an empty list if none exist
*/
List<String> getMutationCRCs(K mutationKey);

/**
* Check if a mutation with the given key and MD5 digest has already been processed.
* @param mutationKey the key for the mutation
* @param md5Digest the MD5 digest of the mutation
* @return true if the mutation has been processed, false otherwise
*/
boolean isMutationProcessed(K mutationKey, String md5Digest);

/**
* Gives the current statistics of the cache, such as hit rate, miss rate, and size.
* <a href="https://github.com/ben-manes/caffeine/wiki/Statistics">Caffeine Statistics wiki</a>
*/
CacheStats stats();

/**
* Gives the estimated size of the cache.
*/
long estimatedSize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class PersistentCache<K> implements MutationCache<K> {
/**
* The mutation cache
*/
private final Cache<K, List<String>> mutationCache;

private final TtlDB rocksDB;
/**
* Max number of cached digests per cached entry.
*/
long maxDigests;

private final Function<K, byte[]> keySerializer;

static {
TtlDB.loadLibrary();
}

public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, String dbPath, Function<K, byte[]> keySerializer) throws RocksDBException {
this.maxDigests = maxDigests;
this.keySerializer = keySerializer;

Options options = new Options().setCreateIfMissing(true);
this.rocksDB = TtlDB.open(options, dbPath, (int) expireAfter.getSeconds(), false);

this.mutationCache = Caffeine.newBuilder()
.expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS)
.maximumSize(maxCapacity)
.recordStats()
.removalListener((K key, List<String> value, RemovalCause cause) -> {
try {
// If the removal cause is not SIZE, we delete the key from RocksDB
// This is to avoid deleting the key when it is removed due to the size limit
if (cause != RemovalCause.SIZE ) {
rocksDB.delete(this.keySerializer.apply(key));
}
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
})
.build();
}


private List<String> valueDeserializer(byte[] data){
return data == null || data.length == 0 ? null : List.of(new String(data).split(","));
}

private byte[] valueSerializer(List<String> data){
return data.stream()
.reduce((s1, s2) -> s1 + "," + s2)
.orElse("")
.getBytes();
}

public List<String> getMutationCRCs(K mutationKey) {
return mutationCache.get(mutationKey, k -> {
try {
return valueDeserializer(rocksDB.get(this.keySerializer.apply(k)));
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
});
}

public void putMutationCRCs(K key, List<String> value) {
mutationCache.asMap().compute(key, (k, v) -> {
try {
rocksDB.put(keySerializer.apply(k), valueSerializer(value));
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
return value;
});
}

public List<String> addMutationMd5(K mutationKey, String md5Digest) {
List<String> crcs = getMutationCRCs(mutationKey);
if(crcs == null) {
crcs = new ArrayList<>(1);
crcs.add(md5Digest);
} else {
if (!crcs.contains(md5Digest)) {
if (crcs.size() >= maxDigests) {
// remove the oldest digest
crcs.remove(0);
}
crcs.add(md5Digest);
}
}
putMutationCRCs(mutationKey, crcs);
return crcs;
}

public boolean isMutationProcessed(K mutationKey, String md5Digest) {
List<String> digests = getMutationCRCs(mutationKey);
return digests != null && digests.contains(md5Digest);
}

public CacheStats stats() {
return mutationCache.stats();
}

public long estimatedSize() {
return mutationCache.estimatedSize();
}

public void close() {
rocksDB.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import com.datastax.oss.cdc.ConfigUtil;
import com.datastax.oss.cdc.Constants;
import com.datastax.oss.cdc.CqlLogicalTypes;
import com.datastax.oss.cdc.MutationCache;
import com.datastax.oss.cdc.MutationValue;
import com.datastax.oss.cdc.Version;
import com.datastax.oss.cdc.cache.MutationCache;
import com.datastax.oss.cdc.cache.InMemoryCache;
import com.datastax.oss.cdc.cache.PersistentCache;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.rocksdb.RocksDBException;

import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -312,10 +315,8 @@ public void open(Map<String, Object> config, SourceContext sourceContext) {
consumerBuilder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
}
this.consumer = consumerBuilder.subscribe();
this.mutationCache = new MutationCache<>(
this.config.getCacheMaxDigests(),
this.config.getCacheMaxCapacity(),
Duration.ofMillis(this.config.getCacheExpireAfterMs()));
this.mutationCache = this.getMutationCache();

log.info("Starting source connector topic={} subscription={} query.executors={}",
dirtyTopicName,
this.config.getEventsSubscriptionName(),
Expand All @@ -326,6 +327,30 @@ public void open(Map<String, Object> config, SourceContext sourceContext) {
}
}

/**
* Get the mutation cache implementation.
* @return the mutation cache implementation
* @throws RocksDBException if the mutation cache cannot be created due to RocksDB issues
*/
private MutationCache<String> getMutationCache() throws RocksDBException {
if(this.config.getCachePersistentDirectory() != null){
return new PersistentCache<>(
this.config.getCacheMaxDigests(),
this.config.getCacheMaxCapacity(),
Duration.ofMillis(this.config.getCacheExpireAfterMs()),
this.config.getCachePersistentDirectory() + "/" +
this.sourceContext.getSourceName() + "-" + this.sourceContext.getInstanceId(),
String::getBytes
);
}
else {
return new InMemoryCache<>(
this.config.getCacheMaxDigests(),
this.config.getCacheMaxCapacity(),
Duration.ofMillis(this.config.getCacheExpireAfterMs()));
}
}

void maybeInitCassandraClient() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException {
if (this.cassandraClient == null) {
synchronized (this) {
Expand Down
Loading