Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for built-in geometry types, and several fixes for COPY FROM DATABASE/EXPORT #173

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions src/include/postgres_binary_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,43 @@ struct PostgresBinaryReader {
return (config.is_negative ? -base_res : base_res);
}

void ReadGeometry(const LogicalType &type, const PostgresType &postgres_type, Vector &out_vec, idx_t output_offset) {
idx_t element_count = 0;
switch(postgres_type.info) {
case PostgresTypeAnnotation::GEOM_LINE:
case PostgresTypeAnnotation::GEOM_CIRCLE:
element_count = 3;
break;
case PostgresTypeAnnotation::GEOM_LINE_SEGMENT:
case PostgresTypeAnnotation::GEOM_BOX:
element_count = 4;
break;
case PostgresTypeAnnotation::GEOM_PATH: {
// variable number of elements
auto path_is_closed = ReadBoolean(); // ignored for now
element_count = 2 * ReadInteger<uint32_t>();
break;
}
case PostgresTypeAnnotation::GEOM_POLYGON:
// variable number of elements
element_count = 2 * ReadInteger<uint32_t>();
break;
default:
throw InternalException("Unsupported type for ReadGeometry");
}
auto list_entries = FlatVector::GetData<list_entry_t>(out_vec);
auto child_offset = ListVector::GetListSize(out_vec);
ListVector::Reserve(out_vec, child_offset + element_count);
list_entries[output_offset].offset = child_offset;
list_entries[output_offset].length = element_count;
auto &child_vector = ListVector::GetEntry(out_vec);
auto child_data = FlatVector::GetData<double>(child_vector);
for(idx_t i = 0; i < element_count; i++) {
child_data[child_offset + i] = ReadDouble();
}
ListVector::SetListSize(out_vec, child_offset + element_count);
}

