Skip to content

Commit

Permalink
clp-s: Add support for projecting a subset of columns during search. (y-scope#510)
Browse files Browse the repository at this point in the history

Co-authored-by: wraymo <[email protected]>
Co-authored-by: Kirk Rodrigues <[email protected]>
  • Loading branch information
3 people authored Oct 7, 2024
1 parent 035449a commit 7826d73
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 4 deletions.
1 change: 1 addition & 0 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ void ArchiveReader::initialize_schema_reader(
auto& schema = (*m_schema_map)[schema_id];
reader.reset(
m_schema_tree,
m_projection,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "DictionaryReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "search/Projection.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

Expand Down Expand Up @@ -133,6 +134,10 @@ class ArchiveReader {
*/
[[nodiscard]] std::vector<int32_t> const& get_schema_ids() const { return m_schema_ids; }

/**
 * Sets the projection that this archive reader passes to the schema readers it initializes.
 * @param projection
 */
void set_projection(std::shared_ptr<search::Projection> projection) {
    // `projection` is received by value, so move it into place instead of copying it; this avoids
    // an extra reference-count increment/decrement.
    m_projection = std::move(projection);
}

private:
/**
* Initializes a schema reader passed by reference to become a reader for a given schema.
Expand Down Expand Up @@ -182,6 +187,9 @@ class ArchiveReader {
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;
std::shared_ptr<search::Projection> m_projection{
std::make_shared<search::Projection>(search::ProjectionMode::ReturnAllColumns)
};

FileReader m_tables_file_reader;
FileReader m_table_metadata_file_reader;
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ set(
search/Output.hpp
search/OutputHandler.cpp
search/OutputHandler.hpp
search/Projection.cpp
search/Projection.hpp
search/SchemaMatch.cpp
search/SchemaMatch.hpp
search/SearchUtils.cpp
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"archive-id",
po::value<std::string>(&m_archive_id)->value_name("ID"),
"Limit search to the archive with the given ID"
)(
"projection",
po::value<std::vector<std::string>>(&m_projection_columns)
->multitoken()
->value_name("COLUMN_A COLUMN_B ..."),
"Project only the given set of columns for matching results. This option must be"
" specified after all positional options. Values that are objects or structured"
" arrays are currently unsupported."
);
// clang-format on
search_options.add(match_options);
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class CommandLineArguments {

size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }

/**
 * @return The column names collected for projection; callers treat an empty list as "return all
 * columns".
 */
std::vector<std::string> const& get_projection_columns() const { return m_projection_columns; }

private:
// Methods
/**
Expand Down Expand Up @@ -192,6 +194,7 @@ class CommandLineArguments {
std::optional<epochtime_t> m_search_begin_ts;
std::optional<epochtime_t> m_search_end_ts;
bool m_ignore_case{false};
std::vector<std::string> m_projection_columns;

// Decompression and search variables
std::string m_archive_id;
Expand Down
8 changes: 7 additions & 1 deletion components/core/src/clp_s/JsonSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ class JsonSerializer {

void begin_document() { m_json_string += "{"; }

void end_document() { m_json_string[m_json_string.size() - 1] = '}'; }
/**
 * Terminates the JSON document currently held in the output buffer.
 */
void end_document() {
    // The buffer ends with the opening brace, i.e. nothing has been written since the document
    // was opened, so the closing brace has to be appended.
    if ('{' == m_json_string.back()) {
        m_json_string += '}';
        return;
    }

    // Otherwise the final character of the buffer is overwritten with the closing brace.
    m_json_string.back() = '}';
}

void end_object() {
if (m_op_list[m_op_list_index - 2] != BeginObject
Expand Down
12 changes: 9 additions & 3 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,19 +548,25 @@ void SchemaReader::initialize_serializer() {
m_serializer_initialized = true;

for (int32_t global_column_id : m_ordered_schema) {
generate_local_tree(global_column_id);
if (m_projection->matches_node(global_column_id)) {
generate_local_tree(global_column_id);
}
}

for (auto it = m_global_id_to_unordered_object.begin();
it != m_global_id_to_unordered_object.end();
++it)
{
generate_local_tree(it->first);
if (m_projection->matches_node(it->first)) {
generate_local_tree(it->first);
}
}

// TODO: this code will have to change once we allow mixing log lines parsed by different
// parsers.
generate_json_template(0);
if (false == m_local_schema_tree.get_nodes().empty()) {
generate_json_template(m_local_schema_tree.get_root_node_id());
}
}

void SchemaReader::generate_json_template(int32_t id) {
Expand Down
5 changes: 5 additions & 0 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "FileReader.hpp"
#include "JsonSerializer.hpp"
#include "SchemaTree.hpp"
#include "search/Projection.hpp"
#include "ZstdDecompressor.hpp"

namespace clp_s {
Expand Down Expand Up @@ -71,13 +72,15 @@ class SchemaReader {
* to accept append_column calls for the new schema.
*
* @param schema_tree
* @param projection
* @param schema_id
* @param ordered_schema
* @param num_messages
* @param should_marshal_records
*/
void reset(
std::shared_ptr<SchemaTree> schema_tree,
std::shared_ptr<search::Projection> projection,
int32_t schema_id,
std::span<int32_t> ordered_schema,
uint64_t num_messages,
Expand All @@ -100,6 +103,7 @@ class SchemaReader {
m_local_schema_tree.clear();
m_json_serializer.clear();
m_global_schema_tree = std::move(schema_tree);
m_projection = std::move(projection);
m_should_marshal_records = should_marshal_records;
}

Expand Down Expand Up @@ -291,6 +295,7 @@ class SchemaReader {
JsonSerializer m_json_serializer;
bool m_should_marshal_records{true};
bool m_serializer_initialized{false};
std::shared_ptr<search::Projection> m_projection;

std::map<int32_t, std::pair<size_t, std::span<int32_t>>> m_global_id_to_unordered_object;
};
Expand Down
21 changes: 21 additions & 0 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "search/OrOfAndForm.hpp"
#include "search/Output.hpp"
#include "search/OutputHandler.hpp"
#include "search/Projection.hpp"
#include "search/SchemaMatch.hpp"
#include "TimestampPattern.hpp"
#include "TraceableException.hpp"
Expand All @@ -39,6 +40,7 @@ using clp_s::cArchiveFormatDevelopmentVersionFlag;
using clp_s::cEpochTimeMax;
using clp_s::cEpochTimeMin;
using clp_s::CommandLineArguments;
using clp_s::StringUtils;

namespace {
/**
Expand Down Expand Up @@ -179,6 +181,25 @@ bool search_archive(
return true;
}

// Populate projection
auto projection = std::make_shared<Projection>(
command_line_arguments.get_projection_columns().empty()
? ProjectionMode::ReturnAllColumns
: ProjectionMode::ReturnSelectedColumns
);
try {
for (auto const& column : command_line_arguments.get_projection_columns()) {
std::vector<std::string> descriptor_tokens;
StringUtils::tokenize_column_descriptor(column, descriptor_tokens);
projection->add_column(ColumnDescriptor::create(descriptor_tokens));
}
} catch (clp_s::TraceableException& e) {
SPDLOG_ERROR("{}", e.what());
return false;
}
projection->resolve_columns(archive_reader->get_schema_tree());
archive_reader->set_projection(projection);

std::unique_ptr<OutputHandler> output_handler;
try {
switch (command_line_arguments.get_output_handler_type()) {
Expand Down
88 changes: 88 additions & 0 deletions components/core/src/clp_s/search/Projection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "Projection.hpp"

#include <algorithm>

#include "SearchUtils.hpp"

namespace clp_s::search {
void Projection::add_column(std::shared_ptr<ColumnDescriptor> column) {
if (column->is_unresolved_descriptor()) {
throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__);
}
if (ProjectionMode::ReturnAllColumns == m_projection_mode) {
throw OperationFailed(ErrorCodeUnsupported, __FILE__, __LINE__);
}
if (m_selected_columns.end()
!= std::find_if(
m_selected_columns.begin(),
m_selected_columns.end(),
[column](auto const& rhs) -> bool { return *column == *rhs; }
))
{
// no duplicate columns in projection
throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__);
}
m_selected_columns.push_back(column);
}

void Projection::resolve_columns(std::shared_ptr<SchemaTree> tree) {
for (auto& column : m_selected_columns) {
resolve_column(tree, column);
}
}

void Projection::resolve_column(
        std::shared_ptr<SchemaTree> tree,
        std::shared_ptr<ColumnDescriptor> column
) {
    /**
     * Reusing SchemaMatch's column resolution would be preferable, but it is not possible here.
     *
     * Projection must not descend into unstructured objects. Supporting that may be possible in
     * the future, but it raises extra questions (e.g. what to do when projecting repeated elements
     * in a structure).
     *
     * Code that serves both this use-case and SchemaMatch's elegantly would be a significant
     * refactor. In particular, if the column type system were one-per-token rather than
     * one-per-column, intermediate tokens could be prevented from matching certain kinds of MPT
     * nodes (like the node for structured arrays).
     *
     * Until then, this is a simple resolver that does exactly what projection needs: walk the
     * descriptor token by token from the root, following Object nodes for intermediate tokens and
     * recording every type-compatible child that matches the final token.
     */

    auto parent_id = tree->get_root_node_id();
    for (auto token_it = column->descriptor_begin(); token_it != column->descriptor_end();) {
        auto current_token = token_it;
        ++token_it;
        bool const is_last_token{token_it == column->descriptor_end()};
        bool key_found{false};

        auto const& parent_node = tree->get_node(parent_id);
        for (int32_t candidate_id : parent_node.get_children_ids()) {
            auto const& candidate = tree->get_node(candidate_id);

            // Only Object nodes may stand in for intermediate tokens; the final token may match a
            // node of any type.
            bool const usable_type = is_last_token || NodeType::Object == candidate.get_type();
            if (false == usable_type || candidate.get_key_name() != current_token->get_token()) {
                continue;
            }
            key_found = true;

            if (false == is_last_token) {
                // Descend into the matching object and move on to the next token.
                parent_id = candidate_id;
                break;
            }

            // Final token: keep scanning the remaining children so that every child with this key
            // and a compatible type is recorded.
            if (column->matches_type(node_to_literal_type(candidate.get_type()))) {
                m_matching_nodes.insert(candidate_id);
            }
        }

        if (false == key_found) {
            // No child carries this key, so the column cannot be resolved any further.
            return;
        }
    }
}
} // namespace clp_s::search
82 changes: 82 additions & 0 deletions components/core/src/clp_s/search/Projection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef CLP_S_SEARCH_PROJECTION_HPP
#define CLP_S_SEARCH_PROJECTION_HPP

#include <cstdint>
#include <memory>
#include <vector>

#include <absl/container/flat_hash_set.h>

#include "../SchemaTree.hpp"
#include "../TraceableException.hpp"
#include "ColumnDescriptor.hpp"

namespace clp_s::search {
/**
 * Selects whether a projection keeps every column or only the columns that were explicitly added
 * to it.
 */
enum ProjectionMode : uint8_t {
    ReturnAllColumns = 0,
    ReturnSelectedColumns = 1
};

/**
* This class describes the set of columns that should be included in the projected results.
*
* After adding columns and before calling matches_node the caller is responsible for calling
* resolve_columns.
*/
class Projection {
public:
    // Exception thrown when a projection operation is rejected.
    class OperationFailed : public TraceableException {
    public:
        // Constructors
        OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
                : TraceableException(error_code, filename, line_number) {}
    };

    /**
     * @param mode Whether every column is returned or only the columns added via `add_column`
     */
    explicit Projection(ProjectionMode mode) : m_projection_mode{mode} {}

    /**
     * Adds a column to the set of columns that should be included in the projected results
     * @param column
     * @throws OperationFailed if `column` contains a wildcard
     * @throws OperationFailed if this instance of Projection is in mode ReturnAllColumns
     * @throws OperationFailed if `column` is identical to a previously added column
     */
    void add_column(std::shared_ptr<ColumnDescriptor> column);

    /**
     * Resolves all columns for the purpose of projection. This key resolution implementation is
     * more limited than the one in schema matching. In particular, this version of key resolution
     * only allows resolving keys that do not contain wildcards and does not allow resolving to
     * objects within arrays.
     *
     * Note: we could try to generalize column resolution code/move it to the schema tree. It is
     * probably best to write a simpler version dedicated to projection for now since types are
     * leaf-only. The type-per-token idea solves this problem (in the absence of wildcards).
     *
     * @param tree
     */
    void resolve_columns(std::shared_ptr<SchemaTree> tree);

    /**
     * Checks whether a column corresponding to given leaf node should be included in the output.
     * In mode ReturnAllColumns every node matches; otherwise only nodes recorded by
     * `resolve_columns` match.
     * @param node_id
     * @return true if the column should be included in the output, false otherwise
     */
    [[nodiscard]] bool matches_node(int32_t node_id) const {
        return ProjectionMode::ReturnAllColumns == m_projection_mode
               || m_matching_nodes.contains(node_id);
    }

private:
    /**
     * Resolves an individual column as described by the `resolve_columns` method.
     * @param tree
     * @param column
     */
    void resolve_column(std::shared_ptr<SchemaTree> tree, std::shared_ptr<ColumnDescriptor> column);

    // Columns added through `add_column`, in insertion order
    std::vector<std::shared_ptr<ColumnDescriptor>> m_selected_columns;
    // Ids of the schema tree nodes that the selected columns resolved to
    absl::flat_hash_set<int32_t> m_matching_nodes;
    ProjectionMode m_projection_mode{ProjectionMode::ReturnAllColumns};
};
} // namespace clp_s::search

#endif // CLP_S_SEARCH_PROJECTION_HPP

0 comments on commit 7826d73

Please sign in to comment.