Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix truncate behavior #16

Merged
merged 9 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions includes/sql_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct table_def {
std::string schema_name;
std::string table_name;

std::string to_string() const;
std::string to_escaped_string() const;
};

bool schema_exists(duckdb::Connection &con, const std::string &db_name,
Expand Down Expand Up @@ -47,7 +47,10 @@ void update_values(duckdb::Connection &con, const table_def &table,
std::vector<const column_def *> &columns_regular,
const std::string &unmodified_string);

void truncate_table(duckdb::Connection &con, const table_def &table);
void truncate_table(duckdb::Connection &con, const table_def &table,
const std::string &synced_column,
std::chrono::nanoseconds &cutoff_ns,
const std::string &deleted_column);

void delete_rows(duckdb::Connection &con, const table_def &table,
const std::string &staging_table_name,
Expand Down
13 changes: 12 additions & 1 deletion src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,22 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context,
find_property(request->configuration(), MD_PROP_DATABASE);
table_def table_name{db_name, get_schema_name(request),
get_table_name(request)};
if (request->synced_column().empty()) {
throw std::invalid_argument("Synced column is required");
}
if (request->soft().deleted_column().empty()) {
// right now, only soft deletes are supported
throw std::invalid_argument("Deleted column is required");
}
std::unique_ptr<duckdb::Connection> con =
get_connection(request->configuration(), db_name);

if (table_exists(*con, table_name)) {
truncate_table(*con, table_name);
std::chrono::nanoseconds delete_before_ts =
std::chrono::seconds(request->utc_delete_before().seconds()) +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be converted to ns?

Suggested change
std::chrono::seconds(request->utc_delete_before().seconds()) +
std::chrono::seconds(request->utc_delete_before().seconds()) * 1e9 +

But also, doesn't std::chrono::nanoseconds(request->utc_delete_before().nanos()) already give you what you need? Does it just give you the fractional seconds part?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding seconds to nanoseconds gets seconds converted automatically (docs)

When two duration objects of different types are involved, the one with the longest period (as determined by common_type) is converted before the operation.

nanos() only contain the fractional part -- proto docs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit too magical for me, but I guess I'm not on the C++ design committee :-)
You got to love this radically opposite approach between C++ stdlib that makes it as nice and expressive as possible vs the proto which complicate things and wants to split seconds and ns!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually grateful that for once C++ did not make me invent every basic utility from first principles, but yeah, it's not super intuitive either.
And I have no idea what proto designers were thinking here. I guess it's space saving for usecases where only second granularity is needed? But who uses second granularity anymore -- it's not 1999.

std::chrono::nanoseconds(request->utc_delete_before().nanos());
truncate_table(*con, table_name, request->synced_column(),
delete_before_ts, request->soft().deleted_column());
} else {
mdlog::warning("Table <" + request->table_name() +
"> not found in schema <" + request->schema_name() +
Expand Down
39 changes: 27 additions & 12 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ using duckdb::KeywordHelper;

// Utility

std::string table_def::to_string() const {
std::string table_def::to_escaped_string() const {
std::ostringstream out;
out << KeywordHelper::WriteQuoted(db_name, '"') << "."
<< KeywordHelper::WriteQuoted(schema_name, '"') << "."
Expand Down Expand Up @@ -65,7 +65,7 @@ bool table_exists(duckdb::Connection &con, const table_def &table) {

if (result->HasError()) {
throw std::runtime_error("Could not find whether table <" +
table.to_string() +
table.to_escaped_string() +
"> exists: " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
Expand All @@ -83,7 +83,7 @@ void create_schema(duckdb::Connection &con, const std::string &db_name,
void create_table(duckdb::Connection &con, const table_def &table,
const std::vector<const column_def *> &columns_pk,
const std::vector<column_def> &all_columns) {
const std::string absolute_table_name = table.to_string();
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream ddl;
ddl << "CREATE OR REPLACE TABLE " << absolute_table_name << " (";

Expand Down Expand Up @@ -129,7 +129,8 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
auto result = statement->Execute(params, false);

if (result->HasError()) {
throw std::runtime_error("Could not describe table <" + table.to_string() +
throw std::runtime_error("Could not describe table <" +
table.to_escaped_string() +
">:" + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
Expand All @@ -148,7 +149,7 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
void alter_table(duckdb::Connection &con, const table_def &table,
const std::vector<column_def> &columns) {

auto absolute_table_name = table.to_string();
auto absolute_table_name = table.to_escaped_string();
std::set<std::string> alter_types;
std::set<std::string> added_columns;
std::set<std::string> deleted_columns;
Expand Down Expand Up @@ -233,7 +234,7 @@ void upsert(duckdb::Connection &con, const table_def &table,
const std::string &staging_table_name,
std::vector<const column_def *> &columns_pk,
std::vector<const column_def *> &columns_regular) {
const std::string absolute_table_name = table.to_string();
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream sql;
sql << "INSERT INTO " << absolute_table_name << " SELECT * FROM "
<< staging_table_name;
Expand Down Expand Up @@ -264,7 +265,7 @@ void update_values(duckdb::Connection &con, const table_def &table,
const std::string &unmodified_string) {

std::ostringstream sql;
auto absolute_table_name = table.to_string();
auto absolute_table_name = table.to_escaped_string();

sql << "UPDATE " << absolute_table_name << " SET ";

Expand Down Expand Up @@ -299,7 +300,7 @@ void delete_rows(duckdb::Connection &con, const table_def &table,
const std::string &staging_table_name,
std::vector<const column_def *> &columns_pk) {

const std::string absolute_table_name = table.to_string();
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream sql;
sql << "DELETE FROM " + absolute_table_name << " USING " << staging_table_name
<< " WHERE ";
Expand All @@ -319,13 +320,27 @@ void delete_rows(duckdb::Connection &con, const table_def &table,
}
}

void truncate_table(duckdb::Connection &con, const table_def &table) {
const std::string absolute_table_name = table.to_string();
void truncate_table(duckdb::Connection &con, const table_def &table,
const std::string &synced_column,
std::chrono::nanoseconds &cutoff_ns,
const std::string &deleted_column) {
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream sql;
sql << "DELETE FROM " + absolute_table_name;

sql << "UPDATE " << absolute_table_name << " SET "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(sorry if I already asked that before...) we can't quote the table name?

Copy link
Contributor Author

@elefeint elefeint Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table_def::to_string() quotes all parts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps I should rename it to to_quoted_string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to to_escaped_string

<< KeywordHelper::WriteQuoted(deleted_column, '"') << " = true WHERE "
<< KeywordHelper::WriteQuoted(synced_column, '"')
<< " < make_timestamp(?)";
auto query = sql.str();
mdlog::info("truncate_table: " + query);
auto result = con.Query(query);
auto statement = con.Prepare(query);

// DuckDB make_timestamp takes microseconds; Fivetran sends millisecond
// precision -- safe to divide with truncation
long cutoff_microseconds = cutoff_ns.count() / 1000;
duckdb::vector<duckdb::Value> params = {duckdb::Value(cutoff_microseconds)};

auto result = statement->Execute(params, false);
if (result->HasError()) {
throw std::runtime_error("Error truncating table <" + absolute_table_name +
">:" + result->GetError());
Expand Down
4 changes: 2 additions & 2 deletions test/files/books_update.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
id,title,magic_number,_fivetran_deleted,_fivetran_synced
3,"unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",15,false,"2024-01-09T04:30:13.984276065Z"
2,"The empire strikes back","unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",false,"2024-01-09T04:30:13.984276065Z"
3,"unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",15,false,"2024-02-08T23:59:59.999999999Z"
2,"The empire strikes back","unmod-NcK9NIjPUutCsz4mjOQQztbnwnE1sY3",false,"2024-02-09T00:00:00.000000000Z"
148 changes: 123 additions & 25 deletions test/integration/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ TEST_CASE("WriteBatch", "[integration][current]") {
}

{
// check after update
auto res = con->Query("SELECT id, title, magic_number FROM " + table_name +
" ORDER BY id");
INFO(res->GetError());
Expand All @@ -454,11 +455,17 @@ TEST_CASE("WriteBatch", "[integration][current]") {
}

{
// truncate table
// truncate data before Jan 9 2024
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_table_name(table_name);
request.set_synced_column("_fivetran_synced");
request.mutable_soft()->set_deleted_column("_fivetran_deleted");

const auto cutoff_datetime = 1707436800; // 2024-02-09 0:0:0 GMT, trust me
request.mutable_utc_delete_before()->set_seconds(cutoff_datetime);
request.mutable_utc_delete_before()->set_nanos(0);
::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);

Expand All @@ -468,39 +475,61 @@ TEST_CASE("WriteBatch", "[integration][current]") {

{
// check truncated table
auto res = con->Query("SELECT id, title, magic_number FROM " + table_name +
" ORDER BY id");
auto res = con->Query("SELECT title, id, magic_number FROM " + table_name +
" WHERE _fivetran_deleted = false ORDER BY id");
INFO(res->GetError());
REQUIRE(!res->HasError());
REQUIRE(res->RowCount() == 0);
// the 1st row from books_update.csv that had 2024-02-08T23:59:59.999999999Z
// timestamp got deleted
REQUIRE(res->RowCount() == 1);
REQUIRE(res->GetValue(0, 0) == "The empire strikes back");
REQUIRE(res->GetValue(1, 0) == 2);
REQUIRE(res->GetValue(2, 0) == 1);
}
}

TEST_CASE("Truncate nonexistent table should succeed", "[integration]") {
DestinationSdkImpl service;
{
// check the rows did not get physically deleted
auto res = con->Query("SELECT title, id, magic_number FROM " + table_name +
" ORDER BY id");
INFO(res->GetError());
REQUIRE(!res->HasError());
REQUIRE(res->RowCount() == 2);
}

const std::string bad_table_name = "nonexistent";
{
// truncate table does nothing if there is no utc_delete_before field set
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_table_name(table_name);
request.set_synced_column("_fivetran_synced");
request.mutable_soft()->set_deleted_column("_fivetran_deleted");

auto token = std::getenv("motherduck_token");
REQUIRE(token);
::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);

::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_schema_name("some_schema");
request.set_table_name(bad_table_name);
::fivetran_sdk::TruncateResponse response;
INFO(status.error_message());
REQUIRE(status.ok());
}

std::stringstream buffer;
std::streambuf *real_cout = std::cout.rdbuf(buffer.rdbuf());
auto status = service.Truncate(nullptr, &request, &response);
std::cout.rdbuf(real_cout);
{
// check truncated table is the same as before
auto res = con->Query("SELECT title FROM " + table_name +
" WHERE _fivetran_deleted = false ORDER BY id");
INFO(res->GetError());
REQUIRE(!res->HasError());
REQUIRE(res->RowCount() == 1);
REQUIRE(res->GetValue(0, 0) == "The empire strikes back");
}

INFO(status.error_message());
REQUIRE(status.ok());
REQUIRE_THAT(buffer.str(), Catch::Matchers::ContainsSubstring(
"Table <nonexistent> not found in schema "
"<some_schema>; not truncated"));
{
// check again that the rows did not get physically deleted
auto res = con->Query("SELECT title, id, magic_number FROM " + table_name +
" ORDER BY id");
INFO(res->GetError());
REQUIRE(!res->HasError());
REQUIRE(res->RowCount() == 2);
}
}

TEST_CASE("CreateTable with multiple primary keys", "[integration]") {
Expand Down Expand Up @@ -549,4 +578,73 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") {
REQUIRE(response.table().columns().size() == 2);
}
}
}

TEST_CASE("Truncate nonexistent table should succeed", "[integration]") {
DestinationSdkImpl service;

const std::string bad_table_name = "nonexistent";

auto token = std::getenv("motherduck_token");
REQUIRE(token);

::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_schema_name("some_schema");
request.set_table_name(bad_table_name);
request.set_synced_column("_fivetran_synced");
request.mutable_soft()->set_deleted_column("_fivetran_deleted");

::fivetran_sdk::TruncateResponse response;

std::stringstream buffer;
std::streambuf *real_cout = std::cout.rdbuf(buffer.rdbuf());
auto status = service.Truncate(nullptr, &request, &response);
std::cout.rdbuf(real_cout);

INFO(status.error_message());
REQUIRE(status.ok());
REQUIRE_THAT(buffer.str(), Catch::Matchers::ContainsSubstring(
"Table <nonexistent> not found in schema "
"<some_schema>; not truncated"));
}

TEST_CASE("Truncate fails if required properties are missing") {

DestinationSdkImpl service;

const std::string bad_table_name = "nonexistent";

auto token = std::getenv("motherduck_token");
REQUIRE(token);

{
// synced_column is required
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_table_name("some_table");

::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);

REQUIRE(!status.ok());
REQUIRE(status.error_message() == "Synced column is required");
}

{
// synced_column is required
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test";
request.set_table_name("some_table");
request.set_synced_column("_fivetran_synced");

::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);

REQUIRE(!status.ok());
REQUIRE(status.error_message() == "Deleted column is required");
}
}
Loading