Skip to content

Commit

Permalink
remove excessive logging
Browse files Browse the repository at this point in the history
  • Loading branch information
elefeint committed Mar 18, 2024
1 parent 1bd3612 commit b781fc7
Showing 1 changed file with 3 additions and 37 deletions.
40 changes: 3 additions & 37 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,12 @@ grpc::Status DestinationSdkImpl::DescribeTable(
mdlog::info("Endpoint <DescribeTable>: started");
std::string db_name =
find_property(request->configuration(), MD_PROP_DATABASE);
mdlog::info("Endpoint <DescribeTable>: found database name <" + db_name +
">");
std::unique_ptr<duckdb::Connection> con =
get_connection(request->configuration(), db_name);
mdlog::info("Endpoint <DescribeTable>: got database connection");
table_def table_name{db_name, get_schema_name(request),
get_table_name(request)};

if (!table_exists(*con, table_name)) {
mdlog::info("Endpoint <DescribeTable>: table does not exist; returning "
"not found");
response->set_not_found(true);
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
Expand All @@ -188,16 +183,10 @@ grpc::Status DestinationSdkImpl::DescribeTable(
fivetran_sdk::Table *table = response->mutable_table();
table->set_name(get_table_name(request));

mdlog::info("Endpoint <DescribeTable>: before enumerating columns");
for (auto &col : duckdb_columns) {
fivetran_sdk::Column *ft_col = table->mutable_columns()->Add();
mdlog::info("Endpoint <DescribeTable>: column <" + col.name +
">; duckdb type <" + LogicalTypeIdToString(col.type) + ">");
ft_col->set_name(col.name);
auto fivetran_type = get_fivetran_type(col.type);
mdlog::info("Endpoint <DescribeTable>: column <" + col.name +
">; fivetran type <" + DataType_Name(fivetran_type) + ">");
ft_col->set_type(fivetran_type);
ft_col->set_type(get_fivetran_type(col.type));
ft_col->set_primary_key(col.primary_key);
}

Expand Down Expand Up @@ -282,27 +271,19 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context,
mdlog::info("Endpoint <Truncate>: started");
std::string db_name =
find_property(request->configuration(), MD_PROP_DATABASE);
mdlog::info("Endpoint <Truncate>: found database name <" + db_name + ">");
table_def table_name{db_name, get_schema_name(request),
get_table_name(request)};
if (request->synced_column().empty()) {
throw std::invalid_argument("Synced column is required");
}

mdlog::info("Endpoint <Truncate>: found synced column <" +
request->synced_column() + ">");
std::unique_ptr<duckdb::Connection> con =
get_connection(request->configuration(), db_name);

mdlog::info("Endpoint <Truncate>: got database connection");
if (table_exists(*con, table_name)) {
mdlog::info("Endpoint <Truncate>: schema <" + table_name.schema_name +
">, table <" + table_name.table_name + "> exists");
std::chrono::nanoseconds delete_before_ts =
std::chrono::seconds(request->utc_delete_before().seconds()) +
std::chrono::nanoseconds(request->utc_delete_before().nanos());
mdlog::info("Endpoint <Truncate>: delete_before_ts = <" +
std::to_string(delete_before_ts.count()) + ">");
truncate_table(*con, table_name, request->synced_column(),
delete_before_ts, request->soft().deleted_column());
} else {
Expand Down Expand Up @@ -333,18 +314,15 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,

const std::string db_name =
find_property(request->configuration(), MD_PROP_DATABASE);
mdlog::info("Endpoint <WriteBatch>: found database name <" + db_name + ">");
table_def table_name{db_name, get_schema_name(request),
request->table().name()};
std::unique_ptr<duckdb::Connection> con =
get_connection(request->configuration(), db_name);
mdlog::info("Endpoint <WriteBatch>: got database connection");

// Use local memory by default to prevent Arrow-based VIEW from traveling
// up to the cloud
con->Query("ATTACH ':memory:' as localmem");
con->Query("USE localmem");
mdlog::info("Endpoint <WriteBatch>: attached and used local memory db");

const auto cols = get_duckdb_columns(request->table().columns());
std::vector<const column_def *> columns_pk;
Expand All @@ -354,8 +332,6 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
if (columns_pk.empty()) {
throw std::invalid_argument("No primary keys found");
}
mdlog::info("Endpoint <WriteBatch>: got " +
std::to_string(columns_pk.size()) + " primary keys");

// update file fields have to be read in as strings to allow
// "unmodified_string"/"null_string". Replace (upsert) files have to be read
Expand All @@ -365,46 +341,36 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
[](const column_def &col) { return col.name; });

for (auto &filename : request->replace_files()) {
mdlog::info("Endpoint <WriteBatch>: processing replace file " + filename);
const auto decryption_key = get_encryption_key(
filename, request->keys(), request->csv().encryption());

mdlog::info("Endpoint <WriteBatch>: got replace file decryption key");
process_file(
*con, filename, decryption_key, column_names,
request->csv().null_string(), [&](const std::string &view_name) {
upsert(*con, table_name, view_name, columns_pk, columns_regular);
});
mdlog::info("Endpoint <WriteBatch>: finished processing replace file " +
filename);
}
for (auto &filename : request->update_files()) {
mdlog::info("Endpoint <WriteBatch>: processing update file " + filename);

auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
mdlog::info("Endpoint <WriteBatch>: got update file decryption key");

process_file(
*con, filename, decryption_key, column_names,
request->csv().null_string(), [&](const std::string &view_name) {
update_values(*con, table_name, view_name, columns_pk,
columns_regular, request->csv().unmodified_string());
});
mdlog::info("Endpoint <WriteBatch>: finished processing update file " +
filename);
}
for (auto &filename : request->delete_files()) {
mdlog::info("Endpoint <WriteBatch>: processing delete file " + filename);
auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
mdlog::info("Endpoint <WriteBatch>: got delete file decryption key");
std::vector<std::string> empty;
process_file(*con, filename, decryption_key, empty,
request->csv().null_string(),
[&](const std::string &view_name) {
delete_rows(*con, table_name, view_name, columns_pk);
});
mdlog::info("Endpoint <WriteBatch>: finished processing delete file " +
filename);
}

} catch (const std::exception &e) {
Expand Down

0 comments on commit b781fc7

Please sign in to comment.