前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C++编程库与框架实战——ZeroMQ消息队列

C++编程库与框架实战——ZeroMQ消息队列

作者头像
Coder-ZZ
发布2024-05-27 20:37:18
2830
发布2024-05-27 20:37:18
举报
文章被收录于专栏:C/C++进阶专栏C/C++进阶专栏

一,消息队列简介

消息队列是一种进程间的通信机制,用于在不同进程之间同步消息。通信期间,一个进程将消息放入该队列中,然后另一个进程就可以从该队列中取出这条消息。

消息队列可以是异步的,即发送方无需等待接收方的确认或回复就可以立即执行下一步的操作。

消息队列是一种缓冲机制,即使接收方当前无法处理某个消息,该消息也不会立即丢失,而是被存储在队列中。

消息队列的通信方式减少了进程间的耦合,提高了系统的可扩展性和可维护性。

消息队列还可以用于实现分布式的任务调度和负载均衡。

常见的消息队列框架有:

ZeroMQ, RabbitMQ, ActiveMQ, Apache Kafka, MQTT等。

二,ZeroMQ框架介绍

ZeroMZeroMQ,简称"zmq",是一种高效、开源的消息传递框架,它提供了多种消息传递模式和编程语言支持。相比于传统的Socket网络编程,ZeroMQ提供了更高层次的抽象,使得程序员能够更专注于业务逻辑的实现而非底层网络通信。

ZeroMQ提供了多种消息传递模式,包括Request-Reply、Publish-Subscribe、Push-Pull等。这些模式可用于不同的场景,例如,Request-Reply适用于客户端与服务器之间的交互,Publish-Subscribe适用于发布-订阅模式,Push-Pull适用于任务分发和负载均衡等。

在ZeroMQ中,消息是通过Socket进行发送和接收的,ZeroMQ支持多种Socket类型。

ZeroMQ支持多种编程语言,包括C/C++、Java、Python等,这使得不同语言编写的应用程序之间可以互相通信,进而可以实现跨平台或者跨设备的数据传输。

三,ZeroMQ基础用法

1.创建zmq上下文

代码语言:javascript
复制
zmq::context_t context(1);

2.创建zmq通信期间的socket套接字

server端:

代码语言:javascript
复制
zmq::socket_t socket(context, ZMQ_REP)

client端:

代码语言:javascript
复制
zmq::socket_t socket(context, ZMQ_REQ);

3. 绑定或连接到对应的socket

server端:

代码语言:javascript
复制
socket.bind("tcp://*:5555");

client端:

代码语言:javascript
复制
socket.connect("tcp://localhost:5555");

4. 发送或接收消息

代码语言:javascript
复制
zmq::message_t msg(5); 
memcpy(msg.data(), "hello", 5); 
socket.send(msg);
代码语言:javascript
复制
zmq::message_t reply; 
socket.recv(&reply);

5.关闭socket和zmq上下文,释放资源

代码语言:javascript
复制
socket.close(); 
context.close();

四,ZeroMQ应用场景

1.分布式计算:ZeroMQ的分布式特性使其非常适用于构建分布式计算系统,可以在多个计算节点之间进行高效的消息传递和任务分发,从而加速计算过程。举个例子,某些区块链相关的应用就基于ZeroMQ实现了消息分发机制。

2.服务端开发:ZeroMQ可以用于构建轻量级的服务架构,服务之间通过ZeroMQ通信,可以实现高可用性和可扩展性。

3.日志框架开发:ZeroMQ可以用于实时日志收集,它可以在不同的进程之间传递日志消息,并将它们进行聚合和存储。

4.消息队列构建:ZeroMQ可以用于构建高性能的消息队列机制,多个生产者可以向一个队列发送消息,多个消费者可以从队列中取出消息进行处理。

5.实时通信:ZeroMQ可以用于构建实时通信系统,例如聊天应用、游戏服务器等,通过ZeroMQ可以进行高效的消息传递和实时状态同步。

