Skip to content

Commit

Permalink
Extract long lambdas for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
Zitrax committed Oct 8, 2023
1 parent db1477e commit 517d056
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 87 deletions.
187 changes: 100 additions & 87 deletions src/torrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,79 @@ uint32_t Torrent::piece_length(uint32_t id) const {
return m_piece_length;
}

void Torrent::verify_piece_single_file(uintmax_t file_length,
std::atomic_uint32_t& num_pieces,
std::mutex& mutex,
const Sha1& sha1) {
const auto id = zit::numeric_cast<uint32_t>(&sha1 - m_pieces.data());
const auto offset = id * m_piece_length;
ifstream is{m_tmpfile, ios::in | ios::binary};
is.exceptions(ifstream::failbit | ifstream::badbit);
is.seekg(offset);
const auto tail = zit::numeric_cast<uint32_t>(file_length - offset);
const auto len = std::min(m_piece_length, tail);
bytes data(len);
is.read(reinterpret_cast<char*>(data.data()), len);
const auto fsha1 = Sha1::calculateData(data);
if (sha1 == fsha1) {
// Lock when updating num_pieces, inserting into
// std::map is likely not thread safe.
const std::lock_guard<std::mutex> lock(mutex);
m_client_pieces[id] = true;
m_active_pieces.emplace(
id, make_shared<Piece>(PieceId(id), PieceSize(piece_length(id))));
m_active_pieces[id]->set_piece_written(true);
++num_pieces;
} else {
logger()->trace("Piece {} does not match ({}!={})", id, sha1, fsha1);
}
}

void Torrent::verify_piece_multi_file(std::atomic_uint32_t& num_pieces,
std::mutex& mutex,
int64_t global_len,
const Sha1& sha1) {
const auto id = zit::numeric_cast<uint32_t>(&sha1 - m_pieces.data());
const auto pos = id * m_piece_length;
// The piece might be spread over more than one file
bytes data(m_piece_length, 0_b);
auto remaining = numeric_cast<int64_t>(m_piece_length);
int64_t gpos = pos; // global pos
int64_t ppos = 0; // piece pos
while (remaining > 0 && gpos < global_len) {
const auto& [fi, offset, left] = file_at_pos(gpos);
const auto file = name() / fi.path();
if (!filesystem::exists(file)) {
return;
}
ifstream is{name() / fi.path(), ios::in | ios::binary};
is.exceptions(ifstream::failbit | ifstream::badbit);
is.seekg(offset);
const auto len = std::min(left, remaining);
is.read(reinterpret_cast<char*>(
std::next(data.data(), numeric_cast<std::ptrdiff_t>(ppos))),
len);
gpos += len;
ppos += len;
remaining -= len;
}
// Since last piece might not be filled
data.resize(data.size() - numeric_cast<size_t>(remaining));
const auto fsha1 = Sha1::calculateData(data);
if (sha1 == fsha1) {
// Lock when updating num_pieces, inserting into std::map is
// likely not thread safe.
const std::lock_guard<std::mutex> lock(mutex);
m_client_pieces[id] = true;
m_active_pieces.emplace(
id, make_shared<Piece>(PieceId(id), PieceSize(piece_length(id))));
m_active_pieces[id]->set_piece_written(true);
++num_pieces;
} else {
logger()->trace("Piece {} does not match ({}!={})", id, sha1, fsha1);
}
}

