支持插件的消息中间件【msg broker with plugin】

支持插件的消息中间件

msg broker with plugin

Msg Broker概念:

msg broker是实现application 之间互通讯的组件。通常为实现application之间的解耦,消息都是通过msg broker完成转发。application只需知道其他applicatipn的逻辑名称,而不需要知道对方的具体位置。Broker中维护一个查找表,记录着哪个application注册在此逻辑名称之下,所以消息总是会被正确的投递到目的地。

msg broker不限于1-1的转发,也支持1-N的模式。其主要功能有:

  1. 实现多个application的互通讯,而隐藏彼此的位置
  2. 实现消息个格式的转换,如json to bin
  3. 安全控制,msg broker可以再转发消息前进行一定程度的安全验证
  4. 增大系统的可伸缩性,由于application通讯的目标变成了逻辑结点,而该逻辑结点可以对应多个物理结点,理论上可以动态的增加物理结点,来扩展该逻辑结点的吞吐量。
  5. msg broker可以用来集成服务,并且可以暴楼服务的部分接口

 msg broker 具有的缺点是:

  1. 增加了复杂性,多了一层转发
  2. 可维护性降低,需要理清msg broker和各个application和服务的关系。
  3. 降低性能,主要是实时性能下降了,消息需要多转发一边,单次请求的延时大大增加了。

当前流行的Broker的特点和缺点:

Msg Broker的结构:

流行的Broker中间件介绍:

  1. RabbitMQ

项目地址:http://www.rabbitmq.com/

RabbitMQ是由Erlang开发的以高效、健壮以及高度伸缩性的消息服务器。其所包含的概念有Producer、Consumer、Exchange、Queue。RabbitMQ基于QMQP协议,支持的语言也非常丰富,文档也非常清晰。使用RabbitMQ可以实现订阅发布模型、RPC模型、路由模型等,参见RabbitMQ的例子:http://www.rabbitmq.com/getstarted.html

但是它有如下局限性:

  • RabbitMQ 没有针对连接做控制,它是为高效而生,它对外来的请求是信任的,不存在安全验证,如任何一个client都可以创建消息队列,所以RabbitMQ一定是放在内网的。
  • 使用RabbitMQ ,我们是通过Client远程操作RabbitMQ,不能定制RabbitMQ的功能。
  1. ZeroMQ

项目地址:http://www.zeromq.org/

ZeroMQ是一个Socket封装库,号称是最快的消息内核。ZeroMQ可以支持TCP、UDP、IPC等多种通讯协议。ZeroMQ可以实现的通讯模型就更多了,几乎涵盖了消息通讯的所有模式,参见官网介绍http://www.zeromq.org/intro:read-the-manual

其局限性为:

  • ZeroMQ虽然封装了消息传输的复杂性,但是它也隐藏了连接的建立、断开等过程。ZeroMQ传输消息更像是udp数据报,使用者不能知道对方何时连接建立、何时连接断开。

我们需要一个不一样的Broker

应用场景介绍

在网络游戏中,cliet和服务器是通过tcp长连接的。相对于HTTP+WebServer的不同在于:

  • client连接到服务器,需要进行身份验证,通常是client第一个消息包含身份验证数据如用户名密码等,而验证通过后该连接为可信任连接。
  • client 任意时间都可以向服务器发送请求,而不需要服务器立即返回,同样,服务器是在任意时间(当然会有实时性等约束)都可以像client推送消息。
  • client断开连接时,服务器必须捕获该事件,以便完成一些数据清理操作。
  • client对应的一般是个集群,但是client无从得知细节,因为它只连接最外层的一个,给他取个名字“MsgBroker”。
  • Msg Broker 不许有一定的安全控制,如心跳、网络包频率限制等,防范某些可能的攻击。
  • Msg Broker需要高度可定制。不同的游戏主要是逻辑不同,而MsgBroker大多大同小异。当然MsgBroker总是会根据需求稍作修改。
  • Msg Broker 主要瓶颈是IO操作,因为它涉及大量的网络连接、断开、心跳、广播消息等。而它具有的领域逻辑则非常非常少。所以Msg Broker的逻辑可以使用动态脚本实现,其实时性、效率都能满足要求。

