From 7826d7312aa0cea784bfe5371a608094fca10d06 Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Mon, 7 Oct 2024 10:20:01 -0400 Subject: [PATCH] clp-s: Add support for projecting a subset of columns during search. (#510) Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com> Co-authored-by: Kirk Rodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- components/core/src/clp_s/ArchiveReader.cpp | 1 + components/core/src/clp_s/ArchiveReader.hpp | 8 ++ components/core/src/clp_s/CMakeLists.txt | 2 + .../core/src/clp_s/CommandLineArguments.cpp | 8 ++ .../core/src/clp_s/CommandLineArguments.hpp | 3 + components/core/src/clp_s/JsonSerializer.hpp | 8 +- components/core/src/clp_s/SchemaReader.cpp | 12 ++- components/core/src/clp_s/SchemaReader.hpp | 5 ++ components/core/src/clp_s/clp-s.cpp | 21 +++++ .../core/src/clp_s/search/Projection.cpp | 88 +++++++++++++++++++ .../core/src/clp_s/search/Projection.hpp | 82 +++++++++++++++++ 11 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 components/core/src/clp_s/search/Projection.cpp create mode 100644 components/core/src/clp_s/search/Projection.hpp diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index f211a0707..1bdc0baae 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -235,6 +235,7 @@ void ArchiveReader::initialize_schema_reader( auto& schema = (*m_schema_map)[schema_id]; reader.reset( m_schema_tree, + m_projection, schema_id, schema.get_ordered_schema_view(), m_id_to_table_metadata[schema_id].num_messages, diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 91fcc1a94..97966131f 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -12,6 +12,7 @@ #include "DictionaryReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" +#include "search/Projection.hpp" 
#include "TimestampDictionaryReader.hpp" #include "Utils.hpp" @@ -133,6 +134,10 @@ class ArchiveReader { */ [[nodiscard]] std::vector const& get_schema_ids() const { return m_schema_ids; } + void set_projection(std::shared_ptr projection) { + m_projection = projection; + } + private: /** * Initializes a schema reader passed by reference to become a reader for a given schema. @@ -182,6 +187,9 @@ class ArchiveReader { std::shared_ptr m_schema_map; std::vector m_schema_ids; std::map m_id_to_table_metadata; + std::shared_ptr m_projection{ + std::make_shared(search::ProjectionMode::ReturnAllColumns) + }; FileReader m_tables_file_reader; FileReader m_table_metadata_file_reader; diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index c8cf08b22..625972de7 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -145,6 +145,8 @@ set( search/Output.hpp search/OutputHandler.cpp search/OutputHandler.hpp + search/Projection.cpp + search/Projection.hpp search/SchemaMatch.cpp search/SchemaMatch.hpp search/SearchUtils.cpp diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 4cfe017ac..553f17c39 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -425,6 +425,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "archive-id", po::value(&m_archive_id)->value_name("ID"), "Limit search to the archive with the given ID" + )( + "projection", + po::value>(&m_projection_columns) + ->multitoken() + ->value_name("COLUMN_A COLUMN_B ..."), + "Project only the given set of columns for matching results. This option must be" + " specified after all positional options. Values that are objects or structured" + " arrays are currently unsupported." 
); // clang-format on search_options.add(match_options); diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 0f3d8c556..030ff8b99 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -108,6 +108,8 @@ class CommandLineArguments { size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; } + std::vector const& get_projection_columns() const { return m_projection_columns; } + private: // Methods /** @@ -192,6 +194,7 @@ class CommandLineArguments { std::optional m_search_begin_ts; std::optional m_search_end_ts; bool m_ignore_case{false}; + std::vector m_projection_columns; // Decompression and search variables std::string m_archive_id; diff --git a/components/core/src/clp_s/JsonSerializer.hpp b/components/core/src/clp_s/JsonSerializer.hpp index 01a8a1e74..ff46dfa24 100644 --- a/components/core/src/clp_s/JsonSerializer.hpp +++ b/components/core/src/clp_s/JsonSerializer.hpp @@ -75,7 +75,13 @@ class JsonSerializer { void begin_document() { m_json_string += "{"; } - void end_document() { m_json_string[m_json_string.size() - 1] = '}'; } + void end_document() { + if ('{' != m_json_string.back()) { + m_json_string[m_json_string.size() - 1] = '}'; + } else { + m_json_string += '}'; + } + } void end_object() { if (m_op_list[m_op_list_index - 2] != BeginObject diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 03edebf69..d2c7739da 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -548,19 +548,25 @@ void SchemaReader::initialize_serializer() { m_serializer_initialized = true; for (int32_t global_column_id : m_ordered_schema) { - generate_local_tree(global_column_id); + if (m_projection->matches_node(global_column_id)) { + generate_local_tree(global_column_id); + } } for (auto it = 
m_global_id_to_unordered_object.begin(); it != m_global_id_to_unordered_object.end(); ++it) { - generate_local_tree(it->first); + if (m_projection->matches_node(it->first)) { + generate_local_tree(it->first); + } } // TODO: this code will have to change once we allow mixing log lines parsed by different // parsers. - generate_json_template(0); + if (false == m_local_schema_tree.get_nodes().empty()) { + generate_json_template(m_local_schema_tree.get_root_node_id()); + } } void SchemaReader::generate_json_template(int32_t id) { diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 3639560f6..d72b2c3b6 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -11,6 +11,7 @@ #include "FileReader.hpp" #include "JsonSerializer.hpp" #include "SchemaTree.hpp" +#include "search/Projection.hpp" #include "ZstdDecompressor.hpp" namespace clp_s { @@ -71,6 +72,7 @@ class SchemaReader { * to accept append_column calls for the new schema. 
* * @param schema_tree + * @param projection * @param schema_id * @param ordered_schema * @param num_messages @@ -78,6 +80,7 @@ class SchemaReader { */ void reset( std::shared_ptr schema_tree, + std::shared_ptr projection, int32_t schema_id, std::span ordered_schema, uint64_t num_messages, @@ -100,6 +103,7 @@ class SchemaReader { m_local_schema_tree.clear(); m_json_serializer.clear(); m_global_schema_tree = std::move(schema_tree); + m_projection = std::move(projection); m_should_marshal_records = should_marshal_records; } @@ -291,6 +295,7 @@ class SchemaReader { JsonSerializer m_json_serializer; bool m_should_marshal_records{true}; bool m_serializer_initialized{false}; + std::shared_ptr m_projection; std::map>> m_global_id_to_unordered_object; }; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0e0401ad1..8e37ca769 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -29,6 +29,7 @@ #include "search/OrOfAndForm.hpp" #include "search/Output.hpp" #include "search/OutputHandler.hpp" +#include "search/Projection.hpp" #include "search/SchemaMatch.hpp" #include "TimestampPattern.hpp" #include "TraceableException.hpp" @@ -39,6 +40,7 @@ using clp_s::cArchiveFormatDevelopmentVersionFlag; using clp_s::cEpochTimeMax; using clp_s::cEpochTimeMin; using clp_s::CommandLineArguments; +using clp_s::StringUtils; namespace { /** @@ -179,6 +181,25 @@ bool search_archive( return true; } + // Populate projection + auto projection = std::make_shared( + command_line_arguments.get_projection_columns().empty() + ? 
ProjectionMode::ReturnAllColumns + : ProjectionMode::ReturnSelectedColumns + ); + try { + for (auto const& column : command_line_arguments.get_projection_columns()) { + std::vector descriptor_tokens; + StringUtils::tokenize_column_descriptor(column, descriptor_tokens); + projection->add_column(ColumnDescriptor::create(descriptor_tokens)); + } + } catch (clp_s::TraceableException& e) { + SPDLOG_ERROR("{}", e.what()); + return false; + } + projection->resolve_columns(archive_reader->get_schema_tree()); + archive_reader->set_projection(projection); + std::unique_ptr output_handler; try { switch (command_line_arguments.get_output_handler_type()) { diff --git a/components/core/src/clp_s/search/Projection.cpp b/components/core/src/clp_s/search/Projection.cpp new file mode 100644 index 000000000..69836e312 --- /dev/null +++ b/components/core/src/clp_s/search/Projection.cpp @@ -0,0 +1,88 @@ +#include "Projection.hpp" + +#include + +#include "SearchUtils.hpp" + +namespace clp_s::search { +void Projection::add_column(std::shared_ptr column) { + if (column->is_unresolved_descriptor()) { + throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__); + } + if (ProjectionMode::ReturnAllColumns == m_projection_mode) { + throw OperationFailed(ErrorCodeUnsupported, __FILE__, __LINE__); + } + if (m_selected_columns.end() + != std::find_if( + m_selected_columns.begin(), + m_selected_columns.end(), + [column](auto const& rhs) -> bool { return *column == *rhs; } + )) + { + // no duplicate columns in projection + throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__); + } + m_selected_columns.push_back(column); +} + +void Projection::resolve_columns(std::shared_ptr tree) { + for (auto& column : m_selected_columns) { + resolve_column(tree, column); + } +} + +void Projection::resolve_column( + std::shared_ptr tree, + std::shared_ptr column +) { + /** + * Ideally we would reuse the code from SchemaMatch for resolving columns, but unfortunately we + * cannot. 
+ * + * The main reason is that here we don't want to allow projection to travel inside unstructured + * objects -- it may be possible to support such a thing in the future, but it poses some extra + * challenges (e.g. deciding what to do when projecting repeated elements in a structure). + * + * It would be possible to create code that can handle our use-case and SchemaMatch's use-case + * in an elegant way, but it's a significant refactor. In particular, if we extend our column + * type system to be one-per-token instead of one-per-column we can make it so that intermediate + * tokens will not match certain kinds of MPT nodes (like the node for structured arrays). + * + * In light of that we implement a simple version of column resolution here that does exactly + * what we need. + */ + + auto cur_node_id = tree->get_root_node_id(); + auto it = column->descriptor_begin(); + while (it != column->descriptor_end()) { + bool matched_any{false}; + auto cur_it = it++; + bool last_token = it == column->descriptor_end(); + auto const& cur_node = tree->get_node(cur_node_id); + for (int32_t child_node_id : cur_node.get_children_ids()) { + auto const& child_node = tree->get_node(child_node_id); + + // Intermediate nodes must be objects + if (false == last_token && child_node.get_type() != NodeType::Object) { + continue; + } + + if (child_node.get_key_name() != cur_it->get_token()) { + continue; + } + + matched_any = true; + if (last_token && column->matches_type(node_to_literal_type(child_node.get_type()))) { + m_matching_nodes.insert(child_node_id); + } else if (false == last_token) { + cur_node_id = child_node_id; + break; + } + } + + if (false == matched_any) { + break; + } + } +} +} // namespace clp_s::search diff --git a/components/core/src/clp_s/search/Projection.hpp b/components/core/src/clp_s/search/Projection.hpp new file mode 100644 index 000000000..74fece742 --- /dev/null +++ b/components/core/src/clp_s/search/Projection.hpp @@ -0,0 +1,82 @@ +#ifndef 
CLP_S_SEARCH_PROJECTION_HPP +#define CLP_S_SEARCH_PROJECTION_HPP + +#include + +#include + +#include "../SchemaTree.hpp" +#include "../TraceableException.hpp" +#include "ColumnDescriptor.hpp" + +namespace clp_s::search { +enum ProjectionMode : uint8_t { + ReturnAllColumns, + ReturnSelectedColumns +}; + +/** + * This class describes the set of columns that should be included in the projected results. + * + * After adding columns and before calling matches_node the caller is responsible for calling + * resolve_columns. + */ +class Projection { +public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + + explicit Projection(ProjectionMode mode) : m_projection_mode{mode} {} + + /** + * Adds a column to the set of columns that should be included in the projected results + * @param column + * @throws OperationFailed if `column` contains a wildcard + * @throws OperationFailed if this instance of Projection is in mode ReturnAllColumns + * @throws OperationFailed if `column` is identical to a previously added column + */ + void add_column(std::shared_ptr column); + + /** + * Resolves all columns for the purpose of projection. This key resolution implementation is + * more limited than the one in schema matching. In particular, this version of key resolution + * only allows resolving keys that do not contain wildcards and does not allow resolving to + * objects within arrays. + * + * Note: we could try to generalize column resolution code/move it to the schema tree. It is + * probably best to write a simpler version dedicated to projection for now since types are + * leaf-only. The type-per-token idea solves this problem (in the absence of wildcards). 
+ * + * @param tree + */ + void resolve_columns(std::shared_ptr tree); + + /** + * Checks whether a column corresponding to the given leaf node should be included in the output + * @param node_id + * @return true if the column should be included in the output, false otherwise + */ + bool matches_node(int32_t node_id) const { + return ProjectionMode::ReturnAllColumns == m_projection_mode + || m_matching_nodes.contains(node_id); + } + +private: + /** + * Resolves an individual column as described by the `resolve_columns` method. + * @param tree + * @param column + */ + void resolve_column(std::shared_ptr tree, std::shared_ptr column); + + std::vector> m_selected_columns; + absl::flat_hash_set m_matching_nodes; + ProjectionMode m_projection_mode{ProjectionMode::ReturnAllColumns}; +}; +} // namespace clp_s::search + +#endif // CLP_S_SEARCH_PROJECTION_HPP