房价网是怎么使用分布式作业框架elastic-job

Elastic-Job是什么?

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。

官网地址:http://elasticjob.io/

Github:https://github.com/elasticjob/elastic-job

为什么要使用Elastic-Job

目前我们公司用的是基于Linux Crontab的定时任务执行器。

存在如下问题:

  • 无法集中管理任务
  • 不能水平扩展
  • 无可视化界面操作
  • 存在单点故障

除了Linux Crontab在java这块的方案还有 Quartz,但 Quartz缺少分布式并行调度的功能。

存在的问题也很明显:

  • 当我的项目是一个单体应用时,在里面基于Quartz起一个定时任务,可以很愉快的运行
  • 当我的项目做了负载,扩充到3台节点时,3个节点上的任务会同时执行,数据乱了
  • 同时执行要保证数据没问题需要引入分布式锁来调度,难度增大

怎么解决?

1.自研框架

这种情况下可能需要自己去开发一个能够满足公司业务需求的调度框架,成本较高,不推荐

之前我也有想过要自己写一个,思路有了,就是还没开始,调度框架只要是调度问题,像Elastic-Job就做的非常好,它把分片的规则让你自己定义,然后根据你定的片的数据给你调度下,至于每个节点处理什么数据你自己去控制。

如果说部采用这种方式,也不去写数据的分发,那么我觉得最简单的办法就是用消息队列来实现了。

采用zookeeper来做调度,存储任务数据,定义一个通用的接口,分成2部分,如下:

public interface Job {
    void read();
    void process(Object data);
}

然后使用者通过实现上面的接口来读取需要处理的数据,在process中处理分发过来的数据

至于分发的话一个任务可以通过注解来标记使用一个队列,也可以使用通用的,这样就可以实现多个消费者同时消费了,就算其中一个挂掉也不影响整个任务,也不用考虑失效转移了。

但是要做控制的是read方法,必须只有一个节点执行,不然数据就分发重复了。

上面只是提供一个简单的思路,当然有web页面管理任务,也可以手动执行任务等等。

2.选择开源方案

TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重。

Spring Batch: Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

Elastic-Job:国内开源产品,中文文档,入门快速,使用简单,功能齐全,社区活跃,由当当网架构师张亮主导,目前在开源方面投入了比较多的时间。

为什么选择Elastic-Job?

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

作业类型介绍

Simple:简单作业,常用, 意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
    
}

DataFlow:Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

public class MyElasticJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
    
}

Script:Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

其实我建议增加一种任务类型,就是流水式任务,为此我还特意提了一个issues:

https://github.com/elasticjob/elastic-job/issues/432

在特定的业务需求下,A任务执行完之后,需要执行B任务,以此类推,这种具有依赖性的流水式的任务。
在目前可以将这些任务合在一起,通过代码调用的方式来达到效果。
但我希望能增加这样一个功能,比如加一个配置,job-after="com.xxx.job.XXXJob" 
在执行完这个任务之后,自动调用另一个任务BB,BB任务只需要配置任务信息,
把cron去掉就可以,因为BB是依靠别的任务触发执行的。
当然这些任务必须在同一个zk的命名空间下,如果能支持夸命名空间就更好了
这样就能达到,流水式的任务操作了,并且每个任务可以用不同的分片key

开始使用

1.关于框架怎么搭建,怎么配置就不做讲解了,官网文档肯定比我写的好,一般开源框架都有demo,大家下载下来导入IDE中即可运行。

demo地址:https://github.com/elasticjob/elastic-job/tree/master/elastic-job-example

2.介绍下使用中的一些经验

  • 建议按产品来划分任务,一个产品对应一个任务的项目,当团队比较大的时候可能一个小组负责一个产品,这样就不会跟别的混在一起了
  • 任务的描述一定要写清楚,干嘛用的,在配置任务中有个描述的配置,填写清楚
/**
 * 用户维度统计任务<br>统计出用户的房产,置换,贷款等信息
 * @author yinjihuan
 */
