Distribution_Service/Server/TaskModel/TaskAllocator.h
2026-03-24 14:43:26 +08:00

157 lines
4.8 KiB
C++

#ifndef OH_BS_TASK_ALLOCATOR_H
#define OH_BS_TASK_ALLOCATOR_H
#define Thread_TermNumber_Max 60
#include "TaskExecutor.h"
#include "TL/List.h"
#include "CCVector.h"
#include "Config.h"
// Input bundle handed to TaskAllocator::Init().
struct AllocatorInfo {
    // Identifier of the task being distributed; -1 until the caller sets it.
    int TaskID{-1};
    // IDs of the terms (presumably terminals -- confirm with callers) that
    // take part in the task.
    CCVector<int> Terms;
};
class TaskAllocator{
CTL::Map<int,TaskExecutor*> executors;
bool Flag = false;
static int CalculateNumberThreads(const CCList<int>& Terms){
int size = (int)Terms.size() / Thread_TermNumber_Max ;
if(size == 0){
return size + 1;
}
else {
return size;
}
}
static int CalculateNumberThreads(const int TestCount){
const int size = TestCount / Thread_TermNumber_Max;
if(size == 0){
return size + 1;
}
else {
return size;
}
}
static std::list<int> GetTerms(const int Index,CCVector<int>& Terms){
std::list<int> terms;
int index = Index * Thread_TermNumber_Max;
int size = (Index + 1) * Thread_TermNumber_Max;
for(int i = index;i < size;i++){
if (Terms.size() > i) {
const auto ID = Terms.get(i);
if(ID){
terms.push_back(ID);
}
else {
break;
}
}
else {
break;
}
}
return terms;
}
CTL::Map<int, int> TermList;
int TaskID = -1;
mutable std::shared_mutex mutex_t;
public:
//------------------------------------------------------------------------------------------------------------------
TaskAllocator() = default;
//------------------------------------------------------------------------------------------------------------------
void Init(AllocatorInfo& info){
const int Thread_Num = CalculateNumberThreads(info.Terms);
for (const auto ID : info.Terms) {
TermList.Put(ID,ID);
}
this->TaskID = info.TaskID;
for(int i = 0;i < Thread_Num;i++){
ExecutorInfo executorInfo;
executorInfo.TaskID = info.TaskID;
executorInfo.list = GetTerms(i, info.Terms);
executorInfo.Index = i;
auto executor = new TaskExecutor();
executor->Init(executorInfo);
executors.Put(executorInfo.Index, executor);
}
}
bool Start(){
std::unique_lock<std::shared_mutex> lock(mutex_t);
this->Flag = true;
for(const auto& executor : executors){
if(executor.second){
executor.second->Start();
}
}
return true;
}
void Stop(){
std::unique_lock<std::shared_mutex> lock(mutex_t);
Flag = false;
for(auto& executor : executors){
if(executor.second){
executor.second->Stop();
}
delete executor.second;
executor.second = nullptr;
}
executors.Clear();
}
void AddBuffer(const CTL::ByteArray& buffer){
std::unique_lock<std::shared_mutex> lock(mutex_t);
for(auto& executor : executors){
if(executor.second){
executor.second->AddBuffer(buffer);
}
}
}
void AddTerm(const int TID,const CTL::ByteArray& Command){
std::unique_lock<std::shared_mutex> lock(mutex_t);
for(auto& executor : executors){
if(executor.second){
if(executor.second->GetTermSize() < Thread_TermNumber_Max){
executor.second->AddTerm(TID,Command);
return;
}
}
}
int Size = executors.Size();
ExecutorInfo executorInfo;
executorInfo.TaskID = TaskID;
CCVector<int> Terms = {TID};
executorInfo.list = GetTerms(Size, Terms);
executorInfo.Index = Size;
auto executor = new TaskExecutor();
executor->Init(executorInfo);
executors.Put(executorInfo.Index, executor);
executor->Start();
TaskExecutor::NotificationTerminal(TID, Command);
}
void RemoveTerm(const int TID,const CTL::ByteArray& Command){
std::unique_lock<std::shared_mutex> lock(mutex_t);
CCVector<int> IndexList;
for(auto& executor : executors){
if(executor.second){
executor.second->RemoveTerm(TID,Command);
if(executor.second->GetTermSize() <= 0){
IndexList.add_lock(executor.first);
}
}
}
for(auto& index : IndexList){
auto executor = executors.Get(index);;
if(executor && *executor){
auto item = *executor;
item->Stop();
delete item;
item = nullptr;
}
executors.Remove(index);
}
}
};
#endif