Commit c09526e

cpu: combine n_graph and n_threads into a single atomic update
1 parent 9d24cb9 commit c09526e

2 files changed: +35, -40 lines

ggml/src/ggml-cpu/ggml-cpu.c

Lines changed: 34 additions & 39 deletions
@@ -187,6 +187,9 @@ typedef void * thread_ret_t;
 
 typedef pthread_t ggml_thread_t;
 
+#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
+#define GGML_THREADPOOL_N_THREADS_BITS (16)
+
 #if defined(__APPLE__)
 #include <unistd.h>
 #include <mach/mach.h>
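The two macros added above carve the single atomic n_graph value into a thread-count field (the low 16 bits) and a graph counter (the remaining high bits). A minimal standalone sketch of that layout; the pack_n_graph helper is illustrative and not part of the patch:

#include <assert.h>

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

// graph counter in the high bits, active thread count in the low 16 bits
static int pack_n_graph(int graph, int n_threads) {
    return (graph << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
}

int main(void) {
    int v = pack_n_graph(3, 8);
    assert((v &  GGML_THREADPOOL_N_THREADS_MASK) == 8); // thread count
    assert((v >> GGML_THREADPOOL_N_THREADS_BITS) == 3); // graph counter
    return 0;
}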
@@ -449,20 +452,18 @@ struct ggml_threadpool {
     struct ggml_cplan * cplan;
 
     // synchronization primitives
-    atomic_int n_graph; // incremented when there is work to be done (i.e each graph)
+    atomic_int n_graph; // updated when there is work to be done (i.e each graph) holds graph and active thread counts.
     atomic_int GGML_CACHE_ALIGN n_barrier;
     atomic_int GGML_CACHE_ALIGN n_barrier_passed;
     atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
 
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;  // Used for stopping the threadpool altogether
     atomic_bool pause; // Used for pausing the threadpool or individual threads
-    atomic_int abort;  // Used for aborting processing of a graph
+    atomic_int abort;  // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers; // per thread state
-    int n_threads_max;        // number of threads in the pool
-    atomic_int n_threads_cur; // number of threads used in the current graph
-
+    int n_threads;     // Number of threads in the pool
     int32_t prio;      // Scheduling priority
     uint32_t poll;     // Polling level (0 - no polling)
 
@@ -530,7 +531,7 @@ struct ggml_state {
 static struct ggml_state g_state = {0};
 
 void ggml_barrier(struct ggml_threadpool * tp) {
-    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
     if (n_threads == 1) {
         return;
     }
@@ -547,7 +548,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
         // last thread
         atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
 
-        // exit barrier (fill seq-cst fence)
+        // exit barrier (full seq-cst fence)
         atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
         return;
     }
@@ -2619,7 +2620,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
 void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
     if (!threadpool) return;
 
-    const int n_threads = threadpool->n_threads_max;
+    const int n_threads = threadpool->n_threads;
 
 #ifndef GGML_USE_OPENMP
     struct ggml_compute_state* workers = threadpool->workers;
@@ -2695,7 +2696,7 @@ struct ggml_cplan ggml_graph_plan(
         //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
     }
     if (n_threads <= 0) {
-        n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
+        n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
     }
 
     size_t work_size = 0;
@@ -2898,12 +2899,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
     struct ggml_compute_params params = {
         /*.ith =*/ state->ith,
-        /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+        /*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
         /*.wsize =*/ cplan->work_size,
         /*.wdata =*/ cplan->work_data,
         /*.threadpool=*/ tp,
     };
 
+    GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
@@ -2925,34 +2928,32 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         }
     }
 
+    GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     ggml_barrier(state->threadpool);
 
     return 0;
 }
 
 #ifndef GGML_USE_OPENMP
 
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
-    return (state->ith < n_threads);
-}
-
 // check if thread is ready to proceed (exit from polling or sleeping)
+// returns true if loops should exit, sets state->pending to indicate new work
 static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
-    int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
-    if (new_graph != state->last_graph) {
-        state->pending = ggml_graph_compute_thread_active(state);
-        state->last_graph = new_graph;
+    int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+    int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
+    if (n_graph != state->last_graph) {
+        state->pending = (state->ith < n_threads);
+        state->last_graph = n_graph;
+        return true;
     }
 
-    return state->pending;
+    return false;
 }
 
 // sync thread state after polling
@@ -2969,11 +2970,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
 
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
-    // Skip polling for unused threads
-    if (!ggml_graph_compute_thread_active(state)) {
-        return state->pending;
-    }
-
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3035,7 +3031,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
         ggml_graph_compute_check_for_work(state);
         if (state->pending) {
             state->pending = false;
-
             ggml_graph_compute_thread(state);
         }
     }
@@ -3050,14 +3045,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+    // Update the number of active threads and the graph count
+    int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
+    n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
 
-    // Update the number of active threads
-    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+    GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
 
     // Indicate the graph is ready to be processed
     // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+    atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
 
     if (threadpool->pause) {
         // Update main thread prio and affinity to match the threadpool settings
@@ -3095,8 +3091,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
     threadpool->pause = tpp->paused;
     threadpool->abort = -1;
     threadpool->workers = NULL;
-    threadpool->n_threads_max = tpp->n_threads;
-    threadpool->n_threads_cur = tpp->n_threads;
+    threadpool->n_threads = tpp->n_threads;
     threadpool->poll = tpp->poll;
     threadpool->prio = tpp->prio;
     threadpool->ec = GGML_STATUS_SUCCESS;
@@ -3191,7 +3186,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
        {
            // update the number of threads from the actual number of threads that we got from OpenMP
            n_threads = omp_get_num_threads();
-           atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+           atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
        }
 
        // Apply thread CPU mask and priority
@@ -3204,13 +3199,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
            ggml_graph_compute_thread(&threadpool->workers[ith]);
        }
    } else {
-       atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
+       atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
        ggml_graph_compute_thread(&threadpool->workers[0]);
    }
 #else
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
-        n_threads = threadpool->n_threads_max;
+    if (n_threads > threadpool->n_threads) {
+        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
+        n_threads = threadpool->n_threads;
     }
 
     // Kick all threads to start the new graph
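Taken together, the kickoff path now publishes the incremented graph counter and the active thread count with one seq-cst store, and each worker recovers both values from a single relaxed load of n_graph, so a polling thread can no longer observe a new graph number paired with a stale thread count. A minimal single-threaded sketch of that flow, assuming the same bit layout as the patch; the kickoff/check_for_work names and the standalone global are illustrative, not the actual ggml API:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define N_THREADS_MASK (0xffffU)
#define N_THREADS_BITS (16)

static atomic_int n_graph; // packed: (graph counter << 16) | active thread count

// kickoff side: bump the graph counter and publish the thread count in one store
static void kickoff(int n_threads) {
    int graph  = atomic_load_explicit(&n_graph, memory_order_relaxed) >> N_THREADS_BITS;
    int packed = ((graph + 1) << N_THREADS_BITS) | (n_threads & N_THREADS_MASK);
    atomic_store_explicit(&n_graph, packed, memory_order_seq_cst);
}

// worker side: one load yields a consistent (graph, n_threads) pair
static bool check_for_work(int ith, int * last_graph) {
    int packed    = atomic_load_explicit(&n_graph, memory_order_relaxed);
    int n_threads = packed & N_THREADS_MASK;
    if (packed != *last_graph) {
        *last_graph = packed;
        return ith < n_threads; // pending only if this thread is within the active count
    }
    return false;
}

int main(void) {
    int last2 = 0, last6 = 0;  // per-worker last_graph
    kickoff(4);                // new graph with 4 active threads
    printf("thread 2 pending: %d\n", check_for_work(2, &last2)); // 1: ith < n_threads
    printf("thread 6 pending: %d\n", check_for_work(6, &last6)); // 0: ith >= n_threads
    return 0;
}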

tests/test-barrier.cpp

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ static void test_active(int n_threads, int n_rounds) {
              << "\n";
    // ggml_graph_print(gf);
 
-    // In this test we keep changing number of threads every 4th iteration
+    // In this test we keep changing the number of threads every 4th iteration
    // to test for race conditions in that path
 
    for (int i=0; i < n_rounds; i++) {
