Distribution_Service/Server/Task/TaskInfo.cpp
2026-03-24 14:43:26 +08:00

128 lines
3.8 KiB
C++

#include "TaskInfo.h"
#include <vector>
#include "PortService/ByteTool.h"
#include "TaskModel/TaskAllocator.h"
// Definitions of TaskInfo's static data members.

// Registry of tasks keyed by task ID: Start() inserts entries, Stop() removes
// them, and getData() looks them up.
CTL::Map<int,TaskInfo*> TaskInfo::TaskInfoMap;
// Datagram socket shared by all tasks. It is allocated and freed inside
// InitSocket() and is null outside that call.
CTL::DatagramSocket* TaskInfo::socket = nullptr;
// Guards the `socket` pointer: InitSocket() takes it exclusively when it
// creates or deletes the socket, BitstreamHeartbeat() takes it shared while sending.
std::shared_mutex TaskInfo::mutex_socket;
// Thread pool shared by all tasks; its users are not among the functions
// defined alongside it here.
CTL::ThreadPool TaskInfo::thread_pool;
/// Default constructor: performs no work of its own.
/// NOTE(review): the destructor deletes `pushFlowTask`, so the class
/// declaration presumably initializes that pointer to nullptr - confirm.
TaskInfo::TaskInfo() = default;
/// Destroys the push-flow task held by this object, if any.
TaskInfo::~TaskInfo() {
    // Deleting a null pointer is a no-op, so no explicit null test is needed.
    delete pushFlowTask;
    pushFlowTask = nullptr;
}
void TaskInfo::BitstreamHeartbeat() {
std::shared_lock<std::shared_mutex> lock(mutex_socket);
const auto Setting = Config::getConfig();
const auto IDS = APPTool::GetBytes(Setting->ID);
if (socket && !socket->isClosed()) {
CTL::DatagramPacket packet;
const CTL::InetAddress address(Setting->ServerIP);
packet.setAddress(address);
packet.setPort(this->ServerPort);
packet.setData({0x66,0xAB,0x01,TaskID,IDS[0],IDS[1],0x01,0x00});
const bool F = socket->send(packet);
if (!F) {
CTL::System::Println("TaskID: {} Heartbeat sending failed",TaskID);
}
}
}
void TaskInfo::InitSocket() {
const auto Setting = Config::getConfig();
CTL::InetAddress address(Setting->IP);
{
std::unique_lock<std::shared_mutex> lock(mutex_socket);
socket = new CTL::DatagramSocket(Setting->StreamPort,address);
}
while (!socket->isClosed()) {
if (socket->available()) {
CTL::DatagramPacket packet;
socket->receive(packet);
const auto buffer = packet.getData();
const auto length = packet.getLength();
if (length > 0) {
const int taskID = buffer[0];
const auto task = TaskInfo::getData(taskID);
if (task) {
if (task->Type == 0 && task->pushFlowTask) {
CTL::ByteArray data;
data.assign(buffer,length);
task->pushFlowTask->addBuffer(data);
}
}
}
}
else {
CTL::Thread::Sleep(300);
}
}
{
std::unique_lock<std::shared_mutex> lock(mutex_socket);
delete socket;
socket = nullptr;
}
}
/// Registers a task and sends its order bytes to every resolvable terminal.
///
/// The task is stored in `TaskInfoMap` under its `TaskID`. Its `TaskOrder`
/// bytes are then sent to the IP/port of each entry in `Terms` that
/// Terminal::getData can resolve; unresolved terminals are skipped.
///
/// @param taskInfo Task to register. A null pointer is logged and ignored
///                 instead of being dereferenced.
void TaskInfo::Start(TaskInfo *taskInfo) {
    if (!taskInfo) {
        // Dereferencing a null task would be undefined behaviour; refuse it.
        CTL::System::Println("TaskInfo::Start: null taskInfo ignored");
        return;
    }

    // NOTE(review): an existing entry with the same TaskID is overwritten by
    // put(); confirm who owns the previous TaskInfo in that case.
    TaskInfoMap.put(taskInfo->TaskID,taskInfo);

    const auto Aim = new AIMInfo;
    // AutoDestruct is handed the AIMInfo pointer; presumably it frees it at
    // scope exit - confirm against CTL::AutoDestruct.
    CTL::AutoDestruct<AIMInfo> AimDestruct(Aim);
    Aim->Data = CTL::ByteArray(taskInfo->TaskOrder.data(),taskInfo->TaskOrder.size());

    for (const auto &ID : taskInfo->Terms) {
        const auto term = Terminal::getData(ID.first);
        if (!term) {
            continue;
        }
        // The same message object is reused; only the destination changes.
        Aim->IP = term->IP;
        Aim->Port = term->Port;
        Transmitter::Send_Massage(Aim);
    }
}
void TaskInfo::Stop(const int TaskID) {
const auto taskInfo = getData(TaskID);
if (taskInfo) {
taskInfo->Flag = 0;
const auto Aim = new AIMInfo;
CTL::AutoDestruct<AIMInfo> AimDestruct(Aim);
Aim->Data = CTL::ByteArray(taskInfo->TaskOrder.data(),taskInfo->TaskOrder.size());
for (const auto &ID : taskInfo->Terms) {
const auto term = Terminal::getData(ID.first);
if (term) {
Aim->IP = term->IP;
Aim->Port = term->Port;
Transmitter::Send_Massage(Aim);
}
}
}
TaskInfoMap.Remove(TaskID);
}
/// Looks up a registered task by ID.
///
/// @param TaskID Identifier to look up in `TaskInfoMap`.
/// @return The registered task, or nullptr when there is no entry or the
///         stored pointer is itself null.
TaskInfo * TaskInfo::getData(const int TaskID) {
    const auto slot = TaskInfoMap.get(TaskID);
    if (!slot || !*slot) {
        return nullptr;
    }
    return *slot;
}
/// Stops every task currently present in `TaskInfoMap`.
///
/// For each registered ID, the push-flow task is stopped first when the task
/// has `Type == 0` and owns one; Stop() is then called for that ID, which
/// also removes the registry entry.
void TaskInfo::StopAll() {
    // Snapshot the keys before stopping anything: Stop() removes entries from
    // TaskInfoMap, and erasing from a container while a range-for is walking
    // it invalidates the iterator the loop is about to advance.
    // Keys are used rather than `second->TaskID` so that an entry holding a
    // null pointer is still removed instead of being dereferenced.
    std::vector<int> taskIDs;
    for (const auto &entry : TaskInfoMap) {
        taskIDs.push_back(entry.first);
    }

    for (const int taskID : taskIDs) {
        const auto taskInfo = getData(taskID);
        if (taskInfo && taskInfo->Type == 0 && taskInfo->pushFlowTask) {
            taskInfo->pushFlowTask->stop();
        }
        Stop(taskID);
    }
}