Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

并行查询代码优化 #164

Merged
merged 1 commit into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions client/client_priv.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2021, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
Expand Down Expand Up @@ -142,7 +142,6 @@ enum options_client {
OPT_AUTO_VERTICAL_OUTPUT,
OPT_DEBUG_INFO,
OPT_DEBUG_CHECK,
OPT_PQ,
OPT_COLUMN_TYPES,
OPT_ERROR_LOG_FILE,
OPT_WRITE_BINLOG,
Expand Down
5 changes: 3 additions & 2 deletions include/my_alloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "mysql/psi/psi_memory.h"

typedef void CallBackFunc(PSI_memory_key key, size_t length, unsigned int id) ;
const int PQ_MEMORY_USED_BUCKET = 16;

/**
* The MEM_ROOT is a simple arena, where allocations are carved out of
* larger blocks. Using an arena over plain malloc gives you two main
Expand Down Expand Up @@ -133,7 +135,7 @@ struct MEM_ROOT {
* The returned pointer will always be 8-aligned.
*/
void *Alloc(size_t length) MY_ATTRIBUTE((malloc));

/**
Allocate “num” objects of type T, and default-construct them.
If the constructor throws an exception, behavior is undefined.
Expand Down Expand Up @@ -316,7 +318,6 @@ struct MEM_ROOT {
CallBackFunc *allocCBFunc = nullptr;

CallBackFunc *freeCBFunc = nullptr;

};

// Legacy C thunks. Do not use in new code.
Expand Down
6 changes: 2 additions & 4 deletions include/my_base.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2021, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
Expand Down Expand Up @@ -986,10 +986,8 @@ Information in the data-dictionary needs to be updated. */
#define HA_ERR_TOO_LONG_PATH 207
/** Histogram sampling initialization failed */
#define HA_ERR_SAMPLING_INIT_FAILED 208
/** Complete scan all Parallel Query ranges */
#define HA_ERR_END_OF_RANGES 209
/** Copy of last error number */
#define HA_ERR_LAST 209
#define HA_ERR_LAST 208

/* Number of different errors */
#define HA_ERR_ERRORS (HA_ERR_LAST - HA_ERR_FIRST + 1)
Expand Down
4 changes: 2 additions & 2 deletions mysys/dbug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -923,14 +923,14 @@ void _db_pop_() {
}
}

