133 lines
3.7 KiB
C++
133 lines
3.7 KiB
C++
#ifndef OH_BS_TASK_EXECUTOR_H
|
|
#define OH_BS_TASK_EXECUTOR_H
|
|
|
|
#include <atomic>

#include "TL/Map.h"
#include "TL/Queue.h"
#include "Terminal/Terminal.h"
#include "CCThreadPool.h"
#include "Transmitter.h"
|
|
|
|
struct ExecutorInfo{
|
|
int Index = -1;
|
|
CCList<int> list = {};
|
|
int TaskID = -1;
|
|
};
|
|
|
|
class TaskExecutor{
|
|
CTL::Map<int, int> Terms;
|
|
CTL::Queue<CTL::ByteArray> M_Queue;
|
|
bool Flag = false;
|
|
bool Flag_t = false;
|
|
static CTL::ThreadPool thread_pool_t;
|
|
public:
|
|
//------------------------------------------------------------------------------------------------------------------
|
|
static void InitPool() {
|
|
thread_pool_t.InitStart(100,100,1000 * 5);
|
|
}
|
|
static void ReleasePool() {
|
|
thread_pool_t.Stop();
|
|
}
|
|
//------------------------------------------------------------------------------------------------------------------
|
|
int Index = -1;
|
|
int Test = 0;
|
|
int TestNum = 0;
|
|
int DistributorFlag = 0;
|
|
int TaskID = -1;
|
|
//------------------------------------------------------------------------------------------------------------------
|
|
TaskExecutor() = default;
|
|
~TaskExecutor() = default;
|
|
TaskExecutor(TaskExecutor& taskExecutor){
|
|
this->Terms.SetStdMap(taskExecutor.Terms);
|
|
this->Index = taskExecutor.Index;
|
|
}
|
|
TaskExecutor(const TaskExecutor& taskExecutor){
|
|
this->Terms.SetStdMap(taskExecutor.Terms);
|
|
this->Index = taskExecutor.Index;
|
|
}
|
|
void Init(const ExecutorInfo& taskExecutor){
|
|
for (auto ID : taskExecutor.list) {
|
|
Terms.Put(ID,ID);
|
|
}
|
|
this->Index = taskExecutor.Index;
|
|
this->TaskID = taskExecutor.TaskID;
|
|
}
|
|
int GetTermSize(){
|
|
return Terms.Size();
|
|
}
|
|
//------------------------------------------------------------------------------------------------------------------
|
|
void AddBuffer(const CTL::ByteArray& buffer){
|
|
M_Queue.Add(buffer);
|
|
}
|
|
void Start(){
|
|
Flag = true;
|
|
Flag_t = true;
|
|
thread_pool_t.AddTask(&TaskExecutor::Running,this);
|
|
}
|
|
void Stop(){
|
|
Flag = false;
|
|
while (Flag_t){
|
|
CTL::Thread::SleepMS(1);
|
|
}
|
|
}
|
|
void AddTerm(const int TID,const CTL::ByteArray& Command){
|
|
if(Terms.IsContains(TID)){
|
|
return;
|
|
}
|
|
Terms.Put(TID,TID);
|
|
}
|
|
void RemoveTerm(const int TID,const CTL::ByteArray& Command){
|
|
if(!Terms.IsContains(TID)){
|
|
return;
|
|
}
|
|
Terms.Remove(TID);
|
|
}
|
|
static bool NotificationTerminal(const int TID,const CTL::ByteArray& Command){
|
|
auto Term = Terminal::getData(TID);
|
|
if(Term){
|
|
auto* AIM = new AIMInfo();
|
|
AIM->IP = Term->IP;
|
|
AIM->Port = Term->Port;
|
|
AIM->Data = Command;
|
|
Transmitter::Send_Massage(AIM);
|
|
delete AIM;
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
bool IsStop(){
|
|
return Flag_t;
|
|
}
|
|
private:
|
|
void SendTermsData(const CTL::ByteArray& data){
|
|
auto list = Terms.Values();
|
|
auto* AIM = new AIMInfo();
|
|
CTL::AutoDestruct<AIMInfo> Aim_Destruct(AIM);
|
|
AIM->Data = data;
|
|
for(auto ID : list){
|
|
auto Term = Terminal::getData(ID);
|
|
if(Term){
|
|
if(Term->Status == 1){
|
|
AIM->IP = Term->IP;
|
|
AIM->Port = Term->StreamPort;
|
|
Transmitter::Send_Stream(AIM);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
void Running(){
|
|
while (Flag){
|
|
auto buffer = M_Queue.Poll();
|
|
if(buffer){
|
|
SendTermsData(*buffer.get());
|
|
}
|
|
else {
|
|
CTL::Thread::SleepMS(2);
|
|
}
|
|
}
|
|
this->Flag_t = false;
|
|
}
|
|
};
|
|
|
|
|
|
#endif
|