前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于 ZooKeeper 实现爬虫集群的监控

基于 ZooKeeper 实现爬虫集群的监控

作者头像
fengzhizi715
发布2019-05-28 18:39:38
5010
发布2019-05-28 18:39:38
举报

ZooKeeper

zookeeper.png

ZooKeeper 是一个开源的分布式协调服务,ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

ZooKeeper 通常用于:命名服务、配置管理、集群管理、分布式协调/通知、分布式锁和分布式队列等等。

各个爬虫的节点通过注册到 ZooKeeper 从而实现爬虫集群的管理。NetDiscovery 正是借助了 ZooKeeper 的特性来监控爬虫集群。

NetDiscovery 是一款基于 Vert.x、RxJava 2 等框架实现的通用爬虫框架。它包含了丰富的特性

爬虫集群的监控

NetDiscovery 包含了 Spider 和 SpiderEngine。 Spider 用于实现爬虫的业务逻辑,Spider 可以添加到 SpiderEngine,由 SpiderEngine 来管理各个 Spider 的生命周期。

但是 SpiderEngine 部署到每一个节点之后,SpiderEngine 如何进行监控和管理呢?

可以将 SpiderEngine 在运行时,先注册到 ZooKeeper。(需要事先在 ZooKeeper 集群创建 /netdiscovery 节点)

代码语言:javascript
复制
    /**
     * 启动SpiderEngine中所有的spider,让每个爬虫并行运行起来。
     *
     */
    public void run() {

        if (Preconditions.isNotBlank(spiders)) {

            registerZK();
            ......  
        }
    }

    /**
     * 将当前 SpiderEngine 注册到 zookeeper 指定的目录 /netdiscovery 下
     */
    private void registerZK() {

        if (Preconditions.isNotBlank(zkStr) && useZk) {
            log.info("zkStr: {}", zkStr);

            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
            CuratorFramework client = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
            client.start();
            try {
                String ipAddr = InetAddress.getLocalHost().getHostAddress() + "-" + defaultHttpdPort + "-" + System.currentTimeMillis();
                String nowSpiderEngineZNode = "/netdiscovery/" + ipAddr;
                client.create().withMode(CreateMode.EPHEMERAL).forPath(nowSpiderEngineZNode,nowSpiderEngineZNode.getBytes());
            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

另外,需要使用 NetDiscovery Monitor 的 CuratorManager 类。 它借助 Zookeeper 的 Watcher 机制,监听已经注册到 /netdiscovery 这个父 zNode 下的各个子 zNode ,也就是各个 SpiderEngine。

Watcher机制是指 ZooKeeper 客户端向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。ZooKeeper 服务器触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中回调 Watcher 执行相应的功能。

代码语言:javascript
复制
    /**
     * 当前所监控的父的 zNode 下若是子 zNode 发生了变化:新增,删除,修改
     * <p>
     * 下述方法都会触发执行
     *
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {

        List<String> newZodeInfos = null;
        try {
            newZodeInfos = client.getChildren().usingWatcher(this).forPath("/netdiscovery");
            //根据初始化容器的长度与最新的容器的长度进行比对,就可以推导出当前 SpiderEngine 集群的状态:新增,宕机/下线,变更...
            //哪个容器中元素多,就循环遍历哪个容器。
            if (Preconditions.isNotBlank(newZodeInfos)) {
                if (newZodeInfos.size()>allZnodes.size()){
                    //明确显示新增了哪个 SpiderEngine 节点
                    for (String nowZNode:newZodeInfos) {
                        if (!allZnodes.contains(nowZNode)){
                            log.info("新增 SpiderEngine 节点{}", nowZNode);
                        }
                    }
                }else if (newZodeInfos.size()<allZnodes.size()){
                    // 宕机/下线
                    // 明确显示哪个 SpiderEngine 节点宕机/下线了
                    for (String initZNode : allZnodes) {
                        if (!newZodeInfos.contains(initZNode)) {
                            log.info("SpiderEngine 节点【{}】下线了!", initZNode);

                            // 如果有下线的处理,则处理(例如发邮件、短信等)
                            if (serverOfflineProcess!=null) {
                                serverOfflineProcess.process();
                            }
                        }
                    }
                }else {
                    // SpiderEngine 集群正常运行;
                    // 宕机/下线了,当时马上重启了,总的爬虫未发生变化
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        allZnodes = newZodeInfos;
    }

所以需要单独运行一个进程,例如:

代码语言:javascript
复制
public class TestCuratorManager {

    public static void main(String[] args) {

        CuratorManager curatorManager = new CuratorManager();
        curatorManager.start();
    }
}

下图反映了 ZooKeeper 如何监控 SpiderEngine 集群。

SpiderEngine+ZK.png

总结

爬虫框架 github 地址:https://github.com/fengzhizi715/NetDiscovery

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.05.26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ZooKeeper
  • 爬虫集群的监控
  • 总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档