Skip to content

Commit 8b7c68f

Browse files
cpu: combine n_graph and n_threads into a single atomic update
1 parent 41a2a6c commit 8b7c68f

File tree

2 files changed

+35
-40
lines changed

2 files changed

+35
-40
lines changed

ggml/src/ggml-cpu/ggml-cpu.c

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ typedef void * thread_ret_t;
187187

188188
typedef pthread_t ggml_thread_t;
189189

190+
#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
191+
#define GGML_THREADPOOL_N_THREADS_BITS (16)
192+
190193
#if defined(__APPLE__)
191194
#include <unistd.h>
192195
#include <mach/mach.h>
@@ -449,20 +452,18 @@ struct ggml_threadpool {
449452
struct ggml_cplan * cplan;
450453

451454
// synchronization primitives
452-
atomic_int n_graph; // incremented when there is work to be done (i.e each graph)
455+
atomic_int n_graph; // updated when there is work to be done (i.e each graph) holds graph and active thread counts.
453456
atomic_int GGML_CACHE_ALIGN n_barrier;
454457
atomic_int GGML_CACHE_ALIGN n_barrier_passed;
455458
atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
456459

457460
// these are atomic as an annotation for thread-sanitizer
458461
atomic_bool stop; // Used for stopping the threadpool altogether
459462
atomic_bool pause; // Used for pausing the threadpool or individual threads
460-
atomic_int abort; // Used for aborting processing of a graph
463+
atomic_int abort; // Used for aborting processing of a graph
461464

462465
struct ggml_compute_state * workers; // per thread state
463-
int n_threads_max; // number of threads in the pool
464-
atomic_int n_threads_cur; // number of threads used in the current graph
465-
466+
int n_threads; // Number of threads in the pool
466467
int32_t prio; // Scheduling priority
467468
uint32_t poll; // Polling level (0 - no polling)
468469

@@ -530,7 +531,7 @@ struct ggml_state {
530531
static struct ggml_state g_state = {0};
531532

532533
void ggml_barrier(struct ggml_threadpool * tp) {
533-
int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
534+
int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
534535
if (n_threads == 1) {
535536
return;
536537
}
@@ -547,7 +548,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
547548
// last thread
548549
atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
549550

550-
// exit barrier (fill seq-cst fence)
551+
// exit barrier (full seq-cst fence)
551552
atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
552553
return;
553554
}
@@ -2619,7 +2620,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
26192620
void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
26202621
if (!threadpool) return;
26212622

2622-
const int n_threads = threadpool->n_threads_max;
2623+
const int n_threads = threadpool->n_threads;
26232624

26242625
#ifndef GGML_USE_OPENMP
26252626
struct ggml_compute_state* workers = threadpool->workers;
@@ -2695,7 +2696,7 @@ struct ggml_cplan ggml_graph_plan(
26952696
//GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
26962697
}
26972698
if (n_threads <= 0) {
2698-
n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
2699+
n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
26992700
}
27002701

27012702
#if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
@@ -2903,12 +2904,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
29032904

29042905
struct ggml_compute_params params = {
29052906
/*.ith =*/ state->ith,
2906-
/*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
2907+
/*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
29072908
/*.wsize =*/ cplan->work_size,
29082909
/*.wdata =*/ cplan->work_data,
29092910
/*.threadpool=*/ tp,
29102911
};
29112912

2913+
GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
2914+
29122915
for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
29132916
struct ggml_tensor * node = cgraph->nodes[node_n];
29142917

@@ -2930,34 +2933,32 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
29302933
}
29312934
}
29322935

2936+
GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
2937+
29332938
ggml_barrier(state->threadpool);
29342939

29352940
return 0;
29362941
}
29372942

29382943
#ifndef GGML_USE_OPENMP
29392944

