From 41a2a6cfa91d1a4074acc8d3ebd2b57c16b07eb4 Mon Sep 17 00:00:00 2001 From: Max Krasnyansky Date: Wed, 3 Dec 2025 10:45:44 -0800 Subject: [PATCH 1/3] tests: update barrier test to check for race condition in active threads --- tests/test-barrier.cpp | 90 +++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 14 deletions(-) diff --git a/tests/test-barrier.cpp b/tests/test-barrier.cpp index 04c27761dc8..e9b35b0b5e8 100644 --- a/tests/test-barrier.cpp +++ b/tests/test-barrier.cpp @@ -11,19 +11,7 @@ #define MAX_NARGS 2 -int main(int argc, char *argv[]) { - - int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency())); - int n_rounds = 100; - - if (argc > 1) { - n_threads = std::atoi(argv[1]); - } - - if (argc > 2) { - n_rounds = std::atoi(argv[2]); - } - +static void test_barrier(int n_threads, int n_rounds) { struct ggml_init_params params = { /* .mem_size = */ 1024*1024*1024, /* .mem_buffer = */ NULL, @@ -56,7 +44,7 @@ int main(int argc, char *argv[]) { exit(1); } - // Create compute plan + // The test runs with constant number of threads struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool); std::vector work_data(cplan.work_size); @@ -89,6 +77,80 @@ int main(int argc, char *argv[]) { ggml_threadpool_free(threadpool); ggml_free(ctx); +} + +static void test_active(int n_threads, int n_rounds) { + struct ggml_init_params params = { + /* .mem_size = */ 1024*1024*1024, + /* .mem_buffer = */ NULL, + /* .no_alloc = */ false, + }; + + struct ggml_context * ctx = ggml_init(params); + + // Create graph + struct ggml_cgraph * gf = ggml_new_graph(ctx); + + // Small graph with, parallel ops with barriers + struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64); + for (int i = 0; i < 2; i++) { + struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128); + out = ggml_mul_mat(ctx, a, out); + + struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64); + out = ggml_mul_mat(ctx, d, out); + } + + ggml_build_forward_expand(gf, out); + int n_nodes = ggml_graph_n_nodes(gf); + + // Create threadpool + struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads); + struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp); + if (!threadpool) { + fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads); + exit(1); + } + + std::cerr << "graph-compute with" + << "\n n_threads: " << n_threads + << "\n n_nodes: " << n_nodes + << "\n n_rounds: " << n_rounds + << "\n"; + // ggml_graph_print(gf); + + // In this test we keep changing number of threads every 4th iteration + // to test for race conditions in that path + + for (int i=0; i < n_rounds; i++) { + struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool); + + std::vector work_data(cplan.work_size); + cplan.work_data = work_data.data(); + + ggml_graph_compute(gf, &cplan); + } + + ggml_threadpool_free(threadpool); + ggml_free(ctx); +} + +int main(int argc, char *argv[]) { + + int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency())); + int n_rounds = 100; + + if (argc > 1) { + n_threads = std::atoi(argv[1]); + } + + if (argc > 2) { + n_rounds = std::atoi(argv[2]); + } + + test_barrier(n_threads, n_rounds); + + test_active(n_threads, n_rounds * 100); return 0; } From 8b7c68f447259aba4461a527c9d5847ac0f71f67 Mon Sep 17 00:00:00 2001 From: Max Krasnyansky Date: Wed, 3 Dec 2025 09:33:19 -0800 Subject: [PATCH 2/3] cpu: combine n_graph and n_threads into a single atomic update --- ggml/src/ggml-cpu/ggml-cpu.c | 73 +++++++++++++++++------------------- tests/test-barrier.cpp | 2 +- 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/ggml/src/ggml-cpu/ggml-cpu.c b/ggml/src/ggml-cpu/ggml-cpu.c index 8507557267a..816ee1c8777 100644 --- a/ggml/src/ggml-cpu/ggml-cpu.c +++ b/ggml/src/ggml-cpu/ggml-cpu.c @@ -187,6 +187,9 @@ typedef void * thread_ret_t; typedef pthread_t ggml_thread_t; +#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU) +#define GGML_THREADPOOL_N_THREADS_BITS (16) + #if defined(__APPLE__) #include #include @@ -449,7 +452,7 @@ struct ggml_threadpool { struct ggml_cplan * cplan; // synchronization primitives - atomic_int n_graph; // incremented when there is work to be done (i.e each graph) + atomic_int n_graph; // updated when there is work to be done (i.e each graph) holds graph and active thread counts. atomic_int GGML_CACHE_ALIGN n_barrier; atomic_int GGML_CACHE_ALIGN n_barrier_passed; atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads. @@ -457,12 +460,10 @@ struct ggml_threadpool { // these are atomic as an annotation for thread-sanitizer atomic_bool stop; // Used for stopping the threadpool altogether atomic_bool pause; // Used for pausing the threadpool or individual threads - atomic_int abort; // Used for aborting processing of a graph + atomic_int abort; // Used for aborting processing of a graph struct ggml_compute_state * workers; // per thread state - int n_threads_max; // number of threads in the pool - atomic_int n_threads_cur; // number of threads used in the current graph - + int n_threads; // Number of threads in the pool int32_t prio; // Scheduling priority uint32_t poll; // Polling level (0 - no polling) @@ -530,7 +531,7 @@ struct ggml_state { static struct ggml_state g_state = {0}; void ggml_barrier(struct ggml_threadpool * tp) { - int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed); + int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK; if (n_threads == 1) { return; } @@ -547,7 +548,7 @@ void ggml_barrier(struct ggml_threadpool * tp) { // last thread atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed); - // exit barrier (fill seq-cst fence) + // exit barrier (full seq-cst fence) atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst); return; } @@ -2619,7 +2620,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask void ggml_threadpool_free(struct ggml_threadpool* threadpool) { if (!threadpool) return; - const int n_threads = threadpool->n_threads_max; + const int n_threads = threadpool->n_threads; #ifndef GGML_USE_OPENMP struct ggml_compute_state* workers = threadpool->workers; @@ -2695,7 +2696,7 @@ struct ggml_cplan ggml_graph_plan( //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads); } if (n_threads <= 0) { - n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS; + n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS; } #if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__) @@ -2903,12 +2904,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_params params = { /*.ith =*/ state->ith, - /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed), + /*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK, /*.wsize =*/ cplan->work_size, /*.wdata =*/ cplan->work_data, /*.threadpool=*/ tp, }; + GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph); + for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) { struct ggml_tensor * node = cgraph->nodes[node_n]; @@ -2930,6 +2933,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { } } + GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph); + ggml_barrier(state->threadpool); return 0; @@ -2937,27 +2942,23 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { #ifndef GGML_USE_OPENMP -// check if thread is active -static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) { - struct ggml_threadpool * threadpool = state->threadpool; - int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed); - return (state->ith < n_threads); -} - // check if thread is ready to proceed (exit from polling or sleeping) +// returns true if loops should exit, sets state->pending to indicate new work static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) { struct ggml_threadpool * threadpool = state->threadpool; if (state->pending || threadpool->stop || threadpool->pause) { return true; } // check for new graph/work - int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); - if (new_graph != state->last_graph) { - state->pending = ggml_graph_compute_thread_active(state); - state->last_graph = new_graph; + int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); + int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK; + if (n_graph != state->last_graph) { + state->pending = (state->ith < n_threads); + state->last_graph = n_graph; + return true; } - return state->pending; + return false; } // sync thread state after polling @@ -2974,11 +2975,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) { struct ggml_threadpool * threadpool = state->threadpool; - // Skip polling for unused threads - if (!ggml_graph_compute_thread_active(state)) { - return state->pending; - } - // This seems to make 0 ... 100 a decent range for polling level across modern processors. // Perhaps, we can adjust it dynamically based on load and things. const uint64_t n_rounds = 1024UL * 128 * threadpool->poll; @@ -3040,7 +3036,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { ggml_graph_compute_check_for_work(state); if (state->pending) { state->pending = false; - ggml_graph_compute_thread(state); } } @@ -3055,14 +3050,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int ggml_mutex_lock(&threadpool->mutex); - GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads); + // Update the number of active threads and the graph count + int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS; + n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK); - // Update the number of active threads - atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed); + GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph); // Indicate the graph is ready to be processed // We need the full seq-cst fence here because of the polling threads (used in thread_sync) - atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst); + atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst); if (threadpool->pause) { // Update main thread prio and affinity to match the threadpool settings @@ -3100,8 +3096,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl( threadpool->pause = tpp->paused; threadpool->abort = -1; threadpool->workers = NULL; - threadpool->n_threads_max = tpp->n_threads; - threadpool->n_threads_cur = tpp->n_threads; + threadpool->n_threads = tpp->n_threads; threadpool->poll = tpp->poll; threadpool->prio = tpp->prio; threadpool->ec = GGML_STATUS_SUCCESS; @@ -3196,7 +3191,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl { // update the number of threads from the actual number of threads that we got from OpenMP n_threads = omp_get_num_threads(); - atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed); + atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed); } // Apply thread CPU mask and priority @@ -3209,13 +3204,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl ggml_graph_compute_thread(&threadpool->workers[ith]); } } else { - atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed); + atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed); ggml_graph_compute_thread(&threadpool->workers[0]); } #else - if (n_threads > threadpool->n_threads_max) { - GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max); - n_threads = threadpool->n_threads_max; + if (n_threads > threadpool->n_threads) { + GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads); + n_threads = threadpool->n_threads; } // Kick all threads to start the new graph diff --git a/tests/test-barrier.cpp b/tests/test-barrier.cpp index e9b35b0b5e8..018d1f04705 100644 --- a/tests/test-barrier.cpp +++ b/tests/test-barrier.cpp @@ -119,7 +119,7 @@ static void test_active(int n_threads, int n_rounds) { << "\n"; // ggml_graph_print(gf); - // In this test we keep changing number of threads every 4th iteration + // In this test we keep changing the number of threads every 4th iteration // to test for race conditions in that path for (int i=0; i < n_rounds; i++) { From 222c9f89ad9779d51da1af5360074c4d6050e07d Mon Sep 17 00:00:00 2001 From: Max Krasnyansky Date: Wed, 3 Dec 2025 19:58:00 -0800 Subject: [PATCH 3/3] tests: add multi-graph test for test_barrier --- tests/test-barrier.cpp | 80 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tests/test-barrier.cpp b/tests/test-barrier.cpp index 018d1f04705..61f73adfd2c 100644 --- a/tests/test-barrier.cpp +++ b/tests/test-barrier.cpp @@ -135,6 +135,84 @@ static void test_active(int n_threads, int n_rounds) { ggml_free(ctx); } +static void test_multi_graph(int n_threads, int n_rounds) { + struct ggml_init_params params = { + /* .mem_size = */ 1024*1024*1024, + /* .mem_buffer = */ NULL, + /* .no_alloc = */ false, + }; + + struct ggml_context * ctx = ggml_init(params); + + // Create graphs + struct ggml_cgraph * gf0 = ggml_new_graph(ctx); + { + // Small graph with parallel ops with barriers + struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64); + for (int i = 0; i < 2; i++) { + struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128); + out = ggml_mul_mat(ctx, a, out); + + struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64); + out = ggml_mul_mat(ctx, d, out); + } + + ggml_build_forward_expand(gf0, out); + } + + struct ggml_cgraph * gf1 = ggml_new_graph(ctx); + { + // Small graph with parallel ops with barriers + // Use larger tensors to make sure work_data size is larger than gf0 + struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 256); + for (int i = 0; i < 4; i++) { + struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 256, 128); + out = ggml_mul_mat(ctx, a, out); + + struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 256); + out = ggml_mul_mat(ctx, d, out); + } + + ggml_build_forward_expand(gf1, out); + } + + + // Create threadpool + struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads); + struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp); + if (!threadpool) { + fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads); + exit(1); + } + + std::cerr << "graph-compute with" + << "\n gf0 n_nodes: " << ggml_graph_n_nodes(gf0) + << "\n gf1 n_nodes: " << ggml_graph_n_nodes(gf1) + << "\n n_threads: " << n_threads + << "\n n_rounds: " << n_rounds + << "\n"; + + // In this test we keep changing the number of threads every 4th iteration + // and we compute two graphs back to back to test graph frequent graph switching + + for (int i=0; i < n_rounds; i++) { + struct ggml_cplan cplan0 = ggml_graph_plan(gf0, (i % 4) == 0 ? 1 : n_threads, threadpool); + std::vector work_data0(cplan0.work_size); + cplan0.work_data = work_data0.data(); + + struct ggml_cplan cplan1 = ggml_graph_plan(gf1, (i % 4) == 0 ? 1 : n_threads, threadpool); + std::vector work_data1(cplan1.work_size); + cplan1.work_data = work_data1.data(); + + ggml_graph_compute(gf0, &cplan0); + ggml_graph_compute(gf1, &cplan1); + } + + ggml_threadpool_free(threadpool); + ggml_free(ctx); +} + + int main(int argc, char *argv[]) { int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency())); @@ -152,5 +230,7 @@ int main(int argc, char *argv[]) { test_active(n_threads, n_rounds * 100); + test_multi_graph(n_threads, n_rounds * 10); + return 0; }