134 changes: 34 additions & 100 deletions src/llama-model-loader.cpp
@@ -928,135 +928,81 @@ bool llama_model_loader::load_all_data(
std::vector<no_init<uint8_t>> read_buf;
std::vector<std::future<std::pair<ggml_tensor *, bool>>> validation_result;

// 4 staging buffers for async uploads, each sized 1MB seems to be a good default for single NVMe drives.
// NVMe raid configurations might require more / larger buffers.
constexpr size_t n_buffers = 4;
constexpr size_t buffer_size = 1 * 1024 * 1024; // 1MB

std::vector<ggml_backend_buffer_t> host_buffers;
std::vector<ggml_backend_event_t> events;
std::vector<void *> host_ptrs;
size_t buffer_idx = 0; // buffer to use for async loads
size_t buffer_idx = 0;
ggml_backend_t upload_backend = [&](const char * func) -> ggml_backend_t {
if (use_mmap || check_tensors) {
return nullptr;
}
// When not using mmaped io use async uploads from pinned memory to GPU memory.
// First determine if the backend supports the necessary features for async uploads.
if (use_mmap || check_tensors) { return nullptr; }
auto * buf = bufs.count(0) ? bufs.at(0) : nullptr;
if (!buf) {
LLAMA_LOG_DEBUG("%s: no buffer found for async uploads\n", func);
return nullptr;
}

if (!buf) { LLAMA_LOG_DEBUG("%s: no buffer found for async uploads\n", func); return nullptr; }
auto * buft = ggml_backend_buffer_get_type(buf);
auto * dev = ggml_backend_buft_get_device(buft);
if (!dev) {
LLAMA_LOG_DEBUG("%s: no device found for buffer type %s for async uploads\n", func,
ggml_backend_buft_name(buft));
return nullptr;
}

if (buft != ggml_backend_dev_buffer_type(dev)) {
LLAMA_LOG_DEBUG("%s: buffer type %s is not the default buffer type for device %s for async uploads\n", func,
ggml_backend_buft_name(buft), ggml_backend_dev_name(dev));
return nullptr;
}

if (!dev) { LLAMA_LOG_DEBUG("%s: no device found\n", func); return nullptr; }
if (buft != ggml_backend_dev_buffer_type(dev)) { return nullptr; }
ggml_backend_dev_props props;
ggml_backend_dev_get_props(dev, &props);
if (!props.caps.async || !props.caps.host_buffer || !props.caps.events) {
LLAMA_LOG_DEBUG("%s: device %s does not support async, host buffers or events\n", func,
ggml_backend_dev_name(dev));
return nullptr;
}

if (!props.caps.async || !props.caps.host_buffer || !props.caps.events) { return nullptr; }
auto * host_buft = ggml_backend_dev_host_buffer_type(dev);
if (!host_buft) {
LLAMA_LOG_DEBUG("%s: no host buffer type found for device %s\n", func,
ggml_backend_dev_name(dev));
return nullptr;
}

// If the backend is supported, create pinned memory buffers and events for synchronisation.
if (!host_buft) { return nullptr; }
for (size_t idx = 0; idx < n_buffers; ++idx) {
auto * buf = ggml_backend_buft_alloc_buffer(host_buft, buffer_size);
if (!buf) {
LLAMA_LOG_DEBUG("%s: failed to allocate host buffer for async uploads for device %s\n", func,
ggml_backend_dev_name(dev));
return nullptr;
}

if (!buf) { return nullptr; }
host_buffers.emplace_back(buf);
host_ptrs.emplace_back(ggml_backend_buffer_get_base(buf));

auto * event = ggml_backend_event_new(dev);
if (!event) {
LLAMA_LOG_DEBUG("%s: failed to create event for async uploads for device %s\n", func,
ggml_backend_dev_name(dev));
return nullptr;
}

if (!event) { return nullptr; }
events.emplace_back(event);
}

ggml_backend_t backend = ggml_backend_dev_init(dev, nullptr);
if (!backend) {
LLAMA_LOG_DEBUG("%s: failed to initialize backend for device %s for async uploads\n", func,
ggml_backend_dev_name(dev));
return nullptr;
}

if (!backend) { return nullptr; }
return backend;
}(__func__);

if (upload_backend) {
LLAMA_LOG_DEBUG("%s: using async uploads for device %s, buffer type %s, backend %s\n", __func__,
ggml_backend_dev_name(ggml_backend_get_device(upload_backend)),
ggml_backend_buft_name(ggml_backend_buffer_get_type(bufs.at(0))),
ggml_backend_name(upload_backend));
LLAMA_LOG_DEBUG("%s: using async uploads\n", __func__);
}
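
A condensed sketch of what the compacted lambda above does: it only returns a usable `upload_backend` when mmap and tensor checking are off, the default device buffer type is in use, and the device advertises async copies, host (pinned) buffers and events. The sketch below reuses the same ggml-backend calls and constants from this hunk; error logging is omitted.

// Sketch: allocate n_buffers pinned host staging buffers and one event per
// buffer on the target device; bail out with nullptr if any capability or
// allocation is missing, so the loader falls back to synchronous reads.
ggml_backend_dev_props props;
ggml_backend_dev_get_props(dev, &props);
if (!props.caps.async || !props.caps.host_buffer || !props.caps.events) {
    return nullptr;
}
auto * host_buft = ggml_backend_dev_host_buffer_type(dev);
if (!host_buft) {
    return nullptr;
}
for (size_t idx = 0; idx < n_buffers; ++idx) {
    auto * staging = ggml_backend_buft_alloc_buffer(host_buft, buffer_size);
    if (!staging) {
        return nullptr;
    }
    host_buffers.emplace_back(staging);
    host_ptrs.emplace_back(ggml_backend_buffer_get_base(staging));

    auto * event = ggml_backend_event_new(dev);
    if (!event) {
        return nullptr;
    }
    events.emplace_back(event);
}
return ggml_backend_dev_init(dev, nullptr); // backend used only for the uploads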

int tensor_count = 0;
for (struct ggml_tensor * cur = ggml_get_first_tensor(ctx); cur != NULL; cur = ggml_get_next_tensor(ctx, cur)) {
tensor_count++;
const auto * weight = get_weight(ggml_get_name(cur));
if (weight == nullptr) {
// this can happen with split experts models
continue;
}
if (weight == nullptr) { continue; }

if (progress_callback) {
if (!progress_callback((float) size_done / size_data, progress_callback_user_data)) {
return false;
}
if (!progress_callback((float) size_done / size_data, progress_callback_user_data)) { return false; }
}

size_t n_size = ggml_nbytes(cur);

if (use_mmap) {
if (weight->idx >= mappings.size()) {
throw std::runtime_error(format("tensor '%s' has invalid file index %d", ggml_get_name(cur), weight->idx));
}
const auto & mapping = mappings.at(weight->idx);
ggml_backend_buffer_t buf_mmap = nullptr;
if (bufs.count(weight->idx)) {
buf_mmap = bufs.at(weight->idx);
if (weight->offs + n_size > mapping->size()) {
throw std::runtime_error(format("tensor '%s' is out of bounds", ggml_get_name(cur)));
}
ggml_backend_buffer_t buf_mmap = nullptr;
if (bufs.count(weight->idx)) { buf_mmap = bufs.at(weight->idx); }
uint8_t * data = (uint8_t *) mapping->addr() + weight->offs;

if (check_tensors) {
validation_result.emplace_back(std::async(std::launch::async, [cur, data, n_size] {
return std::make_pair(cur, ggml_validate_row_data(cur->type, data, n_size));
}));
}

GGML_ASSERT(buf_mmap || cur->data); // either we have a buffer to allocate the tensor in, or it is already allocated
GGML_ASSERT(buf_mmap || cur->data);
if (buf_mmap && cur->data == nullptr) {
ggml_backend_tensor_alloc(buf_mmap, cur, data);
if (lmlocks) {
const auto & lmlock = lmlocks->at(weight->idx);
lmlock->grow_to(weight->offs + n_size);
if (lmlocks && weight->idx < lmlocks->size()) {
lmlocks->at(weight->idx)->grow_to(weight->offs + n_size);
}
if (weight->idx < mmaps_used.size()) {
auto & mmap_used = mmaps_used[weight->idx];
mmap_used.first = std::min(mmap_used.first, weight->offs);
mmap_used.second = std::max(mmap_used.second, weight->offs + n_size);
}

auto & mmap_used = mmaps_used[weight->idx];
mmap_used.first = std::min(mmap_used.first, weight->offs);
mmap_used.second = std::max(mmap_used.second, weight->offs + n_size);
} else {
ggml_backend_tensor_set(cur, data, 0, n_size);
}
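
When check_tensors is enabled, row validation is pushed off the loading thread: each tensor's check is dispatched as a std::async task and the futures are drained after the tensor loop. A minimal sketch of that pattern, reusing the names from this file (dispatch shown for the mmap path; the streaming path does the same with its read buffer):

// Sketch: dispatch one background validation task per tensor ...
validation_result.emplace_back(std::async(std::launch::async, [cur, data, n_size] {
    return std::make_pair(cur, ggml_validate_row_data(cur->type, data, n_size));
}));

// ... and later collect all verdicts; future.get() blocks until that check finished.
bool validation_failed = false;
for (auto & future : validation_result) {
    auto result = future.get();
    if (!result.second) {
        // result.first is the offending tensor; the loader flags the failure here
        validation_failed = true;
    }
}
if (validation_failed) {
    throw std::runtime_error("found tensors with invalid data");
}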
@@ -1071,20 +1017,15 @@ bool llama_model_loader::load_all_data(
}));
}
} else {
// If upload_backend is valid load the tensor in chunks to pinned memory and upload the buffers asynchronously to the GPU.
if (upload_backend) {
file->seek(weight->offs, SEEK_SET);

size_t bytes_read = 0;

while (bytes_read < n_size) {
size_t read_iteration = std::min<size_t>(buffer_size, n_size - bytes_read);

ggml_backend_event_synchronize(events[buffer_idx]);
file->read_raw(host_ptrs[buffer_idx], read_iteration);
ggml_backend_tensor_set_async(upload_backend, cur, host_ptrs[buffer_idx], bytes_read, read_iteration);
ggml_backend_event_record(events[buffer_idx], upload_backend);

bytes_read += read_iteration;
++buffer_idx;
buffer_idx %= n_buffers;
@@ -1102,9 +1043,10 @@
}

size_done += n_size;
if (tensor_count % 100 == 0) {
LLAMA_LOG_INFO("%s: loaded %d tensors\n", __func__, tensor_count);
}
}

// free temporary resources used for async uploads
for (auto * event : events) {
ggml_backend_event_synchronize(event);
ggml_backend_event_free(event);
@@ -1114,7 +1056,6 @@
}
ggml_backend_free(upload_backend);

// check validation results
bool validation_failed = false;
for (auto & future : validation_result) {
auto result = future.get();
@@ -1123,13 +1064,8 @@
validation_failed = true;
}
}
if (validation_failed) {
throw std::runtime_error("found tensors with invalid data");
}

// check if this is the last call and do final cleanup
if (validation_failed) { throw std::runtime_error("found tensors with invalid data"); }
if (size_done >= size_data) {
// unmap offloaded tensors and metadata
if (use_mmap) {
for (uint32_t idx = 0; idx < mappings.size(); idx++) {
const auto & mmap_used = mmaps_used.at(idx);
@@ -1141,8 +1077,6 @@
}
}
if (progress_callback) {
// Even though the model is done loading, we still honor
// cancellation since we need to free allocations.
return progress_callback(1.0f, progress_callback_user_data);
}
}
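Taken together, the non-mmap path in this file now streams each tensor through the staging ring whenever upload_backend is available. A condensed sketch of that loop, assembled from the calls shown in the hunks above (same constants; error handling omitted):

// Sketch: stream one tensor from disk to device memory through the ring of
// pinned staging buffers. Waiting on a buffer's event before reuse ensures the
// previous async upload out of that buffer has finished on the device.
file->seek(weight->offs, SEEK_SET);

size_t bytes_read = 0;
while (bytes_read < n_size) {
    const size_t chunk = std::min<size_t>(buffer_size, n_size - bytes_read);

    ggml_backend_event_synchronize(events[buffer_idx]);               // wait until this staging buffer is free
    file->read_raw(host_ptrs[buffer_idx], chunk);                     // disk -> pinned host memory
    ggml_backend_tensor_set_async(upload_backend, cur,
                                  host_ptrs[buffer_idx], bytes_read, chunk); // pinned host -> device
    ggml_backend_event_record(events[buffer_idx], upload_backend);    // mark when this upload completes

    bytes_read += chunk;
    buffer_idx  = (buffer_idx + 1) % n_buffers;                       // round-robin over staging buffers
}

The cleanup at the end of the function then synchronizes and frees each event and staging buffer before freeing upload_backend itself.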
12 changes: 12 additions & 0 deletions src/llama-model.cpp
@@ -13898,6 +13898,11 @@ struct llm_build_deepseek3_2 : public llm_graph_context {

// Apply sparse attention if available, otherwise use regular attention
if (use_sparse_attention) {
// Guard: Only use sparse attention if inp_attn and mctx are valid
if (!inp_attn || !inp_attn->mctx) {
use_sparse_attention = false;
goto regular_attention_mla;
}
const auto * mctx_cur = inp_attn->mctx;
ggml_build_forward_expand(gf, mctx_cur->cpy_k(ctx0, Kcur, inp_attn->get_k_idxs(), il));
ggml_build_forward_expand(gf, mctx_cur->cpy_v(ctx0, Vcur, inp_attn->get_v_idxs(), il));
@@ -13969,6 +13974,7 @@ struct llm_build_deepseek3_2 : public llm_graph_context {
LLAMA_LOG_DEBUG("DeepSeek V3.2: Using sparse attention with top-%d tokens for layer %d\n",
(int)top_k, il);
} else {
regular_attention_mla:
// note: MLA with the absorption optimization converts into MQA (ie: GQA with 1 group)
cur = build_attn(inp_attn,
model.layers[il].wo, NULL,
@@ -14006,6 +14012,11 @@ struct llm_build_deepseek3_2 : public llm_graph_context {

// Apply sparse attention if available, otherwise use regular attention
if (use_sparse_attention) {
// Guard: Only use sparse attention if inp_attn and mctx are valid
if (!inp_attn || !inp_attn->mctx) {
use_sparse_attention = false;
goto regular_attention_mha;
}
const auto * mctx_cur = inp_attn->mctx;
ggml_build_forward_expand(gf, mctx_cur->cpy_k(ctx0, Kcur, inp_attn->get_k_idxs(), il));
ggml_build_forward_expand(gf, mctx_cur->cpy_v(ctx0, Vcur, inp_attn->get_v_idxs(), il));
@@ -14064,6 +14075,7 @@ struct llm_build_deepseek3_2 : public llm_graph_context {
LLAMA_LOG_DEBUG("DeepSeek V3.2: Using sparse attention with top-%d tokens for layer %d\n",
(int)top_k, il);
} else {
regular_attention_mha:
// note: MLA without the absorption optimization converts into MHA (ie: GQA with full n_head groups)
cur = build_attn(inp_attn,
model.layers[il].wo, NULL,
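The new guards bail out to the regular-attention labels via goto. An equivalent, goto-free way to express the same fallback is to fold the validity check into the flag before branching; the sketch below uses the names from the hunks above and is illustrative only, not part of the patch:

// Sketch: make the sparse path conditional on the inputs actually being usable,
// so the regular MLA/MHA path is simply the else branch and no label is needed.
const bool sparse_ok = use_sparse_attention && inp_attn && inp_attn->mctx;
if (sparse_ok) {
    const auto * mctx_cur = inp_attn->mctx;
    ggml_build_forward_expand(gf, mctx_cur->cpy_k(ctx0, Kcur, inp_attn->get_k_idxs(), il));
    ggml_build_forward_expand(gf, mctx_cur->cpy_v(ctx0, Vcur, inp_attn->get_v_idxs(), il));
    // ... rest of the sparse attention path ...
} else {
    // regular attention path, e.g.
    // cur = build_attn(inp_attn, model.layers[il].wo, NULL, ...);
}

This also sidesteps the rule that a goto may not jump into the scope of a variable with an initializer, which is presumably why the labels in the patch sit at the very top of the else branches.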
2 changes: 1 addition & 1 deletion src/llama-sparse-mla-fwd.cpp
@@ -61,7 +61,7 @@ using std::function;
const int64_t T = q_cur->ne[2];
// Fused decode path: use custom CUDA op when T == 1
const char * env_fused_dec = getenv("LLAMA_SPARSE_MLA_FUSED_DECODE");
if (T == 1 && (env_fused_dec == nullptr || atoi(env_fused_dec) != 0)) {
if (T == 1 && (env_fused_dec != nullptr && atoi(env_fused_dec) != 0)) {
// Build q_t [Dq, Hq]
ggml_tensor * q_cur_cont2 = ggml_cont(ctx, q_cur);
ggml_tensor * q_all_2d2 = ggml_reshape_2d(ctx, q_cur_cont2, Dq, Hq*T);
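The one-line change above flips the fused decode path from opt-out to opt-in: previously it ran whenever LLAMA_SPARSE_MLA_FUSED_DECODE was unset or set to a non-zero value, now it runs only when the variable is set and non-zero. A small self-contained helper with the new behaviour, as a sketch (the helper name is hypothetical; the patch inlines the check):

#include <cstdlib>

// Returns true only when the user explicitly opts in, matching the new
// condition: the variable must be set and parse to a non-zero integer.
static bool fused_decode_enabled() {
    const char * v = std::getenv("LLAMA_SPARSE_MLA_FUSED_DECODE");
    return v != nullptr && std::atoi(v) != 0;
}

With the old condition an unset variable enabled the fused path; after this change, running with LLAMA_SPARSE_MLA_FUSED_DECODE=1 is required to exercise it.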
33 changes: 6 additions & 27 deletions src/llama-sparse-topk.cpp
@@ -669,33 +669,12 @@ ggml_tensor * sparse_attn_topk::select_topk_tokens_indexer_kvaware(
// Compute top-k indices via CUDA radix selection
const int64_t k_tile = std::min<int64_t>(k, scores_clamped->ne[0]);
ggml_tensor * topk_tc = nullptr;
if (have_windows && win_ends) {
// slice starts/ends for this tile [t0, t0+Tc)
ggml_tensor * starts_tile = nullptr;
ggml_tensor * ends_tile = nullptr;
if (win_starts) {
size_t off_s = (size_t)t0 * win_starts->nb[0];
starts_tile = ggml_view_1d(ctx, win_starts, Tc, off_s);
starts_tile = ggml_cont(ctx, starts_tile);
}
if (win_ends) {
size_t off_e = (size_t)t0 * win_ends->nb[0];
ends_tile = ggml_view_1d(ctx, win_ends, Tc, off_e);
ends_tile = ggml_cont(ctx, ends_tile);
}
if (dbg) {
printf("[TOPK] using start and end\n");
fflush(stdout);
}
topk_tc = ggml_sparse_topk_radix_ex(ctx, scores_clamped, (int)k_tile, starts_tile, ends_tile);
} else {
if (dbg) {
printf("[TOPK] not using start and end, have_windows=%s win_ends=%s\n",
have_windows ? "true" : "false", win_ends ? "true" : "false");
fflush(stdout);
}
topk_tc = ggml_sparse_topk_radix(ctx, scores_clamped, (int)k_tile);
}
#if defined(__APPLE__)
// Force CPU fallback for top-k since Metal backend is missing SPARSE_TOPK_RADIX
topk_tc = sparse_attn_topk::topk_radix_indices(ctx, scores_clamped, (int)k_tile);
#else
topk_tc = ggml_sparse_topk_radix(ctx, scores_clamped, (int)k_tile);
#endif
if (dbg && t0 == 0) {
cb(topk_tc, "idxkv_topk_radix", -1);
int64_t kk = std::min<int64_t>(k_tile, (int64_t)16);
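This hunk drops the window-aware ggml_sparse_topk_radix_ex branch (with its sliced starts/ends tiles) and instead picks the top-k implementation at compile time: the CPU helper on Apple builds, where the Metal backend lacks SPARSE_TOPK_RADIX, and the CUDA radix op everywhere else. Factored into a helper, the dispatch would look roughly like this (the wrapper name is hypothetical; the arguments follow the calls in the hunk):

// Sketch: compile-time choice of top-k index selection over the clamped scores.
static ggml_tensor * select_topk_indices(ggml_context * ctx, ggml_tensor * scores_clamped, int k_tile) {
#if defined(__APPLE__)
    // Metal backend has no SPARSE_TOPK_RADIX kernel, so use the CPU fallback.
    return sparse_attn_topk::topk_radix_indices(ctx, scores_clamped, k_tile);
#else
    // CUDA radix selection.
    return ggml_sparse_topk_radix(ctx, scores_clamped, k_tile);
#endif
}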