217 lines
5.1 KiB
C++
217 lines
5.1 KiB
C++
#include "GdZeroMQ.h"
|
|
|
|
// Constructs an idle instance: no ZeroMQ context, no sockets, no receive
// thread and no callback. All of these are set up later by Init().
GdZeroMQ::GdZeroMQ()
{
    // ZeroMQ handles.
    m_pContext = nullptr;
    m_pPublisher = nullptr;
    m_pSubscriber = nullptr;

    m_bInit = false;

    // The publisher name is assigned by Init(); give it a defined value so
    // that it is never read as an indeterminate pointer before that.
    m_lpszSelfName = nullptr;

    // Receive-thread state.
    m_bThreadRunning_RecvData = false;
    m_ptrThread_RecvData = nullptr;

    // Receive callback and the opaque owner pointer handed back to it.
    m_pCallBack_RecvData = nullptr;
    m_pOwner = nullptr;
}
|
|
|
|
// Destructor: all teardown is delegated to Uninit().
GdZeroMQ::~GdZeroMQ()
{
    this->Uninit();
}
|
|
|
|
bool GdZeroMQ::Init(int nPort_Publisher, QVector<int> nPorts_Subscriber, const char* lpszPublisherName, pCallBack_RecvData pCB_RecvData, void* pOwner) //ZMQ初始化
|
|
{
|
|
m_lpszSelfName = const_cast<char *>(lpszPublisherName);
|
|
m_pCallBack_RecvData = pCB_RecvData;
|
|
m_pOwner = pOwner;
|
|
|
|
m_pContext = zmq_ctx_new();
|
|
|
|
if(nullptr == m_pContext)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
//服务端 发布
|
|
CreatePublisher(nPort_Publisher);
|
|
|
|
//客户端 订阅
|
|
CreateSubscriber(nPorts_Subscriber);
|
|
|
|
m_bInit = true;
|
|
|
|
StartThread();
|
|
|
|
return true;
|
|
}
|
|
|
|
void GdZeroMQ::Uninit()
|
|
{
|
|
if(!m_bInit)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if(nullptr != m_pSubscriber)
|
|
{
|
|
zmq_close(m_pSubscriber); //退出时调用
|
|
}
|
|
|
|
if(nullptr != m_pPublisher)
|
|
{
|
|
zmq_close(m_pPublisher); //退出时调用
|
|
}
|
|
|
|
if(nullptr != m_pContext)
|
|
{
|
|
zmq_ctx_destroy(m_pContext);
|
|
}
|
|
|
|
StopThread();
|
|
|
|
m_bInit = false;
|
|
}
|
|
|
|
void GdZeroMQ::CreatePublisher(int nPort_Publisher)
|
|
{
|
|
if(nullptr == m_pContext)
|
|
{
|
|
return;
|
|
}
|
|
|
|
m_pPublisher = zmq_socket(m_pContext, ZMQ_PUB);
|
|
|
|
QString pub_str = QString("tcp://*:") + QString::number(nPort_Publisher);
|
|
qDebug()<<"pub_str:"<<pub_str;
|
|
|
|
zmq_bind(m_pPublisher, pub_str.toLocal8Bit().data());
|
|
}
|
|
|
|
// Creates the SUB socket, connects it to "tcp://localhost:<port>" for every
// port in nPorts_Subscriber, and subscribes to this endpoint's own name as a
// topic. Does nothing without a context. If the socket cannot be created the
// reason is logged and m_pSubscriber stays null; a failed connect is logged
// and the remaining ports are still tried.
void GdZeroMQ::CreateSubscriber(QVector<int> nPorts_Subscriber)
{
    if (nullptr == m_pContext)
    {
        return;
    }

    m_pSubscriber = zmq_socket(m_pContext, ZMQ_SUB);

    if (nullptr == m_pSubscriber)
    {
        qDebug() << "create sub socket failed:" << zmq_strerror(zmq_errno());
        return;
    }

    // Iterate through a const reference so the implicitly shared container is
    // not detached (deep-copied) just for reading.
    const QVector<int>& ports = nPorts_Subscriber;

    for (const int nPort : ports)
    {
        const QString sub_str = QString("tcp://localhost:") + QString::number(nPort);
        qDebug()<<"sub_str:"<<sub_str;

        const QByteArray endpoint = sub_str.toLocal8Bit();

        if (0 != zmq_connect(m_pSubscriber, endpoint.constData()))
        {
            qDebug() << "connect sub socket failed:" << sub_str << zmq_strerror(zmq_errno());
        }
    }

    // Subscribe to our own name so that messages addressed to us are delivered.
    if (nullptr != m_lpszSelfName)
    {
        AddTopic(m_lpszSelfName);
    }
}
|
|
|
|
void GdZeroMQ::AddTopic(const char* lpszTopicName)
|
|
{
|
|
zmq_setsockopt(m_pSubscriber, ZMQ_SUBSCRIBE, lpszTopicName, strlen(lpszTopicName) ); //允许订阅多个频道
|
|
qDebug()<<"sub topic:"<< lpszTopicName;
|
|
}
|
|
|
|
void GdZeroMQ::SendData(char* lpszObjName, QByteArray szData) //向某个主题发送数据
|
|
{
|
|
if(nullptr == m_pPublisher)
|
|
{
|
|
return;
|
|
}
|
|
|
|
zmq_send (m_pPublisher, lpszObjName, strlen(lpszObjName), ZMQ_SNDMORE); //指定要发布消息的主题
|
|
qDebug()<<"pub msg_more:" << lpszObjName;
|
|
|
|
zmq_send (m_pPublisher, m_lpszSelfName, strlen(m_lpszSelfName),ZMQ_SNDMORE); //向设置的主题发布消息(自己的名称)
|
|
qDebug()<<"pub msg_more:"<<m_lpszSelfName;
|
|
|
|
zmq_send (m_pPublisher, szData.data(), szData.length(), 0); //向设置的主题发布消息(内容主体)
|
|
qDebug()<<"pub msg:"<< szData;
|
|
}
|
|
|
|
// Thread trampoline: interprets pOwner as a GdZeroMQ instance and invokes the
// member function pRunFun on it. Returns without doing anything when either
// pointer is null (calling through a null member-function pointer is
// undefined behaviour).
void GdZeroMQ::ThreadEntry(ThreadRunFunPtr pRunFun, void* pOwner)
{
    // void* -> object pointer is a static_cast; no reinterpretation needed.
    GdZeroMQ* const pThis = static_cast<GdZeroMQ*>(pOwner);

    if (nullptr == pThis || nullptr == pRunFun)
    {
        return;
    }

    (pThis->*pRunFun)();
}
|
|
|
|
void GdZeroMQ::ThreadFun_RecvData()
|
|
{
|
|
char szTopicName[4096] = {0}; // 用于接收订阅的主题名
|
|
char szAddr[4096] = {0}; // 用于接收address
|
|
char* lpszData = new char[16*1024*1024]; // 用于接收订阅主题的内容
|
|
|
|
memset(lpszData, 0, 16*1024*1024);
|
|
|
|
while(m_bThreadRunning_RecvData)
|
|
{
|
|
memset(szTopicName, 0, sizeof(szTopicName) );
|
|
memset(szAddr, 0, sizeof(szAddr) );
|
|
|
|
int nRecvSize_TopicName = zmq_recv(m_pSubscriber, szTopicName, sizeof(szTopicName), 0); //接收订阅的主题名称
|
|
|
|
if (-1 == nRecvSize_TopicName)
|
|
{
|
|
qDebug("recv topic error!!\n");
|
|
}
|
|
|
|
int nRecvSize_Addr = zmq_recv(m_pSubscriber, szAddr, sizeof(szAddr), 0); //接收订阅的主题名称
|
|
|
|
if (-1 == nRecvSize_Addr)
|
|
{
|
|
qDebug("recv addr error!!\n");
|
|
}
|
|
|
|
int nRecvSize_Data = zmq_recv(m_pSubscriber, lpszData, 16*1024*1024, 0);
|
|
|
|
if (-1 == nRecvSize_Data)
|
|
{
|
|
qDebug("recv data error!!\n");
|
|
}
|
|
|
|
QByteArray rcv_data1 = szTopicName;
|
|
QByteArray rcv_data2 = szAddr;
|
|
QByteArray rcv_data3(lpszData, nRecvSize_Data);
|
|
|
|
m_pCallBack_RecvData(rcv_data1,rcv_data2,rcv_data3, m_pOwner);
|
|
|
|
std::this_thread::sleep_for(std::chrono::microseconds(1) );
|
|
}
|
|
}
|
|
|
|
bool GdZeroMQ::StartThread()
|
|
{
|
|
StopThread();
|
|
|
|
m_bThreadRunning_RecvData = true;
|
|
m_ptrThread_RecvData = new std::thread(std::bind(&GdZeroMQ::ThreadEntry, &GdZeroMQ::ThreadFun_RecvData, (void*)this));
|
|
|
|
return true;
|
|
}
|
|
|
|
void GdZeroMQ::StopThread()
|
|
{
|
|
if(m_bThreadRunning_RecvData)
|
|
{
|
|
m_bThreadRunning_RecvData = false;
|
|
|
|
if(nullptr != m_ptrThread_RecvData)
|
|
{
|
|
m_ptrThread_RecvData->join();
|
|
|
|
delete m_ptrThread_RecvData;
|
|
m_ptrThread_RecvData = nullptr;
|
|
}
|
|
}
|
|
}
|