Skip to content

Commit

Permalink
Several fixes for COPY FROM DATABASE - correctly mark information_sch…
Browse files Browse the repository at this point in the history
…ema and pg_ schemas as internal, and fix support for built-in geometry types
  • Loading branch information
Mytherin committed Jan 23, 2024
1 parent 0a7e63e commit 72b22ed
Show file tree
Hide file tree
Showing 19 changed files with 187 additions and 25 deletions.
58 changes: 56 additions & 2 deletions src/include/postgres_binary_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,43 @@ struct PostgresBinaryReader {
return (config.is_negative ? -base_res : base_res);
}

void ReadGeometry(const LogicalType &type, const PostgresType &postgres_type, Vector &out_vec, idx_t output_offset) {
idx_t element_count = 0;
switch(postgres_type.info) {
case PostgresTypeAnnotation::GEOM_LINE:
case PostgresTypeAnnotation::GEOM_CIRCLE:
element_count = 3;
break;
case PostgresTypeAnnotation::GEOM_LINE_SEGMENT:
case PostgresTypeAnnotation::GEOM_BOX:
element_count = 4;
break;
case PostgresTypeAnnotation::GEOM_PATH: {
// variable number of elements
auto path_is_closed = ReadBoolean(); // ignored for now
element_count = 2 * ReadInteger<uint32_t>();
break;
}
case PostgresTypeAnnotation::GEOM_POLYGON:
// variable number of elements
element_count = 2 * ReadInteger<uint32_t>();
break;
default:
throw InternalException("Unsupported type for ReadGeometry");
}
auto list_entries = FlatVector::GetData<list_entry_t>(out_vec);
auto child_offset = ListVector::GetListSize(out_vec);
ListVector::Reserve(out_vec, child_offset + element_count);
list_entries[output_offset].offset = child_offset;
list_entries[output_offset].length = element_count;
auto &child_vector = ListVector::GetEntry(out_vec);
auto child_data = FlatVector::GetData<double>(child_vector);
for(idx_t i = 0; i < element_count; i++) {
child_data[child_offset + i] = ReadDouble();
}
ListVector::SetListSize(out_vec, child_offset + element_count);
}

