
Integrating the third-party big data scheduler DolphinScheduler into a Spring Boot project: running/stopping tasks

Original
刘大猫
Published 2025-07-17 20:17:02
Included in the column: JAVA相关

For notes on integrating the DolphinScheduler scheduler and related information, see my previous blog post; the address is here ->


I. Feature List

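II. Running/Stopping a Task

Note:

When the big data platform runs a drag-and-drop Spark task, what actually executes is a single node of a workflow under a project in DolphinScheduler, not the entire workflow.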

Shared dependency

Code language: xml
<!-- httpclient -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
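
commons-httpclient 3.1 belongs to the end-of-life Commons HttpClient 3.x line. On Java 11 or later, the same kind of form POST can be built with the JDK's own `java.net.http` package and no extra dependency. The following is a minimal sketch under that assumption, not the author's code: the class name, method name, URL, and body values are hypothetical placeholders; only the `token` header and the form content type come from the article's code.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class DsRequestSketch {
    /** Builds (but does not send) a form POST that carries the DolphinScheduler token header. */
    public static HttpRequest formPost(String url, String token, String formBody) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/x-www-form-urlencoded;charset=utf-8")
                .header("token", token)
                .POST(HttpRequest.BodyPublishers.ofString(formBody))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder address, token, and body; replace with real values.
        HttpRequest request = formPost(
                "http://127.0.0.1:12345/dolphinscheduler/projects/demo/executors/execute",
                "xxx", "key=value");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("token").orElse(""));
    }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.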

Shared configuration file

Code language: properties
dolphinscheduler.token=xxx
dolphinscheduler.address=http://IP:12345
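
These two properties are combined into endpoint URLs of the form `address + "/dolphinscheduler/projects/" + projectName + ...`. One detail to watch: `URLEncoder` implements form encoding, so it turns a space into `+`, which is taken literally inside a URL path. The sketch below shows one way to encode a project name as a path segment, assuming project names may contain spaces or non-ASCII characters. The class and method names are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class DsUrlSketch {
    /** Percent-encodes one path segment; URLEncoder alone would emit '+' for a space. */
    public static String encodePathSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Joins the configured address, the encoded project name, and an endpoint suffix. */
    public static String projectUrl(String address, String projectName, String suffix) {
        return address + "/dolphinscheduler/projects/" + encodePathSegment(projectName) + suffix;
    }

    public static void main(String[] args) {
        System.out.println(projectUrl("http://127.0.0.1:12345", "my project",
                "/executors/start-process-instance"));
    }
}
```

A literal `+` in the input is already encoded as `%2B` by `URLEncoder`, so the `replace` call only affects encoded spaces.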

Shared code

Code language: java
// Spring HTTP client, used below for the GET call to the DolphinScheduler REST API
@Autowired
private RestTemplate restTemplate;

// DolphinScheduler token and base address, read from the configuration file
@Value("${dolphinscheduler.token}")
String token;
@Value("${dolphinscheduler.address}")
String address;

// DolphinScheduler returns code 0 on success
public static final int ZERO = 0;
public static final int SUCCESS = 200;

@Autowired
private DragSparkTaskService dragSparkTaskService;
@Value("${spark.main.class}")
String mainClass;

public static final String CREATE = "create";
public static final String UPDATE = "update";
public static final String ADD = "add";
public static final String DELETE = "delete";
// Workflow release states
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
public static final int SIX = 6;
public static final int EIGHTY = 80;
public static final int THREE = 3;

// Redis holds the task state under the key prefix ${drag.task.state}
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${drag.task.state}")
String dragTaskState;

@Autowired
private DragSparkTaskMapper dragSparkTaskMapper;

1. Running a task

Code:

Code language: java
    /**
     * Starts a process instance.
     *
     * @param projectName project name
     * @param dragSparkTaskId task ID
     * @param request request
     * @return execution result
     * @author liudz
     * @date 2021/5/7
     */
    @GetMapping("/project/process/start")
    public DolphinschedulerResponse startProcessInstance(
            @RequestParam("projectName") String projectName, @RequestParam("dragSparkTaskId") Integer dragSparkTaskId,
            HttpServletRequest request) {
        try {
            Long userId = Long.valueOf(request.getUserPrincipal().getName());
            String processName = userId + "-dragSparkTask";
            // getUserProcess and releaseProcessDefinition are not shown in this article
            DolphinschedulerResponse processInfoList = getUserProcess(projectName);
            if (processInfoList.getCode() != ZERO) {
                return processInfoList;
            }
            log.info("--(1)getUserProcess--success:{}", processInfoList);
            // Pick this user's workflow out of the project's workflow list
            JSONObject processJson = new JSONObject();
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (processName.equals(map.get("name"))) {
                    processJson.fluentPutAll(map);
                }
            }
            // Bring the workflow online if it is offline (constant-first equals avoids an NPE on a missing field)
            if (OFFLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
                releaseProcessDefinition(projectName, processName, processJson.getInteger("id"), 1);
                log.info("--(2)releaseProcessDefinition--ONLINE--success");
            }
            String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
                    + "/executors/start-process-instance";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // Form parameters: values must not be null; pass an empty string instead
            NameValuePair[] data = packageNameValuePair(processJson, dragSparkTaskId);
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            JSONObject result;
            try {
                httpClient.executeMethod(postMethod);
                result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            } finally {
                // Return the connection to the connection manager
                postMethod.releaseConnection();
            }
            log.info("--(3)startProcessInstance--result:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
            }
            // Record state "1" (started) in Redis for one hour and in the database
            redisTemplate.opsForValue().set(dragTaskState + dragSparkTaskId, "1", 1, TimeUnit.HOURS);
            DragSparkTask drag = new DragSparkTask();
            drag.setId(Long.valueOf(dragSparkTaskId));
            drag.setState("1");
            drag.setCreateId(userId);
            dragSparkTaskMapper.updateDragSparkTask(drag);
            log.info("--(4)updateDragSparkTask--success!");
        } catch (Exception e) {
            log.error("Request failed", e);
            // Report the failure instead of falling through to success(); 500 is a placeholder code
            return DolphinschedulerResponse.error(500, e.getMessage());
        }
        return DolphinschedulerResponse.success();
    }
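
Step (1) above scans the project's workflow list for the entry named `<userId>-dragSparkTask`. The sketch below isolates that lookup with plain collections and returns an `Optional`, so the case where no workflow matches is explicit to the caller. It is an illustration only: the class name, method name, and sample data are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ProcessLookupSketch {
    /** Returns the first workflow definition whose "name" equals the given name, or empty if none does. */
    public static Optional<Map<String, Object>> findByName(
            List<Map<String, Object>> definitions, String name) {
        return definitions.stream()
                .filter(def -> name.equals(def.get("name")))
                .findFirst();
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the workflow list returned by the API.
        List<Map<String, Object>> defs = List.of(
                Map.<String, Object>of("id", 7, "name", "42-dragSparkTask"),
                Map.<String, Object>of("id", 8, "name", "other"));
        long userId = 42L;
        System.out.println(findByName(defs, userId + "-dragSparkTask").map(d -> d.get("id")));
        System.out.println(findByName(defs, "99-dragSparkTask").isPresent());
    }
}
```

A caller can then return an error response when the `Optional` is empty rather than continuing with an empty workflow object.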
Code language: java
    /**
     * Packages the form parameters for the start request.
     *
     * @param processJson workflow JSON
     * @param dragSparkTaskId task ID
     * @return NameValuePair array
     * @author liudz
     * @date 2021/5/14
     */
    public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
        return new NameValuePair[] {
                new NameValuePair("failureStrategy", "CONTINUE"),
                new NameValuePair("processDefinitionId", processJson.getString("id")),
                new NameValuePair("processInstancePriority", "MEDIUM"),
                new NameValuePair("warningGroupId", "0"),
                new NameValuePair("warningType", "NONE"),
                new NameValuePair("runMode", "RUN_MODE_SERIAL"),
                // Start from the workflow node named spark-<taskId>
                new NameValuePair("startNodeList", "spark-" + dragSparkTaskId),
                new NameValuePair("taskDependType", "TASK_POST"),
                new NameValuePair("workerGroup", "default")};
    }
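
The code comment in the start method warns that form parameters must not be null and that an empty string should be sent instead. A minimal sketch of that rule, using only the JDK, is shown here. The class name, method names, and the `optionalField` parameter are hypothetical; the other parameter names are taken from the method above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StartFormSketch {
    /** Encodes parameters as application/x-www-form-urlencoded, sending null values as empty strings. */
    public static String encodeForm(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> enc(e.getKey()) + "=" + enc(e.getValue() == null ? "" : e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("processDefinitionId", "7");
        params.put("startNodeList", "spark-" + 15);
        params.put("taskDependType", "TASK_POST");
        params.put("optionalField", null); // hypothetical field with no value
        System.out.println(encodeForm(params));
        // prints: processDefinitionId=7&startNodeList=spark-15&taskDependType=TASK_POST&optionalField=
    }
}
```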
Code language: java
    /**
     * Parses the nodes and edges and fills in each node's inputs and outputs.
     *
     * @param jsonObject model task
     * @return the populated nodesArray
     * @author liudz
     * @date 2020/12/10
     */
    public JSONArray parseLineAndNode(JSONObject jsonObject) {
        JSONArray edgesArray = jsonObject.getJSONArray("edges");
        JSONArray nodesArray = jsonObject.getJSONArray("nodes");
        for (int i = 0; i < edgesArray.size(); i++) {
            JSONObject edgeJson = edgesArray.getJSONObject(i);
            String nodeSourceId = edgeJson.getString("source");
            String nodeTargetId = edgeJson.getString("target");
            for (int j = 0; j < nodesArray.size(); j++) {
                JSONObject nodeJson = nodesArray.getJSONObject(j);
                String nodeId = nodeJson.getString("id");
                if ("breakUp".equals(nodeJson.getString("modelType")) && nodeSourceId.equals(nodeId)) {
                    // A "breakUp" node keeps two output lists and takes its fraction from the edge
                    double fraction = edgeJson.getJSONObject("config").getDoubleValue("fraction");
                    if (fraction > 0.5) {
                        nodeJson.getJSONObject("config").put("fraction", fraction);
                    }
                    // The first outgoing edge fills firstOutputs; later edges go to secondOutputs
                    String outputsKey = nodeJson.getJSONArray("firstOutputs").size() == 0
                            ? "firstOutputs" : "secondOutputs";
                    appendId(nodeJson, outputsKey, nodeTargetId);
                } else if (nodeSourceId.equals(nodeId)) {
                    appendId(nodeJson, "outputs", nodeTargetId);
                } else if (nodeTargetId.equals(nodeId)) {
                    appendId(nodeJson, "inputs", nodeSourceId);
                }
            }
        }
        // Rename every node: label becomes modelType_id, name becomes (old label)_id
        for (int j = 0; j < nodesArray.size(); j++) {
            JSONObject nodeJson = nodesArray.getJSONObject(j);
            String label = nodeJson.getString("modelType") + "_" + nodeJson.getString("id");
            String name = nodeJson.getString("label") + "_" + nodeJson.getString("id");
            nodeJson.put("name", name);
            nodeJson.put("label", label);
        }
        return nodesArray;
    }

    /** Appends an id to the array stored under key (JavaTools.arrayInsert is the author's helper). */
    private void appendId(JSONObject nodeJson, String key, String id) {
        JSONArray current = nodeJson.getJSONArray(key);
        nodeJson.put(key, JavaTools.arrayInsert(current.toArray(new String[current.size()]), id));
    }
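
The core of `parseLineAndNode` is turning an edge list into per-node `inputs` and `outputs`. The sketch below shows that idea with plain collections, leaving out the `breakUp` special case and the renaming pass. It indexes the nodes by id in a map, so each edge is resolved with two lookups instead of a scan over all nodes. All names here are hypothetical and the data is made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EdgeWiringSketch {
    /** Minimal stand-in for a canvas node: its id plus upstream and downstream node ids. */
    public static final class Node {
        public final String id;
        public final List<String> inputs = new ArrayList<>();
        public final List<String> outputs = new ArrayList<>();
        Node(String id) { this.id = id; }
    }

    /** For every edge {source, target}: add target to source.outputs and source to target.inputs. */
    public static Map<String, Node> wire(List<String> nodeIds, List<String[]> edges) {
        Map<String, Node> nodes = new LinkedHashMap<>();
        for (String id : nodeIds) {
            nodes.put(id, new Node(id));
        }
        for (String[] edge : edges) {
            Node source = nodes.get(edge[0]);
            Node target = nodes.get(edge[1]);
            if (source != null) {
                source.outputs.add(edge[1]);
            }
            if (target != null) {
                target.inputs.add(edge[0]);
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        Map<String, Node> nodes = wire(
                List.of("a", "b", "c"),
                List.of(new String[] {"a", "b"}, new String[] {"b", "c"}));
        System.out.println(nodes.get("b").inputs + " -> b -> " + nodes.get("b").outputs);
        // prints: [a] -> b -> [c]
    }
}
```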

2. Stopping a task

Code:

Code language: java
    /**
     * stopProcessSparkTask: stops the running process instance of a Spark task.
     *
     * @param projectName project name
     * @param id task ID
     * @param executeType execute type
     * @return Response
     * @author liudz
     * @author lty (updated 2020/5/27)
     * @date 2020/4/28 10:31
     */
    @GetMapping(value = "/project/execute/{projectName}/{id}/{executeType}")
    public Response<String> stopProcessSparkTask(@PathVariable("projectName") String projectName,
                                 @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
        log.info("--(1)stopProcessSparkTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.set("token", token);
            headers.set("Content-Type", "application/json");
            HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
            // List the task instances named spark-<id> to find the running process instance
            ResponseEntity<JSONObject> response = restTemplate.exchange(address
                    + "/dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
                    + "pageNo=1&pageSize=100&taskName=spark-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
            List<Map<String, Object>> list =
                    (List<Map<String, Object>>) response.getBody().getJSONObject("data").get("totalList");
            Integer processInstanceId = null;
            for (Map<String, Object> map : list) {
                // "RUNNING_EXEUTION" is kept exactly as written; it has to match the state string the API returns
                if ("RUNNING_EXEUTION".equals(map.get("state"))) {
                    processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
                }
            }
            log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
            if (processInstanceId == null) {
                // Nothing is running; without this check toString() below would throw an NPE. 500 is a placeholder code
                return Response.error(500, "No running instance found for task spark-" + id);
            }
            String postURL = address + "/dolphinscheduler/projects/"
                    + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            NameValuePair[] data = {new NameValuePair("executeType", executeType),
                    new NameValuePair("processInstanceId", processInstanceId.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            JSONObject result;
            try {
                httpClient.executeMethod(postMethod);
                result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            } finally {
                // Return the connection to the connection manager
                postMethod.releaseConnection();
            }
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return Response.error(result.getInteger("code"), result.getString("msg"));
            }
            log.info("--(3)stopProcessSparkTask--success--:{}", result);
            // Record state "0" (stopped) in Redis for one hour and in the database
            redisTemplate.opsForValue().set(dragTaskState + id, "0", 1, TimeUnit.HOURS);
            DragSparkTaskVo dragSparkTaskVo = new DragSparkTaskVo();
            dragSparkTaskVo.setId(id);
            dragSparkTaskVo.setState("0");
            dragSparkTaskService.updateDragSparkTask(dragSparkTaskVo);
            log.info("--(4)updateDragSparkTask--success");
        } catch (IOException e) {
            // UnsupportedEncodingException and HttpException are both IOException subclasses
            log.error("stopProcessSparkTask failed", e);
            // Report the failure instead of falling through to success(); 500 is a placeholder code
            return Response.error(500, e.getMessage());
        }
        return Response.success();
    }
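
Step (2) of the stop method can finish without finding any task instance in the running state. The sketch below models that lookup with plain collections and returns an `Optional`, so the "nothing is running" case has to be handled before the stop request is built. As in the article's loop, the last matching entry wins. The class name, method name, and sample data are hypothetical; the state string is the one used in the article.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class RunningInstanceSketch {
    /** Returns the processInstanceId of the last task instance in the given state, if any. */
    public static Optional<Integer> findInstanceId(List<Map<String, Object>> taskInstances, String state) {
        return taskInstances.stream()
                .filter(t -> state.equals(t.get("state")))
                .map(t -> t.get("processInstanceId"))
                .filter(Objects::nonNull)
                .map(v -> Integer.valueOf(v.toString()))
                .reduce((earlier, later) -> later); // keep the last match
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the "totalList" array of the API response.
        List<Map<String, Object>> instances = List.of(
                Map.<String, Object>of("state", "SUCCESS", "processInstanceId", 11),
                Map.<String, Object>of("state", "RUNNING_EXEUTION", "processInstanceId", 12));
        System.out.println(findInstanceId(instances, "RUNNING_EXEUTION"));
        System.out.println(findInstanceId(instances, "FAILURE").isPresent());
    }
}
```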

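III. Links to my other related articles

1. Integrating the DolphinScheduler scheduler into a Spring Boot project: drag-and-drop Spark task management:

https://blog.csdn.net/a924382407/article/details/117119831

2. Integrating the DolphinScheduler scheduler into a Spring Boot project: implementing DataX data synchronization tasks:

https://blog.csdn.net/a924382407/article/details/120951230

3. Integrating the DolphinScheduler scheduler into a Spring Boot project: project management:

https://blog.csdn.net/a924382407/article/details/117118931

4. Integrating the third-party big data scheduler DolphinScheduler into a Spring Boot project: running/stopping tasks:

https://blog.csdn.net/a924382407/article/details/117121181

5. Integrating the third-party big data scheduler DolphinScheduler into a Spring Boot project:

https://blog.csdn.net/a924382407/article/details/117113848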

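Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.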
