Distribution_Service/CC_SDK/Include/Module/IO/CCThreadPool.h

146 lines
5.1 KiB
C
Raw Normal View History

2025-11-11 17:46:19 +08:00
#ifndef __CC_ThreadPool_H__
#define __CC_ThreadPool_H__
#pragma once
#include <CCThread.h>
#include "CC.h"
#include "CCAutoLock.h"
2025-12-03 18:08:23 +08:00
2025-11-11 17:46:19 +08:00
/**
* CTL线
*/
namespace CTL {
/**
* ThreadPool类线
*/
class ThreadPool {
private:
size_t m_corePoolSize{}; // 核心线程池大小
size_t m_maximumPoolSize{}; // 最大线程池大小
int m_keepAliveTime{}; // 线程存活时间
2025-12-03 18:08:23 +08:00
int MAX_QUEUE_SIZE = 4096;
2025-11-11 17:46:19 +08:00
std::atomic<bool> m_isStop{}; // 使用 atomic 确保线程安全,表示线程池是否停止
2025-12-03 18:08:23 +08:00
std::atomic<int> m_C_threadPool{}; // 使用 atomic 确保线程安全,表示当前线程池中的核心线程数
2025-11-11 17:46:19 +08:00
std::atomic<int> T_PoolSize{}; // 使用 atomic 确保线程安全,表示当前线程池中的任务数
std::atomic<int> E_PoolSize{}; // 使用 atomic 确保线程安全,表示当前额外线程数
std::atomic<int> T_CorePoolSize{}; // 使用 atomic 确保线程安全,表示当前核心任务数
std::mutex m_mutex{}; // 互斥锁,用于保护共享资源
std::condition_variable m_condition{}; // 条件变量,用于线程间通信
std::queue<Function<void()>> m_taskQueue{}; // 任务队列,存储待执行的任务
std::vector<std::thread> m_thread{}; // 使用标准库的 std::thread存储线程池中的线程
public:
ThreadPool(); // 默认构造函数
~ThreadPool(); // 析构函数
/**
* 线
* @param corePoolSize 线
* @param maximumPoolSize 线1024
* @param keepAliveTime 线1000
*/
explicit ThreadPool(const size_t corePoolSize, size_t maximumPoolSize = 1024, int keepAliveTime = 1000)
: m_corePoolSize(corePoolSize), m_maximumPoolSize(maximumPoolSize), m_keepAliveTime(keepAliveTime),
m_isStop(false), m_C_threadPool(0), T_PoolSize(0), E_PoolSize(0), T_CorePoolSize(0) {
InitStart(corePoolSize, maximumPoolSize, keepAliveTime);
}
/**
* 线
* @param corePoolSize 线
* @param maximumPoolSize 线
* @param keepAliveTime 线
*/
void InitStart(size_t corePoolSize, size_t maximumPoolSize, int keepAliveTime);
/**
* 线
*/
void Stop();
/**
* 线
* @param func
* @param args
*/
template<typename Fun,typename ...Args>
void AddTask(Fun&& func, Args&&... args);
/**
* 线
* @return 线
*/
int GetActiveThread() const;
/**
* 线
* @return 线
*/
int GetMaxPoolSize() const;
/**
* 线
* @return 线
*/
int GetCorePoolSize() const;
/**
* 线
* @return 线
*/
int GetTimeout() const;
/**
* 线
* @return 线
*/
int GetActiveExtraThread() const;
private:
/**
* 线
* @param index 线
*/
void Worker(int index);
/**
*
* @param str
*/
void CC_PRINT_ERROR(const CTL::String& str);
/**
* 线
* @param Task
*/
void ExtraThread(Function<void()> Task);
};
template <typename Fun, typename ... Args>
/**
* 线
* @param func
* @param args
*/
void ThreadPool::AddTask(Fun&& func, Args&&... args) {
2025-12-03 18:08:23 +08:00
if (m_isStop.load()) {
return;
}
Function<void()> task = std::bind(std::forward<Fun>(func), std::forward<Args>(args)...);
if (!task) {
return;
}
2025-11-11 17:46:19 +08:00
CCUniqueLock lock(m_mutex);
2025-12-03 18:08:23 +08:00
if (task) {
// 更合理的任务分配策略
if (T_CorePoolSize.load() < m_corePoolSize) {
m_taskQueue.emplace(task);
m_condition.notify_one();
}
else if (E_PoolSize.load() < (m_maximumPoolSize - m_corePoolSize)) {
ExtraThread(task);
CC_PRINT_ERROR("ExtraThread");
}
else if (m_taskQueue.size() < MAX_QUEUE_SIZE) { // 添加队列大小限制
m_taskQueue.emplace(task);
m_condition.notify_one();
}
else {
// 队列满时的处理策略:拒绝、等待或扩容
CC_PRINT_ERROR("Task queue is full, task rejected");
2025-11-11 17:46:19 +08:00
}
}
}
}
#endif