文档中心>微服务平台 TSF

任务停止原理及实践

最近更新时间:2022-06-10 11:21:52

我的收藏
分布式任务调度实现了灵活的任务管理方式,其中之一是停止任务。我们知道, 从系统层面而言,停止一个运行中的线程存在诸多不确定因素。Java 语言体系中, Thread.stop() 方法已遭废弃,此方法的调用会带来不确定的安全问题,并明确不支持 Thread.stop(throwable) 方法。
在本文中,我们将了解分布式任务调度系统是如何实现任务停止,以及如何编写好一个能够满足业务需求的可停止的任务。

预备知识

首先,我们需要了解 Java 体系内,如何停止一个执行中的任务,以及 Java 提供了哪些 API 可供使用。
向一个 Alive 状态的 Thread 发送中断信号,不一定能中断线程的执行。中断信号只是向运行的线程一个建议,告诉它有外界希望中断它,至于线程接受到信号后要做出何种反应,完全由线程及运行状态自身决定。 Java 提供的 API 中,对于中断信号,通常存在两种响应形态:
设置中断状态标志位,通过 Thread.isInterrupted() 进行判断。
抛出中断异常 InterruptedException。

中断状态

对于中断状态标志位,判断条件比较简单,但需要注意 Thread.Interrupted() 和 Thread.isInterrupted() 区别,这里不再进行描述,具体请参见 Java API 文档
基于对中断状态的判断,通常会被作为是否跳出任务执行的依据。

中断异常

当执行任务的线程处于 BLOCKED 状态(例如调用了 wait、sleep、join 等方法)时,向线程发送中断信号,通常会抛出中断异常,如 java.lang.InterruptedException。Java 中常见的中断异常有:
java.lang.InterruptedException
java.io.InterruptedIOException
java.nio.channels.ClosedByInterruptException
基于中断异常的抛出,通过捕获中断异常,通常也用来作为实现跳出任务的执行的一种方式。

准备工作

编写一个 Java 类型的可停止的任务,需要同时实现ExecutableTaskTerminableTask两个接口。
execute(ExecutableTaskData taskData) 方法中编写任务执行逻辑。
cancel(TaskExecuteFuture future, ExecutableTaskData tasData) 方法中编写任务停止逻辑。

停止任务

基于上文对停止线程的分析和说明,分布式任务调度中提供了:
发送中断信号的 API。
开发的 API,允许用户编写灵活的逻辑(例如逻辑标识符),判断并上报是任务终止状态。

中断任务 API

基于对中断线程的分析和说明,分布式任务调度中,对用户侧提供了中断执行任务线程的 API 来向执行线程发送中断信号。 cancel(TaskExecuteFuture future, ExecutableTaskData tasData)
此方法中暴露 Future 对象,底层是对当前任务提交执行后返回的java.util.concurrent.future的封装。 通过调用 future.cancel(boolean) 向执行任务的线程发送中断信号。
当在控制台操作停止任务时,cancel 方法将会被调用。

逻辑终止

通过 Future 方式发送中断信号,并不能确保停止任务的执行。通过提供的 API,可以实现业务上的逻辑终止。
通过在下面两个方法中,按照业务场景的需要,组合实现任务的逻辑终止。
execute(ExecutableTaskData tasData)
cancel(TaskExecuteFuture future, ExecutableTaskData tasData)

编码实践

任务停止的编码实践如下:
import com.tencent.cloud.task.sdk.core.utils.ThreadUtils;
import com.tencent.cloud.task.sdk.client.LogReporter;
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.model.TerminateResult;
import com.tencent.cloud.task.sdk.client.remoting.TaskExecuteFuture;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.tencent.cloud.task.sdk.client.spi.TerminableTask;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 模拟可终止的任务
*/
public class SimpleCancelableLogTask implements ExecutableTask,TerminableTask {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());


private final AtomicBoolean isCancelled = new AtomicBoolean(false);

@Override
public TerminateResult cancel(TaskExecuteFuture future, ExecutableTaskData taskData) {
// 设置终止状态,终止成功
isCancelled.set(true);
// 返回终止成功
return TerminateResult.newTerminateSuccessResult("terminate success");

}

@Override
public ProcessResult execute(ExecutableTaskData taskData) {
ProcessResult result = new ProcessResult();
LOG.info("start to execute task: " + this.getClass().getName());
LogReporter.log(taskData,"start to execute task...");
// 执行第一段业务逻辑
process1();
if (isCancelled.get()) {
result.setResultCode(ProcessResultCode.TERMINATED);
LogReporter.log(taskData,"task execute failed, caused by terminate. taskName: " + this.getClass().getName());
// 返回终止成功
return result;
}
// 执行第二段业务逻辑
process2();
result.setResultCode(ProcessResultCode.SUCCESS);
LogReporter.log(taskData,"task execute success. taskName: " + this.getClass().getName());
// 返回执行成功
return result;
}

// 模拟业务执行
private void process1() {
long sleepTime = RandomUtils.nextLong(10000, 15000);
ThreadUtils.waitMs(sleepTime);
}
// 模拟业务执行
private void process2() {
long sleepTime = RandomUtils.nextLong(10000, 15000);
ThreadUtils.waitMs(sleepTime);
}
}