首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >消息队列学习

消息队列学习

作者头像
用户11719958
发布2025-12-30 12:04:42
发布2025-12-30 12:04:42
710
举报

一,项目介绍

首先介绍一下阻塞队列(Blocking Queue),在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。

其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)。

而阻塞队列是应用于同一主机的不同线程之上。

所谓的消息队列,就是把阻塞队列这样的数据结构,单独提取成一个程序,进行独立部署。

分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求。

因此, 我们通常会把阻塞队列封装成一个独立的服务器程序, 并且赋予其更丰富的功能。 这样的服务程序我们就称为 消息队列 (Message Queue, MQ)。也是跨主机实现生产者消费者模型。

生产者消费者模型的作用:

  • 解耦合:使数据的生产与数据的处理分离开来,使程序更加模块化。
  • 支持并发:根据实际的生产速率与消费速率,控制创建不同 数量生产者的线程或消费者的线程。
  • 支持忙闲不均/削峰填谷:当生产的数据量剧增时,消息队列可以作为缓冲区,将数据缓存起来,消费者根据自己的处理能力来获取数据。

市面上成熟的消息队列非常多:

  • RabbitMQ
  • Kafka
  • RocketMQ
  • ActiveMQ
  • ......

其中 RabbitMQ 是一个非常知名、功能强大且广泛使用的消息队列。这个项目就是仿照RabbitMQ,实现一个简单的消息队列。

二,开发环境

  • 操作系统:Linux(Ubuntu 20.04)
  • 编辑器:Vscode/vim
  • 编译器/调试工具 :g++/gdb
  • 项目自动化构建工具:Makefile

三,技术选型

  • 开发语言:C++
  • 序列化框架:protobuf序列化框架
  • 网络通信:自定义应用层协议+muduo库
  • 数据持久化:sqlite3数据库
  • 单元测试框架:Gtest

四,第三方库介绍

1,Protobuf

ProtoBuf(全称 Protocol Buffer)是数据结构序列化和反序列化框架,它具有以下特点:

  • 语言无关、平台无关:即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持多个平台。
  • 高效:即比 XML 更小、更快、更为简单。
  • 扩展性、兼容性好:你可以更新数据结构,而不影响和破坏原有的旧程序。

protobuf使用流程介绍:

  1. 编写 .proto 文件,目的是为了定义结构对象(message)及属性内容。
  2. 使用 protoc 编译器编译 .proto 文件,生成一系列接口代码,存放在新生成头文件和源文件中。
  3. 依赖生成的接口,将编译生成的头文件包含进我们的代码中,实现对 .proto 文件中定义的字段进行设置和获取,和对 message 对象进行序列化和反序列化。

示例:

1,编写contacts.proto文件

代码语言:javascript
复制
//通过protobuf生成代码

//声明语法版本
syntax="proto3";

//声明代码的命名空间
package contacts;


//结构化对象的描述
message contact
{
    //各个字段的描述:子段类型 子段名=字段编号
    uint64 sn=1;
    string name=2;
    float score=3;
};  

2,编译contacts.proto文件

代码语言:javascript
复制
//编译命令格式
protoc --cpp_out=./ contacts.proto

3,生成的文件

代码语言:javascript
复制
contacts.pb.cc 和contacts.pb.h

4,序列化和反序列的使用

代码语言:javascript
复制
#include <iostream>
#include <string>
#include "contacts.pb.h"

int main()
{
    // 序列化
    contacts::contact conn;
    conn.set_sn(1001);
    conn.set_name("小明");
    conn.set_score(90);
    std::string str = conn.SerializeAsString();
    // std::cout<<str<<std::endl;

    // 反序列化
    contacts::contact stu;
    bool ret = stu.ParseFromString(str);
    if (ret == false)
    {
        std::cout << "反序列化失败" << std::endl;
    }
    std::cout << stu.sn() << std::endl;
    std::cout << stu.score() << std::endl;
    std::cout << stu.name() << std::endl;
    return 0;
}
2,muduo库

2.1,muduo库的介绍

muduo是由陈硕大佬开发,是一个基于非阻塞IO事件驱动的C++高并发TCP网络编程库。它是一个基于主从Reactor模型的网络库,其用的线程模型是one thread one loop,所谓的one thread one loop指的是:

  • 一个线程只能有一个事件循环(EventLoop),用来响应IO事件。
  • 一个文件描述符,只能由一个线程进行读写,也就是一个TCP连接必须归属于某个EventLoop管理。

在搭建服务器和客户端时,会使用到muduo库中的相关接口。

3,SQLite

SQLite是一个较轻量级的数据库,它是一个零配置的数据库,这意味着与其他数据库不一样,我们不需要在系统中配置。

像其他数据库,SQLite 引擎不是一个独立的进程,可以按应用程序需求进行静态或动态连接,SQLite 直接访问其存储文件。

SQLite3 C/C++ API介绍

SQLite3 官方文档:https://www.sqlite.org/c3ref/funclist.html

代码语言:javascript
复制
//sqlite3有 三种安全等级
//1,非线程安全模式:不加任何的锁,此时的效率肯定是最高的
//2,线程安全模式(不同的连接在不同的线程/进程下使用是安全的,即同一个操作句柄不能用于多线程间)
//3,串行化模式(可以在不同的线程/进程间使用同一个句柄):对所有的操作都 加锁保护起来

//下面的所有函数,返回值为SQLITE_OK表示函数执行成功,否则失败。

//1,创建/打开数据库文件,并返回操作句柄(以输出型参数的形式),该函数执行成功返回SQLITE_OK
int sqlite3_open(const char *filename, sqlite3 **ppDb)

//2,若在编译阶段启动了线程安全,则在程序运行阶段可以通过参数选择线程安全等级
int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
falg参数可以设置为:SQLITE_OPEN_READWRITE-- 以可读可写方式打开数据库文件
                   SQLITE_OPEN_CREATE-- 不存在数据库文件则创建
                   SQLITE_OPEN_NOMUTEX--多线程模式,只要不同的线程使用不同的连接即可保证线程安全
                   SQLITE_OPEN_FULLMUTEX--串行化模式

//3,执行语句
//对于执行语句sql,他可能是一个查询语句,那么对于结果如何处理?
//此时就需要我们传入一个回调函数来对结果进行处理
//如果这个sql语句是一个插入语句或者删除语句等等,那么回调函数设置为nullptr即可
//在该函数的执行过程中,会将arg传给回调函数的第一个参数
//最后一个参数表示,如果该执行出错,错误信息会设置到err中,通过这个我们可以查看错误原因
int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),
void* arg, char **err)
callback回调函数的参数:  
    void* : 原函数中的arg参数
    int:一行中数据的列数
    char**:存储一行数据的字符指针数组
    char**:每一列的字段名称   
注意:这个回调函数有一个int类型的返回值,这个回调函数如果成功执行必须返回0,返回非0就会触发abort退出程序
//4,销毁语句
int sqlite3_close(sqlite3* db); 成功返回 SQLITE_OK
int sqlite3_close_v2(sqlite3*); 推荐使用--无论如何都会返回SQLITE_OK

