前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C++搭建集群聊天室(十八):nginx + redis 发布订阅 升级项目为集群服务器

C++搭建集群聊天室(十八):nginx + redis 发布订阅 升级项目为集群服务器

作者头像
看、未来
发布2021-10-09 14:26:46
6810
发布2021-10-09 14:26:46
举报
文章被收录于专栏:CSDN搜“看,未来”

文章目录

环境配置与基本知识

C++搭建集群聊天室(十七):ngnix简介及tcp负载均衡配置

Redis环境搭建与配置

hiredis从安装到实操,一条龙服务

redis事务处理机制,但当涉猎

了解更多 redis 相关知识:我的redis专栏


上面该看的看完了,咱往下可就直接上码啦!!!

这次的改动会有有点大。


redis.hpp

愿意放哪儿放哪儿,我觉着吧,怎么说redis也是个数据库,就放 db 文件夹下吧。

代码语言:javascript
复制
#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;

class Redis
{
public:
    Redis();
    ~Redis();

    // 连接redis服务器 
    bool connect();

    // 向redis指定的通道channel发布消息
    bool publish(int channel, string message);

    // 向redis指定的通道subscribe订阅消息
    bool subscribe(int channel);

    // 向redis指定的通道unsubscribe取消订阅消息
    bool unsubscribe(int channel);

    // 在独立线程中接收订阅通道中的消息
    void observer_channel_message();

    // 初始化向业务层上报通道消息的回调对象
    void init_notify_handler(function<void(int, string)> fn);

private:
    // hiredis同步上下文对象,负责publish消息
    redisContext *_publish_context;

    // hiredis同步上下文对象,负责subscribe消息
    redisContext *_subcribe_context;

    // 回调操作,收到订阅的消息,给service层上报
    function<void(int, string)> _notify_message_handler;
};

#endif

redis.cpp

这一套,可以在做轻量级集群服务器间通信用,封装好了的。

代码语言:javascript
复制
#include "redis.hpp"
#include <iostream>
using namespace std;

Redis::Redis():_publish_context(nullptr), _subcribe_context(nullptr){}

Redis::~Redis(){
    if (_publish_context != nullptr){
        redisFree(_publish_context);
    }

    if (_subcribe_context != nullptr){
        redisFree(_subcribe_context);
    }
}

bool Redis::connect(){
    // 负责publish发布消息的上下文连接
    _publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _publish_context){
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 负责subscribe订阅消息的上下文连接
    _subcribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _subcribe_context){
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
    thread t([&]() {
        observer_channel_message();
    });
    t.detach();

    cout << "connect redis-server success!" << endl;

    return true;
}

// 向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message){
    redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
    if (nullptr == reply){
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply);
    return true;
}

// 向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel){
    // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
    // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
    // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done){
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    // redisGetReply

    return true;
}

// 向redis指定的通道unsubscribe取消订阅消息
bool Redis::unsubscribe(int channel){
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done){
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}

// 在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){
    redisReply *reply = nullptr;
    while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){
        // 订阅收到的消息是一个带三元素的数组
        if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
        {
            // 给业务层上报通道上发生的消息
            _notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str);
        }

        freeReplyObject(reply);
    }

    cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}

void Redis::init_notify_handler(function<void(int,string)> fn){
    this->_notify_message_handler = fn;
}

chatservice修改

头文件里面自行修改吧,这里放出源文件的修改范围。

构造函数中连接上redis:

代码语言:javascript
复制
// 连接redis服务器
    if (_redis.connect()){
        // 设置上报消息的回调
        _redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));
    }

登录成功后,向redis消息队列进行订阅:

代码语言:javascript
复制
 // id用户登录成功后,向redis订阅channel(id)
            _redis.subscribe(id); 

用户注销之后,取消订阅:

代码语言:javascript
复制
    // 用户注销,相当于就是下线,在redis中取消订阅通道
    _redis.unsubscribe(userid); 

(客户端里以外掉线也给它来上这么一下)


单聊:

代码语言:javascript
复制
//一对一聊天
void ChatService::onechat(const TcpConnectionPtr &conn,json &js,Timestamp time){
//    cout<<js<<endl;

    int toid = js["toid"].get<int>(); //这里bug

    // bool userstate = false;
    //开辟锁的作用域
    {
        lock_guard<mutex> lock(_connMutex);

        auto it = _userConnMap.find(toid);

        if(it != _userConnMap.end()){  
            //用户在线,转发消息
            it->second->send(js.dump());
            return;
        }   
    }

    // 查询toid是否在线 
    User user = _usermodel.query(toid);
    if (user.getstate() == "online"){
        _redis.publish(toid, js.dump());
        return;
    }

    // toid不在线,存储离线消息
    _offlineMsgmodel.insert(toid, js.dump());
}

群聊:

代码语言:javascript
复制
// 群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
    int userid = js["id"].get<int>();
    int groupid = js["groupid"].get<int>();
    vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid);

    lock_guard<mutex> lock(_connMutex);
    for (int id : useridVec){
        auto it = _userConnMap.find(id);
        if (it != _userConnMap.end()){
            // 转发群消息
            it->second->send(js.dump());
        }
        else{
            User user = _usermodel.query(id);
            if (user.getstate() == "online"){
                _redis.publish(id, js.dump());
            }
            else{
                // 存储离线群消息
                _offlineMsgmodel.insert(id, js.dump());
            }
        }
    }
}

从redis消息队列中获取订阅的消息

代码语言:javascript
复制
void ChatService::handleRedisSubscribeMessage(int userid, string msg){
    lock_guard<mutex> lock(_connMutex);
    auto it = _userConnMap.find(userid);
    if (it != _userConnMap.end())
    {
        it->second->send(msg);
        return;
    }

    // 存储该用户的离线消息
    _offlineMsgmodel.insert(userid, msg);
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/08/20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 环境配置与基本知识
  • redis.hpp
  • redis.cpp
  • chatservice修改
    • 从redis消息队列中获取订阅的消息
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档