mirror of
https://gitee.com/TarsCloud/TarsCpp.git
synced 2025-01-05 17:42:24 +08:00
fix tc_thread_queue,Optimizing the swarm effect
This commit is contained in:
parent
cc5e547d1b
commit
2f1df872a3
@ -42,7 +42,7 @@ template<typename T, typename D = deque<T> >
|
|||||||
class TC_ThreadQueue
|
class TC_ThreadQueue
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
TC_ThreadQueue():_size(0), _bTerminate(false){};
|
TC_ThreadQueue() : _size(0){}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
@ -139,17 +139,15 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool empty() const;
|
bool empty() const;
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief 调用后, 所有正在进行的、将要进行的阻塞等待将立即返回.
|
|
||||||
*/
|
|
||||||
void terminate();
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
TC_ThreadQueue(const TC_ThreadQueue&) = delete;
|
TC_ThreadQueue(const TC_ThreadQueue&) = delete;
|
||||||
TC_ThreadQueue(TC_ThreadQueue&&) = delete;
|
TC_ThreadQueue(TC_ThreadQueue&&) = delete;
|
||||||
TC_ThreadQueue& operator=(const TC_ThreadQueue&) = delete;
|
TC_ThreadQueue& operator=(const TC_ThreadQueue&) = delete;
|
||||||
TC_ThreadQueue& operator=(TC_ThreadQueue&&) = delete;
|
TC_ThreadQueue& operator=(TC_ThreadQueue&&) = delete;
|
||||||
|
|
||||||
|
bool hasNotify(size_t lockId) const {
|
||||||
|
return lockId != _lockId;
|
||||||
|
}
|
||||||
protected:
|
protected:
|
||||||
/**
|
/**
|
||||||
* 队列
|
* 队列
|
||||||
@ -167,8 +165,8 @@ protected:
|
|||||||
//锁
|
//锁
|
||||||
mutable std::mutex _mutex;
|
mutable std::mutex _mutex;
|
||||||
|
|
||||||
//结束工作否,若为true则所有的阻塞等待都将立即返回
|
//lockId, 判断请求是否唤醒过
|
||||||
bool _bTerminate;
|
size_t _lockId = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename T, typename D> T TC_ThreadQueue<T, D>::front()
|
template<typename T, typename D> T TC_ThreadQueue<T, D>::front()
|
||||||
@ -182,19 +180,21 @@ template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size
|
|||||||
{
|
{
|
||||||
if(wait) {
|
if(wait) {
|
||||||
|
|
||||||
|
size_t lockId = _lockId;
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
// 此处等待两个条件: 1.来数据了; 2.terminate.
|
// 此处等待两个条件: 1.来数据了; 2.有人唤醒了.
|
||||||
// 任一条件满足都将打破等待立即返回
|
// 任一条件满足都将打破等待立即返回
|
||||||
if (millsecond == (size_t) -1) {
|
if (millsecond == (size_t) -1) {
|
||||||
_cond.wait(lock, [this] { return !_queue.empty() || _bTerminate; });
|
_cond.wait(lock, [&] { return !_queue.empty() || hasNotify(lockId); });
|
||||||
}
|
}
|
||||||
else if (millsecond > 0) {
|
else if (millsecond > 0) {
|
||||||
_cond.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return !_queue.empty() || _bTerminate; });
|
_cond.wait_for(lock, std::chrono::milliseconds(millsecond), [&] { return !_queue.empty() || hasNotify(lockId); });
|
||||||
}
|
}
|
||||||
|
|
||||||
// 超时了数据还没到 或 还没超时就被terminate打破了, 直接返回
|
// 超时了数据还没到 或 还没超时就被notify打破了, 直接返回
|
||||||
if (_queue.empty() || _bTerminate) {
|
if (_queue.empty() || hasNotify(lockId)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,6 +241,7 @@ template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front()
|
|||||||
template<typename T, typename D> void TC_ThreadQueue<T, D>::notifyT()
|
template<typename T, typename D> void TC_ThreadQueue<T, D>::notifyT()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
++_lockId;
|
||||||
_cond.notify_all();
|
_cond.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,19 +346,22 @@ template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const que
|
|||||||
template<typename T, typename D> bool TC_ThreadQueue<T, D>::swap(queue_type &q, size_t millsecond, bool wait)
|
template<typename T, typename D> bool TC_ThreadQueue<T, D>::swap(queue_type &q, size_t millsecond, bool wait)
|
||||||
{
|
{
|
||||||
if(wait) {
|
if(wait) {
|
||||||
|
|
||||||
|
size_t lockId = _lockId;
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
// 此处等待两个条件: 1.来数据了; 2.terminate.
|
// 此处等待两个条件: 1.来数据了; 2.notify来了
|
||||||
// 任一条件满足都将打破等待立即返回
|
// 任一条件满足都将打破等待立即返回
|
||||||
if (millsecond == (size_t) -1) {
|
if (millsecond == (size_t) -1) {
|
||||||
_cond.wait(lock, [this] { return !_queue.empty() || _bTerminate; });
|
_cond.wait(lock, [&] { return !_queue.empty() || hasNotify(lockId); });
|
||||||
}
|
}
|
||||||
else if (millsecond > 0) {
|
else if (millsecond > 0) {
|
||||||
_cond.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return !_queue.empty() || _bTerminate; });
|
_cond.wait_for(lock, std::chrono::milliseconds(millsecond), [&] { return !_queue.empty() || hasNotify(lockId); });
|
||||||
}
|
}
|
||||||
|
|
||||||
// 超时了数据还没到 或 还没超时就被terminate打破了, 直接返回
|
// 超时了数据还没到 或 还没超时就被notify唤醒了, 直接返回
|
||||||
if (_queue.empty() || _bTerminate) {
|
if (_queue.empty() || hasNotify(lockId)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,12 +405,6 @@ template<typename T, typename D> bool TC_ThreadQueue<T, D>::empty() const
|
|||||||
return _queue.empty();
|
return _queue.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename T, typename D> void TC_ThreadQueue<T, D>::terminate() {
|
|
||||||
std::lock_guard<std::mutex> lock(_mutex);
|
|
||||||
_bTerminate = true;
|
|
||||||
_cond.notify_all();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user