diff --git a/libmmux/src/mmux/storage/block.h b/libmmux/src/mmux/storage/block.h index 017056e3a..8dbb05f81 100644 --- a/libmmux/src/mmux/storage/block.h +++ b/libmmux/src/mmux/storage/block.h @@ -17,21 +17,42 @@ namespace mmux { namespace storage { +/* + * Block operation type + * Mutator can be read and written + * Accessor can only be read + */ + enum block_op_type : uint8_t { accessor = 0, mutator = 1 }; +/* + * Block state + * Regular, importing and exporting + */ + enum block_state { regular = 0, importing = 1, exporting = 2 }; +/* + * Block operation structure + */ + struct block_op { block_op_type type; char name[MAX_BLOCK_OP_NAME_SIZE]; + /** + * @brief Operator < to check if name is smaller in Lexicographical order + * @param other Other block operation + * @return Bool value + */ + bool operator<(const block_op &other) const { return std::strcmp(name, other.name) < 0; } @@ -39,10 +60,18 @@ struct block_op { // TODO: Setting metadata should be atomic: e.g., reset function, or setup block function, should acquire lock before // setting metadata +/* Block class */ class block { public: + /* Slot max range */ static const int32_t SLOT_MAX = 65536; + /** + * @brief Constructor + * @param block_ops Block operations + * @param block_name Block name + */ + explicit block(const std::vector &block_ops, std::string block_name) : block_ops_(block_ops), path_(""), @@ -53,89 +82,187 @@ class block { export_slot_range_(0, -1), import_slot_range_(0, -1) {} + /** + * @brief Virtual function for running a command on the block + * @param _return Return value + * @param oid Operation id + * @param args arguments + */ + virtual void run_command(std::vector &_return, int32_t oid, const std::vector &args) = 0; + /** + * @brief Set block path + * @param path Block path + */ + void path(const std::string &path) { std::unique_lock lock(metadata_mtx_); path_ = path; } + /** + * @brief Fetch block path + * @return Block path + */ + const std::string &path() const { std::shared_lock lock(metadata_mtx_); return path_; } + /** + * @brief Fetch block name + * @return Block name + */ + const std::string &name() const { return block_name_; // Does not require locking since block_name does not change } + /** + * @brief Set block hash slot range + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void slot_range(int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); slot_range_.first = slot_begin; slot_range_.second = slot_end; } + /** + * @brief Fetch slot_range + * @return Block slot range + */ + const std::pair &slot_range() const { std::shared_lock lock(metadata_mtx_); return slot_range_; } + /** + * @brief Fetch slot begin + * @return Slot begin + */ + int32_t slot_begin() const { std::shared_lock lock(metadata_mtx_); return slot_range_.first; } + /** + * @brief Fetch slot end + * @return Slot end + */ + int32_t slot_end() const { std::shared_lock lock(metadata_mtx_); return slot_range_.second; } + /** + * @brief Check if slot is within the slot range + * @param slot Slot + * @return Bool value, true if slot is within the range + */ + bool in_slot_range(int32_t slot) { std::shared_lock lock(metadata_mtx_); return slot >= slot_range_.first && slot <= slot_range_.second; } + /** + * @brief Set block state + * @param state State, orginal, importing or exporting + */ + void state(block_state state) { std::unique_lock lock(metadata_mtx_); state_ = state; } + /** + * @brief Fetch block state + * @return Block state + */ + const block_state &state() const { std::shared_lock lock(metadata_mtx_); return state_; } + /** + * @brief Set export slot range + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void export_slot_range(int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); export_slot_range_.first = slot_begin; export_slot_range_.second = slot_end; } + /** + * @brief Fetch export slot range + * @return Export slot range + */ + const std::pair &export_slot_range() { std::shared_lock lock(metadata_mtx_); return export_slot_range_; }; + /** + * @brief Check if slot is within export slot range + * @param slot Slot + * @return Bool value, true if slot is within the range + */ + bool in_export_slot_range(int32_t slot) { std::shared_lock lock(metadata_mtx_); return slot >= export_slot_range_.first && slot <= export_slot_range_.second; } + /** + * @brief Set import slot range + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void import_slot_range(int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); import_slot_range_.first = slot_begin; import_slot_range_.second = slot_end; } + /** + * @brief Fetch import slot range + * @return Import slot range + */ + const std::pair &import_slot_range() { std::shared_lock lock(metadata_mtx_); return import_slot_range_; }; + /** + * @brief Check if slot is within import slot range + * @param slot Slot + * @return Bool value, true if slot is within the range + */ + bool in_import_slot_range(int32_t slot) { return slot >= import_slot_range_.first && slot <= import_slot_range_.second; } + /** + * @brief TODO + * @param target + */ + void export_target(const std::vector &target) { std::unique_lock lock(metadata_mtx_); export_target_ = target; @@ -146,29 +273,60 @@ class block { export_target_str_.pop_back(); } + /** + * @brief Fetch export target + * @return Export target + */ + const std::vector &export_target() const { std::shared_lock lock(metadata_mtx_); return export_target_; } + /** + * @brief Fetch export target string + * @return Export target string + */ + const std::string export_target_str() const { std::shared_lock lock(metadata_mtx_); return export_target_str_; } + /** + * @brief Check if ith block operation type is accessor + * @param i Block operation id + * @return Bool value, true if is accessor + */ + bool is_accessor(int i) const { return block_ops_.at(static_cast(i)).type == accessor; // Does not require lock since block_ops don't change } + /** + * @brief Check if ith block operation type is mutator + * @param i Block operation id + * @return Bool value, true if is mutator + */ + bool is_mutator(int i) const { return block_ops_.at(static_cast(i)).type == mutator; // Does not require lock since block_ops don't change } + /** + * @brief Fetch operation name + * @param op_id Operation id + * @return Operation name + */ + std::string op_name(int op_id) { return block_ops_[op_id].name; // Does not require lock since block_ops don't change } - /** Management Operations **/ + /** + * Management Operations + * Virtual function + **/ virtual void load(const std::string &path) = 0; virtual bool sync(const std::string &path) = 0; @@ -183,34 +341,57 @@ class block { virtual void export_slots() = 0; + /** + * @brief Fetch subscription map + * @return Subscription map + */ + subscription_map &subscriptions() { std::shared_lock lock(metadata_mtx_); return sub_map_; } + /** + * @brief Fetch block_response_client_map + * @return Block_response_client_map + */ + block_response_client_map &clients() { std::shared_lock lock(metadata_mtx_); return client_map_; } protected: + /* Metadata mutex */ mutable std::shared_mutex metadata_mtx_; - + /* Block operations + * Block operation type can be accessor or mutator + * Accessor can only be read only + * Mutator can be read and write + * Each operation has it name + * */ const std::vector &block_ops_; + /* Block file path */ std::string path_; + /* Block name */ std::string block_name_; - + /* Block state, regular, importing or exporting */ block_state state_; + /* Hash slot range */ std::pair slot_range_; + /* Bool value for auto scaling */ std::atomic_bool auto_scale_; - + /* TODO */ std::pair export_slot_range_; + /* TODO */ std::vector export_target_; + /* TODO */ std::string export_target_str_; - + /* TODO */ std::pair import_slot_range_; - + /* Subscription map */ subscription_map sub_map_{}; + /* Block response client map */ block_response_client_map client_map_{}; }; diff --git a/libmmux/src/mmux/storage/chain_module.h b/libmmux/src/mmux/storage/chain_module.h index 45ffd1ac4..ccb5fdef7 100644 --- a/libmmux/src/mmux/storage/chain_module.h +++ b/libmmux/src/mmux/storage/chain_module.h @@ -21,6 +21,14 @@ namespace mmux { namespace storage { +/* + * Chain roles + * We mark out the special rule singleton if there is only one block for the chain + * i.e. we don't use chain replication in this case + * The head would deal with update request and the tail would deal with query request + * and also generate the respond + */ + enum chain_role { singleton = 0, head = 1, @@ -28,24 +36,48 @@ enum chain_role { tail = 3 }; +/* + * Chain operation + */ + struct chain_op { + /* Operation sequence id */ sequence_id seq; + /* Operation id */ int32_t op_id; + /* Operation arguments */ std::vector args; }; +/* Next block connection */ class next_block_cxn { public: next_block_cxn() = default; + /** + * @brief Constructor + * @param block_name Next block name + */ + explicit next_block_cxn(const std::string &block_name) { reset(block_name); } + /** + * @brief Destructor + */ + ~next_block_cxn() { client_.disconnect(); } + /** + * @brief Reset block + * Disconnect the current chain request client and connect to the new block + * @param block_name Block name + * @return Client protocol + */ + std::shared_ptr reset(const std::string &block_name) { std::unique_lock lock(mtx_); client_.disconnect(); @@ -59,6 +91,13 @@ class next_block_cxn { return client_.protocol(); } + /** + * @brief Deal with a request, either query or update + * @param seq Request sequence_id + * @param op_id Operation id + * @param args Arguments + */ + void request(const sequence_id &seq, int32_t op_id, const std::vector &args) { @@ -66,54 +105,105 @@ class next_block_cxn { client_.request(seq, op_id, args); } + /** + * @brief Run command on next block + * @param result Running result + * @param cmd_id Command id + * @param args Arguments + */ + void run_command(std::vector &result, int32_t cmd_id, const std::vector &args) { std::unique_lock lock(mtx_); client_.run_command(result, cmd_id, args); } private: + /* Class next_block_cxn operation mutex */ std::shared_mutex mtx_; + /* Chain request client */ chain_request_client client_; }; - +/* Previous block connection */ class prev_block_cxn { public: prev_block_cxn() = default; + /** + * @brief Constructor + * @param prot Protocol + */ + explicit prev_block_cxn(std::shared_ptr<::apache::thrift::protocol::TProtocol> prot) { reset(std::move(prot)); } + /** + * @brief Reset protocol + * @param prot Protocol + */ + void reset(std::shared_ptr<::apache::thrift::protocol::TProtocol> prot) { client_.reset_prot(std::move(prot)); } + /** + * @brief Chairn acknowledgement + * @param seq Operation sequence id + */ + void ack(const sequence_id &seq) { client_.ack(seq); } + /** + * @brief Check if chain response client is set + * @return Bool value, true if set + */ + bool is_set() const { return client_.is_set(); } private: + /* Charin response client */ chain_response_client client_; }; +/* Chain module class + * Inherited from block */ class chain_module : public block { public: + /* Class chain response handler + * Inherited from chain_response_serviceIf */ class chain_response_handler : public chain_response_serviceIf { public: + /** + * @brief Constructor + * @param module Chain module + */ + explicit chain_response_handler(chain_module *module) : module_(module) {} + /** + * @brief Chain acknowledgement + * @param seq Operation sequence id + */ + void chain_ack(const sequence_id &seq) override { module_->ack(seq); } private: + /* Chain module */ chain_module *module_; }; + /** + * @brief Constructor + * @param block_name Block name + * @param block_ops Block operations + */ + chain_module(const std::string &block_name, const std::vector &block_ops) : block(block_ops, block_name), @@ -121,12 +211,27 @@ class chain_module : public block { prev_(std::make_unique()), pending_(0) {} + /** + * @brief Destructor + */ + ~chain_module() { next_->reset("nil"); if (response_processor_.joinable()) response_processor_.join(); } + /** + * @brief Setup a chain module and start the processor thread + * @param path Block path + * @param slot_begin Hash slot begin + * @param slot_end Hash slot end + * @param chain Replication chain block names + * @param auto_scale Auto scaling boolean + * @param role Chain module role + * @param next_block_name Next block name + */ + void setup(const std::string &path, int32_t slot_begin, int32_t slot_end, @@ -161,6 +266,13 @@ class chain_module : public block { } } + /** + * @brief Set block to be exporting + * @param target_block Export target block + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void set_exporting(const std::vector &target_block, int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); state_ = block_state::exporting; @@ -174,6 +286,12 @@ class chain_module : public block { export_slot_range_.second = slot_end; } + /** + * @brief Set block to be importing + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void set_importing(int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); state_ = block_state::importing; @@ -181,6 +299,12 @@ class chain_module : public block { import_slot_range_.second = slot_end; } + /** + * @brief Set block back to regular + * @param slot_begin Slot begin + * @param slot_end Slot end + */ + void set_regular(int32_t slot_begin, int32_t slot_end) { std::unique_lock lock(metadata_mtx_); state_ = block_state::regular; @@ -192,76 +316,186 @@ class chain_module : public block { import_slot_range_.second = -1; } + /** + * @brief Set chain module role + * @param role Role + */ + void role(chain_role role) { std::unique_lock lock(metadata_mtx_); role_ = role; } + /** + * @brief Fetch chain module role + * @return Role + */ + chain_role role() const { std::shared_lock lock(metadata_mtx_); return role_; } + /** + * @brief Set replication chain block names + * @param chain Chain block names + */ + void chain(const std::vector &chain) { std::unique_lock lock(metadata_mtx_); chain_ = chain; } + /** + * @brief Fetch replication chain block names + * @return Replication chain block names + */ + const std::vector &chain() { std::shared_lock lock(metadata_mtx_); return chain_; } + /** + * @brief Check if chain module role is head + * @return Bool value, true if chain role is head or singleton + * In implementation, we regard singleton as a chain module that consists both + * head and tail roles + */ + bool is_head() const { std::shared_lock lock(metadata_mtx_); return role() == chain_role::head || role() == chain_role::singleton; } + /** + * @brief Check if chain module role is tail + * @return Bool value, true if chain role is tail or singleton + * In implementation, we regard singleton as a chain module that consists both + * head and tail roles + */ + bool is_tail() const { std::shared_lock lock(metadata_mtx_); return role() == chain_role::tail || role() == chain_role::singleton; } + /** + * @brief Check if previous chain module is set + * @return Bool value, true if set + */ + bool is_set_prev() const { return prev_->is_set(); // Does not require a lock since only one thread calls this at a time } + /** + * @brief Reset previous block + * @param prot Protocol + */ + void reset_prev(const std::shared_ptr<::apache::thrift::protocol::TProtocol> &prot) { prev_->reset(prot); // Does not require a lock since only one thread calls this at a time } + /** + * @brief Reset next block + * @param next_block Next block + */ + void reset_next(const std::string &next_block); + /** + * @brief Reset next block and start up response processor thread + * @param next_block Next block + */ + void reset_next_and_listen(const std::string &next_block); + /** + * @brief Run command on next block + * @param result Command result + * @param oid Operation id + * @param args Arguments + */ + void run_command_on_next(std::vector &result, int32_t oid, const std::vector &args) { next_->run_command(result, oid, args); } + /** + * @brief Add request to pending + * @param seq Request sequence id number + * @param op_id Operation id + * @param args Argumentss + */ + void add_pending(const sequence_id &seq, int op_id, const std::vector &args) { pending_.insert(seq.server_seq_no, chain_op{seq, op_id, args}); } + /** + * @brief Remove a pending request + * @param seq Sequence id + */ + void remove_pending(const sequence_id &seq) { pending_.erase(seq.server_seq_no); } + /** + * @brief Resend the pending request + */ + void resend_pending(); + /** + * @brief Virtual function for forwarding all + */ + virtual void forward_all() = 0; + /** + * @brief Request + * @param seq + * @param oid + * @param args + */ + void request(sequence_id seq, int32_t oid, const std::vector &args); + + /** + * @brief + * @param seq + * @param oid + * @param args + */ + void chain_request(const sequence_id &seq, int32_t oid, const std::vector &args); + + /** + * @brief + * @param seq + */ + void ack(const sequence_id &seq); protected: + /* Request mutex */ std::mutex request_mtx_; + /* Role of chain module */ chain_role role_{singleton}; + /* Chain sequence number */ int64_t chain_seq_no_{0}; + /* Next block connection */ std::unique_ptr next_{nullptr}; + /* Previous block connection */ std::unique_ptr prev_{nullptr}; + /* Replication chain block names */ std::vector chain_; + /* Response processor thread */ std::thread response_processor_; + /* Pending operations */ cuckoohash_map pending_; }; diff --git a/libmmux/src/mmux/storage/client/block_client.h b/libmmux/src/mmux/storage/client/block_client.h index 3d390426a..0ba593150 100644 --- a/libmmux/src/mmux/storage/client/block_client.h +++ b/libmmux/src/mmux/storage/client/block_client.h @@ -9,19 +9,33 @@ namespace mmux { namespace storage { - +/* Block client class */ class block_client { public: + /* Command response reader class */ class command_response_reader { public: command_response_reader() = default; + /** + * @brief Constructor + * @param prot Thrift protocol + */ + explicit command_response_reader(std::shared_ptr prot); + /** + * @brief Response receiver + * @param out Response result + * @return Client sequence number + */ + int64_t recv_response(std::vector &out); private: + /* Thrift protocol */ std::shared_ptr prot_; + /* Thrift protocol */ apache::thrift::protocol::TProtocol *iprot_{}; }; @@ -29,19 +43,67 @@ class block_client { typedef utils::client_cache client_cache; block_client() = default; + + /** + * @brief Destructor + */ + ~block_client(); + + /** + * @brief Fetch client id + * @return Client id number + */ + int64_t get_client_id(); + + /** + * @brief Connect host + * @param hostname Host name + * @param port Port number + * @param block_id Block id + * @param timeout_ms Timeout + */ + void connect(const std::string &hostname, int port, int block_id, int timeout_ms = 1000); + + /** + * @brief Disconnect host + */ + void disconnect(); + + /** + * @brief Check if connection is opened + * @return Bool value + */ + bool is_connected() const; + /** + * @brief Register client id with block id and return command_response reader + * @param client_id Client id number + * @return Command response reader + */ + command_response_reader get_command_response_reader(int64_t client_id); + + /** + * @brief Request command + * @param seq Sequence id number + * @param cmd_id Command id number + * @param args Arguments + */ void command_request(const sequence_id &seq, int32_t cmd_id, const std::vector &args); private: + /* Transport */ std::shared_ptr transport_{}; + /* Protocol */ std::shared_ptr protocol_{}; + /* Block request client */ std::shared_ptr client_{}; + /* Block id */ int block_id_{-1}; }; diff --git a/libmmux/src/mmux/storage/client/block_listener.h b/libmmux/src/mmux/storage/client/block_listener.h index a68028f3d..051853b12 100644 --- a/libmmux/src/mmux/storage/client/block_listener.h +++ b/libmmux/src/mmux/storage/client/block_listener.h @@ -14,7 +14,7 @@ namespace mmux { namespace storage { - +/* Block listener class */ class block_listener { public: typedef notification_serviceClient thrift_client; @@ -22,24 +22,72 @@ class block_listener { typedef std::pair notification_t; typedef blocking_queue mailbox_t; + /** + * @brief Destructor + */ + ~block_listener(); + + /** + * @brief Constuctor + * @param host Host + * @param port Port number + * @param notifications Notifications + * @param controls Controls + */ + block_listener(const std::string &host, int port, mailbox_t ¬ifications, mailbox_t &controls); + /** + * @brief Connect host + * @param host Host + * @param port Port number + */ + void connect(const std::string &host, int port); + + /** + * @brief Disconnect host + */ + void disconnect(); + /** + * @brief Fetch protocol + * @return Protocol + */ + std::shared_ptr protocol(); + /** + * @brief Subscribe for block on operation type + * @param block_id Block id + * @param ops Operation type + */ + void subscribe(int32_t block_id, const std::vector &ops); + + /** + * @brief Unsubscribe for block on operation type + * @param block_id Block id + * @param ops Operation type + */ + void unsubscribe(int32_t block_id, const std::vector &ops); private: + /* Notifications */ mailbox_t ¬ifications_; + /* TODO */ mailbox_t &controls_; + /* Socket */ std::shared_ptr socket_{}; + /* Transport */ std::shared_ptr transport_{}; + /* Protocol */ std::shared_ptr protocol_{}; + /* Notification service client */ std::shared_ptr client_{}; }; diff --git a/libmmux/src/mmux/storage/client/kv_client.cpp b/libmmux/src/mmux/storage/client/kv_client.cpp index f239ef78b..ef6e1f554 100644 --- a/libmmux/src/mmux/storage/client/kv_client.cpp +++ b/libmmux/src/mmux/storage/client/kv_client.cpp @@ -275,7 +275,10 @@ kv_client::locked_client::locked_client(kv_client &parent) : parent_(parent) { } if (new_block) { redirect_blocks_[i] = - std::make_shared(parent_.fs_, parent_.path_, blocks_[i]->chain(), parent_.timeout_ms_); + std::make_shared(parent_.fs_, + parent_.path_, + blocks_[i]->chain(), + parent_.timeout_ms_); locked_redirect_blocks_[i] = redirect_blocks_[i]->lock(); new_blocks_[i] = locked_redirect_blocks_[i]; } diff --git a/libmmux/src/mmux/storage/client/kv_client.h b/libmmux/src/mmux/storage/client/kv_client.h index 7fd7a41b2..4b9b4880a 100644 --- a/libmmux/src/mmux/storage/client/kv_client.h +++ b/libmmux/src/mmux/storage/client/kv_client.h @@ -7,15 +7,16 @@ namespace mmux { namespace storage { - +/* TODO */ class redo_error : public std::exception { public: redo_error() {} }; +/* Key value storage client */ class kv_client { public: - + /* Locked client class */ class locked_client { public: typedef replica_chain_client::locked_client locked_block_t; diff --git a/libmmux/src/mmux/storage/client/kv_listener.h b/libmmux/src/mmux/storage/client/kv_listener.h index 88b47ac76..089342b7e 100644 --- a/libmmux/src/mmux/storage/client/kv_listener.h +++ b/libmmux/src/mmux/storage/client/kv_listener.h @@ -7,28 +7,64 @@ namespace mmux { namespace storage { - +/* */ class kv_listener { public: typedef std::pair notification_t; typedef blocking_queue mailbox_t; + /** + * @brief + * @param path + * @param status + */ + kv_listener(const std::string &path, const directory::data_status &status); + + /** + * @brief + */ + ~kv_listener(); + /** + * @brief + * @param ops + */ + void subscribe(const std::vector &ops); + + /** + * @brief + * @param ops + */ + void unsubscribe(const std::vector &ops); + + /** + * @brief + * @param timeout_ms + * @return + */ + notification_t get_notification(int64_t timeout_ms = -1); private: + /* */ mailbox_t notifications_; + /* */ mailbox_t controls_; + /* */ std::string path_; + /* */ directory::data_status status_; + /* */ notification_worker worker_; + /* */ std::vector> listeners_; + /* */ std::vector block_ids_; }; diff --git a/libmmux/src/mmux/storage/client/replica_chain_client.h b/libmmux/src/mmux/storage/client/replica_chain_client.h index 30c8227c3..7b9b37934 100644 --- a/libmmux/src/mmux/storage/client/replica_chain_client.h +++ b/libmmux/src/mmux/storage/client/replica_chain_client.h @@ -8,66 +8,200 @@ namespace mmux { namespace storage { - +/* */ class replica_chain_client { public: typedef block_client *client_ref; - + /* */ class locked_client { public: + /** + * @brief + * @param parent + */ + locked_client(replica_chain_client &parent); + /** + * @brief + */ + ~locked_client(); + /** + * @brief + */ + void unlock(); - const directory::replica_chain & chain(); + + /** + * @brief + * @return + */ + + const directory::replica_chain &chain(); + + /** + * @brief + * @return + */ bool redirecting() const; + + /** + * @brief + * @return + */ + const std::vector &redirect_chain(); + /** + * @brief + * @param cmd_id + * @param args + */ + void send_command(int32_t cmd_id, const std::vector &args); + + /** + * @brief + * @return + */ + std::vector recv_response(); + + /** + * @brief + * @param cmd_id + * @param args + * @return + */ + std::vector run_command(int32_t cmd_id, const std::vector &args); + + /** + * @brief + * @param cmd_id + * @param args + * @return + */ + std::vector run_command_redirected(int32_t cmd_id, const std::vector &args); private: + /* */ replica_chain_client &parent_; - + /* */ bool redirecting_; + /* */ std::vector redirect_chain_; }; + /** + * @brief + * @param fs + * @param path + * @param chain + * @param timeout_ms + */ + explicit replica_chain_client(std::shared_ptr fs, const std::string &path, const directory::replica_chain &chain, int timeout_ms = 1000); + /** + * @brief + */ + ~replica_chain_client(); - const directory::replica_chain & chain() const; + /** + * @brief + * @return + */ + + const directory::replica_chain &chain() const; + + /** + * @brief + * @return + */ std::shared_ptr lock(); + /** + * @brief + * @return + */ + bool is_connected() const; + /** + * @brief + * @param cmd_id + * @param args + */ + void send_command(int32_t cmd_id, const std::vector &args); + + /** + * @brief + * @return + */ + std::vector recv_response(); + + /** + * @brief + * @param cmd_id + * @param args + * @return + */ + std::vector run_command(int32_t cmd_id, const std::vector &args); + + /** + * @brief + * @param cmd_id + * @param args + * @return + */ + std::vector run_command_redirected(int32_t cmd_id, const std::vector &args); private: + + /** + * @brief + * @param chain + * @param timeout_ms + */ void connect(const directory::replica_chain &chain, int timeout_ms = 0); - void disconnect(); + /** + * @brief + */ + + void disconnect(); + /* */ std::shared_ptr fs_; + /* */ std::string path_; - + /* */ sequence_id seq_; + /* */ directory::replica_chain chain_; + /* */ block_client head_; + /* */ block_client tail_; + /* */ block_client::command_response_reader response_reader_; + /* */ std::vector cmd_client_; + /* */ bool in_flight_; + /* */ int timeout_ms_; }; diff --git a/libmmux/src/mmux/storage/kv/kv_block.h b/libmmux/src/mmux/storage/kv/kv_block.h index c70bdc253..08a443afd 100644 --- a/libmmux/src/mmux/storage/kv/kv_block.h +++ b/libmmux/src/mmux/storage/kv/kv_block.h @@ -20,7 +20,9 @@ typedef binary key_type; typedef binary value_type; extern std::vector KV_OPS; - +/** + * @brief + */ enum kv_op_id : int32_t { exists = 0, get = 1, @@ -39,9 +41,20 @@ enum kv_op_id : int32_t { upsert = 14, locked_upsert = 15 }; - +/* */ class kv_block : public chain_module { public: + /** + * @brief + * @param block_name + * @param capacity + * @param threshold_lo + * @param threshold_hi + * @param directory_host + * @param directory_port + * @param ser + */ + explicit kv_block(const std::string &block_name, std::size_t capacity = 134217728, // 128 MB; TODO: hardcoded default double threshold_lo = 0.05, @@ -50,83 +63,275 @@ class kv_block : public chain_module { int directory_port = 9090, std::shared_ptr ser = std::make_shared()); + /** + * @brief + * @param key + * @param redirect + * @return + */ + std::string exists(const key_type &key, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string put(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string locked_put(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string upsert(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string locked_upsert(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param redirect + * @return + */ + value_type get(const key_type &key, bool redirect = false); + /** + * @brief + * @param key + * @param redirect + * @return + */ + std::string locked_get(const key_type &key, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string update(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param value + * @param redirect + * @return + */ + std::string locked_update(const key_type &key, const value_type &value, bool redirect = false); + /** + * @brief + * @param key + * @param redirect + * @return + */ + std::string remove(const key_type &key, bool redirect = false); + /** + * @brief + * @param key + * @param redirect + * @return + */ + std::string locked_remove(const key_type &key, bool redirect = false); + /** + * @brief + * @param keys + */ + void keys(std::vector &keys); + /** + * @brief + * @param data + * @param slot_begin + * @param slot_end + * @param num_keys + */ + void locked_get_data_in_slot_range(std::vector &data, int32_t slot_begin, int32_t slot_end, int32_t num_keys); + /** + * @brief + * @return + */ + std::string lock(); + /** + * @brief + * @return + */ + std::string unlock(); + /** + * @brief + * @return + */ + bool is_locked(); + /** + * @brief + * @return + */ + std::size_t size() const; + /** + * @brief + * @return + */ + bool empty() const; + /** + * @brief + * @param _return + * @param oid + * @param args + */ + void run_command(std::vector &_return, int oid, const std::vector &args) override; + /** + * @brief + * @return + */ + bool is_dirty() const; + /** + * @brief + * @param path + */ + void load(const std::string &path) override; + /** + * @brief + * @param path + * @return + */ + bool sync(const std::string &path) override; + /** + * @brief + * @param path + * @return + */ + bool dump(const std::string &path) override; + /** + * @brief + * @return + */ + std::size_t storage_capacity() override; + /** + * @brief + * @return + */ + std::size_t storage_size() override; + /** + * @brief + */ + void reset() override; + + /** + * @brief + */ + void forward_all() override; + /** + * @brief + */ + void export_slots() override; private: + /** + * @brief + * @return + */ + bool overload(); + /** + * @brief + * @return + */ + bool underload(); + /* */ hash_table_type block_; + /* */ locked_hash_table_type locked_block_; - + /* */ std::string directory_host_; + /* */ int directory_port_; - + /* */ std::shared_ptr ser_; + /* */ std::atomic bytes_; + /* */ std::size_t capacity_; + /* */ double threshold_lo_; + /* */ double threshold_hi_; + /* */ std::atomic splitting_; + /* */ std::atomic merging_; - + /* */ std::atomic dirty_; }; diff --git a/libmmux/src/mmux/storage/kv/serde/binary_serde.h b/libmmux/src/mmux/storage/kv/serde/binary_serde.h index 77d2e207b..06c6947e3 100644 --- a/libmmux/src/mmux/storage/kv/serde/binary_serde.h +++ b/libmmux/src/mmux/storage/kv/serde/binary_serde.h @@ -5,11 +5,26 @@ namespace mmux { namespace storage { - +/* */ class binary_serde : public serde { public: binary_serde() = default; + /** + * @brief + * @param table + * @param out + * @return + */ + size_t serialize(const block_type &table, std::shared_ptr out) override; + + /** + * @brief + * @param in + * @param table + * @return + */ + size_t deserialize(std::shared_ptr in, block_type &table) override; }; } diff --git a/libmmux/src/mmux/storage/manager/detail/block_name_parser.h b/libmmux/src/mmux/storage/manager/detail/block_name_parser.h index 025a6a0fe..a600b742c 100644 --- a/libmmux/src/mmux/storage/manager/detail/block_name_parser.h +++ b/libmmux/src/mmux/storage/manager/detail/block_name_parser.h @@ -5,7 +5,7 @@ namespace mmux { namespace storage { - +/* Structure of block id */ struct block_id { std::string host; int32_t service_port; @@ -14,16 +14,35 @@ struct block_id { int32_t chain_port; int32_t id; }; - +/* Block name parser class */ class block_name_parser { public: + + /** + * @brief Block name parser + * @param name Block name + * @return Block id structure + */ + static block_id parse(const std::string &name); + + /** + * @brief Make a block name by connecting all parts in a single string + * @param host Host + * @param service_port Service port number + * @param management_port Management port number + * @param notification_port Notification port number + * @param chain_port Chain port number + * @param id Block id number + * @return Block name string + */ + static std::string make(const std::string &host, - int32_t service_port, - int32_t management_port, - int32_t notification_port, - int32_t chain_port, - int32_t id); + int32_t service_port, + int32_t management_port, + int32_t notification_port, + int32_t chain_port, + int32_t id); }; } diff --git a/libmmux/src/mmux/storage/manager/storage_management_client.h b/libmmux/src/mmux/storage/manager/storage_management_client.h index 11ffb4652..32d9fcc0e 100644 --- a/libmmux/src/mmux/storage/manager/storage_management_client.h +++ b/libmmux/src/mmux/storage/manager/storage_management_client.h @@ -7,16 +7,53 @@ namespace mmux { namespace storage { - +/* Storage management client class */ class storage_management_client { public: typedef storage_management_serviceClient thrift_client; storage_management_client() = default; + + /** + * @brief Destructor + */ + ~storage_management_client(); + + /** + * @brief Constuctor + * @param host Host + * @param port Port number + */ + storage_management_client(const std::string &host, int port); + + /** + * @brief Connect + * @param host Host + * @param port Port number + */ + void connect(const std::string &host, int port); + + /** + * @brief Disconnect + * TODO why twice? + */ + void disconnect(); + /** + * @brief + * @param block_id + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param auto_scale + * @param role + * @param next_block_name + */ + void setup_block(int32_t block_id, const std::string &path, int32_t slot_begin, @@ -25,12 +62,48 @@ class storage_management_client { bool auto_scale, int32_t role, const std::string &next_block_name); + + /** + * @brief + * @param block_id + * @return + */ + std::pair slot_range(int32_t block_id); + + /** + * @brief + * @param block_id + * @param target_block_name + * @param slot_begin + * @param slot_end + */ + void set_exporting(int32_t block_id, const std::vector &target_block_name, int32_t slot_begin, int32_t slot_end); + + /** + * @brief + * @param block_id + * @param slot_begin + * @param slot_end + */ + void set_importing(int32_t block_id, int32_t slot_begin, int32_t slot_end); + + /** + * @brief + * @param block_id + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param role + * @param next_block_name + */ + void setup_and_set_importing(int32_t block_id, const std::string &path, int32_t slot_begin, @@ -38,22 +111,100 @@ class storage_management_client { const std::vector &chain, int32_t role, const std::string &next_block_name); + + /** + * @brief + * @param block_id + * @param slot_begin + * @param slot_end + */ + void set_regular(int32_t block_id, int32_t slot_begin, int32_t slot_end); + + /** + * @brief + * @param block_id + * @return + */ + std::string path(int32_t block_id); + + /** + * @brief + * @param block_id + * @param backing_path + */ + void sync(int32_t block_id, const std::string &backing_path); + + /** + * @brief + * @param block_id + * @param backing_path + */ + void load(int32_t block_id, const std::string &backing_path); + + /** + * @brief + * @param block_id + * @param backing_path + */ + void dump(int32_t block_id, const std::string &backing_path); + + /** + * @brief + * @param block_id + */ + void reset(int32_t block_id); + + /** + * @brief + * @param block_id + * @return + */ + int64_t storage_capacity(int32_t block_id); + + /** + * @brief + * @param block_id + * @return + */ + int64_t storage_size(int32_t block_id); + + /** + * @brief + * @param block_id + */ + void resend_pending(int32_t block_id); + + /** + * @brief + * @param block_id + */ + void forward_all(int32_t block_id); + + /** + * @brief + * @param block_id + */ + void export_slots(int32_t block_id); private: + /* */ std::shared_ptr socket_{}; + /* */ std::shared_ptr transport_{}; + /* */ std::shared_ptr protocol_{}; + /* */ std::shared_ptr client_{}; }; diff --git a/libmmux/src/mmux/storage/manager/storage_management_server.h b/libmmux/src/mmux/storage/manager/storage_management_server.h index de364b289..dc407f434 100644 --- a/libmmux/src/mmux/storage/manager/storage_management_server.h +++ b/libmmux/src/mmux/storage/manager/storage_management_server.h @@ -7,9 +7,16 @@ namespace mmux { namespace storage { - +/* Storage management server class */ class storage_management_server { public: + /** + * @brief + * @param blocks + * @param address + * @param port + * @return + */ static std::shared_ptr create(std::vector> &blocks, const std::string &address, int port); diff --git a/libmmux/src/mmux/storage/manager/storage_management_service_factory.h b/libmmux/src/mmux/storage/manager/storage_management_service_factory.h index 77b236343..bad758c45 100644 --- a/libmmux/src/mmux/storage/manager/storage_management_service_factory.h +++ b/libmmux/src/mmux/storage/manager/storage_management_service_factory.h @@ -7,13 +7,34 @@ namespace mmux { namespace storage { - +/* */ class storage_management_service_factory : public storage_management_serviceIfFactory { public: + + /** + * @brief + * @param blocks + */ + explicit storage_management_service_factory(std::vector> &blocks); + + /** + * @brief + * @param connInfo + * @return + */ + storage_management_serviceIf *getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override; + + /** + * @brief + * @param anIf + */ + void releaseHandler(storage_management_serviceIf *anIf) override; private: + + /* */ std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/manager/storage_management_service_handler.h b/libmmux/src/mmux/storage/manager/storage_management_service_handler.h index bba5282e9..ee33737aa 100644 --- a/libmmux/src/mmux/storage/manager/storage_management_service_handler.h +++ b/libmmux/src/mmux/storage/manager/storage_management_service_handler.h @@ -7,19 +7,98 @@ namespace mmux { namespace storage { - +/* Storage management service handler, inherited from */ class storage_management_service_handler : public storage_management_serviceIf { public: + + /** + * @brief Constructor + * @param blocks Blocks chain + */ + explicit storage_management_service_handler(std::vector> &blocks); + + /** + * @brief Get block path + * @param _return The returning value, string of block path + * @param block_id Block id + */ + void get_path(std::string &_return, int32_t block_id) override; + + /** + * @brief Write data back to persistent storage + * @param block_id Block id + * @param backing_path Block backing path + */ + void sync(int32_t block_id, const std::string &backing_path) override; + + /** + * @brief Write data back to persistent storage and clear the block + * @param block_id Block id + * @param backing_path Block backing path + */ + void dump(int32_t block_id, const std::string &backing_path) override; + + /** + * @brief Load data block from persistent storage + * @param block_id Block id + * @param backing_path Block backing path + */ + void load(int32_t block_id, const std::string &backing_path) override; + + /** + * @brief + * @param block_id + */ + void reset(int32_t block_id) override; + + /** + * @brief + * @param block_id + * @return + */ + int64_t storage_capacity(int32_t block_id) override; + + /** + * @brief + * @param block_id + * @return + */ + int64_t storage_size(int32_t block_id) override; + + /** + * @brief + * @param block_id + */ + void resend_pending(int32_t block_id) override; + + /** + * @brief + * @param block_id + */ + void forward_all(int32_t block_id) override; + + /** + * @brief + * @param block_id + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param auto_scale + * @param chain_role + * @param next_block_name + */ + void setup_block(int32_t block_id, const std::string &path, int32_t slot_begin, @@ -28,14 +107,50 @@ class storage_management_service_handler : public storage_management_serviceIf { bool auto_scale, int32_t chain_role, const std::string &next_block_name) override; + + /** + * @brief + * @param _return + * @param block_id + */ + void slot_range(rpc_slot_range &_return, int32_t block_id) override; + + /** + * @brief + * @param block_id + * @param target_block + * @param slot_begin + * @param slot_end + */ + void set_exporting(int32_t block_id, const std::vector &target_block, int32_t slot_begin, int32_t slot_end) override; + + /** + * @brief + * @param block_id + * @param slot_begin + * @param slot_end + */ + void set_importing(int32_t block_id, int32_t slot_begin, int32_t slot_end) override; + + /** + * @brief + * @param block_id + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param chain_role + * @param next_block_name + */ + void setup_and_set_importing(int32_t block_id, const std::string &path, int32_t slot_begin, @@ -43,11 +158,45 @@ class storage_management_service_handler : public storage_management_serviceIf { const std::vector &chain, int32_t chain_role, const std::string &next_block_name) override; + + /** + * @brief + * @param block_id + */ + void export_slots(int32_t block_id) override; + + /** + * @brief + * @param block_id + * @param slot_begin + * @param slot_end + */ + void set_regular(int32_t block_id, int32_t slot_begin, int32_t slot_end) override; + private: + + /** + * @brief + * @param e + * @return + */ + storage_management_exception make_exception(std::exception &e); + + /** + * @brief + * @param msg + * @return + */ + storage_management_exception make_exception(const std::string &msg); + + /** + * @brief + */ + std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/manager/storage_manager.h b/libmmux/src/mmux/storage/manager/storage_manager.h index 6c1bf3baa..2d2189262 100644 --- a/libmmux/src/mmux/storage/manager/storage_manager.h +++ b/libmmux/src/mmux/storage/manager/storage_manager.h @@ -8,10 +8,23 @@ namespace mmux { namespace storage { - +/* */ class storage_manager : public storage_management_ops { public: storage_manager() = default; + + /** + * @brief + * @param block_name + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param auto_scale + * @param role + * @param next_block_name + */ + void setup_block(const std::string &block_name, const std::string &path, int32_t slot_begin, @@ -20,21 +33,115 @@ class storage_manager : public storage_management_ops { bool auto_scale, int32_t role, const std::string &next_block_name) override; + + /** + * @brief + * @param block_name + * @return + */ + std::string path(const std::string &block_name) override; + + /** + * @brief + * @param block_name + * @return + */ + std::pair slot_range(const std::string &block_name) override; + + /** + * @brief + * @param block_name + * @param backing_path + */ + void load(const std::string &block_name, const std::string &backing_path) override; + + /** + * @brief + * @param block_name + * @param backing_path + */ + void sync(const std::string &block_name, const std::string &backing_path) override; + + /** + * @brief + * @param block_name + * @param backing_path + */ + void dump(const std::string &block_name, const std::string &backing_path) override; + + /** + * @brief + * @param block_name + */ + void reset(const std::string &block_name) override; + + /** + * @brief + * @param block_name + * @return + */ + size_t storage_capacity(const std::string &block_name) override; + + /** + * @brief + * @param block_name + * @return + */ + size_t storage_size(const std::string &block_name) override; + + /** + * @brief + * @param block_name + */ + void resend_pending(const std::string &block_name) override; + + /** + * @brief + * @param block_name + */ + void forward_all(const std::string &block_name) override; + + /** + * @brief + * @param block_name + */ + void export_slots(const std::string &block_name) override; + + /** + * @brief + * @param block_name + * @param target_block + * @param slot_begin + * @param slot_end + */ + void set_exporting(const std::string &block_name, const std::vector &target_block, int32_t slot_begin, int32_t slot_end) override; + + /** + * @brief + * @param block_name + * @param path + * @param slot_begin + * @param slot_end + * @param chain + * @param role + * @param next_block_name + */ + void setup_and_set_importing(const std::string &block_name, const std::string &path, int32_t slot_begin, @@ -42,7 +149,23 @@ class storage_manager : public storage_management_ops { const std::vector &chain, int32_t role, const std::string &next_block_name) override; + + /** + * @brief + * @param block_name + * @param slot_begin + * @param slot_end + */ + void set_regular(const std::string &block_name, int32_t slot_begin, int32_t slot_end) override; + + /** + * @brief + * @param block_name + * @param slot_begin + * @param slot_end + */ + virtual void set_importing(const std::string &block_name, int32_t slot_begin, int32_t slot_end); }; diff --git a/libmmux/src/mmux/storage/notification/blocking_queue.h b/libmmux/src/mmux/storage/notification/blocking_queue.h index 497e1c9a4..343a9ea4f 100644 --- a/libmmux/src/mmux/storage/notification/blocking_queue.h +++ b/libmmux/src/mmux/storage/notification/blocking_queue.h @@ -12,9 +12,24 @@ using namespace std::chrono_literals; namespace mmux { namespace storage { + template +/* A blocking queue class template + * Each push and pop argument can only be done once at a time. + * Push can be done immediately when it gets the lock. + * Pull can be done only when queue is not empty. + * If empty and given timeout time, wait for conditional variable for given time + * If empty and given timeout time is -1, wait fot conditional variable + * */ class blocking_queue { public: + + /** + * @brief Pop element out of queue + * @param timeout_ms + * @return + */ + T pop(int64_t timeout_ms = -1) { std::unique_lock mlock(mutex_); while (queue_.empty()) { @@ -31,6 +46,11 @@ class blocking_queue { return item; } + /** + * @brief Push item in the queue using lvalue reference + * @param item Item + */ + void push(const T &item) { std::unique_lock mlock(mutex_); queue_.push(item); @@ -38,6 +58,12 @@ class blocking_queue { cond_.notify_one(); } + /** + * @brief Push item in the queue using rvalue reference + * Even if the item is temporary, the push will succeed + * @param item Item + */ + void push(T &&item) { std::unique_lock mlock(mutex_); queue_.push(std::move(item)); @@ -46,8 +72,11 @@ class blocking_queue { } private: + /* Queue */ std::queue queue_; + /* Operation mutex */ std::mutex mutex_; + /* Conditional variable */ std::condition_variable cond_; }; diff --git a/libmmux/src/mmux/storage/notification/notification_handler.h b/libmmux/src/mmux/storage/notification/notification_handler.h index 0a62dc906..a6a9446b9 100644 --- a/libmmux/src/mmux/storage/notification/notification_handler.h +++ b/libmmux/src/mmux/storage/notification/notification_handler.h @@ -8,17 +8,54 @@ namespace mmux { namespace storage { - +/* Notification handler class + * Inherited from subscription_serviceIf */ class notification_handler : public subscription_serviceIf { public: typedef blocking_queue> mailbox_t; - explicit notification_handler(mailbox_t ¬ifications, mailbox_t& controls); + /** + * @brief Constructor + * + * The notification mailbox is like a notification + * buffer as to prevent client from being overwhelmed + * + * The control mailbox is a log for subscribe and + * unsubscribe control operations + * + * @param notifications Notification mailbox + * @param controls Control mailbox + */ + + explicit notification_handler(mailbox_t ¬ifications, mailbox_t &controls); + + /** + * @brief Add notification to notification mailbox + * @param op Operation + * @param data Data + */ + void notification(const std::string &op, const std::string &data) override; + + /** + * @brief Add control operation to control mailbox + * @param type response type, subscribe or unsubscribe + * @param ops Operations + * @param msg message to be returned + */ + void control(response_type type, const std::vector &ops, const std::string &msg) override; private: + /* Notification mailbox + * The notification mailbox is like a notification + * buffer as to prevent client from being overwhelmed + */ mailbox_t ¬ifications_; + /* Control mailbox + * The control mailbox is a log for subscribe and + * unsubscribe control operations + */ mailbox_t &controls_; }; diff --git a/libmmux/src/mmux/storage/notification/notification_server.h b/libmmux/src/mmux/storage/notification/notification_server.h index 1d70675e3..5a960bf2a 100644 --- a/libmmux/src/mmux/storage/notification/notification_server.h +++ b/libmmux/src/mmux/storage/notification/notification_server.h @@ -8,9 +8,18 @@ namespace mmux { namespace storage { - +/* Notification server class */ class notification_server { public: + + /** + * @brief Create notification server + * @param blocks Chain modules + * @param address Host address + * @param port Port number + * @return Server + */ + static std::shared_ptr create(std::vector> &blocks, const std::string &address, int port); diff --git a/libmmux/src/mmux/storage/notification/notification_service_factory.h b/libmmux/src/mmux/storage/notification/notification_service_factory.h index c8bfada1c..b8b11992f 100644 --- a/libmmux/src/mmux/storage/notification/notification_service_factory.h +++ b/libmmux/src/mmux/storage/notification/notification_service_factory.h @@ -7,14 +7,35 @@ namespace mmux { namespace storage { - +/* Notification service factory class + * Inherited from notification_serviceIfFactory */ class notification_service_factory: public notification_serviceIfFactory { public: + + /** + * @brief Constructor + * @param blocks Data block + */ + explicit notification_service_factory(std::vector> &blocks); + + /** + * @brief Fetch notification service handler + * @param conn_info Connection info + * @return Notification service handler + */ + notification_serviceIf *getHandler(const ::apache::thrift::TConnectionInfo &conn_info) override; + + /** + * @brief Release handler + * @param anIf Notification handler + */ + void releaseHandler(notification_serviceIf *anIf) override; private: + /* Data blocks */ std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/notification/notification_service_handler.h b/libmmux/src/mmux/storage/notification/notification_service_handler.h index 701651d1e..a8ea26840 100644 --- a/libmmux/src/mmux/storage/notification/notification_service_handler.h +++ b/libmmux/src/mmux/storage/notification/notification_service_handler.h @@ -9,18 +9,51 @@ namespace mmux { namespace storage { - +/* Notification handler */ class notification_service_handler : public notification_serviceIf { public: + + /** + * @brief Constructor + * @param oprot Protocol + * @param blocks Data blocks + */ + explicit notification_service_handler(std::shared_ptr<::apache::thrift::protocol::TProtocol> oprot, std::vector> &blocks); + /** + * @brief Subscribe to a block for given operations + * This function adds all pairs of block and operations in local subscription set + * and adds all the operation with the subscription service client to the subscription map + * of the block + * + * @param block_id Block id number + * @param ops Operations + */ + void subscribe(int32_t block_id, const std::vector &ops) override; + + /** + * @brief Unsubscribe to the block for given operations + * This function takes down all the operations that are unsubscribed + * and clears local subscription, then it removes the subscription in + * the block's subscription map + * + * @param block_id Block id, if block id is -1, find block id in local subscription + * @param ops Operations, if ops is empty, clear all in local subscription, otherwise + * just clear the specific operations + */ + void unsubscribe(int32_t block_id, const std::vector &ops) override; private: + /* Local subscription set for pairs of block and operation */ std::set> local_subs_; + /* Protocol */ std::shared_ptr<::apache::thrift::protocol::TProtocol> oprot_; + /* Subscription service client */ std::shared_ptr client_; + /* Data blocks */ std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/notification/notification_worker.h b/libmmux/src/mmux/storage/notification/notification_worker.h index 5adb6d426..81b600700 100644 --- a/libmmux/src/mmux/storage/notification/notification_worker.h +++ b/libmmux/src/mmux/storage/notification/notification_worker.h @@ -7,7 +7,7 @@ namespace mmux { namespace storage { - +/* Notification worker class */ class notification_worker { public: typedef subscription_serviceProcessor processor_t; @@ -15,21 +15,51 @@ class notification_worker { typedef std::shared_ptr protocol_ptr_t; typedef blocking_queue> mailbox_t; + /** + * @brief Constructor + * @param notifications Notification mailbox + * @param controls Control mailbox + */ + notification_worker(mailbox_t ¬ifications, mailbox_t &controls); + + /** + * @brief Destructor + */ + ~notification_worker(); + /** + * @brief Add protocol to protocol list + * @param protocol + */ + void add_protocol(protocol_ptr_t protocol); + /** + * @brief Start processor thread + */ + void start(); + + /** + * @brief Stop worker + */ + void stop(); private: + /* Notification mailbox */ mailbox_t ¬ifications_; + /* Control mailbox */ mailbox_t &controls_; + /* Atomic boolean stop */ std::atomic_bool stop_; + /* worker thread */ std::thread worker_; - + /* Processor */ processor_ptr_t processor_; + /* Protocols */ std::vector protocols_; }; diff --git a/libmmux/src/mmux/storage/notification/subscription_map.h b/libmmux/src/mmux/storage/notification/subscription_map.h index 4b7f598d2..3a19f12a5 100644 --- a/libmmux/src/mmux/storage/notification/subscription_map.h +++ b/libmmux/src/mmux/storage/notification/subscription_map.h @@ -7,23 +7,58 @@ #include "subscription_service.h" namespace mmux { namespace storage { - +/* Subscription map class + * This map records all the clients that are waiting for a specific operation + * on the block. When the operation is done, the block will send a notification + * in order to let the client get the right data at right time + * */ class subscription_map { public: + /** + * @brief Constructor + */ + subscription_map(); + /** + * @brief Add operations to subscription map + * The subscription service client will generate a response + * @param ops Operations + * @param client Subscription service client + */ + void add_subscriptions(const std::vector &ops, std::shared_ptr client); + /** + * @brief Remove a subscription from subscription map + * If inform is true, subscription service client will generate a response + * @param ops Operations + * @param client Subscription service client + * @param inform Bool value that indicates inform or not + */ + void remove_subscriptions(const std::vector &ops, - std::shared_ptr client, - bool inform = true); + std::shared_ptr client, + bool inform = true); + + /** + * @brief Notify all the waiting clients of the operation + * @param op Operation + * @param msg Message to be sent to waiting clients + */ void notify(const std::string &op, const std::string &msg); + /** + * @brief Clear the subscription map + */ + void clear(); private: + /* Subscription mapp operation mutex */ std::mutex mtx_{}; + /* Subscription map TODO map or unordered map ??*/ std::map>> subs_{}; }; diff --git a/libmmux/src/mmux/storage/service/block_request_handler.h b/libmmux/src/mmux/storage/service/block_request_handler.h index d7f796475..c84ae7a5a 100644 --- a/libmmux/src/mmux/storage/service/block_request_handler.h +++ b/libmmux/src/mmux/storage/service/block_request_handler.h @@ -9,28 +9,76 @@ namespace mmux { namespace storage { - +/* Block request handler class + * Inherited from block_request_serviceIf */ class block_request_handler : public block_request_serviceIf { public: + + /** + * @brief Constructor + * @param client Block response client + * @param client_id_gen Client id generator + * @param blocks Data blocks + */ + explicit block_request_handler(std::shared_ptr client, std::atomic &client_id_gen, std::vector> &blocks); + /** + * @brief Fetch current client_id_gen_ and add one to the atomic pointer + * @return Current client_id_gen_ + */ + int64_t get_client_id() override; + + /** + * @brief Register the client with the block + * Save the block id and client id to the request handler and + * add client to the block response client map + * @param block_id Block id number + * @param client_id Client id number + */ + void register_client_id(int32_t block_id, int64_t client_id) override; + + /** + * @brief Request an command + * @param seq Sequence id TODO + * @param block_id Block id number + * @param cmd_id Command id number + * @param arguments Arguments + */ + void command_request(const sequence_id &seq, int32_t block_id, int32_t cmd_id, const std::vector &arguments) override; + /** + * @brief Fetch the handler's registered block id + * @return Registered block id number + */ + int32_t registered_block_id() const; + + /** + * @brief Fetch the handler's registered client id + * @return Registered client id number + */ + int64_t registered_client_id() const; private: + /* Block response client */ std::shared_ptr client_; + /* Registered block id */ int32_t registered_block_id_; + /* Registered client id */ int64_t registered_client_id_; + /* Client id generator */ std::atomic &client_id_gen_; + /* Data blocks */ std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/service/block_request_handler_factory.h b/libmmux/src/mmux/storage/service/block_request_handler_factory.h index 54740ea15..170d00aa2 100644 --- a/libmmux/src/mmux/storage/service/block_request_handler_factory.h +++ b/libmmux/src/mmux/storage/service/block_request_handler_factory.h @@ -6,14 +6,36 @@ namespace mmux { namespace storage { - -class block_request_handler_factory: public block_request_serviceIfFactory { +/* Block request handler factory */ +class block_request_handler_factory : public block_request_serviceIfFactory { public: + /** + * @brief Constructor + * @param blocks Data blocks + */ + explicit block_request_handler_factory(std::vector> &blocks); + + /** + * @brief Fetch block request handler + * @param connInfo Connection info + * @return Block request handler + */ + block_request_serviceIf *getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override; + + /** + * @brief Release handler + * Remove the registered client from the block response client list + * @param anIf Block request handler + */ + void releaseHandler(block_request_serviceIf *anIf) override; + public: + /* Data blocks */ std::vector> &blocks_; + /* Client id generator, starts at 1 */ std::atomic client_id_gen_; }; diff --git a/libmmux/src/mmux/storage/service/block_response_client.h b/libmmux/src/mmux/storage/service/block_response_client.h index 5e0a68f2a..87d2cc4c4 100644 --- a/libmmux/src/mmux/storage/service/block_response_client.h +++ b/libmmux/src/mmux/storage/service/block_response_client.h @@ -6,14 +6,28 @@ namespace mmux { namespace storage { - +/* Block response client */ class block_response_client { public: typedef block_response_serviceClient thrift_client; + + /** + * @brief Constructor + * @param protocol Protocol + */ + explicit block_response_client(std::shared_ptr protocol); - void response(const sequence_id& seq, const std::vector& result); + /** + * @brief Response + * @param seq TODO + * @param result Operation result + */ + + void response(const sequence_id &seq, const std::vector &result); + private: + /* Block response service client */ std::shared_ptr client_{}; }; diff --git a/libmmux/src/mmux/storage/service/block_response_client_map.cpp b/libmmux/src/mmux/storage/service/block_response_client_map.cpp index 1b2ee0cc8..d624966bd 100644 --- a/libmmux/src/mmux/storage/service/block_response_client_map.cpp +++ b/libmmux/src/mmux/storage/service/block_response_client_map.cpp @@ -6,7 +6,7 @@ namespace storage { using namespace utils; -block_response_client_map::block_response_client_map(): clients_(0) {} +block_response_client_map::block_response_client_map() : clients_(0) {} void block_response_client_map::add_client(int64_t client_id, std::shared_ptr client) { clients_.insert(client_id, client); diff --git a/libmmux/src/mmux/storage/service/block_response_client_map.h b/libmmux/src/mmux/storage/service/block_response_client_map.h index 7774334e2..37de22e7b 100644 --- a/libmmux/src/mmux/storage/service/block_response_client_map.h +++ b/libmmux/src/mmux/storage/service/block_response_client_map.h @@ -7,20 +7,50 @@ namespace mmux { namespace storage { - +/* Block response client map class + * This map records all the client that sent request to the block. + * The map is from client id to block response client + * */ class block_response_client_map { public: + + /** + * @brief Constructor + */ + block_response_client_map(); + /** + * @brief Add a client to the map + * @param client_id Client id + * @param client Block response client + */ + void add_client(int64_t client_id, std::shared_ptr client); + /** + * @brief Remove a client from the map + * @param client_id Client id number + */ + void remove_client(int64_t client_id); + /** + * @brief Respond to the client + * @param seq Request sequence id + * @param result Request result + */ + void respond_client(const sequence_id &seq, const std::vector &result); + /** + * @brief Clear the map + */ + void clear(); private: + /* TODO no cuckoohash currently */ cuckoohash_map> clients_; }; diff --git a/libmmux/src/mmux/storage/service/block_server.h b/libmmux/src/mmux/storage/service/block_server.h index eb03d95db..e76ddcc42 100644 --- a/libmmux/src/mmux/storage/service/block_server.h +++ b/libmmux/src/mmux/storage/service/block_server.h @@ -6,9 +6,20 @@ namespace mmux { namespace storage { - +/* */ class block_server { public: + /** + * @brief Create block server + * @param blocks Data blocks -> used to create block request handler factory + * @param address Socket address + * @param port Socket port + * @param non_blocking TODO + * @param num_io_threads Number of IO threads TODO + * @param num_proc_threads Number of process threads + * @return Block server + */ + static std::shared_ptr create(std::vector> &blocks, const std::string &address, int port, diff --git a/libmmux/src/mmux/storage/service/buffered_transport_factory.h b/libmmux/src/mmux/storage/service/buffered_transport_factory.h index 3aa6bf7a4..d1155bef9 100644 --- a/libmmux/src/mmux/storage/service/buffered_transport_factory.h +++ b/libmmux/src/mmux/storage/service/buffered_transport_factory.h @@ -6,16 +6,26 @@ namespace mmux { namespace storage { - +/* */ class BufferedTransportFactory : public apache::thrift::transport::TTransportFactory { public: + /** + * @brief + * @param buffer_size + */ + explicit BufferedTransportFactory(uint32_t buffer_size) : buffer_size_(buffer_size) {} + /** + * @brief + */ + virtual ~BufferedTransportFactory() {} /** * Wraps the transport into a buffered one. */ + std::shared_ptr getTransport(std::shared_ptr trans) override { return std::shared_ptr(new apache::thrift::transport::TBufferedTransport( trans, @@ -23,6 +33,7 @@ class BufferedTransportFactory : public apache::thrift::transport::TTransportFac } private: + /* */ uint32_t buffer_size_; }; } diff --git a/libmmux/src/mmux/storage/service/chain_request_client.h b/libmmux/src/mmux/storage/service/chain_request_client.h index 259465541..816393ad7 100644 --- a/libmmux/src/mmux/storage/service/chain_request_client.h +++ b/libmmux/src/mmux/storage/service/chain_request_client.h @@ -7,31 +7,98 @@ namespace mmux { namespace storage { - +/* Chain request client */ class chain_request_client { public: typedef chain_request_serviceClient thrift_client; chain_request_client() = default; + + /** + * @brief Constructor + * @param host Socket host + * @param port Socket port number + * @param block_id Block id + */ + explicit chain_request_client(const std::string &host, int32_t port, int32_t block_id); + + /** + * @brief Destructor + * Close transport and set block id to -1 + */ + ~chain_request_client(); + + /** + * @brief Open client connection + * @param host Socket host number + * @param port Socket port number + * @param block_id Block id number + */ + void connect(const std::string &host, int port, int32_t block_id); + + /** + * @brief Close client connection + * Set block id to -1 + */ + void disconnect(); + + /** + * @brief Check if transport is opened + * @return Bool value, true if opened + */ + bool is_connected() const; + + /** + * @brief Fetch end point + * @return End point string: host + " : " + port + */ + std::string endpoint(); + + /** + * @brief Fetch protocol + * @return Protocol + */ + std::shared_ptr protocol() const; + /** + * @brief Send a request + * @param seq Sequence id + * @param cmd_id Command id + * @param arguments Arguments + */ + void request(const sequence_id &seq, int32_t cmd_id, const std::vector &arguments); - void run_command(std::vector& _return, int32_t cmd_id, const std::vector& arguments); + + /** + * @brief Run command + * @param _return Return value + * @param cmd_id Command id + * @param arguments Arguments + */ + + void run_command(std::vector &_return, int32_t cmd_id, const std::vector &arguments); private: + /* Client host number */ std::string host_; + /* Client port number */ int32_t port_{}; + /* Block id */ int32_t block_id_{}; - + /* Socket */ std::shared_ptr socket_{}; + /* Transport */ std::shared_ptr transport_{}; + /* Protocol */ std::shared_ptr protocol_{}; + /* Chain request service client */ std::shared_ptr client_{}; }; diff --git a/libmmux/src/mmux/storage/service/chain_request_handler.h b/libmmux/src/mmux/storage/service/chain_request_handler.h index 82621915b..5d08eb266 100644 --- a/libmmux/src/mmux/storage/service/chain_request_handler.h +++ b/libmmux/src/mmux/storage/service/chain_request_handler.h @@ -7,22 +7,47 @@ namespace mmux { namespace storage { - +/* Chain request handler class + * Inherited from chain_request_serviceIf */ class chain_request_handler : public chain_request_serviceIf { public: + /** + * @brief Constructor + * @param prot Protocol + * @param blocks Data blocks + */ + explicit chain_request_handler(std::shared_ptr<::apache::thrift::protocol::TProtocol> prot, std::vector> &blocks); + /** + * @brief Send chain request + * @param seq TODO + * @param block_id Block index + * @param cmd_id Command id + * @param arguments arguments + */ + void chain_request(const sequence_id &seq, int32_t block_id, int32_t cmd_id, const std::vector &arguments) override; + /** + * @brief Run command on data block + * @param _return + * @param block_id + * @param cmd_id + * @param arguments + */ + void run_command(std::vector &_return, const int32_t block_id, const int32_t cmd_id, const std::vector &arguments) override; private: + /* Data blocks*/ std::vector> &blocks_; + /* Protocol */ std::shared_ptr<::apache::thrift::protocol::TProtocol> prot_; }; diff --git a/libmmux/src/mmux/storage/service/chain_request_handler_factory.h b/libmmux/src/mmux/storage/service/chain_request_handler_factory.h index 9003ca0a5..215d7a1a7 100644 --- a/libmmux/src/mmux/storage/service/chain_request_handler_factory.h +++ b/libmmux/src/mmux/storage/service/chain_request_handler_factory.h @@ -6,14 +6,34 @@ namespace mmux { namespace storage { - +/* Chain request handler factory + * Inherited from chain request serviceIfFactory */ class chain_request_handler_factory: public chain_request_serviceIfFactory { public: + /** + * @brief Constructor + * @param blocks Data blocks + */ + explicit chain_request_handler_factory(std::vector> &blocks); + + /** + * @brief Fetch chain request handler + * @param connInfo Connection information + * @return Chain request handler + */ + chain_request_serviceIf *getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override; + + /** + * @brief Release chain request handler + * @param anIf Chain request handler + */ + void releaseHandler(chain_request_serviceIf *anIf) override; private: + /* Data blocks */ std::vector> &blocks_; }; diff --git a/libmmux/src/mmux/storage/service/chain_response_client.h b/libmmux/src/mmux/storage/service/chain_response_client.h index 31b12ef32..ae066c57b 100644 --- a/libmmux/src/mmux/storage/service/chain_response_client.h +++ b/libmmux/src/mmux/storage/service/chain_response_client.h @@ -5,19 +5,43 @@ namespace mmux { namespace storage { - +/* Chain response client */ class chain_response_client { public: typedef chain_response_serviceClient thrift_client; chain_response_client() = default; + + /** + * @brief Constructor + * Reset client protocol + * @param prot Protocol + */ + explicit chain_response_client(std::shared_ptr<::apache::thrift::protocol::TProtocol> prot); + /** + * @brief Check if chain response serviceClient is set + * @return Bool value, true if set + */ + bool is_set() const; + /** + * @brief Reset client protocol + * Construct a new chain response serviceClient + * @param prot Protocol + */ + void reset_prot(std::shared_ptr<::apache::thrift::protocol::TProtocol> prot); - void ack(const sequence_id& seq); + /** + * @brief TODO + * @param seq + */ + + void ack(const sequence_id &seq); private: + /* Chain response serviceClient */ std::unique_ptr client_{}; }; diff --git a/libmmux/src/mmux/storage/service/chain_server.h b/libmmux/src/mmux/storage/service/chain_server.h index a03634e15..42b4d13f2 100644 --- a/libmmux/src/mmux/storage/service/chain_server.h +++ b/libmmux/src/mmux/storage/service/chain_server.h @@ -6,9 +6,21 @@ namespace mmux { namespace storage { - +/* */ class chain_server { public: + + /** + * @brief + * @param blocks + * @param address + * @param port + * @param non_blocking + * @param num_io_threads + * @param num_proc_threads + * @return + */ + static std::shared_ptr create(std::vector> &blocks, const std::string &address, int port, diff --git a/libmmux/src/mmux/storage/service/server_storage_tracker.h b/libmmux/src/mmux/storage/service/server_storage_tracker.h index 56881a627..e371b9349 100644 --- a/libmmux/src/mmux/storage/service/server_storage_tracker.h +++ b/libmmux/src/mmux/storage/service/server_storage_tracker.h @@ -10,25 +10,49 @@ namespace mmux { namespace storage { +/* */ class server_storage_tracker { public: + /** + * @brief + * @param blocks + * @param periodicity_ms + * @param output_file + */ + server_storage_tracker(std::vector> &blocks, uint64_t periodicity_ms, const std::string &output_file); + /** + * @brief + */ ~server_storage_tracker(); + /** + * @brief + */ + void start(); + /** + * @brief + */ + void stop(); private: + /* */ void report_file_sizes(std::ofstream &out); - + /* */ std::vector> &blocks_; + /* */ std::chrono::milliseconds periodicity_ms_; + /* */ std::atomic_bool stop_{false}; + /* */ std::thread worker_; + /* */ std::string output_file_; }; } diff --git a/libmmux/src/mmux/storage/storage_management_ops.h b/libmmux/src/mmux/storage/storage_management_ops.h index 8c234506b..91b7f7381 100644 --- a/libmmux/src/mmux/storage/storage_management_ops.h +++ b/libmmux/src/mmux/storage/storage_management_ops.h @@ -9,7 +9,7 @@ namespace mmux { namespace storage { - +/* Storage management operation virtual class */ class storage_management_ops { public: virtual void setup_block(const std::string &block_name,