diff --git a/.gitmodules b/.gitmodules index c4f68c6b2fa4..a44fccbedcfe 100644 --- a/.gitmodules +++ b/.gitmodules @@ -78,4 +78,11 @@ [submodule "src/BLAKE3"] path = src/BLAKE3 url = https://github.com/BLAKE3-team/BLAKE3.git - +[submodule "src/boost_redis"] + path = src/boost_redis + url = https://github.com/boostorg/redis.git +[submodule "src/nvmeof/gateway"] + path = src/nvmeof/gateway + url = https://github.com/ceph/ceph-nvmeof.git + fetchRecurseSubmodules = false + shallow = true diff --git a/PendingReleaseNotes b/PendingReleaseNotes index 25fcbb70db08..391b8e69cfbb 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -506,3 +506,11 @@ Relevant tracker: https://tracker.ceph.com/issues/57090 set using the `fs set` command. This flag prevents using a standby for another file system (join_fs = X) when standby for the current filesystem is not available. Relevant tracker: https://tracker.ceph.com/issues/61599 +* mon: add NVMe-oF gateway monitor and HA + This PR adds high availability support for the nvmeof Ceph service. High availability +means that even in the case that a certain GW is down, there will be another available +path for the initiator to be able to continue the IO through another GW. +It is also adding 2 new mon commands, to notify monitor about the gateway creation/deletion: + - nvme-gw create + - nvme-gw delete +Relevant tracker: https://tracker.ceph.com/issues/64777 diff --git a/ceph.spec.in b/ceph.spec.in index fae1e390ebab..686b9388c942 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -250,6 +250,7 @@ BuildRequires: gperf BuildRequires: cmake > 3.5 BuildRequires: fuse-devel BuildRequires: git +BuildRequires: grpc-devel %if 0%{?fedora} || 0%{?suse_version} > 1500 || 0%{?rhel} == 9 || 0%{?openEuler} BuildRequires: gcc-c++ >= 11 %endif @@ -642,6 +643,17 @@ system. One or more instances of ceph-mon form a Paxos part-time parliament cluster that provides extremely reliable and durable storage of cluster membership, configuration, and state. 
+%package mon-client-nvmeof +Summary: Ceph NVMeoF Gateway Monitor Client +%if 0%{?suse_version} +Group: System/Filesystems +%endif +Provides: ceph-test:/usr/bin/ceph-nvmeof-monitor-client +Requires: librados2 = %{_epoch_prefix}%{version}-%{release} +%description mon-client-nvmeof +Ceph NVMeoF Gateway Monitor Client distributes Paxos ANA info +to NVMeoF Gateway and provides beacons to the monitor daemon + %package mgr Summary: Ceph Manager Daemon %if 0%{?suse_version} @@ -2077,6 +2089,9 @@ if [ $1 -ge 1 ] ; then fi fi +%files mon-client-nvmeof +%{_bindir}/ceph-nvmeof-monitor-client + %files fuse %{_bindir}/ceph-fuse %{_mandir}/man8/ceph-fuse.8* diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 79b45ef171f9..591ea5f357e1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -305,6 +305,12 @@ endif(WITH_BLKIN) if(WITH_JAEGER) find_package(thrift 0.13.0 REQUIRED) + + if(EXISTS "/etc/redhat-release" OR EXISTS "/etc/fedora-release") + # absl is installed as grpc build dependency on RPM based systems + add_definitions(-DHAVE_ABSEIL) + endif() + include(BuildOpentelemetry) build_opentelemetry() add_library(jaeger_base INTERFACE) @@ -875,6 +881,112 @@ if(WITH_FUSE) install(PROGRAMS mount.fuse.ceph DESTINATION ${CMAKE_INSTALL_SBINDIR}) endif(WITH_FUSE) +# NVMEOF GATEWAY MONITOR CLIENT +# Supported on RPM-based platforms only, depends on grpc devel libraries/tools +if(EXISTS "/etc/redhat-release" OR EXISTS "/etc/fedora-release") + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" ON) +else() + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" OFF) +endif() + +if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT) + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. 
+ option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf REQUIRED) + + set(_REFLECTION grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. + find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() + + # Gateway Proto file + get_filename_component(nvmeof_gateway_proto "nvmeof/gateway/control/proto/gateway.proto" ABSOLUTE) + get_filename_component(nvmeof_gateway_proto_path "${nvmeof_gateway_proto}" PATH) + + # Generated sources + set(nvmeof_gateway_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.cc") + set(nvmeof_gateway_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.h") + set(nvmeof_gateway_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.cc") + set(nvmeof_gateway_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_gateway_proto_srcs}" "${nvmeof_gateway_proto_hdrs}" "${nvmeof_gateway_grpc_srcs}" "${nvmeof_gateway_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_gateway_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_gateway_proto}" + DEPENDS "${nvmeof_gateway_proto}") + + + # Monitor Proto file + get_filename_component(nvmeof_monitor_proto "nvmeof/gateway/control/proto/monitor.proto" ABSOLUTE) + get_filename_component(nvmeof_monitor_proto_path "${nvmeof_monitor_proto}" PATH) + + # Generated sources + set(nvmeof_monitor_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.cc") + set(nvmeof_monitor_proto_hdrs 
"${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.h") + set(nvmeof_monitor_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.cc") + set(nvmeof_monitor_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_monitor_proto_srcs}" "${nvmeof_monitor_proto_hdrs}" "${nvmeof_monitor_grpc_srcs}" "${nvmeof_monitor_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_monitor_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_monitor_proto}" + DEPENDS "${nvmeof_monitor_proto}") + + # Include generated *.pb.h files + include_directories("${CMAKE_CURRENT_BINARY_DIR}") + + set(ceph_nvmeof_monitor_client_srcs + ${nvmeof_gateway_proto_srcs} + ${nvmeof_gateway_proto_hdrs} + ${nvmeof_gateway_grpc_srcs} + ${nvmeof_gateway_grpc_hdrs} + ${nvmeof_monitor_proto_srcs} + ${nvmeof_monitor_proto_hdrs} + ${nvmeof_monitor_grpc_srcs} + ${nvmeof_monitor_grpc_hdrs} + ceph_nvmeof_monitor_client.cc + nvmeof/NVMeofGwClient.cc + nvmeof/NVMeofGwMonitorGroupClient.cc + nvmeof/NVMeofGwMonitorClient.cc) + add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs}) + add_dependencies(ceph-nvmeof-monitor-client ceph-common) + target_link_libraries(ceph-nvmeof-monitor-client + client + mon + global-static + ceph-common + ${_REFLECTION} + ${_GRPC_GRPCPP} + ) + install(TARGETS ceph-nvmeof-monitor-client DESTINATION bin) +endif() +# END OF NVMEOF GATEWAY MONITOR CLIENT + if(WITH_DOKAN) add_subdirectory(dokan) endif(WITH_DOKAN) diff --git a/src/ceph_nvmeof_monitor_client.cc b/src/ceph_nvmeof_monitor_client.cc new file mode 100644 index 000000000000..05457998cb8b --- /dev/null +++ b/src/ceph_nvmeof_monitor_client.cc @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * 
Copyright (C) 2023 IBM Inc + * + * Author: Alexander Indenbaum + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "include/types.h" +#include "include/compat.h" +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "common/errno.h" +#include "common/pick_address.h" +#include "global/global_init.h" + +#include "nvmeof/NVMeofGwMonitorClient.h" + +static void usage() +{ + std::cout << "usage: ceph-nvmeof-monitor-client\n" + " --gateway-name \n" + " --gateway-address \n" + " --gateway-pool \n" + " --gateway-group \n" + " --monitor-group-address \n" + " [flags]\n" + << std::endl; + generic_server_usage(); +} + +/** + * A short main() which just instantiates a Nvme and + * hands over control to that. + */ +int main(int argc, const char **argv) +{ + ceph_pthread_setname(pthread_self(), "ceph-nvmeof-monitor-client"); + + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, // maybe later use CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); + + global_init_daemonize(g_ceph_context); + global_init_chdir(g_ceph_context); + common_init_finish(g_ceph_context); + + NVMeofGwMonitorClient gw_monitor_client(argc, argv); + int rc = gw_monitor_client.init(); + if (rc != 0) { + std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; + return rc; + } + + return gw_monitor_client.main(args); +} + diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 1b355d6e03ad..b34e3c5a337b 100644 --- 
a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1755,6 +1755,13 @@ options: default: 500 services: - mon +- name: mon_max_nvmeof_epochs + type: int + level: advanced + desc: max number of nvmeof gateway maps to store + default: 500 + services: + - mon - name: mon_max_osd type: int level: advanced diff --git a/src/common/options/mon.yaml.in b/src/common/options/mon.yaml.in index 1ec9871b6a8e..ab1634bc154b 100644 --- a/src/common/options/mon.yaml.in +++ b/src/common/options/mon.yaml.in @@ -72,6 +72,25 @@ options: default: 30 services: - mon +- name: mon_nvmeofgw_beacon_grace + type: secs + level: advanced + desc: Period in seconds from last beacon to monitor marking a NVMeoF gateway as + failed + default: 10 + services: + - mon +- name: mon_nvmeofgw_set_group_id_retry + type: uint + level: advanced + desc: Retry wait time in microsecond for set group id between the monitor client + and gateway + long_desc: The monitor server determines the gateway's group ID. If the monitor client + receives a monitor group ID assignment before the gateway is fully up during + initialization, a retry is required. 
+ default: 1000 + services: + - mon - name: mon_mgr_inactive_grace type: int level: advanced @@ -1341,3 +1360,18 @@ options: with_legacy: true see_also: - osd_heartbeat_use_min_delay_socket +- name: nvmeof_mon_client_disconnect_panic + type: secs + level: advanced + desc: The duration, expressed in seconds, after which the nvmeof gateway + should trigger a panic if it loses connection to the monitor + default: 100 + services: + - mon +- name: nvmeof_mon_client_tick_period + type: secs + level: advanced + desc: Period in seconds of nvmeof gateway beacon messages to monitor + default: 2 + services: + - mon diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h new file mode 100644 index 000000000000..26fc8dcf3ac1 --- /dev/null +++ b/src/messages/MNVMeofGwBeacon.h @@ -0,0 +1,122 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_NVMEOFGWBEACON_H +#define CEPH_NVMEOFGWBEACON_H + +#include +#include +#include "messages/PaxosServiceMessage.h" +#include "mon/MonCommand.h" +#include "mon/NVMeofGwMap.h" +#include "include/types.h" + +class MNVMeofGwBeacon final : public PaxosServiceMessage { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +protected: + std::string gw_id; + std::string gw_pool; + std::string gw_group; + BeaconSubsystems subsystems; // gateway susbsystem and their state machine states + gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable + epoch_t last_osd_epoch; + epoch_t last_gwmap_epoch; + +public: + MNVMeofGwBeacon() + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION} + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + MNVMeofGwBeacon(const std::string &gw_id_, + const std::string& gw_pool_, + const std::string& gw_group_, + const BeaconSubsystems& subsystems_, + const gw_availability_t& availability_, + const epoch_t& last_osd_epoch_, + const epoch_t& last_gwmap_epoch_ + ) + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}, + gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_), + availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_) + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + const std::string& get_gw_id() const { return gw_id; } + const std::string& get_gw_pool() const { return gw_pool; } + const std::string& get_gw_group() const { return gw_group; } + NvmeAnaNonceMap get_nonce_map() const { + NvmeAnaNonceMap nonce_map; + for (const auto& sub: subsystems) { + for (const auto& ns: sub.namespaces) { + auto& nonce_vec = nonce_map[ns.anagrpid-1];//Converting ana groups to offsets + if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end()) + nonce_vec.push_back(ns.nonce); + } + } + return nonce_map; + } + + const 
gw_availability_t& get_availability() const { return availability; } + const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; } + const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; } + const BeaconSubsystems& get_subsystems() const { return subsystems; }; + +private: + ~MNVMeofGwBeacon() final {} + +public: + + std::string_view get_type_name() const override { return "nvmeofgwbeacon"; } + + void encode_payload(uint64_t features) override { + using ceph::encode; + paxos_encode(); + encode(gw_id, payload); + encode(gw_pool, payload); + encode(gw_group, payload); + encode(subsystems, payload); + encode((uint32_t)availability, payload); + encode(last_osd_epoch, payload); + encode(last_gwmap_epoch, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + + paxos_decode(p); + decode(gw_id, p); + decode(gw_pool, p); + decode(gw_group, p); + decode(subsystems, p); + uint32_t tmp; + decode(tmp, p); + availability = static_cast(tmp); + decode(last_osd_epoch, p); + decode(last_gwmap_epoch, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); +}; + + +#endif diff --git a/src/messages/MNVMeofGwMap.h b/src/messages/MNVMeofGwMap.h new file mode 100644 index 000000000000..3affdd250dc0 --- /dev/null +++ b/src/messages/MNVMeofGwMap.h @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_MNVMEOFGWMAP_H +#define CEPH_MNVMEOFGWMAP_H + +#include "msg/Message.h" +#include "mon/NVMeofGwMap.h" + +class MNVMeofGwMap final : public Message { +private: + static constexpr int VERSION = 1; + +protected: + std::map map; + epoch_t gwmap_epoch; + +public: + const std::map& get_map() {return map;} + const epoch_t& get_gwmap_epoch() {return gwmap_epoch;} + +private: + MNVMeofGwMap() : + Message{MSG_MNVMEOF_GW_MAP} {} + MNVMeofGwMap(const NVMeofGwMap &map_) : + Message{MSG_MNVMEOF_GW_MAP}, gwmap_epoch(map_.epoch) + { + map_.to_gmap(map); + } + ~MNVMeofGwMap() final {} + +public: + std::string_view get_type_name() const override { return "nvmeofgwmap"; } + + void decode_payload() override { + auto p = payload.cbegin(); + int version; + decode(version, p); + if (version > VERSION) + throw ::ceph::buffer::malformed_input(DECODE_ERR_OLDVERSION(__PRETTY_FUNCTION__, VERSION, version)); + decode(gwmap_epoch, p); + decode(map, p); + } + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(VERSION, payload); + encode(gwmap_epoch, payload); + encode(map, payload); + } +private: + using RefCountedObject::put; + using RefCountedObject::get; + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); + template + friend MURef crimson::make_message(Args&&... 
args); +}; + +#endif diff --git a/src/mon/CMakeLists.txt b/src/mon/CMakeLists.txt index 4019f854c991..c5bf64f8c153 100644 --- a/src/mon/CMakeLists.txt +++ b/src/mon/CMakeLists.txt @@ -21,6 +21,8 @@ set(lib_mon_srcs ConnectionTracker.cc HealthMonitor.cc KVMonitor.cc + NVMeofGwMon.cc + NVMeofGwMap.cc ../mds/MDSAuthCaps.cc ../mgr/mgr_commands.cc ../osd/OSDCap.cc diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index e9025b05ef77..438cbcfd6d58 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1378,8 +1378,25 @@ COMMAND("config generate-minimal-conf", "Generate a minimal ceph.conf file", "config", "r") +/* NVMeofGwMon*/ +COMMAND("nvme-gw create" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "create nvmeof gateway id for (pool, group)", + "mgr", "rw") +COMMAND("nvme-gw delete" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "delete nvmeof gateway id for (pool, group)", + "mgr", "rw") - +COMMAND("nvme-gw show" + " name=pool,type=CephString" + " name=group,type=CephString", + " show nvmeof gateways within (pool, group)", + "mon", "r") // these are tell commands that were implemented as CLI commands in // the broken pre-octopus way that we want to allow to work when a diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index a70bfbe33c9d..07e6bebab497 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -84,6 +84,7 @@ #include "MgrStatMonitor.h" #include "ConfigMonitor.h" #include "KVMonitor.h" +#include "NVMeofGwMon.h" #include "mon/HealthMonitor.h" #include "common/config.h" #include "common/cmdparse.h" @@ -247,6 +248,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health")); paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config")); paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv")); + 
paxos_service[PAXOS_NVMEGW].reset(new NVMeofGwMon(*this, *paxos, "nvmeofgw")); bool r = mon_caps.parse("allow *", NULL); ceph_assert(r); @@ -3617,7 +3619,10 @@ void Monitor::handle_command(MonOpRequestRef op) mgrmon()->dispatch(op); return; } - + if (module == "nvme-gw"){ + nvmegwmon()->dispatch(op); + return; + } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); @@ -4551,6 +4556,7 @@ void Monitor::_ms_dispatch(Message *m) void Monitor::dispatch_op(MonOpRequestRef op) { op->mark_event("mon:dispatch_op"); + MonSession *s = op->get_session(); ceph_assert(s); if (s->closed) { @@ -4664,6 +4670,11 @@ void Monitor::dispatch_op(MonOpRequestRef op) paxos_service[PAXOS_MGR]->dispatch(op); return; + case MSG_MNVMEOF_GW_BEACON: + paxos_service[PAXOS_NVMEGW]->dispatch(op); + return; + + // MgrStat case MSG_MON_MGR_REPORT: case CEPH_MSG_STATFS: @@ -5351,6 +5362,9 @@ void Monitor::handle_subscribe(MonOpRequestRef op) } else if (p->first.find("kv:") == 0) { kvmon()->check_sub(s->sub_map[p->first]); } + else if (p->first == "NVMeofGw") { + nvmegwmon()->check_sub(s->sub_map[p->first]); + } } if (reply) { diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 13afacafde7d..0f8481eea6dc 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -712,6 +712,11 @@ class Monitor : public Dispatcher, return (class KVMonitor*) paxos_service[PAXOS_KV].get(); } + class NVMeofGwMon *nvmegwmon() { + return (class NVMeofGwMon*) paxos_service[PAXOS_NVMEGW].get(); + } + + friend class Paxos; friend class OSDMonitor; friend class MDSMonitor; diff --git a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc new file mode 100755 index 000000000000..9af9f81b7f3e --- /dev/null +++ b/src/mon/NVMeofGwMap.cc @@ -0,0 +1,659 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "NVMeofGwMap.h" +#include "OSDMonitor.h" + +using std::map; +using std::make_pair; +using std::ostream; +using std::ostringstream; +using std::string; + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +void NVMeofGwMap::to_gmap(std::map& Gmap) const { + Gmap.clear(); + for (const auto& created_map_pair: created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwMonStates& gw_created_map = created_map_pair.second; + for (const auto& gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + const auto& gw_created = gw_created_pair.second; + + auto gw_state = NvmeGwClientState(gw_created.ana_grp_id, epoch, gw_created.availability); + for (const auto& sub: gw_created.subsystems) { + gw_state.subsystems.insert({sub.nqn, NqnState(sub.nqn, gw_created.sm_state, gw_created )}); + } + Gmap[group_key][gw_id] = gw_state; + dout (20) << gw_id << " Gw-Client: " << gw_state << dendl; + } + } +} + +void NVMeofGwMap::add_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid) +{ + Tmdata tm_data; + Blocklist_data blklist_data; + created_gws[group_key][gw_id].sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE; + fsm_timers[group_key][gw_id].data[grpid] = tm_data; + created_gws[group_key][gw_id].blocklist_data[grpid] = blklist_data; +} + +void NVMeofGwMap::remove_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid) +{ + created_gws[group_key][gw_id].sm_state.erase(grpid); + created_gws[group_key][gw_id].blocklist_data.erase(grpid); + fsm_timers[group_key][gw_id].data.erase(grpid); +} + +int 
NVMeofGwMap::cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + std::set allocated; + for (auto& itr: created_gws[group_key]) { + allocated.insert(itr.second.ana_grp_id); + if (itr.first == gw_id) { + dout(1) << __func__ << " ERROR create GW: already exists in map " << gw_id << dendl; + return -EEXIST ; + } + } + // Allocate the new group id + NvmeAnaGrpId i = 0; + bool was_allocated = false; + for (NvmeAnaGrpId elem: allocated) {// "allocated" is a sorted set (!),so if found any gap between numbers, it should be filled + if (i != elem) { + allocated.insert(i); + was_allocated = true; + break; + } + i++; + } + if (!was_allocated) allocated.insert(i); + dout(10) << "allocated ANA groupId " << i << " for GW " << gw_id << dendl; + for (auto& itr: created_gws[group_key]) { // add new allocated grp_id to maps of created gateways + add_grp_id(itr.first, group_key, i); + } + NvmeGwMonState gw_created(i); + created_gws[group_key][gw_id] = gw_created; + created_gws[group_key][gw_id].performed_full_startup = true; + for (NvmeAnaGrpId elem: allocated) { + add_grp_id(gw_id, group_key, elem); // add all existed grp_ids to newly created gateway + dout(4) << "adding group " << elem << " to gw " << gw_id << dendl; + } + dout(10) << __func__ << " Created GWS: " << created_gws << dendl; + return 0; +} + +int NVMeofGwMap::cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + int rc = 0; + for (auto& gws_states: created_gws[group_key]) { + + if (gws_states.first == gw_id) { + auto& state = gws_states.second; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + bool modified; + fsm_handle_gw_delete(gw_id, group_key,state_itr.second , state_itr.first, modified); + } + dout(10) << " Delete GW :"<< gw_id << " ANA grpid: " << state.ana_grp_id << dendl; + for (auto& itr: created_gws[group_key]) { + remove_grp_id(itr.first, group_key, state.ana_grp_id);// Update state map and other maps + // of all created gateways. 
Removed key = anagrp + } + fsm_timers[group_key].erase(gw_id); + if (fsm_timers[group_key].size() == 0) + fsm_timers.erase(group_key); + + created_gws[group_key].erase(gw_id); + if (created_gws[group_key].size() == 0) + created_gws.erase(group_key); + return rc; + } + } + + return -EINVAL; +} + + +int NVMeofGwMap::process_gw_map_gw_down(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, + bool &propose_pending) { + int rc = 0; + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + if (gw_state != gws_states.end()) { + dout(10) << "GW down " << gw_id << dendl; + auto& st = gw_state->second; + st.set_unavailable_state(); + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + fsm_handle_gw_down(gw_id, group_key, state_itr.second, state_itr.first, propose_pending); + state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + } + propose_pending = true; // map should reflect that gw becames unavailable + if (propose_pending) validate_gw_map(group_key); + } + else { + dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_id << dendl; + rc = -EINVAL; + } + return rc; +} + + +void NVMeofGwMap::process_gw_map_ka(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending) +{ + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + auto& st = gw_state->second; + dout(20) << "KA beacon from the GW " << gw_id << " in state " << (int)st.availability << dendl; + + if (st.availability == gw_availability_t::GW_CREATED) { + // first time appears - allow IO traffic for this GW + st.availability = gw_availability_t::GW_AVAILABLE; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + if (st.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW + st.active_state(st.ana_grp_id); + } + propose_pending = true; + } + else if (st.availability == 
gw_availability_t::GW_UNAVAILABLE) { + st.availability = gw_availability_t::GW_AVAILABLE; + if (st.ana_grp_id == REDUNDANT_GW_ANA_GROUP_ID) { + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + propose_pending = true; + } + else { + //========= prepare to Failback to this GW ========= + // find the GW that took over on the group st.ana_grp_id + find_failback_gw(gw_id, group_key, propose_pending); + } + } + else if (st.availability == gw_availability_t::GW_AVAILABLE) { + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + fsm_handle_gw_alive(gw_id, group_key, gw_state->second, state_itr.second, state_itr.first, last_osd_epoch, propose_pending); + } + } + if (propose_pending) validate_gw_map(group_key); +} + + +void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose) +{ + propose = false; + for (auto& group_state: created_gws) { + auto& group_key = group_state.first; + auto& gws_states = group_state.second; + + for (auto& gw_state : gws_states) { // loop for GWs inside nqn group + auto& gw_id = gw_state.first; + NvmeGwMonState& state = gw_state.second; + + //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? 
+ if (state.availability == gw_availability_t::GW_UNAVAILABLE && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { + auto found_gw_for_ana_group = false; + for (auto& gw_state2 : gws_states) { + NvmeGwMonState& state2 = gw_state2.second; + if (state2.availability == gw_availability_t::GW_AVAILABLE && state2.sm_state[state.ana_grp_id] == gw_states_per_group_t::GW_ACTIVE_STATE) { + found_gw_for_ana_group = true; + break; + } + } + if (found_gw_for_ana_group == false) { //choose the GW for handle ana group + dout(10)<< "Was not found the GW " << " that handles ANA grp " << (int)state.ana_grp_id << " find candidate "<< dendl; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + find_failover_candidate(gw_id, group_key, state_itr.first, propose); + } + } + } + + //2. Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it + else if (state.availability == gw_availability_t::GW_AVAILABLE + && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID && + state.sm_state[state.ana_grp_id] == gw_states_per_group_t::GW_STANDBY_STATE) + { + find_failback_gw(gw_id, group_key, propose); + } + } + if (propose) { + validate_gw_map(group_key); + } + } +} + + +void NVMeofGwMap::set_failover_gw_for_ANA_group(const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, NvmeAnaGrpId ANA_groupid) +{ + NvmeGwMonState& gw_state = created_gws[group_key][gw_id]; + epoch_t epoch; + dout(10) << "Found failover GW " << gw_id << " for ANA group " << (int)ANA_groupid << dendl; + int rc = blocklist_gw(failed_gw_id, group_key, ANA_groupid, epoch, true); + if (rc) { + gw_state.active_state(ANA_groupid); //start failover even when nonces are empty ! 
+ } + else{ + gw_state.sm_state[ANA_groupid] = gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL; + gw_state.blocklist_data[ANA_groupid].osd_epoch = epoch; + gw_state.blocklist_data[ANA_groupid].is_failover = true; + start_timer(gw_id, group_key, ANA_groupid, 30); //start Failover preparation timer + } +} + +void NVMeofGwMap::find_failback_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose) +{ + auto& gws_states = created_gws[group_key]; + auto& gw_state = created_gws[group_key][gw_id]; + bool do_failback = false; + + dout(10) << "Find failback GW for GW " << gw_id << dendl; + for (auto& gw_state_it: gws_states) { + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] != gw_states_per_group_t::GW_STANDBY_STATE) {// some other gw owns or owned the desired ana-group + do_failback = true;// if candidate is in state ACTIVE for the desired ana-group, then failback starts immediately, otherwise need to wait + dout(10) << "Found some gw " << gw_state_it.first << " in state " << st.sm_state[gw_state.ana_grp_id] << dendl; + break; + } + } + + if (do_failback == false) { + // No other gw currently performs some activity with desired ana group of coming-up GW - so it just takes over on the group + dout(10) << "Failback GW candidate was not found, just set Optimized to group " << gw_state.ana_grp_id << " to GW " << gw_id << dendl; + gw_state.active_state(gw_state.ana_grp_id); + propose = true; + return; + } + //try to do_failback + for (auto& gw_state_it: gws_states) { + auto& failback_gw_id = gw_state_it.first; + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] == gw_states_per_group_t::GW_ACTIVE_STATE) { + dout(10) << "Found Failback GW " << failback_gw_id << " that previously took over the ANAGRP " << gw_state.ana_grp_id << " of the available GW " << gw_id << dendl; + st.sm_state[gw_state.ana_grp_id] = gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED; + start_timer(failback_gw_id, group_key, gw_state.ana_grp_id, 3);// 
Add timestamp of start Failback preparation + gw_state.sm_state[gw_state.ana_grp_id] = gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED; + propose = true; + break; + } + } +} + +void NVMeofGwMap::find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending) +{ + dout(10) <<__func__<< " " << gw_id << dendl; + #define ILLEGAL_GW_ID " " + #define MIN_NUM_ANA_GROUPS 0xFFF + int min_num_ana_groups_in_gw = 0; + int current_ana_groups_in_gw = 0; + NvmeGwId min_loaded_gw_id = ILLEGAL_GW_ID; + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + + // this GW may handle several ANA groups and for each of them need to found the candidate GW + if (gw_state->second.sm_state[grpid] == gw_states_per_group_t::GW_ACTIVE_STATE || gw_state->second.ana_grp_id == grpid) { + + for (auto& found_gw_state: gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.sm_state[grpid] == gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) { // some GW already started failover/failback on this group + dout(4) << "Warning : Failover" << st.blocklist_data[grpid].is_failover << " already started for the group " << grpid << " by GW " << found_gw_state.first << dendl; + gw_state->second.standby_state(grpid); + return ; + } + } + // Find a GW that takes over the ANA group(s) + min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; + min_loaded_gw_id = ILLEGAL_GW_ID; + for (auto& found_gw_state: gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.availability == gw_availability_t::GW_AVAILABLE) { + current_ana_groups_in_gw = 0; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + NvmeAnaGrpId anagrp = state_itr.first; + if (st.sm_state[anagrp] == gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED || st.sm_state[anagrp] == gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED + || st.sm_state[anagrp] == 
gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) { + current_ana_groups_in_gw = 0xFFFF; + break; // dont take into account GWs in the transitive state + } + else if (st.sm_state[anagrp] == gw_states_per_group_t::GW_ACTIVE_STATE) { + current_ana_groups_in_gw++; // how many ANA groups are handled by this GW + } + } + if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) { + min_num_ana_groups_in_gw = current_ana_groups_in_gw; + min_loaded_gw_id = found_gw_state.first; + dout(10) << "choose: gw-id min_ana_groups " << min_loaded_gw_id << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; + } + } + } + if (min_loaded_gw_id != ILLEGAL_GW_ID) { + propose_pending = true; + set_failover_gw_for_ANA_group(gw_id, group_key, min_loaded_gw_id, grpid); + } + else { + if (gw_state->second.sm_state[grpid] == gw_states_per_group_t::GW_ACTIVE_STATE) {// not found candidate but map changed. + propose_pending = true; + dout(10) << "gw down: no candidate found " << dendl; + } + } + gw_state->second.standby_state(grpid); + } +} + +void NVMeofGwMap::fsm_handle_gw_alive(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwMonState & gw_state, gw_states_per_group_t state, NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified) +{ + switch (state) { + case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: + { + int timer_val = get_timer(gw_id, group_key, grpid); + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; + if (gw_map.blocklist_data[grpid].osd_epoch <= last_osd_epoch) { + dout(10) << "is-failover: " << gw_map.blocklist_data[grpid].is_failover << " osd epoch changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + gw_state.active_state(grpid); // Failover Gw still alive and guaranteed that + cancel_timer(gw_id, group_key, grpid); // ana group wouldnt be taken back during blocklist wait period + map_modified = true; + } + else{ + dout(20) << "osd epoch 
not changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + } + } + break; + + default: + break; + } +} + + void NVMeofGwMap::fsm_handle_gw_down(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified) + { + switch (state) + { + case gw_states_per_group_t::GW_STANDBY_STATE: + case gw_states_per_group_t::GW_IDLE_STATE: + // nothing to do + break; + + case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: + { + cancel_timer(gw_id, group_key, grpid); + map_modified = true; + }break; + + case gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED: + cancel_timer(gw_id, group_key, grpid); + map_modified = true; + for (auto& gw_st: created_gws[group_key]) { + auto& st = gw_st.second; + if (st.sm_state[grpid] == gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) { // found GW that was intended for Failback for this ana grp + dout(4) << "Warning: Outgoing Failback when GW is down back - to rollback it" <<" GW " <mon->osdmon()->is_writeable() << dendl; + if (m->mon->osdmon()->is_writeable()) { + epoch_t epoch = m->mon->osdmon()->blocklist(addr_vect, expires); + dout(10) << "epoch " << epoch <mon->nvmegwmon()->request_proposal(m->mon->osdmon()); + } + else { + m->mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(m, addr_vect, expires)); + } + } +}; + +int NVMeofGwMap::blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, epoch_t &epoch, bool failover) +{ + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; //find_already_created_gw(gw_id, group_key); + + if (gw_map.nonce_map[grpid].size() > 0) { + NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];; + std::string str = "["; + entity_addrvec_t addr_vect; + + double d = g_conf().get_val("mon_osd_blocklist_default_expire"); + utime_t expires = ceph_clock_now(); + expires += d; + dout(10) << " blocklist timestamp " << 
expires << dendl; + for (auto &it: nonce_vector ) { + if (str != "[") str += ","; + str += it; + } + str += "]"; + bool rc = addr_vect.parse(&str[0]); + dout(10) << str << " rc " << rc << " network vector: " << addr_vect << " " << addr_vect.size() << dendl; + if (rc) + return 1; + + if (!mon->osdmon()->is_writeable()) { + dout(10) << "osdmon is not writable, waiting, epoch = " << epoch << dendl; + mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(this, addr_vect, expires ));// return false; + } + else { + epoch = mon->osdmon()->blocklist(addr_vect, expires); + if (!mon->osdmon()->is_writeable()) { + dout(10) << "osdmon is not writable after blocklist is done, waiting, epoch = " << epoch << dendl; + mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(this, addr_vect, expires ));// return false; + } + else{ + mon->nvmegwmon()->request_proposal(mon->osdmon()); + } + } + dout(10) << str << " mon->osdmon()->blocklist: epoch : " << epoch << " address vector: " << addr_vect << " " << addr_vect.size() << dendl; + } + else{ + dout(4) << "Error: No nonces context present for gw: " <= to.data[to_itr.first].end_time) { + fsm_handle_to_expired(gw_id, std::make_pair(pool, group), to_itr.first, propose_pending); + } + } + } + } +} + +void NVMeofGwMap::start_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value_sec) { + fsm_timers[group_key][gw_id].data[anagrpid].timer_started = 1; + fsm_timers[group_key][gw_id].data[anagrpid].timer_value = value_sec; + dout(10) << "start timer for ana " << anagrpid << " gw " << gw_id << "value sec " << (int)value_sec << dendl; + const auto now = std::chrono::system_clock::now(); + fsm_timers[group_key][gw_id].data[anagrpid].end_time = now + std::chrono::seconds(value_sec); +} + +int NVMeofGwMap::get_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + auto timer = fsm_timers[group_key][gw_id].data[anagrpid].timer_value; + return timer; +} + +void 
NVMeofGwMap::cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + fsm_timers[group_key][gw_id].data[anagrpid].timer_started = 0; +} diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h new file mode 100755 index 000000000000..2390176305b2 --- /dev/null +++ b/src/mon/NVMeofGwMap.h @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef MON_NVMEOFGWMAP_H_ +#define MON_NVMEOFGWMAP_H_ +#include +#include +#include "include/encoding.h" +#include "include/utime.h" +#include "common/Formatter.h" +#include "common/ceph_releases.h" +#include "common/version.h" +#include "common/options.h" +#include "common/Clock.h" +#include "msg/Message.h" +#include "common/ceph_time.h" +#include "NVMeofGwTypes.h" + +using ceph::coarse_mono_clock; +class Monitor; +/*-------------------*/ +class NVMeofGwMap +{ +public: + Monitor* mon = NULL; + epoch_t epoch = 0; // epoch is for Paxos synchronization mechanizm + bool delay_propose = false; + + std::map created_gws; + std::map fsm_timers;// map that handles timers started by all Gateway FSMs + void to_gmap(std::map& Gmap) const; + + int cfg_add_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + int cfg_delete_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + void process_gw_map_ka (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending); + int process_gw_map_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void update_active_timers (bool &propose_pending); + void handle_abandoned_ana_groups (bool &propose_pending); + 
void handle_removed_subsystems (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const std::vector ¤t_subsystems, bool &propose_pending); + void start_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value); +private: + void add_grp_id (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid); + void remove_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid); + void fsm_handle_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_delete (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_alive (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwMonState & gw_state, gw_states_per_group_t state, + NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified); + void fsm_handle_to_expired (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &map_modified); + + void find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending); + void find_failback_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void set_failover_gw_for_ANA_group (const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, + NvmeAnaGrpId groupid); + + + int get_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + void cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + void validate_gw_map(const NvmeGroupKey& group_key); + +public: + int blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId ANA_groupid, epoch_t &epoch, bool failover); + void encode(ceph::buffer::list &bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(epoch, bl);// 
global map epoch + + encode(created_gws, bl); //Encode created GWs + encode(fsm_timers, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator &bl) { + using ceph::decode; + DECODE_START(1, bl); + decode(epoch, bl); + + decode(created_gws, bl); + decode(fsm_timers, bl); + DECODE_FINISH(bl); + } +}; + +#include "NVMeofGwSerialize.h" + +#endif /* SRC_MON_NVMEOFGWMAP_H_ */ diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc new file mode 100644 index 000000000000..6111f76f425d --- /dev/null +++ b/src/mon/NVMeofGwMon.cc @@ -0,0 +1,532 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +using std::string; + +void NVMeofGwMon::init() { + dout(10) << "called " << dendl; +} + +void NVMeofGwMon::on_restart() { + dout(10) << "called " << dendl; + last_beacon.clear(); + last_tick = ceph::coarse_mono_clock::now(); + synchronize_last_beacon(); +} + + +void NVMeofGwMon::synchronize_last_beacon() { + dout(10) << "called, is leader : " << mon.is_leader() <<" active " << is_active() << dendl; + // Initialize last_beacon to identify transitions of available GWs to unavailable state + for (const auto& created_map_pair: map.created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwMonStates& gw_created_map = created_map_pair.second; + for (const auto& 
gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + if (gw_created_pair.second.availability == gw_availability_t::GW_AVAILABLE) { + dout(10) << "synchronize last_beacon for GW :" << gw_id << dendl; + LastBeacon lb = {gw_id, group_key}; + last_beacon[lb] = last_tick; + } + } + } +} + +void NVMeofGwMon::on_shutdown() { + dout(10) << "called " << dendl; +} + +void NVMeofGwMon::tick() { + if (!is_active() || !mon.is_leader()) { + dout(10) << "NVMeofGwMon leader : " << mon.is_leader() << "active : " << is_active() << dendl; + return; + } + bool _propose_pending = false; + + const auto now = ceph::coarse_mono_clock::now(); + const auto nvmegw_beacon_grace = g_conf().get_val("mon_nvmeofgw_beacon_grace"); + dout(15) << "NVMeofGwMon leader got a tick, pending epoch "<< pending_map.epoch << dendl; + + const auto client_tick_period = g_conf().get_val("nvmeof_mon_client_tick_period"); + //handle exception of tick overdued in order to avoid false detection of overdued beacons, like it done in MgrMonitor::tick + if (last_tick != ceph::coarse_mono_clock::zero() + && (now - last_tick > (nvmegw_beacon_grace - client_tick_period))) { + // This case handles either local slowness (calls being delayed + // for whatever reason) or cluster election slowness (a long gap + // between calls while an election happened) + dout(10) << ": resetting beacon timeouts due to mon delay " + "(slow election?) 
of " << now - last_tick << " seconds" << dendl; + for (auto &i : last_beacon) { + i.second = now; + } + } + + last_tick = now; + bool propose = false; + + pending_map.update_active_timers(propose); // Periodic: check active FSM timers + _propose_pending |= propose; + + const auto cutoff = now - nvmegw_beacon_grace; + for (auto &itr : last_beacon) {// Pass over all the stored beacons + auto& lb = itr.first; + auto last_beacon_time = itr.second; + if (last_beacon_time < cutoff) { + dout(10) << "beacon timeout for GW " << lb.gw_id << dendl; + pending_map.process_gw_map_gw_down(lb.gw_id, lb.group_key, propose); + _propose_pending |= propose; + last_beacon.erase(lb); + } + else { + dout(20) << "beacon live for GW key: " << lb.gw_id << dendl; + } + } + + pending_map.handle_abandoned_ana_groups(propose); // Periodic: take care of not handled ANA groups + _propose_pending |= propose; + + if (_propose_pending) { + dout(10) << "propose pending " <("mon_max_nvmeof_epochs"); + if (map.epoch > max) { + return map.epoch - max; + } + return 0; +} + +void NVMeofGwMon::create_pending() { + + pending_map = map;// deep copy of the object + pending_map.epoch++; + dout(10) << " pending " << pending_map << dendl; +} + +void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t) { + + dout(10) << dendl; + ceph_assert(get_last_committed() + 1 == pending_map.epoch); + bufferlist bl; + pending_map.encode(bl); + put_version(t, pending_map.epoch, bl); + put_last_committed(t, pending_map.epoch); +} + +void NVMeofGwMon::update_from_paxos(bool *need_bootstrap) { + version_t version = get_last_committed(); + + if (version != map.epoch) { + dout(10) << " NVMeGW loading version " << version << " " << map.epoch << dendl; + + bufferlist bl; + int err = get_version(version, bl); + ceph_assert(err == 0); + + auto p = bl.cbegin(); + map.decode(p); + if (!mon.is_leader()) { + dout(10) << "leader map: " << map << dendl; + } + check_subs(true); + } +} + +void NVMeofGwMon::check_sub(Subscription 
*sub) +{ + dout(10) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl; + if (sub->next <= map.epoch) + { + dout(10) << "Sending map to subscriber " << sub->session->con << " " << sub->session->con->get_peer_addr() << dendl; + sub->session->con->send_message2(make_message(map)); + + if (sub->onetime) { + mon.session_map.remove_sub(sub); + } else { + sub->next = map.epoch + 1; + } + } +} + +void NVMeofGwMon::check_subs(bool t) +{ + const std::string type = "NVMeofGw"; + dout(10) << "count " << mon.session_map.subs.count(type) << dendl; + + if (mon.session_map.subs.count(type) == 0) { + return; + } + for (auto sub : *(mon.session_map.subs[type])) { + check_sub(sub); + } +} + +bool NVMeofGwMon::preprocess_query(MonOpRequestRef op) { + dout(20) << dendl; + + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return preprocess_beacon(op); + + case MSG_MON_COMMAND: + try { + return preprocess_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return true; + } + + default: + mon.no_reply(op); + derr << "Unhandled message type " << m->get_type() << dendl; + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_update(MonOpRequestRef op) { + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return prepare_beacon(op); + + case MSG_MON_COMMAND: + try { + return prepare_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return false; /* nothing to propose! */ + } + + default: + mon.no_reply(op); + dout(1) << "Unhandled message type " << m->get_type() << dendl; + return false; /* nothing to propose! 
*/ + } +} + +bool NVMeofGwMon::preprocess_command(MonOpRequestRef op) +{ + dout(10) << dendl; + auto m = op->get_req(); + std::stringstream sstrm; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, sstrm)) + { + string rs = sstrm.str(); + dout(4) << "Error : Invalid command " << m->cmd << "Error " << rs << dendl; + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(cmdmap, "prefix", prefix); + dout(10) << "MonCommand : "<< prefix << dendl; + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + if (prefix == "nvme-gw show") { + std::string pool, group; + if (!f) { + f.reset(Formatter::create(format, "json-pretty", "json-pretty")); + } + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + dout(10) <<"nvme-gw show pool "<< pool << " group "<< group << dendl; + + if (map.created_gws[group_key].size()) { + f->open_object_section("common"); + f->dump_unsigned("epoch", map.epoch); + f->dump_string("pool", pool); + f->dump_string("group", group); + f->dump_unsigned("num gws", map.created_gws[group_key].size()); + sstrm <<"[ "; + NvmeGwId gw_id; + for (auto& gw_created_pair: map.created_gws[group_key]) { + gw_id = gw_created_pair.first; + auto& st = gw_created_pair.second; + sstrm << st.ana_grp_id+1 << " "; + } + sstrm << "]"; + f->dump_string("Anagrp list", sstrm.str()); + f->close_section(); + + for (auto& gw_created_pair: map.created_gws[group_key]) { + auto& gw_id = gw_created_pair.first; + auto& state = gw_created_pair.second; + f->open_object_section("stat"); + f->dump_string("gw-id", gw_id); + f->dump_unsigned("anagrp-id",state.ana_grp_id+1); + f->dump_unsigned("performed-full-startup", state.performed_full_startup); + std::stringstream sstrm1; + sstrm1 << state.availability; + f->dump_string("Availability", sstrm1.str()); 
+ sstrm1.str(""); + for (auto &state_itr: map.created_gws[group_key][gw_id].sm_state) { + sstrm1 << " " << state_itr.first + 1 << ": " << state.sm_state[state_itr.first] << ","; + } + f->dump_string("ana states", sstrm1.str()); + f->close_section(); + } + f->flush(rdata); + sstrm.str(""); + } + else { + sstrm << "num_gws 0"; + } + getline(sstrm, rs); + mon.reply_command(op, err, rs, rdata, get_last_committed()); + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_command(MonOpRequestRef op) +{ + dout(10) << dendl; + auto m = op->get_req(); + int rc; + std::stringstream sstrm; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + bool response = false; + + if (!cmdmap_from_json(m->cmd, &cmdmap, sstrm)) + { + string rs = sstrm.str(); + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + const auto prefix = cmd_getval_or(cmdmap, "prefix", string{}); + + dout(10) << "MonCommand : "<< prefix << dendl; + if (prefix == "nvme-gw create" || prefix == "nvme-gw delete") { + std::string id, pool, group; + + cmd_getval(cmdmap, "id", id); + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + dout(10) << " id "<< id <<" pool "<< pool << " group "<< group << dendl; + if (prefix == "nvme-gw create") { + rc = pending_map.cfg_add_gw(id, group_key); + if (rc == -EINVAL) { + err = rc; + dout (4) << "Error: GW cannot be created " << id << " " << pool << " " << group << " rc " << rc << dendl; + sstrm.str(""); + } + } + else{ + rc = pending_map.cfg_delete_gw(id, group_key); + if (rc == -EINVAL) { + dout (4) << "Error: GW not found in the database " << id << " " << pool << " " << group << " rc " << rc << dendl; + err = 0; + sstrm.str(""); + } + } + if ((rc != -EEXIST) && (rc != -EINVAL)) //propose pending would be generated by the PaxosService + 
response = true; + } + + getline(sstrm, rs); + if (response == false) { + if (err < 0 && rs.length() == 0) + { + rs = cpp_strerror(err); + dout(10) << "Error command err : "<< err << " rs-len: " << rs.length() << dendl; + } + mon.reply_command(op, err, rs, rdata, get_last_committed()); + } + else + wait_for_commit(op, new Monitor::C_Command(mon, op, 0, rs, + get_last_committed() + 1)); + return response; +} + + +bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op) { + auto m = op->get_req(); + const BeaconSubsystems& sub = m->get_subsystems(); + dout(15) << "beacon from " << m->get_type() << " GW : " << m->get_gw_id() << " num subsystems " << sub.size() << dendl; + + return false; // allways return false to call leader's prepare beacon +} + + +bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) { + auto m = op->get_req(); + + dout(20) << "availability " << m->get_availability() << " GW : " << m->get_gw_id() << + " osdmap_epoch " << m->get_last_osd_epoch() << " subsystems " << m->get_subsystems() << dendl; + + NvmeGwId gw_id = m->get_gw_id(); + NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); + gw_availability_t avail = m->get_availability(); + bool propose = false; + bool nonce_propose = false; + bool timer_propose = false; + bool gw_created = true; + NVMeofGwMap ack_map; + auto& group_gws = map.created_gws[group_key]; + auto gw = group_gws.find(gw_id); + const BeaconSubsystems& sub = m->get_subsystems(); + + if (avail == gw_availability_t::GW_CREATED) { + if (gw == group_gws.end()) { + gw_created = false; + dout(10) << "Warning: GW " << gw_id << " group_key " << group_key << " was not found in the map.Created_gws "<< map.created_gws <(ack_map); + mon.send_reply(op, msg.detach()); + goto false_return; + } + } + + // At this stage the gw has to be in the Created_gws + if (gw == group_gws.end()) { + dout(4) << "Administratively deleted GW sends beacon " << gw_id <get_nonce_map().size()) { + if 
(pending_map.created_gws[group_key][gw_id].nonce_map != m->get_nonce_map()) + { + dout(10) << "nonce map of GW changed , propose pending " << gw_id << dendl; + pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map(); + dout(10) << "nonce map of GW " << gw_id << " "<< pending_map.created_gws[group_key][gw_id].nonce_map << dendl; + nonce_propose = true; + } + } + else { + dout(10) << "Warning: received empty nonce map in the beacon of GW " << gw_id << " "<< dendl; + } + + if (sub.size() == 0) { + avail = gw_availability_t::GW_UNAVAILABLE; + } + if (pending_map.created_gws[group_key][gw_id].subsystems != sub) + { + dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl; + pending_map.created_gws[group_key][gw_id].subsystems = sub; + dout(20) << "subsystems of GW " << gw_id << " "<< pending_map.created_gws[group_key][gw_id].subsystems << dendl; + nonce_propose = true; + } + pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid = ( map.epoch == m->get_last_gwmap_epoch() ); + if (pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid == false) { + dout(20) << "map epoch of gw is not up-to-date " << gw_id << " epoch " << map.epoch << " beacon_epoch " << m->get_last_gwmap_epoch() << dendl; + } + if (avail == gw_availability_t::GW_AVAILABLE) + { + auto now = ceph::coarse_mono_clock::now(); + // check pending_map.epoch vs m->get_version() - if different - drop the beacon + + LastBeacon lb = {gw_id, group_key}; + last_beacon[lb] = now; + epoch_t last_osd_epoch = m->get_last_osd_epoch(); + pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, propose); + } + else if (avail == gw_availability_t::GW_UNAVAILABLE) { // state set by GW client application + LastBeacon lb = {gw_id, group_key}; + + auto it = last_beacon.find(lb); + if (it != last_beacon.end()) { + last_beacon.erase(lb); + pending_map.process_gw_map_gw_down(gw_id, group_key, propose); + } + } + pending_map.update_active_timers(timer_propose); // 
Periodic: check active FSM timers + propose |= timer_propose; + propose |= nonce_propose; + +set_propose: + if (!propose) { + if (gw_created) { + ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];// respond with a map slice correspondent to the same GW + } + ack_map.epoch = map.epoch; + dout(20) << "ack_map " << ack_map <(ack_map); + mon.send_reply(op, msg.detach()); + } + else { + mon.no_reply(op); + } +false_return: + if (propose) { + dout(10) << "decision in prepare_beacon" < last_beacon; + ceph::coarse_mono_clock::time_point last_tick; + +public: + NVMeofGwMon(Monitor &mn, Paxos &p, const std::string& service_name): PaxosService(mn, p, service_name) {map.mon = &mn;} + ~NVMeofGwMon() override {} + + + // config observer + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, const std::set &changed) override {}; + + // 3 pure virtual methods of the paxosService + void create_initial() override {}; + void create_pending() override; + void encode_pending(MonitorDBStore::TransactionRef t) override; + + void init() override; + void on_shutdown() override; + void on_restart() override; + void update_from_paxos(bool *need_bootstrap) override; + + version_t get_trim_to() const override; + + bool preprocess_query(MonOpRequestRef op) override; + bool prepare_update(MonOpRequestRef op) override; + + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); + + void encode_full(MonitorDBStore::TransactionRef t) override { } + + bool preprocess_beacon(MonOpRequestRef op); + bool prepare_beacon(MonOpRequestRef op); + + void tick() override; + void print_summary(ceph::Formatter *f, std::ostream *ss) const; + + void check_subs(bool type); + void check_sub(Subscription *sub); + +private: + void synchronize_last_beacon(); + +}; + +#endif /* MON_NVMEGWMONITOR_H_ */ diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h new file mode 100755 index 
000000000000..cd7055413727 --- /dev/null +++ b/src/mon/NVMeofGwSerialize.h @@ -0,0 +1,610 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ +#ifndef MON_NVMEOFGWSERIALIZE_H_ +#define MON_NVMEOFGWSERIALIZE_H_ +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define MODULE_PREFFIX "nvmeofgw " +#define dout_prefix *_dout << MODULE_PREFFIX << __PRETTY_FUNCTION__ << " " + +inline std::ostream& operator<<(std::ostream& os, const gw_exported_states_per_group_t value) { + switch (value) { + case gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE: os << "OPTIMIZED "; break; + case gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE: os << "INACCESSIBLE "; break; + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const gw_states_per_group_t value) { + switch (value) { + case gw_states_per_group_t::GW_IDLE_STATE: os << "IDLE "; break; + case gw_states_per_group_t::GW_STANDBY_STATE: os << "STANDBY "; break; + case gw_states_per_group_t::GW_ACTIVE_STATE: os << "ACTIVE "; break; + case gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED: os << "OWNER_FAILBACK_PREPARED "; break; + case gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED: os << "WAIT_FAILBACK_PREPARED "; break; + case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: os << "WAIT_BLOCKLIST_CMPL "; break; + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const gw_availability_t value) { + switch (value) { + + case gw_availability_t::GW_CREATED: os << 
"CREATED"; break; + case gw_availability_t::GW_AVAILABLE: os << "AVAILABLE"; break; + case gw_availability_t::GW_UNAVAILABLE: os << "UNAVAILABLE"; break; + + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const SmState value) { + os << "SM_STATE [ "; + for (auto& state_itr: value ) + os << value.at(state_itr.first); + os << "]"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconNamespace value) { + os << "BeaconNamespace( anagrpid:" << value.anagrpid << ", nonce:" << value.nonce <<" )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) { + os << "BeaconListener( addrfam:" << value.address_family + << ", addr:" << value.address + << ", svcid:" << value.svcid << " )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) { + os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ "; + for (const auto& list: value.listeners) os << list << " "; + os << "] namespaces [ "; + for (const auto& ns: value.namespaces) os << ns << " "; + os << "] )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NqnState value) { + os << "NqnState( nqn: " << value.nqn << ", " << value.ana_state << " )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NvmeGwClientState value) { + os << "NvmeGwState { group id: " << value.group_id << " gw_map_epoch " << value.gw_map_epoch << " availablilty "<< value.availability + << " GwSubsystems: [ "; + for (const auto& sub: value.subsystems) os << sub.second << " "; + os << " ] }"; + + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeGroupKey value) { + os << "NvmeGroupKey {" << value.first << "," << value.second << "}"; + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeGwMonClientStates value) { + os << "NvmeGwMap "; + for (auto& gw_state: value) 
{ + os << "\n" << MODULE_PREFFIX <<" { == gw_id: " << gw_state.first << " -> " << gw_state.second << "}"; + } + os << "}"; + + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeNonceVector value) { + for (auto & nonces : value) { + os << nonces << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NvmeAnaNonceMap value) { + if(value.size()) os << "\n" << MODULE_PREFFIX; + for (auto &nonce_map : value) { + os << " ana_grp: " << nonce_map.first << " [ " << nonce_map.second << "]\n"<< MODULE_PREFFIX ; + } + return os; +} + +inline std::ostream& print_gw_created_t(std::ostream& os, const NvmeGwMonState value, size_t num_ana_groups) { + os << "==Internal map ==NvmeGwCreated { ana_group_id " << value.ana_grp_id << " osd_epochs: "; + for (auto& blklst_itr: value.blocklist_data) + { + os << " " << blklst_itr.first <<": " << blklst_itr.second.osd_epoch << ":" < { " << group_gws.second << " }"; + } + os << "]"; + return os; +} + +inline void encode(const ana_state_t& st, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)st.size(), bl); + for (const auto& gr: st) { + encode((uint32_t)gr.first, bl); + encode((uint32_t)gr.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(ana_state_t& st, ceph::buffer::list::const_iterator &bl) { + uint32_t n; + DECODE_START(1, bl); + decode(n, bl); + st.resize(n); + for (uint32_t i = 0; i < n; i++) { + uint32_t a, b; + decode(a, bl); + decode(b, bl); + st[i].first = (gw_exported_states_per_group_t)a; + st[i].second = (epoch_t)b; + } + DECODE_FINISH(bl); +} + +inline void encode(const GwSubsystems& subsystems, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)subsystems.size(), bl); + for (const auto& sub: subsystems) { + encode(sub.second.nqn, bl); + encode(sub.second.ana_state, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(GwSubsystems& subsystems, ceph::bufferlist::const_iterator& bl) { + uint32_t num_subsystems; + 
DECODE_START(1, bl); + decode(num_subsystems, bl); + subsystems.clear(); + for (uint32_t i=0; i(endtime.time_since_epoch()).count(); + encode(millisecondsSinceEpoch , bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeGwTimerState& state, ceph::bufferlist::const_iterator& bl) { + uint32_t size; + DECODE_START(1, bl); + decode(size, bl); + for (uint32_t i = 0; i (duration); + state.data[tm_key] = tm; + } + DECODE_FINISH(bl); +} + +inline void encode(const NvmeAnaNonceMap& nonce_map, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)nonce_map.size(), bl); + for (auto& ana_group_nonces : nonce_map) { + encode(ana_group_nonces.first, bl); // ana group id + encode ((uint32_t)ana_group_nonces.second.size(), bl); // encode the vector size + for (auto& nonce: ana_group_nonces.second) encode(nonce, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeAnaNonceMap& nonce_map, ceph::buffer::list::const_iterator &bl) { + uint32_t map_size; + NvmeAnaGrpId ana_grp_id; + uint32_t vector_size; + std::string nonce; + DECODE_START(1, bl); + decode(map_size, bl); + for (uint32_t i = 0; i& created_gws, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)created_gws.size(), bl); // number of groups + for (auto& group_gws: created_gws) { + auto& group_key = group_gws.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + auto& gws = group_gws.second; + encode (gws, bl); // encode group gws + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& created_gws, ceph::buffer::list::const_iterator &bl) { + created_gws.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i& gmap, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)gmap.size(), bl); // number of groups + for (auto& group_state: gmap) { + auto& group_key = group_state.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + 
encode(group_state.second, bl); + } + ENCODE_FINISH(bl); +} +// Start decode NvmeGroupKey, NvmeGwMap +inline void decode(std::map& gmap, ceph::buffer::list::const_iterator &bl) { + gmap.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i& gmetadata, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)gmetadata.size(), bl); // number of groups + for (auto& group_md: gmetadata) { + auto& group_key = group_md.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_md.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& gmetadata, ceph::buffer::list::const_iterator &bl) { + gmetadata.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i +#include +#include +#include + +using NvmeGwId = std::string; +using NvmeGroupKey = std::pair; +using NvmeNqnId = std::string; +using NvmeAnaGrpId = uint32_t; + + +enum class gw_states_per_group_t { + GW_IDLE_STATE = 0, //invalid state + GW_STANDBY_STATE, + GW_ACTIVE_STATE, + GW_OWNER_WAIT_FAILBACK_PREPARED, + GW_WAIT_FAILBACK_PREPARED, + GW_WAIT_BLOCKLIST_CMPL +}; + +enum class gw_exported_states_per_group_t { + GW_EXPORTED_OPTIMIZED_STATE = 0, + GW_EXPORTED_INACCESSIBLE_STATE +}; + +enum class gw_availability_t { + GW_CREATED = 0, + GW_AVAILABLE, + GW_UNAVAILABLE, + GW_DELETED +}; + +#define REDUNDANT_GW_ANA_GROUP_ID 0xFF +using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>; + +using ana_state_t = std::vector>; + +struct BeaconNamespace { + NvmeAnaGrpId anagrpid; + std::string nonce; + + // Define the equality operator + bool operator==(const BeaconNamespace& other) const { + return anagrpid == other.anagrpid && + nonce == other.nonce; + } +}; + +// Beacon Listener represents an NVME Subsystem listener, +// which generally does not have to use TCP/IP. +// It is derived from the SPDK listener JSON RPC representation. 
+// For more details, see https://spdk.io/doc/jsonrpc.html#rpc_nvmf_listen_address. +struct BeaconListener { + std::string address_family; // IPv4 or IPv6 + std::string address; // + std::string svcid; // port + + // Define the equality operator + bool operator==(const BeaconListener& other) const { + return address_family == other.address_family && + address == other.address && + svcid == other.svcid; + } +}; + +struct BeaconSubsystem { + NvmeNqnId nqn; + std::list listeners; + std::list namespaces; + + // Define the equality operator + bool operator==(const BeaconSubsystem& other) const { + return nqn == other.nqn && + listeners == other.listeners && + namespaces == other.namespaces; + } +}; + +using BeaconSubsystems = std::list; + +using NvmeNonceVector = std::vector; +using NvmeAnaNonceMap = std::map ; + +struct Blocklist_data{ + epoch_t osd_epoch; + bool is_failover; + Blocklist_data() { + osd_epoch = 0; + is_failover = true; + }; + Blocklist_data(epoch_t epoch, bool failover):osd_epoch(epoch), is_failover(failover) {}; +}; + +using BlocklistData = std::map < NvmeAnaGrpId, Blocklist_data>; + +struct NvmeGwMonState { + NvmeAnaGrpId ana_grp_id; // ana-group-id allocated for this GW, GW owns this group-id + gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable + bool last_gw_map_epoch_valid; // "true" if the last epoch seen by the gw-client is up-to-date + bool performed_full_startup; // in order to identify gws that did not exit upon failover + BeaconSubsystems subsystems; // gateway susbsystem and their state machine states + NvmeAnaNonceMap nonce_map; + SmState sm_state; // state machine states per ANA group + BlocklistData blocklist_data; + + NvmeGwMonState(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}; + + NvmeGwMonState(NvmeAnaGrpId id): ana_grp_id(id), availability(gw_availability_t::GW_CREATED), last_gw_map_epoch_valid(false), + performed_full_startup(false) {}; + void set_unavailable_state() { + availability = 
gw_availability_t::GW_UNAVAILABLE; + performed_full_startup = false; // after setting this state the next time monitor sees GW, it expects it performed the full startup + } + void standby_state(NvmeAnaGrpId grpid) { + sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE; + }; + void active_state(NvmeAnaGrpId grpid) { + sm_state[grpid] = gw_states_per_group_t::GW_ACTIVE_STATE; + blocklist_data[grpid].osd_epoch = 0; + }; +}; + +struct NqnState { + std::string nqn; // subsystem NQN + ana_state_t ana_state; // subsystem's ANA state + + // constructors + NqnState(const std::string& _nqn, const ana_state_t& _ana_state): + nqn(_nqn), ana_state(_ana_state) {} + NqnState(const std::string& _nqn, const SmState& sm_state, const NvmeGwMonState & gw_created) : nqn(_nqn) { + uint32_t i = 0; + for (auto& state_itr: sm_state) { + if (state_itr.first > i) { + uint32_t num_to_add = state_itr.first - i; + for (uint32_t j = 0; j state_pair; + state_pair.first = gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE; + state_pair.second = 0; + ana_state.push_back(state_pair); + } + i += num_to_add; + } + std::pair state_pair; + state_pair.first = (sm_state.at(state_itr.first) == gw_states_per_group_t::GW_ACTIVE_STATE + || sm_state.at(state_itr.first) == gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) + ? 
gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE + : gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE; + state_pair.second = gw_created.blocklist_data.at(state_itr.first).osd_epoch; + ana_state.push_back(state_pair); + i ++; + } + } +}; + +typedef std::map GwSubsystems; + +struct NvmeGwClientState { + NvmeAnaGrpId group_id; + epoch_t gw_map_epoch; + GwSubsystems subsystems; + gw_availability_t availability; + NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available): + group_id(id), + gw_map_epoch(epoch), + availability(available) + {}; + + NvmeGwClientState() : NvmeGwClientState(REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}; +}; + + +struct Tmdata{ + uint32_t timer_started; // statemachine timer(timestamp) set in some state + uint8_t timer_value; + std::chrono::system_clock::time_point end_time; + Tmdata() { + timer_started = 0; + timer_value = 0; + } +}; + +using TmData = std::map < NvmeAnaGrpId, Tmdata>; + +struct NvmeGwTimerState { + TmData data; + NvmeGwTimerState() {}; +}; + +using NvmeGwMonClientStates = std::map; +using NvmeGwTimers = std::map; +using NvmeGwMonStates = std::map; + +#endif /* SRC_MON_NVMEOFGWTYPES_H_ */ diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 3429a8e99916..9dd2797852d4 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -36,6 +36,7 @@ enum { PAXOS_HEALTH, PAXOS_CONFIG, PAXOS_KV, + PAXOS_NVMEGW, PAXOS_NUM }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 22208d2d1f42..f649e0f3d3ee 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -219,6 +219,9 @@ #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + #ifdef WITH_BLKIN #include "Messenger.h" #endif @@ -885,6 +888,10 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_BEACON: + m = make_message(); + break; + case 
MSG_MON_MGR_REPORT: m = make_message(); break; @@ -944,6 +951,9 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_MAP: + m = make_message(); + break; // -- simple messages without payload -- case CEPH_MSG_SHUTDOWN: diff --git a/src/msg/Message.h b/src/msg/Message.h index 15eb3feadced..78557f90e48f 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -242,6 +242,12 @@ // *** ceph-mgr <-> MON daemons *** #define MSG_MGR_UPDATE 0x70b +// *** nvmeof mon -> gw daemons *** +#define MSG_MNVMEOF_GW_MAP 0x800 + +// *** gw daemons -> nvmeof mon *** +#define MSG_MNVMEOF_GW_BEACON 0x801 + // ====================================================== // abstract Message class diff --git a/src/nvmeof/NVMeofGwClient.cc b/src/nvmeof/NVMeofGwClient.cc new file mode 100644 index 000000000000..c82423de5158 --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.cc @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "NVMeofGwClient.h" + +bool NVMeofGwClient::get_subsystems(subsystems_info& reply) { + get_subsystems_req request; + ClientContext context; + + Status status = stub_->get_subsystems(&context, request, &reply); + + return status.ok(); +} + +bool NVMeofGwClient::set_ana_state(const ana_info& info) { + req_status reply; + ClientContext context; + + Status status = stub_->set_ana_state(&context, info, &reply); + + return status.ok() && reply.status(); +} diff --git a/src/nvmeof/NVMeofGwClient.h b/src/nvmeof/NVMeofGwClient.h new file mode 100644 index 000000000000..022485251d6b --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWCLIENT_H__ +#define __NVMEOFGWCLIENT_H__ +#include +#include +#include + +#include + +#include "gateway.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwClient { + public: + NVMeofGwClient(std::shared_ptr channel) + : stub_(Gateway::NewStub(channel)) {} + + bool get_subsystems(subsystems_info& reply); + bool set_ana_state(const ana_info& info); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/NVMeofGwMonitorClient.cc b/src/nvmeof/NVMeofGwMonitorClient.cc new file mode 100644 index 000000000000..fc4358f07d4d --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.cc @@ -0,0 +1,451 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include + +#include "common/errno.h" +#include "common/signal.h" +#include "common/ceph_argparse.h" +#include "include/compat.h" + +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + + +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" +#include "NVMeofGwMonitorClient.h" +#include "NVMeofGwClient.h" +#include "NVMeofGwMonitorGroupClient.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) : + Dispatcher(g_ceph_context), + osdmap_epoch(0), + gwmap_epoch(0), + last_map_time(std::chrono::steady_clock::now()), + monc{g_ceph_context, poolctx}, + client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())), + objecter{g_ceph_context, client_messenger.get(), &monc, poolctx}, + client{client_messenger.get(), &monc, &objecter}, + timer(g_ceph_context, lock), + orig_argc(argc), + orig_argv(argv) +{ +} + +NVMeofGwMonitorClient::~NVMeofGwMonitorClient() = default; + +const char** NVMeofGwMonitorClient::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + NULL + }; + return KEYS; +} + +std::string read_file(const std::string& filename) { + std::ifstream file(filename); + std::string content((std::istreambuf_iterator(file)), std::istreambuf_iterator()); + return content; +} + +void NVMeofGwMonitorClient::init_gw_ssl_opts() +{ + if (server_cert.empty() && client_key.empty() && client_cert.empty()) + return; + + // load the certificates content + // create SSL/TLS credentials + gw_ssl_opts.pem_root_certs = read_file(server_cert); 
+ gw_ssl_opts.pem_private_key = read_file(client_key); + gw_ssl_opts.pem_cert_chain = read_file(client_cert); +} + +std::shared_ptr NVMeofGwMonitorClient::gw_creds() +{ + // use insecure channel if no keys/certs defined + if (server_cert.empty() && client_key.empty() && client_cert.empty()) + return grpc::InsecureChannelCredentials(); + else + return grpc::SslCredentials(gw_ssl_opts); +} + +int NVMeofGwMonitorClient::init() +{ + dout(10) << dendl; + std::string val; + auto args = argv_to_vec(orig_argc, orig_argv); + + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-name", (char*)NULL)) { + name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-pool", (char*)NULL)) { + pool = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-group", (char*)NULL)) { + group = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-address", (char*)NULL)) { + gateway_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--monitor-group-address", (char*)NULL)) { + monitor_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-cert", (char*)NULL)) { + server_cert = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-key", (char*)NULL)) { + client_key = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-cert", (char*)NULL)) { + client_cert = val; + } else { + ++i; + } + } + + dout(10) << "gateway name: " << name << + " pool:" << pool << + " group:" << group << + " address: " << gateway_address << dendl; + ceph_assert(name != "" && pool != "" && gateway_address != "" && monitor_address != ""); + + // ensures that either all are empty or all are non-empty. 
+ ceph_assert((server_cert.empty() == client_key.empty()) && (client_key.empty() == client_cert.empty())); + init_gw_ssl_opts(); + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + + std::lock_guard l(lock); + + // Initialize Messenger + client_messenger->add_dispatcher_tail(this); + client_messenger->add_dispatcher_head(&objecter); + client_messenger->add_dispatcher_tail(&client); + client_messenger->start(); + + poolctx.start(2); + + // Initialize MonClient + if (monc.build_initial_monmap() < 0) { + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc.sub_want("NVMeofGw", 0, 0); + monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc.set_messenger(client_messenger.get()); + + // We must register our config callback before calling init(), so + // that we see the initial configuration message + monc.register_config_callback([this](const std::string &k, const std::string &v){ + // leaving this for debugging purposes + dout(10) << "nvmeof config_callback: " << k << " : " << v << dendl; + + return false; + }); + monc.register_config_notify_callback([this]() { + dout(4) << "nvmeof monc config notify callback" << dendl; + }); + dout(4) << "nvmeof Registered monc callback" << dendl; + + int r = monc.init(); + if (r < 0) { + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(10) << "nvmeof Registered monc callback" << dendl; + + r = monc.authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify an ID with a valid keyring?" 
<< dendl; + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(10) << "monc.authentication done" << dendl; + monc.set_passthrough_monmap(); + + client_t whoami = monc.get_global_id(); + client_messenger->set_myname(entity_name_t::MGR(whoami.v)); + objecter.set_client_incarnation(0); + objecter.init(); + objecter.enable_blocklist_events(); + objecter.start(); + client.init(); + timer.init(); + + tick(); + + dout(10) << "Complete." << dendl; + return 0; +} + +static bool get_gw_state(const char* desc, const std::map& m, const NvmeGroupKey& group_key, const NvmeGwId& gw_id, NvmeGwClientState& out) +{ + auto gw_group = m.find(group_key); + if (gw_group == m.end()) { + dout(10) << "can not find group (" << group_key.first << "," << group_key.second << ") " << desc << " map: " << m << dendl; + return false; + } + auto gw_state = gw_group->second.find(gw_id); + if (gw_state == gw_group->second.end()) { + dout(10) << "can not find gw id: " << gw_id << " in " << desc << "group: " << gw_group->second << dendl; + return false; + } + out = gw_state->second; + return true; +} + +void NVMeofGwMonitorClient::send_beacon() +{ + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + gw_availability_t gw_availability = gw_availability_t::GW_CREATED; + BeaconSubsystems subs; + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, gw_creds())); + subsystems_info gw_subsystems; + bool ok = gw_client.get_subsystems(gw_subsystems); + if (ok) { + for (int i = 0; i < gw_subsystems.subsystems_size(); i++) { + const subsystem& sub = gw_subsystems.subsystems(i); + BeaconSubsystem bsub; + bsub.nqn = sub.nqn(); + for (int j = 0; j < sub.namespaces_size(); j++) { + const auto& ns = sub.namespaces(j); + BeaconNamespace bns = {ns.anagrpid(), ns.nonce()}; + bsub.namespaces.push_back(bns); + } + for (int k = 0; k < sub.listen_addresses_size(); k++) { + const auto& ls = sub.listen_addresses(k); + BeaconListener bls = { ls.adrfam(), ls.traddr(), 
ls.trsvcid() }; + bsub.listeners.push_back(bls); + } + subs.push_back(bsub); + } + } + + auto group_key = std::make_pair(pool, group); + NvmeGwClientState old_gw_state; + // if already got gateway state in the map + if (get_gw_state("old map", map, group_key, name, old_gw_state)) + gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE; + dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << + " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl; + auto m = ceph::make_message( + name, + pool, + group, + subs, + gw_availability, + osdmap_epoch, + gwmap_epoch); + monc.send_mon_message(std::move(m)); +} + +void NVMeofGwMonitorClient::disconnect_panic() +{ + auto disconnect_panic_duration = g_conf().get_val("nvmeof_mon_client_disconnect_panic").count(); + auto now = std::chrono::steady_clock::now(); + auto elapsed_seconds = std::chrono::duration_cast(now - last_map_time).count(); + if (elapsed_seconds > disconnect_panic_duration) { + dout(4) << "Triggering a panic upon disconnection from the monitor, elapsed " << elapsed_seconds << ", configured disconnect panic duration " << disconnect_panic_duration << dendl; + throw std::runtime_error("Lost connection to the monitor (beacon timeout)."); + } +} + +void NVMeofGwMonitorClient::tick() +{ + dout(10) << dendl; + + disconnect_panic(); + send_beacon(); + + timer.add_event_after( + g_conf().get_val("nvmeof_mon_client_tick_period").count(), + new LambdaContext([this](int r){ + tick(); + } + )); +} + +void NVMeofGwMonitorClient::shutdown() +{ + std::lock_guard l(lock); + + dout(4) << "nvmeof Shutting down" << dendl; + + + // stop sending beacon first, I use monc to talk with monitors + timer.shutdown(); + // client uses monc and objecter + client.shutdown(); + // Stop asio threads, so leftover events won't call into shut down + // monclient/objecter. 
+ poolctx.finish(); + // stop monc + monc.shutdown(); + + // objecter is used by monc + objecter.shutdown(); + // client_messenger is used by all of them, so stop it in the end + client_messenger->shutdown(); +} + +void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) +{ + last_map_time = std::chrono::steady_clock::now(); // record time of last monitor message + + auto &new_map = nmap->get_map(); + gwmap_epoch = nmap->get_gwmap_epoch(); + auto group_key = std::make_pair(pool, group); + dout(10) << "handle nvmeof gw map: " << new_map << dendl; + + NvmeGwClientState old_gw_state; + auto got_old_gw_state = get_gw_state("old map", map, group_key, name, old_gw_state); + NvmeGwClientState new_gw_state; + auto got_new_gw_state = get_gw_state("new map", new_map, group_key, name, new_gw_state); + + // ensure that the gateway state has not vanished + ceph_assert(got_new_gw_state || !got_old_gw_state); + + if (!got_old_gw_state) { + if (!got_new_gw_state) { + dout(10) << "Can not find new gw state" << dendl; + return; + } + bool set_group_id = false; + while (!set_group_id) { + NVMeofGwMonitorGroupClient monitor_group_client( + grpc::CreateChannel(monitor_address, gw_creds())); + dout(10) << "GRPC set_group_id: " << new_gw_state.group_id << dendl; + set_group_id = monitor_group_client.set_group_id( new_gw_state.group_id); + if (!set_group_id) { + dout(10) << "GRPC set_group_id failed" << dendl; + auto retry_timeout = g_conf().get_val("mon_nvmeofgw_set_group_id_retry"); + usleep(retry_timeout); + } + } + } + + if (got_old_gw_state && got_new_gw_state) { + dout(10) << "got_old_gw_state: " << old_gw_state << "got_new_gw_state: " << new_gw_state << dendl; + // Make sure we do not get out of order state changes from the monitor + ceph_assert(new_gw_state.gw_map_epoch >= old_gw_state.gw_map_epoch); + + // If the monitor previously identified this gateway as accessible but now + // flags it as unavailable, it suggests that the gateway lost connection + // to the 
monitor. + if (old_gw_state.availability == gw_availability_t::GW_AVAILABLE && + new_gw_state.availability == gw_availability_t::GW_UNAVAILABLE) { + dout(4) << "Triggering a panic upon disconnection from the monitor, gw state - unavailable" << dendl; + throw std::runtime_error("Lost connection to the monitor (gw map unavailable)."); + } + } + + // Gather all state changes + ana_info ai; + epoch_t max_blocklist_epoch = 0; + for (const auto& nqn_state_pair: new_gw_state.subsystems) { + auto& sub = nqn_state_pair.second; + const auto& nqn = nqn_state_pair.first; + nqn_ana_states nas; + nas.set_nqn(nqn); + const auto& old_nqn_state_pair = old_gw_state.subsystems.find(nqn); + auto found_old_nqn_state = (old_nqn_state_pair != old_gw_state.subsystems.end()); + + // old and new ana group id ranges could be different + auto ana_state_size = (found_old_nqn_state) ? + std::max(old_nqn_state_pair->second.ana_state.size(), sub.ana_state.size()) : + sub.ana_state.size(); + + for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) { + const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0); + auto new_group_state = (ana_grp_index < sub.ana_state.size()) ? + sub.ana_state[ana_grp_index] : + initial_ana_state; + auto old_group_state = (got_old_gw_state && found_old_nqn_state && ana_grp_index < old_nqn_state_pair->second.ana_state.size()) ? 
+ old_nqn_state_pair->second.ana_state[ana_grp_index] : + initial_ana_state; + + // if no state change detected for this nqn, group id + if (new_group_state.first == old_group_state.first) { + continue; + } + ana_group_state gs; + gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 + const auto& new_agroup_state = new_group_state.first; + const epoch_t& blocklist_epoch = new_group_state.second; + + if (new_agroup_state == gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE && + blocklist_epoch != 0) { + if (blocklist_epoch > max_blocklist_epoch) max_blocklist_epoch = blocklist_epoch; + } + gs.set_state(new_agroup_state == gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE ? OPTIMIZED : INACCESSIBLE); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(10) << " grpid " << (ana_grp_index + 1) << " state: " << new_gw_state << dendl; + } + if (nas.states_size()) ai.mutable_states()->Add(std::move(nas)); + } + + // if there is state change, notify the gateway + if (ai.states_size()) { + bool set_ana_state = false; + while (!set_ana_state) { + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, gw_creds())); + set_ana_state = gw_client.set_ana_state(ai); + if (!set_ana_state) { + dout(10) << "GRPC set_ana_state failed" << dendl; + usleep(1000); // TODO conf option + } + } + // Update latest accepted osdmap epoch, for beacons + if (max_blocklist_epoch > osdmap_epoch) { + osdmap_epoch = max_blocklist_epoch; + dout(10) << "Ready for blocklist osd map epoch: " << osdmap_epoch << dendl; + } + } + map = new_map; +} + +bool NVMeofGwMonitorClient::ms_dispatch2(const ref_t& m) +{ + std::lock_guard l(lock); + dout(10) << "got map type " << m->get_type() << dendl; + + if (m->get_type() == MSG_MNVMEOF_GW_MAP) { + handle_nvmeof_gw_map(ref_cast(m)); + } + bool handled = false; + return handled; +} + +int NVMeofGwMonitorClient::main(std::vector args) +{ + client_messenger->wait(); + + // Disable signal handlers + 
unregister_async_signal_handler(SIGHUP, sighup_handler); + shutdown_async_signal_handler(); + + return 0; +} diff --git a/src/nvmeof/NVMeofGwMonitorClient.h b/src/nvmeof/NVMeofGwMonitorClient.h new file mode 100644 index 000000000000..5bcca91eb4a0 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.h @@ -0,0 +1,97 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef NVMEOFGWMONITORCLIENT_H_ +#define NVMEOFGWMONITORCLIENT_H_ + +#include "auth/Auth.h" +#include "common/async/context_pool.h" +#include "common/Finisher.h" +#include "common/Timer.h" +#include "common/LogClient.h" + +#include "client/Client.h" +#include "mon/MonClient.h" +#include "osdc/Objecter.h" +#include "messages/MNVMeofGwMap.h" + +#include +#include + +class NVMeofGwMonitorClient: public Dispatcher, + public md_config_obs_t { +private: + std::string name; + std::string pool; + std::string group; + std::string gateway_address; + std::string monitor_address; + std::string server_cert; + std::string client_key; + std::string client_cert; + grpc::SslCredentialsOptions + gw_ssl_opts; // gateway grpc ssl options + epoch_t osdmap_epoch; // last awaited osdmap_epoch + epoch_t gwmap_epoch; // last received gw map epoch + std::chrono::time_point + last_map_time; // used to panic on disconnect + + // init gw ssl opts + void init_gw_ssl_opts(); + + // returns gateway grpc credentials + std::shared_ptr gw_creds(); + +protected: + ceph::async::io_context_pool poolctx; + MonClient monc; + std::unique_ptr client_messenger; + Objecter objecter; + Client client; + std::map map; + ceph::mutex lock = ceph::make_mutex("NVMeofGw::lock"); + SafeTimer 
timer; + + int orig_argc; + const char **orig_argv; + + void send_config_beacon(); + void send_beacon(); + +public: + NVMeofGwMonitorClient(int argc, const char **argv); + ~NVMeofGwMonitorClient() override; + + // Dispatcher interface + bool ms_dispatch2(const ceph::ref_t& m) override; + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; }; + + // config observer bits + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override {}; + + int init(); + void shutdown(); + int main(std::vector args); + void tick(); + void disconnect_panic(); + + void handle_nvmeof_gw_map(ceph::ref_t m); +}; + +#endif + diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.cc b/src/nvmeof/NVMeofGwMonitorGroupClient.cc new file mode 100644 index 000000000000..27ed7b134816 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "NVMeofGwMonitorGroupClient.h" + +bool NVMeofGwMonitorGroupClient::set_group_id(const uint32_t& id) { + group_id_req request; + request.set_id(id); + google::protobuf::Empty reply; + ClientContext context; + + Status status = stub_->group_id(&context, request, &reply); + + return status.ok(); +} diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.h b/src/nvmeof/NVMeofGwMonitorGroupClient.h new file mode 100644 index 000000000000..805e182c15c1 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWMONITORGROUPCLIENT_H__ +#define __NVMEOFGWMONITORGROUPCLIENT_H__ +#include +#include +#include + +#include + +#include "monitor.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwMonitorGroupClient { + public: + NVMeofGwMonitorGroupClient(std::shared_ptr channel) + : stub_(MonitorGroup::NewStub(channel)) {} + + bool set_group_id(const uint32_t& id); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/gateway b/src/nvmeof/gateway new file mode 160000 index 000000000000..322a86f7348a --- /dev/null +++ b/src/nvmeof/gateway @@ -0,0 +1 @@ +Subproject commit 322a86f7348af1bc173f01e6cc4b64e9a8075727 diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py index ac258887f6a5..9f9ba94557b3 100644 --- a/src/pybind/mgr/cephadm/services/nvmeof.py +++ b/src/pybind/mgr/cephadm/services/nvmeof.py @@ -21,6 +21,9 @@ class NvmeofService(CephService): def config(self, spec: NvmeofServiceSpec) -> None: # type: ignore 
assert self.TYPE == spec.service_type assert spec.pool + self.pool = spec.pool + assert spec.group is not None + self.group = spec.group self.mgr._check_pool_exists(spec.pool, spec.service_name()) def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: @@ -77,8 +80,36 @@ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonD daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) daemon_spec.deps = [] + if not hasattr(self, 'gws'): + self.gws = {} # id -> name map of gateways for this service. + self.gws[nvmeof_gw_id] = name # add to map of service's gateway names return daemon_spec + def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None: + """ Overrides the daemon_check_post to add nvmeof gateways safely + """ + self.mgr.log.info(f"nvmeof daemon_check_post {daemon_descrs}") + # Assert configured + assert self.pool + assert self.group is not None + for dd in daemon_descrs: + self.mgr.log.info(f"nvmeof daemon_descr {dd}") + assert dd.daemon_id in self.gws + name = self.gws[dd.daemon_id] + self.mgr.log.info(f"nvmeof daemon name={name}") + # Notify monitor about this gateway creation + cmd = { + 'prefix': 'nvme-gw create', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + self.mgr.log.info(f"create gateway: monitor command {cmd}") + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") + super().daemon_check_post(daemon_descrs) + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: def get_set_cmd_dicts(out: str) -> List[dict]: gateways = json.loads(out)['gateways'] @@ -151,10 +182,41 @@ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None if not ret: logger.info(f'{daemon.hostname} removed from nvmeof gateways dashboard config') - # and any certificates being used for mTLS + # Assert configured + assert self.pool + assert self.group 
is not None + assert daemon.daemon_id in self.gws + name = self.gws[daemon.daemon_id] + self.gws.pop(daemon.daemon_id) + # Notify monitor about this gateway deletion + cmd = { + 'prefix': 'nvme-gw delete', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + self.mgr.log.info(f"delete gateway: monitor command {cmd}") + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") def purge(self, service_name: str) -> None: - """Removes configuration + """Make sure no zombie gateway is left behind """ - # TODO: what should we purge in this case (if any)? - pass + # Assert configured + assert self.pool + assert self.group is not None + for daemon_id in self.gws: + name = self.gws[daemon_id] + self.gws.pop(daemon_id) + # Notify monitor about this gateway deletion + cmd = { + 'prefix': 'nvme-gw delete', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + self.mgr.log.info(f"purge delete gateway: monitor command {cmd}") + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") diff --git a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 index 18786f95bbe8..644ca586ba93 100644 --- a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 +++ b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 @@ -1,7 +1,7 @@ # {{ cephadm_managed }} [gateway] name = {{ name }} -group = {{ spec.group if spec.group is not none else '' }} +group = {{ spec.group }} addr = {{ addr }} port = {{ spec.port }} enable_auth = {{ spec.enable_auth }} diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 4b88cf804426..1664c4de74ec 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -1355,7 +1355,7 @@ 
def __init__(self, max_log_directory_backups: Optional[int] = 10, log_directory: Optional[str] = '/var/log/ceph/', monitor_timeout: Optional[float] = 1.0, - enable_monitor_client: bool = False, + enable_monitor_client: bool = True, placement: Optional[PlacementSpec] = None, unmanaged: bool = False, preview_only: bool = False, @@ -1381,7 +1381,7 @@ def __init__(self, #: ``name`` name of the nvmeof gateway self.name = name #: ``group`` name of the nvmeof gateway - self.group = group + self.group = group or '' #: ``enable_auth`` enables user authentication on nvmeof gateway self.enable_auth = enable_auth #: ``state_update_notify`` enables automatic update from OMAP in nvmeof gateway diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 2e756eeb5838..6272b3b1ed67 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -1008,3 +1008,11 @@ add_ceph_unittest(unittest_weighted_shuffle) add_executable(unittest_intarith test_intarith.cc) add_ceph_unittest(unittest_intarith) #make check ends here + +# test_nvmeof_mon_encoding +add_executable(test_nvmeof_mon_encoding + test_nvmeof_mon_encoding.cc + ) +target_link_libraries(test_nvmeof_mon_encoding + mon ceph-common global-static + ) diff --git a/src/test/test_nvmeof_mon_encoding.cc b/src/test/test_nvmeof_mon_encoding.cc new file mode 100644 index 000000000000..8cd2381fa784 --- /dev/null +++ b/src/test/test_nvmeof_mon_encoding.cc @@ -0,0 +1,200 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "include/ceph_assert.h" +#include "global/global_init.h" +#include "mon/NVMeofGwMon.h" +#include "messages/MNVMeofGwMap.h" +#include "messages/MNVMeofGwBeacon.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout + +using namespace std; + +void test_NVMeofGwMap() { + dout(0) << __func__ << "\n\n" << dendl; + + NVMeofGwMap pending_map; + std::string pool = "pool1"; + std::string group = "grp1"; + auto group_key = std::make_pair(pool, group); + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + pending_map.created_gws[group_key]["GW1"].performed_full_startup = true; + int i = 0; + for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ + blklst_itr.second.osd_epoch = 2*(i++); + blklst_itr.second.is_failover = false; + } + + pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << " == Dump map before Encode : == " < map; + + std::string pool = "pool1"; + std::string group = "grp1"; + std::string gw_id = "GW1"; + NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE); + std::string nqn = "nqn"; + ana_state_t ana_state; + NqnState nqn_state(nqn, ana_state); + state.subsystems.insert({nqn, nqn_state}); + + auto group_key = std::make_pair(pool, group); + map[group_key][gw_id] = state; + + + + ceph::buffer::list bl; + encode(map, bl); + dout(0) << "encode: " << map << dendl; + decode(map, bl); + dout(0) << "decode: " << map << dendl; + + BeaconSubsystem sub = { nqn, {}, {} }; + NVMeofGwMap pending_map; + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + 
NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub); + int i = 0; + for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ + blklst_itr.second.osd_epoch = 2*(i++); + blklst_itr.second.is_failover = false; + } + + pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << "False pending map: " << pending_map << dendl; + + auto msg = make_message(pending_map); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + + dout(0) << "\n == Test GW Delete ==" << dendl; + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "deleted GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "duplicated delete of GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW2" ,group_key); + dout(0) << "deleted GW2 " << pending_map << dendl; + + dout(0) << "delete of wrong gw id" << dendl; + pending_map.cfg_delete_gw("wow" ,group_key); + + pending_map.cfg_delete_gw("GW3" ,group_key); + dout(0) << "deleted GW3 . 
we should see the empty map " << pending_map << dendl; + + +} + +void test_MNVMeofGwBeacon() { + std::string gw_id = "GW"; + std::string gw_pool = "pool"; + std::string gw_group = "group"; + gw_availability_t availability = gw_availability_t::GW_AVAILABLE; + std::string nqn = "nqn"; + BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystems subs = { sub }; + epoch_t osd_epoch = 17; + epoch_t gwmap_epoch = 42; + + auto msg = make_message( + gw_id, + gw_pool, + gw_group, + subs, + availability, + osd_epoch, + gwmap_epoch); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + ceph_assert(msg->get_gw_id() == gw_id); + ceph_assert(msg->get_gw_pool() == gw_pool); + ceph_assert(msg->get_gw_group() == gw_group); + ceph_assert(msg->get_availability() == availability); + ceph_assert(msg->get_last_osd_epoch() == osd_epoch); + ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch); + const auto& dsubs = msg->get_subsystems(); + auto it = std::find_if(dsubs.begin(), dsubs.end(), + [&nqn](const auto& element) { + return element.nqn == nqn; + }); + ceph_assert(it != dsubs.end()); +} + +void test_NVMeofGwTimers() +{ + NVMeofGwMap pending_map; + //pending_map.Gmetadata; + const NvmeGroupKey group_key = std::make_pair("a","b"); + std::string gwid = "GW1"; + NvmeAnaGrpId grpid = 2; + pending_map.start_timer(gwid, group_key, grpid, 30); + auto end_time = pending_map.fsm_timers[group_key][gwid].data[grpid].end_time; + uint64_t millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << "Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.fsm_timers[group_key][gwid].data[grpid].timer_value << dendl; + ceph::buffer::list bl; + pending_map.encode(bl); + auto p = bl.cbegin(); + pending_map.decode(p); + + end_time = pending_map.fsm_timers[group_key][gwid].data[2].end_time; + millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << 
"After encode decode Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.fsm_timers[group_key][gwid].data[grpid].timer_value< +using namespace std; +#include "include/ceph_features.h" + +#define TYPE(t) +#define TYPE_STRAYDATA(t) +#define TYPE_NONDETERMINISTIC(t) +#define TYPE_FEATUREFUL(t) +#define TYPE_FEATUREFUL_STRAYDATA(t) +#define TYPE_FEATUREFUL_NONDETERMINISTIC(t) +#define TYPE_FEATUREFUL_NOCOPY(t) +#define TYPE_NOCOPY(t) +#define MESSAGE(t) +#include "nvmeof_types.h" +#undef TYPE +#undef TYPE_STRAYDATA +#undef TYPE_NONDETERMINISTIC +#undef TYPE_NOCOPY +#undef TYPE_FEATUREFUL +#undef TYPE_FEATUREFUL_STRAYDATA +#undef TYPE_FEATUREFUL_NONDETERMINISTIC +#undef TYPE_FEATUREFUL_NOCOPY +#undef MESSAGE + +#include "denc_plugin.h" + +DENC_API void register_dencoders(DencoderPlugin* plugin) +{ +#include "nvmeof_types.h" +} + +DENC_API void unregister_dencoders(DencoderPlugin* plugin) +{ + plugin->unregister_dencoders(); +} diff --git a/src/tools/ceph-dencoder/nvmeof_types.h b/src/tools/ceph-dencoder/nvmeof_types.h new file mode 100644 index 000000000000..96cff7353b63 --- /dev/null +++ b/src/tools/ceph-dencoder/nvmeof_types.h @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_NVMEOF_TYPES_H +#define CEPH_NVMEOF_TYPES_H + +#ifdef WITH_NVMEOF_GATEWAY_MONITOR_CLIENT +#include "mon/NVMeofGwMon.h" +#include "messages/MNVMeofGwMap.h" +#include "messages/MNVMeofGwBeacon.h" +TYPE(NVMeofGwMap) +// Implement the dencoder interface +class NVMeofGwMapDencoder { + private: + NVMeofGwMap m; + public: + NVMeofGwMapDencoder() = default; + explicit NVMeofGwMapDencoder(const NVMeofGwMap& m) : m(m) {} + + void encode(bufferlist& bl) const { + using ceph::encode; + encode(t, bl); + } + void decode(bufferlist::const_iterator &p) { + using ceph::decode; + decode(t, p); + } + void dump(Formatter* f) { + f->dump_stream("NVMeofGwMap") << m; + } + + static void generate_test_instances(std::list& ls) { + std::string pool = "pool1"; + std::string group = "grp1"; + auto group_key = std::make_pair(pool, group); + m.cfg_add_gw("GW1" ,group_key); + m.cfg_add_gw("GW2" ,group_key); + m.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + m.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + m.created_gws[group_key]["GW1"].performed_full_startup = true; + for(int i=0; i< MAX_SUPPORTED_ANA_GROUPS; i++){ + m.created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i*2; + m.created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false; + } + + m.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + + ls.push_back(new NVMeofGwMapDencoder(m)); + + } +}; +WRITE_CLASS_ENCODER(NVMeofGwMapDencoder) + +TYPE(MNVMeofGwMap) +// Implement the dencoder interface +class MNVMeofGwMapDencoder { + private: + MNVMeofGwMap m; + public: + MNVMeofGwMapDencoder() = default; + explicit MNVMeofGwMapDencoder(const MNVMeofGwMap& m) : m(m) {} + + void encode(bufferlist& bl) const { + using ceph::encode; + encode(t, bl); + } + void decode(bufferlist::const_iterator &p) { + using ceph::decode; + decode(t, p); + } + void dump(Formatter* f) { + f->dump_stream("MNVMeofGwMap") << m; + } + + static void 
generate_test_instances(std::list<MNVMeofGwMapDencoder*>& ls) {
+    // static member function: everything below is local state (the original
+    // assigned the instance member 'm' from a static context)
+    std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
+    std::string pool = "pool1";
+    std::string group = "grp1";
+    std::string gw_id = "GW1";
+    NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+    std::string nqn = "nqn";
+    // use ana_state_t, consistent with test_nvmeof_mon_encoding.cc
+    ana_state_t ana_state;
+    NqnState nqn_state(nqn, ana_state);
+    state.subsystems.insert({nqn, nqn_state});
+
+    auto group_key = std::make_pair(pool, group);
+    map[group_key][gw_id] = state;
+    BeaconSubsystem sub = { nqn, {}, {} };
+    NVMeofGwMap pending_map;
+    pending_map.cfg_add_gw("GW1", group_key);
+    pending_map.cfg_add_gw("GW2", group_key);
+    pending_map.cfg_add_gw("GW3", group_key);
+    NvmeNonceVector new_nonces = {"abc", "def", "hij"};
+    pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
+    pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+    for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) {
+      pending_map.created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i * 2;
+      pending_map.created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false;
+    }
+
+    pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces;
+    // start_timer() takes an ANA group id (NvmeAnaGrpId), not the group
+    // name string that was passed here before
+    NvmeAnaGrpId ana_grp_id = 2;
+    pending_map.start_timer(gw_id, group_key, ana_grp_id, 30);
+
+    MNVMeofGwMap m(pending_map);
+    ls.push_back(new MNVMeofGwMapDencoder(m));
+  }
+};
+WRITE_CLASS_ENCODER(MNVMeofGwMapDencoder)
+
+TYPE(MNVMeofGwBeacon)
+// Implement the dencoder interface
+class MNVMeofGwBeaconDencoder {
+ private:
+  MNVMeofGwBeacon m;
+ public:
+  MNVMeofGwBeaconDencoder() = default;
+  explicit MNVMeofGwBeaconDencoder(const MNVMeofGwBeacon& m) : m(m) {}
+
+  void encode(bufferlist& bl) const {
+    using ceph::encode;
+    // was 'encode(t, bl)' — 't' does not exist; the member is 'm'
+    encode(m, bl);
+  }
+  void decode(bufferlist::const_iterator &p) {
+    using ceph::decode;
+    decode(m, p);
+  }
+  void dump(Formatter* f) {
+    f->dump_stream("MNVMeofGwBeacon") << m;
+  }
+
+  static void generate_test_instances(std::list<MNVMeofGwBeaconDencoder*>& ls) {
+    std::string gw_id = "GW";
+    std::string gw_pool = "pool";
+    std::string gw_group = "group";
+    gw_availability_t availability =
gw_availability_t::GW_AVAILABLE; + std::string nqn = "nqn"; + BeaconSubsystem sub = { nqn, {}, {} }; + std::string nqn = "nqn"; + BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystems subs = { sub }; + epoch_t osd_epoch = 17; + epoch_t gwmap_epoch = 42; + m = MNVMeofGwBeacon( + gw_id, + gw_pool, + gw_group, + subs, + availability, + osd_epoch, + gwmap_epoch); + + ls.push_back(new MNVMeofGwBeaconDencoder(m)); + + } +}; +WRITE_CLASS_ENCODER(MNVMeofGwBeaconDencoder) + + +#endif // WITH_NVMEOF_GATEWAY_MONITOR_CLIENT + +#endif // CEPH_NVMEOF_TYPES_H