需要的broker具有的功能:

  • 能够捕获client连接事件
  • 能够捕获client断开事件
  • 具有网络心跳功能
  • 方便的消息发送接口
  • broker可以以client的角色连接到其他Server,因为从其他逻辑角度看,Broker可能是其他服务的使用者。
  • Broker 提供消息收发框架,逻辑层通过插件实现。
  • 实现插件的方式有
    • 动态链接库,可以将逻辑层封装到so链接库中
    • python脚本,逻辑层可以有python脚本实现,Broker封装了载入python、调用python,封装消息发送接口到Pyhton
    • Lua脚本,逻辑层也可以又Lua脚本实现,Broker封装了载入lua、调用lua、封装消息接口给lua。

Msg Broker 结构图

Msg Broker  的安装使用:

安装依赖库:

由于msg broker支持Python和lua作为插件,那么必须确保linux下安装了相应的头文件。示例中的插件均只实现了echo功能。

  • 确保Linux系统安装了Python,推荐python2.6
  • 确保安装了Python-devel,如果是centos,直接yum即可。
  • 确保安装了Lua-5.1.4, 其他版本没有测试过
  • 下载Msg Broker最新源码,目前处于0.1版本

svn co https://ffown.googlecode.com/svn/trunk/

  • 编译源码:
    • cd trunk/example/plugin_msg_broker/
    • make
  • 编译动态连接库插件
    • cd plugin/plugin_echo_dll/
    • sh gen_dll.sh

运行示例插件:

  • 运行动态链接库
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_dll/libechoso
    • 另开终端,telent 127.0.01 10241, 收入5 回车,再输入5个字符,通讯协议是body长度加回车加body,如图:
  • 运行Python 脚本示例程序
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_py/echo.py
    • 同样使用telnet 测试echo功能
  • 运行Lua脚本示例程序
    • ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_lua/lua.py
    • 同样使用telnet 测试echo功能

插件层设计分析:

插件接口:

#ifndef _PLUGIN_H_
#define _PLUGIN_H_

#include "channel.h"
#include "message.h"

class plugin_i
{
public:
    virtual ~plugin_i(){}
    virtual int start() = 0;
    virtual int stop() = 0;

    virtual int handle_broken(channel_ptr_t channel_) = 0;
    virtual int handle_msg(const message_t& msg_, channel_ptr_t channel_) = 0;
};

typedef plugin_i* plugin_ptr_t;
typedef int (*handle_channel_msg_func_t)(const message_t& msg_, channel_ptr_t);
typedef int (*handle_channel_broken_func_t)(channel_ptr_t);

#define HANDLE_CHANNEL_MSG       "handle_channel_msg"
#define HANDLE_CHANNEL_BROKEN "handle_channel_broken"
#endif

各个接口作用如下:

  • start 实现插件载入,环境初始化
  • stop实现优雅的退出
  • handle msg 为消息到来通知
  • handle_broken 为对方连接关闭

Channel 设计

channel 用来表示一个连接,可以理解成socket的抽象,也可直接理解成远程client。

#ifndef _CHANNEL_H_
#define _CHANNEL_H_

#include "socket_i.h"

class channel_t
{
public:
    channel_t(socket_ptr_t sock_);
    ~channel_t();
    void  set_data(void* p);
    void* get_data() const;

    template<typename T>
    T* get_data() const { return (T*)this->get_data(); }

    void async_send(const string& buff_);
    void close();

private:
    socket_ptr_t    m_socket;
    void*               m_data;
};

typedef channel_t* channel_ptr_t;
#endif

各个接口作用如下:

  • 构造,channel必须绑定一个socket
  • set_data get_data用来操作channel私有数据,如我们可以在channel私有数据中存放该channel对应的uid,这样每个channel之需验证一次,以后自然知道到来的消息属于哪个channel。
  • async_send 异步发送消息
  • close 关闭连接

