Skip to content

Commit

Permalink
Merge pull request #410 from ClickHouse/show-tables-from-database
Browse files Browse the repository at this point in the history
Adjusting the table mapping code to account for additional databases
  • Loading branch information
Paultagoras authored Jul 8, 2024
2 parents d1fbe2b + 8b66c25 commit 3a5b8d3
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro

// Add table mapping refresher
if (clickHouseSinkConfig.getTableRefreshInterval() > 0) {
TableMappingRefresher tableMappingRefresher = new TableMappingRefresher(chWriter);
TableMappingRefresher tableMappingRefresher = new TableMappingRefresher(clickHouseSinkConfig.getDatabase(), chWriter);
Timer tableRefreshTimer = new Timer();
tableRefreshTimer.schedule(tableMappingRefresher, clickHouseSinkConfig.getTableRefreshInterval(), clickHouseSinkConfig.getTableRefreshInterval());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public ClickHouseWriter() {
this.mapping = new HashMap<String, Table>();
}

protected void setClient(ClickHouseHelperClient chc) {
this.chc = chc;
}
protected void setSinkConfig(ClickHouseSinkConfig csc) {
this.csc = csc;
}
protected Map<String, Table> getMapping() {
return mapping;
}

@Override
public boolean start(ClickHouseSinkConfig csc) {
LOGGER.trace("Starting ClickHouseWriter");
Expand Down Expand Up @@ -100,7 +110,7 @@ public boolean start(ClickHouseSinkConfig csc) {

LOGGER.debug("Ping was successful.");

this.updateMapping();
this.updateMapping(csc.getDatabase());
if (mapping.isEmpty()) {
LOGGER.error("Did not find any tables in destination Please create before running.");
return false;
Expand All @@ -109,7 +119,7 @@ public boolean start(ClickHouseSinkConfig csc) {
return true;
}

public void updateMapping() {
public void updateMapping(String database) {
// Do not start a new update cycle if one is already in progress
if (this.isUpdateMappingRunning.get()) {
return;
Expand All @@ -120,21 +130,18 @@ public void updateMapping() {

try {
// Getting tables from ClickHouse
List<Table> tableList = this.chc.extractTablesMapping(this.mapping);
List<Table> tableList = this.chc.extractTablesMapping(database, this.mapping);
if (tableList.isEmpty()) {
return;
}

HashMap<String, Table> mapping = new HashMap<String, Table>();

// Adding new tables to mapping
// Adding new tables to mapping, or update existing tables
// TODO: check Kafka Connect's topics name or topics regex config and
// only add tables to in-memory mapping that matches the topics we consume.
for (Table table : tableList) {
mapping.put(table.getName(), table);
mapping.put(table.getFullName(), table);
}

this.mapping = mapping;
} finally {
this.isUpdateMappingRunning.set(false);
}
Expand Down Expand Up @@ -162,7 +169,8 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte

Record first = records.get(0);
String topic = first.getTopic();
Table table = getTable(topic);
String database = first.getDatabase();
Table table = getTable(database, topic);
if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here
LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());

Expand Down Expand Up @@ -864,10 +872,17 @@ private ClickHouseRequest.Mutation getMutationRequest(ClickHouseClient client, C

return request;
}
private Table getTable(String topic) {
String tableName = Utils.getTableName(topic, csc.getTopicToTableMap());
protected Table getTable(String database, String topic) {
String tableName = Utils.getTableName(database, topic, csc.getTopicToTableMap());
Table table = this.mapping.get(tableName);
if (table == null) {
this.updateMapping(database);
table = this.mapping.get(tableName);//If null, update then do it again to be sure
}

if (table == null) {
this.updateMapping(database);

if (csc.isSuppressTableExistenceException()) {
LOGGER.warn("Table [{}] does not exist, but error was suppressed.", tableName);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
public class TableMappingRefresher extends TimerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(TableMappingRefresher.class);
private ClickHouseWriter chWriter = null;
private String database = null;

public TableMappingRefresher(final ClickHouseWriter chWriter) {
public TableMappingRefresher(String database, final ClickHouseWriter chWriter) {
this.chWriter = chWriter;
this.database = database;
}

@Override
public void run() {
try {
chWriter.updateMapping();
chWriter.updateMapping(database);
} catch (Exception e) {
LOGGER.error("Update mapping Error: {}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,31 +164,30 @@ public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat)
throw new RuntimeException(ce);
}

public List<String> showTables() {
public List<String> showTables(String database) {
List<String> tablesNames = new ArrayList<>();
try (ClickHouseClient client = ClickHouseClient.builder()
.options(getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(server)
.query("SHOW TABLES")
.query(String.format("SHOW TABLES FROM `%s`", database))
.executeAndWait()) {
for (ClickHouseRecord r : response.records()) {
ClickHouseValue v = r.getValue(0);
String tableName = v.asString();
tablesNames.add(tableName);
}

} catch (ClickHouseException e) {
LOGGER.error("Failed in show tables", e);
}
return tablesNames;
}

public Table describeTable(String tableName) {
public Table describeTable(String database, String tableName) {
if (tableName.startsWith(".inner"))
return null;
String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName);
String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName);
LOGGER.debug(describeQuery);

try (ClickHouseClient client = ClickHouseClient.builder()
Expand All @@ -201,7 +200,7 @@ public Table describeTable(String tableName) {
.query(describeQuery)
.executeAndWait()) {

Table table = new Table(tableName);
Table table = new Table(database, tableName);
for (ClickHouseRecord r : response.records()) {
ClickHouseValue v = r.getValue(0);

Expand Down Expand Up @@ -229,17 +228,13 @@ public Table describeTable(String tableName) {
return null;
}
}

public List<Table> extractTablesMapping() {
HashMap<String, Table> cache = new HashMap<String, Table>();
return extractTablesMapping(cache);
}

public List<Table> extractTablesMapping(Map<String, Table> cache) {

public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
List<Table> tableList = new ArrayList<>();
for (String tableName : showTables() ) {
// Table names are escaped in the cache
String escapedTableName = Utils.escapeTopicName(tableName);
for (String tableName : showTables(database) ) {
// (Full) Table names are escaped in the cache
String escapedTableName = Utils.escapeTableName(database, tableName);

// Read from cache if we already described this table before
// This means we won't pick up edited table configs until the connector is restarted
Expand All @@ -248,7 +243,7 @@ public List<Table> extractTablesMapping(Map<String, Table> cache) {
continue;
}

Table table = describeTable(tableName);
Table table = describeTable(this.database, tableName);
if (table != null )
tableList.add(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class Table {
private static final Pattern MULTIPLE_MAP_VALUES_PATTERN = Pattern.compile("(\\.values)(?=((\\.values)+$))");

private final String name;
private final String database;

private final List<Column> rootColumnsList;
private final Map<String, Column> rootColumnsMap;
Expand All @@ -29,7 +30,8 @@ public class Table {
@Accessors(fluent = true)
private boolean hasDefaults;

public Table(String name) {
public Table(String database, String name) {
this.database = database;
this.name = name;
this.rootColumnsList = new ArrayList<>();
this.rootColumnsMap = new HashMap<>();
Expand All @@ -39,7 +41,11 @@ public Table(String name) {
}

public String getName() {
return Utils.escapeTopicName(name);
return Utils.escapeName(name);
}

public String getFullName() {
return Utils.escapeTableName(database, name);
}

private void registerValidColumn(Column column) {
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/clickhouse/kafka/connect/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

public class Utils {

public static String escapeTopicName(String topic) {
return String.format("`%s`", topic);
public static String escapeName(String topic) {
String cleanTopic = topic.replace("`", "");
return String.format("`%s`", cleanTopic);
}

public static String escapeTableName(String database, String topicName) {
return escapeName(database) + "." + escapeName(topicName);
}

private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
Expand Down Expand Up @@ -135,14 +140,14 @@ public static void sendTODlq(ErrorReporter errorReporter, SinkRecord record, Exc
}
}

public static String getTableName(String topicName, Map<String, String> topicToTableMap) {
public static String getTableName(String database, String topicName, Map<String, String> topicToTableMap) {
String tableName = topicToTableMap.get(topicName);
LOGGER.debug("Topic name: {}, Table Name: {}", topicName, tableName);
if (tableName == null) {
tableName = topicName;
}

return escapeTopicName(tableName);
return escapeTableName(database, tableName);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ protected static void tearDown() {
db.stop();
}

protected ClickHouseSinkConfig createConfig() {
return new ClickHouseSinkConfig(createProps());
}

protected ClickHouseHelperClient createClient(Map<String,String> props) {
return createClient(props, true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.clickhouse.kafka.connect.sink.db;

import com.clickhouse.kafka.connect.sink.ClickHouseBase;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.util.Utils;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

public class ClickHouseWriterTest extends ClickHouseBase {

@Test
public void updateMapping() {
Map<String, String> props = createProps();;
ClickHouseHelperClient chc = createClient(props);
String topic = createTopicName("missing_table_mapping_test");

ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16 ) Engine = MergeTree ORDER BY off16");

ClickHouseWriter chw = new ClickHouseWriter();
chw.setSinkConfig(createConfig());
chw.setClient(chc);

chw.updateMapping("default");
Map<String, Table> tables = chw.getMapping();
assertNull(tables.get(Utils.escapeTableName(chc.getDatabase(), topic)));

Table table = chw.getTable(chc.getDatabase(), topic);
assertNotNull(table);
assertEquals(Utils.escapeTableName(chc.getDatabase(), topic), table.getFullName());

tables = chw.getMapping();
assertNotNull(tables.get(Utils.escapeTableName(chc.getDatabase(), topic)));

ClickHouseTestHelpers.dropTable(chc, topic);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package com.clickhouse.kafka.connect.sink.db.helper;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.kafka.connect.sink.ClickHouseBase;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension;
import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
import com.clickhouse.kafka.connect.util.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ExtendWith(FromVersionConditionExtension.class)
Expand All @@ -32,6 +34,19 @@ public void ping() {
Assertions.assertTrue(chc.ping());
}

@Test
public void showTables() {
String topic = createTopicName("simple_table_test");
ClickHouseTestHelpers.createTable(chc, topic,
"CREATE TABLE %s ( `num` String ) Engine = MergeTree ORDER BY num");
try {
List<String> tableNames = chc.showTables(chc.getDatabase());
Assertions.assertTrue(tableNames.contains(topic));
} finally {
ClickHouseTestHelpers.dropTable(chc, topic);
}
}

@Test
public void describeNestedFlattenedTable() {
String topic = createTopicName("nested_flattened_table_test");
Expand All @@ -41,7 +56,7 @@ public void describeNestedFlattenedTable() {
"Engine = MergeTree ORDER BY num");

try {
Table table = chc.describeTable(topic);
Table table = chc.describeTable(chc.getDatabase(), topic);
Assertions.assertEquals(3, table.getRootColumnsList().size());
} finally {
ClickHouseTestHelpers.dropTable(chc, topic);
Expand Down Expand Up @@ -70,10 +85,10 @@ public void describeNestedUnFlattenedTable() {
"Engine = MergeTree ORDER BY num");

try {
Table nestedTable = chc.describeTable(nestedTopic);
Table nestedTable = chc.describeTable(chc.getDatabase(), nestedTopic);
Assertions.assertNull(nestedTable);

Table normalTable = chc.describeTable(normalTopic);
Table normalTable = chc.describeTable(chc.getDatabase(), normalTopic);
Assertions.assertEquals(1, normalTable.getRootColumnsList().size());
} finally {
ClickHouseTestHelpers.dropTable(chc, nestedTopic);
Expand Down
Loading

0 comments on commit 3a5b8d3

Please sign in to comment.