diff --git a/plugin/thread_pool/CMakeLists.txt b/plugin/thread_pool/CMakeLists.txt index 35cbdff51401d4000f09c5e0ba59261cc4f7c33d..94f3caa0f9bb299feebf39475e202093ae3d10e6 100644 --- a/plugin/thread_pool/CMakeLists.txt +++ b/plugin/thread_pool/CMakeLists.txt @@ -20,6 +20,7 @@ ADD_COMPILE_DEFINITIONS( MYSQL_ADD_PLUGIN(thread_pool threadpool_common.cc threadpool_unix.cc + numa_affinity_manager.cc MODULE_ONLY MODULE_OUTPUT_NAME "thread_pool" ) diff --git a/plugin/thread_pool/numa_affinity_manager.cc b/plugin/thread_pool/numa_affinity_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..500d89b9bcfb699c94d2e48209d7a208747976c5 --- /dev/null +++ b/plugin/thread_pool/numa_affinity_manager.cc @@ -0,0 +1,380 @@ +/* Copyright (C) 2012 Monty Program Ab + Copyright (C) 2023 Huawei Technologies Co., Ltd + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "numa_affinity_manager.h" +#include "sql/log.h" +#include +#include +#include +#include +#include +#include + +using namespace std; + +struct bitmask *numa_bitmask = NULL; + +static struct bitmask *numa_bitmask_mem = NULL; +static int max_conf_nodes = -1; +static int node_possible = 0; +static int cpu_possible = 0; + +#define BIT_LENGTH_LONG (8 * sizeof(unsigned long)) +#define BIT_LENGTH_LONG_N(n) (((n)+((BIT_LENGTH_LONG)-1))/(BIT_LENGTH_LONG)) + +long long numa_mem_size(int node, long long *fp); +int numa_sched_getaffinity(pid_t pid, struct bitmask *mask); +void numa_node_bm_free(struct bitmask *bmp); +struct bitmask *numa_bm_alloc(void); + +void free_numa_bm(struct bitmask *bm) { + if (bm) { + numa_node_bm_free(bm); + bm = NULL; + } +} + +void __attribute__((destructor)) numa_end(void) +{ + numa_node_bm_free(numa_bitmask_mem); + numa_node_bm_free(numa_bitmask); +} + +static unsigned int get_bmp_bit(const struct bitmask *bmp, unsigned int n) +{ + if (n < bmp->size) + return (bmp->mask_cpu[n/BIT_LENGTH_LONG] >> (n % BIT_LENGTH_LONG)) & 1; + return 0; +} + +static void set_bmp_bit(struct bitmask *bmp, unsigned int n, unsigned int v) +{ + if (n < bmp->size) { + if (v) + bmp->mask_cpu[n/BIT_LENGTH_LONG] |= 1UL << (n % BIT_LENGTH_LONG); + else + bmp->mask_cpu[n/BIT_LENGTH_LONG] &= ~(1UL << (n % BIT_LENGTH_LONG)); + } +} + +struct bitmask *bitmask_alloc(unsigned int n) +{ + if (n < 1) { + errno = EINVAL; + sql_print_error("request to allocate mask for invalid number"); + return nullptr; + } + + struct bitmask *bmp = (struct bitmask *)malloc(sizeof(*bmp)); + if (bmp == 0) { + sql_print_error("Out of memory allocating bitmask"); + return nullptr; + } + bmp->size = n; + bmp->mask_cpu = (long unsigned int *)calloc(BIT_LENGTH_LONG_N(n), sizeof(unsigned long)); + if (bmp->mask_cpu == 0) { + free(bmp); + sql_print_error("Out of memory allocating bitmask"); + return nullptr; + } + return bmp; +} + +void numa_node_bm_free(struct bitmask *bmp) { + if (bmp != nullptr) { + free(bmp->mask_cpu); + bmp->mask_cpu = (unsigned long *)0xdeadcdef; + free(bmp); + } + return; +} + +static void set_configured_nodes(void) +{ + struct dirent *de; + long long fp; + + numa_bitmask_mem = numa_bm_alloc(); + numa_bitmask = numa_bm_alloc(); + + DIR *d = opendir("/sys/devices/system/node"); + if (d == nullptr) { + max_conf_nodes = 0; + } else { + while ((de = readdir(d)) != NULL) { + if (strncmp(de->d_name, "node", 4)) { + continue; + } + int nd = strtoul(de->d_name+4, NULL, 0); + set_bmp_bit(numa_bitmask, nd, 1); + if (numa_mem_size(nd, &fp) > 0) { + set_bmp_bit(numa_bitmask_mem, nd, 1); + } + if (max_conf_nodes < nd) { + max_conf_nodes = nd; + } + } + closedir(d); + } +} + +static inline int is_digit(char c) +{ + return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9'); +} + +static const char *file_mask_size = "/proc/self/status"; +static const char *node_mask_comment = "Mems_allowed:\t"; + +long __attribute__((weak)) get_mempolicy(int *policy, unsigned long *nmask, + unsigned long maxnode, void *addr, unsigned flags) +{ + return syscall(__NR_get_mempolicy, policy, nmask, maxnode, addr, flags); +} + +static void set_node_possible(void) +{ + int len = 0; + size_t buf_size = 0; + FILE *fp; + char *buf = NULL; + char *tmp_buf = NULL; + + if ((fp = fopen(file_mask_size, "r")) == NULL) { + if (node_possible == 0) { + int policy; + unsigned long *mask = NULL; + node_possible = 16; + do { + node_possible <<= 1; + mask = (long unsigned int *)realloc(mask, node_possible / 8); + if (mask == nullptr) { + return; + } + } while (get_mempolicy(&policy, mask, node_possible + 1, 0, 0) < 0 && + errno == EINVAL && node_possible < 4096*8); + free(mask); + } + return; + } + + while (getline(&buf, &buf_size, fp) > 0) { + if (strncmp(buf, node_mask_comment, strlen(node_mask_comment)) == 0) { + tmp_buf = buf; + tmp_buf += strlen(node_mask_comment); + while (*tmp_buf != '\n' && *tmp_buf != '\0') { + if (is_digit(*tmp_buf)) { + len++; + } + tmp_buf++; + } + node_possible = len * 4; + } + } + free(buf); + fclose(fp); + return; +} + +struct bitmask *numa_bm_alloc(void) +{ + return bitmask_alloc(node_possible); +} + +long long numa_mem_size(int node, long long *fp) +{ + size_t length = 0; + char *line = nullptr; + long long size = -1; + char fn[64]; + int state = 0; + int req = fp ? 2 : 1; + + if (fp) { + *fp = -1; + } + sprintf(fn,"/sys/devices/system/node/node%d/meminfo", node); + FILE *f = fopen(fn, "r"); + if (f == nullptr) + return -1; + + while (getdelim(&line, &length, '\n', f) > 0) { + char *end; + char *str = strcasestr(line, "kB"); + if (str == nullptr) { + continue; + } + --str; + while (str > line && isspace(*str)) { + --str; + } + while (str > line && isdigit(*str)) { + --str; + } + + if (strstr(line, "MemTotal")) { + size = strtoull(str, &end, 0) << 10; + if (end == str) { + size = -1; + } else { + state++; + } + } + if (fp && strstr(line, "MemFree")) { + *fp = strtoull(str, &end, 0) << 10; + if (end == str) { + *fp = -1; + } else { + state++; + } + } + } + fclose(f); + free(line); + if (state != req) { + sql_print_warning("Parse sysfs mem info failed, (%d)", state); + } + + return size; +} + +static void set_cpu_possible(void) +{ + struct bitmask *buf; + int olderr = errno; + int length = 4096; + int num; + + do { + buf = bitmask_alloc(length); + num = numa_sched_getaffinity(0, buf); + if (num < 0) { + if (errno == EINVAL) { + if (length >= 1024 * 1024) { + break; + } + length *= 2; + numa_node_bm_free(buf); + continue; + } else { + sql_print_warning("Determine max cpu failed (sched_getaffinity: %s)", + strerror(errno)); + num = sizeof(cpu_set_t); + break; + } + } + } while (num < 0); + numa_node_bm_free(buf); + errno = olderr; + cpu_possible = num * 8; +} + +int numa_num_configured_nodes(void) +{ + int count_node_mem=0; + + for (int i=0; i <= max_conf_nodes; i++) { + if (get_bmp_bit(numa_bitmask_mem, i)) + count_node_mem++; + } + return count_node_mem; +} + +struct bitmask *numa_allocate_cpumask() +{ + return bitmask_alloc(cpu_possible); +} + +int numa_sched_setaffinity(pid_t pid, struct bitmask *bmp) +{ + return syscall(__NR_sched_setaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); +} + +int numa_sched_getaffinity(pid_t pid, struct bitmask *bmp) +{ + return syscall(__NR_sched_getaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); +} + +bool numa_affinity_manager::init() { + initok = false; + + if (get_sys_cpu() != get_sys_cpu_only()) { + return false; + } + + set_node_possible(); + set_configured_nodes(); + set_cpu_possible(); + + cpu_count = get_sys_cpu(); + numa_count = get_sys_numa(); + if (cpu_count <= 0 || numa_count <= 0 || cpu_count % numa_count != 0) { + return false; + } + + int cpu_per_numa = cpu_count / numa_count; + int start = 0; + numa_cpu_map.clear(); + auto del_cpu_mask = [](bitmask *p) { + if (p != nullptr) { + numa_node_bm_free(p); + } + }; + for (int i = 0; i < numa_count; i++) { + auto msk = numa_allocate_cpumask(); + if (msk == nullptr) { + return false; + } + + for (int j = 0; j < cpu_per_numa; j++) { + set_bmp_bit(msk, start + j, 1); + } + numa_cpu_map.emplace_back(msk, del_cpu_mask); + start += cpu_per_numa; + } + initok = true; + return true; +} + +bool numa_affinity_manager::bind_numa(int group_id) { + if (initok) { + pid_t pid = gettid(); + return (numa_sched_setaffinity( + pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); + } + + return false; +} + +void numa_affinity_manager::print_cpumask(const string &name, bitmask *msk) { + cout << name << ": "; + for (unsigned int i = 0; i < msk->size; i++) { + if (get_bmp_bit(msk, i)) { + cout << i << " "; + } + } + cout << endl; +} +void numa_affinity_manager::dump() { + cout << "initok: " << initok << endl; + cout << "cpu_count: " << cpu_count << endl; + cout << "numa_count: " << numa_count << endl; + + for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { + string name = "numa_cpu_map[" + to_string(i) + "]"; + print_cpumask(name, numa_cpu_map[i].get()); + } +} \ No newline at end of file diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h index 3471d3287369b6f8900544c9a30485570101c915..690c0ec94707d499c1d0543e606cf91ac7fae087 100644 --- a/plugin/thread_pool/numa_affinity_manager.h +++ b/plugin/thread_pool/numa_affinity_manager.h @@ -20,62 +20,37 @@ #include #include #include -#include +#include +#include "sys/syscall.h" using namespace std; +extern struct bitmask *numa_bitmask; + +void __attribute__((destructor)) numa_end(void); +int numa_num_configured_nodes(void); + +struct bitmask { + unsigned long size; + unsigned long *mask_cpu; +}; + class numa_affinity_manager { public: numa_affinity_manager(){}; virtual ~numa_affinity_manager(){}; - bool init() { - initok = false; - cpu_count = get_sys_cpu(); - numa_count = get_sys_numa(); - if (cpu_count <= 0 || numa_count <= 0 || - cpu_count % numa_count != 0) { - return false; - } - - int cpu_per_numa = cpu_count / numa_count; - int start = 0; - numa_cpu_map.clear(); - auto delete_cpumask = [](bitmask *ptr) { - if (ptr != nullptr) { - numa_free_cpumask(ptr); - } - }; - for (int i = 0; i < numa_count; i++) { - auto msk = numa_allocate_cpumask(); - if (msk == nullptr) { - return false; - } - - for (int j = 0; j < cpu_per_numa; j++) { - numa_bitmask_setbit(msk, start + j); - } - numa_cpu_map.emplace_back(msk, delete_cpumask); - start += cpu_per_numa; - } - initok = true; - return true; - } - - bool bind_numa(int group_id) { - if (initok) { - pid_t pid = gettid(); - return (numa_sched_setaffinity( - pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); - } - - return false; - } + bool init(); + bool bind_numa(int group_id); protected: int get_sys_cpu() { - return numa_num_configured_cpus(); + return sysconf(_SC_NPROCESSORS_CONF); + } + + int get_sys_cpu_only() { + return sysconf(_SC_NPROCESSORS_ONLN); } int get_sys_numa() { @@ -87,30 +62,14 @@ protected: } public: - void print_cpumask(const string &name, bitmask *msk) { - cout << name << ": "; - for (unsigned int i = 0; i < msk->size; i++) { - if (numa_bitmask_isbitset(msk, i)) { - cout << i << " "; - } - } - cout << endl; - } - void dump() { - cout << "initok: " << initok << endl; - cout << "cpu_count: " << cpu_count << endl; - cout << "numa_count: " << numa_count << endl; - - for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { - string name = "numa_cpu_map[" + to_string(i) + "]"; - print_cpumask(name, numa_cpu_map[i].get()); - } - } + void print_cpumask(const string &name, bitmask *msk) ; + void dump(); private: bool initok{false}; int cpu_count{0}; int numa_count{0}; + vector> numa_cpu_map; }; diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index b8c421ce8afce9ef4d32968a7f470c8b2d34f833..c072a208f0f669680ce3cbb93d4f3c1ec2beaec1 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -296,6 +296,13 @@ static void fix_threadpool_stall_limit(THD*, struct SYS_VAR *, void*, const void tp_set_threadpool_stall_limit(threadpool_stall_limit); } +static void fix_threadpool_connection_balance(THD*, struct SYS_VAR *, void*, const void* value) +{ + change_group_rwlock.xlock(); + threadpool_connection_balance = *static_cast(value); + change_group_rwlock.unlock(); +} + static inline int my_getncpus() noexcept { #ifdef _SC_NPROCESSORS_ONLN return sysconf(_SC_NPROCESSORS_ONLN); @@ -354,7 +361,7 @@ static MYSQL_SYSVAR_BOOL(connection_balance, threadpool_connection_balance, PLUGIN_VAR_RQCMDARG, "Control whether thread group migrating connections" "so that they are evenly distributed.", nullptr, -nullptr, false); +fix_threadpool_connection_balance, false); static int threadpool_plugin_init(void *) { @@ -406,6 +413,7 @@ static uint &stall_limit = threadpool_stall_limit; static uint &max_threads = threadpool_max_threads; static uint &oversubscribe = threadpool_oversubscribe; static uint &toobusy = threadpool_toobusy; +static bool &connection_balance = threadpool_connection_balance; SYS_VAR *system_variables[] = { MYSQL_SYSVAR(idle_timeout), diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 5d81f7c78a99790a6bb2254d2bcc2642df977890..6efbdedd5e8fe53f4bcf5d64ed3679a5f94c2c00 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -14,6 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "threadpool_unix.h" +#include "numa_affinity_manager.h" #include "sql/debug_sync.h" #include "sql/log.h" #include "sql/protocol_classic.h" @@ -378,6 +379,10 @@ int change_group(connection_t *c, thread_group_t *group, thread_group_t *to_grou if (!to_group->thread_count) ret = create_worker(to_group, false); mysql_mutex_unlock(&to_group->mutex); + if (threadpool_sched_affinity) { + group_affinity.bind_numa(to_group - all_groups); + } + return ret; } @@ -403,8 +408,7 @@ thread_group_t *get_change_group_to(connection_t *connection) { if (group == &all_groups[i]) { continue; } - if (all_groups[i].connection_count < avg_conn_cnt || - (connection->thread_group - all_groups >= group_count && i == group_count - 1)) { + if (all_groups[i].connection_count < avg_conn_cnt || i == group_count - 1) { return &all_groups[i]; } } @@ -421,10 +425,12 @@ int get_min_conn_cnt() { } bool check_change_group_low(connection_t *connection) { - return connection->thread_group - all_groups >= group_count || - (threadpool_connection_balance && - (connection->thread_group->connection_count > get_avg_conn_cnt() || - connection->thread_group->connection_count - get_min_conn_cnt() >= 2)); + return (!threadpool_connection_balance && + connection->thread_group - all_groups != connection->thd->thread_id() % group_count) || + (threadpool_connection_balance && + ((connection->thread_group->connection_count > get_avg_conn_cnt() || + connection->thread_group->connection_count - get_min_conn_cnt() >= 2) || + (connection->thread_group - all_groups >= group_count))); } int change_group(connection_t *connection) { @@ -528,7 +534,7 @@ class Thd_timeout_checker : public Do_THD_Impl { virtual ~Thd_timeout_checker() {} - virtual void operator()(THD *thd) noexcept { + virtual void operator()(THD *thd) noexcept override{ if (thd_get_net_read_write(thd) != 1) return; connection_t *connection = (connection_t *)thd->scheduler.data; @@ -1656,7 +1662,7 @@ static void *worker_main(void *param) { assert(thread_group != nullptr); if (threadpool_sched_affinity) { - group_affinity.bind_numa((thread_group - all_groups)); + group_affinity.bind_numa(thread_group - all_groups); } /* Init per-thread structure */ @@ -1749,6 +1755,8 @@ void tp_end() { DBUG_ENTER("tp_end"); threadpool_thds.killConns(); + numa_end(); + std::thread exit_tp(tp_end_thread); exit_tp.detach(); DBUG_VOID_RETURN; @@ -1772,11 +1780,15 @@ void tp_set_threadpool_size(uint size) noexcept { } mysql_mutex_unlock(&all_groups[i].mutex); if (!success) { + change_group_rwlock.xlock(); group_count = i; + change_group_rwlock.unlock(); return; } } + change_group_rwlock.xlock(); group_count = size; + change_group_rwlock.unlock(); } void tp_set_threadpool_stall_limit(uint limit) noexcept {