动态链接库插件:

流程如下:
  • 载入动态库
  • 获取动态库的接口,记录函数指针地址
  • 若有msg到来,调用动态链接库的handle_msg
  • 若连接关闭,调用动态链接库的handl_broken

int plugin_dll_t::start()
{
    m_dll_handler = ::dlopen(m_dll_name.c_str(), RTLD_NOW|RTLD_GLOBAL);

    if (NULL == m_dll_handler)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s>", dlerror()));
        return -1;
    }

    m_msg_cb     = (handle_channel_msg_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_MSG);
    m_broken_cb = (handle_channel_broken_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_BROKEN);

    if (NULL == m_msg_cb)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_MSG));
        return -1;
    }
    if (NULL == m_broken_cb)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_BROKEN));
        return -1;
    }

    return 0;
}

int plugin_dll_t::stop()
{
    ::dlclose(m_dll_handler);
    return 0;
}

int plugin_dll_t::handle_broken(channel_ptr_t channel_)
{
    return m_broken_cb(channel_);
}

int plugin_dll_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    return m_msg_cb(msg_, channel_);
}

Python 插件

其工作流程如下:
  • 初始化Python解释权,将封装的发送消息接口注册到Python虚拟机中
  • 设置PythonPath
  • 载入python文件
  • 若msg到来,调用python全局函数handle_msg
  • 若channel断开,调用Python 全局handle_broken 函数

#include "plugin_impl/plugin_python.h"
#include "plugin_impl/pyext.h"
#include "log_module.h"


plugin_python_t::plugin_python_t(const string& name_):
    m_py_mod(NULL)
{
    string pythonpath = "./";
    int pos = name_.find_last_of('/');
    if (-1 == pos)
    {
        m_py_name = name_;
    }
    else
    {
        m_py_name = name_.substr(pos+1);
        pythonpath = name_.substr(0, pos+1);
    }
    pos = m_py_name.find_first_of('.');
    m_py_name = m_py_name.substr(0, pos);

    Py_InitializeEx(0);
    Py_SetPythonHome((char*)pythonpath.c_str());
    initpyext(this);
    PyRun_SimpleString("import channel;import sys;sys.path.append('./plugin/plugin_echo_py/')");
}

plugin_python_t::~plugin_python_t()
{
    Py_Finalize();
}

int plugin_python_t::start()
{
    if(load_py_mod())
    {
        return -1;
    } 
    return 0;
}

int plugin_python_t::stop()
{
    return 0;
}

int plugin_python_t::load_py_mod()
{
    PyObject *pName, *pModule;
    pName = PyString_FromString(m_py_name.c_str());
    pModule = PyImport_Import(pName);
    if (!pModule )  
    {
        Py_DECREF(pName);
        logerror((PLUGIN_IMPL, "can't find %s.py\n", m_py_name.c_str()));
        if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }  
        return -1;
    }
    m_py_mod = PyModule_GetDict(pModule);
    Py_DECREF(pName);
    Py_DECREF(pModule);
    return 0;
}

int plugin_python_t::handle_broken(channel_ptr_t channel_)
{    
    m_channel_mgr.erase(long(channel_));
    delete channel_;
    return call_py_handle_broken(long(channel_));
}
int plugin_python_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    m_channel_mgr.insert(make_pair((long)channel_, channel_));
    return call_py_handle_msg((long)channel_, msg_.get_body().c_str());
}


int plugin_python_t::call_py_handle_msg(long val, const char* msg)
{
    PyObject *pDict       = m_py_mod;
    const char* func_name = "handle_msg";
    PyObject *pFunc, *arglist, *pRetVal;

    pFunc = PyDict_GetItemString(pDict, func_name);
    if (!pFunc || !PyCallable_Check(pFunc))  
    {  
        logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));
        return -1;
    }
    arglist = Py_BuildValue("ls", val, msg);
    pRetVal = PyObject_CallObject(pFunc, arglist);
    Py_DECREF(arglist);
    if (pRetVal)
    {
        Py_DECREF(pRetVal);
    }
    if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }
    return 0;
}