2940-
// check if thread is active
2941-
static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
2942-
struct ggml_threadpool * threadpool = state->threadpool;
2943-
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
2944-
return (state->ith < n_threads);
2945-
}
2946-
29472945
// check if thread is ready to proceed (exit from polling or sleeping)
2946+
// returns true if loops should exit, sets state->pending to indicate new work
29482947
static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
29492948
struct ggml_threadpool * threadpool = state->threadpool;
29502949

29512950
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
29522951

29532952
// check for new graph/work
2954-
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
2955-
if (new_graph != state->last_graph) {
2956-
state->pending = ggml_graph_compute_thread_active(state);
2957-
state->last_graph = new_graph;
2953+
int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
2954+
int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
2955+
if (n_graph != state->last_graph) {
2956+
state->pending = (state->ith < n_threads);
2957+
state->last_graph = n_graph;
2958+
return true;
29582959
}
29592960

2960-
return state->pending;
2961+
return false;
29612962
}
29622963

29632964
// sync thread state after polling
@@ -2974,11 +2975,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
29742975
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
29752976
struct ggml_threadpool * threadpool = state->threadpool;
29762977

2977-
// Skip polling for unused threads
2978-
if (!ggml_graph_compute_thread_active(state)) {
2979-
return state->pending;
2980-
}
2981-
29822978
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
29832979
// Perhaps, we can adjust it dynamically based on load and things.
29842980
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3040,7 +3036,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
30403036
ggml_graph_compute_check_for_work(state);
30413037
if (state->pending) {
30423038
state->pending = false;
3043-
30443039
ggml_graph_compute_thread(state);
30453040
}
30463041
}
@@ -3055,14 +3050,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int
30553050

30563051
ggml_mutex_lock(&threadpool->mutex);
30573052

3058-
GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
3053+
// Update the number of active threads and the graph count
3054+
int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
3055+
n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
30593056

3060-
// Update the number of active threads
3061-
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
3057+
GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
30623058

30633059
// Indicate the graph is ready to be processed
30643060
// We need the full seq-cst fence here because of the polling threads (used in thread_sync)
3065-
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
3061+
atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
30663062

30673063
if (threadpool->pause) {
30683064
// Update main thread prio and affinity to match the threadpool settings
@@ -3100,8 +3096,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
31003096
threadpool->pause = tpp->paused;
31013097
threadpool->abort = -1;
31023098
threadpool->workers = NULL;
3103-
threadpool->n_threads_max = tpp->n_threads;
3104-
threadpool->n_threads_cur = tpp->n_threads;
3099+
threadpool->n_threads = tpp->n_threads;
31053100
threadpool->poll = tpp->poll;
31063101
threadpool->prio = tpp->prio;
31073102
threadpool->ec = GGML_STATUS_SUCCESS;
@@ -3196,7 +3191,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
31963191
{
31973192
// update the number of threads from the actual number of threads that we got from OpenMP
31983193
n_threads = omp_get_num_threads();
3199-
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
3194+
atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
32003195
}
32013196

32023197
// Apply thread CPU mask and priority
@@ -3209,13 +3204,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
32093204
ggml_graph_compute_thread(&threadpool->workers[ith]);
32103205
}
32113206
} else {
3212-
atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
3207+
atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
32133208
ggml_graph_compute_thread(&threadpool->workers[0]);
32143209
}
32153210
#else
3216-
if (n_threads > threadpool->n_threads_max) {
3217-
GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
3218-
n_threads = threadpool->n_threads_max;
3211+
if (n_threads > threadpool->n_threads) {
3212+
GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
3213+
n_threads = threadpool->n_threads;
32193214
}
32203215

32213216
// Kick all threads to start the new graph

tests/test-barrier.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ static void test_active(int n_threads, int n_rounds) {
119119
<< "\n";
120120
// ggml_graph_print(gf);
121121

122-
// In this test we keep changing number of threads every 4th iteration
122+
// In this test we keep changing the number of threads every 4th iteration
123123
// to test for race conditions in that path
124124

125125
for (int i=0; i < n_rounds; i++) {

0 commit comments

Comments
 (0)