//5,获取错误信息
const char *sqlite3_errmsg(sqlite3* db);
4,GTest

GTest 是一个跨平台的 C++单元测试框架,由 google 公司发布。它提供了丰富的断言、致命和非致命判断、参数化等等。

代码语言:javascript
复制
TEST(test_case_name, test_name)
TEST_F(test_fixture,test_name)
  • TEST:主要用来创建一个简单测试, 它定义了一个测试函数, 在这个函数中可以使用任何 C++代码并且使用框架提供的断言进行检查。
  • TEST_F:主要用来进行多样测试,适用于多个测试场景如果需要相同的数据配置的情况, 即相同的数据测不同的行为。

GTest 中的断言的宏可以分为两类:

  • ASSERT_系列:如果当前点检测失败则退出当前函数。
  • EXPECT_系列:如果当前点检测失败则继续往下执行。
代码语言:javascript
复制
//断言宏的使用
        ASSERT_GT(age,18) //age是否大于18
        ASSERT_LT(age,18) //age是否小于18
//同理也存在EXPECT_GT EXPECT_LT
        ASSERT_ 断言失败停止运行
        EXPECT_ 断言失败继续运行

示例:

代码语言:javascript
复制
#include <iostream>
#include <gtest/gtest.h>

//第一个参数表示单元测试的总名称,第二个表示具体的名称
TEST(test,great_than)
{
    int age=20;
    //GT大于
    ASSERT_GT(age,18);
    std::cout<<"OK!\n";    
}
TEST(test,less_than1)
{
    int age=20;
    //LT小于
    EXPECT_GT(age,18);
    std::cout<<"OK!\n";
}
int main(int argc,char* argv[])
{
    testing::InitGoogleTest(&argc,argv);
    //启动所有的单元测试
    int ret=RUN_ALL_TESTS();
    return 0;
}

五,项目需求分析

1,核心概念

在前面提到过,消息队列,就是把阻塞队列这样的数据结构,单独提取成一个程序,进行独立部署。

会有很多个生产者客户端向服务器放入数据,服务器在将消息推送给对应的客户端进行处理。

Broker Server服务器内部涉及到一些关键的概念: 1,队列(Queue):真正用来存储消息的实体,后续消费者也是从队列中获取数据。

2,交换机(Exchange):生产者将消息投递到Broker Server上,是先将消息船体给某个交换机,再由交换机决定,应该将消息传递给哪些绑定的队列。

3,绑定(Binding):交换机与队列的绑定关系,一个交换机可以绑定一个或者多个队列。

4,虚拟机(Virtual Host):虚拟机可以理解为数据库中的"database"。一个Broker Server上可以组织不同类别的数据,此时就可以使用虚拟机进行区分。但是在本项目的实现中,虚拟机只实现了一个。

5,消息(Message):可以认为是A给B发的请求,通过MQ转发,就是一个消息。同样B给A发出响应,经过MQ转发,也是一个消息。

当生产者发布一条消息到服务器后,先交给交换机,再由交换机决定交给哪个队列,这个过程是由交换机类型决定的。交换机有三种类型:Direct(直接交换),Fanout(广播交换),Topic(主题交换)。

  • 生产者发送消息的时候,会携带一个routing_key,就是一个字符串,而交换机与队列的绑定关系中也包含一个字符串binding_key
  • Direct(直接交换):生产者发送消息的时候,交换机先拿到消息,交换机再根据绑定的所有队列的binding_key进行比较,只有routing_key=binding_key,才将消息发送给该 队列。
  • Fanout(广播交换):同样生产者 发送消息先到交换机,此时交换机会将这条消息发送给所有绑定的队列。
  • Topic(主题交换):可以理解为如果此时routing_key与binding_key对上"暗号"了,此时就可以把消息发送给指定的队列。这里的"暗号"实际 上就是指一种匹配规则,当routing_key与binding_key匹配成功时,就可以把消息 发送给指定的队列中。

以上的这些内容,都是AMQP协议——高级消息队列协议中规定出来的。

持久化操作:对于上述的数据:交换机,队列,绑定关系,消息等等,需要在内存中存储,也需要在硬盘中存储。 在内存中存储:使用/访问更加方便开快速。 在硬盘存储:保证服务器重启后数据不丢失。

至此 ,本项目需要实现的内容包括:Broker Server服务器,客户端(包括发布客户端和订阅客户端)。

2,服务器需要提供的核心API
  • 创建交换机(exchangeDeclare)
  • 移除交换机(exchangeDelete)
  • 创建队列(queueDeclare)
  • 移除队列(queueDelete)
  • 创建绑定关系(queueBind)
  • 解除绑定关系(queueUnBind)
  • 发布消息(basicPublish)
  • 订阅消息(basicConsume)
  • 确认消息(basicAck):消费客户端拿到消息,处理完毕后,向服务器发送确认响应,然后就可以从服务器上删除该消息了。
  • 取消订阅(basicCancel)
3,网络通信

生产者/消费者都是通过网络,和Broker Server进行交互的。

此处设定,使用Tcp+自定义应用层协议实现生产者/消费者和Broker Server的交互。

在客户端这边,也需要提供上述服务器的方法,只不过服务器上的方法是真正做事的,而客户端这边实现的方法,实际上只是发送请求/接受响应。

由于在Tcp中,建立/断开一个连接(Connection)的成本其实挺高的,因此,很多时候,不希望频繁的建立/断开Tcp连接。

为此,RabbitMQ引入一个新的概念——信道(channel),一个连接(Connection)可以创建多个信道,信道(Channel)只是逻辑上的一个概念,它比连接(Connection)轻量很多。

每个信道上的数据传输都是互不相干的,这是它的核心价值。它允许在同一个Tcp连接上,并发地执行多个独立的操作(如发布消息,消费消息,声明队列等等),这些操作都是在不同信道上进行的,互不干扰。

Tcp连接和信道的关系就比如高速公路和车道的关系,Tcp是铺路,信道是划车道。

如上图所示,在发布客户端中,不同的线程使用不同的信道来发送请求,这几个信道是互不影响的,一个信道上操作失败通常是不会影响其他信道。在服务器这边,也有信道,来接受客户端发送来的请求,并进行处理(这么多的请求,到时候肯定会选择使用线程池来处理)。

在这里我有一个问题,就是为什么客户端也要使用信道这个概念,发送请求时,为什么不直接使用多线程,而是要使用信道。也就是下面的这种方式:

如果使用 这种方案:会存在以下问题。

  1. 因为对于客户端发送的一些请求来说,是有严格顺序的,比如客户端发送了声明队列和订阅队列消息这两个请求,而服务器可能先接受到的请求是订阅队列消息,但是此时队列根本还没有创建,就无法订阅,程序混乱。
  2. 当服务端处理完客户端发送的请求后,会给客户端发送一个响应,如果使用这种方式,我们无法知道这个响应是哪个线程对应的请求。
4,消息应答模式

被消费的消息,需要进行应答,有两种应答方式:

自动应答:消息只要消费了,就算应答完毕了,Broker Server直接删除掉这条消息。

手动应答:消费者消费完消息后,需要收到调用接口,像服务器发送响应,然后Broker Server再删除掉这条消息。

