Service_NSSM/CC_SDK/Include/basic/ConcurrentQueue.h

109 lines
3.2 KiB
C
Raw Permalink Normal View History

2025-09-27 14:24:18 +08:00
#ifndef CONCURRENTQUEUE_H
#define CONCURRENTQUEUE_H
#include <atomic>
#include <memory>
/**
* @namespace CTL
* @brief
*/
namespace CTL {
/**
* @template class ConcurrentQueue
* @brief
*
* 线线使线
*/
template<typename T>
class ConcurrentQueue {
/**
* @struct Node
* @brief
*/
struct Node {
std::shared_ptr<T> data; // 节点存储的数据
std::atomic<Node*> next; // 指向下一个节点的指针
/**
* Node
*
* @param value
*/
explicit Node(T const& value) : data(std::make_shared<T>(value)) {}
};
std::atomic<Node*> head; // 队列头指针
std::atomic<Node*> tail; // 队列尾指针
std::atomic<size_t> size_; // 新增一个原子计数器来记录队列大小
public:
/**
* ConcurrentQueue
* 0
*/
ConcurrentQueue() : head(new Node(T())), tail(head.load()), size_(0) {}
/**
* ConcurrentQueue
*
*/
~ConcurrentQueue() {
while (Node* const oldHead = head.load()) {
head.store(oldHead->next);
delete oldHead;
}
}
/**
*
*
* @param data
*/
void add(T const& data) {
std::shared_ptr<T> newData(std::make_shared<T>(data));
Node* p = new Node(T());
Node* oldTail = tail.load();
oldTail->data.swap(newData);
oldTail->next.store(p);
tail.store(p);
size_.fetch_add(1); // 增加计数器
}
/**
*
*
* @return
*/
std::shared_ptr<T> front() {
Node* oldHead = head.load();
while (oldHead && !head.compare_exchange_weak(oldHead, oldHead->next)) {
// 重试
}
if (!oldHead) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(oldHead->data);
delete oldHead;
size_.fetch_sub(1); // 减少计数器
return res;
}
/**
*
*
* @return
*/
[[nodiscard]] size_t size() const {
return size_.load(); // 返回当前队列大小
}
/**
*
*
* @return true false
*/
[[nodiscard]] bool isEmpty() const {
return size_.load() == 0; // 判断队列是否为空
}
};
}
#endif