diff --git a/Makefile b/Makefile index 83b42721..9c66b455 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,7 @@ test_debug: debug format: cp duckdb/.clang-format . - find src/ -iname *.hpp -o -iname *.cpp | xargs clang-format --sort-includes=0 -style=file -i + find src/ -iname "*.hpp" -o -iname "*.cpp" | xargs clang-format --sort-includes=0 -style=file -i cmake-format -i CMakeLists.txt rm .clang-format diff --git a/src/postgres_attach.cpp b/src/postgres_attach.cpp index 2f7043f9..4bf10c41 100644 --- a/src/postgres_attach.cpp +++ b/src/postgres_attach.cpp @@ -50,7 +50,7 @@ static void AttachFunction(ClientContext &context, TableFunctionInput &data_p, D auto conn = PostgresConnection::Open(data.dsn); auto dconn = Connection(context.db->GetDatabase(context)); auto fetch_table_query = StringUtil::Format( - R"( + R"( SELECT relname FROM pg_class JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid JOIN pg_attribute ON pg_class.oid = pg_attribute.attrelid @@ -58,7 +58,7 @@ WHERE relkind = 'r' AND attnum > 0 AND nspname = %s GROUP BY relname ORDER BY relname; )", - KeywordHelper::WriteQuoted(data.source_schema)); + KeywordHelper::WriteQuoted(data.source_schema)); auto res = conn.Query(fetch_table_query); for (idx_t row = 0; row < PQntuples(res->res); row++) { auto table_name = res->GetString(row, 0); @@ -93,7 +93,7 @@ ORDER BY relname; } PostgresAttachFunction::PostgresAttachFunction() - : TableFunction("postgres_attach", {LogicalType::VARCHAR}, AttachFunction, AttachBind) { + : TableFunction("postgres_attach", {LogicalType::VARCHAR}, AttachFunction, AttachBind) { named_parameters["overwrite"] = LogicalType::BOOLEAN; named_parameters["filter_pushdown"] = LogicalType::BOOLEAN; @@ -102,4 +102,4 @@ PostgresAttachFunction::PostgresAttachFunction() named_parameters["suffix"] = LogicalType::VARCHAR; } -} +} // namespace duckdb diff --git a/src/postgres_connection.cpp b/src/postgres_connection.cpp index 60925430..7bf02d7c 100644 --- a/src/postgres_connection.cpp +++ 
b/src/postgres_connection.cpp @@ -38,7 +38,7 @@ static bool ResultHasError(PGresult *result) { if (!result) { return true; } - switch(PQresultStatus(result)) { + switch (PQresultStatus(result)) { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: return false; @@ -54,12 +54,12 @@ PGresult *PostgresConnection::PQExecute(const string &query) { return PQexec(GetConn(), query.c_str()); } - unique_ptr PostgresConnection::TryQuery(const string &query, optional_ptr error_message) { auto result = PQExecute(query.c_str()); if (ResultHasError(result)) { if (error_message) { - *error_message = StringUtil::Format("Failed to execute query \"" + query + "\": " + string(PQresultErrorMessage(result))); + *error_message = StringUtil::Format("Failed to execute query \"" + query + + "\": " + string(PQresultErrorMessage(result))); } return nullptr; } @@ -88,13 +88,14 @@ vector> PostgresConnection::ExecuteQueries(const stri throw std::runtime_error("Failed to execute query \"" + queries + "\": " + string(PQerrorMessage(GetConn()))); } vector> results; - while(true) { + while (true) { auto res = PQgetResult(GetConn()); if (!res) { break; } if (ResultHasError(res)) { - throw std::runtime_error("Failed to execute query \"" + queries + "\": " + string(PQresultErrorMessage(res))); + throw std::runtime_error("Failed to execute query \"" + queries + + "\": " + string(PQresultErrorMessage(res))); } if (PQresultStatus(res) != PGRES_TUPLES_OK) { continue; @@ -106,7 +107,8 @@ vector> PostgresConnection::ExecuteQueries(const stri } PostgresVersion PostgresConnection::GetPostgresVersion() { - auto result = Query("SELECT CURRENT_SETTING('server_version'), (SELECT COUNT(*) FROM pg_settings WHERE name LIKE 'rds%')"); + auto result = + Query("SELECT CURRENT_SETTING('server_version'), (SELECT COUNT(*) FROM pg_settings WHERE name LIKE 'rds%')"); auto version = PostgresUtils::ExtractPostgresVersion(result->GetString(0, 0)); if (result->GetInt64(0, 1) > 0) { version.type_v = PostgresInstanceType::AURORA; diff 
--git a/src/postgres_copy_from.cpp b/src/postgres_copy_from.cpp index 5171111b..9a5a8328 100644 --- a/src/postgres_copy_from.cpp +++ b/src/postgres_copy_from.cpp @@ -12,4 +12,4 @@ void PostgresConnection::BeginCopyFrom(PostgresBinaryReader &reader, const strin reader.CheckHeader(); } -} +} // namespace duckdb diff --git a/src/postgres_copy_to.cpp b/src/postgres_copy_to.cpp index 0385bb71..ff96a00a 100644 --- a/src/postgres_copy_to.cpp +++ b/src/postgres_copy_to.cpp @@ -5,7 +5,9 @@ namespace duckdb { -void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState &state, PostgresCopyFormat format, const string &schema_name, const string &table_name, const vector &column_names) { +void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState &state, PostgresCopyFormat format, + const string &schema_name, const string &table_name, + const vector &column_names) { string query = "COPY "; if (!schema_name.empty()) { query += KeywordHelper::WriteQuoted(schema_name, '"') + "."; @@ -13,7 +15,7 @@ void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState & query += KeywordHelper::WriteQuoted(table_name, '"') + " "; if (!column_names.empty()) { query += "("; - for(idx_t c = 0; c < column_names.size(); c++) { + for (idx_t c = 0; c < column_names.size(); c++) { if (c > 0) { query += ", "; } @@ -23,7 +25,7 @@ void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState & } query += "FROM STDIN (FORMAT "; state.format = format; - switch(state.format) { + switch (state.format) { case PostgresCopyFormat::BINARY: query += "BINARY"; break; @@ -50,8 +52,8 @@ void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState & void PostgresConnection::CopyData(data_ptr_t buffer, idx_t size) { int result; do { - result = PQputCopyData(GetConn(), (const char *) buffer, int(size)); - } while(result == 0); + result = PQputCopyData(GetConn(), (const char *)buffer, int(size)); + } while (result == 0); if 
(result == -1) { throw InternalException("Error during PQputCopyData: %s", PQerrorMessage(GetConn())); } @@ -103,7 +105,7 @@ void CastListToPostgresArray(ClientContext &context, Vector &input, Vector &varc auto child_entries = FlatVector::GetData(child_varchar); auto list_entries = FlatVector::GetData(input); auto result_entries = FlatVector::GetData(varchar_vector); - for(idx_t r = 0; r < size; r++) { + for (idx_t r = 0; r < size; r++) { if (FlatVector::IsNull(input, r)) { FlatVector::SetNull(varchar_vector, r, true); continue; @@ -111,7 +113,7 @@ void CastListToPostgresArray(ClientContext &context, Vector &input, Vector &varc auto list_entry = list_entries[r]; string result; result = "{"; - for(idx_t list_idx = 0; list_idx < list_entry.length; list_idx++) { + for (idx_t list_idx = 0; list_idx < list_entry.length; list_idx++) { if (list_idx > 0) { result += ","; } @@ -134,7 +136,7 @@ void CastListToPostgresArray(ClientContext &context, Vector &input, Vector &varc } bool TypeRequiresQuotes(const LogicalType &input) { - switch(input.id()) { + switch (input.id()) { case LogicalTypeId::STRUCT: case LogicalTypeId::LIST: return true; @@ -148,7 +150,7 @@ void CastStructToPostgres(ClientContext &context, Vector &input, Vector &varchar // cast child data of structs vector child_varchar_vectors; vector child_requires_quotes; - for(idx_t c = 0; c < child_vectors.size(); c++) { + for (idx_t c = 0; c < child_vectors.size(); c++) { Vector child_varchar(LogicalType::VARCHAR, size); CastToPostgresVarchar(context, *child_vectors[c], child_varchar, size, depth + 1); child_varchar_vectors.push_back(std::move(child_varchar)); @@ -157,14 +159,14 @@ void CastStructToPostgres(ClientContext &context, Vector &input, Vector &varchar // construct the struct entries auto result_entries = FlatVector::GetData(varchar_vector); - for(idx_t r = 0; r < size; r++) { + for (idx_t r = 0; r < size; r++) { if (FlatVector::IsNull(input, r)) { FlatVector::SetNull(varchar_vector, r, true); continue; } 
string result; result = "("; - for(idx_t c = 0; c < child_varchar_vectors.size(); c++) { + for (idx_t c = 0; c < child_varchar_vectors.size(); c++) { if (c > 0) { result += ","; } @@ -189,7 +191,7 @@ void CastStructToPostgres(ClientContext &context, Vector &input, Vector &varchar void CastBlobToPostgres(ClientContext &context, Vector &input, Vector &result, idx_t size) { auto input_data = FlatVector::GetData(input); auto result_data = FlatVector::GetData(result); - for(idx_t r = 0; r < size; r++) { + for (idx_t r = 0; r < size; r++) { if (FlatVector::IsNull(input, r)) { FlatVector::SetNull(result, r, true); continue; @@ -198,7 +200,7 @@ void CastBlobToPostgres(ClientContext &context, Vector &input, Vector &result, i string blob_str = "\\\\x"; auto blob_data = const_data_ptr_cast(input_data[r].GetData()); auto blob_size = input_data[r].GetSize(); - for(idx_t c = 0; c < blob_size; c++) { + for (idx_t c = 0; c < blob_size; c++) { blob_str += HEX_STRING[blob_data[c] / 16]; blob_str += HEX_STRING[blob_data[c] % 16]; } @@ -223,7 +225,8 @@ void CastToPostgresVarchar(ClientContext &context, Vector &input, Vector &result } } -void PostgresConnection::CopyChunk(ClientContext &context, PostgresCopyState &state, DataChunk &chunk, DataChunk &varchar_chunk) { +void PostgresConnection::CopyChunk(ClientContext &context, PostgresCopyState &state, DataChunk &chunk, + DataChunk &varchar_chunk) { chunk.Flatten(); if (state.format == PostgresCopyFormat::BINARY) { @@ -242,14 +245,14 @@ void PostgresConnection::CopyChunk(ClientContext &context, PostgresCopyState &st if (varchar_chunk.ColumnCount() == 0) { // not initialized yet vector varchar_types; - for(idx_t c = 0; c < chunk.ColumnCount(); c++) { + for (idx_t c = 0; c < chunk.ColumnCount(); c++) { varchar_types.push_back(LogicalType::VARCHAR); } varchar_chunk.Initialize(Allocator::DefaultAllocator(), varchar_types); } D_ASSERT(chunk.ColumnCount() == varchar_chunk.ColumnCount()); // for text format cast to varchar first - for(idx_t c = 
0; c < chunk.ColumnCount(); c++) { + for (idx_t c = 0; c < chunk.ColumnCount(); c++) { CastToPostgresVarchar(context, chunk.data[c], varchar_chunk.data[c], chunk.size()); } varchar_chunk.SetCardinality(chunk.size()); @@ -270,5 +273,4 @@ void PostgresConnection::CopyChunk(ClientContext &context, PostgresCopyState &st } } - -} +} // namespace duckdb diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index ea632b86..ae4ffe6a 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -1,7 +1,6 @@ #define DUCKDB_BUILD_LOADABLE_EXTENSION #include "duckdb.hpp" - #include "postgres_scanner.hpp" #include "postgres_storage.hpp" #include "postgres_scanner_extension.hpp" @@ -20,7 +19,7 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V throw InvalidInputException("pg_connection_limit can only be set globally"); } auto databases = DatabaseManager::Get(context).GetDatabases(context); - for(auto &db_ref : databases) { + for (auto &db_ref : databases) { auto &db = db_ref.get(); auto &catalog = db.GetCatalog(); if (catalog.GetCatalogType() != "postgres") { @@ -55,17 +54,25 @@ static void LoadInternal(DatabaseInstance &db) { auto &config = DBConfig::GetConfig(db); config.storage_extensions["postgres_scanner"] = make_uniq(); - config.AddExtensionOption("pg_use_binary_copy", "Whether or not to use BINARY copy to read data", LogicalType::BOOLEAN, Value::BOOLEAN(true)); - config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT, Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK)); - config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS), SetPostgresConnectionLimit); - config.AddExtensionOption("pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", LogicalType::BOOLEAN, Value::BOOLEAN(false)); 
- config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown (currently experimental)", LogicalType::BOOLEAN, Value::BOOLEAN(false)); - config.AddExtensionOption("pg_debug_show_queries", "DEBUG SETTING: print all queries sent to Postgres to stdout", LogicalType::BOOLEAN, Value::BOOLEAN(false), SetPostgresDebugQueryPrint); - + config.AddExtensionOption("pg_use_binary_copy", "Whether or not to use BINARY copy to read data", + LogicalType::BOOLEAN, Value::BOOLEAN(true)); + config.AddExtensionOption("pg_pages_per_task", "The amount of pages per task", LogicalType::UBIGINT, + Value::UBIGINT(PostgresBindData::DEFAULT_PAGES_PER_TASK)); + config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", + LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS), + SetPostgresConnectionLimit); + config.AddExtensionOption("pg_array_as_varchar", + "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", + LogicalType::BOOLEAN, Value::BOOLEAN(false)); + config.AddExtensionOption("pg_experimental_filter_pushdown", + "Whether or not to use filter pushdown (currently experimental)", LogicalType::BOOLEAN, + Value::BOOLEAN(false)); + config.AddExtensionOption("pg_debug_show_queries", "DEBUG SETTING: print all queries sent to Postgres to stdout", + LogicalType::BOOLEAN, Value::BOOLEAN(false), SetPostgresDebugQueryPrint); } void PostgresScannerExtension::Load(DuckDB &db) { - LoadInternal(*db.instance); + LoadInternal(*db.instance); } extern "C" { @@ -81,5 +88,4 @@ DUCKDB_EXTENSION_API const char *postgres_scanner_version() { DUCKDB_EXTENSION_API void postgres_scanner_storage_init(DBConfig &config) { config.storage_extensions["postgres_scanner"] = make_uniq(); } - } diff --git a/src/postgres_filter_pushdown.cpp b/src/postgres_filter_pushdown.cpp index 01a034cf..96cedafd 100644 --- a/src/postgres_filter_pushdown.cpp +++ b/src/postgres_filter_pushdown.cpp @@ 
-3,7 +3,8 @@ namespace duckdb { -string PostgresFilterPushdown::CreateExpression(string &column_name, vector> &filters, string op) { +string PostgresFilterPushdown::CreateExpression(string &column_name, vector> &filters, + string op) { vector filter_entries; for (auto &filter : filters) { filter_entries.push_back(TransformFilter(column_name, *filter)); @@ -55,7 +56,8 @@ string PostgresFilterPushdown::TransformFilter(string &column_name, TableFilter } } -string PostgresFilterPushdown::TransformFilters(const vector &column_ids, optional_ptr filters, const vector &names) { +string PostgresFilterPushdown::TransformFilters(const vector &column_ids, + optional_ptr filters, const vector &names) { if (!filters || filters->filters.empty()) { // no filters return string(); @@ -72,4 +74,4 @@ string PostgresFilterPushdown::TransformFilters(const vector &column_i return result; } -} +} // namespace duckdb diff --git a/src/postgres_query.cpp b/src/postgres_query.cpp index 06d0d89e..5c4b624d 100644 --- a/src/postgres_query.cpp +++ b/src/postgres_query.cpp @@ -10,7 +10,7 @@ namespace duckdb { static unique_ptr PGQueryBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { + vector &return_types, vector &names) { auto result = make_uniq(); // look up the database to query @@ -48,9 +48,11 @@ static unique_ptr PGQueryBind(ClientContext &context, TableFunctio } auto nfields = PQnfields(describe_prepared); if (nfields <= 0) { - throw BinderException("No fields returned by query \"%s\" - the query must be a SELECT statement that returns at least one column", sql); + throw BinderException("No fields returned by query \"%s\" - the query must be a SELECT statement that returns " + "at least one column", + sql); } - for(idx_t c = 0; c < nfields; c++) { + for (idx_t c = 0; c < nfields; c++) { PostgresType postgres_type; postgres_type.oid = PQftype(describe_prepared, c); PostgresTypeData type_data; @@ -75,12 +77,11 @@ static unique_ptr 
PGQueryBind(ClientContext &context, TableFunctio } PostgresQueryFunction::PostgresQueryFunction() - : TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR}, - nullptr, PGQueryBind) { + : TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR}, nullptr, PGQueryBind) { PostgresScanFunction scan_function; init_global = scan_function.init_global; init_local = scan_function.init_local; function = scan_function.function; projection_pushdown = true; } -} +} // namespace duckdb diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp index bc9ed0f9..dc79c450 100644 --- a/src/postgres_scanner.cpp +++ b/src/postgres_scanner.cpp @@ -30,12 +30,12 @@ struct PostgresLocalState : public LocalTableFunctionState { idx_t batch_idx = 0; PostgresPoolConnection pool_connection; - void ScanChunk(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate, DataChunk &output); + void ScanChunk(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate, + DataChunk &output); }; struct PostgresGlobalState : public GlobalTableFunctionState { - explicit PostgresGlobalState(idx_t max_threads) - : page_idx(0), batch_idx(0), max_threads(max_threads) { + explicit PostgresGlobalState(idx_t max_threads) : page_idx(0), batch_idx(0), max_threads(max_threads) { } mutex lock; @@ -75,7 +75,8 @@ static void PostgresGetSnapshot(PostgresVersion version, ClientContext &context, return; } - result = con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)"); + result = + con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)"); if (result) { auto in_recovery = result->GetBool(0, 0) || result->GetInt64(0, 2) > 0; bind_data.snapshot = ""; @@ -86,7 +87,8 @@ static void PostgresGetSnapshot(PostgresVersion version, ClientContext &context, } } -void PostgresScanFunction::PrepareBind(PostgresVersion version, 
ClientContext &context, PostgresBindData &bind_data, idx_t approx_num_pages) { +void PostgresScanFunction::PrepareBind(PostgresVersion version, ClientContext &context, PostgresBindData &bind_data, + idx_t approx_num_pages) { Value pages_per_task; if (context.TryGetCurrentSetting("pg_pages_per_task", pages_per_task)) { bind_data.pages_per_task = UBigIntValue::Get(pages_per_task); @@ -140,7 +142,7 @@ static unique_ptr PostgresBind(ClientContext &context, TableFuncti auto info = PostgresTableSet::GetTableInfo(con, bind_data->schema_name, bind_data->table_name); bind_data->postgres_types = info->postgres_types; - for(auto &col : info->create_info->columns.Logical()) { + for (auto &col : info->create_info->columns.Logical()) { names.push_back(col.GetName()); return_types.push_back(col.GetType()); } @@ -159,7 +161,7 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData auto bind_data = (const PostgresBindData *)bind_data_p; string col_names; - for(auto &column_id : lstate.column_ids) { + for (auto &column_id : lstate.column_ids) { if (!col_names.empty()) { col_names += ", "; } @@ -183,7 +185,8 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData } } - string filter_string = PostgresFilterPushdown::TransformFilters(lstate.column_ids, lstate.filters, bind_data->names); + string filter_string = + PostgresFilterPushdown::TransformFilters(lstate.column_ids, lstate.filters, bind_data->names); string filter; if (bind_data->pages_approx > 0) { @@ -200,18 +203,18 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData if (bind_data->table_name.empty()) { D_ASSERT(!bind_data->sql.empty()); lstate.sql = StringUtil::Format( - R"( + R"( COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT binary); )", - col_names, bind_data->sql, filter); + col_names, bind_data->sql, filter); } else { lstate.sql = StringUtil::Format( - R"( + R"( COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT 
binary); )", - col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'), KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter); - + col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'), + KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter); } lstate.exec = false; lstate.done = false; @@ -226,7 +229,8 @@ static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind return bind_data.max_threads; } -static unique_ptr GetLocalState(ClientContext &context, TableFunctionInitInput &input, PostgresGlobalState &gstate); +static unique_ptr GetLocalState(ClientContext &context, TableFunctionInitInput &input, + PostgresGlobalState &gstate); static unique_ptr PostgresInitGlobalState(ClientContext &context, TableFunctionInitInput &input) { @@ -235,7 +239,7 @@ static unique_ptr PostgresInitGlobalState(ClientContex if (bind_data.requires_materialization) { // if requires_materialization is enabled we scan and materialize the table in its entirety up-front vector types; - for(auto column_id : input.column_ids) { + for (auto column_id : input.column_ids) { types.push_back(column_id == COLUMN_IDENTIFIER_ROW_ID ? 
LogicalType::BIGINT : bind_data.types[column_id]); } auto materialized = make_uniq(Allocator::Get(context), types); @@ -246,7 +250,7 @@ static unique_ptr PostgresInitGlobalState(ClientContex auto &lstate = local_state->Cast(); ColumnDataAppendState append_state; materialized->InitializeAppend(append_state); - while(true) { + while (true) { scan_chunk.Reset(); lstate.ScanChunk(context, bind_data, *result, scan_chunk); if (scan_chunk.size() == 0) { @@ -289,7 +293,8 @@ static void PostgresScanConnect(PostgresConnection &conn, string snapshot) { } } -bool PostgresBindData::TryOpenNewConnection(ClientContext &context, PostgresLocalState &lstate, PostgresGlobalState &gstate) { +bool PostgresBindData::TryOpenNewConnection(ClientContext &context, PostgresLocalState &lstate, + PostgresGlobalState &gstate) { { lock_guard parallel_lock(gstate.lock); if (!gstate.used_main_thread) { @@ -311,8 +316,9 @@ bool PostgresBindData::TryOpenNewConnection(ClientContext &context, PostgresLoca return true; } -static unique_ptr GetLocalState(ClientContext &context, TableFunctionInitInput &input, PostgresGlobalState &gstate) { - auto &bind_data = (PostgresBindData &) *input.bind_data; +static unique_ptr GetLocalState(ClientContext &context, TableFunctionInitInput &input, + PostgresGlobalState &gstate) { + auto &bind_data = (PostgresBindData &)*input.bind_data; auto local_state = make_uniq(); if (gstate.collection) { @@ -342,7 +348,8 @@ static unique_ptr PostgresInitLocalState(ExecutionConte return GetLocalState(context.client, input, gstate); } -void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate, DataChunk &output) { +void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindData &bind_data, + PostgresGlobalState &gstate, DataChunk &output) { idx_t output_offset = 0; PostgresBinaryReader reader(connection); while (true) { @@ -388,8 +395,7 @@ void PostgresLocalState::ScanChunk(ClientContext &context, 
const PostgresBindDat ctid_type.info = PostgresTypeAnnotation::CTID; reader.ReadValue(LogicalType::BIGINT, ctid_type, out_vec, output_offset); } else { - reader.ReadValue(bind_data.types[col_idx], bind_data.postgres_types[col_idx], out_vec, - output_offset); + reader.ReadValue(bind_data.types[col_idx], bind_data.postgres_types[col_idx], out_vec, output_offset); } } reader.Reset(); @@ -413,8 +419,7 @@ static void PostgresScan(ClientContext &context, TableFunctionInput &data, DataC } static idx_t PostgresScanBatchIndex(ClientContext &context, const FunctionData *bind_data_p, - LocalTableFunctionState *local_state_p, - GlobalTableFunctionState *global_state) { + LocalTableFunctionState *local_state_p, GlobalTableFunctionState *global_state) { auto &bind_data = bind_data_p->Cast(); auto &local_state = local_state_p->Cast(); return local_state.batch_idx; @@ -428,7 +433,7 @@ static string PostgresScanToString(const FunctionData *bind_data_p) { } static void PostgresScanSerialize(Serializer &serializer, const optional_ptr bind_data_p, - const TableFunction &function) { + const TableFunction &function) { throw NotImplementedException("PostgresScanSerialize"); } @@ -437,8 +442,8 @@ static unique_ptr PostgresScanDeserialize(Deserializer &deserializ } PostgresScanFunction::PostgresScanFunction() - : TableFunction("postgres_scan", {LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::VARCHAR}, - PostgresScan, PostgresBind, PostgresInitGlobalState, PostgresInitLocalState) { + : TableFunction("postgres_scan", {LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::VARCHAR}, PostgresScan, + PostgresBind, PostgresInitGlobalState, PostgresInitLocalState) { to_string = PostgresScanToString; serialize = PostgresScanSerialize; deserialize = PostgresScanDeserialize; @@ -447,8 +452,8 @@ PostgresScanFunction::PostgresScanFunction() } PostgresScanFunctionFilterPushdown::PostgresScanFunctionFilterPushdown() - : TableFunction("postgres_scan_pushdown", {LogicalType::VARCHAR, 
LogicalType::VARCHAR, LogicalType::VARCHAR}, - PostgresScan, PostgresBind, PostgresInitGlobalState, PostgresInitLocalState) { + : TableFunction("postgres_scan_pushdown", {LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::VARCHAR}, + PostgresScan, PostgresBind, PostgresInitGlobalState, PostgresInitLocalState) { to_string = PostgresScanToString; serialize = PostgresScanSerialize; deserialize = PostgresScanDeserialize; @@ -457,4 +462,4 @@ PostgresScanFunctionFilterPushdown::PostgresScanFunctionFilterPushdown() filter_pushdown = true; } -} +} // namespace duckdb diff --git a/src/postgres_storage.cpp b/src/postgres_storage.cpp index 7297d9d2..d569f2c4 100644 --- a/src/postgres_storage.cpp +++ b/src/postgres_storage.cpp @@ -8,12 +8,12 @@ namespace duckdb { static unique_ptr PostgresAttach(StorageExtensionInfo *storage_info, AttachedDatabase &db, const string &name, - AttachInfo &info, AccessMode access_mode) { + AttachInfo &info, AccessMode access_mode) { return make_uniq(db, info.path, access_mode); } static unique_ptr PostgresCreateTransactionManager(StorageExtensionInfo *storage_info, - AttachedDatabase &db, Catalog &catalog) { + AttachedDatabase &db, Catalog &catalog) { auto &postgres_catalog = catalog.Cast(); return make_uniq(db, postgres_catalog); } diff --git a/src/postgres_utils.cpp b/src/postgres_utils.cpp index d5ffc1cc..28fd2993 100644 --- a/src/postgres_utils.cpp +++ b/src/postgres_utils.cpp @@ -5,8 +5,7 @@ namespace duckdb { -static void -PGNoticeProcessor(void *arg, const char *message) { +static void PGNoticeProcessor(void *arg, const char *message) { } PGconn *PostgresUtils::PGConnect(const string &dsn) { @@ -24,7 +23,7 @@ string PostgresUtils::TypeToString(const LogicalType &input) { if (input.HasAlias()) { return input.GetAlias(); } - switch(input.id()) { + switch (input.id()) { case LogicalTypeId::FLOAT: return "REAL"; case LogicalTypeId::DOUBLE: @@ -34,9 +33,11 @@ string PostgresUtils::TypeToString(const LogicalType &input) { case 
LogicalTypeId::LIST: return PostgresUtils::TypeToString(ListType::GetChildType(input)) + "[]"; case LogicalTypeId::ENUM: - throw NotImplementedException("Enums in Postgres must be named - unnamed enums are not supported. Use CREATE TYPE to create a named enum."); + throw NotImplementedException("Enums in Postgres must be named - unnamed enums are not supported. Use CREATE " + "TYPE to create a named enum."); case LogicalTypeId::STRUCT: - throw NotImplementedException("Composite types in Postgres must be named - unnamed composite types are not supported. Use CREATE TYPE to create a named composite type."); + throw NotImplementedException("Composite types in Postgres must be named - unnamed composite types are not " + "supported. Use CREATE TYPE to create a named composite type."); case LogicalTypeId::MAP: throw NotImplementedException("MAP type not supported in Postgres"); case LogicalTypeId::UNION: @@ -47,7 +48,7 @@ string PostgresUtils::TypeToString(const LogicalType &input) { } LogicalType PostgresUtils::RemoveAlias(const LogicalType &type) { - switch(type.id()) { + switch (type.id()) { case LogicalTypeId::STRUCT: { auto child_types = StructType::GetChildTypes(type); return LogicalType::STRUCT(std::move(child_types)); @@ -63,7 +64,9 @@ LogicalType PostgresUtils::RemoveAlias(const LogicalType &type) { } } -LogicalType PostgresUtils::TypeToLogicalType(optional_ptr transaction, optional_ptr schema, const PostgresTypeData &type_info, PostgresType &postgres_type) { +LogicalType PostgresUtils::TypeToLogicalType(optional_ptr transaction, + optional_ptr schema, + const PostgresTypeData &type_info, PostgresType &postgres_type) { auto &pgtypename = type_info.type_name; // postgres array types start with an _ @@ -93,7 +96,7 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr t PostgresType child_pg_type; auto child_type = PostgresUtils::TypeToLogicalType(transaction, schema, child_type_info, child_pg_type); // construct the child type based on the number of 
dimensions - for(idx_t i = 1; i < dimensions; i++) { + for (idx_t i = 1; i < dimensions; i++) { PostgresType new_pg_type; new_pg_type.children.push_back(std::move(child_pg_type)); child_pg_type = std::move(new_pg_type); @@ -161,7 +164,8 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr t if (!context) { throw InternalException("Context is destroyed!?"); } - auto entry = schema->GetEntry(CatalogTransaction(schema->ParentCatalog(), *context), CatalogType::TYPE_ENTRY, pgtypename); + auto entry = schema->GetEntry(CatalogTransaction(schema->ParentCatalog(), *context), CatalogType::TYPE_ENTRY, + pgtypename); if (!entry) { // unsupported so fallback to varchar postgres_type.info = PostgresTypeAnnotation::CAST_TO_VARCHAR; @@ -199,7 +203,7 @@ LogicalType PostgresUtils::ToPostgresType(const LogicalType &input) { return LogicalType::LIST(ToPostgresType(ListType::GetChildType(input))); case LogicalTypeId::STRUCT: { child_list_t new_types; - for(idx_t c = 0; c < StructType::GetChildCount(input); c++) { + for (idx_t c = 0; c < StructType::GetChildCount(input); c++) { auto &name = StructType::GetChildName(input, c); auto &type = StructType::GetChildType(input, c); new_types.push_back(make_pair(name, ToPostgresType(type))); @@ -228,9 +232,9 @@ LogicalType PostgresUtils::ToPostgresType(const LogicalType &input) { PostgresType PostgresUtils::CreateEmptyPostgresType(const LogicalType &type) { PostgresType result; - switch(type.id()) { + switch (type.id()) { case LogicalTypeId::STRUCT: - for(auto &child_type : StructType::GetChildTypes(type)) { + for (auto &child_type : StructType::GetChildTypes(type)) { result.children.push_back(CreateEmptyPostgresType(child_type.second)); } break; @@ -244,7 +248,7 @@ PostgresType PostgresUtils::CreateEmptyPostgresType(const LogicalType &type) { } bool PostgresUtils::SupportedPostgresOid(const LogicalType &input) { - switch(input.id()) { + switch (input.id()) { case LogicalTypeId::BOOLEAN: case LogicalTypeId::SMALLINT: case 
LogicalTypeId::INTEGER: @@ -268,7 +272,7 @@ bool PostgresUtils::SupportedPostgresOid(const LogicalType &input) { } string PostgresUtils::PostgresOidToName(uint32_t oid) { - switch(oid) { + switch (oid) { case BOOLOID: return "bool"; case INT2OID: @@ -357,7 +361,7 @@ string PostgresUtils::PostgresOidToName(uint32_t oid) { } uint32_t PostgresUtils::ToPostgresOid(const LogicalType &input) { - switch(input.id()) { + switch (input.id()) { case LogicalTypeId::BOOLEAN: return BOOLOID; case LogicalTypeId::SMALLINT: @@ -401,12 +405,12 @@ PostgresVersion PostgresUtils::ExtractPostgresVersion(const string &version_str) PostgresVersion result; idx_t pos = 0; // scan for the first digit - while(pos < version_str.size() && !StringUtil::CharacterIsDigit(version_str[pos])) { + while (pos < version_str.size() && !StringUtil::CharacterIsDigit(version_str[pos])) { pos++; } - for(idx_t version_idx = 0; version_idx < 3; version_idx++) { + for (idx_t version_idx = 0; version_idx < 3; version_idx++) { idx_t digit_start = pos; - while(pos < version_str.size() && StringUtil::CharacterIsDigit(version_str[pos])) { + while (pos < version_str.size() && StringUtil::CharacterIsDigit(version_str[pos])) { pos++; } if (digit_start == pos) { @@ -416,7 +420,7 @@ PostgresVersion PostgresUtils::ExtractPostgresVersion(const string &version_str) // our version is at [digit_start..pos) auto digit_str = version_str.substr(digit_start, pos - digit_start); auto digit = std::strtoll(digit_str.c_str(), 0, 10); - switch(version_idx) { + switch (version_idx) { case 0: result.major_v = digit; break; @@ -437,4 +441,4 @@ PostgresVersion PostgresUtils::ExtractPostgresVersion(const string &version_str) return result; } -} +} // namespace duckdb diff --git a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index 358f46da..4e43a187 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -44,14 +44,12 @@ void PostgresCatalog::DropSchema(ClientContext &context, DropInfo 
&info) { } void PostgresCatalog::ScanSchemas(ClientContext &context, std::function callback) { - schemas.Scan(context, [&](CatalogEntry &schema) { - callback(schema.Cast()); - }); + schemas.Scan(context, [&](CatalogEntry &schema) { callback(schema.Cast()); }); } optional_ptr PostgresCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, - OnEntryNotFound if_not_found, - QueryErrorContext error_context) { + OnEntryNotFound if_not_found, + QueryErrorContext error_context) { if (schema_name == DEFAULT_SCHEMA) { return GetSchema(transaction, "public", if_not_found, error_context); } @@ -88,5 +86,4 @@ void PostgresCatalog::ClearCache() { schemas.ClearEntries(); } - } // namespace duckdb diff --git a/src/storage/postgres_catalog_set.cpp b/src/storage/postgres_catalog_set.cpp index 0bd0c7e2..5f2225df 100644 --- a/src/storage/postgres_catalog_set.cpp +++ b/src/storage/postgres_catalog_set.cpp @@ -3,8 +3,8 @@ #include "duckdb/parser/parsed_data/drop_info.hpp" namespace duckdb { -PostgresCatalogSet::PostgresCatalogSet(Catalog &catalog) : - catalog(catalog), is_loaded(false) {} +PostgresCatalogSet::PostgresCatalogSet(Catalog &catalog) : catalog(catalog), is_loaded(false) { +} optional_ptr PostgresCatalogSet::GetEntry(ClientContext &context, const string &name) { if (!is_loaded) { @@ -55,7 +55,7 @@ void PostgresCatalogSet::Scan(ClientContext &context, const std::function l(entry_lock); - for(auto &entry : entries) { + for (auto &entry : entries) { callback(*entry.second); } } @@ -77,4 +77,4 @@ void PostgresCatalogSet::ClearEntries() { is_loaded = false; } -} +} // namespace duckdb diff --git a/src/storage/postgres_clear_cache.cpp b/src/storage/postgres_clear_cache.cpp index 1c6ef6bf..9ec199de 100644 --- a/src/storage/postgres_clear_cache.cpp +++ b/src/storage/postgres_clear_cache.cpp @@ -13,7 +13,7 @@ struct ClearCacheFunctionData : public TableFunctionData { }; static unique_ptr ClearCacheBind(ClientContext &context, TableFunctionBindInput &input, - 
vector &return_types, vector &names) { + vector &return_types, vector &names) { auto result = make_uniq(); return_types.push_back(LogicalType::BOOLEAN); @@ -27,7 +27,7 @@ static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_ return; } auto databases = DatabaseManager::Get(context).GetDatabases(context); - for(auto &db_ref : databases) { + for (auto &db_ref : databases) { auto &db = db_ref.get(); auto &catalog = db.GetCatalog(); if (catalog.GetCatalogType() != "postgres") { @@ -39,7 +39,6 @@ static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_ } PostgresClearCacheFunction::PostgresClearCacheFunction() - : TableFunction("pg_clear_cache", {}, - ClearCacheFunction, ClearCacheBind) { -} + : TableFunction("pg_clear_cache", {}, ClearCacheFunction, ClearCacheBind) { } +} // namespace duckdb diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index 8aa775e4..21726bc7 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -3,10 +3,13 @@ namespace duckdb { -PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr) {} +PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr) { +} -PostgresPoolConnection::PostgresPoolConnection(optional_ptr pool, PostgresConnection connection_p) : - pool(pool), connection(std::move(connection_p)) {} +PostgresPoolConnection::PostgresPoolConnection(optional_ptr pool, + PostgresConnection connection_p) + : pool(pool), connection(std::move(connection_p)) { +} PostgresPoolConnection::~PostgresPoolConnection() { if (pool) { @@ -36,8 +39,9 @@ PostgresConnection &PostgresPoolConnection::GetConnection() { return connection; } -PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections_p) : - postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) {} 
+PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections_p) + : postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) { +} bool PostgresConnectionPool::TryGetConnection(PostgresPoolConnection &connection) { lock_guard l(connection_lock); @@ -60,7 +64,9 @@ bool PostgresConnectionPool::TryGetConnection(PostgresPoolConnection &connection PostgresPoolConnection PostgresConnectionPool::GetConnection() { PostgresPoolConnection result; if (!TryGetConnection(result)) { - throw IOException("Failed to get connection from PostgresConnectionPool - maximum connection count exceeded (%llu/%llu max)", active_connections, maximum_connections); + throw IOException( + "Failed to get connection from PostgresConnectionPool - maximum connection count exceeded (%llu/%llu max)", + active_connections, maximum_connections); } return result; } @@ -72,7 +78,8 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { } active_connections--; if (active_connections >= maximum_connections) { - // if the maximum number of connections has been decreased by the user we might need to reclaim the connection immediately + // if the maximum number of connections has been decreased by the user we might need to reclaim the connection + // immediately return; } // check if the underlying connection is still usable @@ -98,7 +105,7 @@ void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { // note that we can only close connections in the connection cache // we will have to wait for connections to be returned auto total_open_connections = active_connections + connection_cache.size(); - while(!connection_cache.empty() && total_open_connections > new_max) { + while (!connection_cache.empty() && total_open_connections > new_max) { total_open_connections--; connection_cache.pop_back(); } @@ -106,4 +113,4 @@ void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { 
maximum_connections = new_max; } -} +} // namespace duckdb diff --git a/src/storage/postgres_delete.cpp b/src/storage/postgres_delete.cpp index ed6f9961..17dc7513 100644 --- a/src/storage/postgres_delete.cpp +++ b/src/storage/postgres_delete.cpp @@ -83,7 +83,7 @@ SinkResultType PostgresDelete::Sink(ExecutionContext &context, DataChunk &chunk, // Finalize //===--------------------------------------------------------------------===// SinkFinalizeType PostgresDelete::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, - OperatorSinkFinalizeInput &input) const { + OperatorSinkFinalizeInput &input) const { auto &gstate = input.global_state.Cast(); gstate.Flush(context); return SinkFinalizeType::READY; @@ -92,7 +92,8 @@ SinkFinalizeType PostgresDelete::Finalize(Pipeline &pipeline, Event &event, Clie //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresDelete::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { +SourceResultType PostgresDelete::GetData(ExecutionContext &context, DataChunk &chunk, + OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); chunk.SetValue(0, 0, Value::BIGINT(insert_gstate.delete_count)); @@ -115,7 +116,7 @@ string PostgresDelete::ParamsToString() const { // Plan //===--------------------------------------------------------------------===// unique_ptr PostgresCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, - unique_ptr plan) { + unique_ptr plan) { if (op.return_chunk) { throw BinderException("RETURNING clause not yet supported for deletion of a Postgres table"); } diff --git a/src/storage/postgres_index.cpp b/src/storage/postgres_index.cpp index 1fceaf3a..d670e744 100644 --- a/src/storage/postgres_index.cpp +++ b/src/storage/postgres_index.cpp @@ -14,7 +14,7 @@ 
PostgresCreateIndex::PostgresCreateIndex(unique_ptr info, Table // Source //===--------------------------------------------------------------------===// SourceResultType PostgresCreateIndex::GetData(ExecutionContext &context, DataChunk &chunk, - OperatorSourceInput &input) const { + OperatorSourceInput &input) const { auto &catalog = table.catalog; if (info->catalog == INVALID_CATALOG && info->schema == catalog.GetName()) { info->schema = DEFAULT_SCHEMA; @@ -51,9 +51,10 @@ class LogicalPostgresCreateIndex : public LogicalExtensionOperator { }; unique_ptr PostgresCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, - TableCatalogEntry &table, unique_ptr plan) { + TableCatalogEntry &table, + unique_ptr plan) { return make_uniq(unique_ptr_cast(std::move(stmt.info)), - table); + table); } } // namespace duckdb diff --git a/src/storage/postgres_index_entry.cpp b/src/storage/postgres_index_entry.cpp index 4428d414..a27ab3ea 100644 --- a/src/storage/postgres_index_entry.cpp +++ b/src/storage/postgres_index_entry.cpp @@ -4,7 +4,7 @@ namespace duckdb { PostgresIndexEntry::PostgresIndexEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateIndexInfo &info, - string table_name_p) + string table_name_p) : IndexCatalogEntry(catalog, schema, info), table_name(std::move(table_name_p)) { } diff --git a/src/storage/postgres_index_set.cpp b/src/storage/postgres_index_set.cpp index 9276330c..64573db7 100644 --- a/src/storage/postgres_index_set.cpp +++ b/src/storage/postgres_index_set.cpp @@ -7,9 +7,9 @@ namespace duckdb { -PostgresIndexSet::PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr index_result_p) : - PostgresCatalogSet(schema.ParentCatalog()), schema(schema), index_result(std::move(index_result_p)) {} - +PostgresIndexSet::PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr index_result_p) + : PostgresCatalogSet(schema.ParentCatalog()), schema(schema), index_result(std::move(index_result_p)) { +} string PostgresIndexSet::GetInitializeQuery() { return 
R"( @@ -25,7 +25,7 @@ void PostgresIndexSet::LoadEntries(ClientContext &context) { throw InternalException("PostgresIndexSet::LoadEntries called without an index result defined"); } auto &result = index_result->GetResult(); - for(idx_t row = index_result->start; row < index_result->end; row++) { + for (idx_t row = index_result->start; row < index_result->end; row++) { auto table_name = result.GetString(row, 1); auto index_name = result.GetString(row, 2); CreateIndexInfo info; @@ -71,11 +71,11 @@ string PGGetCreateIndexSQL(CreateIndexInfo &info, TableCatalogEntry &tbl) { } optional_ptr PostgresIndexSet::CreateIndex(ClientContext &context, CreateIndexInfo &info, - TableCatalogEntry &table) { + TableCatalogEntry &table) { auto &postgres_transaction = PostgresTransaction::Get(context, table.catalog); postgres_transaction.Query(PGGetCreateIndexSQL(info, table)); auto index_entry = make_uniq(schema.ParentCatalog(), schema, info, table.name); return CreateEntry(std::move(index_entry)); } -} +} // namespace duckdb diff --git a/src/storage/postgres_insert.cpp b/src/storage/postgres_insert.cpp index 3eeddc39..84541451 100644 --- a/src/storage/postgres_insert.cpp +++ b/src/storage/postgres_insert.cpp @@ -15,7 +15,7 @@ namespace duckdb { PostgresInsert::PostgresInsert(LogicalOperator &op, TableCatalogEntry &table, - physical_index_vector_t column_index_map_p) + physical_index_vector_t column_index_map_p) : PhysicalOperator(PhysicalOperatorType::EXTENSION, op.types, 1), table(&table), schema(nullptr), column_index_map(std::move(column_index_map_p)) { } @@ -30,7 +30,8 @@ PostgresInsert::PostgresInsert(LogicalOperator &op, SchemaCatalogEntry &schema, //===--------------------------------------------------------------------===// class PostgresInsertGlobalState : public GlobalSinkState { public: - explicit PostgresInsertGlobalState(ClientContext &context, PostgresTableEntry *table) : table(table), insert_count(0) { + explicit PostgresInsertGlobalState(ClientContext &context, 
PostgresTableEntry *table) + : table(table), insert_count(0) { } PostgresTableEntry *table; @@ -81,7 +82,7 @@ unique_ptr PostgresInsert::GetGlobalSinkState(ClientContext &co auto format = insert_table->GetCopyFormat(context); vector insert_column_names; if (!insert_columns.empty()) { - for(auto &str : insert_columns) { + for (auto &str : insert_columns) { auto index = insert_table->GetColumnIndex(str, true); if (!index.IsValid()) { insert_column_names.push_back(str); @@ -90,7 +91,8 @@ unique_ptr PostgresInsert::GetGlobalSinkState(ClientContext &co } } } - connection.BeginCopyTo(context, result->copy_state, format, insert_table->schema.name, insert_table->name, insert_column_names); + connection.BeginCopyTo(context, result->copy_state, format, insert_table->schema.name, insert_table->name, + insert_column_names); return std::move(result); } @@ -110,7 +112,7 @@ SinkResultType PostgresInsert::Sink(ExecutionContext &context, DataChunk &chunk, // Finalize //===--------------------------------------------------------------------===// SinkFinalizeType PostgresInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, - OperatorSinkFinalizeInput &input) const { + OperatorSinkFinalizeInput &input) const { auto &gstate = sink_state->Cast(); auto &transaction = PostgresTransaction::Get(context, gstate.table->catalog); auto &connection = transaction.GetConnection(); @@ -126,7 +128,8 @@ SinkFinalizeType PostgresInsert::Finalize(Pipeline &pipeline, Event &event, Clie //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresInsert::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { +SourceResultType PostgresInsert::GetData(ExecutionContext &context, DataChunk &chunk, + OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); chunk.SetValue(0, 0, 
Value::BIGINT(insert_gstate.insert_count)); @@ -176,8 +179,8 @@ unique_ptr AddCastToPostgresTypes(ClientContext &context, uniq select_list.push_back(std::move(expr)); } // we need to cast: add casts - auto proj = - make_uniq(std::move(postgres_types), std::move(select_list), plan->estimated_cardinality); + auto proj = make_uniq(std::move(postgres_types), std::move(select_list), + plan->estimated_cardinality); proj->children.push_back(std::move(plan)); plan = std::move(proj); } @@ -188,21 +191,21 @@ unique_ptr AddCastToPostgresTypes(ClientContext &context, uniq void PostgresCatalog::MaterializePostgresScans(PhysicalOperator &op) { if (op.type == PhysicalOperatorType::TABLE_SCAN) { auto &table_scan = op.Cast(); - if (table_scan.function.name == "postgres_scan" || table_scan.function.name == "postgres_scan_pushdown" - || table_scan.function.name == "postgres_query") { + if (table_scan.function.name == "postgres_scan" || table_scan.function.name == "postgres_scan_pushdown" || + table_scan.function.name == "postgres_query") { auto &bind_data = table_scan.bind_data->Cast(); bind_data.requires_materialization = true; bind_data.max_threads = 1; bind_data.emit_ctid = true; } } - for(auto &child : op.children) { + for (auto &child : op.children) { MaterializePostgresScans(*child); } } unique_ptr PostgresCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, - unique_ptr plan) { + unique_ptr plan) { if (op.return_chunk) { throw BinderException("RETURNING clause not yet supported for insertion into Postgres table"); } @@ -219,9 +222,11 @@ unique_ptr PostgresCatalog::PlanInsert(ClientContext &context, } unique_ptr PostgresCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, - unique_ptr plan) { + unique_ptr plan) { plan = AddCastToPostgresTypes(context, std::move(plan)); + MaterializePostgresScans(*plan); + auto insert = make_uniq(op, op.schema, std::move(op.info)); insert->children.push_back(std::move(plan)); return std::move(insert); diff --git 
a/src/storage/postgres_schema_entry.cpp b/src/storage/postgres_schema_entry.cpp index c35124d4..cb1aebf8 100644 --- a/src/storage/postgres_schema_entry.cpp +++ b/src/storage/postgres_schema_entry.cpp @@ -13,20 +13,16 @@ namespace duckdb { -PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name) : - SchemaCatalogEntry(catalog, std::move(name), true), tables(*this), indexes(*this), - types(*this) { +PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name) + : SchemaCatalogEntry(catalog, std::move(name), true), tables(*this), indexes(*this), types(*this) { } -PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name, - unique_ptr tables, - unique_ptr enums, - unique_ptr composite_types, - unique_ptr indexes) : - SchemaCatalogEntry(catalog, std::move(name), true), - tables(*this, std::move(tables)), - indexes(*this, std::move(indexes)), - types(*this, std::move(enums), std::move(composite_types)) { +PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name, unique_ptr tables, + unique_ptr enums, + unique_ptr composite_types, + unique_ptr indexes) + : SchemaCatalogEntry(catalog, std::move(name), true), tables(*this, std::move(tables)), + indexes(*this, std::move(indexes)), types(*this, std::move(enums), std::move(composite_types)) { } PostgresTransaction &GetPostgresTransaction(CatalogTransaction transaction) { @@ -45,7 +41,8 @@ void PostgresSchemaEntry::TryDropEntry(ClientContext &context, CatalogType catal DropEntry(context, info); } -optional_ptr PostgresSchemaEntry::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { +optional_ptr PostgresSchemaEntry::CreateTable(CatalogTransaction transaction, + BoundCreateTableInfo &info) { auto &postgres_transaction = GetPostgresTransaction(transaction); auto &base_info = info.Base(); auto table_name = base_info.table; @@ -56,12 +53,13 @@ optional_ptr PostgresSchemaEntry::CreateTable(CatalogTransaction t return 
tables.CreateTable(transaction.GetContext(), info); } -optional_ptr PostgresSchemaEntry::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { +optional_ptr PostgresSchemaEntry::CreateFunction(CatalogTransaction transaction, + CreateFunctionInfo &info) { throw BinderException("Postgres databases do not support creating functions"); } optional_ptr PostgresSchemaEntry::CreateIndex(ClientContext &context, CreateIndexInfo &info, - TableCatalogEntry &table) { + TableCatalogEntry &table) { return indexes.CreateIndex(context, info, table); } @@ -116,27 +114,28 @@ optional_ptr PostgresSchemaEntry::CreateType(CatalogTransaction tr return types.CreateType(transaction.GetContext(), info); } -optional_ptr PostgresSchemaEntry::CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) { +optional_ptr PostgresSchemaEntry::CreateSequence(CatalogTransaction transaction, + CreateSequenceInfo &info) { throw BinderException("Postgres databases do not support creating sequences"); } optional_ptr PostgresSchemaEntry::CreateTableFunction(CatalogTransaction transaction, - CreateTableFunctionInfo &info) { + CreateTableFunctionInfo &info) { throw BinderException("Postgres databases do not support creating table functions"); } optional_ptr PostgresSchemaEntry::CreateCopyFunction(CatalogTransaction transaction, - CreateCopyFunctionInfo &info) { + CreateCopyFunctionInfo &info) { throw BinderException("Postgres databases do not support creating copy functions"); } optional_ptr PostgresSchemaEntry::CreatePragmaFunction(CatalogTransaction transaction, - CreatePragmaFunctionInfo &info) { + CreatePragmaFunctionInfo &info) { throw BinderException("Postgres databases do not support creating pragma functions"); } optional_ptr PostgresSchemaEntry::CreateCollation(CatalogTransaction transaction, - CreateCollationInfo &info) { + CreateCollationInfo &info) { throw BinderException("Postgres databases do not support creating collations"); } @@ -161,7 +160,7 @@ bool 
CatalogTypeIsSupported(CatalogType type) { } void PostgresSchemaEntry::Scan(ClientContext &context, CatalogType type, - const std::function &callback) { + const std::function &callback) { if (!CatalogTypeIsSupported(type)) { return; } @@ -176,7 +175,7 @@ void PostgresSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { } optional_ptr PostgresSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, - const string &name) { + const string &name) { if (!CatalogTypeIsSupported(type)) { return nullptr; } diff --git a/src/storage/postgres_schema_set.cpp b/src/storage/postgres_schema_set.cpp index 613c08d5..ffe136aa 100644 --- a/src/storage/postgres_schema_set.cpp +++ b/src/storage/postgres_schema_set.cpp @@ -8,8 +8,8 @@ namespace duckdb { -PostgresSchemaSet::PostgresSchemaSet(Catalog &catalog) : - PostgresCatalogSet(catalog) {} +PostgresSchemaSet::PostgresSchemaSet(Catalog &catalog) : PostgresCatalogSet(catalog) { +} vector> SliceResult(PostgresResult &schemas, unique_ptr to_slice_ptr) { auto shared_result = shared_ptr(to_slice_ptr.release()); @@ -17,10 +17,10 @@ vector> SliceResult(PostgresResult &schemas, uni vector> result; idx_t current_offset = 0; - for(idx_t schema_idx = 0; schema_idx < schemas.Count(); schema_idx++) { + for (idx_t schema_idx = 0; schema_idx < schemas.Count(); schema_idx++) { auto oid = schemas.GetInt64(schema_idx, 0); idx_t start = current_offset; - for(; current_offset < to_slice.Count(); current_offset++) { + for (; current_offset < to_slice.Count(); current_offset++) { auto current_oid = to_slice.GetInt64(current_offset, 0); if (current_oid != oid) { break; @@ -58,10 +58,12 @@ void PostgresSchemaSet::LoadEntries(ClientContext &context) { auto enums = SliceResult(*result, std::move(results[1])); auto composite_types = SliceResult(*result, std::move(results[2])); auto indexes = SliceResult(*result, std::move(results[3])); - for(idx_t row = 0; row < rows; row++) { + for (idx_t row = 0; row < rows; row++) { auto oid = 
result->GetInt64(row, 0); auto schema_name = result->GetString(row, 1); - auto schema = make_uniq(catalog, schema_name, std::move(tables[row]), std::move(enums[row]), std::move(composite_types[row]), std::move(indexes[row])); + auto schema = + make_uniq(catalog, schema_name, std::move(tables[row]), std::move(enums[row]), + std::move(composite_types[row]), std::move(indexes[row])); CreateEntry(std::move(schema)); } } @@ -75,4 +77,4 @@ optional_ptr PostgresSchemaSet::CreateSchema(ClientContext &contex return CreateEntry(std::move(schema_entry)); } -} +} // namespace duckdb diff --git a/src/storage/postgres_table_entry.cpp b/src/storage/postgres_table_entry.cpp index c3166c9e..9f5a21ae 100644 --- a/src/storage/postgres_table_entry.cpp +++ b/src/storage/postgres_table_entry.cpp @@ -8,8 +8,8 @@ namespace duckdb { PostgresTableEntry::PostgresTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info) - : TableCatalogEntry(catalog, schema, info) { - for(idx_t c = 0; c < columns.LogicalColumnCount(); c++) { + : TableCatalogEntry(catalog, schema, info) { + for (idx_t c = 0; c < columns.LogicalColumnCount(); c++) { auto &col = columns.GetColumnMutable(LogicalIndex(c)); if (col.GetType().HasAlias()) { col.TypeMutable() = PostgresUtils::RemoveAlias(col.GetType()); @@ -22,7 +22,7 @@ PostgresTableEntry::PostgresTableEntry(Catalog &catalog, SchemaCatalogEntry &sch PostgresTableEntry::PostgresTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, PostgresTableInfo &info) : TableCatalogEntry(catalog, schema, *info.create_info), postgres_types(std::move(info.postgres_types)), - postgres_names(std::move(info.postgres_names)) { + postgres_names(std::move(info.postgres_names)) { D_ASSERT(postgres_types.size() == columns.LogicalColumnCount()); approx_num_pages = info.approx_num_pages; } @@ -45,7 +45,7 @@ TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique result->dsn = transaction.GetDSN(); 
result->SetConnection(transaction.GetConnection().GetConnection()); result->SetCatalog(pg_catalog); - for(auto &col : columns.Logical()) { + for (auto &col : columns.Logical()) { result->types.push_back(col.GetType()); } result->names = postgres_names; @@ -75,7 +75,7 @@ static bool CopyRequiresText(const LogicalType &type, const PostgresType &pg_typ if (pg_type.info != PostgresTypeAnnotation::STANDARD) { return true; } - switch(type.id()) { + switch (type.id()) { case LogicalTypeId::LIST: { D_ASSERT(pg_type.children.size() == 1); auto &child_type = ListType::GetChildType(type); @@ -90,7 +90,7 @@ static bool CopyRequiresText(const LogicalType &type, const PostgresType &pg_typ case LogicalTypeId::STRUCT: { auto &children = StructType::GetChildTypes(type); D_ASSERT(children.size() == pg_type.children.size()); - for(idx_t c = 0; c < pg_type.children.size(); c++) { + for (idx_t c = 0; c < pg_type.children.size(); c++) { if (!PostgresUtils::SupportedPostgresOid(children[c].second)) { return true; } @@ -113,7 +113,7 @@ PostgresCopyFormat PostgresTableEntry::GetCopyFormat(ClientContext &context) { } } D_ASSERT(postgres_types.size() == columns.LogicalColumnCount()); - for(idx_t c = 0; c < postgres_types.size(); c++) { + for (idx_t c = 0; c < postgres_types.size(); c++) { if (CopyRequiresText(columns.GetColumn(LogicalIndex(c)).GetType(), postgres_types[c])) { return PostgresCopyFormat::TEXT; } diff --git a/src/storage/postgres_table_set.cpp b/src/storage/postgres_table_set.cpp index 07ef52ce..ba24e8b0 100644 --- a/src/storage/postgres_table_set.cpp +++ b/src/storage/postgres_table_set.cpp @@ -14,8 +14,9 @@ namespace duckdb { -PostgresTableSet::PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr table_result_p) : - PostgresCatalogSet(schema.ParentCatalog()), schema(schema), table_result(std::move(table_result_p)) {} +PostgresTableSet::PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr table_result_p) + : PostgresCatalogSet(schema.ParentCatalog()), schema(schema), 
table_result(std::move(table_result_p)) { +} string PostgresTableSet::GetInitializeQuery() { return R"( @@ -30,7 +31,9 @@ ORDER BY pg_namespace.oid, relname, attnum; )"; } -void PostgresTableSet::AddColumn(optional_ptr transaction, optional_ptr schema, PostgresResult &result, idx_t row, PostgresTableInfo &table_info, idx_t column_offset) { +void PostgresTableSet::AddColumn(optional_ptr transaction, + optional_ptr schema, PostgresResult &result, idx_t row, + PostgresTableInfo &table_info, idx_t column_offset) { PostgresTypeData type_info; idx_t column_index = column_offset; auto column_name = result.GetString(row, column_index); @@ -55,11 +58,12 @@ void PostgresTableSet::AddColumn(optional_ptr transaction, create_info.columns.AddColumn(std::move(column)); } -void PostgresTableSet::CreateEntries(PostgresTransaction &transaction, PostgresResult &result, idx_t start, idx_t end, idx_t col_offset) { +void PostgresTableSet::CreateEntries(PostgresTransaction &transaction, PostgresResult &result, idx_t start, idx_t end, + idx_t col_offset) { vector> tables; unique_ptr info; - for(idx_t row = start; row < end; row++) { + for (idx_t row = start; row < end; row++) { auto table_name = result.GetString(row, col_offset); auto approx_num_pages = result.GetInt64(row, col_offset + 1); if (!info || info->GetTableName() != table_name) { @@ -74,7 +78,7 @@ void PostgresTableSet::CreateEntries(PostgresTransaction &transaction, PostgresR if (info) { tables.push_back(std::move(info)); } - for(auto &tbl_info : tables) { + for (auto &tbl_info : tables) { auto table_entry = make_uniq(catalog, schema, *tbl_info); CreateEntry(std::move(table_entry)); } @@ -95,7 +99,8 @@ void PostgresTableSet::LoadEntries(ClientContext &context) { JOIN pg_type ON atttypid=pg_type.oid WHERE pg_namespace.nspname=${SCHEMA_NAME} AND attnum > 0 ORDER BY relname, attnum; - )", "${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema.name)); + )", + "${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema.name)); auto result = 
transaction.Query(query); auto rows = result->Count(); @@ -114,10 +119,13 @@ JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid JOIN pg_type ON atttypid=pg_type.oid WHERE pg_namespace.nspname=${SCHEMA_NAME} AND relname=${TABLE_NAME} AND attnum > 0 ORDER BY relname, attnum; -)", "${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema_name)), "${TABLE_NAME}", KeywordHelper::WriteQuoted(table_name)); +)", + "${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema_name)), + "${TABLE_NAME}", KeywordHelper::WriteQuoted(table_name)); } -unique_ptr PostgresTableSet::GetTableInfo(PostgresTransaction &transaction, PostgresSchemaEntry &schema, const string &table_name) { +unique_ptr PostgresTableSet::GetTableInfo(PostgresTransaction &transaction, + PostgresSchemaEntry &schema, const string &table_name) { auto query = GetTableInfoQuery(schema.name, table_name); auto result = transaction.Query(query); auto rows = result->Count(); @@ -125,14 +133,15 @@ unique_ptr PostgresTableSet::GetTableInfo(PostgresTransaction throw InvalidInputException("Table %s does not contain any columns.", table_name); } auto table_info = make_uniq(schema, table_name); - for(idx_t row = 0; row < rows; row++) { + for (idx_t row = 0; row < rows; row++) { AddColumn(&transaction, &schema, *result, row, *table_info, 1); } table_info->approx_num_pages = result->GetInt64(0, 0); return table_info; } -unique_ptr PostgresTableSet::GetTableInfo(PostgresConnection &connection, const string &schema_name, const string &table_name) { +unique_ptr PostgresTableSet::GetTableInfo(PostgresConnection &connection, const string &schema_name, + const string &table_name) { auto query = GetTableInfoQuery(schema_name, table_name); auto result = connection.Query(query); auto rows = result->Count(); @@ -140,7 +149,7 @@ unique_ptr PostgresTableSet::GetTableInfo(PostgresConnection throw InvalidInputException("Table %s does not contain any columns.", table_name); } auto table_info = make_uniq(schema_name, table_name); - for(idx_t row = 0; 
row < rows; row++) { + for (idx_t row = 0; row < rows; row++) { AddColumn(nullptr, nullptr, *result, row, *table_info, 1); } table_info->approx_num_pages = result->GetInt64(0, 0); @@ -332,10 +341,11 @@ void PostgresTableSet::AlterTable(ClientContext &context, AlterTableInfo &alter) AlterTable(context, alter.Cast()); break; default: - throw BinderException("Unsupported ALTER TABLE type - Postgres tables only support RENAME TABLE, RENAME COLUMN, " - "ADD COLUMN and DROP COLUMN"); + throw BinderException( + "Unsupported ALTER TABLE type - Postgres tables only support RENAME TABLE, RENAME COLUMN, " + "ADD COLUMN and DROP COLUMN"); } ClearEntries(); } -} +} // namespace duckdb diff --git a/src/storage/postgres_transaction.cpp b/src/storage/postgres_transaction.cpp index cf735599..3731dadf 100644 --- a/src/storage/postgres_transaction.cpp +++ b/src/storage/postgres_transaction.cpp @@ -7,7 +7,8 @@ namespace duckdb { -PostgresTransaction::PostgresTransaction(PostgresCatalog &postgres_catalog, TransactionManager &manager, ClientContext &context) +PostgresTransaction::PostgresTransaction(PostgresCatalog &postgres_catalog, TransactionManager &manager, + ClientContext &context) : Transaction(manager, context), access_mode(postgres_catalog.access_mode) { connection = postgres_catalog.GetConnectionPool().GetConnection(); } @@ -79,7 +80,6 @@ vector> PostgresTransaction::ExecuteQueries(const str return con.ExecuteQueries(queries); } - PostgresTransaction &PostgresTransaction::Get(ClientContext &context, Catalog &catalog) { return Transaction::Get(context, catalog).Cast(); } diff --git a/src/storage/postgres_type_entry.cpp b/src/storage/postgres_type_entry.cpp index 4c601d64..5d9d621c 100644 --- a/src/storage/postgres_type_entry.cpp +++ b/src/storage/postgres_type_entry.cpp @@ -2,8 +2,9 @@ namespace duckdb { -PostgresTypeEntry::PostgresTypeEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTypeInfo &info, PostgresType postgres_type_p) : - TypeCatalogEntry(catalog, schema, 
info), postgres_type(std::move(postgres_type_p)) {} - - +PostgresTypeEntry::PostgresTypeEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTypeInfo &info, + PostgresType postgres_type_p) + : TypeCatalogEntry(catalog, schema, info), postgres_type(std::move(postgres_type_p)) { } + +} // namespace duckdb diff --git a/src/storage/postgres_type_set.cpp b/src/storage/postgres_type_set.cpp index e7255cec..265a4729 100644 --- a/src/storage/postgres_type_set.cpp +++ b/src/storage/postgres_type_set.cpp @@ -14,11 +14,11 @@ struct PGTypeInfo { string name; }; -PostgresTypeSet::PostgresTypeSet(PostgresSchemaEntry &schema, - unique_ptr enum_result_p, - unique_ptr composite_type_result_p) : - PostgresCatalogSet(schema.ParentCatalog()), schema(schema), enum_result(std::move(enum_result_p)), - composite_type_result(std::move(composite_type_result_p)) {} +PostgresTypeSet::PostgresTypeSet(PostgresSchemaEntry &schema, unique_ptr enum_result_p, + unique_ptr composite_type_result_p) + : PostgresCatalogSet(schema.ParentCatalog()), schema(schema), enum_result(std::move(enum_result_p)), + composite_type_result(std::move(composite_type_result_p)) { +} string PostgresTypeSet::GetInitializeEnumsQuery() { return R"( @@ -38,7 +38,7 @@ void PostgresTypeSet::CreateEnum(PostgresResult &result, idx_t start_row, idx_t // construct the enum idx_t enum_count = end_row - start_row; Vector duckdb_levels(LogicalType::VARCHAR, enum_count); - for(idx_t enum_idx = 0; enum_idx < enum_count; enum_idx++) { + for (idx_t enum_idx = 0; enum_idx < enum_count; enum_idx++) { duckdb_levels.SetValue(enum_idx, result.GetString(start_row + enum_idx, 3)); } info.type = LogicalType::ENUM(duckdb_levels, enum_count); @@ -81,7 +81,8 @@ ORDER BY n.oid, t.oid, attrelid, attnum; )"; } -void PostgresTypeSet::CreateCompositeType(PostgresTransaction &transaction, PostgresResult &result, idx_t start_row, idx_t end_row) { +void PostgresTypeSet::CreateCompositeType(PostgresTransaction &transaction, PostgresResult &result, 
idx_t start_row, + idx_t end_row) { PostgresType postgres_type; CreateTypeInfo info; postgres_type.oid = result.GetInt64(start_row, 1); @@ -93,7 +94,8 @@ void PostgresTypeSet::CreateCompositeType(PostgresTransaction &transaction, Post PostgresTypeData type_data; type_data.type_name = result.GetString(row, 4); PostgresType child_type; - child_types.push_back(make_pair(type_name, PostgresUtils::TypeToLogicalType(&transaction, &schema, type_data, child_type))); + child_types.push_back( + make_pair(type_name, PostgresUtils::TypeToLogicalType(&transaction, &schema, type_data, child_type))); postgres_type.children.push_back(std::move(child_type)); } info.type = LogicalType::STRUCT(std::move(child_types)); @@ -141,7 +143,7 @@ string GetCreateTypeSQL(CreateTypeInfo &info) { case LogicalTypeId::ENUM: { sql += "ENUM("; auto enum_size = EnumType::GetSize(info.type); - for(idx_t i = 0; i < enum_size; i++) { + for (idx_t i = 0; i < enum_size; i++) { if (i > 0) { sql += ", "; } @@ -154,7 +156,7 @@ string GetCreateTypeSQL(CreateTypeInfo &info) { case LogicalTypeId::STRUCT: { auto child_count = StructType::GetChildCount(info.type); sql += "("; - for(idx_t c = 0; c < child_count; c++) { + for (idx_t c = 0; c < child_count; c++) { if (c > 0) { sql += ", "; } @@ -183,5 +185,4 @@ optional_ptr PostgresTypeSet::CreateType(ClientContext &context, C return CreateEntry(std::move(type_entry)); } - -} +} // namespace duckdb diff --git a/src/storage/postgres_update.cpp b/src/storage/postgres_update.cpp index 3d327bd4..d625006f 100644 --- a/src/storage/postgres_update.cpp +++ b/src/storage/postgres_update.cpp @@ -68,7 +68,6 @@ string GetUpdateSQL(const string &name, PostgresTableEntry &table, const vector< return result; } - unique_ptr PostgresUpdate::GetGlobalSinkState(ClientContext &context) const { auto &postgres_table = table.Cast(); @@ -92,7 +91,8 @@ unique_ptr PostgresUpdate::GetGlobalSinkState(ClientContext &co // begin the COPY TO string schema_name; vector column_names; - 
connection.BeginCopyTo(context, result->copy_state, PostgresCopyFormat::TEXT, schema_name, table_name, column_names); + connection.BeginCopyTo(context, result->copy_state, PostgresCopyFormat::TEXT, schema_name, table_name, + column_names); return std::move(result); } @@ -104,7 +104,7 @@ SinkResultType PostgresUpdate::Sink(ExecutionContext &context, DataChunk &chunk, chunk.Flatten(); // reference the data columns directly - for(idx_t c = 0; c < columns.size(); c++) { + for (idx_t c = 0; c < columns.size(); c++) { gstate.insert_chunk.data[c].Reference(chunk.data[c]); } // convert our row ids back into ctids @@ -139,7 +139,7 @@ SinkResultType PostgresUpdate::Sink(ExecutionContext &context, DataChunk &chunk, // Finalize //===--------------------------------------------------------------------===// SinkFinalizeType PostgresUpdate::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, - OperatorSinkFinalizeInput &input) const { + OperatorSinkFinalizeInput &input) const { auto &gstate = input.global_state.Cast(); auto &transaction = PostgresTransaction::Get(context, gstate.table.catalog); auto &connection = transaction.GetConnection(); @@ -153,7 +153,8 @@ SinkFinalizeType PostgresUpdate::Finalize(Pipeline &pipeline, Event &event, Clie //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresUpdate::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { +SourceResultType PostgresUpdate::GetData(ExecutionContext &context, DataChunk &chunk, + OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); chunk.SetValue(0, 0, Value::BIGINT(insert_gstate.update_count)); @@ -176,7 +177,7 @@ string PostgresUpdate::ParamsToString() const { // Plan //===--------------------------------------------------------------------===// unique_ptr 
PostgresCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, - unique_ptr plan) { + unique_ptr plan) { if (op.return_chunk) { throw BinderException("RETURNING clause not yet supported for updates of a Postgres table"); } diff --git a/test/sql/storage/attach_issue_146.test b/test/sql/storage/attach_issue_146.test new file mode 100644 index 00000000..140dc26f --- /dev/null +++ b/test/sql/storage/attach_issue_146.test @@ -0,0 +1,22 @@ +# name: test/sql/storage/attach_issue_146.test +# description: Test CREATE TABLE AS with a NOT IN subquery over attached Postgres tables (issue #146) +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS postgres_db (TYPE POSTGRES) + +statement ok +DROP TABLE IF EXISTS postgres_db.public.test_table_1; + +statement ok +DROP TABLE IF EXISTS postgres_db.public.test_table_2; + +statement ok +create table postgres_db.public.test_table_1 (id int); + +statement ok +create table postgres_db.public.test_table_2 as select * from values ( (1) ) as V(id) where id not in (select id from postgres_db.public.test_table_1);