#include "TaskInfo.h" #include "PortService/ByteTool.h" #include "TaskModel/TaskAllocator.h" TaskInfo::TaskInfo() { } TaskInfo::~TaskInfo() { if (pushFlowTask) { delete pushFlowTask; pushFlowTask = nullptr; } } void TaskInfo::BitstreamHeartbeat() { std::shared_lock lock(mutex_socket); const auto Setting = Config::getConfig(); const auto IDS = APPTool::GetBytes(Setting->ID); if (socket && !socket->isClosed()) { CTL::DatagramPacket packet; const CTL::InetAddress address(Setting->ServerIP); packet.setAddress(address); packet.setPort(this->ServerPort); packet.setData({0x66,0xAB,0x01,TaskID,IDS[0],IDS[1],0x01,0x00}); const bool F = socket->send(packet); if (!F) { CTL::System::Println("TaskID: {} Heartbeat sending failed",TaskID); } } } void TaskInfo::InitSocket() { const auto Setting = Config::getConfig(); CTL::InetAddress address(Setting->IP); { std::unique_lock lock(mutex_socket); socket = new CTL::DatagramSocket(Setting->StreamPort,address); } while (!socket->isClosed()) { if (socket->available()) { CTL::DatagramPacket packet; socket->receive(packet); const auto buffer = packet.getData(); const auto length = packet.getLength(); if (length > 0) { const int taskID = buffer[0]; const auto task = TaskInfo::getData(taskID); if (task) { if (task->Type == 0 && task->pushFlowTask) { CTL::ByteArray data; data.assign(buffer,length); task->pushFlowTask->addBuffer(data); } } } } else { CTL::Thread::SleepMS(5); } } { std::unique_lock lock(mutex_socket); delete socket; socket = nullptr; } } void TaskInfo::Start(TaskInfo *taskInfo) { TaskInfoMap.put(taskInfo->TaskID,taskInfo); } void TaskInfo::Stop(const int TaskID) { const auto taskInfo = getData(TaskID); if (taskInfo) { taskInfo->Flag = 0; } TaskInfoMap.Remove(TaskID); } TaskInfo * TaskInfo::getData(const int TaskID) { const auto taskInfo = TaskInfoMap.get(TaskID); if (taskInfo && *taskInfo) { return *taskInfo; } return nullptr; }