Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6281,6 +6281,14 @@ static Sys_var_charptr Sys_wsrep_patch_version(
READ_ONLY GLOBAL_VAR(wsrep_patch_version_ptr), CMD_LINE_HELP_ONLY,
DEFAULT(WSREP_PATCH_VERSION));

static Sys_var_charptr Sys_wsrep_applier_priority(
"wsrep_applier_priority", "Scheduler and priority for WSREP applier threads",
PREALLOCATED GLOBAL_VAR(wsrep_applier_priority), CMD_LINE(OPT_ARG),
DEFAULT("other:0"),
NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(wsrep_applier_priority_check),
ON_UPDATE(wsrep_applier_priority_update));

#endif /* WITH_WSREP */

static bool fix_host_cache_size(sys_var *, THD *, enum_var_type)
Expand Down
12 changes: 12 additions & 0 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ uint wsrep_ignore_apply_errors= 0;

std::atomic <bool> wsrep_thread_create_failed;

const char *wsrep_applier_priority; // Priority of applier threads!

/*
* End configuration options
*/
Expand Down Expand Up @@ -855,6 +857,8 @@ int wsrep_init()
wsrep_init_position();
wsrep_sst_auth_init();

thread_priority_manager = new Thread_priority_manager();

if (!*wsrep_provider ||
!strcasecmp(wsrep_provider, WSREP_NONE))
{
Expand Down Expand Up @@ -1038,6 +1042,8 @@ void wsrep_deinit(bool free_options)
{
wsrep_sst_auth_free();
}

delete thread_priority_manager;
}

/* Destroy wsrep thread LOCKs and CONDs */
Expand Down Expand Up @@ -3764,6 +3770,10 @@ void* start_wsrep_THD(void *arg)

thd->real_id=pthread_self(); // Keep purify happy

// Allow the system variable "wsrep_applier_priority" to control
// the priority of this thread.
thread_priority_manager->add(thd->real_id);

my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));

DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
Expand Down Expand Up @@ -3835,6 +3845,8 @@ void* start_wsrep_THD(void *arg)
before cleanup. */
wsrep_store_threadvars(thd);

thread_priority_manager->remove(thd->real_id);

close_connection(thd, 0);

mysql_mutex_lock(&LOCK_wsrep_slave_threads);
Expand Down
1 change: 1 addition & 0 deletions sql/wsrep_mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ extern uint32 wsrep_gtid_domain_id;
extern std::atomic <bool > wsrep_thread_create_failed;
extern ulonglong wsrep_mode;
extern my_bool wsrep_strict_ddl;
extern const char *wsrep_applier_priority;

enum enum_wsrep_reject_types {
WSREP_REJECT_NONE, /* nothing rejected */
Expand Down
250 changes: 250 additions & 0 deletions sql/wsrep_var.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "wsrep_mysqld.h"

ulong wsrep_reject_queries;
Thread_priority_manager *thread_priority_manager;

int wsrep_init_vars()
{
Expand All @@ -40,6 +41,7 @@ int wsrep_init_vars()
wsrep_node_name = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME));
wsrep_node_address = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME));
wsrep_node_incoming_address= my_strdup(PSI_INSTRUMENT_ME, WSREP_NODE_INCOMING_AUTO, MYF(MY_WME));
wsrep_applier_priority = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME));
if (wsrep_gtid_mode)
wsrep_start_position = my_strdup(PSI_INSTRUMENT_ME, WSREP_START_POSITION_ZERO_GTID, MYF(MY_WME));
else
Expand Down Expand Up @@ -1214,3 +1216,251 @@ bool wsrep_forced_binlog_format_check(sys_var *self, THD* thd, set_var* var)

return false;
}

/*
Split a string into a list of substring separated by the given character.
*/
static std::vector<std::string> split(const std::string& s, char separator)
{
std::vector<std::string> result;

size_t pos, prev_pos = 0;

while ((pos = s.find_first_of(separator, prev_pos)) != std::string::npos)
{
result.push_back(s.substr(prev_pos, pos - prev_pos));
prev_pos = pos + 1;
}

if (s.length() > prev_pos)
{
result.push_back(s.substr(prev_pos, s.length() - prev_pos));
}

return result;
}

// Names of the scheduling policies
static std::string const SCHED_OTHER_STR ("other");
static std::string const SCHED_FIFO_STR ("fifo");
static std::string const SCHED_RR_STR ("rr");
static std::string const SCHED_UNKNOWN_STR("unknown");


static inline bool parse_thread_schedparam(const std::string& string,
int& policy,
int& priority)
{
std::vector<std::string> list(split(string, ':'));
char *end_ptr;

if (list.size() != 2)
{
my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority="
"POLICY:PRIORITY can't be set: "
"invalid scheduling policy", MYF(0));
return true;
}

if (list[0] == SCHED_OTHER_STR) {
policy = SCHED_OTHER;
} else if (list[0] == SCHED_FIFO_STR) {
policy = SCHED_FIFO;
} else if (list[0] == SCHED_RR_STR) {
policy = SCHED_RR;
} else {
my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority="
"POLICY:PRIORITY can't be set: "
"invalid scheduling policy", MYF(0));
return (true);
}

