首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从0开始搭建编程框架——插件

从0开始搭建编程框架——插件

作者头像
方亮
发布2019-01-16 16:26:09
7320
发布2019-01-16 16:26:09
举报
文章被收录于专栏:方亮方亮

我将插件设计为两种类型。一种是框架自身携带的插件,用于增强其基础能力。另一种是用户自定义插件,用于完成业务。本文将分别介绍在peleus框架下,这两种插件该怎么写。(转载请指明出自breaksoftware的csdn博客)

        本文谈下框架自身携带的插件怎么写。我们以操作mysql为例。

        第一步把mysqlclient开发环境给准备好。

# Download and unpack the MySQL Connector/C 6.1.11 sources.
wget https://downloads.mysql.com/archives/get/file/mysql-connector-c-6.1.11-src.tar.gz
tar -zxvf mysql-connector-c-6.1.11-src.tar.gz
cd mysql-connector-c-6.1.11-src
# Configure an in-source build that installs under /usr, then build and install.
cmake -G "Unix Makefiles" -DCMAKE_INSTALL_PREFIX=/usr .
make
sudo make install

        由于实际安装到系统中的头文件不全,我们还要从https://github.com/mysql/mysql-server/blob/5.6/include/hash.h把hash.h文件放到/usr/include下。

        因为mysql-connector-c是C语言编码,为了统一peleus下语言栈,我们将其封装成一个类

#include "mysql_connection.h"
#include <butil/logging.h>
#include <errmsg.h>
#include "mysql_error.h"

namespace peleus {
namespace plugins {
namespace mysql_conns {

using peleus::plugins::configure::mysql_connection;

// Builds a connection object that does not own a MySQL handle yet; the
// handle is created on demand by the first query.
MysqlConnection::MysqlConnection()
    : _conn(NULL) {
}

// Releases the MySQL handle (if one is open) when the object goes away.
MysqlConnection::~MysqlConnection() {
    this->_disconnect();
}

void MysqlConnection::init(const mysql_connection& conf) {
    _conf.CopyFrom(conf);
}

int MysqlConnection::mysql_real_query(const std::string& sql) {
    int ret = 0;
    if (!_conn) {
        ret = _reconnct();
        if (ret) {
            return ret;
        }
    }

    LOG(INFO) << "query sql:" << sql.c_str();
    const int max_retry = 2;
    for (auto i = 0; i < max_retry; i++) {
        ret = ::mysql_real_query(_conn, sql.c_str(), sql.length());
        if (0 == ret) {
            break;
        }
        else if (i < max_retry - 1){
            LOG(INFO) << "mysql connection is lost.retry " << i;
            int ret_re = _reconnct();
            if (ret_re) {
                LOG(WARNING) << "mysql connection is lost.retry error" << ret_re;
                return ret_re;
            }
        }
        else if (ret != CR_SERVER_LOST && ret != CR_SERVER_GONE_ERROR) {
            LOG(ERROR) << "query error:" << ret << " sql:" << sql.c_str();
            return ret;
        }
    }

    return ret;
}

// Fetches the buffered result set of the last statement executed on this
// connection. Yields NULL when there is no open connection; otherwise
// returns whatever ::mysql_store_result() returns for the handle.
MYSQL_RES* MysqlConnection::mysql_store_result() {
    return _conn ? ::mysql_store_result(_conn) : NULL;
}

// Drops the current handle (if any) and opens a fresh connection.
// Returns the result of _connect(): 0 on success, non-zero on failure.
int MysqlConnection::_reconnct() {
    // _disconnect() is a no-op when no handle is open, so it can be
    // called unconditionally.
    _disconnect();
    return _connect();
}

int MysqlConnection::_connect() {
    _conn = mysql_init(NULL);
    if (!_conn) {
        return CR_OUT_OF_MEMORY;
    }
    unsigned long time_out = _conf.read_timeout();
    unsigned long conn_out = _conf.conn_timeout();
    mysql_options(_conn, MYSQL_OPT_READ_TIMEOUT, (char*)&time_out);
    mysql_options(_conn, MYSQL_OPT_CONNECT_TIMEOUT, (char*)&conn_out);

    if (!mysql_real_connect(_conn,
            _conf.host().c_str(),
            _conf.user().c_str(),
            _conf.password().c_str(),
            _conf.dbname().c_str(),
            _conf.port(),
            NULL, 0))
    {
        _disconnect();
        return CR_SERVER_GONE_ERROR;
    }

    int ret = mysql_set_character_set(_conn, _conf.character_set().c_str());
    if (ret) {
        _disconnect();
    }

    return ret;
}

// Closes the MySQL handle and forgets it. Safe to call when no handle is
// open, in which case nothing happens.
void MysqlConnection::_disconnect() {
    if (!_conn) {
        return;
    }
    mysql_close(_conn);
    _conn = NULL;
}

}
}
}

        MysqlConnection类主要对外提供连接和查询数据库的功能,并支持断开重连功能。这是针对一个连接的,我们需要连接池来提高连接的使用率。

