Files
2026-02-01 22:23:06 +08:00

226 lines
6.9 KiB
C++

#include "CommunicationManager.h"
#include "Communication.h"
#include "Register/Register.h"
#include "RouterTable/RouterTable.h"
#include "Config/JsonPaser.h"
#include "Transform/Transform.h"
CommunicationManager::CommunicationManager()
{
m_mMapDatas.clear();
m_bThreadRunningLoop = false;
}
CommunicationManager::~CommunicationManager()
{
m_mMapDatas.clear();
}
CommunicationManager* CommunicationManager::Instance()
{
static std::shared_ptr<CommunicationManager> s_instance(new CommunicationManager());
return s_instance.get();
}
void CommunicationManager::Initialize(const std::string& config)
{
/// 以下通过JSON配置 动态生成
std::vector<std::string> m_pCommonicationList = JsonPaser::Instance().CommunicationTypeVector();
for (int i = 0; i < m_pCommonicationList.size(); i++)
{
std::cout << "m_pCommonicationList.at(i): " << m_pCommonicationList.at(i) << std::endl;
initCommunication(m_pCommonicationList.at(i));
}
}
bool CommunicationManager::Start()
{
std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
int nError = -1, nCount = 0;
for (auto it = m_mMapDatas.begin(); it != m_mMapDatas.end();)
{
nError = it->second->Open();
if (0 != nError)
{
++nCount;
RouterTable::Instance().delRouter(it->first);
it = m_mMapDatas.erase(it);
continue;
}
++it;
}
m_bSetup = true;
if (nCount < m_mMapDatas.size())
{
m_bSetup = false;
}
m_bThreadRunningLoop = true;
m_pFutureThreadRunLoop = new std::thread(std::bind(&CommunicationManager::ThreadEntry,
&CommunicationManager::ThreadFunLoop, (void*)this));
return m_bSetup;
}
bool CommunicationManager::Stop()
{
std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
int nError = -1, nCount = 0;
for (auto it = m_mMapDatas.begin(); it != m_mMapDatas.end();)
{
nError = it->second->Close();
if (0 != nError)
{
++nCount;
RouterTable::Instance().delRouter(it->first);
it = m_mMapDatas.erase(it);
continue;
}
++it;
}
m_bSetup = true;
if (nCount < m_mMapDatas.size())
{
m_bSetup = false;
}
if (m_bThreadRunningLoop)
{
m_bThreadRunningLoop = false;
if (m_pFutureThreadRunLoop)
{
m_pFutureThreadRunLoop->join();
delete m_pFutureThreadRunLoop;
m_pFutureThreadRunLoop = NULL;
}
}
return m_bSetup;
}
void CommunicationManager::PushSendData(const SimpleData& stSimpleData)
{
// m_DataSendQueue.enqueue(stSimpleData);
doDealWithSendData(stSimpleData);
}
const SimpleData& CommunicationManager::GetRecvData()
{
return m_DataSendQueue.dequeue();
}
void CommunicationManager::initCommunication(const std::string& strName)
{
std::vector<std::string> m_pCommonicationList = JsonPaser::Instance().CommunicationNameVector(strName);
for (int i = 0; i < m_pCommonicationList.size(); i++)
{
CCommonication* pCCommunication = DataCreatorFactory::Instance()->createCommonication(m_pCommonicationList.at(i));
m_mMapDatas.emplace(m_pCommonicationList.at(i), pCCommunication);
pCCommunication->Init(JsonPaser::Instance().ComunicationProperty(strName));
pCCommunication->RegisterCallBack(onCallBackMessage, (void*)CommunicationManager::Instance());
}
}
void CommunicationManager::onCallBackMessage(const char* source, const char* extra, unsigned char* stData, int nLen, void* pUser)
{
/// 放在线程中处理
CommunicationManager* pManager = (CommunicationManager*)pUser;
if (pManager)
{
pManager->onPushReciveData(source, extra, stData, nLen);
}
}
void CommunicationManager::onPushReciveData(const char* source, const char* extra, unsigned char* stData, int nLen)
{
SimpleData reciveData;
strcpy(reciveData.strSource, source);
strcpy(reciveData.strExtra, extra);
strcpy_s(reciveData.byteArray, nLen, (const char*)stData);
// m_DataRecvQueue.enqueue(reciveData);
doDealWithReciveData(reciveData);
}
void CommunicationManager::doDealWithSendData(const SimpleData& stSimpleData)
{
//先处理转换协议后 转发事件
int nTransLens = -1;
SimpleData transformData /*= stSimpleData*/;
Transform::Instance().TransformToLocalByRule(std::string(stSimpleData.strSource),
stSimpleData.byteArray,
sizeof(stSimpleData.byteArray) / sizeof(stSimpleData.byteArray[0]),
transformData.byteArray, nTransLens);
doDealWithRouterTableSendData(stSimpleData);
}
void CommunicationManager::doDealWithRouterTableSendData(const SimpleData& stSimpleData)
{
// decltype(m_mMapDatas) copy;
// {
// //先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
// std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
// copy = m_mMapDatas;
// }
std::vector<std::string> m_RouterList = RouterTable::Instance().getRouterTable(std::string(stSimpleData.strSource));
for (int i = 0; i < m_RouterList.size(); i++)
{
std::cout << "m_RouterList.at(i): " << m_RouterList.at(i) << std::endl;
auto iter = m_mMapDatas.find(m_RouterList.at(i));
while (iter != m_mMapDatas.end())
{
if (iter->second)
{
iter->second->Send((unsigned char*)stSimpleData.byteArray,
sizeof(stSimpleData.byteArray) / sizeof(stSimpleData.byteArray[0]),
std::string(stSimpleData.strExtra));
}
iter++;
}
}
}
void CommunicationManager::doDealWithReciveData(const SimpleData& stSimpleData)
{
//由配置文件决定数据stData
/// 所有数据执行transform 发送一份给软件内部
SimpleData transformData = stSimpleData;
doDealWithRouterTableSendData(transformData);
}
void CommunicationManager::ThreadFunLoop()
{
while (m_bThreadRunningLoop)
{
// if (!m_DataSendQueue.isEmpty())
// {
// SimpleData stSendData = m_DataSendQueue.dequeue();
// //处理发送数据
// doDealWithSendData(stSendData);
// }
// else if (!m_DataRecvQueue.isEmpty())
// {
// SimpleData stRecvData = m_DataRecvQueue.dequeue();
// //处理接收数据
// doDealWithReciveData(stRecvData);
// }
// else
// {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// }
}
}
void CommunicationManager::ThreadEntry(CommunicationManager::ThreadRunFunPtr pRunFun, void* pOwner)
{
CommunicationManager* pRunClass = reinterpret_cast<CommunicationManager*>(pOwner);
if (!pRunClass)
{
return;
}
(pRunClass->*pRunFun)();
}