void Torrent::verify_existing_file() {
bool full_file = false;

Expand All @@ -247,40 +320,8 @@ void Torrent::verify_existing_file() {
std::atomic_uint32_t num_pieces = 0;
std::mutex mutex;
const bool use_threads = m_config.get(BoolSetting::PIECE_VERIFY_THREADS);
if (is_single_file()) {
logger()->info("Verifying existing file: {}", m_tmpfile);
const Timer timer(fmt::format("verifying existing file ({}using threads)",
use_threads ? "" : "not "));
const auto file_length = filesystem::file_size(m_tmpfile);

const auto verify_piece = [&file_length, &num_pieces, &mutex,
this // Be careful what is used from this. It
// needs to be thread safe.
](const Sha1& sha1) {
const auto id = zit::numeric_cast<uint32_t>(&sha1 - m_pieces.data());
const auto offset = id * m_piece_length;
ifstream is{m_tmpfile, ios::in | ios::binary};
is.exceptions(ifstream::failbit | ifstream::badbit);
is.seekg(offset);
const auto tail = zit::numeric_cast<uint32_t>(file_length - offset);
const auto len = std::min(m_piece_length, tail);
bytes data(len);
is.read(reinterpret_cast<char*>(data.data()), len);
const auto fsha1 = Sha1::calculateData(data);
if (sha1 == fsha1) {
// Lock when updating num_pieces, inserting into
// std::map is likely not thread safe.
const std::lock_guard<std::mutex> lock(mutex);
m_client_pieces[id] = true;
m_active_pieces.emplace(
id, make_shared<Piece>(PieceId(id), PieceSize(piece_length(id))));
m_active_pieces[id]->set_piece_written(true);
++num_pieces;
} else {
logger()->trace("Piece {} does not match ({}!={})", id, sha1, fsha1);
}
};

const auto verify = [this, &use_threads](auto&& verify_piece) {
// Verify each piece in parallel to speed it up
if (use_threads) {
std::for_each(std::execution::par_unseq, m_pieces.begin(),
Expand All @@ -289,62 +330,34 @@ void Torrent::verify_existing_file() {
std::for_each(std::execution::unseq, m_pieces.begin(), m_pieces.end(),
verify_piece);
}
} else {
logger()->info("Verifying existing files in: {}", m_tmpfile);
const auto global_len = length();

const auto verify_piece = [&num_pieces, &mutex, &global_len,
this // Be careful what is used from this. It
// needs to be thread safe.
](const Sha1& sha1) {
const auto id = zit::numeric_cast<uint32_t>(&sha1 - m_pieces.data());
const auto pos = id * m_piece_length;
// The piece might be spread over more than one file
bytes data(m_piece_length, 0_b);
auto remaining = numeric_cast<int64_t>(m_piece_length);
int64_t gpos = pos; // global pos
int64_t ppos = 0; // piece pos
while (remaining > 0 && gpos < global_len) {
const auto& [fi, offset, left] = file_at_pos(gpos);
const auto file = name() / fi.path();
if (!filesystem::exists(file)) {
return;
}
ifstream is{name() / fi.path(), ios::in | ios::binary};
is.exceptions(ifstream::failbit | ifstream::badbit);
is.seekg(offset);
const auto len = std::min(left, remaining);
is.read(reinterpret_cast<char*>(std::next(
data.data(), numeric_cast<std::ptrdiff_t>(ppos))),
len);
gpos += len;
ppos += len;
remaining -= len;
}
// Since last piece might not be filled
data.resize(data.size() - numeric_cast<size_t>(remaining));
const auto fsha1 = Sha1::calculateData(data);
if (sha1 == fsha1) {
// Lock when updating num_pieces, inserting into std::map is
// likely not thread safe.
const std::lock_guard<std::mutex> lock(mutex);
m_client_pieces[id] = true;
m_active_pieces.emplace(
id, make_shared<Piece>(PieceId(id), PieceSize(piece_length(id))));
m_active_pieces[id]->set_piece_written(true);
++num_pieces;
} else {
logger()->trace("Piece {} does not match ({}!={})", id, sha1, fsha1);
}
};
};

{
const Timer timer(
fmt::format("verifying existing file(s) ({}using threads)",
use_threads ? "" : "not "));

if (is_single_file()) {
logger()->info("Verifying existing file: {}", m_tmpfile);
const auto file_length = filesystem::file_size(m_tmpfile);

verify([&file_length, &num_pieces, &mutex,
this // Be careful what is used from this. It
// needs to be thread safe.
](const Sha1& sha1) {
verify_piece_single_file(file_length, num_pieces, mutex, sha1);
});

// Verify each piece in parallel to speed it up
if (use_threads) {
std::for_each(std::execution::par_unseq, m_pieces.begin(),
m_pieces.end(), verify_piece);
} else {
std::for_each(std::execution::unseq, m_pieces.begin(), m_pieces.end(),
verify_piece);
logger()->info("Verifying existing files in: {}", m_tmpfile);
const auto global_len = length();

verify([&num_pieces, &mutex, &global_len,
this // Be careful what is used from this. It
// needs to be thread safe.
](const Sha1& sha1) {
verify_piece_multi_file(num_pieces, mutex, global_len, sha1);
});
}
}
logger()->info("Verification done. {}/{} pieces done.", num_pieces,
Expand Down
16 changes: 16 additions & 0 deletions src/torrent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,22 @@ class Torrent {
void read_peers_binary_form(const bencode::Element& peers_dict,
std::vector<std::shared_ptr<Peer>>& peers);

/**
* Helper for `verify_existing_file` for single file pieces.
*/
void verify_piece_single_file(uintmax_t file_length,
std::atomic_uint32_t& num_pieces,
std::mutex& mutex,
const Sha1& sha1);

/**
* Helper for `verify_existing_file` for single multi file pieces.
*/
void verify_piece_multi_file(std::atomic_uint32_t& num_pieces,
std::mutex& mutex,
int64_t global_len,
const Sha1& sha1);

/**
* If a piece have requests but have not received data in a while we mark the
* blocks as non requested to be tried again.
Expand Down

0 comments on commit 517d056

Please sign in to comment.