六,项目模块化划分

有了上面对消息队列的简单认识,接下来就要细化一下具体要做哪些工作。

需要实现的内容包括:

服务器(Broker Server) 发布客户端(生产者) 订阅客户端(消费者)

1,服务端模块设计
  1. 交换机模块
  2. 队列模块
  3. 绑定关系模块
  4. 消息模块
  5. 虚拟机模块:该模块是对以上四个模块的整合,向外提供操作接口。
  6. 路由匹配模块:根据交换机的类型,判断一条消息应该路由给绑定的哪些队列。
  7. 消费者管理模块:消费者就是订阅者,当消费者调用API订阅了某条消息,其实本质是消费者订阅了某个队列。然后当这个队列有消息时,就会查找这个队列有哪些消费者订阅了,将消息推送给其中一个消费者进行处理,所以在服务器这边,我们需要将消费者的消息保存起来,以便于推送消息时可以找到消费者。
  8. 信道管理模块:一个TCP连接可能会有多个信道,当某个客户端想要关闭通信,关闭的不是连接,而是自己的信道,关闭信道的同时,我们需要将该客户端的订阅信息给删除。
  9. 连接管理模块:我们的服务器可能会收到多个客户端的连接,我们需要将这些连接保存起来。
  10. Broker Server模块(搭建服务器):对以上模块的整合,向客户端提供服务。
2,服务端模块汇总图

首先介绍服务器模块,当 用户创建交换机和队列,以及绑定交换机和队列时,这些信息应该被持久化存储,这里使用的是sqlite数据库进行存储,以便于主机重启/程序重启后,这些信息可以恢复。同样,消息也需要持久化存储,因为一条消息被放入到消息队列后,可能还没有被消费,服务器就可能因为某些原因停止运行了,为了使服务器重启后,消息还存在,这里使用文件进行存储。

当将一条消息发布给消息队列时,这个消息首先是交给交换机,再根据交换机的类型以及路由匹配,将消息推送给绑定的队列,然后这条消息就会被保存下来,内存中保存一份,文件中也会保存。当这个消息推送给某个消费者客户端,并且收到确认响应后,才会从内存和文件中将这条消息删除掉。

3,客户端模块设计
  1. 消费者模块:一个客户端如果订阅了消息,那么他就是消费客户端,在向服务器发送订阅消息请求的时候,要包含本身的消费者信息。
  2. 信道管理模块:同样我们在发布消息/获取消息的时候,是通过信道来完成的。
  3. 连接管理模块:该模块提供各种API接口,比如创建交换机,创建队列等等所有的接口。

最后基于以上三个模块:实现发布客户端和订阅客户端。

4,客户端模块汇总图

七,工具类的实现

在本项目中,会使用到一些工具类,比如线程池,对数据库的操作,随机ID的生成等等。

所以,在这里先实现这些工具类,之后在项目中直接使用。

1,线程池的实现

1,前置知识

使用C++11中的异步操作,实现一个线程池。

std::future是C++11标准库中的一个模板类,它表示一个异步操作的执行结果。当我们在多线程编程中,使用异步操作来执行任务时,std::future可以帮助我们获取任务的执行结果。std::future会阻塞当前线程,直到异步任务执行完。

std::future提供了一种安全的方式来获取异步任务的执行结果,我们可以通过std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步任务执行完。

std::future通常要与其他组件(如async/promise/packaged_task)搭配使用。 1,std::async是一个函数模板 作用:异步启动任务,该函数会返回一个future对象,用来获取执行结果。 异步任务的执行方式: std::launch::async:强制在新线程执行。 std::launch::deferred:延迟执行(调用get()时执行)。 2,std::promise是一个类模板 调用这个类的成员函数get_future(),返回一个std::future对象。 3,std::packaged_task是一个类模板 同样也可以调用成员函数get_future(),返回一个std::future对象。 可以对可调用对象进行二次封装,将其结果绑定到std::future中

std::async与std::future搭配使用示例:

代码语言:javascript
复制
//使用future来获取异步线程的执行结果
#include <iostream>
#include <future>
#include <chrono>

int Add(int num1,int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout<<"开始执行!"<<std::endl;
    int resul=num1+num2;
    return resul;
}
int main()
{
    //std::launch::deferred  当调用get获取执行结果时,异步线程才会执行任务
    //std::launch::async 异步线程会立即执行任务
    // std::future<int> ret=std::async(std::launch::deferred,Add,11,22);
    std::future<int> ret=std::async(std::launch::async,Add,11,22);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    int sum=ret.get();//获取执行结果
    std::cout<<sum<<std::endl;
    return 0;
}

std::promise与std::future搭配使用示例:

代码语言:javascript
复制
//promise模板类的使用
#include <iostream>
#include <future>
#include <thread>
void Add(int num1,int num2,std::promise<int>& pro)
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    int result=num1+num2;
    pro.set_value(result);
}
//通过promise对象使两个线程之间的结果得到同步
int main()
{
    std::promise<int> pro;

    std::future<int> fut=pro.get_future();

    std::thread thr(Add,11,22,std::ref(pro));
    int ret=fut.get();//会在这里阻塞住,直到pro中结果被设置了

    std::cout<<"sum:"<<ret<<std::endl;
    thr.join();

    return 0;
}

std::packaged_task与std::future搭配使用案例:

代码语言:javascript
复制
// packaged_task的使用
#include <iostream>
#include <future>
#include <memory>

int Add(int num1, int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "开始执行!" << std::endl;
    int resul = num1 + num2;
    return resul;
}
int main()
{
    // 可以通过packaged_task对象交给智能指针来管理,也就是封装成一个指针
    // 然后将该指针传入线程中,再解引用执行
    // 但是此时的packaged_task对象就需要从堆上new一个,而不是在栈上开辟
    // 因为线程的执行顺序不确定,可能packaged_task对象销毁了,才会执行线程函数
    //  std::shared_ptr<std::packaged_task<int(int, int)>> ptask(new std::packaged_task<int(int, int)>(Add));
    auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
    std::thread thr([ptask]()
                    {
    std::this_thread::sleep_for(std::chrono::seconds(3));

        (*ptask)(11,22); });
    std::future<int> fut = ptask->get_future();
    int ret = fut.get(); // 同样会在这里阻塞
    std::cout << ret << std::endl;
    thr.join();

    // ————————————————————————————————————————————————————————————————————————————————————
    //  定义一个packaged_task对象,它可以用来管理一个可调用对象
    //   std::packaged_task<int(int,int)> task(Add);
    //   std::future<int> fut=task.get_future();
    //  //可以把该对象当作使一个可调用对象来执行
    //  task(11,22);
    //  int ret=fut.get();
    //  std::cout<<ret<<std::endl;

    // ——————————————————————————————————————————————————————————————————————————————————
    //  不能将task对象传入async,让异步线程去执行
    //   std::packaged_task<int(int,int)> task(Add);
    //   std::future<int> fut=task.get_future();
    //   std::async(std::launch::async,task,11,22);
    //   std::cout<<fut.get()<<std::endl;

    // ——————————————————————————————————————————————————————————————————————————————————
    // 也不能将task对象作为线程的入口函数
    //  std::packaged_task<int(int,int)> task(Add);
    //  std::future<int> fut=task.get_future();
    //  std::thread thr(task,11,22);
    //  std::cout<<fut.get()<<std::endl;
    //  thr.join();
    return 0;
}
2,线程池的实现

