Merge pull request #14952 from vbotbuildovich/backport-pr-14821-v23.2.x-713

[v23.2.x] cloud_storage: Add client address to fetch and log reader config
piyushredpanda authored Nov 14, 2023
2 parents 8d33fbc + 245955f commit 315cb04
Showing 9 changed files with 87 additions and 19 deletions.
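
For orientation before the per-file hunks: the change threads an optional, strongly typed client address from the Kafka connection layer into the storage and cloud-storage reader configs, so fetch and timequery log lines can identify the requesting client. Below is a minimal, self-contained sketch of that pattern; the simplified types stand in for model::client_address_t and storage::log_reader_config and are illustrative only, not the actual Redpanda definitions.

    #include <iostream>
    #include <optional>
    #include <sstream>
    #include <string>

    // Stand-in for model::client_address_t (really a named_type over ss::sstring).
    struct client_address_t {
        std::string value;
    };

    // Stand-in for storage::log_reader_config: the new field is optional,
    // so internal readers can simply leave it empty.
    struct log_reader_config {
        long start_offset{0};
        long max_offset{0};
        std::optional<client_address_t> client_address;
    };

    std::ostream& operator<<(std::ostream& o, const log_reader_config& cfg) {
        o << "{start_offset:" << cfg.start_offset
          << ", max_offset:" << cfg.max_offset;
        // As in the diff: print the address only when a client supplied one.
        if (cfg.client_address.has_value()) {
            o << ", client_address:" << cfg.client_address->value;
        }
        return o << "}";
    }

    int main() {
        // Fetch-path side: format the peer as "host:port" and tag the config.
        std::ostringstream peer;
        peer << "10.1.2.3" << ':' << 54321;
        log_reader_config cfg{0, 100, client_address_t{peer.str()}};
        std::cout << cfg << '\n';                 // address appears in the log line
        std::cout << log_reader_config{} << '\n'; // internal reader: no address
    }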
35 changes: 30 additions & 5 deletions src/v/cloud_storage/remote_partition.cc
@@ -238,7 +238,16 @@ class partition_record_batch_reader_impl final
                 _reader = {};
             }
         }
+
+        vlog(
+          _ctxlog.trace,
+          "partition_record_batch_reader_impl:: start: creating cursor: {}",
+          config);
         co_await init_cursor(config);
+        vlog(
+          _ctxlog.trace,
+          "partition_record_batch_reader_impl:: start: created cursor: {}",
+          config);
         _partition->_ts_probe.reader_created();
     }

@@ -307,7 +316,10 @@ class partition_record_batch_reader_impl final
                 co_return storage_t{};
             }
             if (_reader->config().over_budget) {
-                vlog(_ctxlog.debug, "We're over-budget, stopping");
+                vlog(
+                  _ctxlog.debug,
+                  "We're over-budget, stopping, config: {}",
+                  _reader->config());
                 // We need to stop in such way that will keep the
                 // reader in the reusable state, so we could reuse
                 // it on next iteration
@@ -558,7 +570,9 @@ class partition_record_batch_reader_impl final
       segment_reader_units segment_reader_unit) {
         vlog(
           _ctxlog.debug,
-          "partition_record_batch_reader_impl initialize reader state");
+          "partition_record_batch_reader_impl initialize reader state, config: "
+          "{}",
+          config);
         auto [reader, next_offset] = find_cached_reader(
           manifest,
           config,
@@ -573,7 +587,8 @@
           _ctxlog.debug,
           "partition_record_batch_reader_impl initialize reader state - "
           "segment not "
-          "found");
+          "found, config: {}",
+          config);
         _reader = {};
         _next_segment_base_offset = {};
     }
@@ -1082,14 +1097,23 @@ ss::future<storage::translating_reader> remote_partition::make_reader(
     std::ignore = deadline;
     vlog(
       _ctxlog.debug,
-      "remote partition make_reader invoked, config: {}, num segments {}",
+      "remote partition make_reader invoked (waiting for units), config: {}, "
+      "num segments {}",
       config,
       _segments.size());
 
     auto units = co_await _api.materialized().get_partition_reader_units(
       1, config.abort_source);
     auto ot_state = ss::make_lw_shared<storage::offset_translator_state>(
       get_ntp());
+
+    vlog(
+      _ctxlog.trace,
+      "remote partition make_reader invoked (units acquired), config: {}, "
+      "num segments {}",
+      config,
+      _segments.size());
+
     auto impl = std::make_unique<partition_record_batch_reader_impl>(
       shared_from_this(), ot_state, std::move(units));
     co_await impl->start(config);
@@ -1116,7 +1140,8 @@ remote_partition::timequery(storage::timequery_config cfg) {
       cfg.prio,
       cfg.type_filter,
       cfg.time,
-      cfg.abort_source);
+      cfg.abort_source,
+      cfg.client_address);
 
     // Construct a reader that will skip to the requested timestamp
     // by virtue of log_reader_config::start_timestamp

18 changes: 14 additions & 4 deletions src/v/cloud_storage/remote_segment.cc
@@ -1338,11 +1338,12 @@ remote_segment_batch_reader::read_some(
         const auto msg = fmt::format(
           "segment_reader is stuck, segment ntp: {}, _cur_rp_offset: "
           "{}, "
-          "_bytes_consumed: {}, parser error state: {}",
+          "_bytes_consumed: {}, parser error state: {}, client: {}",
           _seg->get_ntp(),
           _cur_rp_offset,
           _bytes_consumed,
-          _parser->error());
+          _parser->error(),
+          _config.client_address);
         if (_parser->error() == storage::parser_errc::end_of_stream) {
             vlog(_ctxlog.info, "{}", msg);
         } else {
@@ -1362,15 +1363,24 @@ remote_segment_batch_reader::init_parser() {
     ss::gate::holder h(_gate);
     vlog(
       _ctxlog.debug,
-      "remote_segment_batch_reader::init_parser, start_offset: {}",
-      _config.start_offset);
+      "remote_segment_batch_reader::init_parser (creating stream), "
+      "start_offset: {}, client: {}",
+      _config.start_offset,
+      _config.client_address);
 
     auto stream_off = co_await _seg->offset_data_stream(
       model::offset_cast(_config.start_offset),
       model::offset_cast(_config.max_offset),
       _config.first_timestamp,
       priority_manager::local().shadow_indexing_priority());
+
+    vlog(
+      _ctxlog.debug,
+      "remote_segment_batch_reader::init_parser (stream created), "
+      "start_offset: {}, client: {}",
+      _config.start_offset,
+      _config.client_address);
 
     auto parser = std::make_unique<storage::continuous_batch_parser>(
       std::make_unique<remote_segment_batch_consumer>(
         _config, *this, _seg->get_term(), _seg->get_ntp(), _rtc),
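Note the pattern in init_parser above: one log line is emitted before the awaited offset_data_stream call and one after, each tagged with the client address. If a trace shows the "(creating stream)" line with no matching "(stream created)" line, the hang is localized to the awaited call. A rough, non-Seastar sketch of the same bracketing idea (all names here are hypothetical stand-ins):

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>

    // Hypothetical stand-in for the awaited _seg->offset_data_stream() call.
    void open_stream() {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    void init_parser(const std::string& client) {
        // Bracketing logs: a lone "creating stream" line in a trace of a
        // stuck reader points directly at the awaited operation.
        std::cout << "init_parser (creating stream), client: " << client << '\n';
        open_stream();
        std::cout << "init_parser (stream created), client: " << client << '\n';
    }

    int main() { init_parser("10.1.2.3:54321"); }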
8 changes: 7 additions & 1 deletion src/v/kafka/server/connection_context.h
@@ -137,7 +137,8 @@ class connection_context final
               .finally([this]() {
                   vlog(
                     klog.debug,
-                    "Connection input_shutdown; aborting operations");
+                    "Connection input_shutdown; aborting operations for {}",
+                    conn->addr);
                   return _as.request_abort_ex(std::system_error(
                     std::make_error_code(std::errc::connection_aborted)));
               })
@@ -149,11 +150,16 @@
 
     ss::future<> stop() {
         if (conn) {
+            vlog(klog.trace, "stopping connection context for {}", conn->addr);
             conn->shutdown_input();
         }
         co_await _wait_input_shutdown.get_future();
         co_await _as.request_abort_ex(ssx::connection_aborted_exception{});
         co_await _as.stop();
+
+        if (conn) {
+            vlog(klog.trace, "stopped connection context for {}", conn->addr);
+        }
     }
 
     /// The instance of \ref kafka::server on the shard serving the connection

9 changes: 8 additions & 1 deletion src/v/kafka/server/handlers/fetch.cc
@@ -96,7 +96,8 @@ static ss::future<read_result> read_from_partition(
       std::nullopt,
       config.abort_source.has_value()
         ? config.abort_source.value().get().local()
-        : storage::opt_abort_source_t{});
+        : storage::opt_abort_source_t{},
+      config.client_address);
 
     reader_config.strict_max_bytes = config.strict_max_bytes;
     auto rdr = co_await part.make_reader(reader_config);
@@ -790,6 +791,11 @@ class simple_fetch_planner final : public fetch_planner::impl {
                   bytes_left_in_plan -= max_bytes;
               }
 
+              const auto client_address = fmt::format(
+                "{}:{}",
+                octx.rctx.connection()->client_host(),
+                octx.rctx.connection()->client_port());
+
               fetch_config config{
                 .start_offset = fp.fetch_offset,
                 .max_offset = model::model_limits<model::offset>::max(),
@@ -802,6 +808,7 @@
                 .read_from_follower = octx.request.has_rack_id(),
                 .consumer_rack_id = octx.request.data.rack_id,
                 .abort_source = octx.rctx.abort_source(),
+                .client_address = model::client_address_t{client_address},
               };
 
               plan.fetches_per_shard[*shard].push_back({tp, config}, &(*resp_it));

6 changes: 4 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
@@ -170,11 +170,12 @@ struct fetch_config {
     std::optional<model::rack_id> consumer_rack_id;
     std::optional<std::reference_wrapper<ssx::sharded_abort_source>>
       abort_source;
+    std::optional<model::client_address_t> client_address;
 
     friend std::ostream& operator<<(std::ostream& o, const fetch_config& cfg) {
         fmt::print(
           o,
-          R"({{"start_offset": {}, "max_offset": {}, "isolation_lvl": {}, "max_bytes": {}, "strict_max_bytes": {}, "skip_read": {}, "current_leader_epoch:" {}, "follower_read:" {}, "consumer_rack_id": {}, "abortable": {}, "aborted": {}}})",
+          R"({{"start_offset": {}, "max_offset": {}, "isolation_lvl": {}, "max_bytes": {}, "strict_max_bytes": {}, "skip_read": {}, "current_leader_epoch:" {}, "follower_read:" {}, "consumer_rack_id": {}, "abortable": {}, "aborted": {}, "client_address": {}}})",
           cfg.start_offset,
           cfg.max_offset,
           cfg.isolation_level,
@@ -187,7 +188,8 @@
           cfg.abort_source.has_value(),
           cfg.abort_source.has_value()
             ? cfg.abort_source.value().get().abort_requested()
-            : false);
+            : false,
+          cfg.client_address.value_or(model::client_address_t{"unknown"}));
         return o;
     }
 };

2 changes: 2 additions & 0 deletions src/v/model/fundamental.h
@@ -415,6 +415,8 @@ static_assert(
 
 std::ostream& operator<<(std::ostream&, const shadow_indexing_mode&);
 
+using client_address_t = named_type<ss::sstring, struct client_address_tag>;
+
 } // namespace model
 
 namespace kafka {

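The client_address_t alias above uses Redpanda's named_type strong-typedef helper, keyed by an otherwise-empty tag struct so that different string-valued identifiers cannot be mixed up at compile time. A rough sketch of the idea (not the actual named_type implementation):

    #include <iostream>
    #include <string>

    // Simplified strong typedef: the Tag parameter makes each alias a
    // distinct type even when the underlying representation is identical.
    template<typename T, typename Tag>
    class named_type {
    public:
        explicit named_type(T v)
          : _value(std::move(v)) {}
        const T& operator()() const { return _value; }
        friend std::ostream& operator<<(std::ostream& o, const named_type& n) {
            return o << n._value;
        }

    private:
        T _value;
    };

    using client_address_t = named_type<std::string, struct client_address_tag>;
    using topic_t = named_type<std::string, struct topic_tag>;

    int main() {
        client_address_t addr{"192.168.1.9:9092"};
        // topic_t t = addr; // would not compile: distinct tag types
        std::cout << addr << '\n';
    }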
1 change: 1 addition & 0 deletions src/v/net/server.cc
@@ -172,6 +172,7 @@ ss::future<> server::apply_proto(
       [this, conn, cq_units = std::move(cq_units)](ss::future<> f) {
           print_exceptional_future(
             std::move(f), "applying protocol", conn->addr);
+          vlog(_log.trace, "shutting down connection {}", conn->addr);
           return conn->shutdown().then_wrapped(
             [this, addr = conn->addr](ss::future<> f) {
                 print_exceptional_future(std::move(f), "shutting down", addr);

4 changes: 4 additions & 0 deletions src/v/storage/types.cc
@@ -67,6 +67,10 @@ std::ostream& operator<<(std::ostream& o, const log_reader_config& cfg) {
       << (cfg.abort_source.has_value()
             ? cfg.abort_source.value().get().abort_requested()
             : false);
+
+    if (cfg.client_address.has_value()) {
+        o << ", client_address:" << cfg.client_address.value();
+    }
     return o << "}";
 }

23 changes: 17 additions & 6 deletions src/v/storage/types.h
@@ -221,23 +221,28 @@ struct append_result {
 using opt_abort_source_t
   = std::optional<std::reference_wrapper<ss::abort_source>>;
 
+using opt_client_address_t = std::optional<model::client_address_t>;
+
 struct timequery_config {
     timequery_config(
       model::timestamp t,
       model::offset o,
       ss::io_priority_class iop,
       std::optional<model::record_batch_type> type_filter,
-      opt_abort_source_t as = std::nullopt) noexcept
+      opt_abort_source_t as = std::nullopt,
+      opt_client_address_t client_addr = std::nullopt) noexcept
       : time(t)
       , max_offset(o)
       , prio(iop)
       , type_filter(type_filter)
-      , abort_source(as) {}
+      , abort_source(as)
+      , client_address(std::move(client_addr)) {}
     model::timestamp time;
     model::offset max_offset;
     ss::io_priority_class prio;
     std::optional<model::record_batch_type> type_filter;
     opt_abort_source_t abort_source;
+    opt_client_address_t client_address;
 
     friend std::ostream& operator<<(std::ostream& o, const timequery_config&);
 };
@@ -325,6 +330,8 @@ struct log_reader_config {
     // historical read-once workloads like compaction).
     bool skip_batch_cache{false};
 
+    opt_client_address_t client_address;
+
     log_reader_config(
       model::offset start_offset,
       model::offset max_offset,
@@ -333,15 +340,17 @@
       ss::io_priority_class prio,
       std::optional<model::record_batch_type> type_filter,
       std::optional<model::timestamp> time,
-      opt_abort_source_t as)
+      opt_abort_source_t as,
+      opt_client_address_t client_addr = std::nullopt)
       : start_offset(start_offset)
       , max_offset(max_offset)
       , min_bytes(min_bytes)
       , max_bytes(max_bytes)
       , prio(prio)
       , type_filter(type_filter)
       , first_timestamp(time)
-      , abort_source(as) {}
+      , abort_source(as)
+      , client_address(std::move(client_addr)) {}
 
     /**
      * Read offsets [start, end].
@@ -350,7 +359,8 @@
       model::offset start_offset,
       model::offset max_offset,
       ss::io_priority_class prio,
-      opt_abort_source_t as = std::nullopt)
+      opt_abort_source_t as = std::nullopt,
+      opt_client_address_t client_addr = std::nullopt)
       : log_reader_config(
           start_offset,
           max_offset,
@@ -359,7 +369,8 @@
           prio,
           std::nullopt,
           std::nullopt,
-          as) {}
+          as,
+          std::move(client_addr)) {}
 
     friend std::ostream& operator<<(std::ostream& o, const log_reader_config&);
 };

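Because client_addr defaults to std::nullopt in every constructor it was added to, the many existing log_reader_config call sites compile unchanged while the fetch path can opt in. A hedged sketch of that compatibility property (simplified types, hypothetical values):

    #include <cassert>
    #include <optional>
    #include <string>

    struct client_address_t {
        std::string value;
    };
    using opt_client_address_t = std::optional<client_address_t>;

    struct log_reader_config {
        long start_offset;
        long max_offset;
        opt_client_address_t client_address;

        // Trailing defaulted parameter: adding it breaks no existing caller.
        log_reader_config(
          long start,
          long max,
          opt_client_address_t client_addr = std::nullopt)
          : start_offset(start)
          , max_offset(max)
          , client_address(std::move(client_addr)) {}
    };

    int main() {
        // Pre-existing caller, e.g. compaction: unchanged, address stays empty.
        log_reader_config internal{0, 1000};
        // Fetch-path caller: tags the reader with the Kafka client's peer.
        log_reader_config fetch{0, 1000, client_address_t{"10.1.2.3:54321"}};
        assert(!internal.client_address.has_value());
        assert(fetch.client_address.has_value());
    }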
