From 753e6d38b1e030bb0ea0ffeb281ade1bb0a878ae Mon Sep 17 00:00:00 2001 From: Alexander Indenbaum Date: Tue, 26 Dec 2023 15:23:55 +0200 Subject: [PATCH] Multi gateway group refactoring Cleanup, extract - src/mon/NVMeofGwSerialize.h - src/mon/NVMeofGwTypes.h Signed-off-by: Alexander Indenbaum --- src/messages/MNVMeofGwBeacon.h | 61 --- src/mon/NVMeofGwMap.cc | 754 ++++++++++++++------------------- src/mon/NVMeofGwMap.h | 481 +++------------------ src/mon/NVMeofGwMon.cc | 219 +++------- src/mon/NVMeofGwMon.h | 50 +-- src/mon/NVMeofGwSerialize.h | 311 ++++++++++++++ src/mon/NVMeofGwTypes.h | 108 +++++ src/nvmeof/NVMeofGw.cc | 120 +++--- 8 files changed, 919 insertions(+), 1185 deletions(-) create mode 100755 src/mon/NVMeofGwSerialize.h create mode 100755 src/mon/NVMeofGwTypes.h diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h index 114c0cbd6c01..0735e57fa149 100644 --- a/src/messages/MNVMeofGwBeacon.h +++ b/src/messages/MNVMeofGwBeacon.h @@ -22,55 +22,6 @@ #include "mon/NVMeofGwMap.h" #include "include/types.h" - -typedef GW_STATES_PER_AGROUP_E SM_STATE[MAX_SUPPORTED_ANA_GROUPS]; -struct NqnState { - std::string nqn; // subsystem NQN - SM_STATE sm_state; // susbsystem's state machine state - uint16_t opt_ana_gid; // optimized ANA group index - - // Default constructor - NqnState(const std::string& _nqn) : nqn(_nqn), opt_ana_gid(0) { - for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) - sm_state[i] = GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; - } -}; - -typedef std::vector GwSubsystems; - -std::ostream& operator<<(std::ostream& os, const SM_STATE value) { - os << "SM_STATE [ "; - for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) { - switch (value[i]) { - case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: os << "IDLE "; break; - case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: os << "STANDBY "; break; - case GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE: os << "ACTIVE "; break; - case GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER: os << "BLOCKED_AGROUP_OWNER "; break; - case GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED: os << "WAIT_FAILBACK_PREPARED "; break; - default: os << "Invalid " << (int)value[i] << " "; - } - } - os << "]"; - return os; -} - -std::ostream& operator<<(std::ostream& os, const NqnState value) { - os << "Subsystem( nqn: " << value.nqn << ", ANAGrpId: " << value.opt_ana_gid << ", " << value.sm_state << " )"; - return os; -} - -std::ostream& operator<<(std::ostream& os, const GW_AVAILABILITY_E value) { - switch (value) { - - case GW_AVAILABILITY_E::GW_CREATED: os << "CREATED"; break; - case GW_AVAILABILITY_E::GW_AVAILABLE: os << "AVAILABLE"; break; - case GW_AVAILABILITY_E::GW_UNAVAILABLE: os << "UNAVAILABLE"; break; - - default: os << "Invalid " << (int)value << " "; - } - return os; -} - class MNVMeofGwBeacon final : public PaxosServiceMessage { private: static constexpr int HEAD_VERSION = 1; @@ -115,18 +66,6 @@ class MNVMeofGwBeacon final : public PaxosServiceMessage { std::string_view get_type_name() const override { return "nvmeofgwbeacon"; } - void print(std::ostream& out) const override { - out << get_type_name() << - " nvmeofgw id: " << gw_id << - ", pool:" << gw_pool << - ", group:" << gw_group << - ", susbsystems: [ "; - for (const NqnState& st: subsystems) { - out << st << " "; - } - out << "], availability: " << availability << ", version:" << version; - } - void encode_payload(uint64_t features) override { header.version = HEAD_VERSION; header.compat_version = COMPAT_VERSION; diff --git a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc index b3e3c6db8ca3..fe2c3dcca36d 100755 --- a/src/mon/NVMeofGwMap.cc +++ b/src/mon/NVMeofGwMap.cc @@ -1,7 +1,7 @@ - #include #include "include/stringify.h" #include "NVMeofGwMon.h" +#include "NVMeofGwMap.h" using std::map; using std::make_pair; @@ -32,360 +32,247 @@ static std::string G_gw_ana_states[] = { "WAIT_FLBACK_RDY" }; -int NVMeofGwMap::cfg_add_gw (const GW_ID_T &gw_id, const std::string& pool, const std::string& group) { - GW_CREATED_T gw_created; - bool allocated[MAX_SUPPORTED_ANA_GROUPS] = {false}; - gw_created.ana_grp_id = 0xff; - std::string gw_name; - std::string gw_preffix; - NVMeofGwMap::gw_preffix_from_id_pool_group (gw_preffix, pool, group ); - NVMeofGwMap::gw_name_from_id_pool_group (gw_name, gw_id , pool, group ); - - for (auto& itr : Created_gws){ - // Allocate ana_grp_ids per pool + group pair - if((itr.first.find(gw_preffix) != std::string::npos)) // gw_name contains ".pool.group" string - allocated[itr.second.ana_grp_id ] = true; - if(itr.first == gw_name){ - dout(4) << __func__ << " ERROR create GW: already exists in map " << gw_name << dendl; - return -EEXIST ; - } - } - - for(int i=0; i<=MAX_SUPPORTED_ANA_GROUPS; i++){ - if (allocated[i] == false){ - gw_created.ana_grp_id = i; - break; - } - } - if(gw_created.ana_grp_id == 0xff){ - dout(4) << __func__ << " ERROR create GW: " << gw_name << " ANA groupId was not allocated " << dendl; - return -EINVAL; - } - - Created_gws.insert({gw_name, gw_created}); - dout(4) << __func__ << "Created GW: " << gw_name << " grpid " << gw_created.ana_grp_id << dendl; - std::stringstream ss; - _dump_created_gws(ss); - dout(4) << ss.str() << dendl; - return 0; -} - - -int NVMeofGwMap::cfg_delete_gw (const GW_ID_T &gw_id, const std::string& pool, const std::string& group, bool & map_modified){ - - GW_STATE_T * state; - int ana_grp_id = 0; - std::string gw_name; - - NVMeofGwMap::gw_name_from_id_pool_group (gw_name, gw_id , pool, group ); - - if(find_created_gw(gw_name, ana_grp_id) != 0) - { - dout(4) << __func__ << " ERROR :GW was not created " << gw_name << dendl; - return -ENODEV ; - } - // traverse the GMap , find gw in the map for all nqns - - map_modified = false; - for (auto& itr : Gmap) - for (auto& ptr : itr.second) { - GW_ID_T found_gw_id = ptr.first; - const std::string& nqn = itr.first; - state = &ptr.second; - if (gw_name == found_gw_id) { // GW was created - bool modified = false; - for(int i=0; ism_state[i], i, modified); - map_modified |= modified; - } - dout(4) << " Delete GW :"<< gw_name << "nqn " << nqn << " ANA grpid: " << state->optimized_ana_group_id << dendl; - Gmap[itr.first].erase(gw_name); - delete_metadata(gw_name, nqn); - } - } - Created_gws.erase(gw_name);//TODO check whether ana map with nonce vector is destroyed properly - probably not. to handle! - - return 0; -} - - -GW_METADATA_T* NVMeofGwMap::find_gw_metadata(const GW_ID_T &gw_name, const std::string& nqn) -{ - auto it = Gmetadata.find(nqn); - if (it != Gmetadata.end() ) { - auto it2 = it->second.find(gw_name); - if (it2 != it->second.end() ) { - return &it2->second; - } - else{ - dout(4) << __func__ << " not found by gw id " << gw_name << dendl; +int NVMeofGwMap::cfg_add_gw(const GW_ID_T &gw_id, const GROUP_KEY& group_key) { + // Calculate allocated group bitmask + bool allocated[MAX_SUPPORTED_ANA_GROUPS] = {false}; + for (auto& itr: Created_gws[group_key]) { + allocated[itr.second.ana_grp_id] = true; + if(itr.first == gw_id) { + dout(4) << __func__ << " ERROR create GW: already exists in map " << gw_id << dendl; + return -EEXIST ; } } - else{ - dout(4) << __func__ << " not found by nqn " << nqn << dendl; - } - return NULL; -} - -int NVMeofGwMap::_dump_gwmap(GWMAP & Gmap)const { + // Allocate the new group id + for(int i=0; i<=MAX_SUPPORTED_ANA_GROUPS; i++) { + if (allocated[i] == false) { + GW_CREATED_T gw_created(i); + Created_gws[group_key][gw_id] = gw_created; - dout(0) << __func__ << " called " << mon << dendl; - std::ostringstream ss; - ss << std::endl; - for (auto& itr : Gmap) { - for (auto& ptr : itr.second) { - - ss << "(gw-mon) NQN " << itr.first << " GW_ID " << ptr.first << " ANA gr " << std::setw(5) << (int)ptr.second.optimized_ana_group_id+1 << - " available :" << G_gw_avail[(int)ptr.second.availability] << " States: "; - int num_groups = Created_gws.size(); - for (int i = 0; i < num_groups; i++) { - ss << G_gw_ana_states[(int)ptr.second.sm_state[i]] << " " ; - } - ss << " Failover peers: " << std::endl << " "; - - for (int i = 0; i < num_groups; i++) { - ss << ptr.second.failover_peer[i] << " " ; - } - ss << std::endl; + dout(4) << __func__ << "Created GW: " << gw_id << " pool " << group_key.first << "group" << group_key.second + << " grpid " << gw_created.ana_grp_id << dendl; + return 0; } } - dout(0) << ss.str() <second; + for(int i=0; ianagrp_sm_tstamps[i] != INVALID_GW_TIMER){ - metadata->anagrp_sm_tstamps[i] ++; - dout(4) << "timer for GW " << ptr.first << " ANA GRP " << i<<" :" << metadata->anagrp_sm_tstamps[i] <anagrp_sm_tstamps[i] >= 2){//TODO define - fsm_handle_to_expired (ptr.first, itr.first, i, propose_pending); + for (auto& group_md: Gmetadata) { + auto& group_key = group_md.first; + auto& pool = group_key.first; + auto& group = group_key.second; + + for (auto& nqn_md: group_md.second) { + auto& nqn = nqn_md.first; + for (auto& gw_md: nqn_md.second) { + auto& gw_id = gw_md.first; + auto& md = gw_md.second; + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) { + if (md.anagrp_sm_tstamps[i] == INVALID_GW_TIMER) continue; + + md.anagrp_sm_tstamps[i]++; + dout(4) << "timer for GW " << gw_id << " ANA GRP " << i<<" :" << md.anagrp_sm_tstamps[i] <= 2){//TODO define + fsm_handle_to_expired (gw_id, std::make_pair(pool, group), nqn, i, propose_pending); } } } } } - return 0; } -int NVMeofGwMap::process_gw_map_gw_down(const GW_ID_T &gw_name, const std::string& nqn, bool &propose_pending) -{ +int NVMeofGwMap::process_gw_map_gw_down(const GW_ID_T &gw_id, const GROUP_KEY& group_key, + const NQN_ID_T& nqn, bool &propose_pending) { int rc = 0; - int i; - GW_STATE_T* gw_state = find_gw_map(gw_name, nqn); - if (gw_state) { - dout(4) << "GW down " << gw_name << " nqn " <availability = GW_AVAILABILITY_E::GW_UNAVAILABLE; - for (i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i ++) { - bool map_modified; - fsm_handle_gw_down (gw_name, nqn, gw_state->sm_state[i], i, map_modified); - if(map_modified) propose_pending = true; - set_gw_standby_state(gw_state, i); + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + if (gw_state != nqn_gws_states.end()) { + dout(4) << "GW down " << gw_id << " nqn " <second; + st.availability = GW_AVAILABILITY_E::GW_UNAVAILABLE; + for (ANA_GRP_ID_T i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i ++) { + fsm_handle_gw_down (gw_id, group_key, nqn, st.sm_state[i], i, propose_pending); + st.standby_state(i); } } else { - dout(4) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_name << dendl; - rc = 1; + dout(4) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_id << dendl; + rc = -EINVAL; } return rc; } -int NVMeofGwMap::process_gw_map_ka(const GW_ID_T &gw_name, const std::string& nqn , bool &propose_pending) +void NVMeofGwMap::process_gw_map_ka(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn , bool &propose_pending) { - int rc = 0; + #define FAILBACK_PERSISTENCY_INT_SEC 8 - GW_STATE_T* gw_state = find_gw_map(gw_name, nqn); - if (gw_state) { - dout(4) << "KA beacon from the GW " << gw_name << " in state " << (int)gw_state->availability << dendl; - - if (gw_state->availability == GW_AVAILABILITY_E::GW_CREATED) { - // first time appears - allow IO traffic for this GW - gw_state->availability = GW_AVAILABILITY_E::GW_AVAILABLE; - for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) gw_state->sm_state[i] = GW_STANDBY_STATE; - if (gw_state->optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW - gw_state->sm_state[gw_state->optimized_ana_group_id] = GW_ACTIVE_STATE; - } - propose_pending = true; + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + ceph_assert (gw_state != nqn_gws_states.end()); + auto& st = gw_state->second; + dout(4) << "KA beacon from the GW " << gw_id << " in state " << (int)st.availability << dendl; + + if (st.availability == GW_AVAILABILITY_E::GW_CREATED) { + // first time appears - allow IO traffic for this GW + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + if (st.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW + st.sm_state[st.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; } - - else if (gw_state->availability == GW_AVAILABILITY_E::GW_UNAVAILABLE) { - gw_state->availability = GW_AVAILABILITY_E::GW_AVAILABLE; - if (gw_state->optimized_ana_group_id == REDUNDANT_GW_ANA_GROUP_ID) { - for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) gw_state->sm_state[i] = GW_STANDBY_STATE; - propose_pending = true; //TODO try to find the 1st GW overloaded by ANA groups and start failback for ANA group that it is not an owner of - } - else { - //========= prepare to Failback to this GW ========= - // find the GW that took over on the group gw_state->optimized_ana_group_id - bool some_found = false; - propose_pending = true; - find_failback_gw(gw_name, nqn, gw_state, some_found); - if (!some_found ) { // There is start of single GW so immediately turn its group to GW_ACTIVE_STATE - dout(4) << "Warning - not found the GW responsible for" << gw_state->optimized_ana_group_id << " that took over the GW " << gw_name << "when it was fallen" << dendl; - gw_state->sm_state[gw_state->optimized_ana_group_id] = GW_ACTIVE_STATE; - } + propose_pending = true; + } + else if (st.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE) { + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + if (st.optimized_ana_group_id == REDUNDANT_GW_ANA_GROUP_ID) { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + propose_pending = true; //TODO try to find the 1st GW overloaded by ANA groups and start failback for ANA group that it is not an owner of + } + else { + //========= prepare to Failback to this GW ========= + // find the GW that took over on the group st.optimized_ana_group_id + bool some_found = false; + propose_pending = true; + find_failback_gw(gw_id, group_key, nqn, some_found); + if (!some_found ) { // There is start of single GW so immediately turn its group to GW_ACTIVE_STATE + dout(4) << "Warning - not found the GW responsible for" << st.optimized_ana_group_id << " that took over the GW " << gw_id << "when it was fallen" << dendl; + st.sm_state[st.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; } } - // if GW remains AVAILABLE need to handle failback Timers , this is handled separately - } - else{ - dout(4) << __func__ << "ERROR GW-id was not found in the map " << gw_name << dendl; - rc = 1; - ceph_assert(false); } - return rc; } -int NVMeofGwMap::handle_abandoned_ana_groups(bool & propose) +void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose) { propose = false; - for (auto& nqn_itr : Gmap) { - dout(4) << "NQN " << nqn_itr.first << dendl; - - for (auto& ptr : nqn_itr.second) { // loop for GWs inside nqn group - auto gw_id = ptr.first; - GW_STATE_T* state = &ptr.second; - - //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? - if (state->availability == GW_AVAILABILITY_E::GW_UNAVAILABLE && state->optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { - auto found_gw_for_ana_group = false; - for (auto& ptr2 : nqn_itr.second) { - if (ptr2.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE && ptr2.second.sm_state[state->optimized_ana_group_id] == GW_ACTIVE_STATE) { - found_gw_for_ana_group = true; // dout(4) << "Found GW " << ptr2.first << " that handles ANA grp " << (int)state->optimized_ana_group_id << dendl; - break; + for (auto& group_state: Gmap) { + auto& group_key = group_state.first; + auto& nqn_gws_states = group_state.second; + + for (auto& nqn_gws_state: nqn_gws_states) { + auto& nqn = nqn_gws_state.first; + auto& gws_states = nqn_gws_state.second; + dout(4) << "NQN " << nqn << dendl; + + for (auto& gw_state : gws_states) { // loop for GWs inside nqn group + auto& gw_id = gw_state.first; + GW_STATE_T& state = gw_state.second; + + //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? + if (state.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE && state.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { + auto found_gw_for_ana_group = false; + for (auto& gw_state2 : gws_states) { + GW_STATE_T& state2 = gw_state2.second; + if (state2.availability == GW_AVAILABILITY_E::GW_AVAILABLE && state2.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + found_gw_for_ana_group = true; // dout(4) << "Found GW " << ptr2.first << " that handles ANA grp " << (int)state->optimized_ana_group_id << dendl; + break; + } } - } - if (found_gw_for_ana_group == false) { //choose the GW for handle ana group - dout(4)<< "Was not found the GW " << " that handles ANA grp " << (int)state->optimized_ana_group_id << " find candidate "<< dendl; + if (found_gw_for_ana_group == false) { //choose the GW for handle ana group + dout(4)<< "Was not found the GW " << " that handles ANA grp " << (int)state.optimized_ana_group_id << " find candidate "<< dendl; - GW_STATE_T* gw_state = find_gw_map(gw_id, nqn_itr.first); - for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) - find_failover_candidate( gw_id, nqn_itr.first , gw_state, i, propose ); + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + find_failover_candidate( gw_id, group_key, nqn, i, propose ); + } } - } - //2. Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it - else if (state->availability == GW_AVAILABILITY_E::GW_AVAILABLE && state->optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID && - state->sm_state[state->optimized_ana_group_id] == GW_STANDBY_STATE - ) - { - bool found = false; - for (auto& ptr2 : nqn_itr.second) { - if ( ptr2.second.sm_state[state->optimized_ana_group_id] == GW_WAIT_FAILBACK_PREPARED){ - found = true; - break; - } - } - if(!found){ - dout(4) << __func__ << " GW " <optimized_ana_group_id << dendl; - state->sm_state[state->optimized_ana_group_id] = GW_ACTIVE_STATE; - propose = true; + //2. Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it + else if (state.availability == GW_AVAILABILITY_E::GW_AVAILABLE + && state.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID && + state.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE) + { + bool found = false; + for (auto& gw_state2 : gws_states) { + auto& state2 = gw_state2.second; + if (state2.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED){ + found = true; + break; + } + } + if (!found) { + dout(4) << __func__ << " GW " << gw_id << " turns to be Active for ANA group " << state.optimized_ana_group_id << dendl; + state.sm_state[state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + propose = true; + } } } } } - return 0; } - -int NVMeofGwMap::handle_removed_subsystems (const std::vector &created_subsystems, bool &propose_pending) +/* + sync our sybsystems from the beacon. systems subsystems not in beacon are removed. +*/ +void NVMeofGwMap::handle_removed_subsystems (const std::vector ¤t_subsystems, const GROUP_KEY& group_key, bool &propose_pending) { - bool found = false;; - for (auto& m_itr : Gmap) { - //if not found in the vector of configured subsystems, need to remove the nqn from the map - found = false; - for(auto v_itr : created_subsystems){ - if (m_itr.first == v_itr){ - found = true; - break; - } - } - if(!found){ - // remove m_itr.first from the map - dout(4) << "seems subsystem nqn was removed - to remove nqn from the map " << m_itr.first <first) == current_subsystems.end()) { + // Erase the susbsystem nqn if the nqn is not in the current subsystems + it = nqn_gws_states.erase(it); + } else { + // Move to the next pair + ++it; } } - return 0; } -int NVMeofGwMap::set_failover_gw_for_ANA_group(const GW_ID_T &failed_gw_id, const GW_ID_T &gw_name, const std::string& nqn, uint8_t ANA_groupid) +void NVMeofGwMap::set_failover_gw_for_ANA_group(const GW_ID_T &failed_gw_id, const GROUP_KEY& group_key, const GW_ID_T &gw_id, const NQN_ID_T& nqn, ANA_GRP_ID_T ANA_groupid) { - GW_STATE_T* gw_state = find_gw_map(gw_name, nqn); - gw_state->sm_state[ANA_groupid] = GW_ACTIVE_STATE; - gw_state->failover_peer[ANA_groupid] = failed_gw_id; - //publish_map_to_gws(nqn); - dout(4) << "Set failower GW " << gw_name << " for ANA group " << (int)ANA_groupid << dendl; - return 0; + GW_STATE_T& gw_state = Gmap[group_key][nqn][gw_id]; + gw_state.sm_state[ANA_groupid] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + gw_state.failover_peer[ANA_groupid] = failed_gw_id; + + dout(4) << "Set failower GW " << gw_id << " for ANA group " << (int)ANA_groupid << dendl; } -int NVMeofGwMap::find_failback_gw(const GW_ID_T &gw_name, const std::string& nqn, GW_STATE_T* gw_state, bool &some_found) +void NVMeofGwMap::find_failback_gw(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &some_found) { - auto subsyst_it = find_subsystem_map(nqn); bool found_some_gw = false; bool found_candidate = false; - for (auto& itr : *subsyst_it) { - //cout << "Found GW " << itr.second.gw_id << endl; - if (itr.second.sm_state[gw_state->optimized_ana_group_id] == GW_ACTIVE_STATE) { - ceph_assert(itr.second.failover_peer[gw_state->optimized_ana_group_id] == gw_name); - - dout(4) << "Found GW " << itr.first << ", nqn " << nqn << " that took over the ANAGRP " << (int)gw_state->optimized_ana_group_id << " of the available GW " << gw_name << dendl; - itr.second.sm_state[gw_state->optimized_ana_group_id] = GW_WAIT_FAILBACK_PREPARED; - start_timer(itr.first, nqn, gw_state->optimized_ana_group_id);// Add timestamp of start Failback preparation - gw_state->sm_state[gw_state->optimized_ana_group_id] = GW_BLOCKED_AGROUP_OWNER; + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto& gw_state = Gmap[group_key][nqn][gw_id]; + for (auto& nqn_gw_state: nqn_gws_states) { + auto& found_gw_id = nqn_gw_state.first; + auto& st = nqn_gw_state.second; + if (st.sm_state[gw_state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + ceph_assert(st.failover_peer[gw_state.optimized_ana_group_id] == gw_id); + + dout(4) << "Found GW " << found_gw_id << ", nqn " << nqn << " that took over the ANAGRP " << gw_state.optimized_ana_group_id << " of the available GW " << gw_id << dendl; + st.sm_state[gw_state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED; + start_timer(found_gw_id, group_key, nqn, gw_state.optimized_ana_group_id);// Add timestamp of start Failback preparation + gw_state.sm_state[gw_state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER; found_candidate = true; break; @@ -394,98 +281,95 @@ int NVMeofGwMap::find_failback_gw(const GW_ID_T &gw_name, const std::string& nq } some_found = found_candidate |found_some_gw; //TODO cleanup myself (gw_id) from the Block-List - return 0; } // TODO When decision to change ANA state of group is prepared, need to consider that last seen FSM state is "approved" - means it was returned in beacon alone with map version -int NVMeofGwMap::find_failover_candidate(const GW_ID_T &gw_id, const std::string& nqn, GW_STATE_T* gw_state, int grpid, bool &propose_pending) +void NVMeofGwMap::find_failover_candidate(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T grpid, bool &propose_pending) { - // dout(4) <<__func__<< " process GW down " << gw_id << dendl; -#define ILLEGAL_GW_ID " " -#define MIN_NUM_ANA_GROUPS 0xFFF - int min_num_ana_groups_in_gw = 0; - int current_ana_groups_in_gw = 0; - GW_ID_T min_loaded_gw_id = ILLEGAL_GW_ID; - auto subsyst_it = find_subsystem_map(nqn); - - // this GW may handle several ANA groups and for each of them need to found the candidate GW - if (gw_state->sm_state[grpid] == GW_ACTIVE_STATE || gw_state->optimized_ana_group_id == grpid) { - // Find a GW that takes over the ANA group(s) - min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; - min_loaded_gw_id = ILLEGAL_GW_ID; - for (auto& itr : *subsyst_it) { // for all the gateways of the subsystem - if (itr.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { - - current_ana_groups_in_gw = 0; - for (int j = 0; j < MAX_SUPPORTED_ANA_GROUPS; j++) { - if (itr.second.sm_state[j] == GW_BLOCKED_AGROUP_OWNER || itr.second.sm_state[j] == GW_WAIT_FAILBACK_PREPARED) { - current_ana_groups_in_gw = 0xFFFF; - break; // dont take into account GWs in the transitive state - } - else if (itr.second.sm_state[j] == GW_ACTIVE_STATE) - //dout(4) << " process GW down " << current_ana_groups_in_gw << dendl; - current_ana_groups_in_gw++; // how many ANA groups are handled by this GW + // dout(4) <<__func__<< " process GW down " << gw_id << dendl; + #define ILLEGAL_GW_ID " " + #define MIN_NUM_ANA_GROUPS 0xFFF + int min_num_ana_groups_in_gw = 0; + int current_ana_groups_in_gw = 0; + GW_ID_T min_loaded_gw_id = ILLEGAL_GW_ID; + + auto& nqn_gws_states = Gmap[group_key][nqn]; + + auto gw_state = nqn_gws_states.find(gw_id); + ceph_assert(gw_state != nqn_gws_states.end()); + + // this GW may handle several ANA groups and for each of them need to found the candidate GW + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || gw_state->second.optimized_ana_group_id == grpid) { + // Find a GW that takes over the ANA group(s) + min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; + min_loaded_gw_id = ILLEGAL_GW_ID; + for (auto& found_gw_state: nqn_gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + current_ana_groups_in_gw = 0; + for (int j = 0; j < MAX_SUPPORTED_ANA_GROUPS; j++) { + if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER || st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED) { + current_ana_groups_in_gw = 0xFFFF; + break; // dont take into account GWs in the transitive state + } + else if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) + //dout(4) << " process GW down " << current_ana_groups_in_gw << dendl; + current_ana_groups_in_gw++; // how many ANA groups are handled by this GW } if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) { min_num_ana_groups_in_gw = current_ana_groups_in_gw; - min_loaded_gw_id = itr.first; - dout(4) << "choose: gw-id min_ana_groups " << itr.first << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; + min_loaded_gw_id = found_gw_state.first; + dout(4) << "choose: gw-id min_ana_groups " << min_loaded_gw_id << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; } } } if (min_loaded_gw_id != ILLEGAL_GW_ID) { propose_pending = true; - set_failover_gw_for_ANA_group(gw_id, min_loaded_gw_id, nqn, grpid); + set_failover_gw_for_ANA_group(gw_id, group_key, min_loaded_gw_id, nqn, grpid); } else { - if (gw_state->sm_state[grpid] == GW_ACTIVE_STATE){// not found candidate but map changed. + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE){// not found candidate but map changed. propose_pending = true; dout(4) << "gw down no candidate found " << dendl; - _dump_gwmap(Gmap); } } - gw_state->sm_state[grpid] = GW_STANDBY_STATE; + gw_state->second.sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; } - return 0; } - int NVMeofGwMap::fsm_handle_gw_down (const GW_ID_T &gw_name, const std::string& nqn, GW_STATES_PER_AGROUP_E state , int grpid, bool &map_modified) + void NVMeofGwMap::fsm_handle_gw_down(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified) { switch (state) { - case GW_STANDBY_STATE: - case GW_IDLE_STATE: - // nothing to do - break; + case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: + case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: + // nothing to do + break; - case GW_WAIT_FAILBACK_PREPARED: - { - cancel_timer(gw_name, nqn, grpid); - auto subsyst_it = find_subsystem_map(nqn); - for (auto& itr : *subsyst_it){ - if (itr.second.sm_state[grpid] == GW_BLOCKED_AGROUP_OWNER) // found GW that was intended for Failback for this ana grp - { - dout(4) << "Warning: Outgoing Failback when GW is down back - to rollback it" << nqn <<" GW " <optimized_ana_group_id) {// Try to find GW that temporary owns my group - if found, this GW should pass to standby for this group - auto subsyst_it = find_subsystem_map(nqn); - for (auto& itr : *subsyst_it){ - if (itr.second.sm_state[grpid] == GW_ACTIVE_STATE || itr.second.sm_state[grpid] == GW_WAIT_FAILBACK_PREPARED){ - set_gw_standby_state(&itr.second, grpid); - map_modified = true; - if (itr.second.sm_state[grpid] == GW_WAIT_FAILBACK_PREPARED) - cancel_timer(itr.first, nqn, grpid); - break; - } - } +void NVMeofGwMap::fsm_handle_gw_delete (const GW_ID_T &gw_id, const GROUP_KEY& group_key, + const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state , ANA_GRP_ID_T grpid, bool &map_modified) { + switch (state) + { + case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: + case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: + case GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER: + { + GW_STATE_T& gw_state = Gmap[group_key][nqn][gw_id]; + + if (grpid == gw_state.optimized_ana_group_id) {// Try to find GW that temporary owns my group - if found, this GW should pass to standby for this group + auto& gateway_states = Gmap[group_key][nqn]; + for (auto& gs: gateway_states) { + if (gs.second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || gs.second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED){ + gs.second.standby_state(grpid); + if (gs.second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED) + cancel_timer(gs.first, group_key, nqn, grpid); + break; + } + } + } + } + break; + + case GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED: + { + cancel_timer(gw_id, group_key, nqn, grpid); + for (auto& nqn_gws_state: Gmap[group_key][nqn]) { + auto& st = nqn_gws_state.second; + + if (st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER) { // found GW that was intended for Failback for this ana grp + dout(4) << "Warning: Outgoing Failback when GW is deleted - to rollback it" << nqn <<" GW " <sm_state[grpid] == GW_WAIT_FAILBACK_PREPARED) { - - dout(4) << "Expired Failback timer from GW " << gw_name << " ANA groupId "<< grpid << dendl; - - cancel_timer(gw_name, nqn, grpid); - for (auto& itr : *subsyst_it) { - if (itr.second.sm_state[grpid] == GW_BLOCKED_AGROUP_OWNER && itr.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { - set_gw_standby_state(gw_state, grpid); - itr.second.sm_state[grpid] = GW_ACTIVE_STATE; - dout(4) << "Failback from GW " << gw_name << " to " << itr.first << dendl; + auto& gw_state = Gmap[group_key][nqn][gw_id]; + + if (gw_state.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED) { + + dout(4) << "Expired Failback timer from GW " << gw_id << " ANA groupId "<< grpid << dendl; + + cancel_timer(gw_id, group_key, nqn, grpid); + for (auto& gw_state: Gmap[group_key][nqn]) { + auto& st = gw_state.second; + if (st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER && + st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + st.standby_state(grpid); + st.sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + dout(4) << "Failback from GW " << gw_id << " to " << gw_state.first << dendl; map_modified = true; break; } - else if (itr.second.optimized_ana_group_id == grpid ){ - if(itr.second.sm_state[grpid] == GW_STANDBY_STATE && itr.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { - itr.second.sm_state[grpid] = GW_ACTIVE_STATE; // GW failed and started during the persistency interval - dout(4) << "Failback unsuccessfull. GW: " << itr.first << "becomes Active for the ana group " << grpid << dendl; + else if (st.optimized_ana_group_id == grpid ){ + if(st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE && st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + st.sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; // GW failed and started during the persistency interval + dout(4) << "Failback unsuccessfull. GW: " << gw_state.first << "becomes Active for the ana group " << grpid << dendl; } - set_gw_standby_state(gw_state, grpid); - dout(4) << "Failback unsuccessfull GW: " << gw_name << "becomes standby for the ana group " << grpid << dendl; + st.standby_state(grpid); + dout(4) << "Failback unsuccessfull GW: " << gw_id << "becomes standby for the ana group " << grpid << dendl; map_modified = true; break; } } } - return 0; } +void NVMeofGwMap::start_timer(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid) { + Gmetadata[group_key][nqn][gw_id].anagrp_sm_tstamps[anagrpid] = 0; +} + +int NVMeofGwMap::get_timer(const GW_ID_T &gw_id, GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid) { + auto timer = Gmetadata[group_key][nqn][gw_id].anagrp_sm_tstamps[anagrpid]; + ceph_assert(timer != INVALID_GW_TIMER); + return timer; +} -int NVMeofGwMap::set_gw_standby_state(GW_STATE_T* gw_state, uint8_t ANA_groupid) -{ - gw_state->sm_state[ANA_groupid] = GW_STANDBY_STATE; - gw_state->failover_peer[ANA_groupid] = "NULL"; - return 0; +void NVMeofGwMap::cancel_timer(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid) { + Gmetadata[group_key][nqn][gw_id].anagrp_sm_tstamps[anagrpid] = INVALID_GW_TIMER; } diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h index 348bd3bee237..43aba52cd355 100755 --- a/src/mon/NVMeofGwMap.h +++ b/src/mon/NVMeofGwMap.h @@ -7,12 +7,8 @@ #ifndef MON_NVMEOFGWMAP_H_ #define MON_NVMEOFGWMAP_H_ -#include "string" -#include -#include "map" +#include #include -#include -#include #include "include/encoding.h" #include "include/utime.h" #include "common/Formatter.h" @@ -23,455 +19,74 @@ #include "PaxosService.h" #include "msg/Message.h" #include "common/ceph_time.h" -/*#include "NVMeofGwMon.h" - -using std::ostream; - -#define dout_subsys ceph_subsys_mon -#undef dout_prefix -#define dout_prefix _prefix(_dout, mon, this) -using namespace TOPNSPC::common; - -class NVMeofGwMap; - -inline ostream& _prefix(std::ostream *_dout, const Monitor &mon, - const NVMeofGwMap *map) { - return *_dout << "mon." << mon.name << "@" << mon.rank; -} - */ +#include "NVMeofGwTypes.h" using ceph::coarse_mono_clock; - -using GW_ID_T = std::string; -using ANA_GRP_ID_T = uint16_t; -typedef enum { - GW_IDLE_STATE = 0, //invalid state - GW_STANDBY_STATE, - GW_ACTIVE_STATE, - GW_BLOCKED_AGROUP_OWNER, - GW_WAIT_FAILBACK_PREPARED -}GW_STATES_PER_AGROUP_E; - -enum class GW_AVAILABILITY_E { - GW_CREATED = 0, - GW_AVAILABLE, - GW_UNAVAILABLE, - GW_DELETED -}; - -#define MAX_SUPPORTED_ANA_GROUPS 16 -#define INVALID_GW_TIMER 0xffff -#define REDUNDANT_GW_ANA_GROUP_ID 0xFF -typedef struct GW_STATE_T { - GW_STATES_PER_AGROUP_E sm_state [MAX_SUPPORTED_ANA_GROUPS]; // state machine states per ANA group - GW_ID_T failover_peer[MAX_SUPPORTED_ANA_GROUPS]; - ANA_GRP_ID_T optimized_ana_group_id; // optimized ANA group index as configured by Conf upon network entry, note for redundant GW it is FF - GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable - uint64_t version; // version per all GWs of the same subsystem. subsystem version -}GW_STATE_T; - -typedef struct GW_METADATA_T { - int anagrp_sm_tstamps[MAX_SUPPORTED_ANA_GROUPS]; // statemachine timer(timestamp) set in some state -}GW_METADATA_T; - - -using GWMAP = std::map >; -using GWMETADATA = std::map >; -using SUBSYST_GWMAP = std::map; -using SUBSYST_GWMETA = std::map; - -using NONCE_VECTOR_T = std::vector; -using GW_ANA_NONCE_MAP = std::map ; - - -typedef struct { - ANA_GRP_ID_T ana_grp_id; // ana-group-id allocated for this GW, GW owns this group-id - GW_ANA_NONCE_MAP nonce_map; -} GW_CREATED_T; - -using GW_CREATED_MAP = std::map; - - - -inline void encode(const GW_STATE_T& state, ceph::bufferlist &bl) { - for(int i = 0; i Gmap; + std::map Created_gws; + std::map Gmetadata; + + int cfg_add_gw (const GW_ID_T &gw_id, const GROUP_KEY& group_key); + int cfg_delete_gw (const GW_ID_T &gw_id, const GROUP_KEY& group_key); + void process_gw_map_ka (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &propose_pending); + int process_gw_map_gw_down (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &propose_pending); + void update_active_timers (bool &propose_pending); + void handle_abandoned_ana_groups (bool &propose_pending); + void handle_removed_subsystems (const std::vector ¤t_subsystems, const GROUP_KEY& group_key, bool &propose_pending); +private: + void fsm_handle_gw_down (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified); + void fsm_handle_gw_delete (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified); + void fsm_handle_to_expired (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T grpid, bool &map_modified); - void encode_nonces(const GW_ANA_NONCE_MAP & nonce_map, ceph::buffer::list &bl, bool full_encode ) const - { - ENCODE_START(2, 1, bl); - - encode((int)nonce_map.size(), bl); - for (auto& itr : nonce_map) { - encode((ANA_GRP_ID_T)itr.first, bl); - // now encode the nonces - const NONCE_VECTOR_T &nonce_vector = itr.second; - encode ((int)nonce_vector.size(), bl); // encode the vector size - for(auto &list_it : nonce_vector ){ - encode(list_it, bl); - } - } - ENCODE_FINISH(bl); - } - - void decode_nonces(GW_ANA_NONCE_MAP & nonce_map, ceph::buffer::list::const_iterator &bl, bool full_decode = true) { - DECODE_START(1, bl); - int map_size; - ANA_GRP_ID_T ana_grp_id; - int vector_size; - std::string nonce; - - decode(map_size, bl); - for(int i = 0; iana_grp_id, bl); - encode_nonces(gw_created->nonce_map, bl, full_encode);//TODO "if not full_encode" to prevent sending full nonce map to the clients - } - encode ((int)Gmap.size(),bl); // number nqn - for (auto& itr : Gmap) { - encode((const std::string &)itr.first, bl);// nqn - encode( itr.second, bl);// encode the full map of this nqn : std::map - } + using ceph::encode; + __u8 struct_v = 0; + encode(struct_v, bl); // version + encode(epoch, bl);// global map epoch - if(full_encode) { - // Encode Gmetadata - encode ((int)Gmetadata.size(),bl); - for (auto& itr : Gmetadata) { - encode((const std::string &)itr.first, bl);// nqn - encode( itr.second, bl);// encode the full map of this nqn : - } + encode(Created_gws, bl); //Encode created GWs + encode(Gmap, bl); + if (full_encode) { + encode(Gmetadata, bl); } - ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator &bl, bool full_decode = true) { - DECODE_START(1, bl); - int num_subsystems; - std::string nqn; + using ceph::decode; + __u8 struct_v; + decode(struct_v, bl); + ceph_assert(struct_v == 0); decode(epoch, bl); - //Decode created GWs - int num_created_gws; - decode(num_created_gws, bl); - Created_gws.clear(); - for(int i = 0; i())); - //decode the map - gw_map.clear(); - decode(gw_map, bl); - //insert the qw_map to Gmap - for(auto &itr: gw_map ){ - Gmap[nqn].insert({itr.first, itr.second}); - } - } - - if(full_decode){ - // decode Gmetadata - decode(num_subsystems, bl); - SUBSYST_GWMETA gw_meta; - Gmetadata.clear(); - //_dump_gwmap(Gmap); - for(int i = 0; i < num_subsystems; i++){ - decode(nqn, bl); - Gmetadata.insert(make_pair(nqn, std::map())); - //decode the map - gw_meta.clear(); - decode(gw_meta, bl); - //insert the gw_meta to Gmap - for(auto &itr: gw_meta ){ - Gmetadata[nqn].insert({itr.first, itr.second}); - } - } - } - DECODE_FINISH(bl); } - - int find_created_gw(const GW_ID_T &gw_id , int &ana_grp_id) const - { - auto it = Created_gws.find(gw_id); - if (it != Created_gws.end()) { - ana_grp_id = it->second.ana_grp_id; - return 0; - } - return -1; - } - - GW_CREATED_T* find_created_gw(const GW_ID_T &gw_id ) - { - auto it = Created_gws.find(gw_id); - if (it != Created_gws.end()) { - return &it->second; - } - return NULL; - } - - int update_gw_nonce(const GW_ID_T &gw_id, ANA_GRP_ID_T &ana_grp_id, NONCE_VECTOR_T &new_nonces) - { - GW_CREATED_T* gw_created = find_created_gw(gw_id); - if(!gw_created) - return 1; - if (new_nonces.size() >0){ - GW_ANA_NONCE_MAP & nonce_map = gw_created->nonce_map; - if(nonce_map[ana_grp_id].size() == 0){ - nonce_map.insert({ana_grp_id, NONCE_VECTOR_T()}) ; - } - nonce_map[ana_grp_id].clear(); - nonce_map[ana_grp_id].reserve(new_nonces.size()); //gw_created->nonces.clear();// gw_created->nonces.reserve(new_nonces.size()); - for( auto &it : new_nonces){ - nonce_map[ana_grp_id].push_back(it); //gw_created->nonces.push_back(it); - } - } - return 0; - } - - int destroy_gw(const GW_ID_T &gw_id) - { - GW_CREATED_T* gw_created = find_created_gw(gw_id); - if( gw_created ) - { - GW_ANA_NONCE_MAP & nonce_map = gw_created->nonce_map; - for(auto &it : nonce_map){ - it.second.clear();// clear the nonce contexts - } - nonce_map.clear(); - return 0; - } - else - return 1; - - } - - - GW_STATE_T * find_gw_map(const GW_ID_T &gw_id, const std::string& nqn ) const - { - auto it = Gmap.find(nqn); - if (it != Gmap.end() /* && it->first == nqn*/) { - auto it2 = it->second.find(gw_id); - if (it2 != it->second.end() /* && it2->first == gw_id*/ ){ // cout << "AAAA " << gw_id << " " << it2->first << endl; - return (GW_STATE_T *) &it2->second; - } - } - return NULL; - } - - int insert_gw_to_map(const GW_ID_T &gw_id, const std::string& nqn, int ana_grp_id ){ - if(Gmap[nqn].size() == 0) - Gmap.insert(make_pair(nqn, SUBSYST_GWMAP())); - - GW_STATE_T state{ {GW_IDLE_STATE,}, {""}, (uint16_t)ana_grp_id, GW_AVAILABILITY_E::GW_CREATED, 0 }; - for(int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) state.failover_peer[i] = "NULL"; - - Gmap[nqn].insert({gw_id, state}); - create_metadata(gw_id, nqn); - return 0; - } - - int update_active_timers( bool &propose_pending); - epoch_t get_epoch() const { return epoch; } - int _dump_gwmap(GWMAP & Gmap)const; - int _dump_gwmap(std::stringstream &ss)const ; - int _dump_created_gws(std::stringstream &ss)const ; - int cfg_add_gw (const GW_ID_T &gw_id, const std::string& pool, const std::string& group); - int cfg_delete_gw (const GW_ID_T &gw_id, const std::string& pool, const std::string& group, bool &propose_pending); - int process_gw_map_ka (const GW_ID_T &gw_id, const std::string& nqn , bool &propose_pending); - int process_gw_map_gw_down (const GW_ID_T &gw_id, const std::string& nqn, bool &propose_pending); - int handle_abandoned_ana_groups (bool &propose_pending); - int handle_removed_subsystems (const std::vector &created_subsystems, bool &propose_pending); - - //make these functions static - static void gw_name_from_id_pool_group (std::string &gw_name , const std::string &gw_id ,const std::string &gw_pool, const std::string &gw_group ){ - gw_name = gw_id + "." + gw_pool + "." + gw_group; - } - static void gw_preffix_from_id_pool_group (std::string &gw_preffix ,const std::string &gw_pool, const std::string &gw_group ){ - gw_preffix = "." + gw_pool + "." + gw_group; - } - - void debug_encode_decode(){ - ceph::buffer::list bl; - encode(bl); - auto p = bl.cbegin(); - decode(p); - } - -private: - int fsm_handle_gw_down (const GW_ID_T &gw_id, const std::string& nqn, GW_STATES_PER_AGROUP_E state, int grpid, bool &map_modified); - int fsm_handle_gw_delete (const GW_ID_T &gw_id, const std::string& nqn, GW_STATES_PER_AGROUP_E state, int grpid, bool &map_modified); - int fsm_handle_gw_up (const GW_ID_T &gw_id, const std::string& nqn, GW_STATES_PER_AGROUP_E state, int grpid, bool &map_modified); - int fsm_handle_to_expired (const GW_ID_T &gw_id, const std::string& nqn, int grpid, bool &map_modified); - - int find_failover_candidate(const GW_ID_T &gw_id, const std::string& nqn, GW_STATE_T* gw_state, int grpid, bool &propose_pending); - int find_failback_gw (const GW_ID_T &gw_id, const std::string& nqn, GW_STATE_T* gw_state, bool &found); - int set_failover_gw_for_ANA_group (const GW_ID_T &failed_gw_id, const GW_ID_T &gw_id, const std::string& nqn, uint8_t ANA_groupid); - int set_gw_standby_state(GW_STATE_T* gw_state, uint8_t ANA_groupid); - - SUBSYST_GWMAP * find_subsystem_map(const std::string& nqn) - { - auto it = Gmap.find(nqn); - if (it != Gmap.end() ){ - return &it->second; - } - return NULL; - } - - int create_metadata(const GW_ID_T& gw_id, const std::string & nqn) - { - - if(Gmetadata[nqn].size() == 0) - Gmetadata.insert(make_pair(nqn, std::map())); - //Gmetadata[nqn].insert({ gw_id, new_metadata }); - return 0; - } - - int delete_metadata(const GW_ID_T& gw_id, const std::string & nqn) - { - if(Gmetadata[nqn].size() != 0) - Gmetadata[nqn].erase(gw_id); - return 0; - } - - int start_timer(const GW_ID_T &gw_id, const std::string& nqn, uint16_t anagrpid) - { - GW_METADATA_T* metadata; - //const auto now = ceph::coarse_mono_clock::now(); - if ((metadata = find_gw_metadata(gw_id, nqn)) != NULL) { - metadata->anagrp_sm_tstamps[anagrpid] = 0;// set timer - } - else { - GW_METADATA_T new_metadata = {INVALID_GW_TIMER,}; - for (int i=0; ianagrp_sm_tstamps[anagrpid] != INVALID_GW_TIMER); - return metadata->anagrp_sm_tstamps[anagrpid]; - } - else{ - ceph_assert(false); - } - } - - int cancel_timer(const GW_ID_T &gw_id, const std::string& nqn, uint16_t anagrpid) - { - GW_METADATA_T* metadata; - int i; - if ((metadata = find_gw_metadata(gw_id, nqn)) != NULL) { - metadata->anagrp_sm_tstamps[anagrpid] = INVALID_GW_TIMER; - for(i=0; ianagrp_sm_tstamps[i] != INVALID_GW_TIMER) - break; - if(i==MAX_SUPPORTED_ANA_GROUPS){ - Gmetadata[nqn].erase(gw_id); // remove all gw_id timers from the map - } - } - else { - ceph_assert(false); - } - return 0; - } - - GW_METADATA_T* find_gw_metadata(const GW_ID_T &gw_id, const std::string& nqn); }; +#include "NVMeofGwSerialize.h" + #endif /* SRC_MON_NVMEOFGWMAP_H_ */ diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc index 9acdb1bf3a46..6c5a434d9783 100644 --- a/src/mon/NVMeofGwMon.cc +++ b/src/mon/NVMeofGwMon.cc @@ -12,25 +12,18 @@ #include "messages/MNVMeofGwBeacon.h" #include "messages/MNVMeofGwMap.h" -using std::map; -using std::make_pair; -using std::ostream; -using std::ostringstream; using std::string; -using std::vector; - #define dout_subsys ceph_subsys_mon #undef dout_prefix #define dout_prefix _prefix(_dout, this, this) using namespace TOPNSPC::common; -static ostream& _prefix(std::ostream *_dout, const NVMeofGwMon *h,//const Monitor &mon, +static std::ostream& _prefix(std::ostream *_dout, const NVMeofGwMon *h,//const Monitor &mon, const NVMeofGwMon *hmon) { return *_dout << "gw-mon." << hmon->mon.name << "@" << hmon->mon.rank; } #define MY_MON_PREFFIX " NVMeGW " - void NVMeofGwMon::init(){ dout(4) << MY_MON_PREFFIX << __func__ << "called " << dendl; } @@ -41,92 +34,7 @@ void NVMeofGwMon::on_restart(){ last_tick = ceph::coarse_mono_clock::now(); } - -void NVMeofGwMon::on_shutdown() { - -} - -static int cnt ; -#define start_cnt 6 -void NVMeofGwMon::inject1(){ - //bool propose = false; - if( ++cnt == 4 ){// simulation that new configuration was added - pending_map.cfg_add_gw("GW1" ,"g1","p1"); - pending_map.cfg_add_gw("GW2" ,"g1","p1"); - pending_map.cfg_add_gw("GW3" ,"g1","p1"); - NONCE_VECTOR_T new_nonces = {"abc", "def","hij"}; - ANA_GRP_ID_T grp = 1; - pending_map.update_gw_nonce("GW1.g1.p1", grp, new_nonces); - grp = 2; - pending_map.update_gw_nonce("GW1.g1.p1", grp, new_nonces); - std::stringstream ss; - pending_map._dump_created_gws(ss); - dout(4) << ss.str() << dendl; - - //pending_map._dump_gwmap(pending_map.Gmap); - pending_map.debug_encode_decode(); - dout(4) << "Dump map after decode encode:" <("mon_nvmeofgw_beacon_grace"); dout(4) << MY_MON_PREFFIX << __func__ << "NVMeofGwMon leader got a real tick, pending epoch "<< pending_map.epoch << dendl; @@ -171,18 +79,16 @@ void NVMeofGwMon::tick(){ const auto cutoff = now - nvmegw_beacon_grace; for(auto &itr : last_beacon){// Pass over all the stored beacons + auto& lb = itr.first; auto last_beacon_time = itr.second; - GW_ID_T gw_name; - std::string nqn; if(last_beacon_time < cutoff){ - get_gw_and_nqn_from_key(itr.first, gw_name, nqn); - dout(4) << "beacon timeout for GW " << gw_name << " nqn " << nqn << dendl; - pending_map.process_gw_map_gw_down( gw_name, nqn, propose); + dout(4) << "beacon timeout for GW " << lb.gw_id << " nqn " << lb.nqn << dendl; + pending_map.process_gw_map_gw_down( lb.gw_id, lb.group_key, lb.nqn, propose); _propose_pending |= propose; - last_beacon.erase(itr.first); + last_beacon.erase(lb); } - else{ - dout(4) << "beacon live for GW key: " << itr.first << dendl; + else { + dout(4) << "beacon live for GW key: " << lb.gw_id << " nqn " << lb.nqn << dendl; } } @@ -205,7 +111,6 @@ const char **NVMeofGwMon::get_tracked_conf_keys() const static const char* KEYS[] = { "nvmf_mon_mapdump", "nvmf_mon_log_level", - //"rocksdb_cache_size", NULL }; return KEYS; @@ -217,31 +122,19 @@ void NVMeofGwMon::handle_conf_change(const ConfigProxy& conf, dout(4) << __func__ << " " << changed << dendl; if (changed.count("nvmef_gw_mapdump")) { - //_set_cache_autotuning(); - std::stringstream ss1; - pending_map._dump_gwmap(ss1); - - std::stringstream ss2; - pending_map._dump_created_gws(ss2); - + dout(4) << "pending_map " << pending_map << dendl; } if (changed.count("nvmf_mon_log_level")){ dout(4) << "TODO SET LOG LEVEL >= " << g_conf()->nvmf_mon_log_level << dendl; } } - void NVMeofGwMon::create_pending(){ pending_map = map;// deep copy of the object // TODO since "pending_map" can be reset each time during paxos re-election even in the middle of the changes ... pending_map.epoch++; - //map.epoch ++; - dout(4) << MY_MON_PREFFIX << __func__ << " pending epoch " << pending_map.epoch << dendl; - - dout(5) << MY_MON_PREFFIX << "dump_pending" << dendl; - - pending_map._dump_gwmap(pending_map.Gmap); + dout(4) << MY_MON_PREFFIX << __func__ << " pending " << pending_map << dendl; } void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t){ @@ -268,10 +161,7 @@ void NVMeofGwMon::update_from_paxos(bool *need_bootstrap){ auto p = bl.cbegin(); map.decode(p); if(!mon.is_leader()) { - std::stringstream ss; - map._dump_created_gws(ss); - dout(4) << ss.str() << dendl; - map._dump_gwmap(map.Gmap); + dout(4) << "leader map: " << map << dendl; } check_subs(true); @@ -282,8 +172,8 @@ void NVMeofGwMon::check_sub(Subscription *sub) { /* MgrMonitor::check_sub*/ //if (sub->type == "NVMeofGw") { - dout(4) << "sub->next , map-epoch " << sub->next << " " << map.get_epoch() << dendl; - if (sub->next <= map.get_epoch()) + dout(4) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl; + if (sub->next <= map.epoch) { dout(4) << "Sending map to subscriber " << sub->session->con << " " << sub->session->con->get_peer_addr() << dendl; sub->session->con->send_message2(make_message(map)); @@ -291,7 +181,7 @@ void NVMeofGwMon::check_sub(Subscription *sub) if (sub->onetime) { mon.session_map.remove_sub(sub); } else { - sub->next = map.get_epoch() + 1; + sub->next = map.epoch + 1; } } } @@ -423,24 +313,23 @@ bool NVMeofGwMon::prepare_command(MonOpRequestRef op) const auto prefix = cmd_getval_or(cmdmap, "prefix", string{}); dout(4) << "MonCommand : "<< prefix << dendl; - bool map_modified = false; if( prefix == "nvme-gw create" || prefix == "nvme-gw delete" ) { std::string id, pool, group; cmd_getval(cmdmap, "id", id); cmd_getval(cmdmap, "pool", pool); cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); if(prefix == "nvme-gw create"){ - rc = pending_map.cfg_add_gw(id ,pool , group); + rc = pending_map.cfg_add_gw(id, group_key); ceph_assert(rc!= -EINVAL); - map_modified = true; } else{ - rc = pending_map.cfg_delete_gw(id, pool, group, map_modified);// TODO add params + rc = pending_map.cfg_delete_gw(id, group_key); ceph_assert(rc!= -EINVAL); } - if(map_modified){ + if(rc != -EEXIST){ propose_pending(); goto update; } @@ -482,51 +371,34 @@ bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op){ } -#define GW_DELIM ',' - -void NVMeofGwMon::get_gw_and_nqn_from_key(std::string key, GW_ID_T &gw_name , std::string& nqn) -{ - std::stringstream s1(key); - - std::getline(s1, gw_name, GW_DELIM); - std::getline(s1, nqn, GW_DELIM); -} - //#define BYPASS_GW_CREATE_CLI bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ dout(4) << MY_MON_PREFFIX <<__func__ << dendl; - GW_STATE_T* gw_state = NULL; auto m = op->get_req(); - // dout(4) << "availability " << m->get_availability() << " GW : " <get_gw_id() << " subsystems " << m->get_subsystems() << " epoch " << m->get_version() << dendl; - std::stringstream out; - m->print(out); - dout(4) << out.str() <get_availability() << " GW : " <get_gw_id() << " subsystems " << m->get_subsystems() << " epoch " << m->get_version() << dendl; GW_ID_T gw_id = m->get_gw_id(); - GW_ID_T pool = m->get_gw_pool(); - GW_ID_T group = m->get_gw_group(); + GROUP_KEY group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); GW_AVAILABILITY_E avail = m->get_availability(); const GwSubsystems& subsystems = m->get_subsystems(); bool propose = false; - int ana_grp_id = 0; - std::vector configured_subsystems; - - std::string gw_name; - pending_map.gw_name_from_id_pool_group(gw_name , gw_id , pool, group ); + ANA_GRP_ID_T ana_grp_id = 0; + std::vector configured_subsystems; if (avail == GW_AVAILABILITY_E::GW_CREATED){ // in this special state GWs receive map with just "created_gws" vector - if(pending_map.find_created_gw(gw_name, ana_grp_id) == 0) {// GW is created administratively - dout(4) << "GW " << gw_name << " sent beacon being in state GW_WAIT_INITIAL_MAP" << dendl; + auto& created_gw = pending_map.Created_gws[group_key][gw_id]; + if(created_gw.ana_grp_id == ana_grp_id) {// GW is created administratively + dout(4) << "GW " << gw_id << " sent beacon being in state GW_WAIT_INITIAL_MAP" << dendl; propose = true; } else{ - dout(4) << "GW " << gw_name << " sent beacon being in state GW_WAIT_INITIAL_MAP but it is not created yet!!! "<< dendl; + dout(4) << "GW " << gw_id << " sent beacon being in state GW_WAIT_INITIAL_MAP but it is not created yet!!! "<< dendl; #ifdef BYPASS_GW_CREATE_CLI - pending_map.cfg_add_gw(gw_name); - dout(4) << "GW " << gw_name << " created since mode is bypass-create-cli "<< dendl; + pending_map.cfg_add_gw(gw_id); + dout(4) << "GW " << gw_id << " created since mode is bypass-create-cli "<< dendl; propose= true; #endif } @@ -536,14 +408,19 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ // Validation gw is in the database for (const NqnState &st : subsystems) { - gw_state = pending_map.find_gw_map( gw_name, st.nqn ); - if (gw_state == NULL) + auto& nqn_gws_states = pending_map.Gmap[group_key][st.nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + if (gw_state == nqn_gws_states.end()) { - dout(4) << "GW + NQN pair is not in the database: " << gw_name << " " << st.nqn << dendl; - // if GW is created - if(pending_map.find_created_gw(gw_name, ana_grp_id) == 0) {// GW is created administratively - pending_map.insert_gw_to_map(gw_name, st.nqn, ana_grp_id); - dout(4) << "GW + NQN pair " << gw_name << " " << st.nqn << " inserted to map, ANA grp-id " << ana_grp_id << dendl; + dout(4) << "GW + NQN pair is not in the database: " << gw_id << " " << st.nqn << dendl; + // if GW is created + auto& group_gws = pending_map.Created_gws[group_key]; + auto gw_state = group_gws.find(gw_id); + if (gw_state != group_gws.end()) { + GW_STATE_T gst(ana_grp_id); + pending_map.Gmap[group_key][st.nqn][gw_id] = gst; + GW_METADATA_T md; + pending_map.Gmetadata[group_key][st.nqn][gw_id] = md; } else { //drop beacon on the floor silently discard @@ -552,7 +429,7 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ } configured_subsystems.push_back(st.nqn); } - pending_map.handle_removed_subsystems( configured_subsystems, propose ); + pending_map.handle_removed_subsystems( configured_subsystems, group_key, propose ); if(avail == GW_AVAILABILITY_E::GW_AVAILABLE) { @@ -560,19 +437,21 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ // check pending_map.epoch vs m->get_version() - if different - drop the beacon for (const NqnState& st: subsystems) { - last_beacon[(gw_name + GW_DELIM + st.nqn)] = now; - pending_map.process_gw_map_ka( gw_name, st.nqn, propose ); + LastBeacon lb = { gw_id, group_key, st.nqn }; + last_beacon[lb] = now; + pending_map.process_gw_map_ka( gw_id, group_key, st.nqn, propose ); } } else if(avail == GW_AVAILABILITY_E::GW_UNAVAILABLE){ // state set by GW client application // TODO: remove from last_beacon if found . if gw was found in last_beacon call process_gw_map_gw_down for (const NqnState& st: subsystems) { + LastBeacon lb = { gw_id, group_key, st.nqn }; - auto it = last_beacon.find(gw_name + GW_DELIM + st.nqn); + auto it = last_beacon.find(lb); if (it != last_beacon.end()){ - last_beacon.erase(gw_name + GW_DELIM + st.nqn); - pending_map.process_gw_map_gw_down( gw_name, st.nqn, propose ); + last_beacon.erase(lb); + pending_map.process_gw_map_gw_down( gw_id, group_key, st.nqn, propose ); } } } diff --git a/src/mon/NVMeofGwMon.h b/src/mon/NVMeofGwMon.h index 97aad7e03433..a5d83e570ff4 100755 --- a/src/mon/NVMeofGwMon.h +++ b/src/mon/NVMeofGwMon.h @@ -4,27 +4,38 @@ * Created on: Oct 17, 2023 * Author: 227870756 */ - #ifndef MON_NVMEGWMONITOR_H_ #define MON_NVMEGWMONITOR_H_ -#include -#include -#include "include/Context.h" -//#include "MgrMap.h" -#include "PaxosService.h" -#include "MonCommand.h" #include "NVMeofGwMap.h" +struct LastBeacon { + GW_ID_T gw_id; + GROUP_KEY group_key; + NQN_ID_T nqn; + + // Comparison operators to allow usage as a map key + bool operator<(const LastBeacon& other) const { + if (gw_id != other.gw_id) return gw_id < other.gw_id; + if (group_key != other.group_key) return group_key < other.group_key; + return nqn < other.nqn; + } + + bool operator==(const LastBeacon& other) const { + return gw_id == other.gw_id && + group_key == other.group_key && + nqn == other.nqn; + } +}; + class NVMeofGwMon: public PaxosService, public md_config_obs_t { - NVMeofGwMap map; //NVMeGWMap + NVMeofGwMap map; //NVMeGWMap NVMeofGwMap pending_map; - //utime_t first_seen_inactive; //TODO the key of the beacon is a unique gw-id; for example string consisting from gw_num + subsystem_nqn - std::map< std::string, ceph::coarse_mono_clock::time_point> last_beacon; + std::map last_beacon; // when the mon was not updating us for some period (e.g. during slow @@ -44,21 +55,11 @@ class NVMeofGwMon: public PaxosService, const char** get_tracked_conf_keys() const override; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) override; - //const MgrMap &get_map() const { return map; } - - // bool in_use() const { return map.epoch > 0; } - - //void prime_mgr_client(); - - - // void get_store_prefixes(std::set& s) const override; - // 3 pure virtual methods of the paxosService void create_initial()override{}; void create_pending()override ; void encode_pending(MonitorDBStore::TransactionRef t)override ; - void init() override; void on_shutdown() override; void on_restart() override; @@ -76,17 +77,10 @@ class NVMeofGwMon: public PaxosService, bool preprocess_beacon(MonOpRequestRef op); bool prepare_beacon(MonOpRequestRef op); - //void check_sub(Subscription *sub); - //void check_subs() - void tick() override; void print_summary(ceph::Formatter *f, std::ostream *ss) const; - //const std::vector &get_command_descs() const; - - - //void get_versions(std::map> &versions); void check_subs(bool type); void check_sub(Subscription *sub); private: @@ -95,6 +89,4 @@ class NVMeofGwMon: public PaxosService, }; - - #endif /* SRC_MON_NVMEGWMONITOR_H_ */ diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h new file mode 100755 index 000000000000..e99cb4c9b311 --- /dev/null +++ b/src/mon/NVMeofGwSerialize.h @@ -0,0 +1,311 @@ +/* + * NVMeofGwSerialize.h + * + * Created on: Dec 29, 2023 + */ + +#ifndef MON_NVMEOFGWSERIALIZE_H_ +#define MON_NVMEOFGWSERIALIZE_H_ + +inline void encode(const GW_STATE_T& state, ceph::bufferlist &bl) { + for(int i = 0; i & created_gws, ceph::bufferlist &bl) { + encode (created_gws.size(), bl); // number of groups + for (auto& group_gws: created_gws) { + auto& group_key = group_gws.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + auto& gws = group_gws.second; + encode (gws, bl); // encode group gws + } +} + +inline void decode(std::map& created_gws, ceph::buffer::list::const_iterator &bl) { + created_gws.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i + } +} + +inline void decode(GWMAP& nqn_gws_states, ceph::buffer::list::const_iterator &bl) { + size_t num_subsystems; + + decode(num_subsystems, bl); + SUBSYST_GWMAP gw_map; + nqn_gws_states.clear(); + + for (size_t i = 0; i < num_subsystems; i++) { + std::string nqn; + decode(nqn, bl); + SUBSYST_GWMAP gw_map; + decode(gw_map, bl); + nqn_gws_states[nqn] = gw_map; + } +} + + +inline void encode(const std::map& gmap, ceph::bufferlist &bl) { + encode (gmap.size(), bl); // number of groups + for (auto& group_state: gmap) { + auto& group_key = group_state.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_state.second, bl); + } +} + +inline void decode(std::map& gmap, ceph::buffer::list::const_iterator &bl) { + gmap.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i& gmetadata, ceph::bufferlist &bl) { + encode (gmetadata.size(), bl); // number of groups + for (auto& group_md: gmetadata) { + auto& group_key = group_md.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_md.second, bl); + } +} + +inline void decode(std::map& gmetadata, ceph::buffer::list::const_iterator &bl) { + gmetadata.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i "; + for (auto& gw_state: nqn_state.second) { + os << " { gw_id: " << gw_state.first << " -> " << gw_state.second << "}"; + } + os << "}"; + } + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) { + os << "NVMeofGwMap [ Gmap: "; + for (auto& group_state: value.Gmap) { + os << " { " << group_state.first << " } -> { " << group_state.second << " }"; + } + os << " ] [ Created_gws: "; + for (auto& group_gws: value.Created_gws) { + os << " { " << group_gws.first << " } -> { "; + for (auto& gw: group_gws.second) { + os << " { gw_id " << gw.first << " } -> { " << gw.second.ana_grp_id << " }"; + } + os << " }"; + } + os << "]"; + return os; +} +#endif /* SRC_MON_NVMEOFGWSERIALIZEP_H_ */ diff --git a/src/mon/NVMeofGwTypes.h b/src/mon/NVMeofGwTypes.h new file mode 100755 index 000000000000..4a759eeacb77 --- /dev/null +++ b/src/mon/NVMeofGwTypes.h @@ -0,0 +1,108 @@ +/* + * NVMeofGwTypes.h + * + * Created on: Dec 29, 2023 + */ + +#ifndef MON_NVMEOFGWTYPES_H_ +#define MON_NVMEOFGWTYPES_H_ +#include +#include +#include +#include + +using GW_ID_T = std::string; +using GROUP_KEY = std::pair; +using NQN_ID_T = std::string; +using ANA_GRP_ID_T = uint32_t; + + +enum class GW_STATES_PER_AGROUP_E { + GW_IDLE_STATE = 0, //invalid state + GW_STANDBY_STATE, + GW_ACTIVE_STATE, + GW_BLOCKED_AGROUP_OWNER, + GW_WAIT_FAILBACK_PREPARED +}; + +enum class GW_AVAILABILITY_E { + GW_CREATED = 0, + GW_AVAILABLE, + GW_UNAVAILABLE, + GW_DELETED +}; + +#define MAX_SUPPORTED_ANA_GROUPS 16 +#define INVALID_GW_TIMER 0xffff +#define REDUNDANT_GW_ANA_GROUP_ID 0xFF + +typedef GW_STATES_PER_AGROUP_E SM_STATE[MAX_SUPPORTED_ANA_GROUPS]; + +struct NqnState { + std::string nqn; // subsystem NQN + SM_STATE sm_state; // susbsystem's state machine state + uint16_t opt_ana_gid; // optimized ANA group index + + // Default constructor + NqnState(const std::string& _nqn) : nqn(_nqn), opt_ana_gid(0) { + for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + sm_state[i] = GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; + } +}; + +typedef std::vector GwSubsystems; + +struct GW_STATE_T { + SM_STATE sm_state; // state machine states per ANA group + GW_ID_T failover_peer[MAX_SUPPORTED_ANA_GROUPS]; + ANA_GRP_ID_T optimized_ana_group_id; // optimized ANA group index as configured by Conf upon network entry, note for redundant GW it is FF + GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable + uint64_t version; // version per all GWs of the same subsystem. subsystem version + + GW_STATE_T(ANA_GRP_ID_T id): + optimized_ana_group_id(id), + availability(GW_AVAILABILITY_E::GW_CREATED), + version(0) + { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + sm_state[i] = GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; + }; + + GW_STATE_T() : GW_STATE_T(REDUNDANT_GW_ANA_GROUP_ID) {}; + + void standby_state(ANA_GRP_ID_T grpid) { + sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + failover_peer[grpid] = ""; + }; +}; + +struct GW_METADATA_T { + int anagrp_sm_tstamps[MAX_SUPPORTED_ANA_GROUPS]; // statemachine timer(timestamp) set in some state + + GW_METADATA_T() { + for (int i=0; i >; +using GWMETADATA = std::map >; +using SUBSYST_GWMAP = std::map; +using SUBSYST_GWMETA = std::map; + +using NONCE_VECTOR_T = std::vector; +using GW_ANA_NONCE_MAP = std::map ; + + +struct GW_CREATED_T { + ANA_GRP_ID_T ana_grp_id; // ana-group-id allocated for this GW, GW owns this group-id + GW_ANA_NONCE_MAP nonce_map; + + GW_CREATED_T(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}; + GW_CREATED_T(ANA_GRP_ID_T id): ana_grp_id(id) {}; +}; + +using GW_CREATED_MAP = std::map; + +#endif /* SRC_MON_NVMEOFGWTYPES_H_ */ diff --git a/src/nvmeof/NVMeofGw.cc b/src/nvmeof/NVMeofGw.cc index 3f2ac6385d6a..0b97a45e92a5 100644 --- a/src/nvmeof/NVMeofGw.cc +++ b/src/nvmeof/NVMeofGw.cc @@ -197,12 +197,11 @@ void NVMeofGw::send_beacon() for (int i = 0; i < gw_subsystems.subsystems_size(); i++) { const subsystem& sub = gw_subsystems.subsystems(i); struct NqnState nqn_state(sub.nqn()); - GW_STATE_T* gw_state = map.find_gw_map(name, nqn_state.nqn); - if (gw_state) { - nqn_state.opt_ana_gid = gw_state->optimized_ana_group_id; - for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) - nqn_state.sm_state[i] = gw_state->sm_state[i]; - } + auto group_key = std::make_pair(pool, group); + GW_STATE_T& gw_state = map.Gmap[group_key][nqn_state.nqn][name]; + nqn_state.opt_ana_gid = gw_state.optimized_ana_group_id; + for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + nqn_state.sm_state[i] = gw_state.sm_state[i]; subs.push_back(nqn_state); } } @@ -267,30 +266,28 @@ void NVMeofGw::handle_nvmeof_gw_map(ceph::ref_t mmap) { dout(0) << "handle nvmeof gw map" << dendl; auto &mp = mmap->get_map(); - dout(0) << "received map epoch " << mp.get_epoch() << dendl; - std::stringstream ss; - mp._dump_gwmap(ss); - dout(0) << ss.str() << dendl; + dout(0) << "received map epoch " << mp.epoch << dendl; + dout(0) << "mp " << mp << dendl; ana_info ai; - std::string gw_name; - NVMeofGwMap::gw_name_from_id_pool_group(gw_name , name , pool, group); + auto group_key = std::make_pair(pool, group); if (map.epoch == 0){ // initial map - int ana_grp_id = -1; - - if(mp.find_created_gw(gw_name ,ana_grp_id) !=0) + auto group_gws = mp.Created_gws.find(group_key); + if (group_gws == mp.Created_gws.end()) { + dout(0) << "Failed to find group key " << group_key << "created gw for " << name << dendl; + return; + } + auto gw = group_gws->second.find(name); + if(gw == group_gws->second.end()) { - dout(0) << "Failed to find created gw for " << gw_name << dendl; + dout(0) << "Failed to find created gw for " << name << dendl; return; } - std::stringstream ss1; - mp._dump_created_gws(ss1); - dout(0) << ss1.str() << dendl; bool set_group_id = false; while (!set_group_id) { NVMeofGwMonitorGroupClient monitor_group_client( grpc::CreateChannel(monitor_address, grpc::InsecureChannelCredentials())); - dout(0) << "GRPC set_group_id: " << ana_grp_id << dendl; - set_group_id = monitor_group_client.set_group_id( ana_grp_id); + dout(0) << "GRPC set_group_id: " << gw->second.ana_grp_id << dendl; + set_group_id = monitor_group_client.set_group_id( gw->second.ana_grp_id); if (!set_group_id) { dout(0) << "GRPC set_group_id failed" << dendl; usleep(1000); // TODO: conf options @@ -299,45 +296,50 @@ void NVMeofGw::handle_nvmeof_gw_map(ceph::ref_t mmap) } // Interate over NQNs - for (const auto& subsystemPair : mp.Gmap) { - const std::string& nqn = subsystemPair.first; - const auto& idStateMap = subsystemPair.second; - nqn_ana_states nas; - nas.set_nqn(nqn); - - // This gateway state for the current subsystem / nqn - const auto& new_gateway_state = idStateMap.find(gw_name); - - // There is no subsystem update for this gateway - if (new_gateway_state == idStateMap.end()) continue; - - // Previously monitor distributed state - GW_STATE_T* old_gw_state = map.find_gw_map(gw_name, nqn); - - // Iterate over possible ANA Groups - for (uint32_t ana_grp_index = 0; ana_grp_index < MAX_SUPPORTED_ANA_GROUPS; ana_grp_index++) { - ana_group_state gs; - gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 - - // There is no state change for this ANA Group - auto old_state = old_gw_state ? old_gw_state->sm_state[ana_grp_index] : GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; - if (old_state == new_gateway_state->second.sm_state[ana_grp_index]) continue; - - // detect was active, but not any more transition - if ((old_state == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || old_state == GW_STATES_PER_AGROUP_E::GW_IDLE_STATE ) && - new_gateway_state->second.sm_state[ana_grp_index] != GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { - gs.set_state(INACCESSIBLE); // Set the ANA state - nas.mutable_states()->Add(std::move(gs)); - dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " INACCESSIBLE" <second.sm_state[ana_grp_index] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { - gs.set_state(OPTIMIZED); // Set the ANA state - nas.mutable_states()->Add(std::move(gs)); - dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " OPTIMIZED" <second) { + const auto& nqn = subsystemPair.first; + const auto& idStateMap = subsystemPair.second; + nqn_ana_states nas; + nas.set_nqn(nqn); + + // This gateway state for the current subsystem / nqn + const auto& new_gateway_state = idStateMap.find(name); + + // There is no subsystem update for this gateway + if (new_gateway_state == idStateMap.end()) continue; + + // Previously monitor distributed state + GW_STATE_T& old_gw_state = map.Gmap[group_key][nqn][name]; + + // Iterate over possible ANA Groups + for (ANA_GRP_ID_T ana_grp_index = 0; ana_grp_index < MAX_SUPPORTED_ANA_GROUPS; ana_grp_index++) { + ana_group_state gs; + gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 + + // There is no state change for this ANA Group + auto old_state = old_gw_state.sm_state[ana_grp_index]; + if (old_state == new_gateway_state->second.sm_state[ana_grp_index]) continue; + + // detect was active, but not any more transition + if ((old_state == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || old_state == GW_STATES_PER_AGROUP_E::GW_IDLE_STATE ) && + new_gateway_state->second.sm_state[ana_grp_index] != GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + gs.set_state(INACCESSIBLE); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " INACCESSIBLE" <second.sm_state[ana_grp_index] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + gs.set_state(OPTIMIZED); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " OPTIMIZED" <Add(std::move(nas)); } - if (nas.states_size()) ai.mutable_states()->Add(std::move(nas)); } if (ai.states_size()) { bool set_ana_state = false;