Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.3.x] [CORE-7257] cloud_storage: Remove assertion in remote_segment #24292

Open
wants to merge 2 commits into
base: v24.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <set>
#include <string_view>

struct cloud_storage_fixture;

namespace cloud_storage {

// These timeout/backoff settings are for S3 requests
Expand Down Expand Up @@ -360,6 +362,7 @@ class cache
ss::condition_variable _block_puts_cond;

friend class cache_test_fixture;
friend struct ::cloud_storage_fixture;

// List of probable deletion candidates from the last trim.
std::optional<fragmented_vector<file_list_item>> _last_trim_carryover;
Expand Down
15 changes: 14 additions & 1 deletion src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cloud_storage/remote_segment.h"

#include "base/vlog.h"
#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/cache_service.h"
Expand Down Expand Up @@ -37,6 +38,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
Expand Down Expand Up @@ -1663,7 +1665,18 @@ ss::future<> hydration_loop_state::hydrate(size_t wait_list_size) {
fs.push_back(state.hydrate_action());
break;
case cache_element_status::in_progress:
vassert(false, "{} is already in progress", state.path);
// Ths means that we have two remote_segment instances running
// in parallel. This is possible in case of extreme contention
// when the materialized segment gets evicted and then
// materialized again. The underlying cache service is a global
// state that all instances of the 'remote_segment' share.
vlog(_ctxlog.warn, "{} is already in progress", state.path);
fs.push_back(
ss::make_exception_future<>(std::runtime_error(fmt_with_ctx(
fmt::format,
"Concurrency violation. {} is already in progress.",
state.path))));
break;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct cloud_storage_fixture : s3_imposter_fixture {
cloud_storage_fixture(cloud_storage_fixture&&) = delete;
cloud_storage_fixture operator=(cloud_storage_fixture&&) = delete;

void mark_as_in_progress(remote_segment_path path) {
cache.local()._files_in_progress.insert(path);
}

ss::tmp_dir tmp_directory;
ss::sharded<cloud_storage::cache> cache;
ss::sharded<cloud_storage_clients::client_pool> pool;
Expand Down
55 changes: 55 additions & 0 deletions src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "base/seastarx.h"
#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/cache_service.h"
#include "cloud_storage/download_exception.h"
#include "cloud_storage/materialized_resources.h"
#include "cloud_storage/partition_manifest.h"
Expand Down Expand Up @@ -42,6 +43,8 @@
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <stdexcept>

using namespace std::chrono_literals;
using namespace cloud_storage;

Expand Down Expand Up @@ -503,3 +506,55 @@ FIXTURE_TEST(
reader.stop().get();
segment->stop().get();
}

FIXTURE_TEST(
test_remote_segment_concurrent_download, cloud_storage_fixture) { // NOLINT
auto conf = get_configuration();
partition_manifest m(manifest_ntp, manifest_revision);
model::initial_revision_id segment_ntp_revision{777};
iobuf segment_bytes = generate_segment(model::offset(1), 20);
uint64_t clen = segment_bytes.size_bytes();
auto reset_stream = make_reset_fn(segment_bytes);
retry_chain_node fib(never_abort, 1000ms, 200ms);
partition_manifest::segment_meta meta{
.is_compacted = false,
.size_bytes = segment_bytes.size_bytes(),
.base_offset = model::offset(1),
.committed_offset = model::offset(20),
.base_timestamp = {},
.max_timestamp = {},
.delta_offset = model::offset_delta(0),
.ntp_revision = segment_ntp_revision,
.sname_format = segment_name_format::v2};
auto path = m.generate_segment_path(meta, path_provider);
set_expectations_and_listen({});
auto upl_res
= api.local()
.upload_segment(
bucket_name, path, clen, reset_stream, fib, always_continue)
.get();
BOOST_REQUIRE(upl_res == upload_result::success);
m.add(meta);

partition_probe probe{manifest_ntp};
auto& ts_probe = api.local().materialized().get_read_path_probe();

auto name = m.generate_segment_path(meta, path_provider);
mark_as_in_progress(name);
remote_segment segment(
api.local(),
cache.local(),
bucket_name,
name,
m.get_ntp(),
meta,
fib,
probe,
ts_probe);

auto d = ss::defer([&segment] { segment.stop().get(); });

BOOST_REQUIRE_THROW(
segment.data_stream(0, ss::default_priority_class()).get(),
std::runtime_error);
}
Loading