#ifndef OH_BS_TASK_ALLOCATOR_H #define OH_BS_TASK_ALLOCATOR_H #define Thread_TermNumber_Max 60 #include "TaskExecutor.h" #include "TL/List.h" #include "CCVector.h" #include "Config.h" struct AllocatorInfo{ int TaskID = -1; CCVector Terms; }; class TaskAllocator{ CTL::Map executors; bool Flag = false; static int CalculateNumberThreads(const CCList& Terms){ int size = (int)Terms.size() / Thread_TermNumber_Max ; if(size == 0){ return size + 1; } else { return size; } } static int CalculateNumberThreads(const int TestCount){ const int size = TestCount / Thread_TermNumber_Max; if(size == 0){ return size + 1; } else { return size; } } static std::list GetTerms(const int Index,CCVector& Terms){ std::list terms; int index = Index * Thread_TermNumber_Max; int size = (Index + 1) * Thread_TermNumber_Max; for(int i = index;i < size;i++){ if (Terms.size() > i) { const auto ID = Terms.get(i); if(ID){ terms.push_back(ID); } else { break; } } else { break; } } return terms; } CTL::Map TermList; int TaskID = -1; mutable std::shared_mutex mutex_t; public: //------------------------------------------------------------------------------------------------------------------ TaskAllocator() = default; //------------------------------------------------------------------------------------------------------------------ void Init(AllocatorInfo& info){ const int Thread_Num = CalculateNumberThreads(info.Terms); for (const auto ID : info.Terms) { TermList.Put(ID,ID); } this->TaskID = info.TaskID; for(int i = 0;i < Thread_Num;i++){ ExecutorInfo executorInfo; executorInfo.TaskID = info.TaskID; executorInfo.list = GetTerms(i, info.Terms); executorInfo.Index = i; auto executor = new TaskExecutor(); executor->Init(executorInfo); executors.Put(executorInfo.Index, executor); } } bool Start(){ std::unique_lock lock(mutex_t); this->Flag = true; for(const auto& executor : executors){ if(executor.second){ executor.second->Start(); } } return true; } void Stop(){ std::unique_lock lock(mutex_t); Flag = false; for(auto& executor : executors){ if(executor.second){ executor.second->Stop(); } delete executor.second; executor.second = nullptr; } executors.Clear(); } void AddBuffer(const CTL::ByteArray& buffer){ std::unique_lock lock(mutex_t); for(auto& executor : executors){ if(executor.second){ executor.second->AddBuffer(buffer); } } } void AddTerm(const int TID,const CTL::ByteArray& Command){ std::unique_lock lock(mutex_t); for(auto& executor : executors){ if(executor.second){ if(executor.second->GetTermSize() < Thread_TermNumber_Max){ executor.second->AddTerm(TID,Command); return; } } } int Size = executors.Size(); ExecutorInfo executorInfo; executorInfo.TaskID = TaskID; CCVector Terms = {TID}; executorInfo.list = GetTerms(Size, Terms); executorInfo.Index = Size; auto executor = new TaskExecutor(); executor->Init(executorInfo); executors.Put(executorInfo.Index, executor); executor->Start(); TaskExecutor::NotificationTerminal(TID, Command); } void RemoveTerm(const int TID,const CTL::ByteArray& Command){ std::unique_lock lock(mutex_t); CCVector IndexList; for(auto& executor : executors){ if(executor.second){ executor.second->RemoveTerm(TID,Command); if(executor.second->GetTermSize() <= 0){ IndexList.add_lock(executor.first); } } } for(auto& index : IndexList){ auto executor = executors.Get(index);; if(executor && *executor){ auto item = *executor; item->Stop(); delete item; item = nullptr; } executors.Remove(index); } } }; #endif