Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: misc cleanups to source code #326

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public static DatabaseDialect findBestFor(
bestScore = score;
}
}
if (bestMatch == null) {
throw new ConnectException("Could not find best dialect match.");
}
LOG.debug("Using dialect {} with score {} against {}", bestMatch, bestScore, info);
return bestMatch.create(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ protected ColumnConverter columnConverterFor(
}

case Types.BIT: {
/**
/*
* BIT should be either 0 or 1.
* TODO: Postgres handles this differently, returning a string "t" or "f". See the
* elasticsearch-jdbc plugin for an example of how this is handled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ protected String currentTimestampDatabaseQuery() {
return "select getdate()";
}

@Override
protected String checkConnectionQuery() {
return "SELECT 1";
}

@Override
protected String getSqlType(final SinkRecordField field) {
if (field.schemaName() != null) {
Expand Down
14 changes: 4 additions & 10 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -41,23 +40,18 @@ public enum InsertMode {
INSERT,
MULTI,
UPSERT,
UPDATE;
UPDATE
}

public enum PrimaryKeyMode {
NONE,
KAFKA,
RECORD_KEY,
RECORD_VALUE;
RECORD_VALUE
}

public static final List<String> DEFAULT_KAFKA_PK_NAMES = Collections.unmodifiableList(
Arrays.asList(
"__connect_topic",
"__connect_partition",
"__connect_offset"
)
);
public static final List<String> DEFAULT_KAFKA_PK_NAMES =
List.of("__connect_topic", "__connect_partition", "__connect_offset");

public static final String TABLE_NAME_FORMAT = "table.name.format";
public static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public static ConfigDef baseConfigDef() {
return config;
}

private static final void addDatabaseOptions(final ConfigDef config) {
private static void addDatabaseOptions(final ConfigDef config) {
int orderInGroup = 0;
defineConnectionUrl(config, ++orderInGroup,
Arrays.asList(TABLE_WHITELIST_CONFIG, TABLE_BLACKLIST_CONFIG));
Expand Down Expand Up @@ -389,7 +389,7 @@ private static final void addDatabaseOptions(final ConfigDef config) {
defineSqlQuoteIdentifiers(config, ++orderInGroup);
}

private static final void addModeOptions(final ConfigDef config) {
private static void addModeOptions(final ConfigDef config) {
int orderInGroup = 0;
config.define(
MODE_CONFIG,
Expand Down Expand Up @@ -480,7 +480,7 @@ private static final void addModeOptions(final ConfigDef config) {
);
}

private static final void addConnectorOptions(final ConfigDef config) {
private static void addConnectorOptions(final ConfigDef config) {
int orderInGroup = 0;
config.define(
TABLE_TYPE_CONFIG,
Expand Down Expand Up @@ -558,7 +558,6 @@ public JdbcSourceConnectorConfig(final Map<String, ?> props) {

private static class TableRecommender implements Recommender {

@SuppressWarnings("unchecked")
@Override
public List<Object> validValues(final String name, final Map<String, Object> config) {
try {
Expand All @@ -568,8 +567,10 @@ public List<Object> validValues(final String name, final Map<String, Object> con
}
// Create the dialect to get the tables ...
final JdbcConfig jdbcConfig = new JdbcConfig(CONFIG_DEF, config);
final DatabaseDialect dialect = DatabaseDialects.findBestFor(dbUrl, jdbcConfig);
try (final Connection db = dialect.getConnection()) {
try (
final DatabaseDialect dialect = DatabaseDialects.findBestFor(dbUrl, jdbcConfig);
final Connection db = dialect.getConnection()
) {
final List<Object> result = new LinkedList<>();
for (final TableId id : dialect.tableIds(db)) {
// Just add the unqualified table name
Expand Down Expand Up @@ -757,7 +758,7 @@ public String toString() {

@Override
public List<Object> validValues(final String name, final Map<String, Object> connectorConfigs) {
return new ArrayList<Object>(canonicalValues);
return new ArrayList<>(canonicalValues);
}

@Override
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public class JdbcSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);

private Time time;
private final Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
private CachedConnectionProvider cachedConnectionProvider;
private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<>();
private final AtomicBoolean running = new AtomicBoolean(false);

public JdbcSourceTask() {
Expand Down Expand Up @@ -306,16 +306,15 @@ public List<SourceRecord> poll() throws InterruptedException {
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, 100);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)", sleepMs, querier, untilNext);
time.sleep(sleepMs);
continue; // Re-check stop flag before continuing
}
}

final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
log.debug("Checking for next block of results from {}", querier);
querier.maybeStartQuery(cachedConnectionProvider.getConnection());

final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
Expand All @@ -331,14 +330,14 @@ public List<SourceRecord> poll() throws InterruptedException {
}

if (results.isEmpty()) {
log.trace("No updates for {}", querier.toString());
log.trace("No updates for {}", querier);
continue;
}

log.debug("Returning {} records for {}", results.size(), querier.toString());
log.debug("Returning {} records for {}", results.size(), querier);
return results;
} catch (final SQLException sqle) {
log.error("Failed to run query for table {}: {}", querier.toString(), sqle);
log.error("Failed to run query for table {}: {}", querier, sqle);
resetAndRequeueHead(querier);
return null;
} catch (final Throwable t) {
Expand All @@ -359,7 +358,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}

private void resetAndRequeueHead(final TableQuerier expectedHead) {
log.debug("Resetting querier {}", expectedHead.toString());
log.debug("Resetting querier {}", expectedHead);
final TableQuerier removedQuerier = tableQueue.poll();
assert removedQuerier == expectedHead;
expectedHead.reset(time.milliseconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Schema schema() {
* values are to be mapped/converted and then set on the corresponding {@link Field} in supplied
* {@link Struct} objects.
*
* @return the array of {@link FieldSetter} instances; never null and never empty
* @return the list of {@link FieldSetter} instances; never null and never empty
*/
List<FieldSetter> fieldSetters() {
return fieldSetters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public TimestampIncrementingCriteria(
final TimeZone timeZone
) {
this.timestampColumns =
timestampColumns != null ? timestampColumns : Collections.<ColumnId>emptyList();
timestampColumns != null ? timestampColumns : Collections.emptyList();
this.incrementingColumn = incrementingColumn;
this.timeZone = timeZone;
}
Expand Down Expand Up @@ -274,15 +274,14 @@ protected boolean isIntegralPrimitiveType(final Object incrementingColumnValue)
|| incrementingColumnValue instanceof Short || incrementingColumnValue instanceof Byte;
}

protected String coalesceTimestampColumns(final ExpressionBuilder builder) {
protected void coalesceTimestampColumns(final ExpressionBuilder builder) {
if (timestampColumns.size() == 1) {
builder.append(timestampColumns.get(0));
} else {
builder.append("COALESCE(");
builder.appendList().delimitedBy(",").of(timestampColumns);
builder.append(")");
}
return builder.toString();
}

protected void timestampIncrementingWhereClause(final ExpressionBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class TimestampIncrementingTableQuerier extends TableQuerier
private final List<String> timestampColumnNames;
private final List<ColumnId> timestampColumns;
private String incrementingColumnName;
private long timestampDelay;
private final long timestampDelay;
private final long initialTimestampOffset;
private final long initialIncrementingOffset;
private TimestampIncrementingOffset offset;
Expand All @@ -91,7 +91,7 @@ public TimestampIncrementingTableQuerier(final DatabaseDialect dialect,
super(dialect, mode, name, topicPrefix);
this.incrementingColumnName = incrementingColumnName;
this.timestampColumnNames = timestampColumnNames != null
? timestampColumnNames : Collections.<String>emptyList();
? timestampColumnNames : Collections.emptyList();
this.timestampDelay = timestampDelay;
this.initialTimestampOffset = timestampInitialMs;
this.initialIncrementingOffset = incrementingOffsetInitial;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ public boolean jdbcVersionAtLeast(
if (this.jdbcMajorVersion() > jdbcMajorVersion) {
return true;
}
if (jdbcMajorVersion == jdbcMajorVersion() && jdbcMinorVersion() >= jdbcMinorVersion) {
return true;
}
return false;
return jdbcMajorVersion == jdbcMajorVersion() && jdbcMinorVersion() >= jdbcMinorVersion;
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/io/aiven/connect/jdbc/source/EmbeddedDerby.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class EmbeddedDerby {
private static final String NAME_PREFIX = "__test_database_";
private static final String PROTOCOL = "jdbc:derby:";

private String name;
private Connection conn;
private final String name;
private final Connection conn;

public EmbeddedDerby() {
this("default");
Expand Down Expand Up @@ -261,7 +261,7 @@ private static String formatLiteral(final Object value) throws SQLException {

public static class CaseSensitive {

private String name;
private final String name;

public CaseSensitive(final String name) {
this.name = name;
Expand Down Expand Up @@ -296,8 +296,8 @@ public static class Condition {

public static class EqualsCondition extends Condition {

private Object left;
private Object right;
private final Object left;
private final Object right;

public EqualsCondition(final Object left, final Object right) {
this.left = left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void testTimestampWithDelay() throws Exception {
startTask("modified", null, null, 4L, "UTC", null, null);
verifyTimestampFirstPoll(TOPIC_PREFIX + SINGLE_TABLE_NAME);

final Long currentTime = new Date().getTime();
final long currentTime = new Date().getTime();

// Validate that we are seeing 2,3 but not 4,5 as they are getting delayed to the next round
// Using "toString" and not UTC because Derby's current_timestamp is always local time
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/aiven/connect/jdbc/source/MockTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public class MockTime implements Time {

private long nanos = 0;
private long nanos;
private long autoTickMs = 0;

public MockTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TimestampIncrementingCriteriaTest {
private TimestampIncrementingCriteria criteriaIncTs;
private Schema schema;
private Struct record;
private TimeZone utcTimeZone = TimeZone.getTimeZone(ZoneOffset.UTC);
private final TimeZone utcTimeZone = TimeZone.getTimeZone(ZoneOffset.UTC);

@BeforeEach
public void beforeEach() {
Expand All @@ -64,7 +64,7 @@ public void beforeEach() {
}

protected void assertExtractedOffset(final long expected, final Schema schema, final Struct record) {
TimestampIncrementingCriteria criteria = null;
final TimestampIncrementingCriteria criteria;
if (schema.field(INCREMENTING_COLUMN.name()) != null) {
if (schema.field(TS1_COLUMN.name()) != null) {
criteria = criteriaIncTs;
Expand Down
Loading