public class UserStatJob implements SimpleJob {
    private Logger logger = LoggerFactory.getLogger(UserStatJob.class);
    
    @Autowired
    private EnterpriseProductUserService enterpriseProductUserService;
    
    @Autowired
    private UserStatService userStatService;
   
    @Autowired
    private HouseInfoService houseInfoService;
   
    @Autowired
    private HouseSubstitutionService houseSubstitutionService;
    
    @Autowired
    private LoanApplyService loanApplyService;
   
    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info("开始执行UserStatJob");
        long total = enterpriseProductUserService.queryCount();
        int pages = PageBean.calcPages(total, 1000);
        for (int i = 1; i <= pages; i++) {
            List<EnterpriseProductUser> users = enterpriseProductUserService.queryByPage(i, 1000);
            for (EnterpriseProductUser user : users) {
                try {
                    processStat(user);
                } catch (Exception e) {
                    logger.error("用户维度统计任务异常", e);
                    DingDingMessageUtil.sendTextMessage("用户维度统计任务异常:" + e.getMessage());
                }
            }
        }
        logger.info("UserStatJob执行结束");
    }
    
    private void processStat(EnterpriseProductUser user) {
        UserStat stat = userStatService.getByUid(user.getEid(), user.getUid());
        Long eid = user.getEid();
        String uid = user.getUid();
        if (stat == null) {
            stat = new UserStat();
            stat.setEid(eid);
            stat.setUid(uid);
            stat.setUserAddTime(user.getAddTime());
            stat.setCity(user.getCity());
            stat.setRegion(user.getRegion());
        }
        stat.setHouseCount(houseInfoService.queryCountByEidAndUid(eid, uid));
        stat.setHousePrice(houseInfoService.querySumMoneyByEidAndUid(eid, uid));
        stat.setSubstitutionCount(houseSubstitutionService.queryCount(eid, uid));
        stat.setSubstitutionMaxPrice(houseSubstitutionService.queryMaxBudget(eid, uid));
        stat.setLoanEvalCount(loanApplyService.queryUserCountByType(eid, uid, 2));
        stat.setLoanEvalMaxPrice(loanApplyService.queryMaxEvalMoney(eid, uid));
        stat.setLoanCount(loanApplyService.queryUserCountByType(eid, uid, 1));
        stat.setModifyDate(new Date());
        userStatService.save(stat);
    }
}
 <!-- 用户统计任务 每天1点10分执行 -->
 <job:simple id="userStatJob" class="com.fangjia.job.fsh.job.UserStatJob" registry-center-ref="regCenter"
         sharding-total-count="1" cron="0 10 1 * * ?" sharding-item-parameters=""
         failover="true" description="【房生活】用户维度统计任务,统计出用户的房产,置换,贷款等信息 UserStatJob"
         overwrite="true" event-trace-rdb-data-source="elasticJobLog" job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler">
          <job:listener class="com.fangjia.job.fsh.listener.MessageElasticJobListener"></job:listener>
 </job:simple>
  • 为每个任务配置一个统一的监听器,来对任务的执行,结束进行通知,可以是短信,邮件或者别的,我这边用的是钉钉的机器人来发消息通知到钉钉
/**
 * 作业监听器, 执行前后发送钉钉消息进行通知
 * @author yinjihuan
 */
public class MessageElasticJobListener implements ElasticJobListener {
    
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        String date = DateUtils.date2Str(new Date());
        String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务开始执行====" + JsonUtils.toJson(shardingContexts);
        DingDingMessageUtil.sendTextMessage(msg);
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        String date = DateUtils.date2Str(new Date());
        String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务执行结束====" + JsonUtils.toJson(shardingContexts);
        DingDingMessageUtil.sendTextMessage(msg);
    }
    
}
  • 可以在每个任务类上定义一个注解,注解用来标识这个任务是谁开发的,然后对应的钉钉消息就发送给谁,我个人建议还是建一个群,然后大家都在里面,因为如果单独发给一个开发人员,除非他的主动性很高,不然也没什么用,我个人建议发在群里,这样领导看见了就会说那个谁谁谁,你的任务报错了,去查下原因。我这边是统一发的,没有定义注解。
  • 任务的异常处理,可以在任务中对异常进行处理,除了记录日志,也用统一封装好的发送钉钉消息来进行通知,实时知道任务是否有异常,可以查看我上面的代码。
  • 还有一种是没捕获的异常,怎么通知到群里,可以自定义异常处理类来实现, 通过配置job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler"