// Creates an empty pool. Pooling stays disabled until init() enables it,
// so query() has a defined value to read even if it is called before
// init().
MysqlConnectionPool::MysqlConnectionPool()
    : _use_pool(false) {
}

// Destroys every idle connection held by the pool and releases the mutex
// that init() created when pooling was enabled.
MysqlConnectionPool::~MysqlConnectionPool() {
    for (auto& conn : _unused) {
        delete conn;    // deleting a null pointer is a no-op
        conn = NULL;
    }
    _unused.clear();

    // The mutex is only initialised by init() on the pooled path.
    if (_use_pool) {
        pthread_mutex_destroy(&_mutex);
    }
}

// Copies the pool configuration and, when the configured size is not
// negative, enables pooling and pre-creates that many connection objects.
// A negative size disables pooling: every query then uses a throw-away
// connection (see query()).
// The connection objects are only configured here, not connected.
void MysqlConnectionPool::init(const peleus::plugins::
        configure::mysql_connection_pool& conf) {
    _conf.CopyFrom(conf);

    const int capacity = _conf.size();
    _use_pool = (capacity >= 0);
    if (!_use_pool) {
        return;
    }

    pthread_mutex_init(&_mutex, NULL);
    pthread_mutex_lock(&_mutex);
    int created = 0;
    while (created < capacity) {
        MysqlConnection* fresh = new MysqlConnection();
        fresh->init(_conf.conn_info());
        _unused.push_back(fresh);
        ++created;
    }
    pthread_mutex_unlock(&_mutex);
}

// Runs `sql` and stores the outcome in `session`. Dispatches to the
// pooled path when pooling is enabled, otherwise to a one-shot
// connection. Returns whatever the chosen path returns (0 on success).
int MysqlConnectionPool::query(const std::string& sql, MysqlSession& session) {
    return _use_pool ? query_from_pool(sql, session)
                     : query_once(sql, session);
}

// Hands a borrowed connection back to the idle list. Does nothing when
// pooling is disabled.
void MysqlConnectionPool::recovery(MysqlConnection* conn) {
    if (!_use_pool) {
        return;
    }

    pthread_mutex_lock(&_mutex);
    _unused.push_back(conn);
    pthread_mutex_unlock(&_mutex);
}

int MysqlConnectionPool::query_once(const std::string& sql, MysqlSession& session) {
    MysqlConnection conn;
    conn.init(_conf.conn_info());
    int ret = conn.mysql_real_query(sql);
    session._res = conn.mysql_store_result();
    return ret;
}

int MysqlConnectionPool::query_from_pool(const std::string& sql, MysqlSession& session) {
    MysqlConnection* conn = NULL;
    pthread_mutex_lock(&_mutex);
    if (!_unused.empty()) {
        conn = _unused.back();
        _unused.pop_back();
    }
    pthread_mutex_unlock(&_mutex);
    if (!conn) {
        LOG(ERROR) << "need more connection";
        return LACK_CONNECTION;
    }

    int ret = conn->mysql_real_query(sql);
    session._res = conn->mysql_store_result();
    session._conn = conn;
    session._conn_pool = this;
    return ret;
}

        MysqlConnectionPool将空闲连接放在_unused中。当需要做查询操作时,便从_unused中取出一个连接使用。但是连接池里的连接不能交由用户管理,于是需要暴露query_from_pool方法,将连接隐藏起来。

        由于业务可能需要连接多个数据库,所以需要做一个针对多个数据库的连接管理器

// Creates an empty manager; pools are registered later through init().
MysqlConns::MysqlConns() {
}

// Destroys every connection pool registered in this manager.
MysqlConns::~MysqlConns() {
    for (auto& entry : _conns) {
        delete entry.second;    // deleting a null pointer is a no-op
        entry.second = NULL;
    }
}