priority = (int)std::strtol(list[1].c_str(), &end_ptr, 10);
if (!(list[1].c_str()[0] != 0 && end_ptr[0] == 0)) {
my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority="
"POLICY:PRIORITY can't be set: "
"invalid scheduling policy", MYF(0));
return (true);
}

return (false);
}


Thread_sched_param Thread_sched_param::system_default(std::string("other:0"));


Thread_sched_param::Thread_sched_param(const std::string& param)
:
policy_(0),
priority_(0)
{
if (param.empty()) {
*this = system_default;
} else {
parse_thread_schedparam(param, policy_, priority_);
}
}


bool Thread_sched_param::set(const std::string& param)
{
if (param.empty()) {
*this = system_default;
} else {
int policy, priority;

if (false == parse_thread_schedparam(param, policy, priority)) {
policy_ = policy;
priority_ = priority;
} else {
// failure
return (true);
}
}
// success
return (false);
}


bool wsrep_applier_priority_check(sys_var *self, THD* thd, set_var* var)
{
std::stringstream ss;

ss << "wsrep_applier_priority_check: string " << var->save_result.string_value.str;
// WSREP_INFO("DEBUG %s(%d): %s", __FUNCTION__, __LINE__, ss.str().c_str());

if ((! var->save_result.string_value.str) ||
(var->save_result.string_value.length > (FN_REFLEN -1))) // safety
{
my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
var->save_result.string_value.str ?
var->save_result.string_value.str : "NULL");
// WSREP_INFO("DEBUG (%d): %s", __LINE__, ss.str().c_str());
return 1;
}


bool result = thread_priority_manager->update_priorities(var->save_result.string_value.str);
// WSREP_INFO("DEBUG %s(%d): %s, return %d", __FUNCTION__, __LINE__, ss.str().c_str(), result);

return (result);
}


bool wsrep_applier_priority_update(sys_var *self, THD* thd, enum_var_type type)
{
// WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, false);
return (false);
}

#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key key_LOCK_lock_thread_priority_manager;
#endif

Thread_priority_manager::Thread_priority_manager() {
mysql_mutex_init(key_LOCK_lock_thread_priority_manager,
&LOCK_thread_priority_manager, MY_MUTEX_INIT_FAST);
};

Thread_priority_manager::~Thread_priority_manager() {
mysql_mutex_destroy(&LOCK_thread_priority_manager);
};

void Thread_priority_manager::add(pthread_t thread)
{
mysql_mutex_lock(&LOCK_thread_priority_manager);

int rcode = set_priority(thread);
if (rcode != 0) {
/* failure */
mysql_mutex_unlock(&LOCK_thread_priority_manager);
}
m_threads.insert(thread);

mysql_mutex_unlock(&LOCK_thread_priority_manager);
}


void Thread_priority_manager::remove(pthread_t thread)
{
mysql_mutex_lock(&LOCK_thread_priority_manager);

m_threads.erase(thread);

mysql_mutex_unlock(&LOCK_thread_priority_manager);
}


bool Thread_priority_manager::update_priorities(const char *priority_string)
{
mysql_mutex_lock(&LOCK_thread_priority_manager);

if (m_sched_param.set(std::string(priority_string))) {
/* failure */
mysql_mutex_unlock(&LOCK_thread_priority_manager);
// WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, true);
return (true);
}

for (auto it = m_threads.begin(); it != m_threads.end(); it++) {
set_priority(*it);
}

mysql_mutex_unlock(&LOCK_thread_priority_manager);

/* success */
// WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, false);
return (false);
}


static bool schedparam_not_supported(false);

int Thread_priority_manager::set_priority(pthread_t thread)
{
std::stringstream ss;

if (schedparam_not_supported) return -ENOSYS;
#if defined(__sun__)
struct sched_param spstr = { m_sched_param.priority(), { 0, } /* sched_pad array */};
#else
struct sched_param spstr = { m_sched_param.priority() };
#endif
int err;
if ((err = pthread_setschedparam(thread, m_sched_param.policy(), &spstr)) != 0)
{
if (err == ENOSYS)
{
my_message(ER_WRONG_ARGUMENTS,
"Function pthread_setschedparam() is not implemented "
"in this system. Future attempts to change scheduling "
"priority will be no-op",
MYF(0));

schedparam_not_supported = true;
} else if (err == EINVAL) {
ss << "Failed to set 'wsrep_applier_priority': invalid arguments: "
<< "policy " << m_sched_param.policy() << ", priority " << spstr.sched_priority;
my_message(ER_WRONG_ARGUMENTS, ss.str().c_str(), MYF(0));
} else if (err == EPERM) {
ss << "Failed to set 'wsrep_applier_priority': insufficient permissions: "
<< "policy " << m_sched_param.policy() << ", priority " << spstr.sched_priority;
my_message(ER_WRONG_ARGUMENTS, ss.str().c_str(), MYF(0));
} else if (err == ESRCH) {
/* invalid thread id */
}

WSREP_INFO("DEBUG %s(%d): policy %d, priority %d, return %d", __FUNCTION__, __LINE__,
m_sched_param.policy(), spstr.sched_priority, -err);

/* failure */
return (-err);
}

WSREP_INFO("DEBUG %s(%d): policy %d, priority %d, return %d", __FUNCTION__, __LINE__,
m_sched_param.policy(), spstr.sched_priority, 0);

/* success */
return (0);
}
Loading