From 14e284a34859c3cd268ac4b94bb5088a6d732183 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:40:47 +0530 Subject: [PATCH 01/12] feat: Introduce `MutationCache` interface, add `PersistentCache` implementation, and refactor `InMemoryCache` to implement interface. --- .../InMemoryCache.java} | 27 +--- .../datastax/oss/cdc/cache/MutationCache.java | 41 ++++++ .../oss/cdc/cache/PersistentCache.java | 124 ++++++++++++++++++ 3 files changed, 169 insertions(+), 23 deletions(-) rename connector/src/main/java/com/datastax/oss/cdc/{MutationCache.java => cache/InMemoryCache.java} (67%) create mode 100644 connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java create mode 100644 connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java diff --git a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java similarity index 67% rename from connector/src/main/java/com/datastax/oss/cdc/MutationCache.java rename to connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java index ff5d4a8c..92fca06b 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java @@ -1,19 +1,4 @@ -/** - * Copyright DataStax, Inc 2021. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datastax.oss.cdc; +package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -24,19 +9,15 @@ import java.util.List; import java.util.concurrent.TimeUnit; -/** - * Keep MD5 digests to deduplicate Cassandra mutations - */ -public class MutationCache { - +public class InMemoryCache implements MutationCache { Cache> mutationCache; /** - * Max number of cached digest per cached entry. + * Max number of cached digests per cached entry. */ long maxDigests; - public MutationCache(long maxDigests, long maxCapacity, Duration expireAfter) { + public InMemoryCache(long maxDigests, long maxCapacity, Duration expireAfter) { this.maxDigests = maxDigests; mutationCache = Caffeine.newBuilder() .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java new file mode 100644 index 00000000..63522717 --- /dev/null +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java @@ -0,0 +1,41 @@ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.stats.CacheStats; + +import java.util.List; + +public interface MutationCache { + /** + * Add a mutation MD5 digest to the cache for the given mutation key. + * @param mutationKey the key for the mutation, typically a partition key or a unique identifier + * @param md5Digest the MD5 digest of the mutation to be added + * @return a list of MD5 digests for the given mutation key, which may include the newly added digest + */ + List addMutationMd5(K mutationKey, String md5Digest); + + /** + * Retrieve the list of MD5 digests for the given mutation key. + * @param mutationKey the key for the mutation + * @return a list of MD5 digests associated with the mutation key, or an empty list if none exist + */ + List getMutationCRCs(K mutationKey); + + /** + * Check if a mutation with the given key and MD5 digest has already been processed. + * @param mutationKey the key for the mutation + * @param md5Digest the MD5 digest of the mutation + * @return true if the mutation has been processed, false otherwise + */ + boolean isMutationProcessed(K mutationKey, String md5Digest); + + /** + * Gives the current statistics of the cache, such as hit rate, miss rate, and size. + * Caffeine Statistics wiki + */ + CacheStats stats(); + + /** + * Gives the estimated size of the cache. + */ + long estimatedSize(); +} diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java new file mode 100644 index 00000000..8041988e --- /dev/null +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -0,0 +1,124 @@ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import org.rocksdb.Options; +import org.rocksdb.RocksDBException; +import org.rocksdb.TtlDB; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class PersistentCache implements MutationCache { + /** + * The mutation cache + */ + Cache> mutationCache; + + private final TtlDB rocksDB; + /** + * Max number of cached digests per cached entry. + */ + long maxDigests; + + private final Function keySerializer; + + static { + TtlDB.loadLibrary(); + } + + public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, String dbPath, Function keySerializer) throws RocksDBException { + this.maxDigests = maxDigests; + this.keySerializer = keySerializer; + + Options options = new Options().setCreateIfMissing(true); + this.rocksDB = TtlDB.open(options, dbPath, (int) expireAfter.getSeconds(), false); + + this.mutationCache = Caffeine.newBuilder() + .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) + .maximumSize(maxCapacity) + .executor(Runnable::run) + .recordStats() + .removalListener((K key, List value, RemovalCause cause) -> { + try { + rocksDB.delete(this.keySerializer.apply(key)); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }) + .build(); + } + + + private List valueDeserializer(byte[] data){ + return data == null || data.length == 0 ? null : List.of(new String(data).split(",")); + } + + private byte[] valueSerializer(List data){ + return data.stream() + .reduce((s1, s2) -> s1 + "," + s2) + .orElse("") + .getBytes(); + } + + public List getMutationCRCs(K mutationKey) { + return mutationCache.get(mutationKey, k -> { + try { + return valueDeserializer(rocksDB.get(this.keySerializer.apply(k))); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }); + } + + public void putMutationCRCs(K key, List value) { + mutationCache.asMap().compute(key, (k, v) -> { + try { + rocksDB.put(keySerializer.apply(k), valueSerializer(value)); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + return value; + }); + } + + public List addMutationMd5(K mutationKey, String md5Digest) { + List crcs = getMutationCRCs(mutationKey); + if(crcs == null) { + crcs = new ArrayList<>(1); + crcs.add(md5Digest); + } else { + if (!crcs.contains(md5Digest)) { + if (crcs.size() >= maxDigests) { + // remove the oldest digest + crcs.remove(0); + } + crcs.add(md5Digest); + } + } + putMutationCRCs(mutationKey, crcs); + return crcs; + } + + public boolean isMutationProcessed(K mutationKey, String md5Digest) { + List digests = getMutationCRCs(mutationKey); + return digests != null && digests.contains(md5Digest); + } + + public CacheStats stats() { + return mutationCache.stats(); + } + + public long estimatedSize() { + return mutationCache.estimatedSize(); + } + + public void close() { + rocksDB.close(); + } +} From 3fb06b5a4ee18f36ab0f9778c2f8b90e7be95656 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:05 +0530 Subject: [PATCH 02/12] feat: Add RocksDB dependency to project configurations --- connector/build.gradle | 1 + gradle.properties | 1 + 2 files changed, 2 insertions(+) diff --git a/connector/build.gradle b/connector/build.gradle index 3b57fe8a..85e60689 100644 --- a/connector/build.gradle +++ b/connector/build.gradle @@ -32,6 +32,7 @@ sourceSets { dependencies { implementation project(':commons') implementation("com.github.ben-manes.caffeine:caffeine:${caffeineVersion}") + implementation("org.rocksdb:rocksdbjni:${rocksdbVersion}") implementation("io.vavr:vavr:${vavrVersion}") implementation "com.datastax.oss:java-driver-core:${ossDriverVersion}" implementation "com.datastax.oss:java-driver-query-builder:${ossDriverVersion}" diff --git a/gradle.properties b/gradle.properties index 55098650..b9cae411 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,6 +24,7 @@ kafkaVersion=3.4.0 vavrVersion=0.10.3 testContainersVersion=1.19.1 caffeineVersion=2.8.8 +rocksdbVersion=9.4.0 guavaVersion=30.1-jre messagingConnectorsCommonsVersion=1.0.14 slf4jVersion=1.7.30 From 6a61963db98c97df31bb4f793d96e9ec945567a7 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:25 +0530 Subject: [PATCH 03/12] feat: Add persistent cache directory configuration to Cassandra source connector --- .../oss/cdc/CassandraSourceConnectorConfig.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java index f04d5ffe..6dad70e5 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java +++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java @@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig { public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest"; public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity"; public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms"; + public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory"; + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; @@ -195,6 +197,13 @@ public class CassandraSourceConnectorConfig { ConfigDef.Importance.HIGH, "The maximum number of digest per mutation cache entry, with a default set to 3", "CQL Read cache", 1, ConfigDef.Width.NONE, "CacheMaxDigest") + .define(CACHE_PERSISTENT_DIRECTORY_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + "The directory where the persistent mutation cache will be stored. Formated as `{cache.persistent.directory}/{source-name}-{instance-id}` " + + "If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.", + "CQL Read cache", 1, ConfigDef.Width.NONE, "CachePersistentDirectory") .define(CACHE_MAX_CAPACITY_CONFIG, ConfigDef.Type.LONG, "32767", @@ -661,6 +670,10 @@ public long getCacheMaxDigests() { return globalConfig.getLong(CACHE_MAX_DIGESTS_CONFIG); } + public String getCachePersistentDirectory() { + return globalConfig.getString(CACHE_PERSISTENT_DIRECTORY_CONFIG); + } + public long getCacheMaxCapacity() { return globalConfig.getLong(CACHE_MAX_CAPACITY_CONFIG); } @@ -782,6 +795,7 @@ public String toString() { + " " + QUERY_BACKOFF_IN_MS_CONFIG + ": %d%n" + " " + QUERY_MAX_BACKOFF_IN_SEC_CONFIG + ": %d%n" + " " + CACHE_MAX_DIGESTS_CONFIG + ": %d%n" + + " " + CACHE_PERSISTENT_DIRECTORY_CONFIG + ": %s%n" + " " + CACHE_MAX_CAPACITY_CONFIG + ": %d%n" + " " + CACHE_EXPIRE_AFTER_MS_CONFIG + ": %d%n" + " " + CACHE_ONLY_IF_COORDINATOR_MATCH + ": %s%n" @@ -805,6 +819,7 @@ public String toString() { getQueryBackoffInMs(), getQueryMaxBackoffInSec(), getCacheMaxDigests(), + getCachePersistentDirectory(), getCacheMaxCapacity(), getCacheExpireAfterMs(), getCacheOnlyIfCoordinatorMatch(), From 75d91f7be46cefcd9ad7840762541b72b5c54b0c Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:45 +0530 Subject: [PATCH 04/12] feat: Refactor mutation cache initialization to support persistent caching --- .../oss/pulsar/source/CassandraSource.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 21716a59..d32c49cd 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -20,9 +20,11 @@ import com.datastax.oss.cdc.ConfigUtil; import com.datastax.oss.cdc.Constants; import com.datastax.oss.cdc.CqlLogicalTypes; -import com.datastax.oss.cdc.MutationCache; import com.datastax.oss.cdc.MutationValue; import com.datastax.oss.cdc.Version; +import com.datastax.oss.cdc.cache.MutationCache; +import com.datastax.oss.cdc.cache.InMemoryCache; +import com.datastax.oss.cdc.cache.PersistentCache; import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Row; @@ -66,6 +68,7 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; +import org.rocksdb.RocksDBException; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; @@ -312,10 +315,8 @@ public void open(Map config, SourceContext sourceContext) { consumerBuilder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); } this.consumer = consumerBuilder.subscribe(); - this.mutationCache = new MutationCache<>( - this.config.getCacheMaxDigests(), - this.config.getCacheMaxCapacity(), - Duration.ofMillis(this.config.getCacheExpireAfterMs())); + this.mutationCache = this.getMutationCache(); + log.info("Starting source connector topic={} subscription={} query.executors={}", dirtyTopicName, this.config.getEventsSubscriptionName(), @@ -326,6 +327,30 @@ public void open(Map config, SourceContext sourceContext) { } } + /** + * Get the mutation cache implementation. + * @return the mutation cache implementation + * @throws RocksDBException if the mutation cache cannot be created due to RocksDB issues + */ + private MutationCache getMutationCache() throws RocksDBException { + if(this.config.getCachePersistentDirectory() != null){ + return new PersistentCache<>( + this.config.getCacheMaxDigests(), + this.config.getCacheMaxCapacity(), + Duration.ofMillis(this.config.getCacheExpireAfterMs()), + this.config.getCachePersistentDirectory() + "/" + + this.sourceContext.getSourceName() + "-" + this.sourceContext.getInstanceId(), + String::getBytes + ); + } + else { + return new InMemoryCache<>( + this.config.getCacheMaxDigests(), + this.config.getCacheMaxCapacity(), + Duration.ofMillis(this.config.getCacheExpireAfterMs())); + } + } + void maybeInitCassandraClient() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException { if (this.cassandraClient == null) { synchronized (this) { From b1a617f39395e1f171f8e4385bc57c7d49304ec4 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:42:19 +0530 Subject: [PATCH 05/12] feat: Update MutationCacheTests to use InMemoryCache implementation --- .../datastax/oss/cdc/MutationCacheTests.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java index e473875f..a0868067 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java @@ -15,17 +15,22 @@ */ package com.datastax.oss.cdc; +import com.datastax.oss.cdc.cache.InMemoryCache; +import com.datastax.oss.cdc.cache.MutationCache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; import java.time.Duration; +import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; public class MutationCacheTests { @Test public final void testMaxDigests() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); mutationCache.addMutationMd5("mutation1","digest1"); mutationCache.addMutationMd5("mutation1","digest2"); mutationCache.addMutationMd5("mutation1","digest3"); @@ -35,20 +40,20 @@ public final void testMaxDigests() throws Exception { @Test public final void testIsProcessed() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); - assertEquals(false,mutationCache.isMutationProcessed("mutation1","digest1")); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); } @Test public final void testExpireAfter() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofSeconds(1)); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofSeconds(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); Thread.sleep(2000); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); } } From dc72d73e35000608db5f49ea3f3dc6464e49bb18 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:43:10 +0530 Subject: [PATCH 06/12] feat: Add test for max capacity in InMemoryCache implementation --- .../datastax/oss/cdc/MutationCacheTests.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java index a0868067..085fbd28 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java @@ -56,4 +56,32 @@ public final void testExpireAfter() throws Exception { assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); } + @Test + public final void testMaxCapacity() throws Exception { + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + + // Access and modify the private field using reflection + Field field = InMemoryCache.class.getDeclaredField("mutationCache"); + field.setAccessible(true); + field.set(mutationCache, Caffeine.newBuilder() + .expireAfterWrite(Duration.ofHours(1).getSeconds(), TimeUnit.SECONDS) + .maximumSize(10) + .recordStats() + .executor(Runnable::run) // https://github.com/ben-manes/caffeine/wiki/Testing + .build() + ); + + for (int i = 0; i <20; i++) { + mutationCache.addMutationMd5("mutation" + i, "digest" + i); + } + + int count = 0; + for (int i = 0; i < 20; i++) { + if(mutationCache.getMutationCRCs("mutation" + i) != null) { + count++; + } + } + assertEquals(10, count); + } + } From 1d1a1f7403509a92a15ddffc7707e1f8c8774f22 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:43:31 +0530 Subject: [PATCH 07/12] docs: persistent cache directory setting in Cassandra source connector --- .../core/modules/ROOT/partials/cfgCassandraSource.adoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc index 2a29bee3..b9645390 100644 --- a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc +++ b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc @@ -187,4 +187,10 @@ | | true +| *cache.persistent.directory* +| The directory where the persistent mutation cache will be stored. Formated as `{cache.persistent.directory}/{source-name}-{instance-id}` This setting is only used if the cache is persistent. If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache. +| string +| +| /data/source-cache + |=== From 618c993ae63bc7b1e1410d54410cbfd0b66b0109 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:23 +0530 Subject: [PATCH 08/12] feat: Update the mutation cache to conditionally delete keys from RocksDB --- .../com/datastax/oss/cdc/cache/PersistentCache.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java index 8041988e..42ec0b34 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -4,6 +4,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; import org.rocksdb.Options; import org.rocksdb.RocksDBException; import org.rocksdb.TtlDB; @@ -18,7 +19,7 @@ public class PersistentCache implements MutationCache { /** * The mutation cache */ - Cache> mutationCache; + private final Cache> mutationCache; private final TtlDB rocksDB; /** @@ -42,11 +43,14 @@ public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, this.mutationCache = Caffeine.newBuilder() .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) .maximumSize(maxCapacity) - .executor(Runnable::run) .recordStats() .removalListener((K key, List value, RemovalCause cause) -> { try { - rocksDB.delete(this.keySerializer.apply(key)); + // If the removal cause is not SIZE, we delete the key from RocksDB + // This is to avoid deleting the key when it is removed due to the size limit + if (cause != RemovalCause.SIZE ) { + rocksDB.delete(this.keySerializer.apply(key)); + } } catch (RocksDBException e) { throw new RuntimeException(e); } From e3d7973458b4d1bd74af222c9b9675d25a16bd82 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:32 +0530 Subject: [PATCH 09/12] refactor: Move MutationCacheTests to the cache package --- .../com/datastax/oss/cdc/{ => cache}/MutationCacheTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) rename connector/src/test/java/com/datastax/oss/cdc/{ => cache}/MutationCacheTests.java (96%) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java similarity index 96% rename from connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java rename to connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java index 085fbd28..9fa1918c 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java @@ -13,10 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.datastax.oss.cdc; +package com.datastax.oss.cdc.cache; -import com.datastax.oss.cdc.cache.InMemoryCache; -import com.datastax.oss.cdc.cache.MutationCache; import com.github.benmanes.caffeine.cache.Caffeine; import org.junit.jupiter.api.Test; From 9788b47c93f1867107850036651e77faff45ab2b Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:34 +0530 Subject: [PATCH 10/12] refactor: Move MutationCacheTests to the cache package --- .../oss/cdc/cache/PersistentCacheTests.java | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java new file mode 100644 index 00000000..5e70792d --- /dev/null +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java @@ -0,0 +1,95 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.common.testing.FakeTicker; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.RocksDBException; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +public class PersistentCacheTests { + private String path; + + @BeforeEach + public void setUp() { + path = "./rocksdb_mutation_cache_" + (new Random()).nextInt(); + } + + @AfterEach + public void tearDown() { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (Exception e) { + // Ignore any exceptions during cleanup + } + } + + @Test + public final void testMaxDigests() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + mutationCache.addMutationMd5("mutation1","digest1"); + mutationCache.addMutationMd5("mutation1","digest2"); + mutationCache.addMutationMd5("mutation1","digest3"); + mutationCache.addMutationMd5("mutation1","digest4"); + assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); + } + + @Test + public final void testIsProcessed() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); + mutationCache.addMutationMd5("mutation1","digest1"); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); + + } + + @Test + public final void testPersistence() throws Exception { + PersistentCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + mutationCache.addMutationMd5("mutation1","digest1"); + mutationCache.addMutationMd5("mutation1","digest2"); + mutationCache.addMutationMd5("mutation1","digest3"); + + mutationCache.close(); + mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); + } + + @Test + public final void testMaxCapacity() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + + for (int i = 0; i <20; i++) { + mutationCache.addMutationMd5("mutation" + i, "digest" + i); + } + + for (int i = 0; i < 20; i++) { + assertEquals(List.of("digest" + i), mutationCache.getMutationCRCs("mutation" + i)); + } + } +} From b86be914ebfa5a1affd5c2709fa11875dbf6afbf Mon Sep 17 00:00:00 2001 From: Arkadip Date: Fri, 27 Jun 2025 10:01:00 +0530 Subject: [PATCH 11/12] chore: Add Apache License header to cache-related Java files --- .../datastax/oss/cdc/cache/InMemoryCache.java | 15 +++++++++++++++ .../datastax/oss/cdc/cache/MutationCache.java | 15 +++++++++++++++ .../datastax/oss/cdc/cache/PersistentCache.java | 16 +++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java index 92fca06b..b7c74006 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java index 63522717..0d83079f 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.stats.CacheStats; diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java index 42ec0b34..544b6845 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -1,10 +1,24 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; -import com.google.common.annotations.VisibleForTesting; import org.rocksdb.Options; import org.rocksdb.RocksDBException; import org.rocksdb.TtlDB; From 2387798f0edb977a2d7ab8df5c88109ecb618d38 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 11:10:43 +0530 Subject: [PATCH 12/12] test: Fix PersistentCacheTests to resolve assertion method compatibility --- .../com/datastax/oss/cdc/cache/PersistentCacheTests.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java index 5e70792d..fbb7f1c5 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java @@ -15,20 +15,15 @@ */ package com.datastax.oss.cdc.cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.google.common.testing.FakeTicker; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.rocksdb.RocksDBException; import java.io.File; import java.time.Duration; import java.util.List; import java.util.Random; -import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.*; @@ -89,7 +84,9 @@ public final void testMaxCapacity() throws Exception { } for (int i = 0; i < 20; i++) { - assertEquals(List.of("digest" + i), mutationCache.getMutationCRCs("mutation" + i)); + List expected = new java.util.ArrayList<>(); + expected.add("digest" + i); + assertEquals(expected, mutationCache.getMutationCRCs("mutation" + i)); } } }