ggml : synchronize threads using barriers #7993

Merged · 5 commits · Jun 19, 2024
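This PR replaces the per-node handshake in ggml's CPU graph compute (the n_active, node_n, and node_task atomics) with a single ggml_barrier() primitive: under GGML_USE_OPENMP it maps to #pragma omp barrier, otherwise it spins on two counters (n_barrier, n_barrier_passed). The standalone sketch below is not the PR's code; names such as spin_barrier, barrier_wait, and worker are made up for illustration. It only shows the same counter-based barrier idea using C11 atomics and pthreads.

// Minimal sketch (not the PR's code): a counter-based spin barrier with C11
// atomics, illustrating the n_barrier / n_barrier_passed scheme used below.
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_THREADS 4

typedef struct {
    atomic_int n_arrived; // threads that reached the barrier in this round
    atomic_int n_passed;  // number of completed rounds ("generation" counter)
    int        n_threads;
} spin_barrier;

static spin_barrier g_bar = { 0, 0, N_THREADS };

static void barrier_wait(spin_barrier * b) {
    int passed_old = atomic_load(&b->n_passed);

    if (atomic_fetch_add(&b->n_arrived, 1) == b->n_threads - 1) {
        // last thread to arrive: reset the arrival counter and open the barrier
        atomic_store(&b->n_arrived, 0);
        atomic_fetch_add(&b->n_passed, 1);
    } else {
        // wait until the round counter advances
        while (atomic_load(&b->n_passed) == passed_old) {
            sched_yield(); // the PR spins with _mm_pause() first, then yields
        }
    }
}

static void * worker(void * arg) {
    int id = *(int *) arg;
    for (int phase = 0; phase < 3; phase++) {
        printf("thread %d: phase %d\n", id, phase);
        barrier_wait(&g_bar); // no thread starts phase+1 before all finish phase
    }
    return NULL;
}

int main(void) {
    pthread_t threads[N_THREADS];
    int ids[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < N_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

A sketch like this builds with cc -std=c11 -pthread. In the diff itself, ggml_graph_compute_thread() calls ggml_barrier() between the INIT, COMPUTE, and FINALIZE phases of each graph node, so no thread starts the next phase before all threads have finished the current one.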
14 changes: 14 additions & 0 deletions .github/workflows/server.yml
@@ -87,8 +87,22 @@ jobs:
             exit 1
           fi
 
+      - name: Build (no OpenMP)
+        id: cmake_build_no_openmp
+        if: ${{ matrix.sanitizer == 'THREAD' }}
+        run: |
+          cmake -B build \
+            -DLLAMA_NATIVE=OFF \
+            -DLLAMA_BUILD_SERVER=ON \
+            -DLLAMA_CURL=ON \
+            -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
+            -DLLAMA_SANITIZE_${{ matrix.sanitizer }}=ON \
+            -DLLAMA_OPENMP=OFF ;
+          cmake --build build --config ${{ matrix.build_type }} -j $(nproc) --target llama-server
+
       - name: Build
         id: cmake_build
+        if: ${{ matrix.sanitizer != 'THREAD' }}
         run: |
           cmake -B build \
             -DLLAMA_NATIVE=OFF \
217 changes: 67 additions & 150 deletions ggml.c
@@ -1753,9 +1753,8 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active;  // num active threads
-    atomic_int node_n;    // active graph node
-    atomic_int node_task; // active graph node task phase
+    atomic_int n_barrier;
+    atomic_int n_barrier_passed;
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void* abort_callback_data;
@@ -18972,184 +18971,104 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
+
+    #pragma omp barrier
+}
+#else
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
+
+    atomic_int * n_barrier = &state->shared->n_barrier;
+    atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
+
+    int n_threads = state->shared->n_threads;
+    int passed_old = atomic_load(n_barrier_passed);
+
+    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+        // last thread
+        atomic_store(n_barrier, 0);
+        atomic_fetch_add(n_barrier_passed, 1);
+    } else {
+        // wait for other threads
+        //while (atomic_load(n_barrier_passed) == passed_old) {
+        //}
+        const int n_spin_before_sleep = 100000;
+        while (true) {
+            for (int i = 0; i < n_spin_before_sleep; i++) {
+                if (atomic_load(n_barrier_passed) != passed_old) {
+                    return;
+                }
+            #if defined(__SSE3__)
+                _mm_pause();
+            #endif
+            }
+            sched_yield();
+        }
+    }
+}
+#endif

 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan * cplan = state->shared->cplan;
 
-    const int n_threads = state->shared->n_threads;
+    const int ith = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type =*/ GGML_TASK_TYPE_INIT,
+        /*.ith =*/ ith,
+        /*.nth =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith =*/ 0,
-                /*.nth =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_n, node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n, state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type =*/ GGML_TASK_TYPE_INIT,
-            /*.ith =*/ state->ith,
-            /*.nth =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            ggml_barrier(state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            // depending on the workload and the operating system.
-            // since it is not clear what is the best approach, it should potentially become user-configurable
-            // ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD: adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
+        ggml_barrier(state);
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            ggml_barrier(state);
+        }
     }

@@ -19336,7 +19255,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
                 workers[0].shared->n_threads = n_threads;
-                workers[0].shared->n_active = n_threads;
             }
             ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
         }
@@ -19399,9 +19317,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads =*/ n_threads,
-        /*.n_active =*/ n_threads,
-        /*.node_n =*/ -1,
-        /*.node_task =*/ GGML_TASK_TYPE_FINALIZE,
+        /*.n_barrier =*/ 0,
+        /*.n_barrier_passed =*/ 0,
         /*.abort_callback =*/ NULL,
         /*.abort_callback_data =*/ NULL,
         /*.current_chunk; =*/ 0,