Skip to content

Commit

Permalink
Add first cut refactored reducer server
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Jan 8, 2024
1 parent b965926 commit 785ce81
Show file tree
Hide file tree
Showing 19 changed files with 1,715 additions and 0 deletions.
14 changes: 14 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd")
endif()

# Find and setup mongocxx
if(CLP_USE_STATIC_LIBS)
set(MONGOCXX_TARGET mongo::mongocxx_static)
else()
set(MONGOCXX_TARGET mongo::mongocxx_shared)
endif()
find_package(mongocxx REQUIRED)
if(mongocxx_FOUND)
message(STATUS "Found mongocxx ${mongocxx_VERSION}")
else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for mongocxx")
endif()

# sqlite dependencies
set(sqlite_DYNAMIC_LIBS "dl;m;pthread")
include(cmake/Modules/FindLibraryDependencies.cmake)
Expand All @@ -167,6 +180,7 @@ add_subdirectory(src/clp/clg)
add_subdirectory(src/clp/clo)
add_subdirectory(src/clp/clp)
add_subdirectory(src/clp/make_dictionaries_readable)
add_subdirectory(src/reducer)

set(SOURCE_FILES_unitTest
src/clp/BufferedFileReader.cpp
Expand Down
60 changes: 60 additions & 0 deletions components/core/src/reducer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
set(
REDUCER_SOURCES
CountOperator.cpp
CountOperator.hpp
GroupByTime.cpp
GroupByTime.hpp
GroupTags.hpp
Operator.cpp
Operator.hpp
Pipeline.cpp
Pipeline.hpp
Record.hpp
RecordGroup.hpp
RecordGroupIterator.hpp
RecordGroupSerdes.cpp
RecordGroupSerdes.hpp
RecordIterator.hpp
RecordValueIterator.hpp
reducer_server.cpp
../clp/database_utils.cpp
../clp/database_utils.hpp
../clp/Defs.h
../clp/ErrorCode.hpp
../clp/ffi/encoding_methods.cpp
../clp/ffi/encoding_methods.hpp
../clp/ffi/encoding_methods.inc
../clp/ir/parsing.cpp
../clp/ir/parsing.hpp
../clp/ir/parsing.inc
../clp/ir/types.hpp
../clp/MySQLDB.cpp
../clp/MySQLDB.hpp
../clp/MySQLParamBindings.cpp
../clp/MySQLParamBindings.hpp
../clp/MySQLPreparedStatement.cpp
../clp/MySQLPreparedStatement.hpp
../clp/spdlog_with_specializations.hpp
../clp/TraceableException.hpp
../clp/type_utils.hpp
)

add_executable(reducer_server ${REDUCER_SOURCES})
target_compile_features(reducer_server PRIVATE cxx_std_17)
target_include_directories(reducer_server PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
target_link_libraries(reducer_server
PRIVATE
Boost::program_options
clp::string_utils
fmt::fmt
MariaDBClient::MariaDBClient
${MONGOCXX_TARGET}
msgpack-cxx
spdlog::spdlog
)
# Put the built executable at the root of the build directory
set_target_properties(
reducer_server
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}"
)
33 changes: 33 additions & 0 deletions components/core/src/reducer/CountOperator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "CountOperator.hpp"

namespace reducer {
void CountOperator::push_intra_stage_record_group(RecordGroup const& record_group) {
auto& count = m_group_count[record_group.get_tags()];

for (auto it = record_group.record_it(); !it->done(); it->next()) {
count += it->get()->get_int64_value("count");
}
}

void CountOperator::push_inter_stage_record_group(RecordGroup const& record_group) {
auto& count = m_group_count[record_group.get_tags()];

for (auto it = record_group.record_it(); !it->done(); it->next()) {
count += 1;
}
}

std::unique_ptr<RecordGroupIterator> CountOperator::get_stored_result_iterator() {
return std::unique_ptr<RecordGroupIterator>(
new Int64MapRecordGroupIterator(m_group_count, "count")
);
}

std::unique_ptr<RecordGroupIterator> CountOperator::get_stored_result_iterator(
std::set<GroupTags> const& filtered_tags
) {
return std::unique_ptr<RecordGroupIterator>(
new FilteredInt64MapRecordGroupIterator(m_group_count, filtered_tags, "count")
);
}
} // namespace reducer
35 changes: 35 additions & 0 deletions components/core/src/reducer/CountOperator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef CLP_AGGREGATION_COUNT_OPERATOR_HPP
#define CLP_AGGREGATION_COUNT_OPERATOR_HPP

#include <map>
#include <string>

#include "GroupTags.hpp"
#include "Operator.hpp"

namespace reducer {
class CountOperator : public Operator {
public:
CountOperator() : Operator() {}

virtual OperatorType get_type() const { return OperatorType::REDUCE; }

virtual OperatorResultCardinality get_cardinality() const {
return OperatorResultCardinality::ONE;
}

virtual void push_intra_stage_record_group(RecordGroup const& record_group);

virtual void push_inter_stage_record_group(RecordGroup const& record_group);

virtual std::unique_ptr<RecordGroupIterator> get_stored_result_iterator();
virtual std::unique_ptr<RecordGroupIterator> get_stored_result_iterator(
std::set<GroupTags> const& filtered_tags
);

private:
std::map<GroupTags, int64_t> m_group_count;
};
} // namespace reducer

#endif // CLP_AGGREGATION_COUNT_OPERATOR_HPP
21 changes: 21 additions & 0 deletions components/core/src/reducer/GroupByTime.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "GroupByTime.hpp"

#include "RecordGroup.hpp"

namespace reducer {
void GroupByTime::push_inter_stage_record_group(RecordGroup const& record_group) {
// FIXME: should append to existing grouptags, and preserve record, but this doesn't matter for
// now
for (auto it = record_group.record_it(); !it->done(); it->next()) {
int64_t time = it->get()->get_int64_value("@time");
time = time / m_bucket_size;
time = time * m_bucket_size;
if (time != m_prev_time) {
m_tags[0] = std::to_string(time);
m_prev_time = time;
}

m_next_stage->push_inter_stage_record_group(BasicSingleRecordGroup(&m_tags, &m_empty));
}
}
} // namespace reducer
45 changes: 45 additions & 0 deletions components/core/src/reducer/GroupByTime.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef CLP_AGGREGATION_GROUPBY_TIME_OPERATOR_HPP
#define CLP_AGGREGATION_GROUPBY_TIME_OPERATOR_HPP

#include <map>
#include <string>

#include "GroupTags.hpp"
#include "Operator.hpp"
#include "Record.hpp"

namespace reducer {
class GroupByTime : public Operator {
public:
GroupByTime(int64_t bucket_size = 5 * 60 * 1000)
: Operator(),
m_bucket_size(bucket_size),
m_prev_time(-1) {
m_tags.push_back("0");
}

virtual OperatorType get_type() const { return OperatorType::GROUPBY; }

virtual OperatorResultCardinality get_cardinality() const {
return OperatorResultCardinality::INPUT;
}

virtual void push_intra_stage_record_group(RecordGroup const& record_group) {
push_inter_stage_record_group(record_group);
}

virtual void push_inter_stage_record_group(RecordGroup const& record_group);

virtual std::unique_ptr<RecordGroupIterator> get_stored_result_iterator() {
return std::make_unique<EmptyRecordGroupIterator>();
}

private:
EmptyRecord m_empty;
GroupTags m_tags;
int64_t m_prev_time;
int64_t m_bucket_size;
};
} // namespace reducer

#endif // CLP_AGGREGATION_GROUPBY_TIME_OPERATOR_HPP
14 changes: 14 additions & 0 deletions components/core/src/reducer/GroupTags.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef CLP_AGGREGATION_GROUPTAGS_HPP
#define CLP_AGGREGATION_GROUPTAGS_HPP

#include <string>
#include <vector>

namespace reducer {
// We will do something fancier for GroupTags in the future,
// but this is good enough to get started

typedef std::vector<std::string> GroupTags;
} // namespace reducer

#endif // CLP_AGGREGATION_GROUPTAGS_HPP
13 changes: 13 additions & 0 deletions components/core/src/reducer/Operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include "Operator.hpp"

namespace reducer {
void Operator::finish() {
if (m_next_stage == nullptr) {
return;
}

for (auto it = this->get_stored_result_iterator(); !it->done(); it->next()) {
m_next_stage->push_inter_stage_record_group(*it->get());
}
}
} // namespace reducer
53 changes: 53 additions & 0 deletions components/core/src/reducer/Operator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#ifndef CLP_AGGREGATION_OPERATOR_HPP
#define CLP_AGGREGATION_OPERATOR_HPP

#include <memory>
#include <set>

#include "RecordGroup.hpp"
#include "RecordGroupIterator.hpp"

namespace reducer {
enum class OperatorType {
MAP,
GROUPBY,
REDUCE
};

enum class OperatorResultCardinality {
ONE,
SUBSET,
INPUT
};

class Operator {
public:
Operator() : m_next_stage(nullptr) {}

virtual ~Operator() {}

virtual OperatorType get_type() const = 0;
virtual OperatorResultCardinality get_cardinality() const = 0;

virtual void push_intra_stage_record_group(RecordGroup const& record_group) = 0;
virtual void push_inter_stage_record_group(RecordGroup const& record_group) = 0;

void set_next_stage(std::shared_ptr<Operator> next_operator) { m_next_stage = next_operator; }

// TODO: default implementation of finish
void finish();

virtual std::unique_ptr<RecordGroupIterator> get_stored_result_iterator() = 0;

virtual std::unique_ptr<RecordGroupIterator> get_stored_result_iterator(
std::set<GroupTags> const& filtered_tags
) {
return get_stored_result_iterator();
}

protected:
std::shared_ptr<Operator> m_next_stage;
};
} // namespace reducer

#endif // CLP_AGGREGATION_OPERATOR_HPP
49 changes: 49 additions & 0 deletions components/core/src/reducer/Pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "Pipeline.hpp"

#include "RecordGroup.hpp"

namespace reducer {
void Pipeline::push_record_group(RecordGroup const& record_group) {
if (m_pipeline.size() > 0) {
if (m_input_mode == PipelineInputMode::INTER_STAGE) {
m_pipeline[0]->push_inter_stage_record_group(record_group);
} else /*input_mode == PipelineInputMode::INTRA_STAGE*/ {
m_pipeline[0]->push_intra_stage_record_group(record_group);
}
}
// else silently drop
}

void Pipeline::push_record(Record const& record) {
push_record_group(BasicSingleRecordGroup(&m_empty_group_tags, &record));
}

void Pipeline::add_pipeline_stage(std::shared_ptr<Operator> op) {
m_pipeline.push_back(op);
if (m_pipeline.size() > 1) {
m_pipeline[m_pipeline.size() - 2]->set_next_stage(op);
}
}

std::unique_ptr<RecordGroupIterator> Pipeline::finish() {
for (auto op = m_pipeline.begin(); op != m_pipeline.end(); ++op) {
(*op)->finish();
}

if (!m_pipeline.empty()) {
return m_pipeline.back()->get_stored_result_iterator();
}

return std::unique_ptr<RecordGroupIterator>(new EmptyRecordGroupIterator());
}

std::unique_ptr<RecordGroupIterator> Pipeline::finish(std::set<GroupTags> const& filtered_tags) {
// FIXME: assumes no need to push results between stages, but we will change the
// programming model to eliminate the possibility of flushing between stages at the end later.
if (!m_pipeline.empty()) {
return m_pipeline.back()->get_stored_result_iterator(filtered_tags);
}

return std::unique_ptr<RecordGroupIterator>(new EmptyRecordGroupIterator());
}
} // namespace reducer
38 changes: 38 additions & 0 deletions components/core/src/reducer/Pipeline.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#ifndef CLP_AGGREGATION_PIPELINE_HPP
#define CLP_AGGREGATION_PIPELINE_HPP

#include <memory>
#include <set>
#include <vector>

#include "GroupTags.hpp"
#include "Operator.hpp"
#include "Record.hpp"
#include "RecordGroup.hpp"

namespace reducer {
enum class PipelineInputMode {
INTRA_STAGE,
INTER_STAGE
};

class Pipeline {
public:
Pipeline(PipelineInputMode input_mode) : m_input_mode(input_mode){};

void push_record(Record const& record);
void push_record_group(RecordGroup const& record_group);

void add_pipeline_stage(std::shared_ptr<Operator> op);

std::unique_ptr<RecordGroupIterator> finish();
std::unique_ptr<RecordGroupIterator> finish(std::set<GroupTags> const& filtered_tags);

private:
std::vector<std::shared_ptr<Operator>> m_pipeline;
PipelineInputMode m_input_mode;
GroupTags m_empty_group_tags;
};
} // namespace reducer

#endif // CLP_AGGREGATION_PIPELINE_HPP
Loading

0 comments on commit 785ce81

Please sign in to comment.