diff --git a/client/client_priv.h b/client/client_priv.h index 8f5e42023ad8..52e5db7ea46f 100644 --- a/client/client_priv.h +++ b/client/client_priv.h @@ -1,6 +1,6 @@ /* Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved. - Copyright (c) 2021, Huawei Technologies Co., Ltd. + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. @@ -142,7 +142,6 @@ enum options_client { OPT_AUTO_VERTICAL_OUTPUT, OPT_DEBUG_INFO, OPT_DEBUG_CHECK, - OPT_PQ, OPT_COLUMN_TYPES, OPT_ERROR_LOG_FILE, OPT_WRITE_BINLOG, diff --git a/include/my_alloc.h b/include/my_alloc.h index 316d4787ab36..ba60e05fffdc 100644 --- a/include/my_alloc.h +++ b/include/my_alloc.h @@ -44,6 +44,8 @@ #include "mysql/psi/psi_memory.h" typedef void CallBackFunc(PSI_memory_key key, size_t length, unsigned int id) ; +const int PQ_MEMORY_USED_BUCKET = 16; + /** * The MEM_ROOT is a simple arena, where allocations are carved out of * larger blocks. Using an arena over plain malloc gives you two main @@ -133,7 +135,7 @@ struct MEM_ROOT { * The returned pointer will always be 8-aligned. */ void *Alloc(size_t length) MY_ATTRIBUTE((malloc)); - + /** Allocate “num” objects of type T, and default-construct them. If the constructor throws an exception, behavior is undefined. @@ -316,7 +318,6 @@ struct MEM_ROOT { CallBackFunc *allocCBFunc = nullptr; CallBackFunc *freeCBFunc = nullptr; - }; // Legacy C thunks. Do not use in new code. diff --git a/include/my_base.h b/include/my_base.h index 7ec4524f513f..ba5818030c6e 100644 --- a/include/my_base.h +++ b/include/my_base.h @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. - Copyright (c) 2021, Huawei Technologies Co., Ltd. 
+ This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. @@ -986,10 +986,8 @@ Information in the data-dictionary needs to be updated. */ #define HA_ERR_TOO_LONG_PATH 207 /** Histogram sampling initialization failed */ #define HA_ERR_SAMPLING_INIT_FAILED 208 -/** Complete scan all Parallel Query ranges */ -#define HA_ERR_END_OF_RANGES 209 /** Copy of last error number */ -#define HA_ERR_LAST 209 +#define HA_ERR_LAST 208 /* Number of different errors */ #define HA_ERR_ERRORS (HA_ERR_LAST - HA_ERR_FIRST + 1) diff --git a/mysys/dbug.cc b/mysys/dbug.cc index 6931f87997fe..123c94abcb23 100644 --- a/mysys/dbug.cc +++ b/mysys/dbug.cc @@ -923,14 +923,14 @@ void _db_pop_() { } } -void pq_stack_copy(CODE_STATE *leader_cs){ +void pq_stack_copy(CODE_STATE *leader_cs) { CODE_STATE *cs; get_code_state_or_return; DBUG_ASSERT(cs->stack == &init_settings); cs->stack = leader_cs->stack; } -void pq_stack_reset(){ +void pq_stack_reset() { CODE_STATE *cs; get_code_state_or_return; cs->stack = &init_settings; diff --git a/mysys/my_alloc.cc b/mysys/my_alloc.cc index 20c92e31dfee..32c02166c88e 100644 --- a/mysys/my_alloc.cc +++ b/mysys/my_alloc.cc @@ -54,8 +54,6 @@ #define MEM_ROOT_SINGLE_CHUNKS 0 #endif -const int PQ_MEMORY_USED_BUCKET = 16; - MEM_ROOT::Block *MEM_ROOT::AllocBlock(size_t length) { DBUG_TRACE; @@ -109,10 +107,13 @@ void *MEM_ROOT::Alloc(size_t length) { ret = AllocSlow(length); DBUG_ASSERT(m_allocated_size >= old_alloc_size); - if (allocCBFunc && (m_allocated_size - old_alloc_size)) - allocCBFunc(m_psi_key, m_allocated_size - old_alloc_size, - ((reinterpret_cast(this) >> PQ_MEMORY_USED_BUCKET) & 0xf)); - + if (allocCBFunc && (m_allocated_size - old_alloc_size)) { + allocCBFunc( + m_psi_key, m_allocated_size - old_alloc_size, + ((reinterpret_cast(this) >> PQ_MEMORY_USED_BUCKET) & + 0xf)); + } + return ret; } @@ -171,10 +172,12 @@ void 
MEM_ROOT::Clear() { DBUG_TRACE; DBUG_PRINT("enter", ("root: %p", this)); - if (freeCBFunc && m_allocated_size) - freeCBFunc(m_psi_key, m_allocated_size, + if (freeCBFunc && m_allocated_size) { + freeCBFunc( + m_psi_key, m_allocated_size, (reinterpret_cast(this) >> PQ_MEMORY_USED_BUCKET) & 0xf); - + } + // Already cleared, or memset() to zero, so just ignore. if (m_current_block == nullptr) return; @@ -210,10 +213,12 @@ void MEM_ROOT::ClearForReuse() { m_current_block->prev = nullptr; m_allocated_size = m_current_free_end - m_current_free_start; - if (freeCBFunc && (old_alloc_size - m_allocated_size)) - freeCBFunc(m_psi_key, old_alloc_size - m_allocated_size, + if (freeCBFunc && (old_alloc_size - m_allocated_size)) { + freeCBFunc( + m_psi_key, old_alloc_size - m_allocated_size, (reinterpret_cast(this) >> PQ_MEMORY_USED_BUCKET) & 0xf); - + } + FreeBlocks(start); } diff --git a/mysys/my_handler_errors.h b/mysys/my_handler_errors.h index af02f9c80b38..edb090f242d3 100644 --- a/mysys/my_handler_errors.h +++ b/mysys/my_handler_errors.h @@ -2,7 +2,7 @@ #define MYSYS_MY_HANDLER_ERRORS_INCLUDED /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved. - Copyright (c) 2021, Huawei Technologies Co., Ltd. + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. 
@@ -123,7 +123,6 @@ static const char *handler_error_messages[] = { "Invalid table name", "Path is too long for the OS", "Histogram sampling initialization failed", - "Complete scan all Parallel Query ranges" }; extern void my_handler_error_register(void); diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 1436a168b3a1..ba4aacb63e6d 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -566,6 +566,7 @@ SET(SQL_SHARED_SOURCES ssl_acceptor_context.cc exchange.cc exchange_sort.cc + exchange_nosort.cc msg_queue.cc ) diff --git a/sql/binary_heap.h b/sql/binary_heap.h index b5e98092df34..f52e13ae1fcb 100644 --- a/sql/binary_heap.h +++ b/sql/binary_heap.h @@ -24,33 +24,25 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include -#include -#include "my_sys.h" -#include "mysys_err.h" -#include "priority_queue.h" -#include "sort_param.h" -#include "sql/malloc_allocator.h" -#include "sql_base.h" #include "sql/sql_class.h" -/// compare based on the sort_key +// compare based on the sort_key typedef bool (*binaryheap_comparator)(int a, int b, void *arg); class binary_heap { -public: + public: binary_heap(int element_size, void *arg, binaryheap_comparator cmp, THD *thd) - : m_queue(NULL), + : m_queue(nullptr), m_capacity(element_size), m_size(0), m_compare(cmp), m_arg(arg), - m_thd(thd) - {} + m_thd(thd) {} -public: /* @retval: false of success, and true otherwise. 
*/ bool init_binary_heap() { - if (!m_capacity) return true; + if (m_capacity <= 0) { + return true; + } m_queue = new (m_thd->pq_mem_root) int[m_capacity + 1]; if (!m_queue || DBUG_EVALUATE_IF("pq_msort_error9", true, false)) { my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "", "(PQ::init)"); @@ -59,20 +51,7 @@ class binary_heap { return false; } - /* return the index ((i - 1) / 2) of the parent node of node i */ - inline int parent(unsigned int i) { - DBUG_ASSERT(i != 0); - return (--i) >> 1; - } - - /* return the index (2 * i + 1) of the left child of node i */ - inline int left(unsigned int i) { return (i << 1) | 1; } - - /* return the index (2 * i + 2) of the right child of node */ - inline int right(unsigned int i) { return (++i) << 1; } - void reset() { m_size = 0; } - uint size() { return m_size; } void add_unorderd(int element) { if (m_size >= m_capacity || @@ -84,23 +63,19 @@ class binary_heap { } void build() { - if (m_size <= 1) return; - for (int i = parent(m_size - 1); i >= 0; i--) sift_down(i); - } - - void add(int element) { - if (m_size >= m_capacity) { - my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "out of binary heap space"); + if (m_size <= 1) { return; } - m_queue[m_size++] = element; - sift_up(m_size - 1); + for (int i = parent(m_size - 1); i >= 0; i--) { + sift_down(i); + } } int first() { DBUG_ASSERT(!empty()); return m_queue[0]; } + int remove_first() { DBUG_ASSERT(!empty()); if (m_size == 1) { @@ -118,21 +93,48 @@ class binary_heap { void replace_first(int element) { DBUG_ASSERT(!empty()); m_queue[0] = element; - if (m_size > 1) sift_down(0); + if (m_size > 1) { + sift_down(0); + } } + bool empty() { return m_size == 0; } - void cleanup() - { - if (m_queue) destroy(m_queue); + void cleanup() { + if (m_queue) { + destroy(m_queue); + } + } + + private: + /* return the index ((i - 1) / 2) of the parent node of node i */ + int parent(unsigned int i) { + DBUG_ASSERT(i != 0); + return (--i) >> 1; + } + + /* return the index (2 * i + 1) of the left child 
of node i */ + int left(unsigned int i) { return (i << 1) | 1; } + + /* return the index (2 * i + 2) of the right child of node */ + int right(unsigned int i) { return (++i) << 1; } + + uint size() { return m_size; } + + void add(int element) { + if (m_size >= m_capacity) { + my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "out of binary heap space"); + return; + } + m_queue[m_size++] = element; + sift_up(m_size - 1); } -private: void swap_node(int a, int b) { - int T; - T = m_queue[a]; + int temp; + temp = m_queue[a]; m_queue[a] = m_queue[b]; - m_queue[b] = T; + m_queue[b] = temp; } void sift_down(int node_off) { @@ -142,17 +144,21 @@ class binary_heap { int swap_off = 0; if (left_off < m_size && - m_compare(m_queue[left_off], m_queue[node_off], m_arg)) + m_compare(m_queue[left_off], m_queue[node_off], m_arg)) { swap_off = left_off; + } if (right_off < m_size && m_compare(m_queue[right_off], m_queue[node_off], m_arg)) { if (!swap_off || - m_compare(m_queue[right_off], m_queue[left_off], m_arg)) + m_compare(m_queue[right_off], m_queue[left_off], m_arg)) { swap_off = right_off; + } } - if (!swap_off) break; + if (!swap_off) { + break; + } swap_node(swap_off, node_off); node_off = swap_off; @@ -165,14 +171,15 @@ class binary_heap { while (node_off != 0) { parent_off = parent(node_off); cmp = m_compare(m_queue[parent_off], m_queue[node_off], m_arg); - if (cmp) break; + if (cmp) { + break; + } swap_node(node_off, parent_off); node_off = parent_off; } } -private: int *m_queue; int m_capacity; int m_size; diff --git a/sql/exchange.cc b/sql/exchange.cc index 8cc7b51c2234..35633a67647e 100644 --- a/sql/exchange.cc +++ b/sql/exchange.cc @@ -1,6 +1,6 @@ /* Copyright (c) 2020, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2021, Huawei Technologies Co., Ltd. - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. 
@@ -22,61 +22,77 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "exchange.h" -#include "table.h" #include "field.h" +#include "table.h" + /** * alloc space for mqueue_handles * * @return false for success, and otherwise true. */ -bool Exchange::init() -{ - uint i= 0; +bool Exchange::init() { + uint i = 0; MQueue **mqueues = nullptr; /** note that: all workers share one receiver. */ - m_receiver= new (m_thd->pq_mem_root) MQ_event(m_thd); - if (!m_receiver) goto err; + m_receiver = new (m_thd->pq_mem_root) MQ_event(m_thd); + if (m_receiver == nullptr) { + goto err; + } + + mqueue_handles = + new (m_thd->pq_mem_root) MQueue_handle *[m_nqueues] { nullptr }; + if (mqueue_handles == nullptr) { + goto err; + } - mqueue_handles= new (m_thd->pq_mem_root) MQueue_handle*[m_nqueues]{NULL}; - if (!mqueue_handles) goto err; + mqueues = new (m_thd->pq_mem_root) MQueue *[m_nqueues] { nullptr }; + if (mqueues == nullptr) { + goto err; + } - mqueues= new (m_thd->pq_mem_root) MQueue*[m_nqueues]{NULL}; - if (!mqueues) goto err; - for (i= 0; i < m_nqueues; i++) - { - char *ring_buffer= new (m_thd->pq_mem_root) char[RING_SIZE]; - if (!ring_buffer) goto err; - MQ_event *sender= new (m_thd->pq_mem_root) MQ_event(); - if (!sender) goto err; - mqueues[i]= new (m_thd->pq_mem_root) MQueue(sender, m_receiver, ring_buffer, RING_SIZE); - if (!mqueues[i] || DBUG_EVALUATE_IF("pq_mq_error1", true, false)) + for (i = 0; i < m_nqueues; i++) { + char *ring_buffer = new (m_thd->pq_mem_root) char[RING_SIZE]; + if (ring_buffer == nullptr) { goto err; + } + + MQ_event *sender = new (m_thd->pq_mem_root) MQ_event(); + if (sender == nullptr) { + goto err; + } + + mqueues[i] = new (m_thd->pq_mem_root) + MQueue(sender, m_receiver, ring_buffer, RING_SIZE); + if (mqueues[i] == nullptr || + DBUG_EVALUATE_IF("pq_mq_error1", true, false)) { + goto err; + } } - for (i= 0; i < m_nqueues; i++) - { - mqueue_handles[i]= new (m_thd->pq_mem_root) MQueue_handle(mqueues[i], 
MQ_BUFFER_SIZE); - if (!mqueue_handles[i] - || mqueue_handles[i]->init_mqueue_handle(m_thd) - || DBUG_EVALUATE_IF("pq_mq_error2", true, false)) + for (i = 0; i < m_nqueues; i++) { + mqueue_handles[i] = + new (m_thd->pq_mem_root) MQueue_handle(mqueues[i], MQ_BUFFER_SIZE); + if (mqueue_handles[i] == nullptr || + mqueue_handles[i]->init_mqueue_handle(m_thd) || + DBUG_EVALUATE_IF("pq_mq_error2", true, false)) { goto err; + } } + return false; - err: +err: sql_print_error("alloc space for exchange_record_pq error"); return true; } -void Exchange::cleanup() -{ +void Exchange::cleanup() { destroy(m_receiver); - if (mqueue_handles) - { - for(uint i= 0; i < m_nqueues; i++) - { - if (mqueue_handles[i]) + if (mqueue_handles) { + for (uint i = 0; i < m_nqueues; i++) { + if (mqueue_handles[i]) { mqueue_handles[i]->cleanup(); + } } } } @@ -87,12 +103,7 @@ void Exchange::cleanup() t[0] = {false, false}, t[1] = {false, true}, t[2] = {true, false}, t[3] = {true, true} */ -static char bool_item_field[8] = { - 0, 0, - 0, 1, - 1, 0, - 1, 1 -}; +static char bool_item_field[8] = {0, 0, 0, 1, 1, 0, 1, 1}; char *const_item_and_field_flag(uint value) { DBUG_ASSERT(value < 4); @@ -101,7 +112,6 @@ char *const_item_and_field_flag(uint value) { /** * reconstruct table->record[0] from MQ's message - * @qep_tab: the qep_tab of leader's first tmp table * @data: the message data * @msg_len: the message length * @@ -114,50 +124,49 @@ char *const_item_and_field_flag(uint value) { * (3) some unexpected errors; * */ -bool Exchange::convert_mq_data_to_record(uchar * data, int msg_len, - uchar * row_id) -{ +bool Exchange::convert_mq_data_to_record(uchar *data, int msg_len, + uchar *row_id) { /** there is error */ - if (m_thd->is_killed() || m_thd->pq_error) + if (m_thd->is_killed() || m_thd->pq_error) { return false; - - if(msg_len == 1 || DBUG_EVALUATE_IF("pq_worker_error10", true, false)) - { + } + if (msg_len == 1 || DBUG_EVALUATE_IF("pq_worker_error10", true, false)) { if (data[0] == EMPTY_MSG && 
DBUG_EVALUATE_IF("pq_worker_error10", false, true)) { return true; } else { - const char *msg= (data[0] == ERROR_MSG) ? "error msg" : "unknown error"; + const char *msg = (data[0] == ERROR_MSG) ? "error msg" : "unknown error"; sql_print_error("[Parallel query]: error info. %s\n", msg); - m_thd->pq_error= true; + m_thd->pq_error = true; return false; } } memset(m_table->record[0], 255, m_table->s->reclength); - int size_field= m_table->s->fields; + int size_field = m_table->s->fields; - //fetch the row_id info. from MQ - if (m_stab_output) - { - if (row_id) memcpy(row_id, data, m_ref_length); //row_id - data+= m_ref_length; + // fetch the row_id info. from MQ + if (m_stab_output) { + if (row_id) { + memcpy(row_id, data, m_ref_length); + } + data += m_ref_length; } - uint null_len= *(uint16 *)data; - data= data + sizeof(uint16); - uchar *null_flag= (uchar *)data; + uint null_len = *(uint16 *)data; + data = data + sizeof(uint16); + uchar *null_flag = (uchar *)data; /** - * Note that: we use two more bytes to store Field_varstring::length_bytes, - * and MYSQL_TIME_TYPE::pq_neg. + * Note that: we use one more byte to store Field_varstring::length_bytes. */ if (DBUG_EVALUATE_IF("pq_worker_error11", true, false) || - msg_len > (int)(m_table->s->reclength + 6 + null_len - + 2 * m_table->s->fields + (m_stab_output ? m_ref_length : 0))) - { + msg_len > + (int)(m_table->s->reclength + 6 + null_len + m_table->s->fields + + (m_stab_output ? 
m_ref_length : 0))) { m_thd->pq_error = true; - sql_print_error("[Parallel query]: sending (or receiving) msg from MQ error"); + sql_print_error( + "[Parallel query]: sending (or receiving) msg from MQ error"); return false; } @@ -165,158 +174,50 @@ bool Exchange::convert_mq_data_to_record(uchar * data, int msg_len, bool const_item = false; uint bit_value; char *status_flag = nullptr; - - uint null_offset= 0; - uint ptr_offset= null_len; - Field *item_field= nullptr; - int i= 0, j; - for(; i < size_field; i++) - { - item_field= m_table->field[i]; + uint null_offset = 0; + uint ptr_offset = null_len; + Field *item_field = nullptr; + int i = 0, j; + for (; i < size_field; i++) { + item_field = m_table->field[i]; /** determine whether it is a CONST_ITEM or NULL_FIELD */ - j= (null_offset >> 3) + 1; + j = (null_offset >> 3) + 1; DBUG_ASSERT((null_offset & 1) == 0); - bit_value= (null_flag[j] >> (6 - (null_offset & 7))) & 3; - status_flag= const_item_and_field_flag(bit_value); - const_item= *status_flag; - null_field= *(status_flag + 1); - - enum_field_types field_type= item_field->type(); - /** we should fill data into record[0] only when NOT_CONST_ITEM & NOT_NULL_FIELD */ - if (!const_item && !null_field) - { + bit_value = (null_flag[j] >> (6 - (null_offset & 7))) & 3; + status_flag = const_item_and_field_flag(bit_value); + const_item = *status_flag; + null_field = *(status_flag + 1); + enum_field_types field_type = item_field->type(); + /** we should fill data into record[0] only when NOT_CONST_ITEM & + * NOT_NULL_FIELD */ + if (!const_item && !null_field) { if (field_type == MYSQL_TYPE_VARCHAR || - field_type == MYSQL_TYPE_VAR_STRING) - { - Field_varstring *field_var= static_cast(item_field); - field_var->length_bytes= (uint) data[ptr_offset]; - ptr_offset++; //moving to the real value - - uint field_length= (field_var->length_bytes == 1) ? 
- (uint) data[ptr_offset] : - uint2korr(&data[ptr_offset]); - uint pack_length= field_length + field_var->length_bytes; + field_type == MYSQL_TYPE_VAR_STRING) { + Field_varstring *field_var = static_cast(item_field); + field_var->length_bytes = (uint)data[ptr_offset]; + ptr_offset++; // moving to the real value + uint field_length = (field_var->length_bytes == 1) + ? (uint)data[ptr_offset] + : uint2korr(&data[ptr_offset]); + uint pack_length = field_length + field_var->length_bytes; memcpy(field_var->ptr, &data[ptr_offset], pack_length); - ptr_offset+= pack_length; + ptr_offset += pack_length; } else { - uint pack_length= item_field->pack_length(); + uint pack_length = item_field->pack_length(); memcpy(item_field->ptr, &data[ptr_offset], pack_length); - ptr_offset+= pack_length; + ptr_offset += pack_length; } } /** set NULL flag of field */ - if (!const_item) - { - if (null_field) item_field->set_null(); - else item_field->set_notnull(); - } - null_offset+= 2; - } - return true; -} - -/** - * read one message from MQ[next_queue] - * - * @retval: true for success, and otherwise false - */ -bool Exchange_nosort::get_next(void **datap, uint32 *m_len, bool *done) -{ - MQ_RESULT result; - if (done != NULL) *done = false; - MQueue_handle *reader = get_mq_handle(m_next_queue); - result= reader->receive(datap, m_len); - - if (result == MQ_DETACHED) - { - if (done != NULL) *done = true; - return false; - } - if (result == MQ_WOULD_BLOCK) - { - return false; - } - return true; -} - -/** - * read one message from MQ in a round-robin method - * @datap: the message data - * @m_len: the message length - * - * @retval: true for success, and otherwise false - */ -bool Exchange_nosort::read_next(void **datap, uint32 *m_len) -{ - bool readerdone = false; - int nvisited = 0; - bool read_result= false; - THD *thd = get_thd(); - - /** round-robin method to acquire the data */ - while (!thd->is_killed() && !thd->pq_error) - { - read_result= get_next(datap, m_len, &readerdone); - /** 
detached and its content is also read done */ - if (readerdone) - { - DBUG_ASSERT(false == read_result); - m_active_readers--; - /** read done for all queues */ - if(m_active_readers == 0) - { - return false; - } - mqueue_mmove(m_next_queue, m_active_readers); - if(m_next_queue >= m_active_readers) { - m_next_queue = 0; - } - continue; - } - - /** data has successfully read into datap */ - if (read_result) return true; - /** move to next worker */ - m_next_queue++; - if (m_next_queue >= m_active_readers) - { - m_next_queue = 0; - } - nvisited++; - /** In a round-robin, we cannot read one message from MQ */ - if (nvisited >= m_active_readers) - { - /** - * this barrier ensures that the receiver first enters into a - * waiting status and then is waked by one sender. - */ - memory_barrier(); - MQ_event *receiver= get_mq_handle(0)->get_receiver(); - if (receiver) - { - receiver->wait_latch(); - receiver->reset_latch(); + if (!const_item) { + if (null_field) { + item_field->set_null(); + } else { + item_field->set_notnull(); } - nvisited= 0; } + null_offset += 2; } - return false; -} - -/** - * read one message from MQ and fill it to table->record[0] - * - * @retval: true for success, and otherwise false - */ -bool Exchange_nosort::read_mq_record() -{ - DBUG_ASSERT(!is_stable() && get_exchange_type() == EXCHANGE_NOSORT); - bool result= false; - uchar *data= nullptr; - uint32 msg_len= 0; - - /** read a message from MQ's local buffer */ - result= read_next((void **)&data, &msg_len); - return (result && convert_mq_data_to_record(data, msg_len)); -} + return true; +} \ No newline at end of file diff --git a/sql/exchange.h b/sql/exchange.h index 9c36cceda5f6..1854c79bac42 100644 --- a/sql/exchange.h +++ b/sql/exchange.h @@ -1,9 +1,9 @@ -#ifndef _EXCHAGE_H -#define _EXCHAGE_H +#ifndef EXCHAGE_H +#define EXCHAGE_H /* Copyright (c) 2020, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2021, Huawei Technologies Co., Ltd. 
- + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. @@ -27,17 +27,12 @@ #include "msg_queue.h" class Exchange { -private: - uint32 m_nqueues; - MQ_event *m_receiver; - uint m_ref_length; - bool m_stab_output; -public: - MQueue_handle **mqueue_handles; - THD *m_thd; - TABLE *m_table; + public: + enum EXCHANGE_TYPE { + EXCHANGE_NOSORT = 0, + EXCHANGE_SORT, + }; -public: Exchange() : m_nqueues(0), m_receiver(nullptr), @@ -45,106 +40,57 @@ class Exchange { m_stab_output(false), mqueue_handles(nullptr), m_thd(nullptr), - m_table(nullptr) - {} + m_table(nullptr) {} - Exchange(THD *thd, TABLE *table, uint32 workers, uint ref_length, bool stab_output=false) + Exchange(THD *thd, TABLE *table, uint32 workers, uint ref_length, + bool stab_output = false) : m_nqueues(workers), m_receiver(nullptr), m_ref_length(ref_length), m_stab_output(stab_output), mqueue_handles(nullptr), m_thd(thd), - m_table(table) - {} + m_table(table) {} - enum EXCHANGE_TYPE { - EXCHANGE_NOSORT= 0, - EXCHANGE_SORT, - }; virtual ~Exchange() {} -public: + virtual bool read_mq_record() = 0; + virtual EXCHANGE_TYPE get_exchange_type() = 0; virtual bool init(); virtual void cleanup(); virtual bool convert_mq_data_to_record(uchar *data, int msg_len, uchar *row_id = nullptr); - inline THD *get_thd() - { - return m_thd ? m_thd : current_thd; - } + inline THD *get_thd() { return m_thd ? 
m_thd : current_thd; } - inline MQueue_handle *get_mq_handle(uint32 i) - { + inline MQueue_handle *get_mq_handle(uint32 i) { DBUG_ASSERT(mqueue_handles); - DBUG_ASSERT( i < m_nqueues); + DBUG_ASSERT(i < m_nqueues); return mqueue_handles[i]; } - inline void mqueue_mmove(int mq_next_readers, int number_workers) - { - memmove(&mqueue_handles[mq_next_readers], &mqueue_handles[mq_next_readers + 1], + inline void mqueue_mmove(int mq_next_readers, int number_workers) { + memmove(&mqueue_handles[mq_next_readers], + &mqueue_handles[mq_next_readers + 1], sizeof(MQueue_handle *) * (number_workers - mq_next_readers)); } - inline int lanuch_workers () - { - return m_nqueues; - } + inline int lanuch_workers() { return m_nqueues; } - inline TABLE* get_table() - { - return m_table; - } + inline TABLE *get_table() { return m_table; } - inline int ref_length() - { - return m_ref_length; - } - - inline bool is_stable() - { - return m_stab_output; - } - virtual bool read_mq_record() = 0; - virtual EXCHANGE_TYPE get_exchange_type() = 0; -}; - -class Exchange_nosort : public Exchange { -private: - int m_next_queue; /** the next read queue */ - int m_active_readers; /** number of left queues which is sending or - receiving message */ - /** read one message from MQ[next_queue] */ - bool get_next(void **datap, uint32 *len, bool *done); - /** read one message rom MQ in a round-robin manner */ - bool read_next(void **datap, uint32 *len); - -public: - Exchange_nosort() - : Exchange(), - m_next_queue(0), - m_active_readers(0) - {} - - Exchange_nosort(THD *thd, TABLE *table, int workers, int ref_length, bool stab_output) - : Exchange(thd, table, workers, ref_length, stab_output) - { - m_next_queue= 0; - m_active_readers= workers; - } - - virtual ~Exchange_nosort() {} + inline int ref_length() { return m_ref_length; } - /** read/convert one message from mq to table->record[0] */ - bool read_mq_record() override; - - inline EXCHANGE_TYPE get_exchange_type() override - { - return EXCHANGE_NOSORT; 
- } + inline bool is_stable() { return m_stab_output; } + private: + uint32 m_nqueues; + MQ_event *m_receiver; + uint m_ref_length; + bool m_stab_output; + MQueue_handle **mqueue_handles; + THD *m_thd; + TABLE *m_table; }; -#endif +#endif // EXCHAGE_H diff --git a/sql/exchange_nosort.cc b/sql/exchange_nosort.cc new file mode 100644 index 000000000000..fea2f799e384 --- /dev/null +++ b/sql/exchange_nosort.cc @@ -0,0 +1,125 @@ +/* Copyright (c) 2020, Oracle and/or its affiliates. All Rights Reserved. + Copyright (c) 2021, Huawei Technologies Co., Ltd. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "exchange_nosort.h" + +/** + * read one message from MQ[next_queue] + * + * @retval: true for success, and otherwise false + */ +bool Exchange_nosort::get_next(void **datap, uint32 *m_len, bool *done) { + MQ_RESULT result; + if (done != nullptr) { + *done = false; + } + MQueue_handle *reader = get_mq_handle(m_next_queue); + result = reader->receive(datap, m_len); + if (result == MQ_DETACHED) { + if (done != nullptr) { + *done = true; + } + return false; + } + if (result == MQ_WOULD_BLOCK) { + return false; + } + + return true; +} + +/** + * read one message from MQ in a round-robin manner + * @datap: the message data + * @m_len: the message length + * + * @retval: true for success, and otherwise false + */ +bool Exchange_nosort::read_next(void **datap, uint32 *m_len) { + bool readerdone = false; + int nvisited = 0; + bool read_result = false; + THD *thd = get_thd(); + + /** round-robin method to acquire the data */ + while (!thd->is_killed() && !thd->pq_error) { + read_result = get_next(datap, m_len, &readerdone); + /** the queue is detached and all of its content has been read */ + if (readerdone) { + DBUG_ASSERT(false == read_result); + m_active_readers--; + /** all queues have been fully read */ + if (m_active_readers == 0) { + return false; + } + mqueue_mmove(m_next_queue, m_active_readers); + if (m_next_queue >= m_active_readers) { + m_next_queue = 0; + } + continue; + } + + /** data has been successfully read into datap */ + if (read_result) { + return true; + } + /** move to next worker */ + m_next_queue++; + if (m_next_queue >= m_active_readers) { + m_next_queue = 0; + } + nvisited++; + /** a full round-robin pass read no message from any MQ */ + if (nvisited >= m_active_readers) { + /** + * this barrier ensures that the receiver first enters into a + * waiting 
status and then is woken by one sender. + */ + memory_barrier(); + MQ_event *receiver = get_mq_handle(0)->get_receiver(); + if (receiver) { + receiver->wait_latch(); + receiver->reset_latch(); + } + nvisited = 0; + } + } + + return false; +} + +/** + * read one message from MQ and fill it into table->record[0] + * + * @retval: true for success, and otherwise false + */ +bool Exchange_nosort::read_mq_record() { + DBUG_ASSERT(!is_stable() && get_exchange_type() == EXCHANGE_NOSORT); + bool result = false; + uchar *data = nullptr; + uint32 msg_len = 0; + + /** read a message from MQ's local buffer */ + result = read_next((void **)&data, &msg_len); + return (result && convert_mq_data_to_record(data, msg_len)); +} diff --git a/sql/exchange_nosort.h b/sql/exchange_nosort.h new file mode 100644 index 000000000000..7d3ad4c93abf --- /dev/null +++ b/sql/exchange_nosort.h @@ -0,0 +1,57 @@ +#ifndef EXCHANGE_NOSORT_H +#define EXCHANGE_NOSORT_H + +/* Copyright (c) 2020, Oracle and/or its affiliates. All Rights Reserved. + Copyright (c) 2021, Huawei Technologies Co., Ltd. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "exchange.h" + +class Exchange_nosort : public Exchange { + public: + Exchange_nosort() : Exchange(), m_next_queue(0), m_active_readers(0) {} + + Exchange_nosort(THD *thd, TABLE *table, int workers, int ref_length, + bool stab_output) + : Exchange(thd, table, workers, ref_length, stab_output), + m_next_queue(0), + m_active_readers(workers) {} + + virtual ~Exchange_nosort() {} + + /** read/convert one message from mq to table->record[0] */ + bool read_mq_record() override; + + inline EXCHANGE_TYPE get_exchange_type() override { return EXCHANGE_NOSORT; } + + private: + /** read one message from MQ[next_queue] */ + bool get_next(void **datap, uint32 *len, bool *done); + /** read one message from MQ in a round-robin manner */ + bool read_next(void **datap, uint32 *len); + + int m_next_queue; /** the next read queue */ + int m_active_readers; /** number of remaining queues that are still sending or + receiving messages */ +}; + +#endif // EXCHANGE_NOSORT_H diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 34feaa75be39..22705b2be25f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -8215,13 +8215,8 @@ static int show_net_compression(THD *thd, SHOW_VAR *var, char *buff) { static int show_pq_memory(THD *, SHOW_VAR *var, char *buff) { var->type = SHOW_INT; var->value = buff; - - uint sum_memory = 0; - for (uint i = 0; i < PQ_MEMORY_USED_BUCKET; i++) - sum_memory += atomic_add(pq_memory_used[i], 0); - unsigned int *value = reinterpret_cast<unsigned int *>(buff); - *value = sum_memory; + *value = get_pq_memory_total(); return 0; } diff --git a/sql/pq_global.h b/sql/pq_global.h index 6b3765dbd1fc..a8ab9a808fd1 100644 --- a/sql/pq_global.h +++ b/sql/pq_global.h @@ -33,8 +33,6 @@ #define TIME_MILLION 1000000 #define TIME_BILLION 1000000000 -const int PQ_MEMORY_USED_BUCKET = 16; - template 
T atomic_add(T &value, T n) { return __sync_fetch_and_add(&value, n); diff --git a/sql/sql_parallel.cc b/sql/sql_parallel.cc index cd45025635ef..13d6de346c30 100644 --- a/sql/sql_parallel.cc +++ b/sql/sql_parallel.cc @@ -42,8 +42,8 @@ #include "sql/sql_tmp_table.h" #include "sql/timing_iterator.h" #include "sql/transaction.h" -#include "sql/exchange.h" #include "sql/exchange_sort.h" +#include "sql/exchange_nosort.h" #include "sql/basic_row_iterators.h" #include "sql/pq_global.h" #include "sql/pq_condition.h"