Skip to content

Commit 4c7335d

Browse files
committed
bugfix in cache dump queue
1 parent 7a3e211 commit 4c7335d

File tree

2 files changed

+20
-26
lines changed

2 files changed

+20
-26
lines changed

ucm/store/cache/cc/dump_queue.cc

Lines changed: 17 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ void DumpQueue::DispatchOneTask(Trans::Stream* stream, size_t tensorSize, TaskPa
8383
auto& task = pair.first;
8484
auto& waiter = pair.second;
8585
auto wait = NowTime::Now() - waiter->startTp;
86-
UC_DEBUG("Cache task({}) start running, wait {:06f}s.", task->id, wait);
86+
UC_DEBUG("Cache task({}) start running, wait {:.3f}ms.", task->id, wait * 1e3);
8787
if (!failureSet_->Contains(task->id)) {
8888
auto s = DumpOneTask(stream, tensorSize, task);
8989
if (s.Failure()) [[unlikely]] { failureSet_->Insert(task->id); }
@@ -97,49 +97,43 @@ Status DumpQueue::DumpOneTask(Trans::Stream* stream, size_t tensorSize, TaskPtr
9797
Detail::TaskDesc backendTaskDesc;
9898
backendTaskDesc.brief = "Cache2Backend";
9999
const auto nShard = task->desc.size();
100-
std::vector<size_t> backendTaskIndex;
101-
backendTaskIndex.reserve(nShard);
102-
std::vector<ShardTask> shardTasks(nShard);
103-
for (size_t i = 0; i < task->desc.size(); i++) {
100+
DumpCtx dumpCtx;
101+
for (size_t i = 0; i < nShard; i++) {
104102
auto& shard = task->desc[i];
105-
auto& shardTask = shardTasks[i];
106-
shardTask.bufferHandle = buffer_->Get(shard.owner, shard.index);
107-
if (!shardTask.bufferHandle.Owner()) { continue; }
108-
if (!shardTask.bufferHandle.Ready()) {
109-
auto s = stream->DeviceToHostAsync(shard.addrs.data(), shardTask.bufferHandle.Data(),
110-
tensorSize, shard.addrs.size());
103+
auto handle = buffer_->Get(shard.owner, shard.index);
104+
if (!handle.Owner()) { continue; }
105+
if (!handle.Ready()) {
106+
auto s = stream->DeviceToHostAsync(shard.addrs.data(), handle.Data(), tensorSize,
107+
shard.addrs.size());
111108
if (s.Failure()) [[unlikely]] {
112109
UC_ERROR("Failed({}) to do D2H({}) batch({}) async.", s, tensorSize,
113110
shard.addrs.size());
114111
return s;
115112
}
116113
}
117-
backendTaskDesc.push_back(
118-
Detail::Shard{shard.owner, shard.index, {shardTask.bufferHandle.Data()}});
119-
backendTaskIndex.emplace_back(i);
114+
backendTaskDesc.push_back(Detail::Shard{shard.owner, shard.index, {handle.Data()}});
115+
dumpCtx.bufferHandles.push_back(std::move(handle));
120116
}
121117
auto tpMakeBuffer = NowTime::Now();
122-
if (backendTaskIndex.empty()) { return Status::OK(); }
118+
if (backendTaskDesc.empty()) { return Status::OK(); }
123119
auto s = stream->Synchronized();
124120
if (s.Failure()) [[unlikely]] {
125121
UC_ERROR("Failed({}) to sync on stream.", s);
126122
return s;
127123
}
128124
auto tpSyncStream = NowTime::Now();
129-
for (const auto& i : backendTaskIndex) { shardTasks[i].bufferHandle.MarkReady(); }
125+
for (auto& handle : dumpCtx.bufferHandles) { handle.MarkReady(); }
130126
auto res = backend_->Dump(std::move(backendTaskDesc));
131127
if (!res) [[unlikely]] {
132128
UC_ERROR("Failed({}) to submit dump task to backend.", res.Error());
133129
return res.Error();
134130
}
135-
for (const auto& i : backendTaskIndex) {
136-
auto& shardTask = shardTasks[i];
137-
shardTask.backendTaskHandle = res.Value();
138-
dumping_.Push(std::move(shardTask));
139-
}
131+
dumpCtx.backendTaskHandle = res.Value();
132+
dumping_.Push(std::move(dumpCtx));
140133
auto tpEnd = NowTime::Now();
141-
UC_DEBUG("Cache task({}) mk_buf={:06f}s, sync={:06f}s, back={:06f}s.", task->id,
142-
tpMakeBuffer - tp, tpSyncStream - tpMakeBuffer, tpEnd - tpSyncStream);
134+
UC_DEBUG("Cache task({}) mk_buf={:.3f}ms, sync={:.3f}ms, back={:.3f}ms.", task->id,
135+
(tpMakeBuffer - tp) * 1e3, (tpSyncStream - tpMakeBuffer) * 1e3,
136+
(tpEnd - tpSyncStream) * 1e3);
143137
return Status::OK();
144138
}
145139

ucm/store/cache/cc/dump_queue.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,9 @@ class DumpQueue {
4141
using WaiterPtr = std::shared_ptr<Latch>;
4242
using TaskPair = std::pair<TaskPtr, WaiterPtr>;
4343
using TaskIdSet = HashSet<Detail::TaskHandle>;
44-
struct ShardTask {
44+
struct DumpCtx {
4545
Detail::TaskHandle backendTaskHandle;
46-
TransBuffer::Handle bufferHandle;
46+
std::vector<TransBuffer::Handle> bufferHandles;
4747
};
4848

4949
private:
@@ -53,7 +53,7 @@ class DumpQueue {
5353
TransBuffer* buffer_{nullptr};
5454
Store* backend_{nullptr};
5555
SpscRingQueue<TaskPair> waiting_;
56-
SpscRingQueue<ShardTask> dumping_;
56+
SpscRingQueue<DumpCtx> dumping_;
5757
std::thread dispatcher_;
5858
std::thread dumper_;
5959

0 commit comments

Comments
 (0)