前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zookeeper C++编程实战之配置更新

Zookeeper C++编程实战之配置更新

作者头像
一见
发布2018-12-13 16:55:37
1.4K0
发布2018-12-13 16:55:37
举报
文章被收录于专栏:蓝天

CZookeeperHelper: https://github.com/eyjian/libmooon/blob/master/include/mooon/net/zookeeper_helper.h CMainHelper: https://github.com/eyjian/libmooon/blob/master/include/mooon/sys/main_template.h

代码语言:javascript
复制
 // 演示一个多线程程序如何借助zookeeper,实现配置的动态更新
  
 //
 
 // 实现理念(有些场景不适合):
 
 // 1) 让线程不涉及配置的动态更新,这样避免了动态更新配置
 
 // 2) 通过创建新线程的方式达到配置动态更新的目的,老的线程直接退出
 
 // 3) 先创建新线程,再退出老线程,保持服务不中断
 
 //
 
 // 实际上,也可以通过父子进程方式来达到配置动态更新,
 
 // 父进程检测到配置更新后,父进程读取配置,并检查配置的合法性。
 
 // 如果合法则创建新的子进程,完成后再kill原有的子进程,
 
 // 这样子进程就不涉及配置更新逻辑。
 
 //
 
 // 这两种方法,均可比较简单应对复杂的配置动态更新,
 
 // 但如果新旧配置无法同时兼容,则需要先停掉老的线程或进程,
 
 // 然后再启动新的线程或进程,否则做到无缝地动态更新。
 
 //
 
 // 编译要求环境:C++11或更高
 
 // 编译语句大致如下:
 
 // g++ -g -o b zk_conf_example.cpp -I/usr/local/mooon/include -I/usr/local/zookeeper/include /usr/local/mooon/lib/libmooon.a /usr/local/zookeeper/lib/libzookeeper_mt.a -pthread -std=c++11 -DMOOON_HAVE_ZOOKEEPER=1 -lz
 
 			#include <mooon/net/zookeeper_helper.h>
 
 			#include <mooon/sys/datetime_utils.h> // 格式化时间也可以考虑C++标准库提供的std::put_time
 
 			#include <mooon/sys/main_template.h>
 
 			#include <mooon/utils/args_parser.h>
 
 			#include <chrono>
 
 			#include <condition_variable>
 
 			#include <mutex>
 
 			#include <system_error>
 
 			#include <thread>
 
 
 
 // 指定存放配置的zookeeper
 
 			STRING_ARG_DEFINE(zookeeper, "", "Comma separated list of servers in the ZooKeeper Quorum, example: --zookeeper=127.0.0.1:2181");
 
 
 
 class CMyApplication;
 
 
 
 // 负责具体业务的工作者(线程)
 
 class CWorker
 
 {
 
 public:
 
 			    CWorker(CMyApplication* app, int index);
 
 			    void run(); // 线程入口函数
 
 			    void stop() { _stop = true; }
 
 
 
 private:
 
 			    CMyApplication* _app;
 
 int _index;
 
 			    volatile bool _stop;
 
 };
 
 
 
 // 应用程序主类(或叫上下文类,也可叫入口类)
 
 // 通过继承CZookeeperHelper,获得zookeeper操作能力,
 
 // 包括读写zookeeper数据能力、发现配置更新能力和主备切换能力。
 
 //
 
 // 可继承mooon::sys::CMainHelper,
 
 // 以获得通过信号SIGTERM的优雅退出能力,
 
 // CMainHelper提供了优雅和安全的信号处理,
 
 // 默认的优雅退出信号为SIGTERM,可自定义为其它信号。
 
 class CMyApplication: public mooon::net::CZookeeperHelper, public mooon::sys::CMainHelper
 
 {
 
 public:
 
 			    CMyApplication();
 
 
 
 private:
 
 // num_workers 需要启动的CWorker个数
 
 			    bool start_workers(
 
 			        std::vector<std::thread>* work_threads,
 
 			        std::vector<std::shared_ptr<CWorker>>* workers,
 
 int num_workers);
 
 			    void stop_workers(
 
 			        std::vector<std::thread>* work_threads,
 
 			        std::vector<std::shared_ptr<CWorker>>* workers);
 
 // 当zookeeper的会话过期后,
 
 // 需要调用recreate_zookeeper_session重新建立会话
 
 			    void recreate_zookeeper_session();
 
 
 
 // 实现父类CMainHelper定义的虚拟函数(实为回调函数),
 
 // 以下五个“on_”函数,均运行在独立的信号线程中,而不是主线程中。
 
 private:
 
 // 主线程的调用顺序:
 
 // main()
 
 // -> on_check_parameter() -> on_init()
 
 // -> on_run() -> on_fini()
 
 //
 
 // 注意on_terminated()是由信号触发的,
 
 // 由独立的信号线程调用,但位于on_init()之后。
 
 			    virtual bool on_check_parameter();
 
 			    virtual bool on_init(int argc, char* argv[]);
 
 			    virtual bool on_run(); // 这里使得配置动态生效
 
 			    virtual void on_fini();
 
 			    virtual void on_terminated();
 
 
 
 // 实现父类CZookeeperHelper定义的虚拟函数(实为回调函数)
 
 // 以下五个“on_”函数,均运行在独立的zookeeper线程中,而不是主线程中。
 
 private:
 
 			    virtual void on_zookeeper_session_connected(const char* path);
 
 			    virtual void on_zookeeper_session_connecting(const char* path);
 
 			    virtual void on_zookeeper_session_expired(const char *path);
 
 			    virtual void on_zookeeper_session_event(int state, const char *path);
 
 			    virtual void on_zookeeper_event(int type, int state, const char *path);
 
 
 
 private:
 
 			    volatile bool _stop;
 
 			    std::mutex _mutex;
 
 			    std::condition_variable _cond;
 
 			    std::vector<std::thread> _work_threads;
 
 			    std::vector<std::shared_ptr<CWorker>> _workers;
 
 
 
 private:
 
 			    volatile bool _conf_changed; // 配置发生变化
 
 			    volatile bool _zookeeper_session_expired; // zookeeper的会话(session)过期
 
 			    std::string _zk_nodes; // 存放配置的zookeeper节点列表
 
 			    std::string _conf_zkpath; // 配置的zookeeper节点路径
 
 };
 
 
 
 int main(int argc, char* argv[])
 
 {
 
 			    CMyApplication app;
 
 			    return mooon::sys::main_template(&app, argc, argv);
 
 }
 
 
 
 			static unsigned long long get_current_thread_id()
 
 {
 
 			    std::stringstream ss;
 
 			    ss << std::this_thread::get_id();
 
 			    return std::stoull(ss.str());
 
 }
 
 
 
 			CMyApplication::CMyApplication()
 
 : _stop(false), _conf_changed(false), _zookeeper_session_expired(false)
 
 {
 
 			    _conf_zkpath = "/tmp/conf";
 
 }
 
 
 
 			bool CMyApplication::on_check_parameter()
 
 {
 
 // 命令行参数“--zookeeper”不能为空
 
 			    return !mooon::argument::zookeeper->value().empty();
 
 }
 
 
 
 			bool CMyApplication::on_init(int argc, char* argv[])
 
 {
 
 			    try
 
 {
 
 // 以this方式调用的函数,均为CZookeeperHelper提供
 
 			        _zk_nodes = mooon::argument::zookeeper->value();
 
 			        this->create_session(_zk_nodes);
 
 
 
 // zookeeper的会话(session)是异步创建的,
 
 // 只有连接成功后,方可读取存放在zookeeper上的配置数据。
 
 for (int i=0; i<5&&!_stop; ++i)
 
 {
 
 if (this->is_connected())
 
 			                break;
 
 else
 
 			                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 
 }
 
 
 
 if (!this->is_connected())
 
 {
 
 			            fprintf(stderr, "Can not connect zookeeper://%s\n", _zk_nodes.c_str());
 
 			            return false;
 
 }
 
 else
 
 {
 
 // 取zookeeper节点数据
 
 			            std::string zkdata;
 
 int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
 
 if (n > 4 || zkdata.empty())
 
 {
 
 // 配置数据的大小超出预期
 
 			                fprintf(stderr, "conf size error: %d\n", n);
 
 			                return false;
 
 }
 
 else
 
 {
 
 // 如果zkdata不是一个有效的数字,
 
 // stoi会抛出异常invalid_argument
 
 const int num_workers = std::stoi(zkdata);
 
 
 
 if (num_workers < 1 || num_workers > 10)
 
 {
 
 			                    fprintf(stderr, "conf error: %d\n", num_workers);
 
 			                    return false;
 
 }
 
 else
 
 {
 
 			                    return start_workers(&_work_threads, &_workers, num_workers);
 
 }
 
 }
 
 }
 
 }
 
 			    catch (std::invalid_argument& ex)
 
 {
 
 			        fprintf(stderr, "%s\n", ex.what());
 
 			        return false;
 
 }
 
 			    catch (mooon::sys::CSyscallException& ex)
 
 {
 
 			        fprintf(stderr, "%s\n", ex.str().c_str());
 
 			        return false;
 
 }
 
 			    catch (mooon::utils::CException& ex)
 
 {
 
 			        fprintf(stderr, "%s\n", ex.str().c_str());
 
 			        return false;
 
 }
 
 }
 
 
 
 			bool CMyApplication::on_run()
 
 {
 
 while (!_stop)
 
 {
 
 			        std::unique_lock<std::mutex> lock(_mutex);
 
 			        _cond.wait(lock); // 等待配置更新或收到退出指令
 
 if (_stop)
 
 {
 
 			            break;
 
 }
 
 
 
 // 以下实现省略了函数调用抛异常处理
 
 if (_zookeeper_session_expired)
 
 {
 
 // 如果会话过期,则需要重新建会话
 
 			            recreate_zookeeper_session();
 
 }
 
 if (_stop)
 
 {
 
 // 在建立会话过程中,可能收到了停止指令
 
 			            break;
 
 }
 
 if (_conf_changed)
 
 {
 
 			            _conf_changed = false;
 
 
 
 // 读取新的配置
 
 			            std::string zkdata;
 
 int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
 
 if (n > 4)
 
 {
 
 // 这种情况下应触发告警
 
 // 配置数据的大小超出预期
 
 			                fprintf(stderr, "conf size error: %d\n", n);
 
 }
 
 else
 
 {
 
 // 这里可考虑加上优化:
 
 // 只有配置确实发生变化时才进行后续操作。
 
 const int num_workers = std::stoi(zkdata);
 
 
 
 if (num_workers < 1 || num_workers > 10)
 
 {
 
 // 这种情况下应触发告警
 
 			                    fprintf(stderr, "conf error: %d\n", num_workers);
 
 }
 
 else
 
 {
 
 			                    std::vector<std::thread> work_threads;
 
 			                    std::vector<std::shared_ptr<CWorker>> workers;
 
 
 
 // 新的配置生效,才停掉原来的,
 
 // 防止因为误操破坏配置,导致整个系统崩溃
 
 if (!start_workers(&work_threads, &workers, num_workers))
 
 {
 
 // 这种情况下应触发告警
 
 }
 
 else
 
 {
 
 			                        stop_workers(&_work_threads, &_workers);
 
 			                        _work_threads.swap(work_threads);
 
 			                        _workers.swap(workers);
 
 }
 
 }
 
 }
 
 }
 
 }
 
 
 
 			    return true;
 
 }
 
 
 
 			void CMyApplication::on_fini()
 
 {
 
 // 应用退出时被调用
 
 			    fprintf(stdout, "Application is about to quit\n");
 
 }
 
 
 
 // 接收到了SIGTERM信号
 
 			void CMyApplication::on_terminated()
 
 {
 
 // 一定要最先调用父类CMainHelper的on_terminated
 
 			    mooon::sys::CMainHelper::on_terminated();
 
 
 
 			    _stop = true;
 
 			    stop_workers(&_work_threads, &_workers);
 
 
 
 			    std::unique_lock<std::mutex> lock(_mutex);
 
 			    _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
 
 }
 
 
 
 			bool CMyApplication::start_workers(
 
 			    std::vector<std::thread>* work_threads,
 
 			    std::vector<std::shared_ptr<CWorker>>* workers,
 
 int num_workers)
 
 {
 
 			    try
 
 {
 
 for (int i=0; i<num_workers; ++i)
 
 {
 
 			            std::shared_ptr<CWorker> worker(new CWorker(this, i));
 
 			            workers->push_back(worker);
 
 			            work_threads->push_back(std::thread(&CWorker::run, worker));
 
 }
 
 			        return true;
 
 }
 
 			    catch(const std::system_error& ex)
 
 {
 
 // 如果有部分启动功能应当回退,这里省略了
 
 			        fprintf(stderr, "(%d)%s\n", ex.code().value(), ex.what());
 
 			        return false;
 
 }
 
 }
 
 
 
 			void CMyApplication::stop_workers(
 
 			    std::vector<std::thread>* work_threads,
 
 			    std::vector<std::shared_ptr<CWorker>>* workers)
 
 {
 
 for (std::vector<std::shared_ptr<CWorker>>::size_type i=0; i<workers->size(); ++i)
 
 {
 
 (*workers)[i]->stop();
 
 if ((*work_threads)[i].joinable())
 
 (*work_threads)[i].join();
 
 }
 
 			    work_threads->clear();
 
 			    workers->clear();
 
 }
 
 
 
 			void CMyApplication::recreate_zookeeper_session()
 
 {
 
 			    unsigned int count = 0;
 
 
 
 while (!_stop)
 
 {
 
 			        try
 
 {
 
 			            recreate_session();
 
 			            _zookeeper_session_expired = false;
 
 }
 
 			        catch (mooon::utils::CException& ex)
 
 {
 
 			            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
 
 
 
 if (0 == count++%30)
 
 {
 
 			                fprintf(stderr, "recreate zookeeper session failed: (count:%d)%s\n", count, ex.str().c_str());
 
 }
 
 }
 
 }
 
 }
 
 
 
 			void CMyApplication::on_zookeeper_session_connected(const char* path)
 
 {
 
 			    fprintf(stdout, "path=%s\n", path);
 
 }
 
 
 
 			void CMyApplication::on_zookeeper_session_connecting(const char* path)
 
 {
 
 			    fprintf(stdout, "path=%s\n", path);
 
 }
 
 
 
 			void CMyApplication::on_zookeeper_session_expired(const char *path)
 
 {
 
 			    fprintf(stdout, "path=%s\n", path);
 
 
 
 			    std::unique_lock<std::mutex> lock(_mutex);
 
 			    _zookeeper_session_expired = true;
 
 			    _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
 
 }
 
 
 
 			void CMyApplication::on_zookeeper_session_event(int state, const char *path)
 
 {
 
 			    fprintf(stdout, "state=%d, path=%s\n", state, path);
 
 }
 
 
 
 			void CMyApplication::on_zookeeper_event(int type, int state, const char *path)
 
 {
 
 			    fprintf(stdout, "type=%d, state=%d, path=%s\n", type, state, path);
 
 
 
 if (ZOO_CONNECTED_STATE == state &&
 
 			        ZOO_CHANGED_EVENT == type &&
 
 			        0 == strcmp(path, _conf_zkpath.c_str()))
 
 {
 
 // 配置发生变化
 
 			        std::unique_lock<std::mutex> lock(_mutex);
 
 			        _conf_changed = true;
 
 			        _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
 
 }
 
 }
 
 
 
 			CWorker::CWorker(CMyApplication* app, int index)
 
 : _app(app), _index(index), _stop(false)
 
 {
 
 }
 
 
 
 			void CWorker::run()
 
 {
 
 			    fprintf(stdout, "Worker[%d/%llu] \033[1;33mstarted\033[m\n", _index, get_current_thread_id());
 
 
 
 while (!_stop)
 
 {
 
 // 执行具体的业务逻辑操作,这里仅以sleep替代做示范
 
 			        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
 
 			        fprintf(stdout, "[%s] Worker[\033[1;33m%d\033[m/%llu] is working ...\n",
 
 			            mooon::sys::CDatetimeUtils::get_current_time().c_str(),
 
 			            _index, get_current_thread_id());
 
 }
 
 
 
 			    fprintf(stdout, "Worker[%d/%llu] \033[1;33mstopped\033[m\n", _index, get_current_thread_id());
 
 } 
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/11/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档