Skip to content

Commit

Permalink
Merge pull request ClickHouse#56578 from ClickHouse/backport/23.10/56306
Browse files Browse the repository at this point in the history
Backport ClickHouse#56306 to 23.10: Fix restore from backup with `flatten_nested` and `data_type_default_nullable`
  • Loading branch information
robot-clickhouse authored Nov 10, 2023
2 parents 351ac33 + 8d96ec5 commit 171cb24
Show file tree
Hide file tree
Showing 37 changed files with 158 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/Client/LocalConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void LocalConnection::sendQuery(

try
{
state->io = executeQuery(state->query, query_context, false, state->stage).second;
state->io = executeQuery(state->query, query_context, QueryFlags{}, state->stage).second;

if (state->io.pipeline.pushing())
{
Expand Down
4 changes: 2 additions & 2 deletions src/Databases/DatabaseOnDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ std::pair<String, StoragePtr> createTableFromAST(
auto table_function = factory.get(table_function_ast, context);
ColumnsDescription columns;
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true, false);
StoragePtr storage = table_function->execute(table_function_ast, context, ast_create_query.getTable(), std::move(columns));
storage->renameInMemory(ast_create_query);
return {ast_create_query.getTable(), storage};
Expand All @@ -99,7 +99,7 @@ std::pair<String, StoragePtr> createTableFromAST(
}
else
{
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true, false);
constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/Databases/DatabaseReplicated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
}
}

BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal)
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags)
{

if (query_context->getCurrentTransaction() && query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
Expand All @@ -728,7 +728,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
if (is_readonly)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");

if (!internal && (query_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY))
if (!flags.internal && (query_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY))
throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");

checkQueryValid(query, query_context);
Expand All @@ -739,6 +739,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
entry.initiator = ddl_worker->getCommonHostID();
entry.setSettingsIfRequired(query_context);
entry.tracing_context = OpenTelemetry::CurrentContext();
entry.is_backup_restore = flags.distributed_backup_restore;
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);

Strings hosts_to_wait;
Expand Down Expand Up @@ -916,14 +917,14 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name));
auto query_context = Context::createCopy(getContext());
query_context->setSetting("allow_deprecated_database_ordinary", 1);
executeQuery(query, query_context, true);
executeQuery(query, query_context, QueryFlags{ .internal = true });

/// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work
/// if zookeeper_path contains {uuid} macro. Replicated database do not recreate replicated tables on recovery,
/// so it's ok to save UUID of replicated table.
query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Atomic", backQuoteIfNeed(to_db_name_replicated));
query_context = Context::createCopy(getContext());
executeQuery(query, query_context, true);
executeQuery(query, query_context, QueryFlags{ .internal = true });
}

size_t moved_tables = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DatabaseReplicated.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DatabaseReplicated : public DatabaseAtomic

/// Try to execute DLL query on current host as initial query. If query is succeed,
/// then it will be executed on all replicas.
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal) override;
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags) override;

bool canExecuteReplicatedMetadataAlter() const override;

Expand Down
1 change: 1 addition & 0 deletions src/Databases/DatabasesCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & cre
/// Creates a table by executing a "CREATE TABLE" query.
InterpreterCreateQuery interpreter{create_table_query, local_context};
interpreter.setInternal(true);
interpreter.setIsRestoreFromBackup(true);
interpreter.execute();
}

Expand Down
3 changes: 2 additions & 1 deletion src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/UUID.h>
#include <Databases/LoadingStrictnessLevel.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <base/types.h>
Expand Down Expand Up @@ -345,7 +346,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

virtual bool shouldReplicateQuery(const ContextPtr & /*query_context*/, const ASTPtr & /*query_ptr*/) const { return false; }

virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] bool internal = false) /// NOLINT
virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags = {}) /// NOLINT
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not have replicated DDL queue", getEngineName());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/MySQL/MaterializedMySQLSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, ContextMutable
if (!database.empty())
query_context->setCurrentDatabase(database);

