Merge pull request ceph#60202 from athanatos/sjust/wip-crimson-io
crimson: replace do_osd_ops* with simpler, more general mechanism

Reviewed-by: Yingxin Cheng <[email protected]>
Reviewed-by: Matan Breizman <[email protected]>
athanatos authored Oct 17, 2024
2 parents 2966f22 + 2b562b6 commit 93cbc92
Showing 16 changed files with 404 additions and 523 deletions.
4 changes: 2 additions & 2 deletions src/crimson/common/log.h
@@ -90,7 +90,7 @@ static inline seastar::log_level to_log_level(int level) {
#define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \
LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
#define SUBLOGDPPI(subname_, level_, MSG, dpp, ...) \
LOGGER(subname_).log(level_, "{} {}: " MSG, \
LOGGER(subname_).log(level_, "{} {} {}: " MSG, \
interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__)
#define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__)
#define SUBTRACEDPPI(subname_, ...) SUBLOGDPPI(subname_, seastar::log_level::trace, __VA_ARGS__)
@@ -106,7 +106,7 @@ static inline seastar::log_level to_log_level(int level) {
#define LOGDPP(level_, MSG, dpp, ...) \
LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
#define LOGDPPI(level_, MSG, dpp, ...) \
LOCAL_LOGGER.log(level_, "{} {}: " MSG, \
LOCAL_LOGGER.log(level_, "{} {} {}: " MSG, \
interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__)
#define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__)
#define TRACEDPPI(...) LOGDPPI(seastar::log_level::trace, __VA_ARGS__)
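The SUBLOGDPPI/LOGDPPI fix above is a format-string alignment: the interruptible variants pass three prefix arguments (the interrupt condition, `dpp`, and `FNAME`), but the old string only had two `{}` slots, so `FNAME` was swallowed by the message's first placeholder and every message argument shifted by one. A minimal standalone sketch of that shift, assuming only the {fmt} library; `LOG_OLD`/`LOG_NEW` are illustrative stand-ins, not the crimson macros:

```cpp
// Sketch only: mimics the placeholder/argument mismatch fixed in log.h.
#include <fmt/format.h>
#include <string>

// Old shape: two "{}" slots but three prefix arguments follow.
#define LOG_OLD(MSG, ...) fmt::print("{} {}: " MSG "\n", __VA_ARGS__)
// New shape: one slot per prefix argument, as in the patched macro.
#define LOG_NEW(MSG, ...) fmt::print("{} {} {}: " MSG "\n", __VA_ARGS__)

int main() {
  std::string interrupt_cond = "intr=0x1";
  std::string dpp = "pg[1.0]";
  const char* FNAME = "do_thing";

  // MSG's own "{}" consumes FNAME, so 42 is never printed:
  //   intr=0x1 pg[1.0]: value=do_thing
  LOG_OLD("value={}", interrupt_cond, dpp, FNAME, 42);

  // Each prefix argument gets its own slot and MSG's "{}" gets 42:
  //   intr=0x1 pg[1.0] do_thing: value=42
  LOG_NEW("value={}", interrupt_cond, dpp, FNAME, 42);
}
```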
54 changes: 32 additions & 22 deletions src/crimson/osd/ops_executer.cc
@@ -504,7 +504,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
auto p = ss.clone_snaps.find(clone);
if (p == ss.clone_snaps.end()) {
logger().error(
"OpsExecutor::do_list_snaps: {} has inconsistent "
"OpsExecuter::do_list_snaps: {} has inconsistent "
"clone_snaps, missing clone {}",
os.oi.soid,
clone);
@@ -518,7 +518,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
auto p = ss.clone_overlap.find(clone);
if (p == ss.clone_overlap.end()) {
logger().error(
"OpsExecutor::do_list_snaps: {} has inconsistent "
"OpsExecuter::do_list_snaps: {} has inconsistent "
"clone_overlap, missing clone {}",
os.oi.soid,
clone);
@@ -532,7 +532,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
auto p = ss.clone_size.find(clone);
if (p == ss.clone_size.end()) {
logger().error(
"OpsExecutor::do_list_snaps: {} has inconsistent "
"OpsExecuter::do_list_snaps: {} has inconsistent "
"clone_size, missing clone {}",
os.oi.soid,
clone);
@@ -551,7 +551,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
}
resp.seq = ss.seq;
logger().error(
"OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}",
"OpsExecuter::do_list_snaps: {}, resp.clones.size(): {}",
os.oi.soid,
resp.clones.size());
resp.encode(osd_op.outdata);
@@ -678,16 +678,32 @@ OpsExecuter::do_execute_op(OSDOp& osd_op)
whiteout = true;
}
return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) {
int num_bytes = 0;
// Calculate num_bytes to be removed
if (obc->obs.oi.soid.is_snap()) {
ceph_assert(obc->ssc->snapset.clone_overlap.count(obc->obs.oi.soid.snap));
num_bytes = obc->ssc->snapset.get_clone_bytes(obc->obs.oi.soid.snap);
} else {
num_bytes = obc->obs.oi.size;
}
return backend.remove(os, txn, *osd_op_params,
delta_stats, whiteout, num_bytes);
struct emptyctx_t {};
return with_effect_on_obc(
emptyctx_t{},
[&](auto &ctx) {
int num_bytes = 0;
// Calculate num_bytes to be removed
if (obc->obs.oi.soid.is_snap()) {
ceph_assert(obc->ssc->snapset.clone_overlap.count(
obc->obs.oi.soid.snap));
num_bytes = obc->ssc->snapset.get_clone_bytes(
obc->obs.oi.soid.snap);
} else {
num_bytes = obc->obs.oi.size;
}
return backend.remove(os, txn, *osd_op_params,
delta_stats, whiteout, num_bytes);
},
[](auto &&ctx, ObjectContextRef obc, Ref<PG>) {
return seastar::do_for_each(
obc->watchers,
[](auto &p) { return p.second->remove(); }
).then([obc] {
obc->watchers.clear();
return seastar::now();
});
});
});
}
case CEPH_OSD_OP_CALL:
@@ -957,15 +973,14 @@ void OpsExecuter::CloningContext::apply_to(
processed_obc.ssc->snapset = std::move(new_snapset);
}

OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
std::vector<pg_log_entry_t>
OpsExecuter::flush_clone_metadata(
std::vector<pg_log_entry_t>&& log_entries,
SnapMapper& snap_mapper,
OSDriver& osdriver,
ceph::os::Transaction& txn)
{
assert(!txn.empty());
auto maybe_snap_mapped = interruptor::now();
update_clone_overlap();
if (cloning_ctx) {
std::move(*cloning_ctx).apply_to(log_entries, *obc);
@@ -977,12 +992,7 @@ OpsExecuter::flush_clone_metadata(
}
logger().debug("{} done, initial snapset={}, new snapset={}",
__func__, obc->obs.oi.soid, obc->ssc->snapset);
return std::move(
maybe_snap_mapped
).then_interruptible([log_entries=std::move(log_entries)]() mutable {
return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
std::move(log_entries));
});
return std::move(log_entries);
}

ObjectContextRef OpsExecuter::prepare_clone(
44 changes: 18 additions & 26 deletions src/crimson/osd/ops_executer.h
@@ -40,7 +40,7 @@ namespace crimson::osd {
class PG;

// OpsExecuter -- a class for executing ops targeting a certain object.
class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
class OpsExecuter {
friend class SnapTrimObjSubEvent;

using call_errorator = crimson::errorator<
@@ -170,16 +170,12 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {

object_stat_sum_t delta_stats;
private:
// an operation can be divided into two stages: main and effect-exposing
// one. The former is performed immediately on call to `do_osd_op()` while
// the later on `submit_changes()` – after successfully processing main
// stages of all involved operations. When any stage fails, none of all
// scheduled effect-exposing stages will be executed.
// when operation requires this division, some variant of `with_effect()`
// should be used.
// with_effect can be used to schedule operations to be performed
// at commit time. effects will be discarded if the operation does
// not commit.
struct effect_t {
// an effect can affect PG, i.e. create a watch timeout
virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
virtual seastar::future<> execute(Ref<PG> pg) = 0;
virtual ~effect_t() = default;
};
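The comment above states the contract: work registered through `with_effect()` is deferred until the transaction commits and is discarded if it never does; the `CEPH_OSD_OP_DELETE` hunk in ops_executer.cc uses exactly this to postpone watcher teardown. A toy, synchronous model of that contract in plain C++; `EffectQueue`, `commit()`, and `abort()` are illustrative names, not crimson's seastar-based machinery:

```cpp
// Toy model of commit-time effects: mutations enqueue closures that run
// only after the transaction commits; a failed/aborted op drops them.
#include <functional>
#include <iostream>
#include <vector>

class EffectQueue {
  std::vector<std::function<void()>> effects_;
public:
  // Called while the op executes: nothing user-visible happens yet.
  void with_effect(std::function<void()> fn) {
    effects_.push_back(std::move(fn));
  }
  // On successful commit, expose the side effects in registration order.
  void commit() {
    for (auto& fn : effects_) fn();
    effects_.clear();
  }
  // On failure, the scheduled effects are simply discarded.
  void abort() { effects_.clear(); }
};

int main() {
  EffectQueue q;
  q.with_effect([] { std::cout << "remove watchers, notify clients\n"; });

  const bool txn_committed = true;  // pretend the backend write landed
  if (txn_committed) {
    q.commit();  // watchers are torn down only now
  } else {
    q.abort();   // nothing observable happens if the write never commits
  }
}
```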

Expand Down Expand Up @@ -213,10 +209,10 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
* execute_clone
*
* If snapc contains a snap which occurred logically after the last write
* seen by this object (see OpsExecutor::should_clone()), we first need
seen by this object (see OpsExecuter::should_clone()), we first need to
* make a clone of the object at its current state. execute_clone primes
* txn with that clone operation and returns an
* OpsExecutor::CloningContext which will allow us to fill in the corresponding
* OpsExecuter::CloningContext which will allow us to fill in the corresponding
* metadata and log_entries once the operations have been processed.
*
* Note that this strategy differs from classic, which instead performs this
@@ -267,7 +263,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
*/
void update_clone_overlap();

interruptible_future<std::vector<pg_log_entry_t>> flush_clone_metadata(
std::vector<pg_log_entry_t> flush_clone_metadata(
std::vector<pg_log_entry_t>&& log_entries,
SnapMapper& snap_mapper,
OSDriver& osdriver,
@@ -400,7 +396,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
execute_op(OSDOp& osd_op);

using rep_op_fut_tuple =
std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
std::tuple<interruptible_future<>, interruptible_future<>>;
using rep_op_fut_t =
interruptible_future<rep_op_fut_tuple>;
template <typename MutFunc>
@@ -475,7 +471,7 @@ auto OpsExecuter::with_effect_on_obc(
effect_func(std::move(effect_func)),
obc(std::move(obc)) {
}
osd_op_errorator::future<> execute(Ref<PG> pg) final {
seastar::future<> execute(Ref<PG> pg) final {
return std::move(effect_func)(std::move(ctx),
std::move(obc),
std::move(pg));
@@ -502,15 +498,14 @@ OpsExecuter::flush_changes_n_do_ops_effects(
assert(obc);

auto submitted = interruptor::now();
auto all_completed =
interruptor::make_interruptible(osd_op_errorator::now());
auto all_completed = interruptor::now();

if (cloning_ctx) {
ceph_assert(want_mutate);
}

if (want_mutate) {
auto log_entries = co_await flush_clone_metadata(
auto log_entries = flush_clone_metadata(
prepare_transaction(ops),
snap_mapper,
osdriver,
@@ -536,7 +531,7 @@ OpsExecuter::flush_changes_n_do_ops_effects(
// need extra ref pg due to apply_stats() which can be executed after
// informing snap mapper
all_completed =
std::move(all_completed).safe_then_interruptible([this, pg=this->pg] {
std::move(all_completed).then_interruptible([this, pg=this->pg] {
// let's do the cleaning of `op_effects` in destructor
return interruptor::do_for_each(op_effects,
[pg=std::move(pg)](auto& op_effect) {
@@ -552,21 +547,19 @@ OpsExecuter::flush_changes_n_do_ops_effects(

template <class Func>
struct OpsExecuter::RollbackHelper {
void rollback_obc_if_modified(const std::error_code& e);
seastar::lw_shared_ptr<OpsExecuter> ox;
void rollback_obc_if_modified();
OpsExecuter *ox;
Func func;
};

template <class Func>
inline OpsExecuter::RollbackHelper<Func>
OpsExecuter::create_rollbacker(Func&& func) {
return {shared_from_this(), std::forward<Func>(func)};
return {this, std::forward<Func>(func)};
}


template <class Func>
void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
const std::error_code& e)
void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified()
{
// Oops, an operation had failed. do_osd_ops() altogether with
// OpsExecuter already dropped the ObjectStore::Transaction if
@@ -584,10 +577,9 @@ void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
assert(ox);
const auto need_rollback = ox->has_seen_write();
crimson::get_logger(ceph_subsys_osd).debug(
"{}: object {} got error {}, need_rollback={}",
"{}: object {} got error, need_rollback={}",
__func__,
ox->obc->get_oid(),
e,
need_rollback);
if (need_rollback) {
func(ox->obc);
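The `execute_clone` comment earlier in this header boils down to a sequence-number comparison: clone before mutating when the incoming snap context names a snapshot newer than anything the object has seen. A deliberately simplified sketch of that decision; the field names are stand-ins, not the real `SnapContext` or `object_info_t` layout:

```cpp
// Simplified illustration of the "clone before write?" decision described
// by the execute_clone comment; not the actual crimson data structures.
#include <cstdint>
#include <iostream>

struct SimpleSnapContext {
  uint64_t seq;  // newest snapshot id the client's write carries
};

struct SimpleObjectState {
  uint64_t snap_seq_at_last_write;  // newest snapshot seen by the object
  bool exists;
};

// Clone first iff a snapshot was taken after the object's last write;
// otherwise the existing head already covers every known snapshot.
bool should_clone(const SimpleObjectState& obs, const SimpleSnapContext& snapc) {
  return obs.exists && snapc.seq > obs.snap_seq_at_last_write;
}

int main() {
  SimpleObjectState obs{.snap_seq_at_last_write = 4, .exists = true};
  std::cout << should_clone(obs, {.seq = 5}) << "\n";  // 1: snap 5 is newer, clone
  std::cout << should_clone(obs, {.seq = 4}) << "\n";  // 0: no snap since last write
}
```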
31 changes: 31 additions & 0 deletions src/crimson/osd/osd_operation.h
@@ -40,6 +40,37 @@ struct PerShardPipeline {
} create_or_wait_pg;
};

struct PGPeeringPipeline {
struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map";
} await_map;
struct Process : OrderedExclusivePhaseT<Process> {
static constexpr auto type_name = "PeeringEvent::PGPipeline::process";
} process;
};

struct CommonPGPipeline {
struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> {
static constexpr auto type_name = "CommonPGPipeline:::wait_for_active";
} wait_for_active;
struct RecoverMissing : OrderedConcurrentPhaseT<RecoverMissing> {
static constexpr auto type_name = "CommonPGPipeline::recover_missing";
} recover_missing;
struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT<CheckAlreadyCompleteGetObc> {
static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc";
} check_already_complete_get_obc;
struct LockOBC : OrderedConcurrentPhaseT<LockOBC> {
static constexpr auto type_name = "CommonPGPipeline::lock_obc";
} lock_obc;
struct Process : OrderedExclusivePhaseT<Process> {
static constexpr auto type_name = "CommonPGPipeline::process";
} process;
struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop";
} wait_repop;
};


enum class OperationTypeCode {
client_request = 0,
peering_event,
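The two phase templates used above encode different admission rules: an `OrderedExclusivePhaseT` admits one operation at a time, while an `OrderedConcurrentPhaseT` lets operations overlap even though they still enter in pipeline order. A toy, synchronous model of one request walking `CommonPGPipeline` in the declared order; it is illustrative only, since the real phases are asynchronous seastar constructs:

```cpp
// Toy model: "exclusive" phases hold at most one op, "concurrent" phases
// may hold several; a request visits the phases in pipeline order.
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

struct Phase {
  std::string name;
  bool exclusive;
  int inside = 0;

  void enter() {
    if (exclusive) assert(inside == 0);  // the real code queues instead
    ++inside;
  }
  void exit() { --inside; }
};

int main() {
  // Same order as CommonPGPipeline above.
  std::vector<Phase> pipeline = {
    {"wait_for_active", true},
    {"recover_missing", false},
    {"check_already_complete_get_obc", true},
    {"lock_obc", false},
    {"process", true},
    {"wait_repop", false},
  };
  for (auto& p : pipeline) {  // one op walking front to back
    p.enter();
    std::cout << "in " << p.name
              << (p.exclusive ? " (exclusive)\n" : " (concurrent)\n");
    p.exit();
  }
}
```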
11 changes: 0 additions & 11 deletions src/crimson/osd/osd_operation_external_tracking.h
@@ -36,7 +36,6 @@ struct LttngBackend
ClientRequest::PGPipeline::RecoverMissing::
BlockingEvent::ExitBarrierEvent::Backend,
ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -117,10 +116,6 @@ struct LttngBackend
const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override {
}

void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::GetOBC& blocker) override {
}

void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev,
const Operation& op,
@@ -171,7 +166,6 @@ struct HistoricBackend
ClientRequest::PGPipeline::RecoverMissing::
BlockingEvent::ExitBarrierEvent::Backend,
ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -252,11 +246,6 @@ struct HistoricBackend
const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override {
}

void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::GetOBC& blocker) override {
}

void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::LockOBC& blocker) override {