6.跨设备数据传输:在嵌入式开发场景,可以通过ZeroMQ传输从各个传感器采集到的数据,也可以实现嵌入式设备到移动App端的通信。

五,ZeroMQ主要通信模式

1.请求-应答模式(Request-Reply)

用于服务端和客户端的直接通信。

客户端发送请求,服务端接收请求并给出响应。

2.发布-订阅模式(Publish-Subscribe)

以广播的方式传递消息,发布者将数据分发给多个订阅者。

发布者将消息发送到一个或多个主题,订阅者可以订阅特定的主题并接收消息。

3.异步队列模式(Push-Pull)

Push端将消息推到队列中,Pull端从队列中取出消息进行处理。

该模式也被称为管道模式(Pipeline)。

4.排他对接模式(Exclusive Pair)

点对点的模式,将两个套接字一对一地连接起来,用于两个节点之间的通信,这种模式应用场景很少。

六,ZeroMQ常用函数接口

zmq_ctx_new:创建zmq上下文对象。

zmq_socket:创建zmq套接字对象。

zmq_bind:将套接字绑定到指定端口上。

zmq_connect:将套接字连接到指定端口上。

zmq_send:往套接字上发送消息。

zmq_recv:从套接字上接收消息。

zmq_poll:等待多个套接字上的事件。

zmq_msg_init:初始化空的zmq消息。

zmq_msg_send:往套接字上发送消息,支持更复杂的操作。

zmq_msg_recv:从套接字上接收消息,支持更复杂的操作。

1.zmq消息的构造

代码语言:javascript
复制
//创建空的zmq消息
zmq::message_t msg;

//给消息分配内存空间
const size_t size = 1024;
zmq::message_t msg(size);

//使用外部数据初始化消息
zmq::message_t msg("hello world!", 12);

2.发送zmq消息

代码语言:javascript
复制
zmq::message_t msg = ...;

auto res = sock.send(msg, zmq::send_flags::none);
auto res = sock.send(std::move(msg), zmq::send_flags::none);
auto res = sock.send(zmq::str_buffer("hello world"), zmq::send_flags::none);

3.接收zmq消息

代码语言:javascript
复制
auto res = sock.recv(msg, zmq::recv_flags::none);
auto res = sock.recv(buf, zmq::recv_flags::none);

4.设置或读取套接字属性

代码语言:javascript
复制
sock.set(zmq::sockopt::immediate, false);
sock.set(zmq::sockopt::routing_id, "100");

auto rid = sock.get(zmq::sockopt::routing_id);

5.poller轮询器操作

代码语言:javascript
复制
zmq::poller_t<> in_poller, out_poller;
//创建两个输入poller,一个输出poller
in_poller.add(input_socket1, zmq::event_flags::pollout);
in_poller.add(input_socket2, zmq::event_flags::pollout);
out_poller.add(output_socket, zmq::event_flags::pollout);

const std::chrono::milliseconds timeout{100};
std::vector<zio::poller_event<>> in_events(2);
std::vector<zio::poller_event<>> out_events(1);
while (true) {
    const auto nin = in_poller.wait_all(in_events, timeout);
    if (!nin) {
        std::cout << "input timeout, try again" << std::endl;
        continue;
    }
    for (int ind=0; ind<nin; ++ind) {
        zmq::message_t msg;
        auto rres = in_events[ind].socket.recv(msg, zmq::recv_flags::none);


        const auto nout = out_poller.wait_all(out_events, timeout);
        if (!nout) {
            std::cout << "output timeout, freakout" << std::endl;
            abort();
        }
        auto sres = out_events[0].socket.send(msg, zmq::send_flags::none);
    }
}

七,ZeroMQ的编码与集成

1.zmq的Linux版本安装

下载官方发行的Linux版本zmq代码,下载完成后在本地编译生成依赖库和头文件。

下载地址1:

https://github.com/zeromq/zeromq4-1/releases

进入zmq代码目录,使用以下命令进行编译:

代码语言:javascript
复制
sh autogen.sh
./configure
make
make install

运行完以上命令,不配置自定义路径的时候,会在"/user/local/"下生成对应的so文件和头文件。