void MysqlConns::init(const char* conf_floder_path) {
    TraversalCallback traversal_floder = [this](const char* path) {this->init(path);};
    TraversalCallback traversal_file = [this](const char* path) {this->init_from_file(path);};
    TraversalFloder traversal;
    traversal.set_callback(traversal_floder, traversal_file, __FILE__);
    traversal.init(conf_floder_path);
}

void MysqlConns::init_from_file(const char* conf_path) {
    mysql_connection_pool config;
    bool suc = peleus::utils::file2conf(conf_path, config);
    if (!suc) {
        LOG(ERROR) << "init " << conf_path << " error";
        return;
    }
    const std::string& conn_name = config.name();
    MysqlConnectionPool* conn_pool = init_conn_pool(config);
    if (!conn_pool) {
        LOG(ERROR) << "init mysql connection pool error:" << conn_name;
        return;
    }
    _conns[conn_name] = conn_pool;
}

// Allocates a connection pool and initialises it from `conf`.
// The caller takes ownership of the returned pool.
MysqlConnectionPool* MysqlConns::init_conn_pool(const mysql_connection_pool& conf) {
    MysqlConnectionPool* const pool = new MysqlConnectionPool();
    pool->init(conf);
    return pool;
}


int MysqlConns::query(const std::string& conn_name,
        const std::string& sql,
        MysqlSession& session)
{
    auto it = _conns.find(conn_name);
    if (it == _conns.end()) {
        LOG(ERROR) << "cant find mysql connection:" << conn_name.c_str();
        return NO_CONNECTION;
    }
    if (!it->second) {
        LOG(ERROR) << "mysql connection pool is null";
        return NULL_CONNECTION;
    }
    return it->second->query(sql, session);
}

        至此,我们将mysql最基础的操作部分给封装完毕。现在开始写插件

// RPC service plugin that executes a SQL command on a named MySQL
// connection and returns the result rows.
// It derives from peleus::plugins::MysqlVisitorService (which declares
// the query RPC) and from peleus::entrance::Entrance.
class MysqlVisitorService :
    public peleus::plugins::MysqlVisitorService,
    public peleus::entrance::Entrance
{
public:
    // `name` is forwarded to the Entrance base class.
    explicit MysqlVisitorService(const char* name);

public:
    // RPC handler: runs request->command() on the connection named
    // request->conn_name() and fills `response` with the rows and a
    // success flag.
    virtual void query(::google::protobuf::RpcController* controller,
       const ::peleus::plugins::MysqlVisitorServiceRequest* request,
       ::peleus::plugins::MysqlVisitorServiceResponse* response,
       ::google::protobuf::Closure* done) override;
public:
    // NOTE(review): presumably hooks declared by Entrance; their
    // definitions are not visible in this section.
    virtual void on_init(const char* conf_path) final;
    virtual void reset() final;
private:

    // Appends the next row of `res` to `response`.
    void parse_res(
            MYSQL_RES* res,
            ::peleus::plugins::MysqlVisitorServiceResponse* response);

    // Appends every row of the session's result set to `response`.
    void parse_session(
            peleus::plugins::mysql_conns::MysqlSession& session,
            ::peleus::plugins::MysqlVisitorServiceResponse* response);
};

        MysqlVisitorService将作为内部服务的一个业务逻辑存在,同时它也是一个入口,所以继承于Entrance接口。它的另一个基类peleus::plugins::MysqlVisitorService是protobuf根据下面的proto文件生成的接口类:

syntax="proto2";
// Generate the abstract C++ service class for the service defined below.
option cc_generic_services = true;

package peleus.plugins;

// A query request: which configured connection to use and what to run.
message MysqlVisitorServiceRequest {
    // Name of the connection pool to run the command on.
    required string conn_name = 1;
    // SQL text to execute.
    required string command = 2;
};

// One result row; each entry is the textual value of one column.
message MysqlVisitorRow {
    repeated string column = 1;
};

// Outcome of a query.
message MysqlVisitorServiceResponse {
    // True when the query executed successfully.
    required bool suc = 1;
    // Rows of the result set, if any.
    repeated MysqlVisitorRow rows = 2;
};

service MysqlVisitorService {
    rpc query(MysqlVisitorServiceRequest) returns (MysqlVisitorServiceResponse);
};

        MysqlVisitorService的业务代码就是query,其会调用MysqlConns类的方法去查询结果,然后解析和封装返回

// Forwards `name` to the Entrance base class; no other state is set up.
MysqlVisitorService::MysqlVisitorService(const char* name) :
    peleus::entrance::Entrance(name) {
}

