From f0ffe08840588c4c0884f553690d109839608814 Mon Sep 17 00:00:00 2001 From: Damtev Date: Thu, 13 May 2021 19:03:54 +0300 Subject: [PATCH] Added reads splitting with ShortKMerReadMapper --- .../src/common/io/reads/osequencestream.hpp | 38 ++++- assembler/src/common/io/reads/single_read.hpp | 17 +- .../src/projects/bin_refine/CMakeLists.txt | 3 +- assembler/src/projects/bin_refine/binning.cpp | 8 +- assembler/src/projects/bin_refine/binning.hpp | 1 + .../projects/bin_refine/binning_refiner.cpp | 28 +++- .../projects/bin_refine/read_splitting.cpp | 154 ++++++++++++++++++ .../projects/bin_refine/read_splitting.hpp | 31 ++++ 8 files changed, 255 insertions(+), 25 deletions(-) create mode 100644 assembler/src/projects/bin_refine/read_splitting.cpp create mode 100644 assembler/src/projects/bin_refine/read_splitting.hpp diff --git a/assembler/src/common/io/reads/osequencestream.hpp b/assembler/src/common/io/reads/osequencestream.hpp index 45e4db177..96976041f 100644 --- a/assembler/src/common/io/reads/osequencestream.hpp +++ b/assembler/src/common/io/reads/osequencestream.hpp @@ -22,7 +22,7 @@ namespace io { inline void WriteWrapped(const std::string &s, std::ostream &os, size_t max_width = 60) { size_t cur = 0; while (cur < s.size()) { - os << s.substr(cur, max_width) << "\n"; + os << s.substr(cur, max_width) << '\n'; cur += max_width; } } @@ -110,17 +110,29 @@ class osequencestream_bgc: public osequencestream { struct FastaWriter { static void Write(std::ostream &stream, const SingleRead &read) { - stream << ">" << read.name() << "\n"; + stream << '>' << read.name() << '\n'; WriteWrapped(read.GetSequenceString(), stream); } + + static void Write(std::ostream &stream, const SingleReadSeq &read) { + stream << '>' << '\n' + << read.sequence() << '\n'; + } }; struct FastqWriter { static void Write(std::ostream &stream, const SingleRead &read) { - stream << "@" << read.name() << std::endl - << read.GetSequenceString() << std::endl - << "+" << std::endl - << read.GetPhredQualityString() << std::endl; + stream << '@' << read.name() << '\n' + << read.GetSequenceString() << '\n' + << '+' << '\n' + << read.GetPhredQualityString() << '\n'; + } + + static void Write(std::ostream &stream, const SingleReadSeq &read) { + stream << '@' << '\n' + << read.sequence() << '\n' + << '+' << '\n' + << std::string(read.sequence().size(), '#') << '\n'; } }; @@ -130,14 +142,18 @@ class OReadStream { typedef SingleRead ReadT; OReadStream(const std::string &filename) - : stream_(filename) { - } + : stream_(filename) {} OReadStream &operator<<(const SingleRead &read) { Writer::Write(stream_, read); return *this; } + OReadStream &operator<<(const SingleReadSeq &read) { + Writer::Write(stream_, read); + return *this; + } + void close() { stream_.close(); } @@ -166,6 +182,12 @@ class OPairedReadStream { return *this; } + OPairedReadStream &operator<<(const PairedReadSeq &read) { + Writer::Write(left_stream_, rc1_ ? !read.first() : read.first()); + Writer::Write(right_stream_, rc2_ ? !read.second(): read.second()); + return *this; + } + void close() { left_stream_.close(); right_stream_.close(); diff --git a/assembler/src/common/io/reads/single_read.hpp b/assembler/src/common/io/reads/single_read.hpp index aa7e823f4..8a12acb41 100644 --- a/assembler/src/common/io/reads/single_read.hpp +++ b/assembler/src/common/io/reads/single_read.hpp @@ -218,9 +218,8 @@ class SingleRead { * @variable The flag of SingleRead correctness. */ - //Left and right offsets with respect to original sequence + // Left and right offsets with respect to original sequence SequenceOffsetT left_offset_; - SequenceOffsetT right_offset_; bool valid_; @@ -266,15 +265,13 @@ inline std::ostream &operator<<(std::ostream &os, const SingleRead &read) { } class SingleReadSeq { - public: explicit SingleReadSeq(const Sequence &s, - SequenceOffsetT left_offset = 0, SequenceOffsetT right_offset = 0) : - seq_(s), left_offset_(left_offset), right_offset_(right_offset) { - } + SequenceOffsetT left_offset = 0, SequenceOffsetT right_offset = 0) + : seq_(s), left_offset_(left_offset), right_offset_(right_offset) {} - SingleReadSeq() : seq_(), left_offset_(0), right_offset_(0) { - } + SingleReadSeq() + : seq_(), left_offset_(0), right_offset_(0) {} bool BinRead(std::istream &file) { seq_.BinRead(file); @@ -305,7 +302,7 @@ class SingleReadSeq { return seq_ == singleread.seq_; } - const Sequence sequence() const { + Sequence sequence() const { return seq_; } @@ -332,7 +329,7 @@ class SingleReadSeq { private: Sequence seq_; - //Left and right offsets with respect to original sequence + // Left and right offsets with respect to original sequence SequenceOffsetT left_offset_; SequenceOffsetT right_offset_; }; diff --git a/assembler/src/projects/bin_refine/CMakeLists.txt b/assembler/src/projects/bin_refine/CMakeLists.txt index 55a4ffafd..2ae918749 100644 --- a/assembler/src/projects/bin_refine/CMakeLists.txt +++ b/assembler/src/projects/bin_refine/CMakeLists.txt @@ -11,7 +11,8 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_executable(bin-refine alpha_assigner.cpp alpha_propagation.cpp - binning_refiner.cpp binning.cpp labels_propagation.cpp link_index.cpp paired_end.cpp + binning_refiner.cpp binning.cpp labels_propagation.cpp link_index.cpp + paired_end.cpp read_splitting.cpp binning_assignment_strategy.cpp majority_length_strategy.cpp max_likelihood_strategy.cpp) target_link_libraries(bin-refine Blaze graphio toolchain common_modules ${COMMON_LIBRARIES}) diff --git a/assembler/src/projects/bin_refine/binning.cpp b/assembler/src/projects/bin_refine/binning.cpp index eee91649f..0e245dc1b 100644 --- a/assembler/src/projects/bin_refine/binning.cpp +++ b/assembler/src/projects/bin_refine/binning.cpp @@ -6,15 +6,15 @@ #include "binning.hpp" -#include "blaze/math/dense/DynamicVector.h" #include "io/utils/id_mapper.hpp" - #include "io/reads/io_helper.hpp" -#include "utils/filesystem/path_helper.hpp" -#include "csv/csv.h" +#include "utils/filesystem/path_helper.hpp" #include "utils/stl_utils.hpp" +#include +#include + using namespace debruijn_graph; using namespace bin_stats; diff --git a/assembler/src/projects/bin_refine/binning.hpp b/assembler/src/projects/bin_refine/binning.hpp index cb9864832..e71524136 100644 --- a/assembler/src/projects/bin_refine/binning.hpp +++ b/assembler/src/projects/bin_refine/binning.hpp @@ -110,6 +110,7 @@ class Binning { void WriteToBinningFile(const std::string& binning_file, uint64_t output_options, const SoftBinsAssignment &edge_soft_labels, const BinningAssignmentStrategy& assignment_strategy, const io::IdMapper &edge_mapper); + void AssignEdgeBins(const SoftBinsAssignment& soft_bins_assignment, const BinningAssignmentStrategy& assignment_strategy); const debruijn_graph::Graph& graph() const { return graph_; } diff --git a/assembler/src/projects/bin_refine/binning_refiner.cpp b/assembler/src/projects/bin_refine/binning_refiner.cpp index f7a19fe19..799239d31 100644 --- a/assembler/src/projects/bin_refine/binning_refiner.cpp +++ b/assembler/src/projects/bin_refine/binning_refiner.cpp @@ -11,6 +11,7 @@ #include "majority_length_strategy.hpp" #include "max_likelihood_strategy.hpp" #include "paired_end.hpp" +#include "read_splitting.hpp" #include "assembly_graph/core/graph.hpp" #include "pipeline/config_struct.hpp" @@ -65,6 +66,8 @@ struct gcfg { bool debug = false; bool bin_dist = false; uint64_t out_options = 0; + bool split_reads = false; + double bin_weight_threshold = 0.1; }; static void process_cmdline(int argc, char** argv, gcfg& cfg) { @@ -99,6 +102,10 @@ static void process_cmdline(int argc, char** argv, gcfg& cfg) { (option("-lt") & value("--length-threshold", cfg.length_threshold)) % "Binning will not be propagated to edges longer than threshold", (option("-db") & value("--distance-bound", cfg.distance_bound)) % "Binning will not be propagated further than bound" ), + "Read splitting options:" % ( + (option("-r", "--reads").set(cfg.split_reads) % "split reads according to binning"), + (option("-b", "--bin-weight") & value("threshold", cfg.bin_weight_threshold)) % "reads bin weight threshold" + ), "Developer options:" % ( (option("--bin-load").set(cfg.bin_load)) % "load binary-converted reads from tmpdir", (option("--debug").set(cfg.debug)) % "produce lots of debug data", @@ -275,10 +282,11 @@ int main(int argc, char** argv) { } binning::LinkIndex pe_links(graph); - if (cfg.libindex != -1u) { - INFO("Processing paired-end reads"); + if (cfg.libindex != -1u || cfg.split_reads) debruijn_graph::config::init_libs(dataset, cfg.nthreads, cfg.tmpdir); + if (cfg.libindex != -1u) { + INFO("Processing paired-end reads"); auto &lib = dataset[cfg.libindex]; if (lib.is_paired()) { binning::FillPairedEndLinks(pe_links, lib, graph, @@ -357,6 +365,22 @@ int main(int argc, char** argv) { links.dump(fs::append_path(cfg.prefix, "graph_links.tsv"), *id_mapper); } + if (cfg.split_reads) { + auto &lib = dataset[0]; + + INFO("Splitting reads started"); + binning::SplitAndWriteReads(graph, + lib, + binning, + soft_edge_labels, + *assignment_strategy, + cfg.tmpdir, + cfg.prefix, + cfg.nthreads, + cfg.bin_weight_threshold); + INFO("Splitting reads ended"); + } + if (!cfg.debug) fs::remove_dir(cfg.tmpdir); } catch (const std::string& s) { diff --git a/assembler/src/projects/bin_refine/read_splitting.cpp b/assembler/src/projects/bin_refine/read_splitting.cpp new file mode 100644 index 000000000..2a907edba --- /dev/null +++ b/assembler/src/projects/bin_refine/read_splitting.cpp @@ -0,0 +1,154 @@ +//*************************************************************************** +//* Copyright (c) 2021-2022 Saint Petersburg State University +//* All Rights Reserved +//* See file LICENSE for details. +//*************************************************************************** + +#include "read_splitting.hpp" +#include "binning.hpp" + +#include "modules/alignment/sequence_mapper_notifier.hpp" +#include "modules/alignment/sequence_mapper.hpp" +#include "modules/alignment/kmer_sequence_mapper.hpp" + +#include "io/reads/osequencestream.hpp" +#include "io/reads/io_helper.hpp" +#include "io/dataset_support/read_converter.hpp" + +#include + +#include + +using namespace debruijn_graph; +using namespace bin_stats; + +namespace binning { + +class ReadsPathsToComponentsListener : public debruijn_graph::SequenceMapperListener { + public: + ReadsPathsToComponentsListener(const Binning &binning, + std::vector &binned_reads_ostreams, + io::OFastqPairedStream &unbinned_reads_ostream, + const SoftBinsAssignment &soft_edge_labels, + const BinningAssignmentStrategy &assignment_strategy, + const double bin_weight_threshold) + : binned_reads_ostreams_(binned_reads_ostreams), + unbinned_reads_ostream_(unbinned_reads_ostream), + binning_(binning), + soft_edge_labels_(soft_edge_labels), + assignment_strategy_(assignment_strategy), + bin_weight_threshold_(bin_weight_threshold) {} + + void StartProcessLibrary(size_t threads_count) override { + binned_reads_.resize(threads_count, BinnedReads(binned_reads_ostreams_.size())); + unbinned_reads_.resize(threads_count); + } + + void MergeBuffer(size_t thread_index) override { + BinnedReads &reads = binned_reads_[thread_index]; + ReadVector &unbinned = unbinned_reads_[thread_index]; + + for (size_t part_id = 0; part_id < binned_reads_ostreams_.size(); ++part_id) { + io::OFastqPairedStream& paired_ostream = binned_reads_ostreams_[part_id]; + + for (const auto& paired_read : reads[part_id]) + paired_ostream << paired_read; + + reads[part_id].clear(); + } + + for (const auto& paired_read : unbinned) + unbinned_reads_ostream_ << paired_read; + unbinned.clear(); + } + + void ProcessPairedRead(size_t thread_index, + const io::PairedReadSeq& paired_read, + const MappingPath& first_path, + const MappingPath& second_path) override { + BinnedReads &reads = binned_reads_[thread_index]; + ReadVector &unbinned = unbinned_reads_[thread_index]; + + if (!first_path.empty() && !second_path.empty()) { + auto first_read_bins = + assignment_strategy_.ChooseMajorBins(assignment_strategy_.AssignScaffoldBins(first_path.simple_path(), + soft_edge_labels_, binning_), + soft_edge_labels_, binning_); + auto second_read_bins = + assignment_strategy_.ChooseMajorBins(assignment_strategy_.AssignScaffoldBins(second_path.simple_path(), + soft_edge_labels_, binning_), + soft_edge_labels_, binning_); + + std::unordered_set read_bins; + read_bins.insert(first_read_bins.begin(), first_read_bins.end()); + read_bins.insert(second_read_bins.begin(), second_read_bins.end()); + + for (auto bin : read_bins) + reads[bin].push_back(paired_read); + } else + unbinned.push_back(paired_read); + } + + private: + using ReadVector = std::vector; + using BinnedReads = std::vector; + // Per thread + std::vector binned_reads_; + std::vector unbinned_reads_; + + std::vector &binned_reads_ostreams_; + io::OFastqPairedStream &unbinned_reads_ostream_; + + const Binning &binning_; + const SoftBinsAssignment& soft_edge_labels_; + const BinningAssignmentStrategy& assignment_strategy_; + + double bin_weight_threshold_; +}; + +void SplitAndWriteReads(const debruijn_graph::Graph &graph, + SequencingLib &lib, + const Binning &binning, + const SoftBinsAssignment& edge_soft_labels, + const BinningAssignmentStrategy& assignment_strategy, + const std::string &work_dir, + const std::string &prefix, + unsigned nthreads, + const double bin_weight_threshold) { + if (!io::ReadConverter::LoadLibIfExists(lib)) { + std::unique_ptr pool; + if (nthreads > 1) + pool = std::make_unique(nthreads); + io::ReadConverter::ConvertToBinary(lib, pool.get()); + } + + io::OFastqPairedStream unbinned_reads_ostream(fs::append_path(prefix, "unbinned_1.fastq"), + fs::append_path(prefix, "unbinned_2.fastq")); + + std::vector binned_reads_ostreams; + for (size_t bin_id = 0; bin_id < binning.bins().size(); ++bin_id) { + auto bin_label = binning.bin_labels().at(bin_id); + std::replace(bin_label.begin(), bin_label.end(), '/', '_'); + const std::pair cur_part_paired_reads_filenames + {fs::append_path(prefix, bin_label + "_1.fastq"), + fs::append_path(prefix, bin_label + "_2.fastq")}; + binned_reads_ostreams.emplace_back(cur_part_paired_reads_filenames.first, + cur_part_paired_reads_filenames.second); + } + + SequenceMapperNotifier notifier; + auto mapper = alignment::ShortKMerReadMapper(graph, work_dir); + ReadsPathsToComponentsListener listener(binning, + binned_reads_ostreams, + unbinned_reads_ostream, + edge_soft_labels, + assignment_strategy, + bin_weight_threshold); + notifier.Subscribe(&listener); + auto paired_streams = paired_binary_readers(lib, /*followed by rc*/ false, 0, + /*include merged*/true); + notifier.ProcessLibrary(paired_streams, mapper); +} + +} + diff --git a/assembler/src/projects/bin_refine/read_splitting.hpp b/assembler/src/projects/bin_refine/read_splitting.hpp new file mode 100644 index 000000000..c972f5c32 --- /dev/null +++ b/assembler/src/projects/bin_refine/read_splitting.hpp @@ -0,0 +1,31 @@ +//*************************************************************************** +//* Copyright (c) 2021-2022 Saint Petersburg State University +//* All Rights Reserved +//* See file LICENSE for details. +//*************************************************************************** + +#pragma once + +#include "binning.hpp" + +#include "assembly_graph/core/graph.hpp" + +#include "pipeline/library.hpp" +#include "pipeline/library_data.hpp" + +typedef io::DataSet DataSet; +typedef io::SequencingLibrary SequencingLib; + +namespace binning { + +void SplitAndWriteReads(const debruijn_graph::Graph &graph, + SequencingLib &lib, + const bin_stats::Binning &binning, + const bin_stats::SoftBinsAssignment& edge_soft_labels, + const bin_stats::BinningAssignmentStrategy& assignment_strategy, + const std::string &work_dir, + const std::string &prefix, + unsigned nthreads, + const double bin_weight_threshold); + +}