代码语言:javascript
复制
├── include
│   ├── zmq.h
│   └── zmq_utils.h
├── lib
│   ├── libzmq.a
│   ├── libzmq.la
│   ├── libzmq.so -> libzmq.so.5.0.3
│   ├── libzmq.so.5 -> libzmq.so.5.0.3
│   ├── libzmq.so.5.0.3
│   └── pkgconfig

注:"/user/local/"路径很容易被编译器找到,因此,编译的时候,只需要在gcc或g++命令后面加上"-lzmq"参数即可。

2.C语言版本的zmq集成

a.操作步骤:

完成以上安装即可。

b.引入的头文件:

代码语言:javascript
复制
include <zmq.h>

3.C++语言版本的zmq集成

a.操作步骤:

1.完成以上安装。

2.下载并解压官方的cppzmq压缩包,从中拷贝需要依赖的hpp头文件到之前的include目录中。

下载地址2:

https://github.com/zeromq/cppzmq/archive/master.zip

代码语言:javascript
复制
cp zmq.hpp /user/local/include/
cp zmq_addon.hpp /user/local/include/

b.引入的头文件:

zmq.hpp: 包含zmq消息、上下文、缓冲区、套接字、监视器、轮询器等的具体实现。

zmq_addon.hpp:zeromq库的扩展,包含更多高级功能以及另一种形式的轮询器的实现。

代码语言:javascript
复制
include <zmq.h>
include <zmq.hpp>

//实现复杂功能会需要zmq_addon.hpp
//#include <zmq_addon.hpp> 

4.完整的集成过程

a.项目结构:

代码语言:javascript
复制
── zmq_demo
    ├── CMakeLists.txt
    ├── zmq_client.cpp
    └── zmq_server.cpp

b.CMakeLists.txt配置

代码语言:javascript
复制
cmake_minimum_required(VERSION 3.5)
project(zmq_demo)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
include_directories(/user/local/include)
link_directories(/user/local/lib)

add_compile_options(-Wno-error=unused-parameter)

#Add executable
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
add_executable(zmq_server zmq_server.cpp)
add_executable(zmq_client zmq_client.cpp)

#Link zmq library
target_link_libraries(zmq_server -lzmq)
target_link_libraries(zmq_client -lzmq)

c.服务端代码:zmq_server.cpp

代码语言:javascript
复制
#include <zmq.h>
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

std::string s_recv(zmq::socket_t & socket, int flags = 0) {
    zmq::message_t message;
    auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
    (void)socket.recv(message, recv_flags);

    return std::string(static_cast<char*>(message.data()), message.size());
}

bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
    zmq::message_t message(string.size());
    memcpy (message.data(), string.data(), string.size());

    bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
    return (rc);
}

int main()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    int count=0;
    while (true)
    {
        std::string recvStr;
        recvStr = s_recv(socket);
        std::cout << "server Received " << recvStr << " " << count << std::endl;
        sleep(1);

        //Send reply back to client
        std::string sendStr = "World";
        s_send(socket, sendStr);
        std::cout << "server Send " << sendStr << " " << count << std::endl;
        count++;
    }
    return 0;
}

d.客户端代码:zmq_client.cpp

代码语言:javascript
复制
#include <zmq.hpp>
#include <zmq.h>
#include <string>
#include <iostream>

std::string s_recv(zmq::socket_t & socket, int flags = 0) {
    zmq::message_t message;
    auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
    (void)socket.recv(message, recv_flags);

    return std::string(static_cast<char*>(message.data()), message.size());
}

bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
    zmq::message_t message(string.size());
    memcpy (message.data(), string.data(), string.size());

    bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
    return (rc);
}

int main()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REQ);

    std::cout << "Connecting to hello world server..." << std::endl;
    socket.connect("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 6; request_nbr++)
    {
        std::string sendStr = "Hello";
        s_send(socket, sendStr);
        std::cout << "client Send " << sendStr << " " << request_nbr << std::endl;

        //Get the reply.
        std::string recvStr;
        recvStr = s_recv(socket);
        std::cout << "client Received " << recvStr << " " << request_nbr << std::endl;
    }
    return 0;
}

