226 lines
6.9 KiB
C++
226 lines
6.9 KiB
C++
|
|
#include "CommunicationManager.h"
|
|
#include "Communication.h"
|
|
#include "Register/Register.h"
|
|
#include "RouterTable/RouterTable.h"
|
|
#include "Config/JsonPaser.h"
|
|
#include "Transform/Transform.h"
|
|
|
|
CommunicationManager::CommunicationManager()
|
|
{
|
|
m_mMapDatas.clear();
|
|
m_bThreadRunningLoop = false;
|
|
}
|
|
|
|
CommunicationManager::~CommunicationManager()
|
|
{
|
|
m_mMapDatas.clear();
|
|
}
|
|
|
|
CommunicationManager* CommunicationManager::Instance()
|
|
{
|
|
static std::shared_ptr<CommunicationManager> s_instance(new CommunicationManager());
|
|
return s_instance.get();
|
|
}
|
|
|
|
void CommunicationManager::Initialize(const std::string& config)
|
|
{
|
|
/// 以下通过JSON配置 动态生成
|
|
std::vector<std::string> m_pCommonicationList = JsonPaser::Instance().CommunicationTypeVector();
|
|
for (int i = 0; i < m_pCommonicationList.size(); i++)
|
|
{
|
|
std::cout << "m_pCommonicationList.at(i): " << m_pCommonicationList.at(i) << std::endl;
|
|
initCommunication(m_pCommonicationList.at(i));
|
|
}
|
|
}
|
|
|
|
bool CommunicationManager::Start()
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
|
|
int nError = -1, nCount = 0;
|
|
for (auto it = m_mMapDatas.begin(); it != m_mMapDatas.end();)
|
|
{
|
|
nError = it->second->Open();
|
|
if (0 != nError)
|
|
{
|
|
++nCount;
|
|
RouterTable::Instance().delRouter(it->first);
|
|
it = m_mMapDatas.erase(it);
|
|
continue;
|
|
}
|
|
++it;
|
|
}
|
|
m_bSetup = true;
|
|
if (nCount < m_mMapDatas.size())
|
|
{
|
|
m_bSetup = false;
|
|
}
|
|
m_bThreadRunningLoop = true;
|
|
m_pFutureThreadRunLoop = new std::thread(std::bind(&CommunicationManager::ThreadEntry,
|
|
&CommunicationManager::ThreadFunLoop, (void*)this));
|
|
|
|
return m_bSetup;
|
|
}
|
|
|
|
bool CommunicationManager::Stop()
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
|
|
int nError = -1, nCount = 0;
|
|
for (auto it = m_mMapDatas.begin(); it != m_mMapDatas.end();)
|
|
{
|
|
nError = it->second->Close();
|
|
if (0 != nError)
|
|
{
|
|
++nCount;
|
|
RouterTable::Instance().delRouter(it->first);
|
|
it = m_mMapDatas.erase(it);
|
|
continue;
|
|
}
|
|
++it;
|
|
}
|
|
m_bSetup = true;
|
|
if (nCount < m_mMapDatas.size())
|
|
{
|
|
m_bSetup = false;
|
|
}
|
|
|
|
if (m_bThreadRunningLoop)
|
|
{
|
|
m_bThreadRunningLoop = false;
|
|
if (m_pFutureThreadRunLoop)
|
|
{
|
|
m_pFutureThreadRunLoop->join();
|
|
delete m_pFutureThreadRunLoop;
|
|
m_pFutureThreadRunLoop = NULL;
|
|
}
|
|
}
|
|
|
|
return m_bSetup;
|
|
}
|
|
|
|
void CommunicationManager::PushSendData(const SimpleData& stSimpleData)
|
|
{
|
|
// m_DataSendQueue.enqueue(stSimpleData);
|
|
doDealWithSendData(stSimpleData);
|
|
}
|
|
|
|
const SimpleData& CommunicationManager::GetRecvData()
|
|
{
|
|
return m_DataSendQueue.dequeue();
|
|
}
|
|
|
|
void CommunicationManager::initCommunication(const std::string& strName)
|
|
{
|
|
std::vector<std::string> m_pCommonicationList = JsonPaser::Instance().CommunicationNameVector(strName);
|
|
for (int i = 0; i < m_pCommonicationList.size(); i++)
|
|
{
|
|
CCommonication* pCCommunication = DataCreatorFactory::Instance()->createCommonication(m_pCommonicationList.at(i));
|
|
m_mMapDatas.emplace(m_pCommonicationList.at(i), pCCommunication);
|
|
pCCommunication->Init(JsonPaser::Instance().ComunicationProperty(strName));
|
|
pCCommunication->RegisterCallBack(onCallBackMessage, (void*)CommunicationManager::Instance());
|
|
}
|
|
}
|
|
|
|
void CommunicationManager::onCallBackMessage(const char* source, const char* extra, unsigned char* stData, int nLen, void* pUser)
|
|
{
|
|
/// 放在线程中处理
|
|
CommunicationManager* pManager = (CommunicationManager*)pUser;
|
|
if (pManager)
|
|
{
|
|
pManager->onPushReciveData(source, extra, stData, nLen);
|
|
}
|
|
}
|
|
|
|
void CommunicationManager::onPushReciveData(const char* source, const char* extra, unsigned char* stData, int nLen)
|
|
{
|
|
SimpleData reciveData;
|
|
strcpy(reciveData.strSource, source);
|
|
strcpy(reciveData.strExtra, extra);
|
|
strcpy_s(reciveData.byteArray, nLen, (const char*)stData);
|
|
|
|
// m_DataRecvQueue.enqueue(reciveData);
|
|
doDealWithReciveData(reciveData);
|
|
}
|
|
|
|
void CommunicationManager::doDealWithSendData(const SimpleData& stSimpleData)
|
|
{
|
|
//先处理转换协议后 转发事件
|
|
int nTransLens = -1;
|
|
SimpleData transformData /*= stSimpleData*/;
|
|
Transform::Instance().TransformToLocalByRule(std::string(stSimpleData.strSource),
|
|
stSimpleData.byteArray,
|
|
sizeof(stSimpleData.byteArray) / sizeof(stSimpleData.byteArray[0]),
|
|
transformData.byteArray, nTransLens);
|
|
doDealWithRouterTableSendData(stSimpleData);
|
|
}
|
|
|
|
void CommunicationManager::doDealWithRouterTableSendData(const SimpleData& stSimpleData)
|
|
{
|
|
// decltype(m_mMapDatas) copy;
|
|
// {
|
|
// //先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
|
|
// std::lock_guard<std::recursive_mutex> lck(m_mRecursive);
|
|
// copy = m_mMapDatas;
|
|
// }
|
|
std::vector<std::string> m_RouterList = RouterTable::Instance().getRouterTable(std::string(stSimpleData.strSource));
|
|
for (int i = 0; i < m_RouterList.size(); i++)
|
|
{
|
|
std::cout << "m_RouterList.at(i): " << m_RouterList.at(i) << std::endl;
|
|
auto iter = m_mMapDatas.find(m_RouterList.at(i));
|
|
while (iter != m_mMapDatas.end())
|
|
{
|
|
if (iter->second)
|
|
{
|
|
iter->second->Send((unsigned char*)stSimpleData.byteArray,
|
|
sizeof(stSimpleData.byteArray) / sizeof(stSimpleData.byteArray[0]),
|
|
std::string(stSimpleData.strExtra));
|
|
}
|
|
iter++;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
void CommunicationManager::doDealWithReciveData(const SimpleData& stSimpleData)
|
|
{
|
|
//由配置文件决定数据stData
|
|
|
|
/// 所有数据执行transform 发送一份给软件内部
|
|
SimpleData transformData = stSimpleData;
|
|
doDealWithRouterTableSendData(transformData);
|
|
}
|
|
|
|
void CommunicationManager::ThreadFunLoop()
|
|
{
|
|
while (m_bThreadRunningLoop)
|
|
{
|
|
// if (!m_DataSendQueue.isEmpty())
|
|
// {
|
|
// SimpleData stSendData = m_DataSendQueue.dequeue();
|
|
// //处理发送数据
|
|
// doDealWithSendData(stSendData);
|
|
// }
|
|
// else if (!m_DataRecvQueue.isEmpty())
|
|
// {
|
|
// SimpleData stRecvData = m_DataRecvQueue.dequeue();
|
|
// //处理接收数据
|
|
// doDealWithReciveData(stRecvData);
|
|
// }
|
|
// else
|
|
// {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
|
// }
|
|
}
|
|
}
|
|
|
|
void CommunicationManager::ThreadEntry(CommunicationManager::ThreadRunFunPtr pRunFun, void* pOwner)
|
|
{
|
|
CommunicationManager* pRunClass = reinterpret_cast<CommunicationManager*>(pOwner);
|
|
if (!pRunClass)
|
|
{
|
|
return;
|
|
}
|
|
(pRunClass->*pRunFun)();
|
|
}
|
|
|