由于使用std::promise,需要将std::promise对象设置到任务的执行参数中,还需要将运行结果设置到该对象中,也就是我们要改变用户传入的函数 ,实现难度较高,不采用这种方式。而std::async,使用的是自己内部的工作线程,那我们创建的线程用来干什么呢?

所以,最后决定使用std::packaged_taskstd::future来实现线程池。

线程池的实现思想:

  • 用户传入要执行的任务(函数),以及要处理的数据(函数的参数),由线程池中的工作线程来完成异步执行。

要提供的操作:

  • push操作:用户向线程池中放入一个任务。
  • stop操作:停止线程池的运行。

要管理的成员:

  • 任务池:保存用户传入的任务
  • 工作线程池:保存一定数量的线程
  • 互斥锁&条件变量:实现同步互斥
  • 结束运行标志:以便于控制线程池的结束
代码语言:javascript
复制
// 线程池
#ifndef __M_THRPOOL_H__
#define __M_THRPOOL_H__

#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <functional>
#include <future>
#include <memory>

namespace xgmq
{
    class ThreadPool
    {
    public:
        using Funct = std::function<void()>;
        using ptr=std::shared_ptr<ThreadPool>;
        ThreadPool(const int thr_count = 3)
            : _stop(false)
        {
            for (int i = 0; i < thr_count; i++)
            {
                _thr_pool.emplace_back(&entry, this);
            }
        }
        // 向线程池中加一个任务
        template <typename F, typename... Args>
        auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
        {
            // 获取该函数的返回类型
            auto return_type = decltype(func(args...));
            // 将该函数的参数进行绑定
            auto func_t = std::bind(std::forward(func), std::forward(args)...);
            auto task = std::make_shared < std::packaged_task<return_type()>(func_t);
            std::future<return_type> fut = task->get_future();
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _task_pool.emplace_back([task]()
                                        { (*task)(); });
                _cond.notify_one();
            }
            return fut;
        }
        // 停止线程池的运行
        void stop()
        {
            if (_stop == true)
            {
                return;
            }
            _stop = true;
            _cond.notify_all(); // 唤醒所有进程
            for (int i = 0; i < _thr_pool.size(); i++)
            {
                _thr_pool[i].join();
            }
        }
        ~ThreadPool()
        {
            stop();
        }

    private:
        // 线程入口函数
        void entry()
        {
            while (!_stop)
            {
                std::vector<Funct> _tmp_pool;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _cond.wait(lock, [this]()
                               { return !_stop || _task_pool.size() > 0; });
                    _tmp_pool.swap(_task_pool);
                }
                for (auto &task : _tmp_pool)
                {
                    task();
                }
            }
        }

    private:
        std::atomic<bool> _stop; // 线程池是否停止运行
        std::mutex _mutex;
        std::condition_variable _cond;
        std::vector<Funct> _task_pool;      // 任务池
        std::vector<std::thread> _thr_pool; // 线程池
    };
}
#endif

2,日志打印

在项目中 ,会涉及到一些地方的日志信息打印,来观察程序的运行状态。

在这里,要求进行日志打印的时候,要带上行号和文件名。同时日志打印也有等级的划分,有调试等级的日志打印,有警告等级的日志打印,也有错误等级的日志打印。

用到的关键技术:C语言库中的strftime函数。 size_t strftime(char *str, size_t maxsize, const char *format, const struct tm *timeptr) 这个函数可以将传入的时间,按照指定格式,组织成字符串。 str标识最终形成的字符串,maxsize表示str的最大字符个数,format表示生成的字符串的格式,最后一个参数是一个tm类型的结构体,这个结构体内部就包含了时间,比如年月日,时分秒,微秒......

使用示例:

代码语言:javascript
复制
//获取当前时间
time_t t=time(nullptr);
//使用当前时间构造一个struct tm类型的结构体
struct tm* ptm=localtime(&t);

char str[32];
strftime(str,31,"%H:%M:%S",ptm);//时间格式为 时:分:秒
代码语言:javascript
复制
// 日志打印工具
#ifndef __M_LOGGER_H__
#define __M_LOGGER_H__
#include <iostream>
#include <cstdio>
#include <ctime>

#define DEBUG_LEVEL 0
#define INFO_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DEUBG_LEVEL
#define LOG(leve_str, level, format, ...)                                                                   \
    {                                                                                                       \
        if (level >= DEFAULT_LEVEL)                                                                         \
        {                                                                                                   \
            time_t t = time(nullptr);                                                                       \
            struct tm *ptm = localtime(&t);                                                                 \
            char time_str[32];                                                                              \
            strftime(time_str, 31, "%H:%M:%S", ptm);                                                        \
            printf("[%s][%s][%s:%d]\t" format "\n", leve_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__); \
        }                                                                                                   \
    }

#define DLOG(format,...) LOG("DEBUG",DEBUG_LEVEL,format,##__VA_ARGS__)
#define ILOG(format,...) LOG("INFO",INFO_LEVEL,format,##__VA_ARGS__)
#define ELOG(format,...) LOG("ERR",ERR_LEVEL,format,##__VA_ARGS__)

#endif

3,sqlite数据库操作的封装

代码语言:javascript
复制
    class SqliteHelper
    {
    public:
        typedef int (*callback)(void *, int, char **, char **);
        SqliteHelper(const std::string &dbfile)
            : _dbfile(dbfile), pb(nullptr)
        {
        }
        // 打开数据库文件
        // 传入打开时的安全等级,默认是串行化
        void open(int safe = SQLITE_OPEN_FULLMUTEX)
        {
            // 以可读可写方式,不存在则创建的方式打开
            int ret = sqlite3_open_v2(_dbfile.c_str(), &pb,
                                      SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe, nullptr);
            if (ret != SQLITE_OK)
            {
                DLOG("打开数据文件失败:%s", _dbfile.c_str());
                return;
            }
        }
        // 执行sql语句
        bool exec(const std::string &sql, callback cb, void *arg)
        {
            int ret = sqlite3_exec(pb, sql.c_str(), cb, arg, nullptr);
            if (ret != SQLITE_OK)
            {
                DLOG("执行语句失败:%s", sql.c_str());
                return false;
            }
            return true;
        }
        // 关闭数据库文件
        void close()
        {
            if (pb != nullptr)
            {
                sqlite3_close_v2(pb);
            }
        }

    private:
        std::string _dbfile;
        sqlite3 *pb;
    };

4,字符串分割方法的实现

前面提到过,消息是先交给交换机,再根据交换机的类型和路由匹配,决定将这条消息该交给哪个队列。

如果交换机的类型是主题交换模式(Topic),那么就要进行路由匹配(如何匹配在后面模块处详解),只有当binding_key与routing_key匹配成功时,才将该消息交给这个队列。

在这部分会涉及到将一个字符串按照指定字符进行分割的操作,所以在这里先实现。

