Skip to content

Commit

Permalink
DBZ-8363 creation of single test artifact
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed Nov 4, 2024
1 parent b777a76 commit 4447f1e
Show file tree
Hide file tree
Showing 80 changed files with 918 additions and 3,125 deletions.
6 changes: 4 additions & 2 deletions cassandra-3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-cassandra-core</artifactId>
<artifactId>debezium-connector-cassandra-tests</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -86,6 +85,9 @@
<cassandra.version>${version.cassandra3}</cassandra.version>
<docker.dir>${project.basedir}/src/test/resources/docker</docker.dir>
</systemPropertyVariables>
<dependenciesToScan>
<dependency>io.debezium:debezium-connector-cassandra-tests</dependency>
</dependenciesToScan>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.function.Function;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
Expand Down Expand Up @@ -110,10 +109,4 @@ public Object deserialize(Object abstractType, ByteBuffer bb) {
public Function<Object, Object> baseTypeForReversedType() {
return abstractType -> ((AbstractType<?>) abstractType).isReversed() ? ((ReversedType<?>) abstractType).baseType : abstractType;
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.cassandra;

import java.io.File;
import java.io.IOException;

import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;

import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.spi.CassandraTestProvider;
import io.debezium.connector.cassandra.spi.CommitLogProcessing;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;

public class Cassandra3TestProvider implements CassandraTestProvider {
@Override
public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
return new Cassandra3CommitLogProcessing(context, metrics);
}

private static class Cassandra3CommitLogProcessing implements CommitLogProcessing {

private final CommitLogReadHandler commitLogReadHandler;
private final CommitLogSegmentReader commitLogSegmentReader;
private final CassandraConnectorContext context;

Cassandra3CommitLogProcessing(CassandraConnectorContext context, CommitLogProcessorMetrics metrics) {
commitLogReadHandler = new Cassandra3CommitLogReadHandlerImpl(context, metrics);
commitLogSegmentReader = new Cassandra3CommitLogSegmentReader(context, metrics);
this.context = context;
}

@Override
public void readAllCommitLogs(File[] commitLogs) throws IOException {
CommitLogReader reader = new CommitLogReader();
File cdcLoc = new File(context.getCassandraConnectorConfig().getCdcLogLocation());
for (File commitLog : CommitLogUtil.getCommitLogs(cdcLoc)) {
reader.readCommitLogSegment(commitLogReadHandler, commitLog, true);
}
}

@Override
public void readCommitLogSegment(File file, long segmentId, int position) throws IOException {
commitLogSegmentReader.readCommitLogSegment(file, segmentId, position);
}

@Override
public CommitLogSegmentReader getCommitLogSegmentReader() {
return commitLogSegmentReader;
}
}

@Override
public CassandraConnectorContext provideContext(Configuration configuration) throws Exception {
CassandraConnectorConfig config = new CassandraConnectorConfig(configuration);
Cassandra3TypeProvider provider = new Cassandra3TypeProvider();
CassandraTypeDeserializer.init(provider.deserializers(), config.getDecimalMode(), config.getVarIntMode(),
provider.baseTypeForReversedType());

return new DefaultCassandraConnectorContext(config,
new CassandraConnectorTask.Cassandra3SchemaLoader(),
new CassandraConnectorTask.Cassandra3SchemaChangeListenerProvider(),
new FileOffsetWriter(config));
}

@Override
public CassandraConnectorContext provideContextWithoutSchemaManagement(Configuration configuration) {
CassandraConnectorConfig config = new CassandraConnectorConfig(configuration);
Cassandra3TypeProvider provider = new Cassandra3TypeProvider();
CassandraTypeDeserializer.init(provider.deserializers(), config.getDecimalMode(), config.getVarIntMode(),
provider.baseTypeForReversedType());

return new DefaultCassandraConnectorContext(new CassandraConnectorConfig(configuration));
}
}

This file was deleted.

Loading

0 comments on commit 4447f1e

Please sign in to comment.