int plugin_python_t::call_py_handle_broken(long val)
{
    PyObject *pDict       = m_py_mod;
    const char* func_name = "handle_broken";
    PyObject *pFunc, *arglist, *pRetVal;

    pFunc = PyDict_GetItemString(pDict, func_name);
    if (!pFunc || !PyCallable_Check(pFunc))  
    {  
        logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));
        return -1;
    }
    arglist = Py_BuildValue("l", val);
    pRetVal = PyObject_CallObject(pFunc, arglist);
    Py_DECREF(arglist);
    if (pRetVal)
    {
        Py_DECREF(pRetVal);
    }
    if (PyErr_Occurred())
    {
        PyErr_Print();
        PyErr_Clear();
        return -1;
    }
    return 0;
}

channel_ptr_t plugin_python_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);
    if (it != m_channel_mgr.end())
    {
        return it->second;
    }
    return NULL;
}

Lua 插件:

工作流程如下:
  • 初始化lua虚拟机
  • 注册发送消息接口给lua
  • 载入Lua脚本
  • 有msg到来,调用lua的hanle_msg接口
  • 有channel断开,调用lua的handle_broken接口

static plugin_lua_t* g_plugin_lua_obj = NULL;
static int channel_send_msg(lua_State* ls_)
{
    long ptr = (long)luaL_checknumber(ls_, 1);
    size_t len = 0;
    const char* msg = luaL_checklstring(ls_, 2, &len);
    channel_ptr_t c = g_plugin_lua_obj->get_channel(ptr);
    if (c)
    {
        c->async_send(msg);
    }
    return 0;
}

plugin_lua_t::plugin_lua_t(const string& name_):
    m_ls(NULL)
{
    g_plugin_lua_obj = this;
    string luapath = "./";
    int pos = name_.find_last_of('/');
    if (-1 == pos)
    {
        m_lua_name = name_;
    }
    else
    {
        m_lua_name = name_.substr(pos+1);
        luapath = name_.substr(0, pos+1);
    }
    pos = m_lua_name.find_first_of('.');
    m_lua_name = m_lua_name.substr(0, pos);
    
    m_ls = lua_open();
    lua_checkstack(m_ls, 20);

    lua_pushcfunction(m_ls, channel_send_msg);
    lua_setglobal(m_ls, "_tmp_func_");
    luaL_dostring(m_ls, "channel = {} channel.send = _tmp_func_ _tmp_func_ = nil");

    string lua_str = "package.path = package.path .. \"" + luapath + "?.lua\"";
    luaL_openlibs(m_ls);

    if (luaL_dostring(m_ls, lua_str.c_str()))
    {
        lua_pop(m_ls, 1);
    }
    m_lua_name = name_;
}

plugin_lua_t::~plugin_lua_t()
{
}

int plugin_lua_t::start()
{
    if (load_lua_mod())
    {
        logerror((PLUGIN_IMPL, "can't find %s.lua\n", m_lua_name.c_str()));
        return -1;
    }
    return 0;
}

int plugin_lua_t::stop()
{
    return 0;
}

int plugin_lua_t::handle_broken(channel_ptr_t channel_)
{
    m_channel_mgr.erase(long(channel_));
    delete channel_;
    return call_lua_handle_broken(long(channel_));
}

int plugin_lua_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
    m_channel_mgr.insert(make_pair((long)channel_, channel_));
    return call_lua_handle_msg((long)channel_, msg_.get_body());
}