编译过程:

代码语言:javascript
复制
Scanning dependencies of target zmq_server
[ 25%] Building CXX object CMakeFiles/zmq_server.dir/zmq_server.cpp.o
[ 50%] Linking CXX executable ../bin/zmq_server
[ 50%] Built target zmq_server
Scanning dependencies of target zmq_client
[ 75%] Building CXX object CMakeFiles/zmq_client.dir/zmq_client.cpp.o
[100%] Linking CXX executable ../bin/zmq_client
[100%] Built target zmq_client

运行结果:

客户端:

代码语言:javascript
复制
root@ubuntu:/home/zmq_demo/bin# ./zmq_client
Connecting to hello world server...
client Send Hello 0
client Received World 0
client Send Hello 1
client Received World 1
client Send Hello 2
client Received World 2
client Send Hello 3
client Received World 3
client Send Hello 4
client Received World 4
client Send Hello 5
client Received World 5

服务端:

代码语言:javascript
复制
root@ubuntu:/home/zmq_demo/bin# ./zmq_server
server Received Hello 0
server Send World 0
server Received Hello 1
server Send World 1
server Received Hello 2
server Send World 2
server Received Hello 3
server Send World 3
server Received Hello 4
server Send World 4
server Received Hello 5
server Send World 5

八,ZeroMQ代码实战

发布者端:

代码语言:javascript
复制
#include <iostream>
#include <string>
#include <unistd.h>
#include <zmq.h>
#include <zmq.hpp>

int main()
{
    // Create ZMQ Context
    zmq::context_t context ( 1 );
    // Create the Publish socket
    zmq::socket_t publisher ( context, ZMQ_PUB );
    // Bind to a tcp socket
    publisher.bind( "tcp://*:5556" );

    usleep( 1000000 );
    // Message to send to the subscribers
    std::string msg = "msg from [pub]";

    // loop 6 times
    for ( int i = 1; i <= 6; i++ )
    {
        // Create zmq message
        zmq::message_t request( msg.length() );
        // Copy contents to zmq message
        memcpy( request.data(), msg.c_str(), msg.length() );
        // Publish the message
        publisher.send( request );
        std::cout << "sending: " << i << std::endl;
    }
}

订阅者端:

代码语言:javascript
复制
#include <iostream>
#include <string>
#include <zmq.h>
#include <zmq.hpp>

int main()
{
    // Create ZMQ Context
    zmq::context_t context ( 1 );
    // Create the Subscribe socket
    zmq::socket_t subscriber ( context, ZMQ_SUB );
    // Connect to a tcp socket
    subscriber.connect( "tcp://localhost:5556" );
    // Set the socket option to subscribe
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0 );

    // infinite loop to receive messages
    for ( int i = 1; i > 0; i++ )
    {
        // Receive the message and convert to string
        zmq::message_t update;
        subscriber.recv( &update );
        std::string msg = update.to_string();
        // Print the message
        std::cout << "Num: " << i << ", message: " << msg << std::endl;
    }
}

运行结果:

发布者端:

代码语言:javascript
复制
root@ubuntu:/home/zmq_demo/bin# ./zmq_pub
sending: 1
sending: 2
sending: 3
sending: 4
sending: 5
sending: 6

订阅者端:

代码语言:javascript
复制
root@ubuntu:/home/zmq_demo/bin# ./zmq_sub
Num: 1, message: msg from [pub]
Num: 2, message: msg from [pub]
Num: 3, message: msg from [pub]
Num: 4, message: msg from [pub]
Num: 5, message: msg from [pub]
Num: 6, message: msg from [pub]

九,参考阅读

https://zeromq.org/socket-api/

git://github.com/imatix/zguide.git

https://zguide.zeromq.org/docs/chapter2/

https://wizardforcel.gitbooks.io/zmq-guide/content/chapter1.html

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员与背包客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档