diff --git a/includes/sql_generator.hpp b/includes/sql_generator.hpp index ab9414b..af7b063 100644 --- a/includes/sql_generator.hpp +++ b/includes/sql_generator.hpp @@ -15,7 +15,7 @@ struct table_def { std::string schema_name; std::string table_name; - std::string to_string() const; + std::string to_escaped_string() const; }; bool schema_exists(duckdb::Connection &con, const std::string &db_name, @@ -47,7 +47,10 @@ void update_values(duckdb::Connection &con, const table_def &table, std::vector &columns_regular, const std::string &unmodified_string); -void truncate_table(duckdb::Connection &con, const table_def &table); +void truncate_table(duckdb::Connection &con, const table_def &table, + const std::string &synced_column, + std::chrono::nanoseconds &cutoff_ns, + const std::string &deleted_column); void delete_rows(duckdb::Connection &con, const table_def &table, const std::string &staging_table_name, diff --git a/src/motherduck_destination_server.cpp b/src/motherduck_destination_server.cpp index 4702616..c7e535b 100644 --- a/src/motherduck_destination_server.cpp +++ b/src/motherduck_destination_server.cpp @@ -269,11 +269,22 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context, find_property(request->configuration(), MD_PROP_DATABASE); table_def table_name{db_name, get_schema_name(request), get_table_name(request)}; + if (request->synced_column().empty()) { + throw std::invalid_argument("Synced column is required"); + } + if (request->soft().deleted_column().empty()) { + // right now, only soft deletes are supported + throw std::invalid_argument("Deleted column is required"); + } std::unique_ptr con = get_connection(request->configuration(), db_name); if (table_exists(*con, table_name)) { - truncate_table(*con, table_name); + std::chrono::nanoseconds delete_before_ts = + std::chrono::seconds(request->utc_delete_before().seconds()) + + std::chrono::nanoseconds(request->utc_delete_before().nanos()); + truncate_table(*con, table_name, 
request->synced_column(), + delete_before_ts, request->soft().deleted_column()); } else { mdlog::warning("Table <" + request->table_name() + "> not found in schema <" + request->schema_name() + diff --git a/src/sql_generator.cpp b/src/sql_generator.cpp index 08d697a..3b18db5 100644 --- a/src/sql_generator.cpp +++ b/src/sql_generator.cpp @@ -8,7 +8,7 @@ using duckdb::KeywordHelper; // Utility -std::string table_def::to_string() const { +std::string table_def::to_escaped_string() const { std::ostringstream out; out << KeywordHelper::WriteQuoted(db_name, '"') << "." << KeywordHelper::WriteQuoted(schema_name, '"') << "." @@ -65,7 +65,7 @@ bool table_exists(duckdb::Connection &con, const table_def &table) { if (result->HasError()) { throw std::runtime_error("Could not find whether table <" + - table.to_string() + + table.to_escaped_string() + "> exists: " + result->GetError()); } auto materialized_result = duckdb::unique_ptr_cast< @@ -83,7 +83,7 @@ void create_schema(duckdb::Connection &con, const std::string &db_name, void create_table(duckdb::Connection &con, const table_def &table, const std::vector &columns_pk, const std::vector &all_columns) { - const std::string absolute_table_name = table.to_string(); + const std::string absolute_table_name = table.to_escaped_string(); std::ostringstream ddl; ddl << "CREATE OR REPLACE TABLE " << absolute_table_name << " ("; @@ -129,7 +129,8 @@ std::vector describe_table(duckdb::Connection &con, auto result = statement->Execute(params, false); if (result->HasError()) { - throw std::runtime_error("Could not describe table <" + table.to_string() + + throw std::runtime_error("Could not describe table <" + + table.to_escaped_string() + ">:" + result->GetError()); } auto materialized_result = duckdb::unique_ptr_cast< @@ -148,7 +149,7 @@ std::vector describe_table(duckdb::Connection &con, void alter_table(duckdb::Connection &con, const table_def &table, const std::vector &columns) { - auto absolute_table_name = table.to_string(); + auto 
absolute_table_name = table.to_escaped_string(); std::set alter_types; std::set added_columns; std::set deleted_columns; @@ -233,7 +234,7 @@ void upsert(duckdb::Connection &con, const table_def &table, const std::string &staging_table_name, std::vector &columns_pk, std::vector &columns_regular) { - const std::string absolute_table_name = table.to_string(); + const std::string absolute_table_name = table.to_escaped_string(); std::ostringstream sql; sql << "INSERT INTO " << absolute_table_name << " SELECT * FROM " << staging_table_name; @@ -264,7 +265,7 @@ void update_values(duckdb::Connection &con, const table_def &table, const std::string &unmodified_string) { std::ostringstream sql; - auto absolute_table_name = table.to_string(); + auto absolute_table_name = table.to_escaped_string(); sql << "UPDATE " << absolute_table_name << " SET "; @@ -299,7 +300,7 @@ void delete_rows(duckdb::Connection &con, const table_def &table, const std::string &staging_table_name, std::vector &columns_pk) { - const std::string absolute_table_name = table.to_string(); + const std::string absolute_table_name = table.to_escaped_string(); std::ostringstream sql; sql << "DELETE FROM " + absolute_table_name << " USING " << staging_table_name << " WHERE "; @@ -319,13 +320,27 @@ void delete_rows(duckdb::Connection &con, const table_def &table, } } -void truncate_table(duckdb::Connection &con, const table_def &table) { - const std::string absolute_table_name = table.to_string(); +void truncate_table(duckdb::Connection &con, const table_def &table, + const std::string &synced_column, + std::chrono::nanoseconds &cutoff_ns, + const std::string &deleted_column) { + const std::string absolute_table_name = table.to_escaped_string(); std::ostringstream sql; - sql << "DELETE FROM " + absolute_table_name; + + sql << "UPDATE " << absolute_table_name << " SET " + << KeywordHelper::WriteQuoted(deleted_column, '"') << " = true WHERE " + << KeywordHelper::WriteQuoted(synced_column, '"') + << " < 
make_timestamp(?)"; auto query = sql.str(); mdlog::info("truncate_table: " + query); - auto result = con.Query(query); + auto statement = con.Prepare(query); + + // DuckDB make_timestamp takes microseconds; Fivetran sends millisecond + // precision -- safe to divide with truncation + long cutoff_microseconds = cutoff_ns.count() / 1000; + duckdb::vector params = {duckdb::Value(cutoff_microseconds)}; + + auto result = statement->Execute(params, false); if (result->HasError()) { throw std::runtime_error("Error truncating table <" + absolute_table_name + ">:" + result->GetError()); diff --git a/test/files/books_update.csv b/test/files/books_update.csv index 7cb1440..648fb08 100644 --- a/test/files/books_update.csv +++ b/test/files/books_update.csv @@ -1,3 +1,3 @@ id,title,magic_number,_fivetran_deleted,_fivetran_synced -3,"unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",15,false,"2024-01-09T04:30:13.984276065Z" -2,"The empire strikes back","unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",false,"2024-01-09T04:30:13.984276065Z" +3,"unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",15,false,"2024-02-08T23:59:59.999999999Z" +2,"The empire strikes back","unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",false,"2024-02-09T00:00:00.000000000Z" diff --git a/test/integration/test_server.cpp b/test/integration/test_server.cpp index cb24f64..b812bac 100644 --- a/test/integration/test_server.cpp +++ b/test/integration/test_server.cpp @@ -438,6 +438,7 @@ TEST_CASE("WriteBatch", "[integration][current]") { } { + // check after update auto res = con->Query("SELECT id, title, magic_number FROM " + table_name + " ORDER BY id"); INFO(res->GetError()); @@ -454,11 +455,17 @@ TEST_CASE("WriteBatch", "[integration][current]") { } { - // truncate table + // truncate data before Feb 9 2024 ::fivetran_sdk::TruncateRequest request; (*request.mutable_configuration())["motherduck_token"] = token; (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; request.set_table_name(table_name); + 
request.set_synced_column("_fivetran_synced"); + request.mutable_soft()->set_deleted_column("_fivetran_deleted"); + + const auto cutoff_datetime = 1707436800; // 2024-02-09 0:0:0 GMT, trust me + request.mutable_utc_delete_before()->set_seconds(cutoff_datetime); + request.mutable_utc_delete_before()->set_nanos(0); ::fivetran_sdk::TruncateResponse response; auto status = service.Truncate(nullptr, &request, &response); @@ -468,39 +475,61 @@ TEST_CASE("WriteBatch", "[integration][current]") { { // check truncated table - auto res = con->Query("SELECT id, title, magic_number FROM " + table_name + - " ORDER BY id"); + auto res = con->Query("SELECT title, id, magic_number FROM " + table_name + + " WHERE _fivetran_deleted = false ORDER BY id"); INFO(res->GetError()); REQUIRE(!res->HasError()); - REQUIRE(res->RowCount() == 0); + // the 1st row from books_update.csv that had 2024-02-08T23:59:59.999999999Z + // timestamp got deleted + REQUIRE(res->RowCount() == 1); + REQUIRE(res->GetValue(0, 0) == "The empire strikes back"); + REQUIRE(res->GetValue(1, 0) == 2); + REQUIRE(res->GetValue(2, 0) == 1); } -} -TEST_CASE("Truncate nonexistent table should succeed", "[integration]") { - DestinationSdkImpl service; + { + // check the rows did not get physically deleted + auto res = con->Query("SELECT title, id, magic_number FROM " + table_name + + " ORDER BY id"); + INFO(res->GetError()); + REQUIRE(!res->HasError()); + REQUIRE(res->RowCount() == 2); + } - const std::string bad_table_name = "nonexistent"; + { + // truncate table does nothing if there is no utc_delete_before field set + ::fivetran_sdk::TruncateRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + request.set_table_name(table_name); + request.set_synced_column("_fivetran_synced"); + request.mutable_soft()->set_deleted_column("_fivetran_deleted"); - auto token = std::getenv("motherduck_token"); - REQUIRE(token); + 
::fivetran_sdk::TruncateResponse response; + auto status = service.Truncate(nullptr, &request, &response); - ::fivetran_sdk::TruncateRequest request; - (*request.mutable_configuration())["motherduck_token"] = token; - (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; - request.set_schema_name("some_schema"); - request.set_table_name(bad_table_name); - ::fivetran_sdk::TruncateResponse response; + INFO(status.error_message()); + REQUIRE(status.ok()); + } - std::stringstream buffer; - std::streambuf *real_cout = std::cout.rdbuf(buffer.rdbuf()); - auto status = service.Truncate(nullptr, &request, &response); - std::cout.rdbuf(real_cout); + { + // check truncated table is the same as before + auto res = con->Query("SELECT title FROM " + table_name + + " WHERE _fivetran_deleted = false ORDER BY id"); + INFO(res->GetError()); + REQUIRE(!res->HasError()); + REQUIRE(res->RowCount() == 1); + REQUIRE(res->GetValue(0, 0) == "The empire strikes back"); + } - INFO(status.error_message()); - REQUIRE(status.ok()); - REQUIRE_THAT(buffer.str(), Catch::Matchers::ContainsSubstring( - "Table not found in schema " - "; not truncated")); + { + // check again that the rows did not get physically deleted + auto res = con->Query("SELECT title, id, magic_number FROM " + table_name + + " ORDER BY id"); + INFO(res->GetError()); + REQUIRE(!res->HasError()); + REQUIRE(res->RowCount() == 2); + } } TEST_CASE("CreateTable with multiple primary keys", "[integration]") { @@ -549,4 +578,73 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") { REQUIRE(response.table().columns().size() == 2); } } +} + +TEST_CASE("Truncate nonexistent table should succeed", "[integration]") { + DestinationSdkImpl service; + + const std::string bad_table_name = "nonexistent"; + + auto token = std::getenv("motherduck_token"); + REQUIRE(token); + + ::fivetran_sdk::TruncateRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + 
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + request.set_schema_name("some_schema"); + request.set_table_name(bad_table_name); + request.set_synced_column("_fivetran_synced"); + request.mutable_soft()->set_deleted_column("_fivetran_deleted"); + + ::fivetran_sdk::TruncateResponse response; + + std::stringstream buffer; + std::streambuf *real_cout = std::cout.rdbuf(buffer.rdbuf()); + auto status = service.Truncate(nullptr, &request, &response); + std::cout.rdbuf(real_cout); + + INFO(status.error_message()); + REQUIRE(status.ok()); + REQUIRE_THAT(buffer.str(), Catch::Matchers::ContainsSubstring( + "Table not found in schema " + "; not truncated")); +} + +TEST_CASE("Truncate fails if required properties are missing") { + + DestinationSdkImpl service; + + const std::string bad_table_name = "nonexistent"; + + auto token = std::getenv("motherduck_token"); + REQUIRE(token); + + { + // synced_column is required + ::fivetran_sdk::TruncateRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + request.set_table_name("some_table"); + + ::fivetran_sdk::TruncateResponse response; + auto status = service.Truncate(nullptr, &request, &response); + + REQUIRE(!status.ok()); + REQUIRE(status.error_message() == "Synced column is required"); + } + + { + // deleted_column is required + ::fivetran_sdk::TruncateRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + request.set_table_name("some_table"); + request.set_synced_column("_fivetran_synced"); + + ::fivetran_sdk::TruncateResponse response; + auto status = service.Truncate(nullptr, &request, &response); + + REQUIRE(!status.ok()); + REQUIRE(status.error_message() == "Deleted column is required"); + } } \ No newline at end of file