int plugin_lua_t::load_lua_mod()
{
    if (luaL_dofile(m_ls, m_lua_name.c_str()))
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

int plugin_lua_t::call_lua_handle_msg(long val, const string& msg)
{
    lua_checkstack(m_ls, 20);
    lua_getglobal(m_ls, "handle_msg");
    lua_pushnumber(m_ls, val);
    lua_pushlstring(m_ls, msg.c_str(), msg.size());
    if (lua_pcall(m_ls, 2, 0, 0) != 0)
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

int plugin_lua_t::call_lua_handle_broken(long val)
{
    lua_checkstack(m_ls, 20);
    lua_getglobal(m_ls, "handle_broken");
    lua_pushnumber(m_ls, val);
    if (lua_pcall(m_ls, 1, 0, 0) != 0)
    {
        lua_pop(m_ls, 1);
        return -1;
    }
    return 0;
}

channel_ptr_t plugin_lua_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);
    if (it != m_channel_mgr.end())
    {
        return it->second;
    }
    return NULL;
}

msg_broker 待完善的地方:

  1. 心跳层还未加入
  2. 插件层报错不够友好
  3. Python 中封装的channel使用long型,调用send接口时需要从long转化到channel,需要优化一下,直接封装一个channel对象到Python
  4. Lua中channel的封装暂时也是使用long来表示,具有和上面一样的性能损耗问题

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

扩展CakePHP的CacheHelper以使用缓存引擎

CakePHP是一个MVC设计模式下的PHP框架,它使得您的生活更加简单并且让您的开发工作更上一层楼。尽管它被认为是一个相对缓慢的框架,(因为)它带有的大量缓存...

1989
来自专栏BeJavaGod

RabbitMQ 整合Spring 实现多客户端发送消息队列

好久没更新了。。实在太忙,对不起大家,今天更一下最近使用到的一些东西,比较实用!看官们亲拍~~ 以前在单项目中用过RabbitMQ,没有问题 不过这次在分布式项...

2805
来自专栏网络

负载均衡原理的解析

作者:源子姗 my.oschina.net/u/3341316/blog/877206 开头先理解一下所谓的“均衡” 不能狭义地理解为分配给所有实际服务器一样多...

2478
来自专栏IT技术精选文摘

有赞MySQL自动化运维之路—ZanDB

一、前言 在互联网时代,业务规模常常出现爆发式的增长。快速的实例交付,数据库优化以及备份管理等任务都对DBA产生了更高的要求,单纯的凭借记忆力去管理那几十套DB...

2328
来自专栏文武兼修ing——机器学习与IC设计

AHB学习笔记1.AHB概述2.AHB信号3.AHB传输4.控制信号

1.AHB概述 AHB总线是一种专为高性能同步传输设计的总线,层次高于APB总线,支持以下特性: 突发传输 拆分事务 主设备单时钟周期传输 单时钟沿操作 非三态...

3759
来自专栏技术分享

封装RabbitMQ.NET Library 的一点经验总结

这篇文章内容会很短,主要是想给大家分享下我最近在做一个简单的rabbitmq客户端类库的封装的经验总结,说是简单其实一点都不简单。为了节省时间我主要按照Libr...

2466
来自专栏逸鹏说道

全站缓存时代

原则:动静分离,分级缓存,主动失效。 Web 开发中,接口会被分为以下几类: 纯静态页面。打死我都不会修改的页面。很长一段时间内,基本上不会修改。比如:关于我们...

3058
来自专栏IT技术精选文摘

LVS集群中的IP负载均衡技术

1.前言 在已有的IP负载均衡技术中,主要有通过网络地址转换(Network Address Translation)将一组服务器构成一个高性能的、高可用的虚拟...

2567
来自专栏北京马哥教育

优化存储性能?你需要关注这些Linux I/O调度程序选项

要优化Linux性能,IT团队应该检查当前正在使用的I/O调度程序,并评估诸如deadline和完全公平队列(Completely Fair Queuing)这...

2824
来自专栏杨建荣的学习笔记

Greenplum集群问题修复案例

我看了下GP Master端,看到负载并不高,当然这是一个初步的检测,如果集群响应缓慢,则很可能是segment节点上出现了延迟。一看则吓一跳,这是一个segm...

832

扫码关注云+社区