代码语言:javascript
复制
    class StrSplit
    {
        public:
        static size_t split(const std::string& str,const std::string& sep,std::vector<std::string>& result)
        {   
            size_t pos=0,idx=0;
            while(idx<str.size())
            {
                pos=str.find(sep);
                if(pos==std::string::npos)
                {
                    result.push_back(str.substr(idx));
                    return result.size();
                }
                if(idx==pos)
                {
                    idx=pos+sep.size();
                    continue;
                }
                std::string tmp=str.substr(idx,pos-idx);
                result.push_back(tmp);
                idx=pos+sep.size();
            }
            return result.size();
        }
    };

5,uuid生成器

UUID,也叫通用唯一识别码,通常由32位16进制数字字符组成。 在进行消息的发布和消息的确认到的时候,每个消息是有一个唯一ID的,来标识消息的唯一性。 在这里不使用一个自增的整数来表示,是因为如果消息数量太多的话,这个数字是有可能会越界的。 所以,我们决定使用一个字符串来表示这个ID,那么现在我们就需要保证整个字符串不能出现重复。 uuid的标准形式为:8-4-4-4-12的32个字符,如550e8400-e29b-41d4-a716-446655440000.

实现思路:首先生成8个随机数字,范围在[0,255]之间,因为1个字节可以表示的数据范围就是[0,255],这样就可以生成8字节的数字,再加上8字节的序号,构成16字节的数字,然后将这16字节的数字转化成32位16进制字符。

1,std::random_device这是c++11为我们提供生成随机数的方式,通过这个类定义对象生成的随机数是一种机器随机数 ,也就是通过硬件来生成的,效率较低。 2,标准库中还提供了一种生成随机数的算法:std::mt19937_64,通过这个类生成的随机数范围是在[0,2^19937]之间,不过他生成的随机数是一个伪随机数。 3,所以,我们可以将这两者结合,先使用std::random_device生成一个随机数,将这个随机数设置为std::mt19937_64的随机数种子。之后我们使用std::mt19937_64生成随机数即可。 4,此时生成的随机数很大,而我们要的是[0,255]之间的。 我们可以使用标准库中的std::uniform_int_distribution来限定生成随机数的数字区间。

代码语言:javascript
复制
    class UUID
    {
    public:
        static std::string uuid()
        {
            // 先生成随机数种子
            std::random_device rd;
            std::mt19937_64 genorator(rd()); // 生成伪随机数
            // 生成0-255之间的数字
            std::uniform_int_distribution<int> distribution(0, 255);
            std::stringstream ss;
            for (int i = 0; i < 8; i++)
            {
                ss << std::setw(2) << std::setfill('0') << std::hex << distribution(genorator);
                if (i == 3 || i == 5 || i == 7)
                {
                    ss << "-";
                }
            }

            // 生成序号
            static std::atomic<int> seq(1);
            size_t num = seq.fetch_add(1);

            // 将序号添加到字符串后
            // 将序号的每一字节转化为16进制字符
            // size_t 在64位下是8字节
            for (int i = 7; i >= 0; i--)
            {
                ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> i * 8) & 0xff);
                if (i == 6)
                {
                    ss << "-";
                }
            }
            return ss.str();
        }
    };

6,文件操作类的实现

前面服务器模块设计中提到过,我们使用文件来保存消息,在这里肯定就会涉及到很多文件相关的操作。

需要封装的操作:

  • 判断文件是否存在
  • 获取文件大小
  • 文件读/写
  • 文件的创建/删除
  • 目录的创建/删除

使用C语言库中提供的函数int stat(const char* path,struct stat* buf),可以获取一个文件的信息,比如文件的大小,文件的创建时间,文件的inode编号等等。返回值等于0就表示函数执行成功。

文件的信息就存放于struct stat结构体中。

代码语言:javascript
复制
    class FileHelper
    {
        public:
            FileHelper(const std::string& filename)
            :_filename(filename)
            {}
            //判断问价是否存在
            bool exists()
            {
                struct stat st;
                return (stat(_filename.c_str(),&st)==0);
            }
            //获取文件大小
            size_t size()
            {
                struct stat st;
                int n=stat(_filename.c_str(),&st);
                if(n!=0)
                {
                    DLOG("获取文件信息失败:%s",_filename.c_str());
                    return 0;
                }
                return st.st_size;
            }
            //文件的读取
            //从指定位置开始,读取指定长度的数据
            bool read(char* buff,size_t offset,size_t len)
            {
                std::ifstream ifs(_filename.c_str(),std::ios::binary|std::ios::in);
                if(ifs.is_open()==false)
                {
                    ELOG("打开文件失败:%s",_filename.c_str());
                    return false;
                }
                //移动到指定位置
                ifs.seekg(offset,std::ios::beg);
                //开始读取
                ifs.read(buff,len);
                if(ifs.good()==false)
                {
                    ELOG("读取文件失败:%s",_filename.c_str());
                    return false;
                }
                return true;
            }
            //文件的读取,读取全部数据
            bool read(std::string& buff)
            {
                buff.resize(size());
                return read(&buff[0],0,size());
            }
            //向文件指定位置写入指定长度的数据
            bool write(const char* data,size_t offset,size_t len)
            {
                std::fstream fs(_filename,std::ios::binary|std::ios::in|std::ios::out);
                if(fs.is_open()==false)
                {
                    ELOG("打开文件失败:%s",_filename.c_str());
                    return false;
                }
                //移动到指定位置
                fs.seekg(offset,std::ios::beg);
                fs.write(data,len);
                if(fs.good()==false)
                {
                    ELOG("写入文件失败:%s",_filename.c_str());
                    return false;
                }
                return true;
            }
            //向文件开始处写入数据
            bool write(const std::string& data)
            {
                return write(data.c_str(),0,size());
            }

            //创建文件
            static bool createFile(const std::string& filename)
            {
                std::ifstream ifs(filename,std::ios::binary);
                if(ifs.is_open()==false)
                {
                    ELOG("创建文件失败:%s",filename.c_str());
                    return false;
                }
                ifs.close();
                return true;
            }
            //删除文件
            static void removeFile(const std::string& filename)
            {
                ::remove(filename.c_str());
            }
            //获取文件的父路径
            static std::string parentDirectory(const std::string& path)
            {
                //aaa/bbb/ccc/a.txt
                size_t pos=path.find_last_of("/");
                if(pos==std::string::npos)
                {
                    return "./";
                }
                std::string parent=path.substr(0,pos);
                return parent;
            }
            //创建路径
            static bool createDirectory(const std::string& path)
            {
                //aaa/bbb/ccc 可能存在多级目录 
                size_t pos=0,idx=0;
                while(idx<path.size())
                {
                    pos=path.find("/");
                    if(pos==std::string::npos)
                    {
                        return mkdir(path.c_str(),0775)==0;
                    }
                    std::string parent=path.substr(0,pos);
                    int ret=mkdir(parent.c_str(),0775);
                    //目录如果已经存在,在创建的时候返回置为非0,并且错误码被设置
                    if(ret!=0&&errno!=EEXIST)
                    {
                        ELOG("创建目录失败:%s",parent.c_str());
                        return  false;
                    }
                    idx=pos+1;
                }
                return true;
            }
            //删除路径
            static bool removeDirectory(const std::string& path)
            {
                std::string cmd="rm -rf "+path;
                return ::system(cmd.c_str());
            }
            //修改文件名称
            bool rename(const std::string& nname)
            {
                return (::rename(_filename.c_str(),nname.c_str())==0);
            }
        private:
        std::string _filename;
    };