return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, true).second;
return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, QueryFlags{ .internal = true }).second;
}
catch (...)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Dictionaries/ClickHouseDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ QueryPipeline ClickHouseDictionarySource::createStreamForQuery(const String & qu

if (configuration.is_local)
{
pipeline = executeQuery(query, context_copy, true).second.pipeline;
pipeline = executeQuery(query, context_copy, QueryFlags{ .internal = true }).second.pipeline;
pipeline.convertStructureTo(empty_sample_block.getColumnsWithTypeAndName());
}
else
Expand All @@ -190,7 +190,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re

if (configuration.is_local)
{
return readInvalidateQuery(executeQuery(request, context_copy, true).second.pipeline);
return readInvalidateQuery(executeQuery(request, context_copy, QueryFlags{ .internal = true }).second.pipeline);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ InterpreterShowAccessEntitiesQuery::InterpreterShowAccessEntitiesQuery(const AST

BlockIO InterpreterShowAccessEntitiesQuery::execute()
{
return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}


Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Access/InterpreterShowPrivilegesQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ InterpreterShowPrivilegesQuery::InterpreterShowPrivilegesQuery(const ASTPtr & qu

BlockIO InterpreterShowPrivilegesQuery::execute()
{
return executeQuery("SELECT * FROM system.privileges", context, true).second;
return executeQuery("SELECT * FROM system.privileges", context, QueryFlags{ .internal = true }).second;
}

}
9 changes: 9 additions & 0 deletions src/Interpreters/DDLTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ String DDLLogEntry::toString() const
writeChar('\n', wb);
}

if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
wb << "is_backup_restore: " << is_backup_restore << "\n";

return wb.str();
}

Expand Down Expand Up @@ -165,6 +168,12 @@ void DDLLogEntry::parse(const String & data)
checkChar('\n', rb);
}

if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
{
checkString("is_backup_restore: ", rb);
readBoolText(is_backup_restore, rb);
checkChar('\n', rb);
}

assertEOF(rb);

Expand Down
4 changes: 3 additions & 1 deletion src/Interpreters/DDLTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ struct DDLLogEntry
static constexpr const UInt64 NORMALIZE_CREATE_ON_INITIATOR_VERSION = 3;
static constexpr const UInt64 OPENTELEMETRY_ENABLED_VERSION = 4;
static constexpr const UInt64 PRESERVE_INITIAL_QUERY_ID_VERSION = 5;
static constexpr const UInt64 BACKUP_RESTORE_FLAG_IN_ZK_VERSION = 6;
/// Add new version here

/// Remember to update the value below once new version is added
static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 5;
static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 6;

UInt64 version = 1;
String query;
Expand All @@ -84,6 +85,7 @@ struct DDLLogEntry
std::optional<SettingsChanges> settings;
OpenTelemetry::TracingContext tracing_context;
String initial_query_id;
bool is_backup_restore = false;

void setSettingsIfRequired(ContextPtr context);
String toString() const;
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/DDLWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep

if (!task.is_initial_query)
query_scope.emplace(query_context);
executeQuery(istr, ostr, !task.is_initial_query, query_context, {});

executeQuery(istr, ostr, !task.is_initial_query, query_context, {}, QueryFlags{ .internal = false, .distributed_backup_restore = task.entry.is_backup_restore });

if (auto txn = query_context->getZooKeeperMetadataTransaction())
{
Expand Down
13 changes: 6 additions & 7 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription &
}

ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
const ASTExpressionList & columns_ast, ContextPtr context_, bool attach)
const ASTExpressionList & columns_ast, ContextPtr context_, bool attach, bool is_restore_from_backup)
{
/// First, deduce implicit types.

Expand All @@ -489,7 +489,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(

ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NamesAndTypesList column_names_and_types;
bool make_columns_nullable = !attach && context_->getSettingsRef().data_type_default_nullable;
bool make_columns_nullable = !attach && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable;

for (const auto & ast : columns_ast.children)
{
Expand Down Expand Up @@ -645,7 +645,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
res.add(std::move(column));
}

if (!attach && context_->getSettingsRef().flatten_nested)
if (!attach && !is_restore_from_backup && context_->getSettingsRef().flatten_nested)
res.flattenNested();

if (res.getAllPhysical().empty())
Expand Down Expand Up @@ -692,7 +692,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti

if (create.columns_list->columns)
{
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), create.attach);
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), create.attach, is_restore_from_backup);
}