// RPC handler: executes the requested command on the requested
// connection, copies the result rows into `response` and sets its `suc`
// flag to whether the query returned 0.
// NOTE(review): ClosureGuard presumably runs `done` when this function
// returns; its definition is not visible here.
void MysqlVisitorService::query(::google::protobuf::RpcController* controller,
    const ::peleus::plugins::MysqlVisitorServiceRequest* request,
    ::peleus::plugins::MysqlVisitorServiceResponse* response,
    ::google::protobuf::Closure* done)
{
    ClosureGuard done_guard(done);

    MysqlSession session;
    const int status = MysqlConns::get_mutable_instance().query(
            request->conn_name(), request->command(), session);

    parse_session(session, response);
    response->set_suc(0 == status);
}

// Copies every row of the session's result set into `response`.
// Does nothing when the session carries no result set.
void MysqlVisitorService::parse_session(
        MysqlSession& session,
        ::peleus::plugins::MysqlVisitorServiceResponse* response)
{
    MYSQL_RES* const result = session._res;
    if (NULL == result) {
        return;
    }

    // parse_res() consumes one row per call, so call it once per row.
    const int total_rows = mysql_num_rows(result);
    int consumed = 0;
    while (consumed < total_rows) {
        parse_res(result, response);
        ++consumed;
    }
}

// Fetches the next row from `res` and appends it to `response`.
//
// - If no row can be fetched, nothing is appended.
// - A SQL NULL column is returned as an empty string: the row message has
//   no way to express NULL, and building a std::string from a null
//   pointer is undefined behaviour.
// - Column values are copied with their real length, so values with
//   embedded NUL bytes are not truncated.
void MysqlVisitorService::parse_res(
        MYSQL_RES* res,
        ::peleus::plugins::MysqlVisitorServiceResponse* response)
{
    MYSQL_ROW res_row = mysql_fetch_row(res);
    if (!res_row) {
        LOG(ERROR) << "mysql_fetch_row return null";
        return;
    }
    // Lengths of the columns of the row that was just fetched.
    const unsigned long* lengths = mysql_fetch_lengths(res);

    MysqlVisitorRow* row = response->add_rows();
    if (!row) {
        LOG(ERROR) << "add_rows return null";
        return;
    }

    const unsigned int field_size = mysql_num_fields(res);
    for (unsigned int i = 0; i < field_size; ++i) {
        if (!res_row[i]) {
            row->add_column(std::string());
        }
        else if (lengths) {
            row->add_column(std::string(res_row[i], lengths[i]));
        }
        else {
            row->add_column(std::string(res_row[i]));
        }
    }
}

        最后在插件启动代码中加入注册方法就行了

// Plugin entry point, declared with C linkage so its symbol name is not
// mangled. The argument is the path of the plugins configuration file.
extern "C" {
    bool plugins_main(const char*);
};

// USE(x, y): when the plugin entry in scope (`item_conf`) is named x,
// initialise the singleton y with the entry's configuration path.
// Expands to a bare `if` and relies on a variable named `item_conf` being
// visible at the point of use.
#define USE(x, y)   \
    if (item_conf.name() == x) {    \
        y::get_mutable_instance().init(item_conf.conf_path().c_str());    \
    }

// REG(x, y, z): when the plugin entry in scope (`item_conf`) is named x,
// register class y under the name z and RETURN the result from the
// enclosing function. Only usable inside a function returning a type
// compatible with register_class<y>(z).
#define REG(x, y, z)    \
    if (item_conf.name() == x) {   \
        return register_class<y>(z);    \
    }

// Loads the plugin described by `item_conf`.
// Entries that are switched off, or whose name matches no known plugin,
// are skipped and reported as success. For a known plugin the result of
// its registration is returned (the REG macro returns from this function).
bool load_moudle(const plugin_conf& item_conf) {
    if (item_conf.on()) {
        REG("mysql_visitor_service", MysqlVisitorService, "MysqlVisitorService");
    }

    return true;
}

bool plugins_main(const char* conf_path) {
    plugins_conf config;
    bool suc = file2conf(conf_path, config);
    if (!suc) {
        LOG(ERROR) << "conv config error:" << conf_path;
        return false;
    }

    size_t size = config.conf_size();

    for (size_t i = 0; i < size; i++) {
        const plugin_conf& item_conf = config.conf(i);
        if (!load_moudle(item_conf)) {
            break;
        }
    }

    return true;
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年08月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档