八,项目实现

项目实现部分不展示代码,代码仓库附在最后。

1,服务器模块

1,交换机模块

首先是交换机数据结构的定义:

对于一个交换机,它一个具有以下属性:

  • 交换机的名称(交换机唯一标识)
  • 交换机的类型(主题交换OR直接交换OR广播交换)
  • 交换机的存储方式(是否需要将交换机的信息进行持久化存储)

接着是交换机信息持久化类的实现:使用sqlite数据库来保存

最后是对交换机管理类的实现:向外提供声明交换机,移除交换机,获取指定交换机的方法。

2,队列模块

该模块与交换机模块类似:

队列结构的定义:

  • 队列的名称(唯一标识)
  • 存储方式(是否持久化)
  • 是否独占标志(只能有一个客户端来订阅该队列)
  • 是否自动删除标志 (当创建该队列的客户端退出后,是否自动 删除该队列)

还有队列信息持久化类的实现:使用sqlite数据库来存储。

最后是队列管理类的实现:向外提供声明队列,移除队列,获取指定队列的方法。

3,绑定关系

描述交换机与队列之间的关系,这个模块的设计与前面两个相似。

首先是绑定关系结构的定义:

应该包含的成员:交换机名称,队列名称,还有binding_key(交换机进行消息路由的时候会使用到)

还有绑定关系的持久化类,交换机和哪些队列绑定了也要持久化存储,这样在服务器重启,仍然可以恢复。

最后是一个管理类,向外提供建立一组绑定关系,解除一组绑定关系,解除指定交换机的所有绑定关系,解除指定队列的所有绑定关系,获取一组绑定关系,获取指定交换机的所有绑定关系等操作。

4,消息模块

消息在进行持久化存储的时候,使用的是文件存储,这就会涉及到序列化和反序列工作,所以这里使用protobuf来描述消息的结构,生成消息的结构定义以及序列化和反序列化功能的实现。

编写的message.proto文件内容(在这里顺便也定义了交换机的类型):

一个完整的消息包含有效载荷和其他参数

  • 消息的有效载荷包括:这个消息的属性,消息的正文,以及这个消息是否有效。
  • 消息的属性包括消息的id,消息的routing_key,以及消息的投递模式(是否持久化存储)。
  • 其他参数:这个消息在文件中存储的位置(偏移量)和这个消息的长度。
代码语言:javascript
复制
syntax="proto3";

package xgmq;
//交换机类型
enum ExchangeType
{
    UNKNOWN_TYPE=0;
    DIRECT=1;
    TOPIC=2;
    FANOUT=3;
};
enum DeliveryMode
{
    UNKNOWN=0;
    UNDURABLE=1;
    DURABLE=2;
};
message basicProperties
{
    string id=1;
    DeliveryMode delivery_mode=2;
    string routing_key=3;
};
message Message
{
    message Payload
    {
        basicProperties properties=1;
        string body=2;
        string valid=3;
    };
    Payload payload=1;
    uint32 offset=2;
    uint32 length=3;
};

消息的持久化实现:

在这里,使用文件来存储消息,但是这里如果使用一个文件来存储所有的消息,那么在访问的时候,就需要加锁,锁竞争会非常频繁,所以不适用这种方式。消息的载体是队列,消息最终都是要交给队列的,所以在消息进行持久化时,以队列为单位,每个队列一个文件,来存储该队列上的消息,这样当访问不同队列的消息时,就不存锁竞争问题时,访问同一个队列的消息时,还是需要加锁的,这样就可以大大降低锁竞争的概率。

在实现消息的持久化存储,就必然会涉及到消息的读和写,那么此时就会涉及到怎么写,怎么读的问题,也就是协议。这里设定,在写文件的时候 ,按照格式:【8个字节的消息长度 消息内容】这样的方式进行写入。那么在读取的时候,就先读前8个字节,知道消息的长度之后,就可以读取后面消息的内容 。这样做就可以解决类似Tcp粘包问题。

新增一个消息,我们直接在文件后面追加写这个消息即可,但如果时删除消息,此时我们的做法是,找到这条消息在文件中的位置(偏移量offset),将这条消息的是否有效标志为设为无效,然后写入文件,覆盖掉之前的数据,这样就算一次删除消息的操作。

但是删除完一条消息之后,文件的大小没变,如果此时文件中无效消息数量超过总消息数量的一半,那么此时就需要进行垃圾回收,此时我们创建一个临时文件,然后读取 源文件,将有效消息写入新文件,最后在讲源文件删除掉,临时文件重命名为源文件名,这样一个删除操作才算完成。

5,虚拟机模块

虚拟机模块是对以上四个模块的整合,使用上述四个模块的管理句柄,向外提供服务:

声明/销毁交换机,声明/删除队列,绑定/解绑,发布一条消息,订阅一条消息,确认一条消息,还有获取所有队列,获取指定交换机的所有绑定 信息等操作。

6,路由匹配模块

当一条消息被发布到交换机上之后,需要判断交换机的类型,如果是主题交换(Topic),需要进行路由匹配,决定这条消息应该交给哪些队列。

这个模块只用来提供功能:

  • 判断routing_key是否合法 。
  • 判断binding_key是否合法。
  • 判断binding_key与routing_key是否匹配成功(路由匹配功能)。

routing_key和binding_key都是以'.'作为分割的字符串。

对于routing_key来说:合法字符包含(a~z,A~Z,0~9, '.' , '_')等字符。

对于binding_key来说:合法字符包含(a~z,A~Z,0~9, '.' , '_'),除了这些外,还包含通配符'*'和'#'。但是这两个通配符不能连续出现,并且单独出现的,不能和其他字符一起出现。'*'可以匹配任意的一个字符,'#'可以匹配0个或多个任意字符。

路由匹配规则:

先将routing_key和binding_key按照'.'进行分割,使用数组保存分割后的字符串(单词)。也就是说现在有两个数组:rout_arr保存routing_key分割 后得到的每个单词,bind_arr保存binding_key分割后的每个单词。

然后让rout_arr的每个单词与bind_arr的每个单词进行比对。(使用动态规划算法)

定义状态表示:dp[i][j]表示rout_arr的第i个单词与bind_arr的第j个单词是否匹配成功。0表示匹配失败,1表示匹配成功 。

结果:dp表最后一个位置的值表示最终是否匹配成功。

状态转移方程的推导:

示例1:binding_key="bbb.ddd",routing_key="aaa.ddd",dp表结果如下:

aaa

ddd

bbb

0

0

ddd

0

0

