73 changes: 34 additions & 39 deletions ggml/src/ggml-cpu/ggml-cpu.c
@@ -187,6 +187,9 @@ typedef void * thread_ret_t;

typedef pthread_t ggml_thread_t;

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

#if defined(__APPLE__)
#include <unistd.h>
#include <mach/mach.h>
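
The two macros above define the new encoding: the low 16 bits of the single `n_graph` atomic hold the active-thread count, and the remaining high bits hold a graph generation counter. A minimal sketch of the encode/decode arithmetic, with hypothetical helper names (the patch itself inlines this logic):

```c
#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

// Hypothetical helpers for illustration only; the patch inlines this arithmetic.
static inline int tp_pack(int graph, int n_threads) {
    // high bits: graph generation counter; low 16 bits: active thread count
    return (graph << GGML_THREADPOOL_N_THREADS_BITS) |
           (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
}

static inline int tp_unpack_n_threads(int n_graph) {
    return n_graph & GGML_THREADPOOL_N_THREADS_MASK;
}

static inline int tp_unpack_graph(int n_graph) {
    return n_graph >> GGML_THREADPOOL_N_THREADS_BITS;
}
```

Packing both values into one atomic means a worker can read a mutually consistent (graph, thread count) pair with a single load, which is the point of the change.
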
@@ -449,20 +452,18 @@ struct ggml_threadpool {
struct ggml_cplan * cplan;

// synchronization primitives
atomic_int n_graph; // incremented when there is work to be done (i.e each graph)
atomic_int n_graph; // updated when there is work to be done (i.e. each graph); holds the graph count and the active thread count.
atomic_int GGML_CACHE_ALIGN n_barrier;
atomic_int GGML_CACHE_ALIGN n_barrier_passed;
atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.

// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
atomic_int abort; // Used for aborting processing of a graph
atomic_int abort; // Used for aborting processing of a graph

struct ggml_compute_state * workers; // per thread state
int n_threads_max; // number of threads in the pool
atomic_int n_threads_cur; // number of threads used in the current graph

int n_threads; // Number of threads in the pool
int32_t prio; // Scheduling priority
uint32_t poll; // Polling level (0 - no polling)

@@ -530,7 +531,7 @@ struct ggml_state {
static struct ggml_state g_state = {0};

void ggml_barrier(struct ggml_threadpool * tp) {
int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
if (n_threads == 1) {
return;
}
@@ -547,7 +548,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
// last thread
atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);

// exit barrier (fill seq-cst fence)
// exit barrier (full seq-cst fence)
atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
return;
}
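
For context, `ggml_barrier` is a counting barrier: each arriving thread increments `n_barrier`, and the last arrival resets the count and bumps the `n_barrier_passed` generation that the other threads wait on. The only change in this hunk is where the participant count comes from: a single relaxed load of `n_graph`, masked down to its low bits. A simplified sketch of the whole barrier, with the parts elided from the diff (the spin loop and exit fence) filled in as assumptions:

```c
#include <stdatomic.h>

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)

struct tp_sketch {
    atomic_int n_graph;          // low 16 bits: active thread count
    atomic_int n_barrier;        // threads that have arrived at the barrier
    atomic_int n_barrier_passed; // generation, bumped by the last arrival
};

static void barrier_sketch(struct tp_sketch * tp) {
    int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed)
                    & GGML_THREADPOOL_N_THREADS_MASK;
    if (n_threads == 1) {
        return;
    }

    int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);

    // enter barrier (full seq-cst fence)
    if (atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst) == n_threads - 1) {
        // last thread: reset the arrival count and release the waiters
        atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);

        // exit barrier (full seq-cst fence)
        atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
        return;
    }

    // wait until the last thread bumps the generation
    while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
        /* spin */
    }
    atomic_thread_fence(memory_order_seq_cst); // exit barrier (full seq-cst fence)
}
```
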
@@ -2619,7 +2620,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
if (!threadpool) return;

const int n_threads = threadpool->n_threads_max;
const int n_threads = threadpool->n_threads;

#ifndef GGML_USE_OPENMP
struct ggml_compute_state* workers = threadpool->workers;
@@ -2695,7 +2696,7 @@ struct ggml_cplan ggml_graph_plan(
//GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
}
if (n_threads <= 0) {
n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
}

#if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
@@ -2903,12 +2904,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {

struct ggml_compute_params params = {
/*.ith =*/ state->ith,
/*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
/*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.threadpool=*/ tp,
};

GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);

for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];

@@ -2930,34 +2933,32 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
}
}

GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);

ggml_barrier(state->threadpool);

return 0;
}

#ifndef GGML_USE_OPENMP

// check if thread is active
static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
return (state->ith < n_threads);
}

// check if thread is ready to proceed (exit from polling or sleeping)
// returns true if loops should exit, sets state->pending to indicate new work
static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;

if (state->pending || threadpool->stop || threadpool->pause) { return true; }

// check for new graph/work
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
if (new_graph != state->last_graph) {
state->pending = ggml_graph_compute_thread_active(state);
state->last_graph = new_graph;
int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
if (n_graph != state->last_graph) {
state->pending = (state->ith < n_threads);
state->last_graph = n_graph;
return true;
}

return state->pending;
return false;
}
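
Reading `n_graph` once is what makes this correct: with the old pair of atomics, a polling worker could observe the new graph counter together with a stale `n_threads_cur` (or vice versa) and wrongly mark itself pending or idle. A contrived sketch of the two read paths (struct and function names are illustrative, not from the patch):

```c
#include <stdatomic.h>

#define N_THREADS_MASK (0xffffU)
#define N_THREADS_BITS (16)

struct tp_old { atomic_int n_graph; atomic_int n_threads_cur; };
struct tp_new { atomic_int n_graph; /* (graph << 16) | n_threads */ };

// Old scheme: two independent loads; the kickoff thread may update both
// atomics between them, so the observed pair can be mutually inconsistent.
static void read_old(struct tp_old * tp, int * graph, int * n_threads) {
    *graph     = atomic_load_explicit(&tp->n_graph,       memory_order_relaxed);
    *n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
}

// New scheme: one load always yields a consistent (graph, n_threads) pair.
static void read_new(struct tp_new * tp, int * graph, int * n_threads) {
    int v = atomic_load_explicit(&tp->n_graph, memory_order_relaxed);
    *graph     = v >> N_THREADS_BITS;
    *n_threads = v & N_THREADS_MASK;
}
```
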

// sync thread state after polling
@@ -2974,11 +2975,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;

// Skip polling for unused threads
if (!ggml_graph_compute_thread_active(state)) {
return state->pending;
}

// This seems to make 0 ... 100 a decent range for polling level across modern processors.
// Perhaps, we can adjust it dynamically based on load and things.
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3040,7 +3036,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
ggml_graph_compute_check_for_work(state);
if (state->pending) {
state->pending = false;

ggml_graph_compute_thread(state);
}
}
@@ -3055,14 +3050,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int

ggml_mutex_lock(&threadpool->mutex);

GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
// Update the number of active threads and the graph count
int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);

// Update the number of active threads
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);

// Indicate the graph is ready to be processed
// We need the full seq-cst fence here because of the polling threads (used in thread_sync)
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);

if (threadpool->pause) {
// Update main thread prio and affinity to match the threadpool settings
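
A worked example of the kickoff arithmetic above, using illustrative values:

```c
#include <assert.h>

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

int main(void) {
    int n_graph   = (5 << GGML_THREADPOOL_N_THREADS_BITS) | 8; // graph 5, 8 active threads
    int n_threads = 4;                                         // next graph runs with 4 threads

    // the same two steps as in ggml_graph_compute_kickoff
    int gen = n_graph >> GGML_THREADPOOL_N_THREADS_BITS;       // 5
    n_graph = ((gen + 1) << GGML_THREADPOOL_N_THREADS_BITS)
              | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);

    assert((n_graph >> GGML_THREADPOOL_N_THREADS_BITS) == 6);  // generation bumped
    assert((n_graph &  GGML_THREADPOOL_N_THREADS_MASK) == 4);  // thread count replaced
    return 0;
}
```

Publishing the bumped generation and the new thread count in one seq-cst store is what lets `ggml_graph_compute_thread_ready` pick up both with a single load.
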
@@ -3100,8 +3096,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
threadpool->pause = tpp->paused;
threadpool->abort = -1;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = tpp->n_threads;
threadpool->n_threads = tpp->n_threads;
threadpool->poll = tpp->poll;
threadpool->prio = tpp->prio;
threadpool->ec = GGML_STATUS_SUCCESS;
@@ -3196,7 +3191,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
}

// Apply thread CPU mask and priority
@@ -3209,13 +3204,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
ggml_graph_compute_thread(&threadpool->workers[ith]);
}
} else {
atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
ggml_graph_compute_thread(&threadpool->workers[0]);
}
#else
if (n_threads > threadpool->n_threads_max) {
GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
n_threads = threadpool->n_threads_max;
if (n_threads > threadpool->n_threads) {
GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
n_threads = threadpool->n_threads;
}

// Kick all threads to start the new graph
170 changes: 156 additions & 14 deletions tests/test-barrier.cpp
@@ -11,19 +11,7 @@

#define MAX_NARGS 2

int main(int argc, char *argv[]) {

int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
int n_rounds = 100;

if (argc > 1) {
n_threads = std::atoi(argv[1]);
}

if (argc > 2) {
n_rounds = std::atoi(argv[2]);
}

static void test_barrier(int n_threads, int n_rounds) {
struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
@@ -56,7 +44,7 @@ int main(int argc, char *argv[]) {
exit(1);
}

// Create compute plan
// The test runs with constant number of threads
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);

std::vector<uint8_t> work_data(cplan.work_size);
@@ -89,6 +77,160 @@ int main(int argc, char *argv[]) {

ggml_threadpool_free(threadpool);
ggml_free(ctx);
}

static void test_active(int n_threads, int n_rounds) {
struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
/* .no_alloc = */ false,
};

struct ggml_context * ctx = ggml_init(params);

// Create graph
struct ggml_cgraph * gf = ggml_new_graph(ctx);

// Small graph with parallel ops with barriers
struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
for (int i = 0; i < 2; i++) {
struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
out = ggml_mul_mat(ctx, a, out);

struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
out = ggml_mul_mat(ctx, d, out);
}

ggml_build_forward_expand(gf, out);
int n_nodes = ggml_graph_n_nodes(gf);

// Create threadpool
struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
if (!threadpool) {
fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
exit(1);
}

std::cerr << "graph-compute with"
<< "\n n_threads: " << n_threads
<< "\n n_nodes: " << n_nodes
<< "\n n_rounds: " << n_rounds
<< "\n";
// ggml_graph_print(gf);

// In this test we keep changing the number of threads every 4th iteration
// to test for race conditions in that path

for (int i=0; i < n_rounds; i++) {
struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool);

std::vector<uint8_t> work_data(cplan.work_size);
cplan.work_data = work_data.data();

ggml_graph_compute(gf, &cplan);
}

ggml_threadpool_free(threadpool);
ggml_free(ctx);
}

static void test_multi_graph(int n_threads, int n_rounds) {
struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
/* .no_alloc = */ false,
};

struct ggml_context * ctx = ggml_init(params);

// Create graphs
struct ggml_cgraph * gf0 = ggml_new_graph(ctx);
{
// Small graph with parallel ops with barriers
struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
for (int i = 0; i < 2; i++) {
struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
out = ggml_mul_mat(ctx, a, out);

struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
out = ggml_mul_mat(ctx, d, out);
}

ggml_build_forward_expand(gf0, out);
}

struct ggml_cgraph * gf1 = ggml_new_graph(ctx);
{
// Small graph with parallel ops with barriers
// Use larger tensors to make sure work_data size is larger than gf0
struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 256);
for (int i = 0; i < 4; i++) {
struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 256, 128);
out = ggml_mul_mat(ctx, a, out);

struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 256);
out = ggml_mul_mat(ctx, d, out);
}

ggml_build_forward_expand(gf1, out);
}


// Create threadpool
struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
if (!threadpool) {
fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
exit(1);
}

std::cerr << "graph-compute with"
<< "\n gf0 n_nodes: " << ggml_graph_n_nodes(gf0)
<< "\n gf1 n_nodes: " << ggml_graph_n_nodes(gf1)
<< "\n n_threads: " << n_threads
<< "\n n_rounds: " << n_rounds
<< "\n";

// In this test we keep changing the number of threads every 4th iteration
// and we compute two graphs back to back to test frequent graph switching

for (int i=0; i < n_rounds; i++) {
struct ggml_cplan cplan0 = ggml_graph_plan(gf0, (i % 4) == 0 ? 1 : n_threads, threadpool);
std::vector<uint8_t> work_data0(cplan0.work_size);
cplan0.work_data = work_data0.data();

struct ggml_cplan cplan1 = ggml_graph_plan(gf1, (i % 4) == 0 ? 1 : n_threads, threadpool);
std::vector<uint8_t> work_data1(cplan1.work_size);
cplan1.work_data = work_data1.data();

ggml_graph_compute(gf0, &cplan0);
ggml_graph_compute(gf1, &cplan1);
}

ggml_threadpool_free(threadpool);
ggml_free(ctx);
}
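
Both `test_active` and `test_multi_graph` drive the same inner loop, dropping to a single thread every 4th iteration to stress the thread-count-change path. A hypothetical helper capturing the shared pattern (not part of the patch; assumes this file's existing includes such as `ggml.h`, `ggml-cpu.h`, and `<vector>`):

```cpp
static void run_rounds(struct ggml_cgraph * gf, struct ggml_threadpool * threadpool,
                       int n_threads, int n_rounds) {
    for (int i = 0; i < n_rounds; i++) {
        // alternate between 1 and n_threads to exercise kickoff with a
        // changing active-thread count
        struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool);

        std::vector<uint8_t> work_data(cplan.work_size);
        cplan.work_data = work_data.data();

        ggml_graph_compute(gf, &cplan);
    }
}
```
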


int main(int argc, char *argv[]) {

int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
int n_rounds = 100;

if (argc > 1) {
n_threads = std::atoi(argv[1]);
}

if (argc > 2) {
n_rounds = std::atoi(argv[2]);
}

test_barrier(n_threads, n_rounds);

test_active(n_threads, n_rounds * 100);

test_multi_graph(n_threads, n_rounds * 10);

return 0;
}