Skip to content

Commit

Permalink
Added reads splitting with ShortKMerReadMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Damtev authored and asl committed Feb 12, 2022
1 parent 5bed4b9 commit f0ffe08
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 25 deletions.
38 changes: 30 additions & 8 deletions assembler/src/common/io/reads/osequencestream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace io {
inline void WriteWrapped(const std::string &s, std::ostream &os, size_t max_width = 60) {
size_t cur = 0;
while (cur < s.size()) {
os << s.substr(cur, max_width) << "\n";
os << s.substr(cur, max_width) << '\n';
cur += max_width;
}
}
Expand Down Expand Up @@ -110,17 +110,29 @@ class osequencestream_bgc: public osequencestream {

struct FastaWriter {
static void Write(std::ostream &stream, const SingleRead &read) {
stream << ">" << read.name() << "\n";
stream << '>' << read.name() << '\n';
WriteWrapped(read.GetSequenceString(), stream);
}

static void Write(std::ostream &stream, const SingleReadSeq &read) {
stream << '>' << '\n'
<< read.sequence() << '\n';
}
};

struct FastqWriter {
static void Write(std::ostream &stream, const SingleRead &read) {
stream << "@" << read.name() << std::endl
<< read.GetSequenceString() << std::endl
<< "+" << std::endl
<< read.GetPhredQualityString() << std::endl;
stream << '@' << read.name() << '\n'
<< read.GetSequenceString() << '\n'
<< '+' << '\n'
<< read.GetPhredQualityString() << '\n';
}

static void Write(std::ostream &stream, const SingleReadSeq &read) {
stream << '@' << '\n'
<< read.sequence() << '\n'
<< '+' << '\n'
<< std::string(read.sequence().size(), '#') << '\n';
}
};

Expand All @@ -130,14 +142,18 @@ class OReadStream {
typedef SingleRead ReadT;

OReadStream(const std::string &filename)
: stream_(filename) {
}
: stream_(filename) {}

OReadStream &operator<<(const SingleRead &read) {
Writer::Write(stream_, read);
return *this;
}

OReadStream &operator<<(const SingleReadSeq &read) {
Writer::Write(stream_, read);
return *this;
}

void close() {
stream_.close();
}
Expand Down Expand Up @@ -166,6 +182,12 @@ class OPairedReadStream {
return *this;
}

OPairedReadStream &operator<<(const PairedReadSeq &read) {
Writer::Write(left_stream_, rc1_ ? !read.first() : read.first());
Writer::Write(right_stream_, rc2_ ? !read.second(): read.second());
return *this;
}

void close() {
left_stream_.close();
right_stream_.close();
Expand Down
17 changes: 7 additions & 10 deletions assembler/src/common/io/reads/single_read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,8 @@ class SingleRead {
* @variable The flag of SingleRead correctness.
*/

//Left and right offsets with respect to original sequence
// Left and right offsets with respect to original sequence
SequenceOffsetT left_offset_;

SequenceOffsetT right_offset_;

bool valid_;
Expand Down Expand Up @@ -266,15 +265,13 @@ inline std::ostream &operator<<(std::ostream &os, const SingleRead &read) {
}

class SingleReadSeq {

public:
explicit SingleReadSeq(const Sequence &s,
SequenceOffsetT left_offset = 0, SequenceOffsetT right_offset = 0) :
seq_(s), left_offset_(left_offset), right_offset_(right_offset) {
}
SequenceOffsetT left_offset = 0, SequenceOffsetT right_offset = 0)
: seq_(s), left_offset_(left_offset), right_offset_(right_offset) {}

SingleReadSeq() : seq_(), left_offset_(0), right_offset_(0) {
}
SingleReadSeq()
: seq_(), left_offset_(0), right_offset_(0) {}

bool BinRead(std::istream &file) {
seq_.BinRead(file);
Expand Down Expand Up @@ -305,7 +302,7 @@ class SingleReadSeq {
return seq_ == singleread.seq_;
}

const Sequence sequence() const {
Sequence sequence() const {
return seq_;
}

Expand All @@ -332,7 +329,7 @@ class SingleReadSeq {
private:
Sequence seq_;

//Left and right offsets with respect to original sequence
// Left and right offsets with respect to original sequence
SequenceOffsetT left_offset_;
SequenceOffsetT right_offset_;
};
Expand Down
3 changes: 2 additions & 1 deletion assembler/src/projects/bin_refine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})

add_executable(bin-refine
alpha_assigner.cpp alpha_propagation.cpp
binning_refiner.cpp binning.cpp labels_propagation.cpp link_index.cpp paired_end.cpp
binning_refiner.cpp binning.cpp labels_propagation.cpp link_index.cpp
paired_end.cpp read_splitting.cpp
binning_assignment_strategy.cpp majority_length_strategy.cpp max_likelihood_strategy.cpp)
target_link_libraries(bin-refine Blaze graphio toolchain common_modules ${COMMON_LIBRARIES})

Expand Down
8 changes: 4 additions & 4 deletions assembler/src/projects/bin_refine/binning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

#include "binning.hpp"

#include "blaze/math/dense/DynamicVector.h"
#include "io/utils/id_mapper.hpp"

#include "io/reads/io_helper.hpp"
#include "utils/filesystem/path_helper.hpp"

#include "csv/csv.h"
#include "utils/filesystem/path_helper.hpp"
#include "utils/stl_utils.hpp"

#include <blaze/math/dense/DynamicVector.h>
#include <csv/csv.h>

using namespace debruijn_graph;
using namespace bin_stats;

Expand Down
1 change: 1 addition & 0 deletions assembler/src/projects/bin_refine/binning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class Binning {
void WriteToBinningFile(const std::string& binning_file, uint64_t output_options,
const SoftBinsAssignment &edge_soft_labels, const BinningAssignmentStrategy& assignment_strategy,
const io::IdMapper<std::string> &edge_mapper);
void AssignEdgeBins(const SoftBinsAssignment& soft_bins_assignment, const BinningAssignmentStrategy& assignment_strategy);

const debruijn_graph::Graph& graph() const { return graph_; }

Expand Down
28 changes: 26 additions & 2 deletions assembler/src/projects/bin_refine/binning_refiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "majority_length_strategy.hpp"
#include "max_likelihood_strategy.hpp"
#include "paired_end.hpp"
#include "read_splitting.hpp"

#include "assembly_graph/core/graph.hpp"
#include "pipeline/config_struct.hpp"
Expand Down Expand Up @@ -65,6 +66,8 @@ struct gcfg {
bool debug = false;
bool bin_dist = false;
uint64_t out_options = 0;
bool split_reads = false;
double bin_weight_threshold = 0.1;
};

static void process_cmdline(int argc, char** argv, gcfg& cfg) {
Expand Down Expand Up @@ -99,6 +102,10 @@ static void process_cmdline(int argc, char** argv, gcfg& cfg) {
(option("-lt") & value("--length-threshold", cfg.length_threshold)) % "Binning will not be propagated to edges longer than threshold",
(option("-db") & value("--distance-bound", cfg.distance_bound)) % "Binning will not be propagated further than bound"
),
"Read splitting options:" % (
(option("-r", "--reads").set(cfg.split_reads) % "split reads according to binning"),
(option("-b", "--bin-weight") & value("threshold", cfg.bin_weight_threshold)) % "reads bin weight threshold"
),
"Developer options:" % (
(option("--bin-load").set(cfg.bin_load)) % "load binary-converted reads from tmpdir",
(option("--debug").set(cfg.debug)) % "produce lots of debug data",
Expand Down Expand Up @@ -275,10 +282,11 @@ int main(int argc, char** argv) {
}

binning::LinkIndex pe_links(graph);
if (cfg.libindex != -1u) {
INFO("Processing paired-end reads");
if (cfg.libindex != -1u || cfg.split_reads)
debruijn_graph::config::init_libs(dataset, cfg.nthreads, cfg.tmpdir);

if (cfg.libindex != -1u) {
INFO("Processing paired-end reads");
auto &lib = dataset[cfg.libindex];
if (lib.is_paired()) {
binning::FillPairedEndLinks(pe_links, lib, graph,
Expand Down Expand Up @@ -357,6 +365,22 @@ int main(int argc, char** argv) {
links.dump(fs::append_path(cfg.prefix, "graph_links.tsv"), *id_mapper);
}

if (cfg.split_reads) {
auto &lib = dataset[0];

INFO("Splitting reads started");
binning::SplitAndWriteReads(graph,
lib,
binning,
soft_edge_labels,
*assignment_strategy,
cfg.tmpdir,
cfg.prefix,
cfg.nthreads,
cfg.bin_weight_threshold);
INFO("Splitting reads ended");
}

if (!cfg.debug)
fs::remove_dir(cfg.tmpdir);
} catch (const std::string& s) {
Expand Down
154 changes: 154 additions & 0 deletions assembler/src/projects/bin_refine/read_splitting.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//***************************************************************************
//* Copyright (c) 2021-2022 Saint Petersburg State University
//* All Rights Reserved
//* See file LICENSE for details.
//***************************************************************************

#include "read_splitting.hpp"
#include "binning.hpp"

#include "modules/alignment/sequence_mapper_notifier.hpp"
#include "modules/alignment/sequence_mapper.hpp"
#include "modules/alignment/kmer_sequence_mapper.hpp"

#include "io/reads/osequencestream.hpp"
#include "io/reads/io_helper.hpp"
#include "io/dataset_support/read_converter.hpp"

#include <threadpool/threadpool.hpp>

#include <vector>

using namespace debruijn_graph;
using namespace bin_stats;

namespace binning {

class ReadsPathsToComponentsListener : public debruijn_graph::SequenceMapperListener {
public:
ReadsPathsToComponentsListener(const Binning &binning,
std::vector<io::OFastqPairedStream> &binned_reads_ostreams,
io::OFastqPairedStream &unbinned_reads_ostream,
const SoftBinsAssignment &soft_edge_labels,
const BinningAssignmentStrategy &assignment_strategy,
const double bin_weight_threshold)
: binned_reads_ostreams_(binned_reads_ostreams),
unbinned_reads_ostream_(unbinned_reads_ostream),
binning_(binning),
soft_edge_labels_(soft_edge_labels),
assignment_strategy_(assignment_strategy),
bin_weight_threshold_(bin_weight_threshold) {}

void StartProcessLibrary(size_t threads_count) override {
binned_reads_.resize(threads_count, BinnedReads(binned_reads_ostreams_.size()));
unbinned_reads_.resize(threads_count);
}

void MergeBuffer(size_t thread_index) override {
BinnedReads &reads = binned_reads_[thread_index];
ReadVector &unbinned = unbinned_reads_[thread_index];

for (size_t part_id = 0; part_id < binned_reads_ostreams_.size(); ++part_id) {
io::OFastqPairedStream& paired_ostream = binned_reads_ostreams_[part_id];

for (const auto& paired_read : reads[part_id])
paired_ostream << paired_read;

reads[part_id].clear();
}

for (const auto& paired_read : unbinned)
unbinned_reads_ostream_ << paired_read;
unbinned.clear();
}

void ProcessPairedRead(size_t thread_index,
const io::PairedReadSeq& paired_read,
const MappingPath<EdgeId>& first_path,
const MappingPath<EdgeId>& second_path) override {
BinnedReads &reads = binned_reads_[thread_index];
ReadVector &unbinned = unbinned_reads_[thread_index];

if (!first_path.empty() && !second_path.empty()) {
auto first_read_bins =
assignment_strategy_.ChooseMajorBins(assignment_strategy_.AssignScaffoldBins(first_path.simple_path(),
soft_edge_labels_, binning_),
soft_edge_labels_, binning_);
auto second_read_bins =
assignment_strategy_.ChooseMajorBins(assignment_strategy_.AssignScaffoldBins(second_path.simple_path(),
soft_edge_labels_, binning_),
soft_edge_labels_, binning_);

std::unordered_set<bin_stats::Binning::BinId> read_bins;
read_bins.insert(first_read_bins.begin(), first_read_bins.end());
read_bins.insert(second_read_bins.begin(), second_read_bins.end());

for (auto bin : read_bins)
reads[bin].push_back(paired_read);
} else
unbinned.push_back(paired_read);
}

private:
using ReadVector = std::vector<io::PairedReadSeq>;
using BinnedReads = std::vector<ReadVector>;
// Per thread
std::vector<BinnedReads> binned_reads_;
std::vector<ReadVector> unbinned_reads_;

std::vector<io::OFastqPairedStream> &binned_reads_ostreams_;
io::OFastqPairedStream &unbinned_reads_ostream_;

const Binning &binning_;
const SoftBinsAssignment& soft_edge_labels_;
const BinningAssignmentStrategy& assignment_strategy_;

double bin_weight_threshold_;
};

void SplitAndWriteReads(const debruijn_graph::Graph &graph,
SequencingLib &lib,
const Binning &binning,
const SoftBinsAssignment& edge_soft_labels,
const BinningAssignmentStrategy& assignment_strategy,
const std::string &work_dir,
const std::string &prefix,
unsigned nthreads,
const double bin_weight_threshold) {
if (!io::ReadConverter::LoadLibIfExists(lib)) {
std::unique_ptr<ThreadPool::ThreadPool> pool;
if (nthreads > 1)
pool = std::make_unique<ThreadPool::ThreadPool>(nthreads);
io::ReadConverter::ConvertToBinary(lib, pool.get());
}

io::OFastqPairedStream unbinned_reads_ostream(fs::append_path(prefix, "unbinned_1.fastq"),
fs::append_path(prefix, "unbinned_2.fastq"));

std::vector<io::OFastqPairedStream> binned_reads_ostreams;
for (size_t bin_id = 0; bin_id < binning.bins().size(); ++bin_id) {
auto bin_label = binning.bin_labels().at(bin_id);
std::replace(bin_label.begin(), bin_label.end(), '/', '_');
const std::pair<std::string, std::string> cur_part_paired_reads_filenames
{fs::append_path(prefix, bin_label + "_1.fastq"),
fs::append_path(prefix, bin_label + "_2.fastq")};
binned_reads_ostreams.emplace_back(cur_part_paired_reads_filenames.first,
cur_part_paired_reads_filenames.second);
}

SequenceMapperNotifier notifier;
auto mapper = alignment::ShortKMerReadMapper(graph, work_dir);
ReadsPathsToComponentsListener listener(binning,
binned_reads_ostreams,
unbinned_reads_ostream,
edge_soft_labels,
assignment_strategy,
bin_weight_threshold);
notifier.Subscribe(&listener);
auto paired_streams = paired_binary_readers(lib, /*followed by rc*/ false, 0,
/*include merged*/true);
notifier.ProcessLibrary(paired_streams, mapper);
}

}

Loading

0 comments on commit f0ffe08

Please sign in to comment.