前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot集成PowerJob-openAPI和回调完整流程

springboot集成PowerJob-openAPI和回调完整流程

作者头像
用户5927264
发布2020-10-26 17:12:30
4.8K0
发布2020-10-26 17:12:30
举报
文章被收录于专栏:OSChinaOSChina

参考官网:https://www.yuque.com/powerjob/guidence/olgyf0

https://github.com/KFCFans/PowerJob

推荐使用3.3.0版本

导入需要的jar包,servie也需要使用3.3.0版本

代码语言:javascript
复制
<dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-client</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-worker-spring-boot-starter</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>

<dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.14.9</version>
        </dependency>

一. 使用openAPI 开发一次性的任务,保证任务只调度一次就好

  1. 编写yml配置文件

后台访问地址:http://192.168.2.11:7700/

代码语言:javascript
复制
powerjob:
  worker:
    akka-port: 27777   # akka 工作端口,可选,默认 27777
    app-name: sass-openapi     # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
    server-address: 192.168.2.11:7700  # 调度服务器地址,IP:Port 或 域名,多值逗号分隔
    password: shiye9527  # 密码
    store-strategy: disk    # 持久化方式,可选,默认 disk

2. 编写config配置文件 PowerJobConfig

代码语言:javascript
复制
package com.un.common.utils.job;

import com.github.kfcfans.powerjob.client.OhMyClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author shiye
 * @create 2020-10-09 16:33
 */
@Configuration
public class PowerJobConfig {

    /**
     * # akka 工作端口,可选,默认 27777
     */
    @Value("${powerjob.worker.akka-port}")
    private Integer akkaPort;

    /**
     * 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
     */
    @Value("${powerjob.worker.app-name}")
    private String appName;

    /**
     * 调度服务器地址,IP:Port 或 域名,多值逗号分隔
     */
    @Value("${powerjob.worker.server-address}")
    private String serverAddress;

    @Value("${powerjob.worker.password}")
    private String password;

    /**
     * 持久化方式,可选,默认 disk
     */
    @Value("${powerjob.worker.store-strategy}")
    private String storeStrategy;


    @Bean
    public OhMyClient getOhMyClient() {
        return new OhMyClient(serverAddress, appName, password);
    }

}

3. 编写util工具类 PowerJobUtil

代码语言:javascript
复制
package com.un.common.utils.job;

import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.un.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author shiye
 * @create 2020-10-09 16:55
 */
@Configuration
public class PowerJobUtil {

    protected final Logger logger = LoggerFactory.getLogger(PowerJobUtil.class);

    @Autowired
    private OhMyClient ohMyClient;

    @Bean
    public PowerJobUtil getPowerJobUtil() {
        return new PowerJobUtil();
    }

    /**
     * 创建一个单核一年只执行一次得任务
     *
     * @param StartTime     任务开始时间
     * @param params        任务参数
     * @param processorInfo 回调得类全类名如:com.un.framework.task.SysNoticeScheduProcessor
     * @param jobId         任务id:如果为空就是创建任务
     * @return 返回结果
     * @throws Exception
     */
    public ResultDTO<Long> saveJob(Date StartTime, String params, String processorInfo, String jobId, String jobName, String jobDescription) throws Exception {
        logger.info("saveJob .......{},{},{},{}", StartTime, params, processorInfo, jobId);

        SaveJobInfoRequest request = new SaveJobInfoRequest();
        if (StringUtils.isNotEmpty(jobId)) {
            request.setId(Long.valueOf(jobId));
        }
        //任务名称
        request.setJobName(jobName);
        //任务描述
        request.setJobDescription(jobDescription);
        //任务参数,Processor#process方法入参TaskContext对象的jobParams字段
        request.setJobParams(params);
        //时间表达式类型,枚举值
        request.setTimeExpressionType(TimeExpressionType.CRON);
        //时间表达式,填写类型由timeExpressionType决定,比如CRON需要填写CRON表达式
        request.setTimeExpression(getCron(StartTime));
        //执行类型,枚举值
        request.setExecuteType(ExecuteType.STANDALONE);
        //处理器类型,枚举值
        request.setProcessorType(ProcessorType.EMBEDDED_JAVA);
        //处理器参数,填写类型由processorType决定,如Java处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo
        request.setProcessorInfo(processorInfo);
        //最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
        request.setMaxInstanceNum(1);
        //单机线程并发数,表示该实例执行过程中每个Worker使用的线程数量
        request.setConcurrency(1);
        //任务实例运行时间限制,0代表无任何限制,超时会被打断并判定为执行失败
        request.setInstanceTimeLimit(0l);
        //任务实例重试次数,整个任务失败时重试,代价大,不推荐使用
        request.setMaxInstanceNum(0);
        //Task重试次数,每个子Task失败后单独重试,代价小,推荐使用
        request.setTaskRetryNum(2);
        //最小可用CPU核心数,CPU可用核心数小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinCpuCores(0);
        //最小内存大小(GB),可用内存小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinMemorySpace(0);
        //最小磁盘大小(GB),可用磁盘空间小于该值的Worker将不会执行该任务,0代表无任何限制
        request.setMinDiskSpace(0);
        //指定机器执行,设置该参数后只有列表中的机器允许执行该任务,空代表不指定机器
        request.setDesignatedWorkers(null);
        //最大执行机器数量,限定调动执行的机器数量,0代表无限制
        request.setMaxWorkerCount(1);
        //是否启用该任务,未启用的任务不会被调度
        request.setEnable(true);

        ResultDTO<Long> resultDTO = ohMyClient.saveJob(request);
        return resultDTO;
    }