void ReadArray(const LogicalType &type, const PostgresType &postgres_type, Vector &out_vec, idx_t output_offset,
uint32_t current_count, uint32_t dimensions[], uint32_t ndim) {
auto list_entries = FlatVector::GetData<list_entry_t>(out_vec);
Expand Down Expand Up @@ -451,6 +488,18 @@ struct PostgresBinaryReader {
list_entry.length = 0;
break;
}
switch(postgres_type.info) {
case PostgresTypeAnnotation::GEOM_LINE:
case PostgresTypeAnnotation::GEOM_LINE_SEGMENT:
case PostgresTypeAnnotation::GEOM_BOX:
case PostgresTypeAnnotation::GEOM_PATH:
case PostgresTypeAnnotation::GEOM_POLYGON:
case PostgresTypeAnnotation::GEOM_CIRCLE:
ReadGeometry(type, postgres_type, out_vec, output_offset);
return;
default:
break;
}
D_ASSERT(value_len >= 3 * sizeof(uint32_t));
auto array_dim = ReadInteger<uint32_t>();
auto array_has_null = ReadInteger<uint32_t>(); // whether or not the array has nulls - ignore
Expand All @@ -472,8 +521,7 @@ struct PostgresBinaryReader {
"Expected an array with %llu dimensions, but this array has %llu dimensions. The array stored in "
"Postgres does not match the schema. Postgres does not enforce that arrays match the provided "
"schema but DuckDB requires this.\nSet pg_array_as_varchar=true to read the array as a varchar "
"instead. Note that you might have to run CALL pg_clear_cache() to clear cached type information "
"as well.",
"instead.",
expected_dimensions, array_dim);
}
auto dimensions = unique_ptr<uint32_t[]>(new uint32_t[array_dim]);
Expand All @@ -487,6 +535,12 @@ struct PostgresBinaryReader {
}
case LogicalTypeId::STRUCT: {
auto &child_entries = StructVector::GetEntries(out_vec);
if (postgres_type.info == PostgresTypeAnnotation::GEOM_POINT) {
D_ASSERT(value_len == sizeof(double) * 2);
FlatVector::GetData<double>(*child_entries[0])[output_offset] = ReadDouble();
FlatVector::GetData<double>(*child_entries[1])[output_offset] = ReadDouble();
break;
}
auto entry_count = ReadInteger<uint32_t>();
if (entry_count != child_entries.size()) {
throw InternalException("Mismatch in entry count: expected %d but got %d", child_entries.size(),
Expand Down
2 changes: 2 additions & 0 deletions src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class PostgresScanFunctionFilterPushdown : public TableFunction {
// Table function that clears cached Postgres catalog information.
class PostgresClearCacheFunction : public TableFunction {
public:
PostgresClearCacheFunction();

// Callback with the (context, scope, value) shape used for extension settings; `scope` and `parameter`
// carry the setting change that triggered the call.
static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value &parameter);
};

class PostgresQueryFunction : public TableFunction {
Expand Down
16 changes: 15 additions & 1 deletion src/include/postgres_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ struct PostgresTypeData {
idx_t array_dimensions = 0;
};

enum class PostgresTypeAnnotation { STANDARD, CAST_TO_VARCHAR, NUMERIC_AS_DOUBLE, CTID, JSONB, FIXED_LENGTH_CHAR };
// Extra annotation attached to a Postgres type (read by the scanner and binary reader through the
// `info` field of a PostgresType) that selects special handling when reading values of that type.
enum class PostgresTypeAnnotation {
STANDARD,
CAST_TO_VARCHAR,
NUMERIC_AS_DOUBLE,
CTID,
JSONB,
FIXED_LENGTH_CHAR,
// Postgres `point`: read as a struct with two double fields, x and y
GEOM_POINT,
// Postgres `line`: read as a list of doubles
GEOM_LINE,
// Postgres `lseg`: read as a list of doubles
GEOM_LINE_SEGMENT,
// Postgres `box`: read as a list of doubles
GEOM_BOX,
// Postgres `path`: read as a list of doubles
GEOM_PATH,
// Postgres `polygon`: read as a list of doubles
GEOM_POLYGON,
// Postgres `circle`: read as a list of doubles
GEOM_CIRCLE
};

struct PostgresType {
idx_t oid = 0;
Expand Down
13 changes: 12 additions & 1 deletion src/include/storage/postgres_catalog_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace duckdb {
struct DropInfo;
class PostgresResult;
class PostgresSchemaEntry;
class PostgresTransaction;

class PostgresCatalogSet {
Expand All @@ -24,7 +25,7 @@ class PostgresCatalogSet {
optional_ptr<CatalogEntry> GetEntry(ClientContext &context, const string &name);
void DropEntry(ClientContext &context, DropInfo &info);
void Scan(ClientContext &context, const std::function<void(CatalogEntry &)> &callback);
optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry);
virtual optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry);
void ClearEntries();
virtual bool SupportReload() const {
return false;
Expand All @@ -51,6 +52,16 @@ class PostgresCatalogSet {
atomic<bool> is_loaded;
};

// A catalog set that lives inside a specific schema. It keeps a reference to that schema and
// overrides CreateEntry so that schema-level properties can be applied to every entry that is added.
class PostgresInSchemaSet : public PostgresCatalogSet {
public:
// `schema` is the owning schema; `is_loaded` is forwarded to the PostgresCatalogSet base.
PostgresInSchemaSet(PostgresSchemaEntry &schema, bool is_loaded);

optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry) override;

protected:
// The schema this set belongs to (held by reference, so it must outlive the set).
PostgresSchemaEntry &schema;
};

struct PostgresResultSlice {
PostgresResultSlice(shared_ptr<PostgresResult> result_p, idx_t start, idx_t end)
: result(std::move(result_p)), start(start), end(end) {
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_index_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace duckdb {
class PostgresSchemaEntry;

class PostgresIndexSet : public PostgresCatalogSet {
class PostgresIndexSet : public PostgresInSchemaSet {
public:
PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> index_result = nullptr);

Expand All @@ -27,7 +27,6 @@ class PostgresIndexSet : public PostgresCatalogSet {
void LoadEntries(ClientContext &context) override;

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> index_result;
};

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/postgres_schema_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class PostgresSchemaEntry : public SchemaCatalogEntry {

PostgresCatalogSet &GetCatalogSet(CatalogType type);

static bool SchemaIsInternal(const string &name);

private:
PostgresTableSet tables;
PostgresIndexSet indexes;
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_table_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PostgresConnection;
class PostgresResult;
class PostgresSchemaEntry;

class PostgresTableSet : public PostgresCatalogSet {
class PostgresTableSet : public PostgresInSchemaSet {
public:
explicit PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> tables = nullptr);

Expand Down Expand Up @@ -51,7 +51,6 @@ class PostgresTableSet : public PostgresCatalogSet {
void CreateEntries(PostgresTransaction &transaction, PostgresResult &result, idx_t start, idx_t end);

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> table_result;
};

Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_type_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PostgresResult;
class PostgresSchemaEntry;
struct PGTypeInfo;

class PostgresTypeSet : public PostgresCatalogSet {
class PostgresTypeSet : public PostgresInSchemaSet {
public:
explicit PostgresTypeSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> enum_result = nullptr,
unique_ptr<PostgresResultSlice> composite_type_result = nullptr);
Expand All @@ -42,7 +42,6 @@ class PostgresTypeSet : public PostgresCatalogSet {
void InitializeCompositeTypes(PostgresTransaction &transaction, PostgresResultSlice &composite_types);

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> enum_result;
unique_ptr<PostgresResultSlice> composite_type_result;
};
Expand Down
2 changes: 1 addition & 1 deletion src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void LoadInternal(DatabaseInstance &db) {
SetPostgresConnectionLimit);
config.AddExtensionOption("pg_array_as_varchar",
"Read Postgres arrays as varchar - enables reading mixed dimensional arrays",
LogicalType::BOOLEAN, Value::BOOLEAN(false));
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
config.AddExtensionOption("pg_connection_cache",
"Whether or not to use the connection cache", LogicalType::BOOLEAN,
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
Expand Down
3 changes: 3 additions & 0 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
col_names += "::VARCHAR";
}
if (bind_data->types[column_id].id() == LogicalTypeId::LIST) {
if (bind_data->postgres_types[column_id].info != PostgresTypeAnnotation::STANDARD) {
continue;
}
if (bind_data->postgres_types[column_id].children[0].info == PostgresTypeAnnotation::CAST_TO_VARCHAR) {
col_names += "::VARCHAR[]";
}
Expand Down
24 changes: 24 additions & 0 deletions src/postgres_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr<PostgresTransaction> t
return LogicalType::INTERVAL;
} else if (pgtypename == "uuid") {
return LogicalType::UUID;
} else if (pgtypename == "point") {
postgres_type.info = PostgresTypeAnnotation::GEOM_POINT;
child_list_t<LogicalType> point_struct;
point_struct.emplace_back(make_pair("x", LogicalType::DOUBLE));
point_struct.emplace_back(make_pair("y", LogicalType::DOUBLE));
return LogicalType::STRUCT(point_struct);
} else if (pgtypename == "line") {
postgres_type.info = PostgresTypeAnnotation::GEOM_LINE;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "lseg") {
postgres_type.info = PostgresTypeAnnotation::GEOM_LINE_SEGMENT;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "box") {
postgres_type.info = PostgresTypeAnnotation::GEOM_BOX;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "path") {
postgres_type.info = PostgresTypeAnnotation::GEOM_PATH;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "polygon") {
postgres_type.info = PostgresTypeAnnotation::GEOM_POLYGON;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "circle") {
postgres_type.info = PostgresTypeAnnotation::GEOM_CIRCLE;
return LogicalType::LIST(LogicalType::DOUBLE);
} else {
if (!transaction) {
// unsupported so fallback to varchar
Expand Down
11 changes: 11 additions & 0 deletions src/storage/postgres_catalog_set.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "storage/postgres_catalog_set.hpp"
#include "storage/postgres_transaction.hpp"
#include "duckdb/parser/parsed_data/drop_info.hpp"
#include "storage/postgres_schema_entry.hpp"

namespace duckdb {

PostgresCatalogSet::PostgresCatalogSet(Catalog &catalog, bool is_loaded_p) : catalog(catalog), is_loaded(is_loaded_p) {
Expand Down Expand Up @@ -103,4 +105,13 @@ void PostgresCatalogSet::ClearEntries() {
is_loaded = false;
}

// Builds the set on top of the schema's parent catalog and remembers the owning schema.
PostgresInSchemaSet::PostgresInSchemaSet(PostgresSchemaEntry &schema, bool is_loaded) :
PostgresCatalogSet(schema.ParentCatalog(), is_loaded), schema(schema) {
}

// Adds `entry` to the set after copying the owning schema's `internal` flag onto it, then defers to
// the base-class implementation for the actual insertion.
optional_ptr<CatalogEntry> PostgresInSchemaSet::CreateEntry(unique_ptr<CatalogEntry> entry) {
	auto &new_entry = *entry;
	new_entry.internal = schema.internal;
	return PostgresCatalogSet::CreateEntry(std::move(entry));
}

} // namespace duckdb
18 changes: 13 additions & 5 deletions src/storage/postgres_clear_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ static unique_ptr<FunctionData> ClearCacheBind(ClientContext &context, TableFunc
return std::move(result);
}

static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
auto &data = data_p.bind_data->CastNoConst<ClearCacheFunctionData>();
if (data.finished) {
return;
}
static void ClearPostgresCaches(ClientContext &context) {
auto databases = DatabaseManager::Get(context).GetDatabases(context);
for (auto &db_ref : databases) {
auto &db = db_ref.get();
Expand All @@ -35,9 +31,21 @@ static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_
}
catalog.Cast<PostgresCatalog>().ClearCache();
}
}

// Execution callback of the clear-cache table function: clears the Postgres caches the first time it
// is invoked and does nothing on later invocations for the same bind data.
static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
	auto &bind_state = data_p.bind_data->CastNoConst<ClearCacheFunctionData>();
	if (!bind_state.finished) {
		ClearPostgresCaches(context);
		// mark as done only after the caches were cleared successfully
		bind_state.finished = true;
	}
}

// Setting callback: clears the Postgres caches regardless of the scope or the new value of the
// setting (`scope` and `parameter` are not inspected).
void PostgresClearCacheFunction::ClearCacheOnSetting(ClientContext &context, SetScope scope, Value &parameter) {
ClearPostgresCaches(context);
}

// Sets up the table function under the name "pg_clear_cache" with an empty argument list, using
// ClearCacheFunction as the execution callback and ClearCacheBind as the bind callback.
PostgresClearCacheFunction::PostgresClearCacheFunction()
: TableFunction("pg_clear_cache", {}, ClearCacheFunction, ClearCacheBind) {
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/postgres_index_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace duckdb {

PostgresIndexSet::PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> index_result_p)
: PostgresCatalogSet(schema.ParentCatalog(), !index_result_p), schema(schema), index_result(std::move(index_result_p)) {
: PostgresInSchemaSet(schema, !index_result_p), index_result(std::move(index_result_p)) {
}

string PostgresIndexSet::GetInitializeQuery() {
Expand Down
11 changes: 9 additions & 2 deletions src/storage/postgres_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@
namespace duckdb {

PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name)
: SchemaCatalogEntry(catalog, std::move(name), true), tables(*this), indexes(*this), types(*this) {
: SchemaCatalogEntry(catalog, name, SchemaIsInternal(name)), tables(*this), indexes(*this), types(*this) {
}

PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name, unique_ptr<PostgresResultSlice> tables,
unique_ptr<PostgresResultSlice> enums,
unique_ptr<PostgresResultSlice> composite_types,
unique_ptr<PostgresResultSlice> indexes)
: SchemaCatalogEntry(catalog, std::move(name), true), tables(*this, std::move(tables)),
: SchemaCatalogEntry(catalog, name, SchemaIsInternal(name)), tables(*this, std::move(tables)),
indexes(*this, std::move(indexes)), types(*this, std::move(enums), std::move(composite_types)) {
}

// Returns true when `name` denotes an internal schema: either "information_schema" or any schema
// whose name starts with "pg_".
bool PostgresSchemaEntry::SchemaIsInternal(const string &name) {
	return name == "information_schema" || StringUtil::StartsWith(name, "pg_");
}

PostgresTransaction &GetPostgresTransaction(CatalogTransaction transaction) {
if (!transaction.transaction) {
throw InternalException("No transaction!?");
Expand Down
8 changes: 4 additions & 4 deletions src/storage/postgres_table_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace duckdb {

PostgresTableSet::PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> table_result_p)
: PostgresCatalogSet(schema.ParentCatalog(), !table_result_p), schema(schema), table_result(std::move(table_result_p)) {
: PostgresInSchemaSet(schema, !table_result_p), table_result(std::move(table_result_p)) {
}

string PostgresTableSet::GetInitializeQuery() {
Expand All @@ -26,7 +26,7 @@ FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE attnum > 0
WHERE attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY pg_namespace.oid, relname, attnum;
)";
}
Expand Down Expand Up @@ -96,7 +96,7 @@ void PostgresTableSet::LoadEntries(ClientContext &context) {
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND attnum > 0
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY relname, attnum;
)",
"${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema.name));
Expand All @@ -116,7 +116,7 @@ FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND relname=${TABLE_NAME} AND attnum > 0
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND relname=${TABLE_NAME} AND attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY relname, attnum;
)",
"${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema_name)),
Expand Down
Loading

0 comments on commit 72b22ed

Please sign in to comment.