dedup_tool: fixed mistaken exception handling and defined debug out macro
jyha200 committed Feb 24, 2022
1 parent c803524 commit bc239de
Showing 1 changed file with 84 additions and 84 deletions.
168 changes: 84 additions & 84 deletions src/tools/ceph_dedup_tool.cc
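For context, the heart of the exception-handling fix below is replacing bare `throw std::exception()` calls (whose `what()` carries no information) with `std::system_error`, so the `catch` block added to `crawl()` can report the errno and a context message. A minimal, self-contained sketch of the pattern, assuming a rados-style call that returns `-errno` on failure (`connect_cluster` is a hypothetical stand-in, not part of the commit):

#include <cerrno>
#include <iostream>
#include <system_error>

// Hypothetical stand-in for a librados call: 0 on success, -errno on failure.
static int connect_cluster() { return -ECONNREFUSED; }

int main() {
  try {
    int ret = connect_cluster();
    if (ret < 0) {
      // system_error carries the errno value and a message;
      // a default-constructed std::exception carries neither.
      throw std::system_error(-ret, std::generic_category(),
                              "couldn't connect to cluster");
    }
  } catch (const std::exception& e) {
    std::cerr << "exception : " << e.what() << std::endl;
  }
  return 0;
}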
@@ -563,6 +563,8 @@ void ChunkScrub::chunk_scrub_common()
cout << "--done--" << std::endl;
}

#define DEBUG_OUT(x) if(debug==1){std::cout<<x;}else{}
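// Emits debug output only when the global 'debug' flag is set. The empty
// else branch consumes a dangling else, so the macro stays safe when used
// unbraced inside an if statement.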

class SampleDedup : public CrawlerThread
{
public:
@@ -716,7 +718,6 @@ class SampleDedup : public CrawlerThread
static FpStore fp_store;
static std::unordered_set<std::string> flushed_objects;
static std::shared_mutex flushed_lock;
std::list<string> dedupable_objects;
size_t chunk_size;
fp_type_t fp_type;
};
@@ -739,69 +740,74 @@ SampleDedup::fp_type_t SampleDedup::get_fp_type(string fp_algo) {
}

void SampleDedup::crawl() {
prepare_rados();
ObjectCursor shard_start;
ObjectCursor shard_end;
std::tie(shard_start, shard_end) = get_shard_boundary();
cout << "new iteration thread: " << n <<std::endl;

for (ObjectCursor current_object = shard_start;
current_object < shard_end;) {
std::vector<ObjectItem> objects;
// Get the list of object IDs to deduplicate
std::tie(objects, current_object) = get_objects(
current_object,
shard_end,
100);

// Pick a few objects to process. The crawling mode decides how many
// objects to pick (the sampling ratio); a lower sampling ratio reduces
// crawling overhead but finds less duplication.
std::set<size_t> sampled_indexes = sample_object(objects.size());
for (size_t index : sampled_indexes) {
ObjectItem target = objects[index];
// Only process dirty objects, which are expected not to have been processed yet
if (is_dirty(target)) {
try_object_dedup_and_accumulate_result(target);
try {
if (fp_type == fp_type_t::UNKNOWN) {
throw std::logic_error("unknown fingerprint algorithm");
}
prepare_rados();
ObjectCursor shard_start;
ObjectCursor shard_end;
std::tie(shard_start, shard_end) = get_shard_boundary();
cout << "new iteration thread: " << n <<std::endl;

for (ObjectCursor current_object = shard_start;
current_object < shard_end;) {
std::vector<ObjectItem> objects;
// Get the list of object IDs to deduplicate
std::tie(objects, current_object) = get_objects(
current_object,
shard_end,
100);

// Pick a few objects to process. The crawling mode decides how many
// objects to pick (the sampling ratio); a lower sampling ratio reduces
// crawling overhead but finds less duplication.
std::set<size_t> sampled_indexes = sample_object(objects.size());
for (size_t index : sampled_indexes) {
ObjectItem target = objects[index];
// Only process dirty objects, which are expected not to have been processed yet
if (is_dirty(target)) {
try_object_dedup_and_accumulate_result(target);
}
}
}
}

map<std::string,AioCompletion*> set_chunk_completions;
// Do set_chunk so that the duplicable chunks we found can be evicted by tier_evict()
for (auto& duplicable_chunk : duplicable_chunks) {
auto completion = set_chunk_duplicated(duplicable_chunk);
if (completion != nullptr) {
set_chunk_completions[duplicable_chunk.oid] = completion;
map<std::string,AioCompletion*> set_chunk_completions;
// Do set_chunk so that the duplicable chunks we found can be evicted by tier_evict()
for (auto& duplicable_chunk : duplicable_chunks) {
auto completion = set_chunk_duplicated(duplicable_chunk);
if (completion != nullptr) {
set_chunk_completions[duplicable_chunk.oid] = completion;
}
}
}

vector<AioCompletion*> evict_completions;
for (auto& oid : oid_for_evict) {
auto completion_iter = set_chunk_completions.find(oid);
// The related set_chunk should complete before tier_evict because
// tier_evict() only evicts data processed by set_chunk() or tier_flush()
if (completion_iter != set_chunk_completions.end()) {
auto completion = completion_iter->second;
vector<AioCompletion*> evict_completions;
for (auto& oid : oid_for_evict) {
auto completion_iter = set_chunk_completions.find(oid);
// The related set_chunk should complete before tier_evict because
// tier_evict() only evicts data processed by set_chunk() or tier_flush()
if (completion_iter != set_chunk_completions.end()) {
auto completion = completion_iter->second;
completion->wait_for_complete();
delete completion;
}
auto completion = do_async_evict(oid);
evict_completions.push_back(completion);
}
for (auto& completion : evict_completions) {
completion->wait_for_complete();
delete completion;
}
auto completion = do_async_evict(oid);
evict_completions.push_back(completion);
}
for (auto& completion : evict_completions) {
completion->wait_for_complete();
delete completion;
} catch (std::exception& e) {
cerr << "exception : " << e.what() << std::endl;
}
cout << "done iteration thread: " << n <<std::endl;
}

AioCompletion* SampleDedup::do_async_evict(string oid) {
ObjectReadOperation op_tier;
AioCompletion* completion = rados.aio_create_completion();
if (debug) {
cout << "evict " << oid << std::endl;
}
DEBUG_OUT("evict " << oid << std::endl);
op_tier.tier_evict();
io_ctx.aio_operate(
oid,
@@ -816,20 +822,26 @@ bool SampleDedup::is_dirty(ObjectItem& object) {
bool dirty = false;
int r = -1;
op.is_dirty(&dirty, &r);
io_ctx.operate(object.oid, &op, NULL);
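// operate() returns the result of submitting the whole operation;
// r receives the return code of the is_dirty() op itself.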
int ret = io_ctx.operate(object.oid, &op, NULL);
if (ret < 0) {
cerr << __func__ << " failed ioctx oid: "<< object.oid << strerror(ret)
<< std::endl;
}
if (r < 0) {
cerr << __func__ << " failed in is_dirty() oid: " << object.oid
<< " " << cpp_strerror(r) << std::endl;
}
return dirty;
}

void SampleDedup::prepare_rados() {
int ret = rados.init_with_context(g_ceph_context);
if (ret < 0) {
cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
throw std::exception();
throw system_error(-ret, generic_category(), "couldn't initialize rados");
}
ret = rados.connect();
if (ret) {
cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
throw std::exception();
throw system_error(-ret, generic_category(), "couldn't connect to cluster");
}
}

@@ -853,8 +865,7 @@ std::tuple<std::vector<ObjectItem>, ObjectCursor> SampleDedup::get_objects(
&objects,
&next);
if (ret < 0 ) {
cerr << "error object_list : " << cpp_strerror(ret) << std::endl;
throw std::exception();
throw system_error(-ret, generic_category(), "error object_list");
}

return std::make_tuple(objects, next);
@@ -924,32 +935,26 @@ void SampleDedup::try_object_dedup_and_accumulate_result(ObjectItem& object) {
.fingerprint = fingerprint,
.data = chunk_data
};
if (debug) {
cout << "check " << chunk_info.oid << " fp " << fingerprint << " " <<
chunk_info.start << ", " << chunk_info.size << std::endl;
}

DEBUG_OUT("check " << chunk_info.oid << " fp " << fingerprint << " "
<< chunk_info.start << ", " << chunk_info.size << std::endl);
if (fp_store.find(fingerprint)) {
if (debug) {
cout << "duplication oid " << chunk_info.oid << " " <<
chunk_info.fingerprint << " " << chunk_info.start <<
", " << chunk_info.size << std::endl;
}
DEBUG_OUT("duplication oid " << chunk_info.oid << " "
<< chunk_info.fingerprint << " " << chunk_info.start << ", "
<< chunk_info.size << std::endl);
duplicated_size += chunk_data.length();
}
fp_store.add(chunk_info, duplicable_chunks);
}

size_t object_size = data.length();
if (debug) {
cout << "oid " << object.oid << " object_size " << object_size
<< " dup size " << duplicated_size << std::endl;
}

DEBUG_OUT("oid " << object.oid << " object_size " << object_size
<< " dup size " << duplicated_size << std::endl);
// If the duplicated portion of an object's chunks exceeds object_dedup_threshold,
// try to deduplicate the whole object via tier_flush
if (check_whole_object_dedupable(duplicated_size, object_size)) {
if (debug) {
cout << "dedup object " << object.oid << std::endl;
}
DEBUG_OUT("dedup object " << object.oid << std::endl);
flush_duplicable_object(object);
}

Expand All @@ -961,16 +966,15 @@ void SampleDedup::try_object_dedup_and_accumulate_result(ObjectItem& object) {
bufferlist SampleDedup::read_object(ObjectItem& object) {
bufferlist whole_data;
size_t offset = 0;
if (debug) {
cout << "read object " << object.oid << std::endl;
}
DEBUG_OUT("read object " << object.oid << std::endl);
int ret = -1;
while (ret != 0) {
bufferlist partial_data;
ret = io_ctx.read(object.oid, partial_data, max_read_size, offset);
if (ret < 0) {
cerr << "read object error " << object.oid << " offset " << offset
<< " size " << max_read_size << std::endl;
<< " size " << max_read_size << " error(" << cpp_strerror(ret)
<< std::endl;
bufferlist empty_buf;
return empty_buf;
}
Expand Down Expand Up @@ -1031,9 +1035,7 @@ bool SampleDedup::check_whole_object_dedupable(
void SampleDedup::flush_duplicable_object(ObjectItem& object) {
ObjectReadOperation op;
op.tier_flush();
if (debug) {
cout << "try flush " << object.oid << " " << &flushed_objects<<std::endl;
}
DEBUG_OUT("try flush " << object.oid << " " << &flushed_objects << std::endl);
{
std::unique_lock lock(flushed_lock);
flushed_objects.insert(object.oid);
@@ -1060,10 +1062,8 @@ AioCompletion* SampleDedup::set_chunk_duplicated(chunk_t& chunk) {
return nullptr;
}
}
if (debug) {
cout << "set chunk " << chunk.oid << " fp " << chunk.fingerprint
<< std::endl;
}
DEBUG_OUT("set chunk " << chunk.oid << " fp " << chunk.fingerprint
<< std::endl);

uint64_t size;
time_t mtime;
@@ -1783,7 +1783,7 @@ int make_crawling_daemon(const map<string, string> &opts,
iterative = true;
}
string base_pool_name;
auto i = opts.find("pool");
i = opts.find("pool");
if (i != opts.end()) {
base_pool_name = i->second.c_str();
} else {
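For reference, the new macro collapses each repeated four-line `if (debug) { std::cout << ...; }` block above into a single statement. A minimal, self-contained sketch of the pattern, with a local `debug` flag standing in for the tool's global:

#include <iostream>
#include <string>

static int debug = 1;  // stand-in for the tool's global debug flag

#define DEBUG_OUT(x) if(debug==1){std::cout<<x;}else{}

int main() {
  std::string oid = "obj.0001";
  // Before the commit:
  // if (debug) {
  //   std::cout << "evict " << oid << std::endl;
  // }
  // After the commit:
  DEBUG_OUT("evict " << oid << std::endl);
  return 0;
}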
