From 23e290a4ecbeeba4551a201021960ad3efc950fa Mon Sep 17 00:00:00 2001 From: Krzysztof Filipek Date: Wed, 13 Jul 2022 18:50:43 +0200 Subject: [PATCH] [tests] add async cases for timestamp test --- tests/common/stream_helpers.hpp | 1 - tests/unittest/timestamp.cpp | 89 +++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/tests/common/stream_helpers.hpp b/tests/common/stream_helpers.hpp index 6a43bc6c..c6a47484 100644 --- a/tests/common/stream_helpers.hpp +++ b/tests/common/stream_helpers.hpp @@ -371,7 +371,6 @@ struct pmemstream_helpers_type { rrt = it->second; } for (const auto &e : data) { - auto [ret, entry] = stream.append(region, e, rrt); UT_ASSERTeq(ret, 0); } diff --git a/tests/unittest/timestamp.cpp b/tests/unittest/timestamp.cpp index d3c2dc1f..870e6fb8 100644 --- a/tests/unittest/timestamp.cpp +++ b/tests/unittest/timestamp.cpp @@ -1,6 +1,8 @@ // SPDX-License-Identifier: BSD-3-Clause /* Copyright 2022, Intel Corporation */ +#include + #include "libpmemstream.h" #include "rapidcheck_helpers.hpp" #include "stream_helpers.hpp" @@ -11,6 +13,31 @@ * timestamp - unit test for testing method pmemstream_entry_timestamp() */ +void multithreaded_asynchronous_append(pmemstream_test_base &stream, const std::vector ®ions, + const std::vector> &data) +{ + using future_type = decltype(stream.helpers.async_append(regions[0], data[0])); + std::vector> futures(data.size()); + + parallel_exec(data.size(), [&](size_t thread_id) { + for (auto &chunk : data) { + futures[thread_id].emplace_back(stream.helpers.async_append(regions[thread_id], chunk)); + } + }); + + for (auto &future_sequence : futures) { + std::mt19937_64 g(*rc::gen::arbitrary()); + std::shuffle(future_sequence.begin(), future_sequence.end(), g); + } + + parallel_exec(data.size(), [&](size_t thread_id) { + for (auto &fut : futures[thread_id]) { + while (fut.poll() != FUTURE_STATE_COMPLETE) + ; + } + }); +} + void multithreaded_synchronous_append(pmemstream_test_base &stream, const std::vector ®ions, const std::vector> &data) { @@ -37,8 +64,7 @@ std::tuple, size_t> generate_and_append_data(pmem concurrency_level, rc::gen::arbitrary>()); if (async) { - // XXX: multithreaded_asynchronous_append(stream, regions, data); - UT_ASSERT(false); + multithreaded_asynchronous_append(stream, regions, data); } else { multithreaded_synchronous_append(stream, regions, data); } @@ -52,9 +78,9 @@ std::tuple, size_t> generate_and_append_data(pmem } size_t remove_random_region(pmemstream_with_multi_empty_regions &stream, std::vector ®ions, - size_t concurrency_level) + test_config_type &config) { - size_t pos = *rc::gen::inRange(0, concurrency_level); + size_t pos = *rc::gen::inRange(0, get_concurrency_level(config, regions)); auto region_to_remove = regions[pos]; auto region_size = stream.sut.region_size(region_to_remove); UT_ASSERTeq(stream.helpers.remove_region(region_to_remove.offset), 0); @@ -104,7 +130,7 @@ int main(int argc, char *argv[]) generate_and_append_data(stream, test_config, false /* sync */); auto concurrency_level = get_concurrency_level(test_config, regions); - auto region_size = remove_random_region(stream, regions, concurrency_level); + auto region_size = remove_random_region(stream, regions, test_config); /* Global ordering validation. */ if (regions.size() >= 1) @@ -115,6 +141,59 @@ int main(int argc, char *argv[]) regions.push_back(stream.helpers.initialize_single_region(region_size, extra_data)); + UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(), + elements * (concurrency_level - 1) + extra_data.size()); + UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions)); + }); + + ret += rc::check("timestamp values should increase in each region after asynchronous append", + [&](pmemstream_with_multi_empty_regions &&stream) { + auto [regions, elements] = + generate_and_append_data(stream, test_config, true /* async */); + + /* Single region ordering validation. */ + for (auto ®ion : regions) { + UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps({region})); + } + }); + + ret += rc::check( + "timestamp values should globally increase in multi-region environment after asynchronous append", + [&](pmemstream_with_multi_empty_regions &&stream) { + auto [regions, elements] = + generate_and_append_data(stream, test_config, true /* async */); + + /* Global ordering validation */ + UT_ASSERT(stream.helpers.validate_timestamps_no_gaps(regions)); + }); + + ret += rc::check( + "timestamp values should globally increase in multi-region environment after asynchronous append to respawned region", + [&](pmemstream_with_multi_empty_regions &&stream, const std::vector &extra_data) { + RC_PRE(extra_data.size() > 0); + auto [regions, elements] = + generate_and_append_data(stream, test_config, true /* async */); + auto concurrency_level = get_concurrency_level(test_config, regions); + + auto region_size = remove_random_region(stream, regions, test_config); + + /* Global ordering validation. */ + if (regions.size() >= 1) + UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions)); + + UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(), + elements * (concurrency_level - 1)); + + { + auto [ret, region] = stream.sut.region_allocate(region_size); + UT_ASSERTeq(ret, 0); + regions.push_back(region); + + auto future = stream.helpers.async_append(region, extra_data); + while (future.poll() != FUTURE_STATE_COMPLETE) + ; + } + UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(), elements * (concurrency_level - 1) + extra_data.size()); UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));