/**
 * 自定义异常处理,在任务异常时使用钉钉发送通知
 * @author yinjihuan
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {
    private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);
    
    @Override
    public void handleException(String jobName, Throwable cause) {
        logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
        DingDingMessageUtil.sendTextMessage("【"+jobName+"】任务异常。" + cause.getMessage());
    }
    
}
  • 可以通过监听job_name\instances\job_instance_id节点是否存在来判断作业节点是否挂掉,该节点为临时节点,如果作业服务器下线,该节点将删除。当然也可以通过其他的工具来进行监控。
  • 任务的编写尽量考虑到水平扩展性,像我上面贴的那个列子其实就没考虑到,只是一个单纯的任务,因为我没有用到shardingParameter来处理对应的片的数据,这边其实建议大家考虑下,如果任务时间短。处理的数据少,可以写成我这样。如果能够预计到未来有大量数据需要处理,而且时间很长的话最好配置下分片的规则,并且将代码写成按分片来处理,这样到了后面就直接修改配置,增加下节点就行了。

推荐相关阅读:

  • 《Elastic-Job-Spring-Boot-Starter简化你的任务配置》
  • 《Elastic-Job动态添加任务》
  • 《实战分布式任务调度框架Elastic Job》

原文发布于微信公众号 - 猿天地(cxytiandi)

原文发表时间:2018-05-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏架构之美

五分钟学会Elasticsearch查询代理设计

1454
来自专栏生信技能树

在ubuntu使用apt install的fastqc是有bug的

所以我就去了我的生物信息学常见1000个软件的安装代码:https://www.jianshu.com/p/ae28e8e3e9f5 找到了fastqc软件下载...

1322
来自专栏小灰灰

Quick-Task 动态脚本支持框架之结构设计篇

文章链接:https://liuyueyi.github.io/hexblog/2018/07/23/180723-Quick-Task-动态脚本支持框架之结构...

1073
来自专栏向治洪

Hibernate之初体验

在开始学Hibernate之前,一直就有人说:Hibernate并不难,无非是对JDBC进一步封装。一句不难,难道是真的不难还是眼高手低?       如果...

1997
来自专栏liuchengxu

可移植的 Makefile 教程

在我写 Makefile 的头 10 年里,我养成了一个非常不好的习惯 -- 完全严格使用 GNU Make 的扩展名。过去我并不知道, GNU Make 与...

861
来自专栏Linyb极客之路

我们必须要知道的RESTful服务最佳实践

REST:Representational State Transfer(表象层状态转变),如果没听说过REST,你一定以为是rest这个单词,刚开始我也是...

1263
来自专栏张善友的专栏

Windows Server AppFabric Caching

这套 AppFabric Caching 比我用过的 memcached 复杂多了,MSDN有一篇文章进行介绍Introduction to Caching w...

2179
来自专栏阿杜的世界

Java Web技术经验总结(一)

1012
来自专栏代码散人

如何用kotlin开发同时支持iOS和Android的库

虽说kotlin-native可以支持链接到c,java,objective-c等语言,甚至可以进行原生开发,但是在使用的过程中并不友好,配置繁琐且api相对生...

1492
来自专栏吉浦迅科技

DAY40:阅读Memory Fence Functions

The CUDA programming model assumes a device with a weakly-ordered memory model, ...

814

扫码关注云+社区

领取腾讯云代金券