    /**
     * 禁用某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> disableJob(Long jobId) {
        logger.info("disableJob .......{}", jobId);
        try {
            TimeUnit.MINUTES.sleep(5);
            return ohMyClient.disableJob(jobId);
        } catch (Exception e) {
            logger.error("disableJob  error.......{},{}", e, jobId);
        }
        return null;
    }

    /**
     * 删除某个任务
     *
     * @param jobId
     * @return
     * @throws Exception
     */
    public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
        return ohMyClient.deleteJob(jobId);
    }

    /**
     * 通过输入指定日期时间生成cron表达式
     *
     * @param date
     * @return cron表达式
     */
    public String getCron(Date date) {
        logger.info("create cron 接收到得时间", date.toString());
        String dateFormat = "ss mm HH dd MM ? yyyy-yyyy";
        SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
        String formatTimeStr = null;
        if (date != null) {
            formatTimeStr = sdf.format(date);
        }
        System.out.println("当前时间得CRON:" + formatTimeStr);
        return formatTimeStr;
    }

}

4. 在业务代码中集成进去

代码语言:javascript
复制
Date startTimeCron = new Date(Long.parseLong(String.valueOf(startTime * 1000)));
ResultDTO<Long> resultDTO = powerJobUtil.saveJob(startTimeCron, pBasActivity.getId(), "com.un.framework.task.ActivityProcessorHandler", createActiveVo1.getStartJobId(),"活动定时任务","活动定时-开始任务");
if (resultDTO.isSuccess()) {
   pBasActivity.setStartJobId(resultDTO.getData() + "");
} else {
   return AjaxResult.error("创建任务失败,请联系管理员");
}

二. 消费端,负责处理任务具体的调用

代码语言:javascript
复制
package com.un.framework.task;

import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.un.common.utils.job.PowerJobUtil;
import com.un.framework.manager.AsyncManager;
import com.un.framework.manager.factory.AsyncFactory;
import com.un.project.system.domain.dto.TaskNoticeDto;
import com.un.project.system.domain.vo.UpdateNoticeVo;
import com.un.project.system.mapper.PBasNoticeMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * @author shiye
 * @create 2020-09-30 9:42
 */
@Component
public class NoticeProcessorHandler implements BasicProcessor {

    protected final Logger logger = LoggerFactory.getLogger(NoticeScheduTask.class);

    @Autowired
    private PowerJobUtil powerJobUtil;

    @Autowired
    private PBasNoticeMapper noticeMapper;

    public static Lock lock = new ReentrantLock();

    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        ProcessResult result = new ProcessResult(true, "ok");
        try {
            lock.lock();
            String params = context.getJobParams();
            logger.info("Start a notice job task......" + params);


           //具体得物业处理


            logger.info("end a notice job task......{},{}", noticeIds1, noticeIds2);

        } catch (Exception e) {
            logger.error("error a notice job task......{},{}", e, context.toString());
        } finally {
            lock.unlock();
        }
        return result;
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档