This commit is contained in:
qingjiao 2025-12-04 18:12:54 +08:00
parent 2eb6dc6421
commit 6fdbaa0016
16 changed files with 620 additions and 15 deletions

8
.idea/sshConfigs.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SshConfigs">
<configs>
<sshConfig authType="PASSWORD" host="192.168.2.23" id="2cbaed5b-a9ab-40c3-9c2d-271ce42d55e5" port="22" nameFormat="DESCRIPTIVE" username="linaro" useOpenSSHConfig="true" />
</configs>
</component>
</project>

14
.idea/webServers.xml Normal file
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="WebServers">
<option name="servers">
<webServer id="dc030f27-c57f-4e9d-932e-5f0dcd53a72b" name="RK3566LInux">
<fileTransfer accessType="SFTP" host="192.168.2.23" port="22" sshConfigId="2cbaed5b-a9ab-40c3-9c2d-271ce42d55e5" sshConfig="linaro@192.168.2.23:22 password">
<advancedOptions>
<advancedOptions dataProtectionLevel="Private" keepAliveTimeout="0" passiveMode="true" shareSSLContext="true" isUseSudo="true" />
</advancedOptions>
</fileTransfer>
</webServer>
</option>
</component>
</project>

View File

@ -62,9 +62,9 @@ namespace CTL{
_mutex.unlock();
return size;
}
CCVector<T> values(){
std::vector<T> values(){
_mutex.lock();
CCVector<T> vector(_deque.begin(),_deque.end());
std::vector<T> vector(_deque.begin(),_deque.end());
_mutex.unlock();
return vector;
}

View File

@ -17,6 +17,10 @@ add_executable(Distribution_Service main.cpp
Server/Terminal/Terminal.h
Server/ThreadMain/ConnectionService.cpp
Server/ThreadMain/ConnectionService.h
Server/Task/TaskInfo.cpp
Server/Task/TaskInfo.h
Server/Task/PushFlowTask.h
Server/Task/PushFlowTask.cpp
)
set_target_properties(Distribution_Service PROPERTIES
@ -25,6 +29,10 @@ set_target_properties(Distribution_Service PROPERTIES
target_link_libraries(Distribution_Service PUBLIC CC_API)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|arm|ARM|Armv[0-9]+")
target_link_libraries(Distribution_Service PUBLIC stdc++fs)
endif ()
target_include_directories(Distribution_Service PUBLIC
${CC_API_INC}
Server

View File

@ -40,6 +40,7 @@ void Config::Init() {
BS_Log::Error("Config::Init Error: {}",e.what());
}
Setting->IP = GetIP();
BS_Log::Log("Local IP: {}",Setting->IP.c_str());
}
Config * Config::getConfig() {

View File

@ -3,7 +3,6 @@
#include "CCDatagramSocket.h"
#include "CCFile.h"
#include "CCSystem.h"
#include "CCJSONObject.h"
#include "CCString.h"
@ -22,6 +21,7 @@ public:
int StreamPort = 10062;
int HttpPort = 9090;
bool Flag = false;
inline static std::shared_mutex m_mutex_udp;
CTL::DatagramSocket* m_socket_udp = nullptr;
CTL::IPVX IP_x = CTL::IPV4;
private:

View File

@ -4,6 +4,7 @@
#include "DataPacket.h"
#include "ByteTool.h"
#include "../Configuration/BS_Log.h"
#include "Task/TaskInfo.h"
#include "Terminal/Terminal.h"
class R_Order_AB {
@ -24,12 +25,9 @@ public:
Setting->ID = ID;
BS_Log::Log("Function_0x02 ID Change to {}", ID);
}
const auto str = Terminal::toTermInfo();
const auto IDS = APPTool::GetBytes(Setting->ID);
const auto Len = APPTool::GetBytes(str.size());
CTL::ByteArray data = {0x66,0xAB,0x03,0x00,IDS[0],IDS[1],Len[0],Len[1]};
data.append(str);
if (packet.IsTCP && packet.AimWS) {
const CTL::ByteArray data = {0x66,0xAB,0x09,0x00,IDS[0],IDS[1],0x00,0x00};
if (packet.AimWS && packet.IsTCP) {
packet.AimWS->SendBinary(data);
}
}
@ -53,14 +51,76 @@ public:
static void Function_0x06(const DataPacket& packet) {
}
static void Function_0x15(const DataPacket& packet) {
static void Function_0x07(const DataPacket& packet) {
try {
const auto ID = APPTool::GetInt(packet.Parameter2, packet.Parameter3);
const int TaskID = packet.Parameter1;
const int Length = APPTool::GetInt(packet.Parameter4, packet.Parameter5);
const auto str = packet.AdditionalData.toString();
CTL::JSONObject Json = CTL::JSONObject::parse(str);
const int Flag = Json.get("Flag");
if (Flag == 1) {
const auto taskInfo = new TaskInfo;
taskInfo->TaskID = TaskID;
taskInfo->TaskType = Json.get("TaskType");
taskInfo->ServerPort = Json.get("ServerPort");
const auto TermList = Json.get("Terms").get<CCVector<int>>();
for (const auto& TID : TermList) {
taskInfo->Terms.put(TID, TID);
}
taskInfo->pushFlowTask = new PushFlowTask(TaskID);
TaskInfo::Start(taskInfo);
taskInfo->pushFlowTask->start();
}
else if (Flag == 0) {
const auto taskInfo = TaskInfo::getData(TaskID);
if (taskInfo) {
if (taskInfo->Type == 0 && taskInfo->pushFlowTask) {
taskInfo->pushFlowTask->stop();
}
}
TaskInfo::Stop(TaskID);
}
}
catch (CCException& e) {
BS_Log::Error("Function_0x07 Error: {}",e.what());
}
}
static void Function_0xA0(const DataPacket& packet){
static void Function_0x09(const DataPacket& packet) {
try {
const auto str = packet.AdditionalData.toString();
CTL::JSONObject Json = CTL::JSONObject::parse(str);
const auto taskInfo = new TaskInfo;
taskInfo->TaskID = Json.get("TaskID");
taskInfo->TaskType = Json.get("TaskType");
taskInfo->ServerPort = Json.get("ServerPort");
const auto TermList = Json.get("Terms").get<CCVector<int>>();
for (const auto& TID : TermList) {
taskInfo->Terms.put(TID, TID);
}
taskInfo->pushFlowTask = new PushFlowTask(taskInfo->TaskID);
TaskInfo::Start(taskInfo);
taskInfo->pushFlowTask->start();
}
catch (CCException& e) {
BS_Log::Error("Function_0x09 Error: {}",e.what());
}
}
static void Function_0xA1(const DataPacket& packet){
const int ID = APPTool::GetInt(packet.Parameter2, packet.Parameter3);
static void Function_0x10(const DataPacket& packet) {
try {
}
catch (CCException& e) {
BS_Log::Error("Function_0x10 Error: {}",e.what());
}
}
static void Function_0x11(const DataPacket& packet) {
try {
}
catch (CCException& e) {
BS_Log::Error("Function_0x11 Error: {}",e.what());
}
}
};

View File

@ -59,6 +59,22 @@ void Routing::RoutingFunction(const DataPacket &packet) {
R_Order_AB::Function_0x06(packet);
break;
}
case 0x07: {
R_Order_AB::Function_0x07(packet);
break;
}
case 0x09: {
R_Order_AB::Function_0x09(packet);
break;
}
case 0x10: {
R_Order_AB::Function_0x10(packet);
break;
}
case 0x11: {
R_Order_AB::Function_0x11(packet);
break;
}
default: {
R_Order_AB::Function(packet);
break;

View File

@ -0,0 +1,47 @@
#include "PushFlowTask.h"
#include "BS_Log.h"
#include "TaskInfo.h"
PushFlowTask::PushFlowTask(const int TaskID) {
this->TaskID = TaskID;
}
void PushFlowTask::start() {
this->Flag = true;
this->Flag_t = true;
const auto Info = TaskInfo::getData(TaskID);
if (Info) {
AllocatorInfo AllInfo;
AllInfo.TaskID = Info->TaskID;
AllInfo.Terms = Info->Terms.values();
taskAllocator.Init(AllInfo);
bool F = taskAllocator.Start();
TaskInfo::thread_pool.AddTask(&PushFlowTask::run, this);
}
}
void PushFlowTask::stop() {
this->Flag = false;
taskAllocator.Stop();
while (this->Flag_t){
CTL::Thread::SleepMS(1);
}
}
void PushFlowTask::addBuffer(const CTL::ByteArray &buffer) {
taskAllocator.AddBuffer(buffer);
}
void PushFlowTask::run() {
BS_Log::Log("PushFlowTask::run Start TaskID: {}",TaskID);
while (Flag) {
const auto task = TaskInfo::getData(TaskID);
if (task) {
task->BitstreamHeartbeat();
}
sleep.SleepMillisecond(std::chrono::milliseconds(5 * 1000));
}
this->Flag_t = false;
BS_Log::Warning("PushFlowTask::run Stop TaskID: {}",TaskID);
}

View File

@ -0,0 +1,21 @@
#ifndef DISTRIBUTION_SERVICE_PUSH_FLOW_TASK_H
#define DISTRIBUTION_SERVICE_PUSH_FLOW_TASK_H
#include "TaskModel/TaskAllocator.h"
class PushFlowTask {
int TaskID = -1;
bool Flag = false;
bool Flag_t = false;
TaskAllocator taskAllocator;
CTL::IntSleep sleep;
public:
explicit PushFlowTask(int TaskID);
void start();
void stop();
void addBuffer(const CTL::ByteArray& buffer);
private:
void run();
};
#endif

88
Server/Task/TaskInfo.cpp Normal file
View File

@ -0,0 +1,88 @@
#include "TaskInfo.h"
#include "PortService/ByteTool.h"
#include "TaskModel/TaskAllocator.h"
TaskInfo::TaskInfo() {
}
TaskInfo::~TaskInfo() {
if (pushFlowTask) {
delete pushFlowTask;
pushFlowTask = nullptr;
}
}
void TaskInfo::BitstreamHeartbeat() {
std::shared_lock lock(mutex_socket);
const auto Setting = Config::getConfig();
const auto IDS = APPTool::GetBytes(Setting->ID);
if (socket && !socket->isClosed()) {
CTL::DatagramPacket packet;
const CTL::InetAddress address(Setting->ServerIP);
packet.setAddress(address);
packet.setPort(this->ServerPort);
packet.setData({0x66,0xAB,0x01,TaskID,IDS[0],IDS[1],0x01,0x00});
const bool F = socket->send(packet);
if (!F) {
CTL::System::Println("TaskID: {} Heartbeat sending failed",TaskID);
}
}
}
void TaskInfo::InitSocket() {
const auto Setting = Config::getConfig();
CTL::InetAddress address(Setting->IP);
{
std::unique_lock lock(mutex_socket);
socket = new CTL::DatagramSocket(Setting->StreamPort,address);
}
while (!socket->isClosed()) {
if (socket->available()) {
CTL::DatagramPacket packet;
socket->receive(packet);
const auto buffer = packet.getData();
const auto length = packet.getLength();
if (length > 0) {
const int taskID = buffer[0];
const auto task = TaskInfo::getData(taskID);
if (task) {
if (task->Type == 0 && task->pushFlowTask) {
CTL::ByteArray data;
data.assign(buffer,length);
task->pushFlowTask->addBuffer(data);
}
}
}
}
else {
CTL::Thread::SleepMS(5);
}
}
{
std::unique_lock lock(mutex_socket);
delete socket;
socket = nullptr;
}
}
void TaskInfo::Start(TaskInfo *taskInfo) {
TaskInfoMap.put(taskInfo->TaskID,taskInfo);
}
void TaskInfo::Stop(const int TaskID) {
const auto taskInfo = getData(TaskID);
if (taskInfo) {
taskInfo->Flag = 0;
}
TaskInfoMap.Remove(TaskID);
}
TaskInfo * TaskInfo::getData(const int TaskID) {
const auto taskInfo = TaskInfoMap.get(TaskID);
if (taskInfo && *taskInfo) {
return *taskInfo;
}
return nullptr;
}

34
Server/Task/TaskInfo.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef DISTRIBUTION_SERVICE_TASK_INFO_H
#define DISTRIBUTION_SERVICE_TASK_INFO_H
#include "CCDatagramSocket.h"
#include "CCThreadPool.h"
#include "TL/Map.h"
#include "PushFlowTask.h"
class TaskInfo {
inline static CTL::Map<int,TaskInfo*> TaskInfoMap;
inline static CTL::DatagramSocket* socket = nullptr;
inline static std::shared_mutex mutex_socket;
public:
inline static CTL::ThreadPool thread_pool;
TaskInfo();
~TaskInfo();
void BitstreamHeartbeat();
int Flag = 0;
int Type = 0;
int TaskID = -1;
int TaskType = 0;
int ServerPort = 10060;
CTL::Map<int, int> Terms;
PushFlowTask* pushFlowTask = nullptr;
private:
public:
static void InitSocket();
static void Start(TaskInfo* taskInfo);
static void Stop(int TaskID);
static TaskInfo* getData(int TaskID);
};
#endif

View File

@ -0,0 +1,156 @@
#ifndef OH_BS_TASK_ALLOCATOR_H
#define OH_BS_TASK_ALLOCATOR_H
#define Thread_TermNumber_Max 60
#include "TaskExecutor.h"
#include "TL/List.h"
#include "CCVector.h"
#include "Config.h"
struct AllocatorInfo{
int TaskID = -1;
CCVector<int> Terms;
};
class TaskAllocator{
CTL::Map<int,TaskExecutor*> executors;
bool Flag = false;
static int CalculateNumberThreads(const CCList<int>& Terms){
int size = (int)Terms.size() / Thread_TermNumber_Max ;
if(size == 0){
return size + 1;
}
else {
return size;
}
}
static int CalculateNumberThreads(const int TestCount){
const int size = TestCount / Thread_TermNumber_Max;
if(size == 0){
return size + 1;
}
else {
return size;
}
}
static std::list<int> GetTerms(const int Index,CCVector<int>& Terms){
std::list<int> terms;
int index = Index * Thread_TermNumber_Max;
int size = (Index + 1) * Thread_TermNumber_Max;
for(int i = index;i < size;i++){
if (Terms.size() > i) {
const auto ID = Terms.get(i);
if(ID){
terms.push_back(ID);
}
else {
break;
}
}
else {
break;
}
}
return terms;
}
CTL::Map<int, int> TermList;
int TaskID = -1;
mutable std::shared_mutex mutex_t;
public:
//------------------------------------------------------------------------------------------------------------------
TaskAllocator() = default;
//------------------------------------------------------------------------------------------------------------------
void Init(AllocatorInfo& info){
const int Thread_Num = CalculateNumberThreads(info.Terms);
for (const auto ID : info.Terms) {
TermList.Put(ID,ID);
}
this->TaskID = info.TaskID;
for(int i = 0;i < Thread_Num;i++){
ExecutorInfo executorInfo;
executorInfo.TaskID = info.TaskID;
executorInfo.list = GetTerms(i, info.Terms);
executorInfo.Index = i;
auto executor = new TaskExecutor();
executor->Init(executorInfo);
executors.Put(executorInfo.Index, executor);
}
}
bool Start(){
std::unique_lock lock(mutex_t);
this->Flag = true;
for(const auto& executor : executors){
if(executor.second){
executor.second->Start();
}
}
return true;
}
void Stop(){
std::unique_lock lock(mutex_t);
Flag = false;
for(auto& executor : executors){
if(executor.second){
executor.second->Stop();
}
delete executor.second;
executor.second = nullptr;
}
executors.Clear();
}
void AddBuffer(const CTL::ByteArray& buffer){
std::unique_lock lock(mutex_t);
for(auto& executor : executors){
if(executor.second){
executor.second->AddBuffer(buffer);
}
}
}
void AddTerm(const int TID,const CTL::ByteArray& Command){
std::unique_lock lock(mutex_t);
for(auto& executor : executors){
if(executor.second){
if(executor.second->GetTermSize() < Thread_TermNumber_Max){
executor.second->AddTerm(TID,Command);
return;
}
}
}
int Size = executors.Size();
ExecutorInfo executorInfo;
executorInfo.TaskID = TaskID;
CCVector<int> Terms = {TID};
executorInfo.list = GetTerms(Size, Terms);
executorInfo.Index = Size;
auto executor = new TaskExecutor();
executor->Init(executorInfo);
executors.Put(executorInfo.Index, executor);
executor->Start();
TaskExecutor::NotificationTerminal(TID, Command);
}
void RemoveTerm(const int TID,const CTL::ByteArray& Command){
std::unique_lock lock(mutex_t);
CCList<int> IndexList;
for(auto& executor : executors){
if(executor.second){
executor.second->RemoveTerm(TID,Command);
if(executor.second->GetTermSize() <= 0){
IndexList.add(executor.first);
}
}
}
for(auto& index : IndexList){
auto executor = executors.Get(index);;
if(executor && *executor){
auto item = *executor;
item->Stop();
delete item;
item = nullptr;
}
executors.Remove(index);
}
}
};
#endif

View File

@ -0,0 +1,135 @@
#ifndef OH_BS_TASK_EXECUTOR_H
#define OH_BS_TASK_EXECUTOR_H
#include "TL/Map.h"
#include "TL/Queue.h"
#include "Terminal/Terminal.h"
#include "CCThreadPool.h"
#include "Transmitter.h"
struct ExecutorInfo{
int Index = -1;
CCList<int> list = {};
int TaskID = -1;
};
class TaskExecutor{
CTL::Map<int, int> Terms;
CTL::Queue<CTL::ByteArray> M_Queue;
bool Flag = false;
bool Flag_t = false;
inline static CTL::ThreadPool thread_pool_t;
public:
//------------------------------------------------------------------------------------------------------------------
static void InitPool() {
thread_pool_t.InitStart(100,1024,1000 * 5);
}
static void ReleasePool() {
thread_pool_t.Stop();
}
//------------------------------------------------------------------------------------------------------------------
int Index = -1;
int Test = 0;
int TestNum = 0;
int DistributorFlag = 0;
int TaskID = -1;
//------------------------------------------------------------------------------------------------------------------
TaskExecutor() = default;
~TaskExecutor() = default;
TaskExecutor(TaskExecutor& taskExecutor){
this->Terms.SetStdMap(taskExecutor.Terms);
this->Index = taskExecutor.Index;
}
TaskExecutor(const TaskExecutor& taskExecutor){
this->Terms.SetStdMap(taskExecutor.Terms);
this->Index = taskExecutor.Index;
}
void Init(const ExecutorInfo& taskExecutor){
for (auto ID : taskExecutor.list) {
Terms.Put(ID,ID);
}
this->Index = taskExecutor.Index;
this->TaskID = taskExecutor.TaskID;
}
int GetTermSize(){
return Terms.Size();
}
//------------------------------------------------------------------------------------------------------------------
void AddBuffer(const CTL::ByteArray& buffer){
M_Queue.Add(buffer);
}
void Start(){
Flag = true;
Flag_t = true;
thread_pool_t.AddTask(&TaskExecutor::Running,this);
}
void Stop(){
Flag = false;
while (Flag_t){
CTL::Thread::SleepMS(1);
}
}
void AddTerm(const int TID,const CTL::ByteArray& Command){
if(Terms.IsContains(TID)){
return;
}
if(NotificationTerminal(TID, Command)){
Terms.Put(TID,TID);
}
}
void RemoveTerm(const int TID,const CTL::ByteArray& Command){
if(!Terms.IsContains(TID)){
return;
}
if(NotificationTerminal(TID, Command)){
Terms.Remove(TID);
}
}
static bool NotificationTerminal(const int TID,const CTL::ByteArray& Command){
auto Term = Terminal::getData(TID);
if(Term){
auto* AIM = new AIMInfo();
AIM->IP = Term->IP;
AIM->Port = Term->Port;
AIM->Data = Command;
Transmitter::Send_Massage(AIM);
delete AIM;
return true;
}
return false;
}
bool IsStop(){
return Flag_t;
}
private:
void SendTermsData(const CTL::ByteArray& data){
auto list = Terms.Values();
auto* AIM = new AIMInfo();
CTL::AutoDestruct Aim_Destruct(AIM);
AIM->Data = data;
for(auto ID : list){
auto Term = Terminal::getData(ID);
if(Term){
if(Term->Status == 1){
AIM->IP = Term->IP;
AIM->Port = Term->StreamPort;
Transmitter::Send_Stream(AIM);
}
}
}
}
void Running(){
while (Flag){
auto buffer = M_Queue.Poll();
if(buffer){
SendTermsData(*buffer.get());
}
else {
CTL::Thread::SleepMS(2);
}
}
this->Flag_t = false;
}
};
#endif

View File

@ -29,6 +29,7 @@ public:
packet.setAddress(address);
packet.setData(info->Data);
packet.setPort(info->Port);
std::shared_lock lock(Config::m_mutex_udp);
return Setting->m_socket_udp->send(packet);
}
}
@ -46,6 +47,7 @@ public:
packet.setAddress(address);
packet.setData(info->Data);
packet.setPort(info->Port);
std::shared_lock lock(Config::m_mutex_udp);
return Setting->m_socket_udp->send(packet);
}
}

View File

@ -6,6 +6,8 @@
#include "Configuration/BS_Log.h"
#include "PortService/Routing.h"
#include "ConnectionService.h"
#include "Task/TaskInfo.h"
#include "TaskModel/TaskExecutor.h"
class ThreadMain {
@ -21,11 +23,16 @@ private:
~ThreadMain() {
BS_Log::Flush();
BS_Log::Stop();
TaskExecutor::ReleasePool();
TaskInfo::thread_pool.Stop();
}
void orderRun() {
const auto Setting = Config::getConfig();
CTL::InetAddress address(Setting->IP);
Setting->m_socket_udp = new CTL::DatagramSocket(Setting->OrderPort,address);
{
std::unique_lock lock(Config::m_mutex_udp);
Setting->m_socket_udp = new CTL::DatagramSocket(Setting->OrderPort,address);
}
BS_Log::Log("OrderRun Start");
while (!Setting->m_socket_udp->isClosed()) {
if (Setting->m_socket_udp->available()) {
@ -41,6 +48,11 @@ private:
CTL::Thread::SleepMS(1);
}
}
{
std::unique_lock lock(Config::m_mutex_udp);
delete Setting->m_socket_udp;
Setting->m_socket_udp = nullptr;
}
BS_Log::Warning("orderRun End");
}
static void ConnectService() {
@ -77,9 +89,12 @@ public:
Setting->Flag = true;
BS_Log::Init();
Config::Init();
TaskExecutor::InitPool();
TaskInfo::thread_pool.InitStart(255, 1024, 1000);
ConnectionService::init();
m_threadPool.AddTask(&ThreadMain::orderRun,this);
m_threadPool.AddTask(ThreadMain::ConnectService);
m_threadPool.AddTask(TaskInfo::InitSocket);
}
};