Files
CodeRepository/Common/GdZeroMQ.cpp
chenzhen 222dda1e43 1,新增“App_ThermalImageSystem”;
2,新增“Apps”;
3,新增“Common”;
4,新增“FileList”;
5,新增“MediaX”;
6,新增“OpenSource”;
7,新增“Samples”;
8,新增“SoftwareBusinessLines”.
2026-02-14 23:03:23 +08:00

217 lines
5.1 KiB
C++

#include "GdZeroMQ.h"
// Constructs an idle instance: no context, no sockets, no receive thread,
// no callback. Init() must be called before the object does anything useful.
GdZeroMQ::GdZeroMQ()
{
    m_pContext = nullptr;
    m_pPublisher = nullptr;
    m_pSubscriber = nullptr;
    // Was left uninitialized before; it is read by strlen() in SendData() and
    // CreateSubscriber(), so give it a defined value.
    m_lpszSelfName = nullptr;
    m_bInit = false;
    m_bThreadRunning_RecvData = false;
    m_ptrThread_RecvData = nullptr;
    m_pCallBack_RecvData = nullptr;
    m_pOwner = nullptr;
}
// Destructor: delegates all cleanup to Uninit(), which is a no-op when the
// object was never successfully initialized.
GdZeroMQ::~GdZeroMQ()
{
Uninit();
}
bool GdZeroMQ::Init(int nPort_Publisher, QVector<int> nPorts_Subscriber, const char* lpszPublisherName, pCallBack_RecvData pCB_RecvData, void* pOwner) //ZMQ初始化
{
m_lpszSelfName = const_cast<char *>(lpszPublisherName);
m_pCallBack_RecvData = pCB_RecvData;
m_pOwner = pOwner;
m_pContext = zmq_ctx_new();
if(nullptr == m_pContext)
{
return false;
}
//服务端 发布
CreatePublisher(nPort_Publisher);
//客户端 订阅
CreateSubscriber(nPorts_Subscriber);
m_bInit = true;
StartThread();
return true;
}
void GdZeroMQ::Uninit()
{
if(!m_bInit)
{
return;
}
if(nullptr != m_pSubscriber)
{
zmq_close(m_pSubscriber); //退出时调用
}
if(nullptr != m_pPublisher)
{
zmq_close(m_pPublisher); //退出时调用
}
if(nullptr != m_pContext)
{
zmq_ctx_destroy(m_pContext);
}
StopThread();
m_bInit = false;
}
void GdZeroMQ::CreatePublisher(int nPort_Publisher)
{
if(nullptr == m_pContext)
{
return;
}
m_pPublisher = zmq_socket(m_pContext, ZMQ_PUB);
QString pub_str = QString("tcp://*:") + QString::number(nPort_Publisher);
// qDebug()<<"pub_str:"<<pub_str;
zmq_bind(m_pPublisher, pub_str.toLocal8Bit().data());
}
// Creates the SUB socket, connects it to tcp://localhost:<port> for every
// port in nPorts_Subscriber, and subscribes to this instance's own name.
// Does nothing when no context exists. Failures are logged; on socket
// creation failure m_pSubscriber stays null.
void GdZeroMQ::CreateSubscriber(QVector<int> nPorts_Subscriber)
{
    if (nullptr == m_pContext)
    {
        return;
    }
    m_pSubscriber = zmq_socket(m_pContext, ZMQ_SUB);
    if (nullptr == m_pSubscriber)
    {
        qDebug("create subscriber socket error: %s\n", zmq_strerror(zmq_errno()));
        return;
    }
    // Const iteration: avoids detaching (deep-copying) the implicitly shared vector.
    for (QVector<int>::const_iterator iter = nPorts_Subscriber.constBegin();
         iter != nPorts_Subscriber.constEnd();
         ++iter)
    {
        const QByteArray endpoint = (QString("tcp://localhost:") + QString::number(*iter)).toLocal8Bit();
        if (0 != zmq_connect(m_pSubscriber, endpoint.constData()))
        {
            qDebug("connect %s error: %s\n", endpoint.constData(), zmq_strerror(zmq_errno()));
        }
    }
    // The topic name is passed to strlen(); skip the subscription when no name is set.
    if (nullptr != m_lpszSelfName)
    {
        AddTopic(m_lpszSelfName);
    }
}
void GdZeroMQ::AddTopic(const char* lpszTopicName)
{
zmq_setsockopt(m_pSubscriber, ZMQ_SUBSCRIBE, lpszTopicName, strlen(lpszTopicName) ); //允许订阅多个频道
//qDebug()<<"sub topic:"<< lpszTopicName;
}
void GdZeroMQ::SendData(char* lpszObjName, QByteArray szData) //向某个主题发送数据
{
if(nullptr == m_pPublisher)
{
return;
}
zmq_send (m_pPublisher, lpszObjName, strlen(lpszObjName), ZMQ_SNDMORE); //指定要发布消息的主题
// qDebug()<<"pub msg_more:" << lpszObjName;
zmq_send (m_pPublisher, m_lpszSelfName, strlen(m_lpszSelfName),ZMQ_SNDMORE); //向设置的主题发布消息(自己的名称)
// qDebug()<<"pub msg_more:"<<m_lpszSelfName;
zmq_send (m_pPublisher, szData.data(), szData.length(), 0); //向设置的主题发布消息(内容主体)
// qDebug()<<"pub msg:"<< szData;
}
// Thread trampoline: interprets pOwner as the GdZeroMQ instance and invokes
// the member function pRunFun on it. Does nothing when pOwner is null.
void GdZeroMQ::ThreadEntry(ThreadRunFunPtr pRunFun, void* pOwner)
{
    GdZeroMQ* const pSelf = static_cast<GdZeroMQ*>(pOwner);
    if (nullptr != pSelf)
    {
        (pSelf->*pRunFun)();
    }
}
void GdZeroMQ::ThreadFun_RecvData()
{
char szTopicName[4096] = {0}; // 用于接收订阅的主题名
char szAddr[4096] = {0}; // 用于接收address
char* lpszData = new char[16*1024*1024]; // 用于接收订阅主题的内容
memset(lpszData, 0, 16*1024*1024);
while(m_bThreadRunning_RecvData)
{
memset(szTopicName, 0, sizeof(szTopicName) );
memset(szAddr, 0, sizeof(szAddr) );
int nRecvSize_TopicName = zmq_recv(m_pSubscriber, szTopicName, sizeof(szTopicName), 0); //接收订阅的主题名称
if (-1 == nRecvSize_TopicName)
{
qDebug("recv topic error!!\n");
}
int nRecvSize_Addr = zmq_recv(m_pSubscriber, szAddr, sizeof(szAddr), 0); //接收订阅的主题名称
if (-1 == nRecvSize_Addr)
{
qDebug("recv addr error!!\n");
}
int nRecvSize_Data = zmq_recv(m_pSubscriber, lpszData, 16*1024*1024, 0);
if (-1 == nRecvSize_Data)
{
qDebug("recv data error!!\n");
}
QByteArray rcv_data1 = szTopicName;
QByteArray rcv_data2 = szAddr;
QByteArray rcv_data3(lpszData, nRecvSize_Data);
m_pCallBack_RecvData(rcv_data1,rcv_data2,rcv_data3, m_pOwner);
std::this_thread::sleep_for(std::chrono::microseconds(1) );
}
}
// (Re)starts the receive thread: stops any running one, raises the running
// flag, then launches a thread that enters ThreadFun_RecvData() on this
// object through ThreadEntry(). Always returns true.
bool GdZeroMQ::StartThread()
{
    StopThread();
    m_bThreadRunning_RecvData = true;

    void* const pSelf = static_cast<void*>(this);
    m_ptrThread_RecvData = new std::thread(&GdZeroMQ::ThreadEntry, &GdZeroMQ::ThreadFun_RecvData, pSelf);
    return true;
}
// Stops the receive thread: clears the running flag, then joins and deletes
// the thread object if one exists. Does nothing when the flag is already
// false.
void GdZeroMQ::StopThread()
{
    if (!m_bThreadRunning_RecvData)
    {
        return;
    }
    m_bThreadRunning_RecvData = false;

    if (nullptr == m_ptrThread_RecvData)
    {
        return;
    }
    m_ptrThread_RecvData->join();
    delete m_ptrThread_RecvData;
    m_ptrThread_RecvData = nullptr;
}