Skip to content

Commit

Permalink
test/d4n: Working on cleaning method testing
Browse files Browse the repository at this point in the history
Signed-off-by: Samarah <[email protected]>
  • Loading branch information
Samarah committed Nov 21, 2024
1 parent 0ce9d48 commit 748d0a5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
48 changes: 24 additions & 24 deletions src/rgw/driver/d4n/d4n_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
continue;
}
ll.unlock();
if ((ret = cacheDriver->delete_data(dpp, e->key, y)) == 0) { // Sam: do we want del or delete_data here?
if (!(ret = erase(dpp, e->key, y))) {
if ((ret = cacheDriver->delete_data(dpp, e->key, null_yield)) == 0) { // Sam: do we want del or delete_data here?
if (!(ret = erase(dpp, e->key, null_yield))) {
ldpp_dout(dpp, 0) << "Failed to delete head policy entry for: " << e->key << ", ret=" << ret << dendl; // TODO: what must occur during failure?
}
} else {
Expand All @@ -645,7 +645,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
}

if (!e->delete_marker) {
ret = delete_data_blocks(dpp, e, y);
ret = delete_data_blocks(dpp, e, null_yield);
if (ret == 0) {
erase_dirty_object(dpp, e->key, null_yield);
} else if (ret == -EBUSY) {
Expand Down Expand Up @@ -821,14 +821,14 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
std::string oid_in_cache = head_oid_in_cache + CACHE_DELIM + std::to_string(fst) + CACHE_DELIM + std::to_string(cur_len);
ldpp_dout(dpp, 20) << __func__ << "(): oid_in_cache =" << oid_in_cache << dendl;
//Update in-memory data structure for each block
this->update(dpp, oid_in_cache, 0, 0, e->version, false, 0, y);
this->update(dpp, oid_in_cache, 0, 0, e->version, false, 0, null_yield);

rgw::d4n::CacheBlock block;
block.cacheObj.bucketName = c_obj->get_bucket()->get_bucket_id();
block.cacheObj.objName = c_obj->get_key().get_oid();
block.size = cur_len;
block.blockID = fst;
if ((op_ret = cacheDriver->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_DIRTY, "0", y)) == 0) {
if ((op_ret = cacheDriver->set_attr(dpp, oid_in_cache, RGW_CACHE_ATTR_DIRTY, "0", null_yield)) == 0) {
op_ret = blockDir->update_field(dpp, &block, "dirty", "false", null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "updating dirty flag in block directory failed, ret=" << op_ret << dendl;
Expand All @@ -842,9 +842,9 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
} //end-else if delete_marker

//invoke update() with dirty flag set to false, to update in-memory metadata for head
this->update(dpp, head_oid_in_cache, 0, 0, e->version, false, 0, y);
this->update(dpp, head_oid_in_cache, 0, 0, e->version, false, 0, null_yield);

if ((ret = cacheDriver->set_attr(dpp, head_oid_in_cache, RGW_CACHE_ATTR_DIRTY, "0", y)) < 0) {
if ((ret = cacheDriver->set_attr(dpp, head_oid_in_cache, RGW_CACHE_ATTR_DIRTY, "0", null_yield)) < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to update dirty attr in cache, ret=" << op_ret << dendl;
}

Expand All @@ -861,12 +861,12 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
//non-versioned case
if (!c_obj->have_instance()) {
//add watch on latest entry, as it can be modified by a put or a del
ret = blockDir->watch(dpp, &block, y);
ret = blockDir->watch(dpp, &block, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << block.cacheObj.objName << ", ret=" << ret << dendl;
}
// hash entry for latest version
op_ret = blockDir->get(dpp, &block, y);
op_ret = blockDir->get(dpp, &block, null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << block.cacheObj.objName << ", ret=" << ret << dendl;
} else {
Expand All @@ -876,23 +876,23 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
null_block = block;
null_block.cacheObj.objName = "_:null_" + c_obj->get_name();
//hash entry for null block
op_ret = blockDir->get(dpp, &null_block, y);
op_ret = blockDir->get(dpp, &null_block, null_yield);
if (op_ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << null_block.cacheObj.objName << ", ret=" << ret << dendl;
} else {
if (null_block.version == e->version) {
block.cacheObj.dirty = false;
null_block.cacheObj.dirty = false;
//start redis transaction using MULTI
blockDir->multi(dpp, y);
auto blk_op_ret = blockDir->set(dpp, &block, y);
auto null_op_ret = blockDir->set(dpp, &null_block, y);
blockDir->multi(dpp, null_yield);
auto blk_op_ret = blockDir->set(dpp, &block, null_yield);
auto null_op_ret = blockDir->set(dpp, &null_block, null_yield);
if (blk_op_ret < 0 || null_op_ret < 0) {
blockDir->discard(dpp, y);
blockDir->discard(dpp, null_yield);
ldpp_dout(dpp, 0) << __func__ << "(): Failed to Queue update dirty flag for latest entry/null entry in block directory" << dendl;
} else {
std::vector<std::string> responses;
ret = blockDir->exec(dpp, responses, y);
ret = blockDir->exec(dpp, responses, null_yield);
if (responses.empty()) {
//transaction failed, which means latest hash entry has been modified by a put/del so ignore and do not update the entries
ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty which means transaction failed!" << dendl;
Expand All @@ -909,7 +909,7 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
};
//remove the entry from the ordered set using its score, as the object is already cleaned
//need not be part of a transaction as it is being removed based on its score which is its creation time.
ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y);
ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
}
Expand All @@ -932,26 +932,26 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
rgw::d4n::CacheBlock latest_block = block;
latest_block.cacheObj.objName = c_obj->get_name();
//add watch on latest entry, as it can be modified by a put or a del
ret = blockDir->watch(dpp, &latest_block, y);
ret = blockDir->watch(dpp, &latest_block, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to add a watch on: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
}
int retry = 3;
while(retry) {
retry--;
//get latest entry
ret = blockDir->get(dpp, &latest_block, y);
ret = blockDir->get(dpp, &latest_block, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "(): Failed to get latest entry in block directory for: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
}
//start redis transaction using MULTI
blockDir->multi(dpp, y);
blockDir->multi(dpp, null_yield);
if (latest_block.version == e->version) {
//remove object entry from ordered set
if (c_obj->have_instance()) {
blockDir->del(dpp, &latest_block, y, true);
blockDir->del(dpp, &latest_block, null_yield, true);
if (ret < 0) {
blockDir->discard(dpp, y);
blockDir->discard(dpp, null_yield);
ldpp_dout(dpp, 0) << __func__ << "(): Failed to queue del for latest hash entry: " << latest_block.cacheObj.objName << ", ret=" << ret << dendl;
continue;
}
Expand All @@ -962,14 +962,14 @@ void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
.objName = c_obj->get_name(),
.bucketName = c_obj->get_bucket()->get_bucket_id(),
};
ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, y, true);
ret = objDir->zremrangebyscore(dpp, &dir_obj, e->creationTime, e->creationTime, null_yield, true);
if (ret < 0) {
blockDir->discard(dpp, y);
blockDir->discard(dpp, null_yield);
ldpp_dout(dpp, 0) << __func__ << "(): Failed to remove object from ordered set with error: " << ret << dendl;
continue;
}
std::vector<std::string> responses;
ret = blockDir->exec(dpp, responses, y);
ret = blockDir->exec(dpp, responses, null_yield);
if (responses.empty()) {
ldpp_dout(dpp, 0) << __func__ << "(): Execute responses are empty hence continuing!" << dendl;
continue;
Expand Down
21 changes: 16 additions & 5 deletions src/test/rgw/test_d4n_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ TEST_F(D4NFilterFixture, GetObjectRead)
testFile.close();
conn->cancel();
testBucket->remove(env->dpp, true, optional_yield{yield});
DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver)); // TODO: use d4nFilter?
}, rethrow);

io.run();
Expand Down Expand Up @@ -1403,15 +1403,14 @@ TEST_F(D4NFilterFixture, PutObjectWrite)
getline(testFile, testData);
EXPECT_EQ(testData, "test data");

std::this_thread::sleep_for(std::chrono::seconds(2)); // Wait for cache cleaning cycle to pass
std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait for cache cleaning cycle to pass

{
boost::system::error_code ec;
request req;
req.push("HGET", bucketName + "_" + TEST_OBJ + testName + "_0_0", "dirty");
//req.push("FLUSHALL");

response< std::string/*, boost::redis::ignore_t*/ > resp;
response< std::string > resp;

conn->async_exec(req, resp, yield[ec]);

Expand All @@ -1423,12 +1422,24 @@ TEST_F(D4NFilterFixture, PutObjectWrite)
EXPECT_EQ(d4nFilter->get_cache_driver()->get_attr(env->dpp, location, RGW_CACHE_ATTR_DIRTY, attr_val, optional_yield({yield})), 0);
EXPECT_EQ(attr_val, "0");

/*{
boost::system::error_code ec;
request req;
req.push("FLUSHALL");
response< boost::redis::ignore_t > resp;
conn->async_exec(req, resp, yield[ec]);
ASSERT_EQ((bool)ec, false);
}*/

conn->cancel();
testBucket->remove(env->dpp, true, optional_yield{yield});
DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
}, rethrow);

io.run();
DriverDestructor driver_destructor(static_cast<rgw::sal::D4NFilterDriver*>(driver));
}

#if 0
Expand Down

0 comments on commit 748d0a5

Please sign in to comment.