From 8944f2bb63c94980b175180949d580219edc3af6 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Tue, 17 Dec 2024 16:51:12 +0200 Subject: [PATCH 01/11] Implementation --- cpp/arcticdb/version/test/test_version_map.cpp | 8 ++++++-- cpp/arcticdb/version/version_map.hpp | 15 +++++---------- cpp/arcticdb/version/version_map_entry.hpp | 4 +--- cpp/arcticdb/version/version_utils.hpp | 1 + 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 4a12695561..16ccc57071 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -669,8 +669,12 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ // LATEST should still be cached, but the cached entry now needs to have no undeleted keys check_loads_versions(LoadStrategy{LoadType::LATEST, LoadObjective::INCLUDE_DELETED}, 2, 0); + EXPECT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(-1)})); // FROM_TIME UNDELETED_ONLY should no longer be cached even though we used the same request before because the undeleted key it went to got deleted. So it will load the entire version chain check_loads_versions(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(10)}, 3, 0); + // We have the full version chain loaded, so has_cached_entry should always return true (even when requesting timestamp before earliest version) + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(-1)})); // We add a new undeleted key auto key4 = atom_key_with_version(id, 3, 5); @@ -979,8 +983,8 @@ TEST(VersionMap, TombstoneAllFromEntry) { ASSERT_EQ(version_id, 0); - // With cached entry from the write ops - // Tombstone all should succeed as we are not relying on the ref key + // With cached entry from the write ops + // Tombstone all should succeed as we are not relying on the ref key version_map->tombstone_from_key_or_all(store, id, dummy_key, entry); auto [maybe_prev_cached_entry, deleted_cached_entry] = get_latest_version(store, version_map, id); diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index cb9a15786c..112ff2449b 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -581,7 +581,9 @@ class VersionMapImpl { return false; } - LoadType cached_load_type = entry->load_strategy_.load_type_; + if (entry->load_progress_.is_earliest_version_loaded) { + return true; + } switch (requested_load_type) { case LoadType::NOT_LOADED: @@ -606,14 +608,9 @@ class VersionMapImpl { return cached_timestamp <= requested_load_strategy.load_from_time_.value(); } case LoadType::ALL: - // We can use cache when it was populated by a ALL call, in which case it is only unsafe to use - // when the cache is of undeleted versions and the request is for all versions - if (cached_load_type==LoadType::ALL){ - return entry->load_strategy_.should_include_deleted() || !requested_load_strategy.should_include_deleted(); - } - return false; + case LoadType::UNKNOWN: default: - util::raise_rte("Unexpected load type in cache {}", cached_load_type); + return false; } } @@ -763,12 +760,10 @@ class VersionMapImpl { const auto clock_unsync_tolerance = ConfigsMap::instance()->get_int("VersionMap.UnsyncTolerance", DEFAULT_CLOCK_UNSYNC_TOLERANCE); entry->last_reload_time_ = Clock::nanos_since_epoch() - clock_unsync_tolerance; - entry->load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; // FUTURE: to make more thread-safe with #368 auto temp = std::make_shared(*entry); load_via_ref_key(store, stream_id, load_strategy, temp); std::swap(*entry, *temp); - entry->load_strategy_ = load_strategy; util::check(entry->keys_.empty() || entry->head_, "Non-empty VersionMapEntry should set head"); if (validate_) diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index 944ad007ad..8a82959a2e 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -203,6 +203,7 @@ struct LoadProgress { VersionId oldest_loaded_undeleted_index_version_ = std::numeric_limits::max(); timestamp earliest_loaded_timestamp_ = std::numeric_limits::max(); timestamp earliest_loaded_undeleted_timestamp_ = std::numeric_limits::max(); + bool is_earliest_version_loaded { false }; }; struct VersionMapEntry { @@ -241,7 +242,6 @@ struct VersionMapEntry { tombstone_all_.reset(); keys_.clear(); load_progress_ = LoadProgress{}; - load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; } bool empty() const { @@ -258,7 +258,6 @@ struct VersionMapEntry { swap(left.last_reload_time_, right.last_reload_time_); swap(left.tombstone_all_, right.tombstone_all_); swap(left.head_, right.head_); - swap(left.load_strategy_, right.load_strategy_); swap(left.load_progress_, right.load_progress_); } @@ -432,7 +431,6 @@ struct VersionMapEntry { } std::optional head_; - LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; timestamp last_reload_time_ = 0; LoadProgress load_progress_; std::deque keys_; diff --git a/cpp/arcticdb/version/version_utils.hpp b/cpp/arcticdb/version/version_utils.hpp index 22925db0c7..4d58cd6638 100644 --- a/cpp/arcticdb/version/version_utils.hpp +++ b/cpp/arcticdb/version/version_utils.hpp @@ -108,6 +108,7 @@ inline std::optional read_segment_with_keys( load_progress.oldest_loaded_undeleted_index_version_ = std::min(load_progress.oldest_loaded_undeleted_index_version_, oldest_loaded_undeleted_index); load_progress.earliest_loaded_timestamp_ = std::min(load_progress.earliest_loaded_timestamp_, earliest_loaded_timestamp); load_progress.earliest_loaded_undeleted_timestamp_ = std::min(load_progress.earliest_loaded_undeleted_timestamp_, earliest_loaded_undeleted_timestamp); + load_progress.is_earliest_version_loaded = !next.has_value(); return next; } From 7d7ec06b6b1d2386fb460dba17d2013da2f3b8f9 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Thu, 19 Dec 2024 14:06:19 +0200 Subject: [PATCH 02/11] Tests --- .../version/test/test_version_map.cpp | 98 +++++++++++-------- cpp/arcticdb/version/version_map_entry.hpp | 4 +- 2 files changed, 60 insertions(+), 42 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 16ccc57071..0bd4265cdd 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -547,23 +547,22 @@ TEST(VersionMap, StorageLogging) { ASSERT_EQ(tomb_keys, 3u); } -std::shared_ptr write_two_versions( - std::shared_ptr store, - std::shared_ptr version_map, - const StreamId& id) { +std::shared_ptr write_versions( + const std::shared_ptr& store, + const std::shared_ptr& version_map, + const StreamId& id, + int number_of_versions) { auto entry = version_map->check_reload( store, id, LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__); - auto key1 = atom_key_with_version(id, 0, 0); - version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); - auto key2 = atom_key_with_version(id, 1, 1); - version_map->do_write(store, key2, entry); - // We override the symbol ref without a prev_key on purpose. This way we'll only load the version=1 from the ref key - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); + for (int i = 0; i < number_of_versions; i++) { + auto key = atom_key_with_version(id, i, i); + version_map->do_write(store, key, entry); + write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + } return entry; } @@ -696,7 +695,7 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { auto version_map = std::make_shared(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + write_versions(store, version_map, id, 2); // Deleting should add a TOMBSTONE_ALL which should end searching for undeleted versions early. version_map->delete_all_versions(store, id); @@ -809,7 +808,7 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { auto version_map = std::make_shared(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + write_versions(store, version_map, id, 2); // Use a clean version_map version_map = std::make_shared(); @@ -845,45 +844,62 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { using namespace arcticdb; - // Given - symbol with 2 versions - load downto version 0 + // Given - symbol with 3 versions - load downto version 1 or 0 // never time-invalidate the cache so we can test our other cache invalidation logic ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + write_versions(store, version_map, id, 3); // Use a clean version_map version_map = std::make_shared(); - auto entry = version_map->check_reload( - store, - id, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, - __FUNCTION__); - - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)})); - - // When - we delete version 1 - auto tombstone_key = version_map->write_tombstone(store, VersionId{1}, id, entry); - - // We should not invalidate the cache because the version we loaded to is still undeleted - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - // When - we delete all versions without reloading - version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); - - // We should invalidate cached undeleted checks - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); + for (const auto& load_strategy : {LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}}) + { + const bool is_loaded_to_0 = load_strategy.load_until_version_ == 0; + auto entry = version_map->check_reload( + store, + id, + load_strategy, + __FUNCTION__); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}), is_loaded_to_0); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)})); + ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-3)}), is_loaded_to_0); + + // When - we delete version 2 + auto tombstone_key = version_map->write_tombstone(store, VersionId{2}, id, entry); + + // We should not invalidate the cache because the version we loaded to is still undeleted + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}), is_loaded_to_0); + + // When - we delete all versions without reloading + version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); + + if (is_loaded_to_0) { + // If we have loaded everything we should not invalidate cache + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); + } + else { + // If we haven't loaded everything we should invalidate cached undeleted checks + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); + } + } } TEST(VersionMap, CompactionUpdateCache) { diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index 8a82959a2e..65eaf83ed3 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -16,6 +16,8 @@ #include #include +#include "async/tasks.hpp" + namespace arcticdb { using namespace arcticdb::entity; using namespace arcticdb::stream; @@ -264,7 +266,7 @@ struct VersionMapEntry { // Below four functions used to return optional of the tombstone, but copying keys is expensive and only // one function was actually interested in the key, so they now return bool. See get_tombstone(). bool has_individual_tombstone(VersionId version_id) const { - return tombstones_.count(version_id) != 0; + return tombstones_.contains(version_id); } bool is_tombstoned_via_tombstone_all(VersionId version_id) const { From 1951261fd905b339bd23023c0c5f441c91e882f3 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Thu, 19 Dec 2024 18:01:36 +0200 Subject: [PATCH 03/11] Fix tests --- .../version/test/test_version_map.cpp | 62 +++++++++++++------ 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 0bd4265cdd..19f5ec5c68 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -551,17 +551,31 @@ std::shared_ptr write_versions( const std::shared_ptr& store, const std::shared_ptr& version_map, const StreamId& id, - int number_of_versions) { + size_t number_of_versions, + std::vector tombstones_versions = {}, + const std::optional& tombstone_all_version = std::nullopt) { auto entry = version_map->check_reload( store, id, LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__); - for (int i = 0; i < number_of_versions; i++) { + std::sort(tombstones_versions.begin(), tombstones_versions.end()); + size_t j = 0; + for (size_t i = 0; i < number_of_versions; i++) { auto key = atom_key_with_version(id, i, i); version_map->do_write(store, key, entry); write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + + if (tombstone_all_version.has_value() && *tombstone_all_version == i) { + version_map->delete_all_versions(store, id); + } + + if (j < tombstones_versions.size() && tombstones_versions[j] == i) { + version_map->write_tombstone(store, VersionId{i}, id, entry); + j+=1; + } + } return entry; @@ -603,7 +617,7 @@ TEST(VersionMap, FollowingVersionChain){ auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_alternating_deleted_undeleted(store, version_map, id); + write_versions(store, version_map, id, 3, {2}, 0); auto check_strategy_loads_to = [&](LoadStrategy load_strategy, VersionId should_load_to){ auto ref_entry = VersionMapEntry{}; @@ -639,7 +653,7 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_alternating_deleted_undeleted(store, version_map, id); + write_versions(store, version_map, id, 3, {2}, 0); // We create an empty version map after populating the versions version_map = std::make_shared(); @@ -731,11 +745,11 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { TEST(VersionMap, CacheInvalidation) { ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); - // Set up the version chain v0(tombstone_all) <- v1 <- v2(tombstoned) + // Set up the version chain v0 <- v1(tombstone_all) <- v2 <- v3(tombstoned) auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_alternating_deleted_undeleted(store, version_map, id); + write_versions(store, version_map, id, 4, {3}, 1); auto check_caching = [&](LoadStrategy to_load, LoadStrategy to_check_if_cached, bool expected_outcome){ auto clean_version_map = std::make_shared(); @@ -753,48 +767,58 @@ TEST(VersionMap, CacheInvalidation) { } }; - auto load_all_param = LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED}; - auto load_all_undeleted_param = LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY}; + const auto load_all_param = LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED}; + const auto load_all_undeleted_param = LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY}; check_caching(load_all_param, load_all_undeleted_param, true); check_caching(load_all_undeleted_param, load_all_param, false); - constexpr auto num_versions = 3u; + constexpr auto num_versions = 4u; std::vector should_load_to_v[num_versions] = { // Different parameters which should all load to v0 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-3)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-4)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(0)}, }, // Different parameters which should all load to v1 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-2)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-3)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(1)}, - LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, - static_cast(2)}, // when include_deleted=false FROM_TIME searches for an undeleted version - LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, }, // Different parameters which should all load to v2 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(2)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-1)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-2)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(2)}, + LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, + static_cast(3)}, // when include_deleted=false FROM_TIME searches for an undeleted version + LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, + }, + // Different parameters which should all load to v3 + std::vector{ + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(3)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-1)}, + LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(3)}, LoadStrategy{LoadType::LATEST, LoadObjective::INCLUDE_DELETED}, } }; for (auto i=0u; i Date: Fri, 20 Dec 2024 18:01:03 +0200 Subject: [PATCH 04/11] test --- cpp/arcticdb/version/test/test_version_map.cpp | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 19f5ec5c68..c3f437cf06 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -767,11 +767,6 @@ TEST(VersionMap, CacheInvalidation) { } }; - const auto load_all_param = LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED}; - const auto load_all_undeleted_param = LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY}; - check_caching(load_all_param, load_all_undeleted_param, true); - check_caching(load_all_undeleted_param, load_all_param, false); - constexpr auto num_versions = 4u; std::vector should_load_to_v[num_versions] = { // Different parameters which should all load to v0 @@ -779,6 +774,7 @@ TEST(VersionMap, CacheInvalidation) { LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-4)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(0)}, + LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED} }, // Different parameters which should all load to v1 @@ -786,6 +782,7 @@ TEST(VersionMap, CacheInvalidation) { LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-3)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(1)}, + LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY} }, // Different parameters which should all load to v2 @@ -811,14 +808,6 @@ TEST(VersionMap, CacheInvalidation) { // For every two versions we check that all load params for earlier versions cache load params for later versions: check_all_caching(should_load_to_v[i], should_load_to_v[j], i<=j); } - - // LOAD_ALL loads to version 0 - check_all_caching({load_all_param}, should_load_to_v[i], true); - // Load_all_undeleted loads to version 1 because of the tombstone_all in ver 1 - check_all_caching({load_all_undeleted_param}, should_load_to_v[i], i != 0); - - // If we have loaded to version 0 all load requests are true - check_all_caching(should_load_to_v[i], {load_all_param, load_all_undeleted_param}, i == 0); } } From 8a6a3deb275b2a7e5e24ed72f6593a6696de7221 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Mon, 23 Dec 2024 17:26:25 +0200 Subject: [PATCH 05/11] PR comments --- .../version/test/test_version_map.cpp | 99 ++++++++++--------- cpp/arcticdb/version/version_map.hpp | 4 +- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 0659eafe77..0247a8c56c 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -547,35 +547,44 @@ TEST(VersionMap, StorageLogging) { ASSERT_EQ(tomb_keys, 3u); } +struct VersionChainOperation { + enum class Type { + WRITE, + TOMBSTONE, + TOMBSTONE_ALL + } type {Type::WRITE}; + + VersionId version_id { 0 }; +}; + std::shared_ptr write_versions( const std::shared_ptr& store, const std::shared_ptr& version_map, const StreamId& id, - size_t number_of_versions, - std::vector tombstones_versions = {}, - const std::optional& tombstone_all_version = std::nullopt) { + const std::vector& operations) { auto entry = version_map->check_reload( store, id, LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__); - std::sort(tombstones_versions.begin(), tombstones_versions.end()); - size_t j = 0; - for (size_t i = 0; i < number_of_versions; i++) { - auto key = atom_key_with_version(id, i, i); - version_map->do_write(store, key, entry); - write_symbol_ref(store, key, std::nullopt, entry->head_.value()); - - if (tombstone_all_version.has_value() && *tombstone_all_version == i) { - version_map->delete_all_versions(store, id); - } - - if (j < tombstones_versions.size() && tombstones_versions[j] == i) { - version_map->write_tombstone(store, VersionId{i}, id, entry); - j+=1; + for (const auto& [type, version_id]: operations) { + switch (type) { + case VersionChainOperation::Type::WRITE: { + auto key = atom_key_with_version(id, version_id, version_id); + version_map->do_write(store, key, entry); + write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + break; + } + case VersionChainOperation::Type::TOMBSTONE: { + version_map->write_tombstone(store, version_id, id, entry); + break; + } + case VersionChainOperation::Type::TOMBSTONE_ALL: { + version_map->delete_all_versions(store, id); + break; + } } - } return entry; @@ -583,33 +592,20 @@ std::shared_ptr write_versions( // Produces the following version chain: v0 <- tombstone_all <- v1 <- v2 <- tombstone void write_alternating_deleted_undeleted(std::shared_ptr store, std::shared_ptr version_map, StreamId id) { - auto entry = version_map->check_reload( - store, - id, - LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, - __FUNCTION__); - - auto key1 = atom_key_with_version(id, 0, 0); - auto key2 = atom_key_with_version(id, 1, 1); - auto key3 = atom_key_with_version(id, 2, 2); - - // Write version 0 - version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); - - // Tombstone_all on version 0 - version_map->delete_all_versions(store, id); - - // Write version 1 - version_map->do_write(store, key2, entry); - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); - - // Write version 2 - version_map->do_write(store, key3, entry); - write_symbol_ref(store, key3, std::nullopt, entry->head_.value()); + using Type = VersionChainOperation::Type; + write_versions(store, version_map, id, {{Type::WRITE, 0}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 1}, + {Type::WRITE, 2}, + {Type::TOMBSTONE, 2}}); +} - // Tombstone version 2 - version_map->write_tombstone(store, VersionId{2}, id, entry, timestamp{3}); +void write_versions(std::shared_ptr store, std::shared_ptr version_map, StreamId id, int number_of_versions) { + std::vector version_chain; + for (int i = 0; i < number_of_versions; i++) { + version_chain.emplace_back(VersionChainOperation::Type::WRITE, i); + } + write_versions(store, version_map, id, version_chain); } TEST(VersionMap, FollowingVersionChain){ @@ -617,7 +613,7 @@ TEST(VersionMap, FollowingVersionChain){ auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_versions(store, version_map, id, 3, {2}, 0); + write_alternating_deleted_undeleted(store, version_map, id); auto check_strategy_loads_to = [&](LoadStrategy load_strategy, VersionId should_load_to){ auto ref_entry = VersionMapEntry{}; @@ -653,7 +649,7 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_versions(store, version_map, id, 3, {2}, 0); + write_alternating_deleted_undeleted(store, version_map, id); // We create an empty version map after populating the versions version_map = std::make_shared(); @@ -743,13 +739,20 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { } } -TEST(VersionMap, CacheInvalidation) { +TEST(VersionMap, HasCachedEntry) { ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); // Set up the version chain v0 <- v1(tombstone_all) <- v2 <- v3(tombstoned) auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_versions(store, version_map, id, 4, {3}, 1); + using Type = VersionChainOperation::Type; + std::vector version_chain = {{Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 2}, + {Type::WRITE, 3}, + {Type::TOMBSTONE, 3}}; + write_versions(store, version_map, id, version_chain); auto check_caching = [&](LoadStrategy to_load, LoadStrategy to_check_if_cached, bool expected_outcome){ auto clean_version_map = std::make_shared(); diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index 4e5255d6ac..c9c6bef136 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -581,7 +581,9 @@ class VersionMapImpl { return false; } - if (entry->load_progress_.is_earliest_version_loaded) { + const bool has_loaded_everything = entry->load_progress_.is_earliest_version_loaded; + const bool has_loaded_earliest_undeleted = entry->tombstone_all_.has_value() && entry->load_progress_.oldest_loaded_index_version_ <= entry->tombstone_all_->version_id(); + if (has_loaded_everything || (!requested_load_strategy.should_include_deleted() && has_loaded_earliest_undeleted)) { return true; } From 3aa26ac38e8094e44f521bba0eb6ef41e307d4cb Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Mon, 6 Jan 2025 11:41:15 +0200 Subject: [PATCH 06/11] Tombstone All invalidation and formatting --- .../version/test/test_version_map.cpp | 53 +++++++++---------- cpp/arcticdb/version/version_map_entry.hpp | 2 - 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 0247a8c56c..6118093053 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -593,11 +593,13 @@ std::shared_ptr write_versions( // Produces the following version chain: v0 <- tombstone_all <- v1 <- v2 <- tombstone void write_alternating_deleted_undeleted(std::shared_ptr store, std::shared_ptr version_map, StreamId id) { using Type = VersionChainOperation::Type; - write_versions(store, version_map, id, {{Type::WRITE, 0}, - {Type::TOMBSTONE_ALL}, - {Type::WRITE, 1}, - {Type::WRITE, 2}, - {Type::TOMBSTONE, 2}}); + write_versions(store, version_map, id, { + {Type::WRITE, 0}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 1}, + {Type::WRITE, 2}, + {Type::TOMBSTONE, 2} + }); } void write_versions(std::shared_ptr store, std::shared_ptr version_map, StreamId id, int number_of_versions) { @@ -684,6 +686,8 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ // We have the full version chain loaded, so has_cached_entry should always return true (even when requesting timestamp before earliest version) EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(-1)})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY})); // We add a new undeleted key auto key4 = atom_key_with_version(id, 3, 5); @@ -746,12 +750,14 @@ TEST(VersionMap, HasCachedEntry) { auto version_map = std::make_shared(); StreamId id{"test"}; using Type = VersionChainOperation::Type; - std::vector version_chain = {{Type::WRITE, 0}, - {Type::WRITE, 1}, - {Type::TOMBSTONE_ALL}, - {Type::WRITE, 2}, - {Type::WRITE, 3}, - {Type::TOMBSTONE, 3}}; + std::vector version_chain = { + {Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 2}, + {Type::WRITE, 3}, + {Type::TOMBSTONE, 3} + }; write_versions(store, version_map, id, version_chain); auto check_caching = [&](LoadStrategy to_load, LoadStrategy to_check_if_cached, bool expected_outcome){ @@ -867,15 +873,14 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { auto version_map = std::make_shared(); StreamId id{"test"}; + // Version chain is v0 <- v1 <- v2 write_versions(store, version_map, id, 3); - // Use a clean version_map - version_map = std::make_shared(); - - for (const auto& load_strategy : {LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}}) { + // Use a clean version_map + version_map = std::make_shared(); const bool is_loaded_to_0 = load_strategy.load_until_version_ == 0; auto entry = version_map->check_reload( store, @@ -902,19 +907,11 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { // When - we delete all versions without reloading version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); - if (is_loaded_to_0) { - // If we have loaded everything we should not invalidate cache - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - } - else { - // If we haven't loaded everything we should invalidate cached undeleted checks - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - } + // Tombstone All should not invalidate cache as it deletes everything so all undeleted versions have been loaded. + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); } } diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index 65eaf83ed3..309f0d474c 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -16,8 +16,6 @@ #include #include -#include "async/tasks.hpp" - namespace arcticdb { using namespace arcticdb::entity; using namespace arcticdb::stream; From 7b94b0ccbce521efb89c71edb04a7febcddce1de Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Tue, 7 Jan 2025 15:55:13 +0200 Subject: [PATCH 07/11] More testing --- .../version/test/test_version_map.cpp | 89 ++++++++++++++----- 1 file changed, 69 insertions(+), 20 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 6118093053..eb004110ae 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -869,16 +869,28 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { // Given - symbol with 3 versions - load downto version 1 or 0 // never time-invalidate the cache so we can test our other cache invalidation logic ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); + StreamId id{"test"}; + auto version_map = std::make_shared(); auto store = std::make_shared(); - auto version_map = std::make_shared(); - StreamId id{"test"}; - // Version chain is v0 <- v1 <- v2 - write_versions(store, version_map, id, 3); + auto validate_load_strategy = [&](const LoadStrategy& load_strategy, bool should_be_cached, int expected_cached = -1) { + if (should_be_cached) { + auto entry = version_map->check_reload(nullptr, id, load_strategy, __FUNCTION__); + ASSERT_EQ(std::ranges::count_if(entry->keys_, [](const auto& key) { return key.type() == KeyType::TABLE_INDEX;}), expected_cached); + } + else { + ASSERT_FALSE(version_map->has_cached_entry(id, load_strategy)); + } + }; for (const auto& load_strategy : {LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}}) - { + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}}) { + store = std::make_shared(); + version_map = std::make_shared(); + + // Version chain is v0 <- v1 <- v2 + write_versions(store, version_map, id, 3); + // Use a clean version_map version_map = std::make_shared(); const bool is_loaded_to_0 = load_strategy.load_until_version_ == 0; @@ -887,32 +899,69 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { id, load_strategy, __FUNCTION__); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}), is_loaded_to_0); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)})); - ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-3)}), is_loaded_to_0); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, is_loaded_to_0, is_loaded_to_0 ? 3 : 0); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-3)}, is_loaded_to_0, is_loaded_to_0 ? 3 : 0); // When - we delete version 2 auto tombstone_key = version_map->write_tombstone(store, VersionId{2}, id, entry); // We should not invalidate the cache because the version we loaded to is still undeleted - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}), is_loaded_to_0); // When - we delete all versions without reloading version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); // Tombstone All should not invalidate cache as it deletes everything so all undeleted versions have been loaded. - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, true, is_loaded_to_0 ? 3 : 2); + + // When - we add a new version so that tombstone all isn't the latest + auto key = atom_key_with_version(id, 5, 5); + version_map->do_write(store, key, entry); + write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, true, is_loaded_to_0 ? 4 : 3); } + + // Given tombstone all isn't the latest version + // v0 <- v1 <- tombstone_all <- v2 + store = std::make_shared(); + version_map = std::make_shared(); + + using Type = VersionChainOperation::Type; + write_versions(store, version_map, id, { + {Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::TOMBSTONE_ALL, 1}, + {Type::WRITE, 2}, + }); + + version_map = std::make_shared(); + auto entry = version_map->check_reload( + store, + id, + LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, + __FUNCTION__); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, 1); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, 1); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, false); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, false); } TEST(VersionMap, CompactionUpdateCache) { From 43a948c3f5923e04cbaaf3bc61261b5b6d65a785 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Mon, 13 Jan 2025 18:21:49 +0200 Subject: [PATCH 08/11] e2e tests --- .../version/test/test_version_map.cpp | 4 ++ .../version_store/test_basic_version_store.py | 68 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index eb004110ae..186cec529d 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -557,6 +557,10 @@ struct VersionChainOperation { VersionId version_id { 0 }; }; +/** + * @param operations write operations with their specified version_id in this order. + * TOMBSTONE_ALL operation ignores version_id as the only API available is to use it with the latest version in the chain + */ std::shared_ptr write_versions( const std::shared_ptr& store, const std::shared_ptr& version_map, diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 92ad6e9a4c..91d3996a74 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -2704,3 +2704,71 @@ def test_missing_first_version_key_batch(basic_store): vits = lib.batch_read(symbols, as_ofs=write_times) for x in range(num_items): assert_equal(vits[symbols[x]].data, expected[x]) + +@pytest.mark.parametrize("use_caching", [True, False]) +def test_version_chain_cache(basic_store, use_caching): + timeout = sys.maxsize if use_caching else 0 + lib = basic_store + symbol = "test" + dataframes = [sample_dataframe()] * 10 + timestamps = [] + delete_versions = {3, 7, 9} + + with config_context("VersionMap.ReloadInterval", timeout): + # Write 10 versions + for i in range(10): + with distinct_timestamps(lib) as timestamp: + lib.write(symbol, dataframes[i]) + timestamps.append(timestamp) + + # Validate the most recent version + assert_equal(lib.read(symbol).data, dataframes[-1]) + + # Check reading specific versions + for i in range(10): + assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i]) + + if i == 0: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamps[i].before) + else: + assert_equal(lib.read(symbol, as_of=timestamps[i].before).data, dataframes[i-1]) + + assert_equal(lib.read(symbol, as_of=i).data, dataframes[i]) + + # Ensure reading a non-existent version raises an exception + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=pd.Timestamp(0)) + + # Delete specific versions + for version in delete_versions: + lib.delete_version(symbol, version) + + assert_equal(lib.read(symbol).data, dataframes[-2]) + for i in range(10): + if i in delete_versions: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=i) + assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i-1]) + else: + assert_equal(lib.read(symbol, as_of=i).data, dataframes[i]) + assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i]) + + if i == 0: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamps[i].before) + else: + assert_equal(lib.read(symbol, as_of=timestamps[i].before).data, dataframes[i-1]) + + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=pd.Timestamp(0)) + + # Delete all versions and ensure all versions are no longer accessible + lib.delete(symbol) + for i in range(10): + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamps[i].after) + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamps[i].before) + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=i) From 8182191a63861be555ab816f5654534c7fb88ba5 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Tue, 14 Jan 2025 11:17:41 +0200 Subject: [PATCH 09/11] Tombstone all not latest --- cpp/arcticdb/version/test/test_version_map.cpp | 16 ++++++++-------- .../version_store/test_basic_version_store.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 186cec529d..bac3032ba3 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -943,17 +943,15 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { } // Given tombstone all isn't the latest version - // v0 <- v1 <- tombstone_all <- v2 + // v0 <- v1 <- v2 <- Tombstone_all(v1) store = std::make_shared(); version_map = std::make_shared(); + write_versions(store, version_map, id, 3); - using Type = VersionChainOperation::Type; - write_versions(store, version_map, id, { - {Type::WRITE, 0}, - {Type::WRITE, 1}, - {Type::TOMBSTONE_ALL, 1}, - {Type::WRITE, 2}, - }); + auto key = atom_key_builder() + .version_id(1) + .build(id, KeyType::VERSION); + version_map->tombstone_from_key_or_all(store, id, key); version_map = std::make_shared(); auto entry = version_map->check_reload( @@ -966,6 +964,8 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, 1); validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, false); validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, false); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, false); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, false); } TEST(VersionMap, CompactionUpdateCache) { diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 91d3996a74..4171df177c 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -2763,7 +2763,7 @@ def test_version_chain_cache(basic_store, use_caching): with pytest.raises(NoSuchVersionException): lib.read(symbol, as_of=pd.Timestamp(0)) - # Delete all versions and ensure all versions are no longer accessible + # Delete all versions lib.delete(symbol) for i in range(10): with pytest.raises(NoSuchVersionException): From 9343cc601cee1e6ca016ff9778a852340678878e Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Tue, 14 Jan 2025 17:05:14 +0200 Subject: [PATCH 10/11] Fix python test --- .../version/test/test_version_map.cpp | 5 +- .../version_store/test_basic_version_store.py | 75 ++++++++++--------- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index bac3032ba3..42468aab51 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -874,11 +874,12 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { // never time-invalidate the cache so we can test our other cache invalidation logic ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); StreamId id{"test"}; - auto version_map = std::make_shared(); - auto store = std::make_shared(); + std::shared_ptr version_map; + std::shared_ptr store; auto validate_load_strategy = [&](const LoadStrategy& load_strategy, bool should_be_cached, int expected_cached = -1) { if (should_be_cached) { + // Store is nullptr as we shouldn't go to storage auto entry = version_map->check_reload(nullptr, id, load_strategy, __FUNCTION__); ASSERT_EQ(std::ranges::count_if(entry->keys_, [](const auto& key) { return key.type() == KeyType::TABLE_INDEX;}), expected_cached); } diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 4171df177c..d654d91405 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -2710,13 +2710,40 @@ def test_version_chain_cache(basic_store, use_caching): timeout = sys.maxsize if use_caching else 0 lib = basic_store symbol = "test" - dataframes = [sample_dataframe()] * 10 + # Will write 10 versions + num_of_versions = 10 + dataframes = [sample_dataframe() for _ in range(num_of_versions)] timestamps = [] - delete_versions = {3, 7, 9} + + def assert_correct_dataframe(timestamp_and_version_index, deleted_versions): + # Version + version_index = timestamp_and_version_index + if i in deleted_versions: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=version_index) + else: + assert_equal(lib.read(symbol, as_of=version_index).data, dataframes[i]) + + # Timestamp + timestamp_index = timestamp_and_version_index + def find_expected_version(first_to_check): + for num in range(first_to_check, -1, -1): + if num not in deleted_versions: + return num + return None + + for timestamp, is_before in [(timestamps[timestamp_index].before, True), (timestamps[timestamp_index].after, False)]: + first_version_to_check = timestamp_index - 1 if is_before else timestamp_index + expected_version_to_find = find_expected_version(first_version_to_check) + if expected_version_to_find is None: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamp) + else: + assert_frame_equal(lib.read(symbol, as_of=timestamp).data, dataframes[expected_version_to_find]) with config_context("VersionMap.ReloadInterval", timeout): - # Write 10 versions - for i in range(10): + # Write versions and keep track of time before and after writing + for i in range(num_of_versions): with distinct_timestamps(lib) as timestamp: lib.write(symbol, dataframes[i]) timestamps.append(timestamp) @@ -2725,50 +2752,24 @@ def test_version_chain_cache(basic_store, use_caching): assert_equal(lib.read(symbol).data, dataframes[-1]) # Check reading specific versions - for i in range(10): - assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i]) - - if i == 0: - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=timestamps[i].before) - else: - assert_equal(lib.read(symbol, as_of=timestamps[i].before).data, dataframes[i-1]) - - assert_equal(lib.read(symbol, as_of=i).data, dataframes[i]) + for i in range(num_of_versions): + assert_correct_dataframe(i, {}) # Ensure reading a non-existent version raises an exception with pytest.raises(NoSuchVersionException): lib.read(symbol, as_of=pd.Timestamp(0)) # Delete specific versions + delete_versions = {1, 3, 7, 9} for version in delete_versions: lib.delete_version(symbol, version) - - assert_equal(lib.read(symbol).data, dataframes[-2]) - for i in range(10): - if i in delete_versions: - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=i) - assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i-1]) - else: - assert_equal(lib.read(symbol, as_of=i).data, dataframes[i]) - assert_equal(lib.read(symbol, as_of=timestamps[i].after).data, dataframes[i]) - - if i == 0: - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=timestamps[i].before) - else: - assert_equal(lib.read(symbol, as_of=timestamps[i].before).data, dataframes[i-1]) + for i in range(num_of_versions): + assert_correct_dataframe(i, delete_versions) with pytest.raises(NoSuchVersionException): lib.read(symbol, as_of=pd.Timestamp(0)) # Delete all versions lib.delete(symbol) - for i in range(10): - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=timestamps[i].after) - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=timestamps[i].before) - with pytest.raises(NoSuchVersionException): - lib.read(symbol, as_of=i) + for i in range(num_of_versions): + assert_correct_dataframe(i, set(range(num_of_versions))) \ No newline at end of file From 1ad31570a49a5cf2781a985d3016e805faab01a8 Mon Sep 17 00:00:00 2001 From: Ognyan Stoimenov Date: Tue, 14 Jan 2025 18:06:16 +0200 Subject: [PATCH 11/11] Cpp test improvement --- .../version/test/test_version_map.cpp | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 42468aab51..b27fb2eda0 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -554,12 +554,11 @@ struct VersionChainOperation { TOMBSTONE_ALL } type {Type::WRITE}; - VersionId version_id { 0 }; + std::optional version_id { std::nullopt }; }; /** * @param operations write operations with their specified version_id in this order. - * TOMBSTONE_ALL operation ignores version_id as the only API available is to use it with the latest version in the chain */ std::shared_ptr write_versions( const std::shared_ptr& store, @@ -572,20 +571,26 @@ std::shared_ptr write_versions( LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__); - for (const auto& [type, version_id]: operations) { + for (const auto& [type, version_id_opt]: operations) { switch (type) { case VersionChainOperation::Type::WRITE: { - auto key = atom_key_with_version(id, version_id, version_id); + auto key = atom_key_with_version(id, *version_id_opt, *version_id_opt); version_map->do_write(store, key, entry); write_symbol_ref(store, key, std::nullopt, entry->head_.value()); break; } case VersionChainOperation::Type::TOMBSTONE: { - version_map->write_tombstone(store, version_id, id, entry); + version_map->write_tombstone(store, *version_id_opt, id, entry); break; } case VersionChainOperation::Type::TOMBSTONE_ALL: { - version_map->delete_all_versions(store, id); + std::optional key = std::nullopt; + if (version_id_opt.has_value()) { + key = atom_key_builder() + .version_id(*version_id_opt) + .build(id, KeyType::VERSION); + } + version_map->tombstone_from_key_or_all(store, id, key); break; } } @@ -947,12 +952,13 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { // v0 <- v1 <- v2 <- Tombstone_all(v1) store = std::make_shared(); version_map = std::make_shared(); - write_versions(store, version_map, id, 3); - - auto key = atom_key_builder() - .version_id(1) - .build(id, KeyType::VERSION); - version_map->tombstone_from_key_or_all(store, id, key); + using Type = VersionChainOperation::Type; + write_versions(store, version_map, id, { + {Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::WRITE, 2}, + {Type::TOMBSTONE, 1} + }); version_map = std::make_shared(); auto entry = version_map->check_reload(