当binding_key与routing_key的第一个单词进行比较的时候,bbb与aaa匹配失败。当binding_key与routing_key的第二个单词进行比较的时候,本来是匹配成功的,但由于前一个单词匹配失败,所以这里也就失败了。所以可以得出一个状态转移:如果dp[i][j]是匹配成功的,那么还需要看上一个单词的匹配结果,dp[i][j]=dp[i-1][j-1]

示例2:binding_key="#",routing_key="aaa.bbb",下面的dp表多开一行一列,可以避免越界,但是dp[0][0]要置为 1,否则就会影响后面结果判断的正确性。

aaa

bbb

1

0

0

#

0

1

0->1

#与aaa匹配成功,dp[1][1]=1,同时继承与dp[i-1][j-1]=1,所以dp[1][1]=1。

#与bbb匹配成功,dp[2][2]=1,但由于dp[i-1][j-1]=0,所以dp[2][2]=0。

但是按理来说,#是匹配任意多个字符,是可以匹配成功 aaa.bbb的,所以这里还需要从当前位置的左边继承结果。可以得出结论,如果是通配符"#",dp[i][j]不仅是要从dp[i-1][j-1]继承结果,还要从dp[i][j-1]位置继承结果。

所以,如果出现"#",dp[i][j]=dp[i-1][j-1] | dp[i][j-1]。

示例3:binding_key="aaa.#",routing_key="aaa"

aaa

1

0

aaa

0

1

#

0

0->1

同理,上面的示例可以看出,当出现"#"时,还可以从上方继承结果。

所以dp[i][j]=dp[i-1][j-1] | dp[i][j-1] | dp[i-1][j]。

"#"还有 一个特殊之处,就是如果binding_key是以"#"开头,那么dp表第一列需置为1,否则会影响后续填表的正确性。

总结:

  1. 当两个单词匹配成功时,从左上方继承结果:dp[i-1][j-1]。
  2. 当 遇到通配符"#"时:dp[i][j]=dp[i-1][j-1] | dp[i][j-1] | dp[i-1][j]。
  3. 当binding_key以 "#"开始时:需要将对应行的第一列位置初始化为1.
  4. 当遇到通配符"*"时,"*"是用来匹配任意一个单词的,所以可以当作第一种情况。
7,消费者管理模块

当有消费者订阅了服务器上的某个队列时,就需要将该消费者的信息保存起来,当这个队列上有了消息之后 ,就会把消息推送给这个队列,因此需要将消费者的信息记录下来。

注意:一个队列可能会被多个消费者订阅,当队列中有消息时,会采用RR轮转的方式,选择一个消费者,然后将消息推送给消费者,从而实现负载均衡。

在前面提到过,客户端与服务器在及逆行交互的时候,使用的是信道,信道给我们提供服务的。当客户端想要订阅一个队列的消息时,是通过信道发送对应的请求,然后在服务器这边,就会创建一个消费者对象,然后将这个消费者队列"绑定"在一起,当队列中有消息的时候,就找到该消费者,然后找到对应的信道,将响应发送给客户端。

一个 消费者属于某个信道的,当信道关闭或者连接关闭时,都要删除这个消费者。

定义消费者结构:

  • 消费者名称:消费者唯一标识
  • 队列名称:该消费者所订阅的队列
  • 是否自动应答标志:该消费者获取到消息后,是否需要收到对这个消息进行确认。
  • 一个回调函数:当该队列有消息后,先找对应的消费者,再调用消费者内部的回调函数,将这条消息发送给客户端。
8,信道管理模块

信道是针对连接更细粒度的一个通信通道,多个信道可以使用同一个连接来进行通信,但是同一个连接的信道之间是相互独立的。

信道建立在TCP连接之上的,是抽象出来的概念,用户通过信道向服务器发送请求,同时服务器也通过信道向客户端发送响应。

信道应该提供以下的服务:

  • 创建/删除交换机
  • 创建/删除队列
  • 绑定/解绑
  • 发布消息
  • 订阅消息
  • 对消息进行确认
  • 取消订阅

信道结构的定义:

  • 信道ID:信道的唯一标识
  • 信道关联的消费者:信道提供了订阅消息的服务 ,当某个用户订阅了队列的消息,此时就会产生一个消费者。当队列中有消息的时候,就会把消息推送给消费者。注意:如果信道关闭了,就需要将这条信道上的消费者也删除掉,避免造成内存泄漏的问题。
  • 信道关联的连接:毕竟真正进行通信的时候,还是要使用到底层TCP连接的。
  • 虚拟机句柄:完成上述各种操作,创建/删除交换机......
  • 线程池句柄:当一个消息被推送到某个队列上后,需要将这条消息推送给订阅的消费者,这个各种交给线程池来完成。
  • 消费者管理句柄:用于删除消费者信息。信道关闭/取消订阅的时候 ,要删除消费者信息 。(防止内存泄漏)
  • protobuf协议处理句柄:网络通信中的协议处理。

接下来还需要对信道进行管理:

  • 用哈希表来管理信道——信道ID和信道对象的映射。
  • 同时提供打开信道,关闭信道,获取信道的功能。
9,应用层协议设计

应用层协议格式 如下:

  • len:4个字节,表示整个报文的长度。
  • namelen:表示typeName的长度。
  • typeName:表示请求/响应报文的类型名称。(比如创建交换机请求的类型,创建队列请求的类型)
  • protobufData:表示请求/响应数据通过protobuf序列化之后的二进制数据。
  • checkSum:校验和。

上述的应用层协议,其实就是muduo库中,陈硕大佬定义的一种应用层协议。我们使用muduo库搭建服务器,所以也就使用这个应用层协议来实现网络通信。

接下来需要定义请求/响应参数(格式):

代码语言:javascript
复制
syntax="proto3";

package xgmq;

//需要用到消息的属性,所以引入了消息模块
import "message.proto";

//信道的打开与关闭请求
message openChannelRequest
{
    string rid=1;//请求id
    string cid=2;//信道id
};

message closeChannelRequest
{
    string rid=1;
    string cid=2;
};

//交换机的创建与销毁请求
message declareExchangeRequest
{
    string rid=1;
    string cid=2;
    string exchange_name=3;//交换机名称
    ExchangeType exchange_type=4;//交换机类型
    bool durable=5;//是否持久化
};

message deleteExchangeRequest
{
    string rid=1;
    string cid=2;
    string exchange_name=3;
};

//队列的创建与销毁请求
message declareQueueRequest
{
    string rid=1;
    string cid=2;
    string queue_name=3;
    bool exclusive=4;
    bool durable=5;
    bool auto_delete=6;
};

message deleteQueueRequest
{
    string rid=1;
    string cid=2;
    string queue_name=3;
};

//队列的绑定与解绑请求
message queueBindRequest
{
    string rid=1;
    string cid=2;
    string exchange_name=3;
    string queue_name=4;
    string binding_key=5;
};
message queueUnBindRequest
{
    string rid=1;
    string cid=2;
    string exchange_name=3;
    string queue_name=4;
};

//消息的发布
message basicPublishRequest
{
    string rid=1;
    string cid=2;
    string exchange_name=3;
    string body=4;
    basicProperties perties=5;
};

//消息的确认请求
message basicAckRequest
{
    string rid=1;
    string cid=2;
    string msg_id=3;
    string queue_name=4;
};

