123 lines
3.6 KiB
C++
123 lines
3.6 KiB
C++
#include "TaskInfo.h"

#include <vector>

#include "PortService/ByteTool.h"
#include "TaskModel/TaskAllocator.h"
|
|
|
|
/// Default constructor: performs no work beyond the members' own
/// initialization.
TaskInfo::TaskInfo() = default;
|
|
|
|
/// Destroys the task and frees the push-flow task object held in
/// `pushFlowTask`, if any.
TaskInfo::~TaskInfo() {
    // `delete` on a null pointer is a no-op, so no explicit null check is needed.
    delete pushFlowTask;
    pushFlowTask = nullptr;
}
|
|
|
|
void TaskInfo::BitstreamHeartbeat() {
|
|
std::shared_lock lock(mutex_socket);
|
|
const auto Setting = Config::getConfig();
|
|
const auto IDS = APPTool::GetBytes(Setting->ID);
|
|
if (socket && !socket->isClosed()) {
|
|
CTL::DatagramPacket packet;
|
|
const CTL::InetAddress address(Setting->ServerIP);
|
|
packet.setAddress(address);
|
|
packet.setPort(this->ServerPort);
|
|
packet.setData({0x66,0xAB,0x01,TaskID,IDS[0],IDS[1],0x01,0x00});
|
|
const bool F = socket->send(packet);
|
|
if (!F) {
|
|
CTL::System::Println("TaskID: {} Heartbeat sending failed",TaskID);
|
|
}
|
|
}
|
|
}
|
|
|
|
void TaskInfo::InitSocket() {
|
|
const auto Setting = Config::getConfig();
|
|
CTL::InetAddress address(Setting->IP);
|
|
{
|
|
std::unique_lock lock(mutex_socket);
|
|
socket = new CTL::DatagramSocket(Setting->StreamPort,address);
|
|
}
|
|
while (!socket->isClosed()) {
|
|
if (socket->available()) {
|
|
CTL::DatagramPacket packet;
|
|
socket->receive(packet);
|
|
const auto buffer = packet.getData();
|
|
const auto length = packet.getLength();
|
|
if (length > 0) {
|
|
const int taskID = buffer[0];
|
|
const auto task = TaskInfo::getData(taskID);
|
|
if (task) {
|
|
if (task->Type == 0 && task->pushFlowTask) {
|
|
CTL::ByteArray data;
|
|
data.assign(buffer,length);
|
|
task->pushFlowTask->addBuffer(data);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
CTL::Thread::Sleep(300);
|
|
}
|
|
}
|
|
{
|
|
std::unique_lock lock(mutex_socket);
|
|
delete socket;
|
|
socket = nullptr;
|
|
}
|
|
}
|
|
|
|
/// Registers `taskInfo` in `TaskInfoMap` under its TaskID and sends the
/// task's `TaskOrder` bytes to every terminal listed in `taskInfo->Terms`
/// that `Terminal::getData` can resolve.
///
/// @param taskInfo Task to register and announce. It is dereferenced
///                 unconditionally, so it must not be null.
void TaskInfo::Start(TaskInfo *taskInfo) {
    TaskInfoMap.put(taskInfo->TaskID,taskInfo);

    const auto message = new AIMInfo;
    // NOTE(review): AutoDestruct presumably frees `message` on scope exit — confirm.
    CTL::AutoDestruct messageGuard(message);
    message->Data = CTL::ByteArray(taskInfo->TaskOrder.data(),taskInfo->TaskOrder.size());

    // One message object is reused; only the destination changes per terminal.
    for (const auto &entry : taskInfo->Terms) {
        const auto terminal = Terminal::getData(entry.first);
        if (!terminal) {
            continue;
        }
        message->IP = terminal->IP;
        message->Port = terminal->Port;
        Transmitter::Send_Massage(message);
    }
}
|
|
|
|
void TaskInfo::Stop(const int TaskID) {
|
|
const auto taskInfo = getData(TaskID);
|
|
if (taskInfo) {
|
|
taskInfo->Flag = 0;
|
|
const auto Aim = new AIMInfo;
|
|
CTL::AutoDestruct AimDestruct(Aim);
|
|
Aim->Data = CTL::ByteArray(taskInfo->TaskOrder.data(),taskInfo->TaskOrder.size());
|
|
for (const auto &ID : taskInfo->Terms) {
|
|
const auto term = Terminal::getData(ID.first);
|
|
if (term) {
|
|
Aim->IP = term->IP;
|
|
Aim->Port = term->Port;
|
|
Transmitter::Send_Massage(Aim);
|
|
}
|
|
}
|
|
}
|
|
TaskInfoMap.Remove(TaskID);
|
|
}
|
|
|
|
/// Looks up a task by ID in `TaskInfoMap`.
///
/// @param TaskID Key to look up.
/// @return The stored task pointer, or nullptr when there is no entry for
///         `TaskID` or the stored pointer is itself null.
TaskInfo * TaskInfo::getData(const int TaskID) {
    const auto slot = TaskInfoMap.get(TaskID);
    if (!slot || !*slot) {
        return nullptr;
    }
    return *slot;
}
|
|
|
|
void TaskInfo::StopAll() {
|
|
for (const auto &task : TaskInfoMap) {
|
|
const auto taskInfo = getData(task.second->TaskID);
|
|
if (taskInfo) {
|
|
if (taskInfo->Type == 0 && taskInfo->pushFlowTask) {
|
|
taskInfo->pushFlowTask->stop();
|
|
}
|
|
}
|
|
Stop(task.second->TaskID);
|
|
}
|
|
}
|