
Commit 7c17201

Author: lijiachen19
Commit message: threadpool monitor
1 parent 24e6bfa · commit 7c17201

File tree: 5 files changed, +234 −33 lines

ucm/shared/infra/thread/thread_pool.h

Lines changed: 123 additions & 30 deletions
@@ -24,34 +24,57 @@
 #ifndef UNIFIEDCACHE_INFRA_THREAD_POOL_H
 #define UNIFIEDCACHE_INFRA_THREAD_POOL_H

+#include <atomic>
 #include <condition_variable>
 #include <functional>
 #include <future>
 #include <list>
+#include <memory>
 #include <mutex>
+#include <sys/syscall.h>
 #include <thread>
+#include <unistd.h>
+#include <vector>

 namespace UC {

 template <class Task, class WorkerArgs = void*>
 class ThreadPool {
     using WorkerInitFn = std::function<bool(WorkerArgs&)>;
     using WorkerFn = std::function<void(Task&, const WorkerArgs&)>;
+    using WorkerTimeoutFn = std::function<void(Task&, const ssize_t)>;
     using WorkerExitFn = std::function<void(WorkerArgs&)>;

+    class StopToken {
+        std::shared_ptr<std::atomic<bool>> flag_ = std::make_shared<std::atomic<bool>>(false);
+
+    public:
+        void RequestStop() noexcept { this->flag_->store(true, std::memory_order_relaxed); }
+        bool StopRequested() const noexcept { return this->flag_->load(std::memory_order_relaxed); }
+    };
+
+    struct Worker {
+        ssize_t tid;
+        std::thread th;
+        StopToken stop;
+        std::weak_ptr<Task> current;
+        std::atomic<std::chrono::steady_clock::time_point> tp{};
+    };
+
 public:
     ThreadPool() = default;
     ThreadPool(const ThreadPool&) = delete;
     ThreadPool& operator=(const ThreadPool&) = delete;
     ~ThreadPool()
     {
         {
-            std::unique_lock<std::mutex> lk(this->mtx_);
+            std::lock_guard<std::mutex> lock(this->taskMtx_);
             this->stop_ = true;
             this->cv_.notify_all();
         }
-        for (auto& w : this->workers_) {
-            if (w.joinable()) { w.join(); }
+        if (this->monitor_.joinable()) { this->monitor_.join(); }
+        for (auto& worker : this->workers_) {
+            if (worker->th.joinable()) { worker->th.join(); }
         }
     }
     ThreadPool& SetWorkerFn(WorkerFn&& fn)
@@ -69,6 +92,14 @@ class ThreadPool {
         this->exitFn_ = std::move(fn);
         return *this;
     }
+    ThreadPool& SetWorkerTimeoutFn(WorkerTimeoutFn&& fn, const size_t timeoutMs,
+                                   const size_t intervalMs = 1000)
+    {
+        this->timeoutFn_ = std::move(fn);
+        this->timeoutMs_ = timeoutMs;
+        this->intervalMs_ = intervalMs;
+        return *this;
+    }
     ThreadPool& SetNWorker(const size_t nWorker)
     {
         this->nWorker_ = nWorker;
@@ -77,61 +108,123 @@
     bool Run()
     {
         if (this->nWorker_ == 0) { return false; }
-        if (!this->fn_) { return false; }
-        std::list<std::promise<bool>> start(this->nWorker_);
-        std::list<std::future<bool>> fut;
-        for (auto& s : start) {
-            fut.push_back(s.get_future());
-            this->workers_.emplace_back([&] { this->Worker(s); });
+        if (this->fn_ == nullptr) { return false; }
+        this->workers_.reserve(this->nWorker_);
+        for (size_t i = 0; i < this->nWorker_; i++) {
+            if (!this->AddOneWorker()) { return false; }
         }
-        auto success = true;
-        for (auto& f : fut) {
-            if (!f.get()) { success = false; }
+        if (this->timeoutMs_ > 0) {
+            this->monitor_ = std::thread([this] { this->MonitorLoop(); });
         }
-        return success;
+        return true;
     }
     void Push(std::list<Task>& tasks) noexcept
     {
-        std::unique_lock<std::mutex> lk(this->mtx_);
+        std::unique_lock<std::mutex> lock(this->taskMtx_);
         this->taskQ_.splice(this->taskQ_.end(), tasks);
         this->cv_.notify_all();
     }
     void Push(Task&& task) noexcept
     {
-        std::unique_lock<std::mutex> lk(this->mtx_);
+        std::unique_lock<std::mutex> lock(this->taskMtx_);
         this->taskQ_.push_back(std::move(task));
         this->cv_.notify_one();
     }

 private:
-    void Worker(std::promise<bool>& started) noexcept
+    bool AddOneWorker()
+    {
+        try {
+            auto worker = std::make_shared<Worker>();
+            std::promise<bool> prom;
+            auto fut = prom.get_future();
+            worker->th = std::thread([this, worker, &prom] { this->WorkerLoop(prom, worker); });
+            auto success = fut.get();
+            if (!success) { return false; }
+            this->workers_.push_back(worker);
+            return true;
+        } catch (...) {
+            return false;
+        }
+    }
+    void WorkerLoop(std::promise<bool>& prom, std::shared_ptr<Worker> worker)
     {
+        worker->tid = syscall(SYS_gettid);
         WorkerArgs args = nullptr;
         auto success = true;
         if (this->initFn_) { success = this->initFn_(args); }
-        started.set_value(success);
+        prom.set_value(success);
         while (success) {
-            std::unique_lock<std::mutex> lk(this->mtx_);
-            this->cv_.wait(lk, [this] { return this->stop_ || !this->taskQ_.empty(); });
-            if (this->stop_) { break; }
-            if (this->taskQ_.empty()) { continue; }
-            auto task = std::make_shared<Task>(std::move(this->taskQ_.front()));
-            this->taskQ_.pop_front();
-            lk.unlock();
+            std::shared_ptr<Task> task = nullptr;
+            {
+                std::unique_lock<std::mutex> lock(this->taskMtx_);
+                this->cv_.wait(lock, [this, worker] {
+                    return this->stop_ || worker->stop.StopRequested() || !this->taskQ_.empty();
+                });
+                if (this->stop_ || worker->stop.StopRequested()) { break; }
+                if (this->taskQ_.empty()) { continue; }
+                task = std::make_shared<Task>(std::move(this->taskQ_.front()));
+                this->taskQ_.pop_front();
+            }
+            worker->current = task;
+            worker->tp.store(std::chrono::steady_clock::now(), std::memory_order_relaxed);
             this->fn_(*task, args);
+            if (worker->stop.StopRequested()) { break; }
+            worker->current.reset();
+            worker->tp.store({}, std::memory_order_relaxed);
         }
         if (this->exitFn_) { this->exitFn_(args); }
    }

+    void MonitorLoop()
+    {
+        const auto interval = std::chrono::milliseconds(this->intervalMs_);
+        while (true) {
+            {
+                std::unique_lock<std::mutex> lock(this->taskMtx_);
+                this->cv_.wait_for(lock, interval, [this] { return this->stop_; });
+                if (this->stop_) { break; }
+            }
+            size_t nWorker = this->Monitor();
+            for (size_t i = nWorker; i < this->nWorker_; i++) { (void)this->AddOneWorker(); }
+        }
+    }
+
+    size_t Monitor()
+    {
+        using namespace std::chrono;
+        const auto timeout = milliseconds(this->timeoutMs_);
+        size_t nWorker = 0;
+        for (auto it = this->workers_.begin(); it != this->workers_.end();) {
+            auto tp = (*it)->tp.load(std::memory_order_relaxed);
+            auto task = (*it)->current.lock();
+            auto now = steady_clock::now();
+            if (task && tp != steady_clock::time_point{} && now - tp > timeout) {
+                if (this->timeoutFn_) { this->timeoutFn_(*task, (*it)->tid); }
+                (*it)->stop.RequestStop();
+                if ((*it)->th.joinable()) { (*it)->th.detach(); }
+                it = this->workers_.erase(it);
+            } else {
+                it++;
+            }
+            nWorker++;
+        }
+        return nWorker;
+    }
+
 private:
+    WorkerInitFn initFn_{nullptr};
+    WorkerFn fn_{nullptr};
+    WorkerTimeoutFn timeoutFn_{nullptr};
+    WorkerExitFn exitFn_{nullptr};
+    size_t timeoutMs_{0};
+    size_t intervalMs_{0};
+    size_t nWorker_{0};
     bool stop_{false};
-    size_t nWorker_{1};
-    std::list<std::thread> workers_;
-    WorkerInitFn initFn_;
-    WorkerFn fn_;
-    WorkerExitFn exitFn_;
+    std::vector<std::shared_ptr<Worker>> workers_;
+    std::thread monitor_;
+    std::mutex taskMtx_;
     std::list<Task> taskQ_;
-    std::mutex mtx_;
     std::condition_variable cv_;
 };

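For orientation (not part of the diff above): the reworked ThreadPool is configured as a builder and, when a timeout callback is set, Run() starts one extra monitor thread that wakes every intervalMs, fires the callback for any worker that has been on the same task longer than timeoutMs, asks that worker to stop, detaches it, and tops the pool back up toward nWorker on a later pass. A minimal usage sketch follows; MyTask, the timing values, and the main() wrapper are illustrative assumptions, only the ThreadPool API itself comes from thread_pool.h.

// sketch_threadpool_usage.cc -- illustrative only, not part of this commit
#include <chrono>
#include <thread>
#include "thread/thread_pool.h"

struct MyTask { int id; };  // hypothetical task type

int main()
{
    UC::ThreadPool<MyTask> pool;
    bool ok = pool.SetWorkerFn([](MyTask& task, const auto&) {
                      (void)task;  // normal work for the task would happen here
                  })
                  .SetWorkerTimeoutFn(
                      [](MyTask& task, const ssize_t tid) {
                          // called from the monitor thread once the worker with kernel
                          // thread id `tid` has spent longer than timeoutMs on `task`
                          (void)task;
                          (void)tid;
                      },
                      /*timeoutMs=*/1000, /*intervalMs=*/100)
                  .SetNWorker(4)
                  .Run();
    if (ok) { pool.Push(MyTask{1}); }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return 0;  // ~ThreadPool() sets stop_, joins the monitor, then joins the workers
}

Note that a timed-out worker is detached rather than killed, so its blocking call may keep running in the background; the replacement worker is added only on a subsequent monitor pass, once Monitor() reports fewer live workers than the configured count.
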
ucm/store/pcstore/cc/domain/trans/trans_manager.cc

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ Status TransManager::Setup(const size_t rankSize, const int32_t deviceId, const
         if (s.Failure()) { return s; }
     }
     s = this->queue_.Setup(deviceId, streamNumber, blockSize, ioSize, ioDirect, bufferNumber,
-                           layout, &this->failureSet_, scatterGatherEnable);
+                           layout, &this->failureSet_, scatterGatherEnable, timeoutMs);
     if (s.Failure()) { return s; }
     this->rankSize_ = rankSize;
     this->timeoutMs_ = timeoutMs;

ucm/store/pcstore/cc/domain/trans/trans_queue.cc

Lines changed: 7 additions & 1 deletion
@@ -78,7 +78,7 @@ void TransQueue::FileWorker(BlockTask&& task)
 Status TransQueue::Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize,
                          const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
                          const SpaceLayout* layout, TaskSet* failureSet_,
-                         const bool scatterGatherEnable)
+                         const bool scatterGatherEnable, const size_t timeoutMs)
 {
     Trans::Device device;
     auto ts = device.Setup(deviceId);
@@ -110,6 +110,12 @@ Status TransQueue::Setup(const int32_t deviceId, const size_t streamNumber, cons
                       .Run();
     if (!success) { return Status::Error(); }
     success = this->filePool_.SetWorkerFn([this](auto t, auto) { this->FileWorker(std::move(t)); })
+                  .SetWorkerTimeoutFn(
+                      [this](auto t, auto tid) {
+                          UC_WARN("FileWorker timeout({}).", tid);
+                          t.done(false);
+                      },
+                      timeoutMs)
                   .SetNWorker(streamNumber)
                   .Run();
     if (!success) { return Status::Error(); }

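The handler wired in above fails the block task as soon as the timeout fires, so waiters are released even if the underlying filesystem call never returns. A standalone sketch of that pattern, assuming a hypothetical DemoTask with a done callback (BlockTask's real definition is not part of this commit):

// sketch_fileworker_timeout.cc -- illustrative only, not part of this commit
#include <cstddef>
#include <functional>
#include "thread/thread_pool.h"

struct DemoTask {                            // stand-in for BlockTask (assumed shape)
    std::function<void(bool)> done;          // completion callback the waiter blocks on
};

bool WireFilePool(UC::ThreadPool<DemoTask>& pool, const size_t streamNumber, const size_t timeoutMs)
{
    return pool.SetWorkerFn([](DemoTask& task, const auto&) {
                   // potentially blocking file I/O would happen here
                   task.done(true);
               })
               .SetWorkerTimeoutFn(
                   [](DemoTask& task, const ssize_t /*tid*/) {
                       task.done(false);     // fail fast so the waiter is not held by stuck I/O
                   },
                   timeoutMs)
               .SetNWorker(streamNumber)
               .Run();
}

If the detached worker eventually finishes its blocked call, the completion callback can presumably fire a second time, so the completion path would need to tolerate late or duplicate completion.
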
ucm/store/pcstore/cc/domain/trans/trans_queue.h

Lines changed: 2 additions & 1 deletion
@@ -51,7 +51,8 @@ class TransQueue {
 public:
     Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize,
                  const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
-                 const SpaceLayout* layout, TaskSet* failureSet_, const bool scatterGatherEnable);
+                 const SpaceLayout* layout, TaskSet* failureSet_, const bool scatterGatherEnable,
+                 const size_t timeoutMs);
     void Dispatch(TaskPtr task, WaiterPtr waiter);
     void DispatchDump(TaskPtr task, WaiterPtr waiter);
     void DispatchSatterGatherDump(TaskPtr task, WaiterPtr waiter);
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
* */
24+
25+
#include <chrono>
26+
#include <fcntl.h>
27+
#include <gtest/gtest.h>
28+
#include <unistd.h>
29+
#include "thread/latch.h"
30+
#include "thread/thread_pool.h"
31+
32+
class UCThreadPoolTest : public ::testing::Test {};
33+
34+
TEST_F(UCThreadPoolTest, TimeoutDetection)
35+
{
36+
struct TestTask {
37+
int taskId;
38+
std::atomic<bool>* finished;
39+
std::atomic<bool>* timeout;
40+
};
41+
42+
constexpr size_t nWorker = 2;
43+
constexpr size_t timeoutMs = 20;
44+
std::atomic<int> timeoutCount{0};
45+
std::atomic<bool> taskFinished{false};
46+
std::atomic<bool> taskTimeout{false};
47+
48+
UC::ThreadPool<TestTask> threadPool;
49+
threadPool.SetNWorker(nWorker)
50+
.SetWorkerFn([](TestTask& task, const auto&) {
51+
std::this_thread::sleep_for(std::chrono::milliseconds(30));
52+
*(task.finished) = true;
53+
})
54+
.SetWorkerTimeoutFn(
55+
[&](TestTask& task, const auto) {
56+
timeoutCount++;
57+
task.timeout->store(true);
58+
},
59+
timeoutMs, 10)
60+
.Run();
61+
std::list<TestTask> tasks{
62+
{1, &taskFinished, &taskTimeout}
63+
};
64+
threadPool.Push(tasks);
65+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
66+
ASSERT_GT(timeoutCount.load(), 0);
67+
ASSERT_TRUE(taskTimeout.load());
68+
}
69+
70+
TEST_F(UCThreadPoolTest, SimulatedFileSystemHang)
71+
{
72+
struct TestTask {
73+
std::atomic<bool>* simulatingHang;
74+
};
75+
76+
std::atomic<int> hangDetected{0};
77+
constexpr size_t hangTimeoutMs = 20;
78+
std::atomic<bool> taskHang{true};
79+
80+
UC::ThreadPool<TestTask> threadPool;
81+
threadPool.SetNWorker(1)
82+
.SetWorkerFn([](TestTask& task, const auto&) {
83+
std::mutex fakeMutex;
84+
std::unique_lock<std::mutex> fakelock(fakeMutex);
85+
std::condition_variable fakeCond;
86+
while (*(task.simulatingHang)) {
87+
fakeCond.wait_for(fakelock, std::chrono::milliseconds(10)); // waiting forever
88+
}
89+
})
90+
.SetWorkerTimeoutFn(
91+
[&](TestTask& task, const auto) {
92+
hangDetected++;
93+
*(task.simulatingHang) = false; // stop simulating hang
94+
},
95+
hangTimeoutMs, 10)
96+
.Run();
97+
std::list<TestTask> tasks{{&taskHang}};
98+
threadPool.Push(tasks);
99+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
100+
EXPECT_GT(hangDetected.load(), 0);
101+
}
