Distribution_Service/Server/Task/TaskInfo.cpp

101 lines
2.7 KiB
C++
Raw Normal View History

2025-12-04 18:12:54 +08:00
#include "TaskInfo.h"
#include "PortService/ByteTool.h"
#include "TaskModel/TaskAllocator.h"
TaskInfo::TaskInfo() {
}
TaskInfo::~TaskInfo() {
if (pushFlowTask) {
delete pushFlowTask;
pushFlowTask = nullptr;
}
}
void TaskInfo::BitstreamHeartbeat() {
std::shared_lock lock(mutex_socket);
const auto Setting = Config::getConfig();
const auto IDS = APPTool::GetBytes(Setting->ID);
if (socket && !socket->isClosed()) {
CTL::DatagramPacket packet;
const CTL::InetAddress address(Setting->ServerIP);
packet.setAddress(address);
packet.setPort(this->ServerPort);
packet.setData({0x66,0xAB,0x01,TaskID,IDS[0],IDS[1],0x01,0x00});
const bool F = socket->send(packet);
if (!F) {
CTL::System::Println("TaskID: {} Heartbeat sending failed",TaskID);
}
}
}
void TaskInfo::InitSocket() {
const auto Setting = Config::getConfig();
CTL::InetAddress address(Setting->IP);
{
std::unique_lock lock(mutex_socket);
socket = new CTL::DatagramSocket(Setting->StreamPort,address);
}
while (!socket->isClosed()) {
if (socket->available()) {
CTL::DatagramPacket packet;
socket->receive(packet);
const auto buffer = packet.getData();
const auto length = packet.getLength();
if (length > 0) {
const int taskID = buffer[0];
const auto task = TaskInfo::getData(taskID);
if (task) {
if (task->Type == 0 && task->pushFlowTask) {
CTL::ByteArray data;
data.assign(buffer,length);
task->pushFlowTask->addBuffer(data);
}
}
}
}
else {
CTL::Thread::SleepMS(5);
}
}
{
std::unique_lock lock(mutex_socket);
delete socket;
socket = nullptr;
}
}
void TaskInfo::Start(TaskInfo *taskInfo) {
TaskInfoMap.put(taskInfo->TaskID,taskInfo);
}
void TaskInfo::Stop(const int TaskID) {
const auto taskInfo = getData(TaskID);
if (taskInfo) {
taskInfo->Flag = 0;
}
TaskInfoMap.Remove(TaskID);
}
TaskInfo * TaskInfo::getData(const int TaskID) {
const auto taskInfo = TaskInfoMap.get(TaskID);
if (taskInfo && *taskInfo) {
return *taskInfo;
}
return nullptr;
}
2025-12-06 10:48:12 +08:00
void TaskInfo::StopAll() {
for (const auto &task : TaskInfoMap) {
const auto taskInfo = getData(task.second->TaskID);
if (taskInfo) {
if (taskInfo->Type == 0 && taskInfo->pushFlowTask) {
taskInfo->pushFlowTask->stop();
}
}
Stop(task.second->TaskID);
}
}