From c344821deae68723016e5d34c89cfcde9373217f Mon Sep 17 00:00:00 2001 From: JinyongHa Date: Mon, 21 Feb 2022 01:55:34 +0000 Subject: [PATCH] dedup-tool: add basic crawling Create crawling threads which crawl objects in base pool and deduplicate based on their deduplication efficiency. Crawler samples objects and finds duplicated chunks within the samples. It regards an object which has duplicated chunks higher than object_dedup_threshold value as an efficient object to be deduplicated. Besides the chunk which is duplicated more than chunk_dedup_threshold times is also deduplicated. The commit contains basic crawling which crawls all objects in base pool instead of sampling among the objects. [usage] ceph_dedup_tool --op sample-dedup --pool POOL --chunk-pool POOL \ --fingerprint-algorithm FP --object-dedup-threshold \ --chunk-dedup-threshold --- src/tools/ceph_dedup_tool.cc | 787 +++++++++++++++++++++++++++++++++-- 1 file changed, 746 insertions(+), 41 deletions(-) diff --git a/src/tools/ceph_dedup_tool.cc b/src/tools/ceph_dedup_tool.cc index 4b8c926f75ea55..8ac95adb915ebc 100644 --- a/src/tools/ceph_dedup_tool.cc +++ b/src/tools/ceph_dedup_tool.cc @@ -144,6 +144,7 @@ void usage() " [--op dump-chunk-refs --chunk-pool POOL --object OID] \n" " [--op chunk-dedup --pool POOL --object OID --chunk-pool POOL --fingerprint-algorithm FP --source-off OFFSET --source-length LENGTH] \n" " [--op object-dedup --pool POOL --object OID --chunk-pool POOL --fingerprint-algorithm FP --dedup-cdc-chunk-size CHUNK_SIZE] \n" +" [--op sample-dedup --pool POOL --chunk-pool POOL --fingerprint-algorithm FP]\n" << std::endl; cout << "optional arguments: " << std::endl; cout << " --object " << std::endl; @@ -155,9 +156,13 @@ void usage() cout << " --report-period " << std::endl; cout << " --max-seconds " << std::endl; cout << " --max-read-size " << std::endl; + cout << " --object-dedup-threshold " << std::endl; + cout << " --chunk-dedup-threshold " << std::endl; cout << "explanations: " << std::endl; cout << " chunk-dedup performs deduplication using a chunk generated by given source" << std::endl; cout << " offset and length. object-dedup deduplicates the entire object, not a chunk" << std::endl; + cout << " sample-dedup makes crawling threads which crawl objects in base pool and" << std::endl; + cout << "deduplicate them based on their deduplcation efficiency" << std::endl; exit(1); } @@ -173,6 +178,57 @@ static int rados_sistrtoll(I &i, T *val) { } } +static int make_manifest_object_and_flush( + std::string& oid, + IoCtx& io_ctx, + IoCtx& chunk_io_ctx) { + // try to make manifest object + ObjectWriteOperation op; + bufferlist temp; + temp.append("temp"); + op.write_full(temp); + + auto gen_r_num = [] () -> string { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + uint64_t r_num = dist(gen); + return to_string(r_num); + }; + string temp_oid = gen_r_num(); + // create temp chunk object for set-chunk + int ret = chunk_io_ctx.operate(temp_oid, &op); + if (ret == -EEXIST) { + // one more try + temp_oid = gen_r_num(); + ret = chunk_io_ctx.operate(temp_oid, &op); + } + if (ret < 0) { + cerr << " operate fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + + // set-chunk to make manifest object + ObjectReadOperation chunk_op; + chunk_op.set_chunk(0, 4, chunk_io_ctx, temp_oid, 0, + CEPH_OSD_OP_FLAG_WITH_REFERENCE); + ret = io_ctx.operate(oid, &chunk_op, NULL); + if (ret < 0) { + cerr << " set_chunk fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + + // tier-flush to perform deduplication + ObjectReadOperation flush_op; + flush_op.tier_flush(); + ret = io_ctx.operate(oid, &flush_op, NULL); + if (ret < 0) { + cerr << " tier_flush fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + return ret; +} + class EstimateDedupRatio; class ChunkScrub; class CrawlerThread : public Thread @@ -214,6 +270,7 @@ class CrawlerThread : public Thread void set_debug(const bool debug_) { debug = debug_; } friend class EstimateDedupRatio; friend class ChunkScrub; + friend class SampleDedup; }; class EstimateDedupRatio : public CrawlerThread @@ -502,6 +559,504 @@ void ChunkScrub::chunk_scrub_common() cout << "--done--" << std::endl; } +class SampleDedup : public CrawlerThread +{ +public: + SampleDedup( + IoCtx& io_ctx, + IoCtx& chunk_io_ctx, + int n, + int m, + ObjectCursor& begin, + ObjectCursor end, + int32_t report_period, + uint64_t num_objects, + uint32_t object_dedup_threshold, + uint32_t chunk_dedup_threshold, + size_t chunk_size, + std::string& fp_algo) : + CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects), + chunk_io_ctx(chunk_io_ctx), + chunk_dedup_threshold(chunk_dedup_threshold), + object_dedup_threshold(object_dedup_threshold), + chunk_size(chunk_size), + fp_type(get_fp_type(fp_algo)) { } + + static void init(size_t chunk_dedup_threshold) { + fp_store.init(chunk_dedup_threshold); + std::unique_lock lock(flushed_lock); + flushed_objects.clear(); + } + + ~SampleDedup() { }; + +protected: + void* entry() override { + crawl(); + return NULL; + } + +private: + struct chunk_t { + string oid = ""; + size_t start = 0; + size_t size = 0; + string fingerprint = ""; + bufferlist data; + }; + + void crawl(); + void prepare_rados(); + std::tuple get_shard_boundary(); + std::tuple, ObjectCursor> get_objects( + ObjectCursor current, + ObjectCursor end, + size_t max_object_count); + std::set sample_object(size_t count); + void try_object_dedup_and_accumulate_result(ObjectItem& object); + bool ok_to_dedup_all(); + void flush_duplicable_object(ObjectItem& object); + AioCompletion* set_chunk_duplicated(chunk_t& chunk); + void mark_non_dedup(ObjectCursor start, ObjectCursor end); + bufferlist read_object(ObjectItem& object); + std::vector>> do_cdc( + ObjectItem& object, + bufferlist& data); + std::string generate_fingerprint(bufferlist chunk_data); + bool check_whole_object_dedupable(size_t dedup_size, size_t total_size); + bool is_dirty(ObjectItem& object); + AioCompletion* do_async_evict(string oid); + + enum class fp_type_t { + SHA1, + SHA256, + SHA512, + UNKNOWN, + }; + + fp_type_t get_fp_type(string fp_algo); + + Rados rados; + IoCtx chunk_io_ctx; + std::list duplicable_chunks; + size_t total_duplicated_size = 0; + size_t total_object_size = 0; + size_t chunk_dedup_threshold; + size_t object_dedup_threshold; + + class FpStore { + public: + struct fp_store_entry_t { + size_t duplication_count = 1; + std::list found_chunks; + bool processed = false; + }; + + bool find(string& fp) { + std::shared_lock lock(fingerprint_lock); + auto found_item = fp_map.find(fp); + if (found_item != fp_map.end()) { + return true; + } + return false; + } + + void add(chunk_t& chunk, std::list& duplicable_chunks) { + std::unique_lock lock(fingerprint_lock); + auto found_iter = fp_map.find(chunk.fingerprint); + if (found_iter != fp_map.end()) { + auto& target = found_iter->second; + target.duplication_count++; + target.found_chunks.push_back(chunk); + if (target.duplication_count >= dedup_threshold) { + if (target.processed == false) { + target.processed = true; + // When a fingerprint firstly detected to be duplicated more than + // threshold, add all previously found chunks to duplicable_chunks + duplicable_chunks.splice(target.found_chunks.begin(), target.found_chunks); + } else { + duplicable_chunks.push_back(chunk); + } + } + } else { + fp_store_entry_t fp_entry; + fp_entry.found_chunks.push_back(chunk); + fp_map.insert({chunk.fingerprint, fp_entry}); + } + } + + void init(size_t dedup_threshold_) { + std::unique_lock lock(fingerprint_lock); + fp_map.clear(); + dedup_threshold = dedup_threshold_; + } + + private: + size_t dedup_threshold = -1; + std::unordered_map fp_map; + std::shared_mutex fingerprint_lock; + }; + + std::set oid_for_evict; + static FpStore fp_store; + static std::unordered_set flushed_objects; + static std::shared_mutex flushed_lock; + std::list dedupable_objects; + size_t chunk_size; + fp_type_t fp_type; +}; + +SampleDedup::FpStore SampleDedup::fp_store; +std::unordered_set SampleDedup::flushed_objects; +std::shared_mutex SampleDedup::flushed_lock; + +SampleDedup::fp_type_t SampleDedup::get_fp_type(string fp_algo) { + if (fp_algo == "sha1") { + return fp_type_t::SHA1; + } else if (fp_algo == "sha256") { + return fp_type_t::SHA256; + } else if (fp_algo == "sha512") { + return fp_type_t::SHA512; + } else { + return fp_type_t::UNKNOWN; + } +} + +void SampleDedup::crawl() { + prepare_rados(); + ObjectCursor shard_start; + ObjectCursor shard_end; + std::tie(shard_start, shard_end) = get_shard_boundary(); + cout << "new iteration thread: " << n < objects; + // Get the list of object IDs to deduplicate + std::tie(objects, current_object) = get_objects( + current_object, + shard_end, + 100); + + // Pick few objects to be processed. Crawling mode decides how many objects + // to pick (sampling ratio). Lower sampling ratio makes crawler to has lower + // crawling overhead but finds less duplication. + std::set sampled_indexes = sample_object(objects.size()); + for (size_t index : sampled_indexes) { + ObjectItem target = objects[index]; + // Only process dirty objects which are expected not processed yet + if (is_dirty(target)) { + try_object_dedup_and_accumulate_result(target); + } + } + } + + map set_chunk_completions; + // Do set_chunk to make found duplicable chunks can be evicted by tier_evict() + for (auto& duplicable_chunk : duplicable_chunks) { + auto completion = set_chunk_duplicated(duplicable_chunk); + if (completion != nullptr) { + set_chunk_completions[duplicable_chunk.oid] = completion; + } + } + + vector evict_completions; + for (auto& oid : oid_for_evict) { + auto completion_iter = set_chunk_completions.find(oid); + // Related set_chunk should be completed before tier_evict because + // tier_evict() only evict data processed by set_chunk() or tier_flush() + if (completion_iter != set_chunk_completions.end()) { + auto completion = completion_iter->second; + completion->wait_for_complete(); + delete completion; + } + auto completion = do_async_evict(oid); + evict_completions.push_back(completion); + } + for (auto& completion : evict_completions) { + completion->wait_for_complete(); + delete completion; + } + cout << "done iteration thread: " << n < SampleDedup::get_shard_boundary() { + ObjectCursor shard_start; + ObjectCursor shard_end; + io_ctx.object_list_slice(begin, end, n, m, &shard_start, &shard_end); + + return std::make_tuple(shard_start, shard_end); +} + +std::tuple, ObjectCursor> SampleDedup::get_objects( + ObjectCursor current, ObjectCursor end, size_t max_object_count) { + std::vector objects; + ObjectCursor next; + int ret = io_ctx.object_list( + current, + end, + max_object_count, + {}, + &objects, + &next); + if (ret < 0 ) { + cerr << "error object_list : " << cpp_strerror(ret) << std::endl; + throw std::exception(); + } + + return std::make_tuple(objects, next); +} + +std::set SampleDedup::sample_object(size_t count) { + std::set indexes; + for (size_t index = 0 ; index < count ; index++) { + indexes.insert(index); + } + return indexes; +} + +void SampleDedup::try_object_dedup_and_accumulate_result(ObjectItem& object) { + bufferlist data = read_object(object); + if (data.length() == 0) { + cerr << __func__ << " skip object " << object.oid + << " dedup (read failed)\n"; + return; + } + auto chunks = do_cdc(object, data); + size_t chunk_total_amount = 0; + + // First, check total size of created chunks + for (auto& chunk : chunks) { + auto& chunk_data = std::get<0>(chunk); + chunk_total_amount += chunk_data.length(); + } + if (chunk_total_amount != data.length()) { + cerr << __func__ << " sum of chunked length(" << chunk_total_amount + << ") is different from object data length(" << data.length() << ")\n"; + return; + } + + size_t duplicated_size = 0; + for (auto& chunk : chunks) { + auto& chunk_data = std::get<0>(chunk); + std::string fingerprint = generate_fingerprint(chunk_data); + std::pair chunk_boundary = std::get<1>(chunk); + chunk_t chunk_info = { + .oid = object.oid, + .start = chunk_boundary.first, + .size = chunk_boundary.second, + .fingerprint = fingerprint, + .data = chunk_data + }; + if (debug) { + cout << "check " << chunk_info.oid << " fp " << fingerprint << " " << + chunk_info.start << ", " << chunk_info.size << std::endl; + } + if (fp_store.find(fingerprint)) { + if (debug) { + cout << "duplication oid " << chunk_info.oid << " " << + chunk_info.fingerprint << " " << chunk_info.start << + ", " << chunk_info.size << std::endl; + } + duplicated_size += chunk_data.length(); + } + fp_store.add(chunk_info, duplicable_chunks); + } + + size_t object_size = data.length(); + if (debug) { + cout << "oid " << object.oid << " object_size " << object_size + << " dup size " << duplicated_size << std::endl; + } + // if the chunks in an object are duplicated higher than object_dedup_threshold, + // try deduplicate whole object via tier_flush + if (check_whole_object_dedupable(duplicated_size, object_size)) { + if (debug) { + cout << "dedup object " << object.oid << std::endl; + } + flush_duplicable_object(object); + } + + total_duplicated_size += duplicated_size; + total_object_size += object_size; + return; +} + +bufferlist SampleDedup::read_object(ObjectItem& object) { + bufferlist whole_data; + size_t offset = 0; + if (debug) { + cout << "read object " << object.oid << std::endl; + } + int ret = -1; + while (ret != 0) { + bufferlist partial_data; + ret = io_ctx.read(object.oid, partial_data, max_read_size, offset); + if (ret < 0) { + cerr << "read object error " << object.oid << " offset " << offset + << " size " << max_read_size << std::endl; + bufferlist empty_buf; + return empty_buf; + } + offset += ret; + whole_data.claim_append(partial_data); + } + return whole_data; +} + +std::vector>> SampleDedup::do_cdc( + ObjectItem& object, + bufferlist& data) { + std::vector>> ret; + + unique_ptr cdc = CDC::create("fastcdc", cbits(chunk_size) - 1); + vector> chunks; + cdc->calc_chunks(data, &chunks); + for (auto& p : chunks) { + bufferlist chunk; + chunk.substr_of(data, p.first, p.second); + ret.push_back(make_tuple(chunk, p)); + } + + return ret; +} + +std::string SampleDedup::generate_fingerprint(bufferlist chunk_data) { + string ret; + switch (fp_type) { + case fp_type_t::SHA1: + ret = crypto::digest(chunk_data).to_str(); + break; + + case fp_type_t::SHA256: + ret = crypto::digest(chunk_data).to_str(); + break; + + case fp_type_t::SHA512: + ret = crypto::digest(chunk_data).to_str(); + break; + default: + throw logic_error("Invalid fp type"); + break; + } + return ret; +} + +bool SampleDedup::check_whole_object_dedupable( + size_t dedup_size, + size_t total_size) { + if (total_size > 0) { + double dedup_ratio = dedup_size *100 / total_size; + return dedup_ratio >= object_dedup_threshold; + } + return false; +} + +void SampleDedup::flush_duplicable_object(ObjectItem& object) { + ObjectReadOperation op; + op.tier_flush(); + if (debug) { + cout << "try flush " << object.oid << " " << &flushed_objects< &opts, * the object a manifest object, the tier_flush() will remove * it and replace it with the real contents. */ - // convert object to manifest object - ObjectWriteOperation op; - bufferlist temp; - temp.append("temp"); - op.write_full(temp); - - auto gen_r_num = [] () -> string { - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution dist; - uint64_t r_num = dist(gen); - return to_string(r_num); - }; - string temp_oid = gen_r_num(); - // create temp chunk object for set-chunk - ret = chunk_io_ctx.operate(temp_oid, &op); - if (ret == -EEXIST) { - // one more try - temp_oid = gen_r_num(); - ret = chunk_io_ctx.operate(temp_oid, &op); - } + ret = make_manifest_object_and_flush(object_name, io_ctx, chunk_io_ctx); if (ret < 0) { - cerr << " operate fail : " << cpp_strerror(ret) << std::endl; - goto out; - } - - // set-chunk to make manifest object - ObjectReadOperation chunk_op; - chunk_op.set_chunk(0, 4, chunk_io_ctx, temp_oid, 0, - CEPH_OSD_OP_FLAG_WITH_REFERENCE); - ret = io_ctx.operate(object_name, &chunk_op, NULL); - if (ret < 0) { - cerr << " set_chunk fail : " << cpp_strerror(ret) << std::endl; - goto out; - } - - // tier-flush to perform deduplication - ObjectReadOperation flush_op; - flush_op.tier_flush(); - ret = io_ctx.operate(object_name, &flush_op, NULL); - if (ret < 0) { - cerr << " tier_flush fail : " << cpp_strerror(ret) << std::endl; + cerr << __func__ << " failed\n"; goto out; } @@ -1203,6 +1719,189 @@ int make_dedup_object(const std::map < std::string, std::string > &opts, return (ret < 0) ? 1 : 0; } +int make_crawling_daemon(const map &opts, + vector &nargs) { + string base_pool_name; + auto i = opts.find("pool"); + if (i != opts.end()) { + base_pool_name = i->second.c_str(); + } else { + cerr << "must specify --pool" << std::endl; + return -EINVAL; + } + + string chunk_pool_name; + i = opts.find("chunk-pool"); + if (i != opts.end()) { + chunk_pool_name = i->second.c_str(); + } else { + cerr << "must specify --chunk-pool" << std::endl; + return -EINVAL; + } + + unsigned max_thread = default_max_thread; + i = opts.find("max-thread"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &max_thread)) { + return -EINVAL; + } + } + + uint32_t report_period = default_report_period; + i = opts.find("report-period"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &report_period)) { + return -EINVAL; + } + } + + uint32_t object_dedup_threshold = 50; + i = opts.find("object-dedup-threshold"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &object_dedup_threshold)) { + return -EINVAL; + } + } + + size_t chunk_size = 8192; + i = opts.find("chunk-size"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &chunk_size)) { + return -EINVAL; + } + } + + uint32_t chunk_dedup_threshold = 2; + i = opts.find("chunk-dedup-threshold"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &chunk_dedup_threshold)) { + return -EINVAL; + } + } + + Rados rados; + int ret = rados.init_with_context(g_ceph_context); + if (ret < 0) { + cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl; + return -EINVAL; + } + ret = rados.connect(); + if (ret) { + cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl; + return -EINVAL; + } + + std::string fp_algo; + i = opts.find("fingerprint-algorithm"); + if (i != opts.end()) { + fp_algo = i->second.c_str(); + if (fp_algo != "sha1" + && fp_algo != "sha256" && fp_algo != "sha512") { + cerr << "unrecognized fingerprint-algorithm " << fp_algo << std::endl; + exit(1); + } + } + + list pool_names; + IoCtx io_ctx, chunk_io_ctx; + pool_names.push_back(base_pool_name); + ret = rados.ioctx_create(base_pool_name.c_str(), io_ctx); + if (ret < 0) { + cerr << "error opening base pool " + << base_pool_name << ": " + << cpp_strerror(ret) << std::endl; + return -EINVAL; + } + + ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx); + if (ret < 0) { + cerr << "error opening chunk pool " + << chunk_pool_name << ": " + << cpp_strerror(ret) << std::endl; + return -EINVAL; + } + bufferlist inbl; + ret = rados.mon_command( + make_pool_str(base_pool_name, "fingerprint_algorithm", fp_algo), + inbl, NULL, NULL); + if (ret < 0) { + cerr << " operate fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + ret = rados.mon_command( + make_pool_str(base_pool_name, "dedup_chunk_algorithm", "fastcdc"), + inbl, NULL, NULL); + if (ret < 0) { + cerr << " operate fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + ret = rados.mon_command( + make_pool_str(base_pool_name, "dedup_cdc_chunk_size", chunk_size), + inbl, NULL, NULL); + if (ret < 0) { + cerr << " operate fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + + cout << "Object Dedup Threshold : " << object_dedup_threshold << std::endl + << "Chunk Dedup Threshold : " << chunk_dedup_threshold << std::endl + << "Chunk Size : " << chunk_size << std::endl + << std::endl; + + while (true) { + lock_guard lock(glock); + ObjectCursor begin = io_ctx.object_list_begin(); + ObjectCursor end = io_ctx.object_list_end(); + map stats; + ret = rados.get_pool_stats(pool_names, stats); + if (ret < 0) { + cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl; + return -EINVAL; + } + if (stats.find(base_pool_name) == stats.end()) { + cerr << "stats can not find pool name: " << base_pool_name << std::endl; + return -EINVAL; + } + librados::pool_stat_t s = stats[base_pool_name]; + + bool debug = false; + i = opts.find("debug"); + if (i != opts.end()) { + debug = true; + } + + estimate_threads.clear(); + SampleDedup::init(chunk_dedup_threshold); + for (unsigned i = 0; i < max_thread; i++) { + cout << " add thread.. " << std::endl; + unique_ptr ptr ( + new SampleDedup( + io_ctx, + chunk_io_ctx, + i, + max_thread, + begin, + end, + report_period, + s.num_objects, + object_dedup_threshold, + chunk_dedup_threshold, + chunk_size, + fp_algo)); + ptr->set_debug(debug); + ptr->create("sample_dedup"); + estimate_threads.push_back(move(ptr)); + } + + for (auto &p : estimate_threads) { + p->join(); + } + break; + } + + return 0; +} + int main(int argc, const char **argv) { auto args = argv_to_vec(argc, argv); @@ -1269,6 +1968,10 @@ int main(int argc, const char **argv) opts["source-length"] = val; } else if (ceph_argparse_witharg(args, i, &val, "--dedup-cdc-chunk-size", (char*)NULL)) { opts["dedup-cdc-chunk-size"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--object-dedup-threshold", (char*)NULL)) { + opts["object-dedup-threshold"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--chunk-dedup-threshold", (char*)NULL)) { + opts["chunk-dedup-threshold"] = val; } else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) { opts["debug"] = "true"; } else { @@ -1301,6 +2004,8 @@ int main(int argc, const char **argv) * */ return make_dedup_object(opts, args); + } else if (op_name == "sample-dedup") { + return make_crawling_daemon(opts, args); } else { cerr << "unrecognized op " << op_name << std::endl; exit(1);