Skip to content

Commit

Permalink
fix: update/delete should not fail when table has multi-column primary key (#32)
Browse files Browse the repository at this point in the history
elefeint authored Mar 19, 2024
1 parent b781fc7 commit 5e40db6
Showing 5 changed files with 169 additions and 23 deletions.
29 changes: 17 additions & 12 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
@@ -21,13 +21,14 @@ const auto print_column = [](const std::string &quoted_col,

void write_joined(
std::ostringstream &sql, const std::vector<const column_def *> &columns,
std::function<void(const std::string &, std::ostringstream &)> print_str) {
std::function<void(const std::string &, std::ostringstream &)> print_str,
const std::string &separator = ", ") {
bool first = true;
for (const auto &col : columns) {
if (first) {
first = false;
} else {
sql << ", ";
sql << separator;
}
print_str(KeywordHelper::WriteQuoted(col->name, '"'), sql);
}
@@ -301,11 +302,13 @@ void update_values(duckdb::Connection &con, const table_def &table,
});

sql << " FROM " << staging_table_name << " WHERE ";
write_joined(sql, columns_pk,
[&](const std::string &quoted_col, std::ostringstream &out) {
out << table.table_name << "." << quoted_col << " = "
<< staging_table_name << "." << quoted_col;
});
write_joined(
sql, columns_pk,
[&](const std::string &quoted_col, std::ostringstream &out) {
out << table.table_name << "." << quoted_col << " = "
<< staging_table_name << "." << quoted_col;
},
" AND ");

auto query = sql.str();
mdlog::info("update: " + query);
@@ -325,11 +328,13 @@ void delete_rows(duckdb::Connection &con, const table_def &table,
sql << "DELETE FROM " + absolute_table_name << " USING " << staging_table_name
<< " WHERE ";

write_joined(sql, columns_pk,
[&](const std::string &quoted_col, std::ostringstream &out) {
out << table.table_name << "." << quoted_col << " = "
<< staging_table_name << "." << quoted_col;
});
write_joined(
sql, columns_pk,
[&](const std::string &quoted_col, std::ostringstream &out) {
out << table.table_name << "." << quoted_col << " = "
<< staging_table_name << "." << quoted_col;
},
" AND ");

auto query = sql.str();
mdlog::info("delete_rows: " + query);
3 changes: 3 additions & 0 deletions test/files/multikey_table_delete.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id1,id2,text,_fivetran_deleted,_fivetran_synced
1,100,"does not matter",true,"2024-01-09T04:23:41.165531936Z"
3,300,"this value does not matter, and neither does _fivetran_deleted -- the row will still be hard deleted because it's in this file",false,"2024-01-09T04:23:41.165531936Z"
3 changes: 3 additions & 0 deletions test/files/multikey_table_update.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id1,id2,text,_fivetran_deleted,_fivetran_synced
2,200,"second row updated",false,"2024-02-09T00:00:00.000000000Z"
3,300,"third row soft deleted - but also this value updated",true,"2024-02-09T00:00:00.000000000Z"
4 changes: 4 additions & 0 deletions test/files/multikey_table_upsert.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id1,id2,text,_fivetran_deleted,_fivetran_synced
1,100,"first row",false,"2024-01-09T04:10:19.156057706Z"
2,200,"second row",false,"2024-01-09T04:10:19.156057706Z"
3,300,"third row",false,"2024-01-09T04:10:19.156057706Z"
153 changes: 142 additions & 11 deletions test/integration/test_server.cpp
Original file line number Diff line number Diff line change
@@ -266,6 +266,32 @@ void define_test_table(T &request, const std::string &table_name) {
col5->set_type(::fivetran_sdk::DataType::UTC_DATETIME);
}

template <typename T>
void define_test_multikey_table(T &request, const std::string &table_name) {
request.mutable_table()->set_name(table_name);
auto col1 = request.mutable_table()->add_columns();
col1->set_name("id1");
col1->set_type(::fivetran_sdk::DataType::INT);
col1->set_primary_key(true);

auto col2 = request.mutable_table()->add_columns();
col2->set_name("id2");
col2->set_type(::fivetran_sdk::DataType::INT);
col2->set_primary_key(true);

auto col3 = request.mutable_table()->add_columns();
col3->set_name("text");
col3->set_type(::fivetran_sdk::DataType::STRING);

auto col4 = request.mutable_table()->add_columns();
col4->set_name("_fivetran_deleted");
col4->set_type(::fivetran_sdk::DataType::BOOLEAN);

auto col5 = request.mutable_table()->add_columns();
col5->set_name("_fivetran_synced");
col5->set_type(::fivetran_sdk::DataType::UTC_DATETIME);
}

std::unique_ptr<duckdb::Connection> get_test_connection(char *token) {
std::unordered_map<std::string, std::string> props{
{"motherduck_token", token},
@@ -555,7 +581,7 @@ TEST_CASE("WriteBatch", "[integration][current]") {
}
}

TEST_CASE("CreateTable with multiple primary keys", "[integration]") {
TEST_CASE("Table with multiple primary keys", "[integration]") {
DestinationSdkImpl service;

const std::string table_name =
@@ -568,15 +594,7 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") {
::fivetran_sdk::CreateTableRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.mutable_table()->set_name(table_name);
auto col1 = request.mutable_table()->add_columns();
col1->set_name("id1");
col1->set_type(::fivetran_sdk::DataType::INT);
col1->set_primary_key(true);
auto col2 = request.mutable_table()->add_columns();
col2->set_name("id2");
col2->set_type(::fivetran_sdk::DataType::INT);
col2->set_primary_key(true);
define_test_multikey_table(request, table_name);

::fivetran_sdk::CreateTableResponse response;
auto status = service.CreateTable(nullptr, &request, &response);
@@ -594,9 +612,122 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") {
::fivetran_sdk::DescribeTableResponse response;
auto status = service.DescribeTable(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
REQUIRE(response.table().columns().size() == 2);
REQUIRE(response.table().columns().size() == 5);

REQUIRE(response.table().columns(0).name() == "id1");
REQUIRE(response.table().columns(1).name() == "id2");
REQUIRE(response.table().columns(2).name() == "text");
REQUIRE(response.table().columns(3).name() == "_fivetran_deleted");
REQUIRE(response.table().columns(4).name() == "_fivetran_synced");
}
}

// test connection needs to be created after table creation to avoid stale
// catalog
auto con = get_test_connection(token);
{
// insert rows
::fivetran_sdk::WriteBatchRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
define_test_multikey_table(request, table_name);
const std::string filename = "multikey_table_upsert.csv";
const std::string filepath = TEST_RESOURCES_DIR + filename;

request.add_replace_files(filepath);

::fivetran_sdk::WriteBatchResponse response;
auto status = service.WriteBatch(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

{
// check inserted rows
auto res = con->Query("SELECT id1, id2, text FROM " + table_name +
" ORDER BY id1, id2");
REQUIRE_NO_FAIL(res);
REQUIRE(res->RowCount() == 3);
REQUIRE(res->GetValue(0, 0) == 1);
REQUIRE(res->GetValue(1, 0) == 100);
REQUIRE(res->GetValue(2, 0) == "first row");

REQUIRE(res->GetValue(0, 1) == 2);
REQUIRE(res->GetValue(1, 1) == 200);
REQUIRE(res->GetValue(2, 1) == "second row");

REQUIRE(res->GetValue(0, 2) == 3);
REQUIRE(res->GetValue(1, 2) == 300);
REQUIRE(res->GetValue(2, 2) == "third row");
}

{
// update
::fivetran_sdk::WriteBatchRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.mutable_csv()->set_unmodified_string("magic-unmodified-value");
request.mutable_csv()->set_null_string("magic-nullvalue");
define_test_multikey_table(request, table_name);
const std::string filename = "multikey_table_update.csv";
const std::string filepath = TEST_RESOURCES_DIR + filename;

request.add_update_files(filepath);

::fivetran_sdk::WriteBatchResponse response;
auto status = service.WriteBatch(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

{
// check after update, including a soft delete
auto res = con->Query("SELECT id1, id2, text, _fivetran_deleted FROM " +
table_name + " ORDER BY id1, id2");
REQUIRE_NO_FAIL(res);
REQUIRE(res->RowCount() == 3);
REQUIRE(res->GetValue(0, 0) == 1);
REQUIRE(res->GetValue(1, 0) == 100);
REQUIRE(res->GetValue(2, 0) == "first row");
REQUIRE(res->GetValue(3, 0) == false);

REQUIRE(res->GetValue(0, 1) == 2);
REQUIRE(res->GetValue(1, 1) == 200);
REQUIRE(res->GetValue(2, 1) == "second row updated");
REQUIRE(res->GetValue(3, 1) == false);

REQUIRE(res->GetValue(0, 2) == 3);
REQUIRE(res->GetValue(1, 2) == 300);
REQUIRE(res->GetValue(2, 2) ==
"third row soft deleted - but also this value updated");
REQUIRE(res->GetValue(3, 2) == true);
}

{
// delete
::fivetran_sdk::WriteBatchRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
define_test_multikey_table(request, table_name);
const std::string filename = "multikey_table_delete.csv";
const std::string filepath = TEST_RESOURCES_DIR + filename;

request.add_delete_files(filepath);

::fivetran_sdk::WriteBatchResponse response;
auto status = service.WriteBatch(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

{
// check after hard delete
auto res = con->Query("SELECT id1, id2, text, _fivetran_deleted FROM " +
table_name + " ORDER BY id1, id2");
REQUIRE_NO_FAIL(res);
REQUIRE(res->RowCount() == 1);
REQUIRE(res->GetValue(0, 0) == 2);
REQUIRE(res->GetValue(1, 0) == 200);
REQUIRE(res->GetValue(2, 0) == "second row updated");
REQUIRE(res->GetValue(3, 0) == false);
}
}

TEST_CASE("CreateTable with JSON column", "[integration]") {

0 comments on commit 5e40db6

Please sign in to comment.