void pq_stack_copy(CODE_STATE *leader_cs){
void pq_stack_copy(CODE_STATE *leader_cs) {
CODE_STATE *cs;
get_code_state_or_return;
DBUG_ASSERT(cs->stack == &init_settings);
cs->stack = leader_cs->stack;
}

void pq_stack_reset(){
void pq_stack_reset() {
CODE_STATE *cs;
get_code_state_or_return;
cs->stack = &init_settings;
Expand Down
29 changes: 17 additions & 12 deletions mysys/my_alloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
#define MEM_ROOT_SINGLE_CHUNKS 0
#endif

const int PQ_MEMORY_USED_BUCKET = 16;

MEM_ROOT::Block *MEM_ROOT::AllocBlock(size_t length) {
DBUG_TRACE;

Expand Down Expand Up @@ -109,10 +107,13 @@ void *MEM_ROOT::Alloc(size_t length) {

ret = AllocSlow(length);
DBUG_ASSERT(m_allocated_size >= old_alloc_size);
if (allocCBFunc && (m_allocated_size - old_alloc_size))
allocCBFunc(m_psi_key, m_allocated_size - old_alloc_size,
((reinterpret_cast<unsigned long>(this) >> PQ_MEMORY_USED_BUCKET) & 0xf));

if (allocCBFunc && (m_allocated_size - old_alloc_size)) {
allocCBFunc(
m_psi_key, m_allocated_size - old_alloc_size,
((reinterpret_cast<unsigned long>(this) >> PQ_MEMORY_USED_BUCKET) &
0xf));
}

return ret;
}

Expand Down Expand Up @@ -171,10 +172,12 @@ void MEM_ROOT::Clear() {
DBUG_TRACE;
DBUG_PRINT("enter", ("root: %p", this));

if (freeCBFunc && m_allocated_size)
freeCBFunc(m_psi_key, m_allocated_size,
if (freeCBFunc && m_allocated_size) {
freeCBFunc(
m_psi_key, m_allocated_size,
(reinterpret_cast<unsigned long>(this) >> PQ_MEMORY_USED_BUCKET) & 0xf);

}

// Already cleared, or memset() to zero, so just ignore.
if (m_current_block == nullptr) return;

Expand Down Expand Up @@ -210,10 +213,12 @@ void MEM_ROOT::ClearForReuse() {
m_current_block->prev = nullptr;
m_allocated_size = m_current_free_end - m_current_free_start;

if (freeCBFunc && (old_alloc_size - m_allocated_size))
freeCBFunc(m_psi_key, old_alloc_size - m_allocated_size,
if (freeCBFunc && (old_alloc_size - m_allocated_size)) {
freeCBFunc(
m_psi_key, old_alloc_size - m_allocated_size,
(reinterpret_cast<uintptr_t>(this) >> PQ_MEMORY_USED_BUCKET) & 0xf);

}

FreeBlocks(start);
}

Expand Down
3 changes: 1 addition & 2 deletions mysys/my_handler_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#define MYSYS_MY_HANDLER_ERRORS_INCLUDED

/* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2021, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
Expand Down Expand Up @@ -123,7 +123,6 @@ static const char *handler_error_messages[] = {
"Invalid table name",
"Path is too long for the OS",
"Histogram sampling initialization failed",
"Complete scan all Parallel Query ranges"
};

extern void my_handler_error_register(void);
Expand Down
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ SET(SQL_SHARED_SOURCES
ssl_acceptor_context.cc
exchange.cc
exchange_sort.cc
exchange_nosort.cc
msg_queue.cc
)

Expand Down
107 changes: 57 additions & 50 deletions sql/binary_heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,25 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include <algorithm>
#include <iostream>
#include "my_sys.h"
#include "mysys_err.h"
#include "priority_queue.h"
#include "sort_param.h"
#include "sql/malloc_allocator.h"
#include "sql_base.h"
#include "sql/sql_class.h"

/// compare based on the sort_key
// compare based on the sort_key
typedef bool (*binaryheap_comparator)(int a, int b, void *arg);
class binary_heap {
public:
public:
binary_heap(int element_size, void *arg, binaryheap_comparator cmp, THD *thd)
: m_queue(NULL),
: m_queue(nullptr),
m_capacity(element_size),
m_size(0),
m_compare(cmp),
m_arg(arg),
m_thd(thd)
{}
m_thd(thd) {}

public:
/* @retval: false of success, and true otherwise. */
bool init_binary_heap() {
if (!m_capacity) return true;
if (m_capacity <= 0) {
return true;
}
m_queue = new (m_thd->pq_mem_root) int[m_capacity + 1];
if (!m_queue || DBUG_EVALUATE_IF("pq_msort_error9", true, false)) {
my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "", "(PQ::init)");
Expand All @@ -59,20 +51,7 @@ class binary_heap {
return false;
}

/* return the index ((i - 1) / 2) of the parent node of node i */
inline int parent(unsigned int i) {
DBUG_ASSERT(i != 0);
return (--i) >> 1;
}

/* return the index (2 * i + 1) of the left child of node i */
inline int left(unsigned int i) { return (i << 1) | 1; }

/* return the index (2 * i + 2) of the right child of node */
inline int right(unsigned int i) { return (++i) << 1; }

void reset() { m_size = 0; }
uint size() { return m_size; }

void add_unorderd(int element) {
if (m_size >= m_capacity ||
Expand All @@ -84,23 +63,19 @@ class binary_heap {
}

void build() {
if (m_size <= 1) return;
for (int i = parent(m_size - 1); i >= 0; i--) sift_down(i);
}

void add(int element) {
if (m_size >= m_capacity) {
my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "out of binary heap space");
if (m_size <= 1) {
return;
}
m_queue[m_size++] = element;
sift_up(m_size - 1);
for (int i = parent(m_size - 1); i >= 0; i--) {
sift_down(i);
}
}

int first() {
DBUG_ASSERT(!empty());
return m_queue[0];
}

int remove_first() {
DBUG_ASSERT(!empty());
if (m_size == 1) {
Expand All @@ -118,21 +93,48 @@ class binary_heap {
void replace_first(int element) {
DBUG_ASSERT(!empty());
m_queue[0] = element;
if (m_size > 1) sift_down(0);
if (m_size > 1) {
sift_down(0);
}
}

bool empty() { return m_size == 0; }

void cleanup()
{
if (m_queue) destroy(m_queue);
void cleanup() {
if (m_queue) {
destroy(m_queue);
}
}

private:
/* return the index ((i - 1) / 2) of the parent node of node i */
int parent(unsigned int i) {
DBUG_ASSERT(i != 0);
return (--i) >> 1;
}

/* return the index (2 * i + 1) of the left child of node i */
int left(unsigned int i) { return (i << 1) | 1; }

/* return the index (2 * i + 2) of the right child of node */
int right(unsigned int i) { return (++i) << 1; }

uint size() { return m_size; }

void add(int element) {
if (m_size >= m_capacity) {
my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "out of binary heap space");
return;
}
m_queue[m_size++] = element;
sift_up(m_size - 1);
}

private:
void swap_node(int a, int b) {
int T;
T = m_queue[a];
int temp;
temp = m_queue[a];
m_queue[a] = m_queue[b];
m_queue[b] = T;
m_queue[b] = temp;
}

void sift_down(int node_off) {
Expand All @@ -142,17 +144,21 @@ class binary_heap {
int swap_off = 0;

if (left_off < m_size &&
m_compare(m_queue[left_off], m_queue[node_off], m_arg))
m_compare(m_queue[left_off], m_queue[node_off], m_arg)) {
swap_off = left_off;
}

if (right_off < m_size &&
m_compare(m_queue[right_off], m_queue[node_off], m_arg)) {
if (!swap_off ||
m_compare(m_queue[right_off], m_queue[left_off], m_arg))
m_compare(m_queue[right_off], m_queue[left_off], m_arg)) {
swap_off = right_off;
}
}

if (!swap_off) break;
if (!swap_off) {
break;
}

swap_node(swap_off, node_off);
node_off = swap_off;
Expand All @@ -165,14 +171,15 @@ class binary_heap {
while (node_off != 0) {
parent_off = parent(node_off);
cmp = m_compare(m_queue[parent_off], m_queue[node_off], m_arg);
if (cmp) break;
if (cmp) {
break;
}

swap_node(node_off, parent_off);
node_off = parent_off;
}
}

private:
int *m_queue;
int m_capacity;
int m_size;
Expand Down
Loading