//队列订阅请求
message basicConsumeRequest
{
    string rid=1;
    string cid=2;
    string consume_tag=3;//消费者名称
    bool auto_ack=4;//是否自动应答
    string queue_name=5;//队列名
};

//队列的取消订阅
message basicCancelRequest
{
    string rid=1;
    string cid=2;
    string queue_name=3;
    string consum_tag=4;
};

//消息的推送响应
message basicConsumeResponse
{
    string cid=1;
    string consum_tag=2;//消费者名称
    string body=3;//消息内容
    basicProperties properties=4;//消息的属性
};

//通用响应
message basicCommonResponse
{
    string rid=1;
    string cid=2;
    bool ok=3;
};
10,连接管理模块

服务器可能会收到很多客户端的连接,所以先将这些连接管理起来,每个TCP连接都对应有一个信道管理句柄,这样就实现了一个TCP连接,可以创建多个信道。

而muduo中的连接connection对象,是没有这个功能的,使用这个connection对象只是来完成网络通信的。

所以,这里我们需要再将muduo库中的connection连接封装一次,提供创建信道/关闭信道的功能。

提供的操作:

  • 创建/关闭信道。

成员信息:

我们通过连接创建信道,一个连接的成员就是创建信道时所需的参数,比如connection连接,虚拟机句柄等等。

  • 信道管理句柄(实现信道的增删查):用来打开/关闭/获取信道。当某个连接断开时,相关的信道信息也应该被释放掉,避免内存泄漏。(信道释放了,那么这条信道上的消费者也应该被释放掉,避免内存泄漏,不过这个工作已经在信道模块处理过了,当一个信道对象析构时,其内部会通过消费者管理句柄删除该消费者对象)
  • 连接关联的实际用于通信的 muduo::net::Connection 连接
  • protobuf 协议处理的句柄
  • 消费者管理句柄
  • 虚拟机句柄
  • 异步工作线程池句柄

连接管理的实现:

使用哈希表来管理连接,使用muduo::net::TcpConnectionPtr作为key,而我们封装的连接对象作为value。

当一条Tcp连接建立成功之后,我们需要为这个Tcp连接建立一个我们封装的连接对象,通过这个对象,进行打开信道,关闭信道,获取信道的操作,这样通过一个Tcp连接,就可以建立多个信道了。

11,brokerserver模块

该模块使用muduo库进行服务器的搭建,只需设置请求的处理回调函数即可。

服务器一旦启动,需要完成对消费者管理结构的初始化,因为当由客户端订阅队列消息时,需要将消费者信息添加到消费者管理结构中,如果不初始化,则会出现对空对象访问的错误。

2,客户端模块

1,订阅者模块(消费者模块)

这个模块并不直接向用户展示,这个模块只起到一个角色描述的作用,表示这是一个消费者。如果是生产者发布消息,那它就不需要这个。

2,信道模块

在客户端这边同时也是有信道的,用户通过信道向服务器发送请求,所以,信道同样也要提供服务器模块的相关服务:创建/删除交换机,创建/删除队列......,不过这些服务的内部,只是构建出一个请求,然后发送给服务器,服务器收到后,进行解析处理,所以真正做事的是服务器。

信道的管理:向用户提供创建信道,关闭信道,获取信道的操作。当用户调用创建信道的操作时,同时其内部会向服务器发送一个创建信道的请求,至此服务器 和客户端都存在信道了,而客户端这边的信道是为用户提供服务的,而服务器这边的信道是为客户端提供服务的。

注意:当向服务器发送一个请求后,需要等待这个请求被服务器处理后,并接收到服务器发送来的响应,才能进行下一步操作。由于muduo库中的操作都是异步执行的,当要执行如下操作 时:创建交换机1,创建队列1,绑定交换机1和队列1,结果可能是:创建队列1和绑定请求先发送到了服务器,交换机1还没有创建,此时就会出现绑定失败的问题。所以 ,我们需要等待创建交换机1的请求接收到对应的响应之后,再发送下一个请求。

这里使用哈希表来存储响应id和响应的映射。

3,异步工作线程模块

这个模块是实现客户端异步工作的,防止下面的操作阻塞住当前的工作线程。

  • 使用EventLoopThread来监控底层IO事件是否就绪。
  • 当接收到服务器推送过来的消息,使用线程池来处理。
4,连接模块

这是进行网络通信的模块,也是使用muduo来实现底层的网络通信,一个连接可以创建出多个信道来进行网络通信。向用户提供打开信道/关闭信道的功能。

最后通过客户端模块,就可以分别搭建出发布客户端和订阅客户端。

九,项目总结

1,在订阅客户端启动时,程序异常退出了,问题在于订阅客户端在创建信道时,调用连接对象conn接口,而conn中的信道管理对象没有初始化,造成了空指针的解引用错误。

未初始化ChannelManagger对象的智能指针。

2,没有关闭信道,造成资源泄漏。

3,在服务器将消息推送给客户端时,由于判断条件写反->!bp,导致消费者的属性没有发送,这时客户端通过消息的id进行消息确认的时候,就无法完成确认,也就无法从服务器的队列中删除这条消息,从而一直占有内存资源,会造成内存泄漏问题。

十,仓库链接

消息队列: 消息队列(MQ)是一种在进程间或不同线程间进行通信的技术,它允许应用程序发送消息到队列中,无需立即处理,而是可以存储起来,直到需要处理为止。消息队列作为一种中间件,不仅仅是消息的存储容器,更是一种异步通信机制,使得消息的发送者和接收者能够解耦,提高系统的响应速度和可靠性。该项目是仿照RabbitMQ实现的一个简单消息队列 - Gitee.com

https://gitee.com/wang-junyanxx/message_queue/tree/master

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-07-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一,项目介绍
  • 二,开发环境
  • 三,技术选型
  • 四,第三方库介绍
    • 1,Protobuf
    • 2,muduo库
    • 3,SQLite
    • 4,GTest
  • 五,项目需求分析
    • 1,核心概念
    • 2,服务器需要提供的核心API
    • 3,网络通信
    • 4,消息应答模式
  • 六,项目模块化划分
    • 1,服务端模块设计
    • 2,服务端模块汇总图
    • 3,客户端模块设计
    • 4,客户端模块汇总图
  • 七,工具类的实现
    • 1,线程池的实现
      • 1,前置知识
      • 2,线程池的实现
    • 2,日志打印
    • 3,sqlite数据库操作的封装
    • 4,字符串分割方法的实现
    • 5,uuid生成器
    • 6,文件操作类的实现
  • 八,项目实现
    • 1,服务器模块
      • 1,交换机模块
      • 2,队列模块
      • 3,绑定关系
      • 4,消息模块
      • 5,虚拟机模块
      • 6,路由匹配模块
      • 7,消费者管理模块
      • 8,信道管理模块
      • 9,应用层协议设计
      • 10,连接管理模块
      • 11,brokerserver模块
    • 2,客户端模块
      • 1,订阅者模块(消费者模块)
      • 2,信道模块
      • 3,异步工作线程模块
      • 4,连接模块
  • 九,项目总结
  • 十,仓库链接
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档