Merge branch '23.7' into backport/23.7/54066
tavplubix authored Sep 14, 2023
2 parents d73f5d9 + 255adf8 commit 70c3ce3
Showing 118 changed files with 1,497 additions and 440 deletions.
2 changes: 1 addition & 1 deletion contrib/libunwind
Submodule libunwind updated 1 file
+24 −4 src/config.h
2 changes: 1 addition & 1 deletion docker/test/integration/hive_server/Dockerfile
@@ -6,7 +6,7 @@ RUN apt-get install -y wget openjdk-8-jre

RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.0/hadoop-3.1.0.tar.gz && \
tar -xf hadoop-3.1.0.tar.gz && rm -rf hadoop-3.1.0.tar.gz
-RUN wget https://dlcdn.apache.org/hive/hive-2.3.9/apache-hive-2.3.9-bin.tar.gz && \
+RUN wget https://archive.apache.org/dist/hive/hive-2.3.9/apache-hive-2.3.9-bin.tar.gz && \
tar -xf apache-hive-2.3.9-bin.tar.gz && rm -rf apache-hive-2.3.9-bin.tar.gz
RUN apt install -y vim

2 changes: 1 addition & 1 deletion docker/test/integration/runner/Dockerfile
@@ -103,7 +103,7 @@ RUN python3 -m pip install --no-cache-dir \
urllib3

# Hudi supports only spark 3.3.*, not 3.4
-RUN curl -fsSL -O https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz \
+RUN curl -fsSL -O https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz \
&& tar xzvf spark-3.3.2-bin-hadoop3.tgz -C / \
&& rm spark-3.3.2-bin-hadoop3.tgz

@@ -2,7 +2,7 @@ version: "2.3"

services:
  coredns:
-    image: coredns/coredns:latest
+    image: coredns/coredns:1.9.3 # :latest broke this test
    restart: always
    volumes:
      - ${COREDNS_CONFIG_DIR}/example.com:/example.com
2 changes: 1 addition & 1 deletion docker/test/stateless/Dockerfile
@@ -71,7 +71,7 @@ RUN arch=${TARGETARCH:-amd64} \
&& chmod +x ./mc ./minio


-RUN wget --no-verbose 'https://dlcdn.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz' \
+RUN wget --no-verbose 'https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz' \
&& tar -xvf hadoop-3.3.1.tar.gz \
&& rm -rf hadoop-3.3.1.tar.gz

4 changes: 4 additions & 0 deletions src/AggregateFunctions/Moments.h
@@ -459,6 +459,10 @@ struct AnalysisOfVarianceMoments

    void add(T value, size_t group)
    {
+        if (group == std::numeric_limits<size_t>::max())
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Too many groups for analysis of variance (should be no more than {}, got {})",
+                MAX_GROUPS_NUMBER, group);
+
        resizeIfNeeded(group + 1);
        xs1[group] += value;
        xs2[group] += value * value;
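For context on the guard above: add() grows its accumulator arrays via resizeIfNeeded(group + 1), and when group is SIZE_MAX the + 1 wraps to zero, so the resize silently becomes a no-op and the following xs1[group] write lands out of bounds. A minimal standalone sketch of that wrap-around (not part of the commit; a plain std::vector stands in for the arrays in Moments.h):

#include <cstddef>
#include <limits>
#include <vector>

int main()
{
    const size_t group = std::numeric_limits<size_t>::max();
    const size_t new_size = group + 1;   // unsigned wrap-around: SIZE_MAX + 1 == 0
    std::vector<double> xs1;
    xs1.resize(new_size);                // resizes to zero elements, not SIZE_MAX + 1
    // xs1[group] += 1.0;                // would be an out-of-bounds write
    return new_size == 0 ? 0 : 1;
}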
4 changes: 2 additions & 2 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
@@ -6321,9 +6321,9 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
    {
        /// For input function we should check if input format supports reading subset of columns.
        if (table_function_ptr->getName() == "input")
-            use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat());
+            use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat(), scope.context);
        else
-            use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
+            use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(scope.context);
    }

if (use_columns_from_insert_query)
2 changes: 1 addition & 1 deletion src/Columns/ColumnSparse.cpp
@@ -439,7 +439,7 @@ void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
    PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
    int direction, int nan_direction_hint) const
{
-    if (row_indexes)
+    if (row_indexes || !typeid_cast<const ColumnSparse *>(&rhs))
    {
        /// TODO: implement without conversion to full column.
        auto this_full = convertToFullColumnIfSparse();
3 changes: 3 additions & 0 deletions src/Common/HTTPHeaderFilter.cpp
@@ -18,6 +18,9 @@ void HTTPHeaderFilter::checkHeaders(const HTTPHeaderEntries & entries) const

    for (const auto & entry : entries)
    {
+        if (entry.name.contains('\n') || entry.value.contains('\n'))
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "HTTP header \"{}\" has invalid character", entry.name);
+
        if (forbidden_headers.contains(entry.name))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "HTTP header \"{}\" is forbidden in configuration file, "
                            "see <http_forbid_headers>", entry.name);
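For context, the newline check added above closes an HTTP header injection hole: a configured header name or value containing a line break would smuggle an extra header line when the entries are serialized onto the wire. A small self-contained illustration (hypothetical values, not ClickHouse code):

#include <iostream>
#include <string>

int main()
{
    // A single "header" whose value smuggles a second header line.
    std::string name = "X-Custom";
    std::string value = "ok\r\nAuthorization: Bearer forged";

    // Naive serialization emits two header lines instead of one:
    std::cout << name << ": " << value << "\r\n";

    // A check equivalent in spirit to the patch rejects it up front:
    if (name.find('\n') != std::string::npos || value.find('\n') != std::string::npos)
        std::cerr << "rejected: header contains a newline\n";
    return 0;
}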
21 changes: 20 additions & 1 deletion src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
@@ -114,6 +114,26 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
{
    if (!read_until_position || position != *read_until_position)
    {
+        if (position < file_offset_of_buffer_end)
+        {
+            /// file has been read beyond new read until position already
+            if (working_buffer.size() >= file_offset_of_buffer_end - position)
+            {
+                /// new read until position is inside working buffer
+                file_offset_of_buffer_end = position;
+            }
+            else
+            {
+                /// new read until position is before working buffer begin
+                throw Exception(
+                    ErrorCodes::LOGICAL_ERROR,
+                    "Attempt to set read until position before already read data ({} > {}, info: {})",
+                    position,
+                    getPosition(),
+                    impl->getInfoForLog());
+            }
+        }
+
        read_until_position = position;

        /// We must wait on future and reset the prefetch here, because otherwise there might be
@@ -248,7 +268,6 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
    {
        /// Position is still inside the buffer.
        /// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
-
        pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());
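The new branch in setReadUntilPosition handles a read-until limit that moves backwards. A standalone model of the rule, under the assumption that buffer_end stands for file_offset_of_buffer_end and buffer_size for working_buffer.size() (a simplified sketch, not the real class):

#include <cstddef>
#include <stdexcept>

// buffer_end models file_offset_of_buffer_end; buffer_size models working_buffer.size().
size_t clampReadUntil(size_t position, size_t buffer_end, size_t buffer_size)
{
    if (position >= buffer_end)
        return buffer_end;                 // nothing was read past the new limit; keep the offset
    if (buffer_size >= buffer_end - position)
        return position;                   // the new limit falls inside the working buffer: clamp
    throw std::logic_error("read-until position is before already-read data");
}

int main()
{
    // Working buffer covers [60, 100): clamping to 80 succeeds,
    // while clampReadUntil(50, 100, 40) would throw.
    return clampReadUntil(80, 100, 40) == 80 ? 0 : 1;
}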
2 changes: 2 additions & 0 deletions src/Disks/IO/AsynchronousBoundedReadBuffer.h
@@ -46,6 +46,8 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase

    void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }

+    size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
+
    off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }

private:
68 changes: 54 additions & 14 deletions src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
@@ -7,6 +7,7 @@
#include <Common/Exception.h>

#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <boost/algorithm/string/join.hpp>

namespace DB
{
@@ -156,14 +157,13 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation

struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
{
-    RemoveBatchRequest remove_paths;
-    bool keep_all_batch_data;
-    NameSet file_names_remove_metadata_only;
+    const RemoveBatchRequest remove_paths;
+    const bool keep_all_batch_data;
+    const NameSet file_names_remove_metadata_only;

+    std::vector<String> paths_removed_with_objects;
    std::vector<ObjectsToRemove> objects_to_remove;

-    bool remove_from_cache = false;
-
    RemoveManyObjectStorageOperation(
        IObjectStorage & object_storage_,
        IMetadataStorage & metadata_storage_,
@@ -203,6 +203,7 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
            if (unlink_outcome && !keep_all_batch_data && !file_names_remove_metadata_only.contains(fs::path(path).filename()))
            {
                objects_to_remove.emplace_back(ObjectsToRemove{std::move(objects), std::move(unlink_outcome)});
+                paths_removed_with_objects.push_back(path);
            }
        }
        catch (const Exception & e)
@@ -213,6 +214,12 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
                || e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
                || e.code() == ErrorCodes::CANNOT_OPEN_FILE)
            {
+                LOG_DEBUG(
+                    &Poco::Logger::get("RemoveManyObjectStorageOperation"),
+                    "Can't read metadata because of an exception. Just remove it from the filesystem. Path: {}, exception: {}",
+                    metadata_storage.getPath() + path,
+                    e.message());
+
                tx->unlinkFile(path);
            }
            else
@@ -238,16 +245,31 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
        /// TL;DR Don't pay any attention to 404 status code
        if (!remove_from_remote.empty())
            object_storage.removeObjectsIfExist(remove_from_remote);
+
+        if (!keep_all_batch_data)
+        {
+            LOG_DEBUG(
+                &Poco::Logger::get("RemoveManyObjectStorageOperation"),
+                "metadata and objects were removed for [{}], "
+                "only metadata were removed for [{}].",
+                boost::algorithm::join(paths_removed_with_objects, ", "),
+                boost::algorithm::join(file_names_remove_metadata_only, ", "));
+        }
    }
};


struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
{
-    std::string path;
+    /// path inside disk with metadata
+    const std::string path;
+    const bool keep_all_batch_data;
+    /// paths inside the 'this->path'
+    const NameSet file_names_remove_metadata_only;

+    /// map from local_path to its remote objects with hardlinks counter
+    /// local_path is the path inside 'this->path'
    std::unordered_map<std::string, ObjectsToRemove> objects_to_remove_by_path;
-    bool keep_all_batch_data;
-    NameSet file_names_remove_metadata_only;

    RemoveRecursiveObjectStorageOperation(
        IObjectStorage & object_storage_,
@@ -274,11 +296,16 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
        {
            try
            {
+                chassert(path_to_remove.starts_with(path));
+                auto rel_path = String(fs::relative(fs::path(path_to_remove), fs::path(path)));
+
                auto objects_paths = metadata_storage.getStorageObjects(path_to_remove);
                auto unlink_outcome = tx->unlinkMetadata(path_to_remove);
-                if (unlink_outcome)
+
+                if (unlink_outcome && !file_names_remove_metadata_only.contains(rel_path))
                {
-                    objects_to_remove_by_path[path_to_remove] = ObjectsToRemove{std::move(objects_paths), std::move(unlink_outcome)};
+                    objects_to_remove_by_path[std::move(rel_path)]
+                        = ObjectsToRemove{std::move(objects_paths), std::move(unlink_outcome)};
                }
            }
            catch (const Exception & e)
@@ -320,25 +347,38 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation

    void undo() override
    {

    }

    void finalize() override
    {
        if (!keep_all_batch_data)
        {
+            std::vector<String> total_removed_paths;
+            total_removed_paths.reserve(objects_to_remove_by_path.size());
+
            StoredObjects remove_from_remote;
            for (auto && [local_path, objects_to_remove] : objects_to_remove_by_path)
            {
-                if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
+                chassert(!file_names_remove_metadata_only.contains(local_path));
+                if (objects_to_remove.unlink_outcome->num_hardlinks == 0)
                {
-                    if (objects_to_remove.unlink_outcome->num_hardlinks == 0)
-                        std::move(objects_to_remove.objects.begin(), objects_to_remove.objects.end(), std::back_inserter(remove_from_remote));
+                    std::move(objects_to_remove.objects.begin(), objects_to_remove.objects.end(), std::back_inserter(remove_from_remote));
+                    total_removed_paths.push_back(local_path);
                }
            }

            /// Read comment inside RemoveObjectStorageOperation class
            /// TL;DR Don't pay any attention to 404 status code
            object_storage.removeObjectsIfExist(remove_from_remote);
+
+            LOG_DEBUG(
+                &Poco::Logger::get("RemoveRecursiveObjectStorageOperation"),
+                "Recursively remove path {}: "
+                "metadata and objects were removed for [{}], "
+                "only metadata were removed for [{}].",
+                path,
+                boost::algorithm::join(total_removed_paths, ", "),
+                boost::algorithm::join(file_names_remove_metadata_only, ", "));
        }
    }
};
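A note on the keying change above: objects_to_remove_by_path is now indexed by the path relative to the operation's root rather than by the absolute metadata path. The patch computes it with fs::relative; the purely lexical lexically_relative below shows the same result on hypothetical paths:

#include <filesystem>
#include <iostream>

int main()
{
    namespace fs = std::filesystem;
    const fs::path root = "disk/meta/table";                       // stands in for the operation's 'path'
    const fs::path removed = "disk/meta/table/all_1_1_0/data.bin"; // a path being removed
    // Prints "all_1_1_0/data.bin", the key the map is now indexed by.
    std::cout << removed.lexically_relative(root).string() << '\n';
    return 0;
}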
4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -320,7 +320,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)

    throwIfUnexpectedError(outcome, if_exists);

-    LOG_TRACE(log, "Object with path {} was removed from S3", object.remote_path);
+    LOG_DEBUG(log, "Object with path {} was removed from S3", object.remote_path);
}

void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
@@ -368,7 +368,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)

    throwIfUnexpectedError(outcome, if_exists);

-    LOG_TRACE(log, "Objects with paths [{}] were removed from S3", keys);
+    LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", keys);
}
}
}
17 changes: 13 additions & 4 deletions src/Formats/FormatFactory.cpp
@@ -678,10 +678,18 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & name)

void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
-    auto & target = dict[name].supports_subset_of_columns;
+    auto & target = dict[name].subset_of_columns_support_checker;
    if (target)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
-    target = true;
+    target = [](const FormatSettings &){ return true; };
+}
+
+void FormatFactory::registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker)
+{
+    auto & target = dict[name].subset_of_columns_support_checker;
+    if (target)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
+    target = std::move(subset_of_columns_support_checker);
}

void FormatFactory::markFormatSupportsSubcolumns(const String & name)
@@ -706,10 +714,11 @@ bool FormatFactory::checkIfFormatSupportsSubcolumns(const String & name) const
    return target.supports_subcolumns;
}

-bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
+bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const DB::String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_) const
{
    const auto & target = getCreators(name);
-    return target.supports_subset_of_columns;
+    auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
+    return target.subset_of_columns_support_checker && target.subset_of_columns_support_checker(format_settings);
}

void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(
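The reworked checkIfFormatSupportsSubsetOfColumns accepts optional explicit settings and otherwise derives them from the query context. A simplified standalone sketch of that fallback pattern (stand-in types, not the real FormatFactory API):

#include <optional>

struct FormatSettings { bool with_names_use_header = true; };

// Stand-in for getFormatSettings(context); the real function derives
// FormatSettings from the query context.
FormatSettings getFormatSettingsFromContext() { return {}; }

bool checkSupportsSubsetOfColumns(const std::optional<FormatSettings> & format_settings_)
{
    // Prefer the caller-supplied settings; otherwise fall back to the context.
    const FormatSettings settings = format_settings_ ? *format_settings_ : getFormatSettingsFromContext();
    return settings.with_names_use_header;
}

int main()
{
    return checkSupportsSubsetOfColumns(std::nullopt) ? 0 : 1;
}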
14 changes: 10 additions & 4 deletions src/Formats/FormatFactory.h
@@ -126,6 +126,10 @@ class FormatFactory final : private boost::noncopyable
    /// and the name of the message.
    using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;

+    /// Some formats can support reading subset of columns depending on settings.
+    /// The checker should return true if the format supports reading a subset of columns with the given settings.
+    using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;
+
    struct Creators
    {
        InputCreator input_creator;
@@ -134,13 +138,13 @@ class FormatFactory final : private boost::noncopyable
        FileSegmentationEngine file_segmentation_engine;
        SchemaReaderCreator schema_reader_creator;
        ExternalSchemaReaderCreator external_schema_reader_creator;
-        bool supports_parallel_formatting{false};
        bool supports_subcolumns{false};
-        bool supports_subset_of_columns{false};
+        bool supports_parallel_formatting{false};
        bool prefers_large_blocks{false};
        NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
        AppendSupportChecker append_support_checker;
        AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
+        SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
    };

    using FormatsDictionary = std::unordered_map<String, Creators>;
@@ -229,10 +233,12 @@ class FormatFactory final : private boost::noncopyable
    void markOutputFormatSupportsParallelFormatting(const String & name);
    void markOutputFormatPrefersLargeBlocks(const String & name);
    void markFormatSupportsSubcolumns(const String & name);
-    void markFormatSupportsSubsetOfColumns(const String & name);

    bool checkIfFormatSupportsSubcolumns(const String & name) const;
-    bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
+
+    void markFormatSupportsSubsetOfColumns(const String & name);
+    void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
+    bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;

    bool checkIfFormatHasSchemaReader(const String & name) const;
    bool checkIfFormatHasExternalSchemaReader(const String & name) const;
5 changes: 3 additions & 2 deletions src/Formats/registerWithNamesAndTypes.cpp
@@ -12,8 +12,9 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWithNamesAndTypesFunc register_func)

void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
-    factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
-    factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
+    auto setting_checker = [](const FormatSettings & settings){ return settings.with_names_use_header; };
+    factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNames", setting_checker);
+    factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNamesAndTypes", setting_checker);
}

}
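The practical effect of registering setting_checker instead of a constant flag: whether a *WithNames or *WithNamesAndTypes format claims subset-of-columns support now tracks the with_names_use_header setting at query time. A minimal standalone model (simplified FormatSettings, not ClickHouse code):

#include <functional>
#include <iostream>

struct FormatSettings { bool with_names_use_header = true; };
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings &)>;

int main()
{
    // The checker registered for *WithNames / *WithNamesAndTypes formats:
    SubsetOfColumnsSupportChecker checker =
        [](const FormatSettings & settings) { return settings.with_names_use_header; };

    FormatSettings header_on;
    FormatSettings header_off;
    header_off.with_names_use_header = false;

    std::cout << checker(header_on) << checker(header_off) << '\n'; // prints "10"
    return 0;
}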