Skip to content

Commit

Permalink
Merge pull request #53265 from yuvalif/wip-yuval-fix-62449
Browse files Browse the repository at this point in the history
test/cls_2pc_queue: fix race condition with producers

Reviewed-by: ljflores mattbenjamin
  • Loading branch information
yuvalif authored Sep 7, 2023
2 parents 200b4ab + 2b21513 commit 7077a7f
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions src/test/cls_2pc_queue/test_cls_2pc_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ TEST_F(TestCls2PCQueue, MultiReserve)
for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
Expand Down Expand Up @@ -373,7 +373,7 @@ TEST_F(TestCls2PCQueue, MultiCommit)
for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
Expand Down Expand Up @@ -417,7 +417,7 @@ TEST_F(TestCls2PCQueue, MultiAbort)
for (auto& p : producers) {
p = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, 0);
Expand Down Expand Up @@ -451,7 +451,7 @@ TEST_F(TestCls2PCQueue, ReserveCommit)
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
Expand Down Expand Up @@ -506,7 +506,7 @@ TEST_F(TestCls2PCQueue, ReserveAbort)
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
Expand Down Expand Up @@ -556,7 +556,7 @@ TEST_F(TestCls2PCQueue, ManualCleanup)
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
Expand Down Expand Up @@ -630,7 +630,7 @@ TEST_F(TestCls2PCQueue, Cleanup)
for (auto& r : reservers) {
r = std::thread([this, &queue_name] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, size_to_reserve, number_of_elements, res_id), 0);
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
Expand Down Expand Up @@ -683,7 +683,7 @@ TEST_F(TestCls2PCQueue, MultiProducer)
for (auto& p : producers) {
p = std::thread([this, &queue_name, &producer_count] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
Expand All @@ -709,15 +709,20 @@ TEST_F(TestCls2PCQueue, MultiProducer)
librados::ObjectWriteOperation op;
const auto max_elements = 42;
const std::string marker;
bool truncated = false;
bool truncated = true;
std::string end_marker;
std::vector<cls_queue_entry> entries;
while (producer_count > 0 || truncated) {
const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
cls_2pc_queue_remove_entries(op, end_marker, max_elements);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
if (entries.empty()) {
// queue is empty, let it fill
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
consume_count += entries.size();
cls_2pc_queue_remove_entries(op, end_marker, max_elements);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
}
});

Expand All @@ -737,7 +742,6 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
cls_2pc_queue_init(wop, queue_name, max_size);
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));


for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
Expand All @@ -762,21 +766,21 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
librados::ObjectReadOperation rop;
auto consume_count = 0U;
std::vector<cls_queue_entry> entries;
bool truncated = true;
bool truncated = true;
while (truncated) {
bufferlist bl;
int rc;
bufferlist bl;
int rc;
cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
ASSERT_EQ(rc, 0);
ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
consume_count += entries.size();
cls_2pc_queue_remove_entries(wop, end_marker, max_elements);
marker = end_marker;
marker = end_marker;
}

ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
// execute all delete operations in a batch
// execute all delete operations in a batch
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
// make sure that queue is empty
ASSERT_EQ(cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker), 0);
Expand All @@ -803,7 +807,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
for (auto& p : producers) {
p = std::thread([this, &queue_name, &producer_count, &retry_happened] {
librados::ObjectWriteOperation op;
for (auto i = 0U; i < number_of_ops; ++i) {
for (auto i = 0U; i < number_of_ops; ++i) {
const std::string element_prefix("op-" +to_string(i) + "-element-");
std::vector<bufferlist> data(number_of_elements);
auto total_size = 0UL;
Expand Down Expand Up @@ -839,7 +843,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
c = std::thread([this, &queue_name, &producer_count] {
librados::ObjectWriteOperation op;
const std::string marker;
bool truncated = false;
bool truncated = true;
std::string end_marker;
std::vector<cls_queue_entry> entries;
while (producer_count > 0 || truncated) {
Expand Down

0 comments on commit 7077a7f

Please sign in to comment.