Skip to content

Commit

Permalink
remove databatch dependency to tbb
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 14, 2024
1 parent d1d7637 commit 8d8ffc8
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
13 changes: 1 addition & 12 deletions cpp/ppc-framework/io/DataBatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#pragma once
#include "../Common.h"
#include <bcos-utilities/Common.h>
#include <tbb/tbb.h>
#include <boost/lexical_cast.hpp>
#include <algorithm>
#include <memory>
Expand Down Expand Up @@ -199,18 +198,8 @@ class DataBatch

DataSchema getDataSchema() const { return m_dataSchema; }

/// Sorts the stored rows and removes duplicate entries in place.
/// @return the number of rows left after de-duplication, or 0 when no
///         storage is attached or the storage holds no rows.
uint32_t dedup()
{
    const bool hasRows = m_data && !m_data->empty();
    if (!hasRows)
    {
        return 0;
    }

    auto& rows = *m_data;
    // std::unique only collapses adjacent equal elements, so sort first.
    tbb::parallel_sort(rows.begin(), rows.end());
    rows.erase(std::unique(rows.begin(), rows.end()), rows.end());
    return rows.size();
}
/// Returns a mutable reference to the shared row storage so callers can
/// modify (e.g. sort or de-duplicate) the rows in place.
/// Deliberately non-const: inside a const member function m_data is const,
/// and a non-const reference cannot bind to it.
std::shared_ptr<std::vector<DataType>>& mutableData() { return m_data; }

private:
std::shared_ptr<std::vector<DataType>> m_data;
Expand Down
15 changes: 14 additions & 1 deletion cpp/wedpr-computing/ppc-psi/src/cm2020-psi/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

#include "ppc-framework/Common.h"
#include "ppc-framework/protocol/PPCMessageFace.h"

#include <bcos-utilities/Common.h>
#include <bcos-utilities/Log.h>
#include <tbb/tbb.h>


namespace ppc::psi
Expand Down Expand Up @@ -65,4 +65,17 @@ enum class CM2020PSIRetCode : int
INVALID_TASK_PARAM = -3002
};

/**
 * @brief Sorts the rows of a data batch and removes duplicates in place.
 *
 * @param dataBatch the batch to de-duplicate; may be null.
 * @return the number of rows remaining after de-duplication, or 0 when the
 *         batch is null, has no storage attached, or holds no rows.
 */
inline uint32_t dedupDataBatch(ppc::io::DataBatch::Ptr dataBatch)
{
    if (!dataBatch)
    {
        return 0;
    }
    // Fetch the storage once; it is a reference to the batch's own pointer.
    auto& data = dataBatch->mutableData();
    if (!data || data->empty())
    {
        return 0;
    }
    // std::unique only collapses adjacent equal elements, so sort first.
    tbb::parallel_sort(data->begin(), data->end());
    auto unique_end = std::unique(data->begin(), data->end());
    // Erase up to the end of the local storage (there is no m_data in a free function).
    data->erase(unique_end, data->end());
    return static_cast<uint32_t>(data->size());
}

} // namespace ppc::psi
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ void CM2020PSIReceiver::saveResults()
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("before dedup") << LOG_KV("taskID", m_taskID)
<< LOG_KV("originCount", finalResults->size());
m_resultCount = finalResults->dedup();
m_resultCount = dedupDataBatch(finalResults);
CM2020_PSI_LOG(INFO) << LOG_BADGE("after dedup") << LOG_KV("taskID", m_taskID)
<< LOG_KV("resultCount", m_resultCount);
m_taskState->writeLines(finalResults, DataSchema::Bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ void CM2020PSISender::saveResults()
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("before dedup") << LOG_KV("taskID", m_taskID)
<< LOG_KV("originCount", finalResults->size());
m_resultCount = finalResults->dedup();
m_resultCount = dedupDataBatch(finalResults);
CM2020_PSI_LOG(INFO) << LOG_BADGE("after dedup") << LOG_KV("taskID", m_taskID)
<< LOG_KV("resultCount", m_resultCount);
m_taskState->writeLines(finalResults, DataSchema::Bytes);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-storage/ppc-io/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
file(GLOB_RECURSE SRCS *.cpp)
add_library(${IO_TARGET} ${SRCS})
# Note: DataBatch no longer depends on tbb, so TBB::tbb is not linked here
target_link_libraries(${IO_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} ${BCOS_BOOSTSSL_TARGET} ${HDFS_LIB} ${CPU_FEATURES_LIB} TBB::tbb)
target_link_libraries(${IO_TARGET} PUBLIC ${BCOS_UTILITIES_TARGET} ${BCOS_BOOSTSSL_TARGET} ${HDFS_LIB} ${CPU_FEATURES_LIB})

0 comments on commit 8d8ffc8

Please sign in to comment.