if (create.columns_list->indices)
Expand Down Expand Up @@ -752,7 +752,6 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
}
else if (create.select)
{

Block as_select_sample;

if (getContext()->getSettingsRef().allow_experimental_analyzer)
Expand Down Expand Up @@ -1077,7 +1076,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
create.setDatabase(database_name);
guard->releaseTableLock();
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
}

if (!create.cluster.empty())
Expand Down Expand Up @@ -1233,7 +1232,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
auto guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
assertOrSetUUID(create, database);
guard->releaseTableLock();
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
}

if (!create.cluster.empty())
Expand Down
8 changes: 7 additions & 1 deletion src/Interpreters/InterpreterCreateQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ class InterpreterCreateQuery : public IInterpreter, WithMutableContext
need_ddl_guard = false;
}

void setIsRestoreFromBackup(bool is_restore_from_backup_)
{
is_restore_from_backup = is_restore_from_backup_;
}

/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach, bool is_restore_from_backup);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);

static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);
Expand Down Expand Up @@ -116,6 +121,7 @@ class InterpreterCreateQuery : public IInterpreter, WithMutableContext
bool force_attach = false;
bool load_database_without_tables = false;
bool need_ddl_guard = true;
bool is_restore_from_backup = false;

mutable String as_database_saved;
mutable String as_table_saved;
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);

auto io = executeQuery(select_query, getContext(), true).second;
auto io = executeQuery(select_query, getContext(), QueryFlags{ .internal = true }).second;
PullingPipelineExecutor executor(io.pipeline);
Block res;
while (!res && executor.pull(res));
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowColumnsQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ WHERE

BlockIO InterpreterShowColumnsQuery::execute()
{
return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}


Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowEngineQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DB

BlockIO InterpreterShowEnginesQuery::execute()
{
return executeQuery("SELECT * FROM system.table_engines ORDER BY name", getContext(), true).second;
return executeQuery("SELECT * FROM system.table_engines ORDER BY name", getContext(), QueryFlags{ .internal = true }).second;
}

}
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowFunctionsQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ InterpreterShowFunctionsQuery::InterpreterShowFunctionsQuery(const ASTPtr & quer

BlockIO InterpreterShowFunctionsQuery::execute()
{
return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}

String InterpreterShowFunctionsQuery::getRewrittenQuery()
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowIndexesQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ ORDER BY index_type, expression, column_name, seq_in_index;)", database, table,

BlockIO InterpreterShowIndexesQuery::execute()
{
return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}


Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowProcesslistQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DB

BlockIO InterpreterShowProcesslistQuery::execute()
{
return executeQuery("SELECT * FROM system.processes ORDER BY elapsed DESC", getContext(), true).second;
return executeQuery("SELECT * FROM system.processes ORDER BY elapsed DESC", getContext(), QueryFlags{ .internal = true }).second;
}

}
3 changes: 1 addition & 2 deletions src/Interpreters/InterpreterShowSettingQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ String InterpreterShowSettingQuery::getRewrittenQuery()

BlockIO InterpreterShowSettingQuery::execute()
{
return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}


}

2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterShowTablesQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ BlockIO InterpreterShowTablesQuery::execute()
return res;
}

return executeQuery(getRewrittenQuery(), getContext(), true).second;
return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}

/// (*) Sorting is strictly speaking not necessary but 1. it is convenient for users, 2. SQL currently does not allow to
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
auto & create = create_ast->as<ASTCreateQuery &>();
create.attach = true;

auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true);
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true, false);
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
auto data_path = database->getTableDataPath(create);

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/MySQL/InterpretersMySQLDDLQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class InterpreterMySQLDDLQuery : public IInterpreter, WithMutableContext
ASTs rewritten_queries = InterpreterImpl::getRewrittenQueries(query, getContext(), mapped_to_database, mysql_database);

for (const auto & rewritten_query : rewritten_queries)
executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), getContext(), true);
executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), getContext(), QueryFlags{ .internal = true });

return BlockIO{};
}
Expand Down
Loading

0 comments on commit 171cb24

Please sign in to comment.