void ReadArray(const LogicalType &type, const PostgresType &postgres_type, Vector &out_vec, idx_t output_offset,
uint32_t current_count, uint32_t dimensions[], uint32_t ndim) {
auto list_entries = FlatVector::GetData<list_entry_t>(out_vec);
Expand Down Expand Up @@ -451,6 +488,18 @@ struct PostgresBinaryReader {
list_entry.length = 0;
break;
}
switch(postgres_type.info) {
case PostgresTypeAnnotation::GEOM_LINE:
case PostgresTypeAnnotation::GEOM_LINE_SEGMENT:
case PostgresTypeAnnotation::GEOM_BOX:
case PostgresTypeAnnotation::GEOM_PATH:
case PostgresTypeAnnotation::GEOM_POLYGON:
case PostgresTypeAnnotation::GEOM_CIRCLE:
ReadGeometry(type, postgres_type, out_vec, output_offset);
return;
default:
break;
}
D_ASSERT(value_len >= 3 * sizeof(uint32_t));
auto array_dim = ReadInteger<uint32_t>();
auto array_has_null = ReadInteger<uint32_t>(); // whether or not the array has nulls - ignore
Expand All @@ -472,8 +521,7 @@ struct PostgresBinaryReader {
"Expected an array with %llu dimensions, but this array has %llu dimensions. The array stored in "
"Postgres does not match the schema. Postgres does not enforce that arrays match the provided "
"schema but DuckDB requires this.\nSet pg_array_as_varchar=true to read the array as a varchar "
"instead. Note that you might have to run CALL pg_clear_cache() to clear cached type information "
"as well.",
"instead.",
expected_dimensions, array_dim);
}
auto dimensions = unique_ptr<uint32_t[]>(new uint32_t[array_dim]);
Expand All @@ -487,6 +535,12 @@ struct PostgresBinaryReader {
}
case LogicalTypeId::STRUCT: {
auto &child_entries = StructVector::GetEntries(out_vec);
if (postgres_type.info == PostgresTypeAnnotation::GEOM_POINT) {
D_ASSERT(value_len == sizeof(double) * 2);
FlatVector::GetData<double>(*child_entries[0])[output_offset] = ReadDouble();
FlatVector::GetData<double>(*child_entries[1])[output_offset] = ReadDouble();
break;
}
auto entry_count = ReadInteger<uint32_t>();
if (entry_count != child_entries.size()) {
throw InternalException("Mismatch in entry count: expected %d but got %d", child_entries.size(),
Expand Down
2 changes: 2 additions & 0 deletions src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class PostgresScanFunctionFilterPushdown : public TableFunction {
// Table function that is registered under the name "pg_clear_cache".
class PostgresClearCacheFunction : public TableFunction {
public:
PostgresClearCacheFunction();

// Callback that clears the Postgres caches; it is passed to AddExtensionOption for the
// "pg_array_as_varchar" setting. `scope` and `parameter` are not used by the implementation.
static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value &parameter);
};

class PostgresQueryFunction : public TableFunction {
Expand Down
16 changes: 15 additions & 1 deletion src/include/postgres_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ struct PostgresTypeData {
idx_t array_dimensions = 0;
};

enum class PostgresTypeAnnotation { STANDARD, CAST_TO_VARCHAR, NUMERIC_AS_DOUBLE, CTID, JSONB, FIXED_LENGTH_CHAR };
// Annotation stored in PostgresType::info. Readers and the scanner check it to decide how a value
// of the annotated Postgres type has to be handled.
enum class PostgresTypeAnnotation {
STANDARD,
CAST_TO_VARCHAR,
NUMERIC_AS_DOUBLE,
CTID,
JSONB,
FIXED_LENGTH_CHAR,
// Postgres "point": mapped to STRUCT(x DOUBLE, y DOUBLE)
GEOM_POINT,
// Postgres "line": mapped to a LIST of DOUBLE
GEOM_LINE,
// Postgres "lseg": mapped to a LIST of DOUBLE
GEOM_LINE_SEGMENT,
// Postgres "box": mapped to a LIST of DOUBLE
GEOM_BOX,
// Postgres "path": mapped to a LIST of DOUBLE
GEOM_PATH,
// Postgres "polygon": mapped to a LIST of DOUBLE
GEOM_POLYGON,
// Postgres "circle": mapped to a LIST of DOUBLE
GEOM_CIRCLE
};

struct PostgresType {
idx_t oid = 0;
Expand Down
13 changes: 12 additions & 1 deletion src/include/storage/postgres_catalog_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace duckdb {
struct DropInfo;
class PostgresResult;
class PostgresSchemaEntry;
class PostgresTransaction;

class PostgresCatalogSet {
Expand All @@ -24,7 +25,7 @@ class PostgresCatalogSet {
optional_ptr<CatalogEntry> GetEntry(ClientContext &context, const string &name);
void DropEntry(ClientContext &context, DropInfo &info);
void Scan(ClientContext &context, const std::function<void(CatalogEntry &)> &callback);
optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry);
virtual optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry);
void ClearEntries();
virtual bool SupportReload() const {
return false;
Expand All @@ -51,6 +52,16 @@ class PostgresCatalogSet {
atomic<bool> is_loaded;
};

// Catalog set that is tied to a specific schema. It keeps a reference to that schema so that
// entry creation can take schema properties into account.
class PostgresInSchemaSet : public PostgresCatalogSet {
public:
PostgresInSchemaSet(PostgresSchemaEntry &schema, bool is_loaded);

// Overrides PostgresCatalogSet::CreateEntry; the definition copies the schema's `internal`
// flag onto the entry before delegating to the base implementation.
optional_ptr<CatalogEntry> CreateEntry(unique_ptr<CatalogEntry> entry) override;

protected:
// The schema this set belongs to (held by reference, not owned)
PostgresSchemaEntry &schema;
};

struct PostgresResultSlice {
PostgresResultSlice(shared_ptr<PostgresResult> result_p, idx_t start, idx_t end)
: result(std::move(result_p)), start(start), end(end) {
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_index_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace duckdb {
class PostgresSchemaEntry;

class PostgresIndexSet : public PostgresCatalogSet {
class PostgresIndexSet : public PostgresInSchemaSet {
public:
PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> index_result = nullptr);

Expand All @@ -27,7 +27,6 @@ class PostgresIndexSet : public PostgresCatalogSet {
void LoadEntries(ClientContext &context) override;

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> index_result;
};

Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/postgres_schema_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class PostgresSchemaEntry : public SchemaCatalogEntry {

PostgresCatalogSet &GetCatalogSet(CatalogType type);

static bool SchemaIsInternal(const string &name);

private:
PostgresTableSet tables;
PostgresIndexSet indexes;
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_table_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PostgresConnection;
class PostgresResult;
class PostgresSchemaEntry;

class PostgresTableSet : public PostgresCatalogSet {
class PostgresTableSet : public PostgresInSchemaSet {
public:
explicit PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> tables = nullptr);

Expand Down Expand Up @@ -51,7 +51,6 @@ class PostgresTableSet : public PostgresCatalogSet {
void CreateEntries(PostgresTransaction &transaction, PostgresResult &result, idx_t start, idx_t end);

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> table_result;
};

Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/postgres_type_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PostgresResult;
class PostgresSchemaEntry;
struct PGTypeInfo;

class PostgresTypeSet : public PostgresCatalogSet {
class PostgresTypeSet : public PostgresInSchemaSet {
public:
explicit PostgresTypeSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> enum_result = nullptr,
unique_ptr<PostgresResultSlice> composite_type_result = nullptr);
Expand All @@ -42,7 +42,6 @@ class PostgresTypeSet : public PostgresCatalogSet {
void InitializeCompositeTypes(PostgresTransaction &transaction, PostgresResultSlice &composite_types);

protected:
PostgresSchemaEntry &schema;
unique_ptr<PostgresResultSlice> enum_result;
unique_ptr<PostgresResultSlice> composite_type_result;
};
Expand Down
2 changes: 1 addition & 1 deletion src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void LoadInternal(DatabaseInstance &db) {
SetPostgresConnectionLimit);
config.AddExtensionOption("pg_array_as_varchar",
"Read Postgres arrays as varchar - enables reading mixed dimensional arrays",
LogicalType::BOOLEAN, Value::BOOLEAN(false));
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
config.AddExtensionOption("pg_connection_cache",
"Whether or not to use the connection cache", LogicalType::BOOLEAN,
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
Expand Down
3 changes: 3 additions & 0 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
col_names += "::VARCHAR";
}
if (bind_data->types[column_id].id() == LogicalTypeId::LIST) {
if (bind_data->postgres_types[column_id].info != PostgresTypeAnnotation::STANDARD) {
continue;
}
if (bind_data->postgres_types[column_id].children[0].info == PostgresTypeAnnotation::CAST_TO_VARCHAR) {
col_names += "::VARCHAR[]";
}
Expand Down
24 changes: 24 additions & 0 deletions src/postgres_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr<PostgresTransaction> t
return LogicalType::INTERVAL;
} else if (pgtypename == "uuid") {
return LogicalType::UUID;
} else if (pgtypename == "point") {
postgres_type.info = PostgresTypeAnnotation::GEOM_POINT;
child_list_t<LogicalType> point_struct;
point_struct.emplace_back(make_pair("x", LogicalType::DOUBLE));
point_struct.emplace_back(make_pair("y", LogicalType::DOUBLE));
return LogicalType::STRUCT(point_struct);
} else if (pgtypename == "line") {
postgres_type.info = PostgresTypeAnnotation::GEOM_LINE;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "lseg") {
postgres_type.info = PostgresTypeAnnotation::GEOM_LINE_SEGMENT;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "box") {
postgres_type.info = PostgresTypeAnnotation::GEOM_BOX;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "path") {
postgres_type.info = PostgresTypeAnnotation::GEOM_PATH;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "polygon") {
postgres_type.info = PostgresTypeAnnotation::GEOM_POLYGON;
return LogicalType::LIST(LogicalType::DOUBLE);
} else if (pgtypename == "circle") {
postgres_type.info = PostgresTypeAnnotation::GEOM_CIRCLE;
return LogicalType::LIST(LogicalType::DOUBLE);
} else {
if (!transaction) {
// unsupported so fallback to varchar
Expand Down
11 changes: 11 additions & 0 deletions src/storage/postgres_catalog_set.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "storage/postgres_catalog_set.hpp"
#include "storage/postgres_transaction.hpp"
#include "duckdb/parser/parsed_data/drop_info.hpp"
#include "storage/postgres_schema_entry.hpp"

namespace duckdb {

PostgresCatalogSet::PostgresCatalogSet(Catalog &catalog, bool is_loaded_p) : catalog(catalog), is_loaded(is_loaded_p) {
Expand Down Expand Up @@ -103,4 +105,13 @@ void PostgresCatalogSet::ClearEntries() {
is_loaded = false;
}

// Constructs the set for `schema`: the base catalog set is initialized with the schema's parent
// catalog and the given `is_loaded` flag, and a reference to the schema is stored.
PostgresInSchemaSet::PostgresInSchemaSet(PostgresSchemaEntry &schema, bool is_loaded) :
PostgresCatalogSet(schema.ParentCatalog(), is_loaded), schema(schema) {
}

// Marks the new entry as internal exactly when the owning schema is internal, then hands the
// entry over to the base catalog set and returns its result.
optional_ptr<CatalogEntry> PostgresInSchemaSet::CreateEntry(unique_ptr<CatalogEntry> entry) {
    auto &new_entry = *entry;
    // entries inherit the internal flag from their schema
    new_entry.internal = schema.internal;
    auto created = PostgresCatalogSet::CreateEntry(std::move(entry));
    return created;
}

} // namespace duckdb
18 changes: 13 additions & 5 deletions src/storage/postgres_clear_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ static unique_ptr<FunctionData> ClearCacheBind(ClientContext &context, TableFunc
return std::move(result);
}

static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
auto &data = data_p.bind_data->CastNoConst<ClearCacheFunctionData>();
if (data.finished) {
return;
}
static void ClearPostgresCaches(ClientContext &context) {
auto databases = DatabaseManager::Get(context).GetDatabases(context);
for (auto &db_ref : databases) {
auto &db = db_ref.get();
Expand All @@ -35,9 +31,21 @@ static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_
}
catalog.Cast<PostgresCatalog>().ClearCache();
}
}

// Table function body for pg_clear_cache: clears the Postgres caches on the first invocation and
// does nothing on any later invocation (tracked through the `finished` flag of the bind data).
// `output` is not written to.
static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
    auto &bind_state = data_p.bind_data->CastNoConst<ClearCacheFunctionData>();
    if (!bind_state.finished) {
        ClearPostgresCaches(context);
        // remember that the work is done so that repeated calls become no-ops
        bind_state.finished = true;
    }
}

// Setting callback: clears the Postgres caches whenever it is invoked. Neither `scope` nor the
// new `parameter` value is inspected, so the caches are cleared regardless of the value being set.
void PostgresClearCacheFunction::ClearCacheOnSetting(ClientContext &context, SetScope scope, Value &parameter) {
ClearPostgresCaches(context);
}

// Initializes the TableFunction base with the name "pg_clear_cache", an empty argument list,
// ClearCacheFunction and ClearCacheBind.
PostgresClearCacheFunction::PostgresClearCacheFunction()
: TableFunction("pg_clear_cache", {}, ClearCacheFunction, ClearCacheBind) {
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/postgres_index_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace duckdb {

PostgresIndexSet::PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> index_result_p)
: PostgresCatalogSet(schema.ParentCatalog(), !index_result_p), schema(schema), index_result(std::move(index_result_p)) {
: PostgresInSchemaSet(schema, !index_result_p), index_result(std::move(index_result_p)) {
}

string PostgresIndexSet::GetInitializeQuery() {
Expand Down
11 changes: 9 additions & 2 deletions src/storage/postgres_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@
namespace duckdb {

PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name)
: SchemaCatalogEntry(catalog, std::move(name), true), tables(*this), indexes(*this), types(*this) {
: SchemaCatalogEntry(catalog, name, SchemaIsInternal(name)), tables(*this), indexes(*this), types(*this) {
}

PostgresSchemaEntry::PostgresSchemaEntry(Catalog &catalog, string name, unique_ptr<PostgresResultSlice> tables,
unique_ptr<PostgresResultSlice> enums,
unique_ptr<PostgresResultSlice> composite_types,
unique_ptr<PostgresResultSlice> indexes)
: SchemaCatalogEntry(catalog, std::move(name), true), tables(*this, std::move(tables)),
: SchemaCatalogEntry(catalog, name, SchemaIsInternal(name)), tables(*this, std::move(tables)),
indexes(*this, std::move(indexes)), types(*this, std::move(enums), std::move(composite_types)) {
}

// Returns true when `name` denotes a schema that is treated as internal: either exactly
// "information_schema" or any name that starts with "pg_". Returns false otherwise.
bool PostgresSchemaEntry::SchemaIsInternal(const string &name) {
    const bool is_information_schema = (name == "information_schema");
    // the prefix check is only evaluated when the exact-name check did not match
    return is_information_schema || StringUtil::StartsWith(name, "pg_");
}

PostgresTransaction &GetPostgresTransaction(CatalogTransaction transaction) {
if (!transaction.transaction) {
throw InternalException("No transaction!?");
Expand Down
8 changes: 4 additions & 4 deletions src/storage/postgres_table_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace duckdb {

PostgresTableSet::PostgresTableSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> table_result_p)
: PostgresCatalogSet(schema.ParentCatalog(), !table_result_p), schema(schema), table_result(std::move(table_result_p)) {
: PostgresInSchemaSet(schema, !table_result_p), table_result(std::move(table_result_p)) {
}

string PostgresTableSet::GetInitializeQuery() {
Expand All @@ -26,7 +26,7 @@ FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE attnum > 0
WHERE attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY pg_namespace.oid, relname, attnum;
)";
}
Expand Down Expand Up @@ -96,7 +96,7 @@ void PostgresTableSet::LoadEntries(ClientContext &context) {
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND attnum > 0
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY relname, attnum;
)",
"${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema.name));
Expand All @@ -116,7 +116,7 @@ FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
JOIN pg_attribute ON pg_class.oid=pg_attribute.attrelid
JOIN pg_type ON atttypid=pg_type.oid
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND relname=${TABLE_NAME} AND attnum > 0
WHERE pg_namespace.nspname=${SCHEMA_NAME} AND relname=${TABLE_NAME} AND attnum > 0 AND relkind IN ('r', 'v', 'm', 'f', 'p')
ORDER BY relname, attnum;
)",
"${SCHEMA_NAME}", KeywordHelper::WriteQuoted(schema_name)),
Expand Down
Loading
Loading