前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java线程的基本使用

Java线程的基本使用

作者头像
spilledyear
发布2019-12-24 11:28:42
5610
发布2019-12-24 11:28:42
举报
文章被收录于专栏:小白鼠小白鼠

首先,这篇文章写的都是一些比较基础的内容,也就是从API层面解释一下我们平时用的比较多的东西,其实我倒是也想写点底层的东西,可是我也不懂啊。虽然比较基础,但可能却比较容易忽略吧

在Java中使用多线程,本质上还是对Thread对象的操作。线程池只是为了方便对线程的管理,避免频繁的创建和销毁线程带来不必要的系统开销,内部通过指定的线程数和阻塞队列实现。

基本使用

创建一个Thread对象的时候一般会传递一个Runnable对象,任务逻辑就写在Runnable的run方法中。感觉这个Runnable的名字取得不太好,如果叫Task是不是会更好一些呢?

代码语言:javascript
复制
new Thread(()-> doXX() ).start();

获取返回值

上面的那种方式使用起来是挺简单,但会遇到一些问题,比如:能获取返回值不?

通过全局变量

像上面这样是没办法获取返回值的,所以我们需要做一些处理,比如,将结果赋值给一个全局变量

代码语言:javascript
复制
private static int result;

public static void main(String[] args) throws InterruptedException {
    new Thread(() -> {
        System.out.println("处理业务逻辑");
        result = 1000;
    }).start();
    Thread.sleep(1000);
    System.out.println(result);
}

result就是一个全局变量,当任务执行完成之后,更新这个值。这其实都不能算是返回值,但有时候也能用:不需要立即知道任务的执行结果,在访问全部变量的时候,只需要获取它的值就好了。比如通过定时任务去更新缓存,不需要关注任务什么时候执行完成,我需要的只是缓存的值,任务执行了就获取最新的值,没有执行就获取旧值。

通过空轮询

那假如我就是想现在获取返回值咋办?因为我要用这个返回值作为下面逻辑的输入。那或许可以通过轮询的方式检测全局变量来达到目的?

代码语言:javascript
复制
while(result == null){
}

除了白白浪费CPU,好像也行啊?但我现在考虑的只是两个线程,如果有多个线程该对全局变量修改该怎么办呢?那用ThreadLocal?算了,就此打住吧

通过简单封装

或许可以封装一下?再封装之前,先考虑几个问题

  1. 任务的逻辑定义在哪里? 如果用Runnable,就无法返回值,所以可以定义一个有返回值的@FunctionalInterface接口,叫 Task
  2. 返回的值存到哪里?怎么返回?Thread没有相关的方法,扩展一下?
代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    CallableThread callableThread = new CallableThread(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ccccc";
    });

    callableThread.start();
    System.out.println("开始时间 " + LocalDateTime.now());
    System.out.println(callableThread.get());
    System.out.println("结束时间 " + LocalDateTime.now());
}


class CallableThread<T> extends Thread {
    private Task<T> task;

    private T result;

    private volatile boolean finished = false;

    public CallableThread(Task<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        synchronized (this) {
            result = task.call();
            finished = true;
            notifyAll();
        }
    }

    public T get() {
        synchronized (this) {
            while (!finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }
}

@FunctionalInterface
interface Task<T> {
    T call();
}

这样貌似也可以,但是不太好。Thread本来只是用于处理和线程相关的事情,现在将它和逻辑(Task)绑定在一起,如果有多个任务想共用一个Thread,那返回值怎么处理?

是否可以将这部分逻辑抽出来,放到一个新类当中?

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable(() -> {
        // 模拟耗时的业务操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "我是结果";
    });
    System.out.println("开始时间 " + LocalDateTime.now());
    new Thread(myRunnable).start();

    System.out.println("result: " + myRunnable.get());
    System.out.println("结束时间 " + LocalDateTime.now());
}


class MyRunnable<T> implements Runnable {
    private Task<T> task;

    private T result;

    private volatile boolean finished = false;

    public MyRunnable(Task<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        synchronized (this) {
            result = task.call();
            finished = true;
            notifyAll();
        }
    }

    public T get() {
        synchronized (this) {
            while (!finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }
}

这不是和java里面的Future有点像吗?确实有点像

Future模式

Future里面有几个比较核心的概念

  1. Future:抽象出 获取任务返回值获取任务执行状态 等常用方法的接口
  2. Callable:类似于上面的 Task
  3. FutureTask:类似于上面的 MyRunnable

下面看一个例子

代码语言:javascript
复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> future = new FutureTask<>(() -> {
        Thread.sleep(3000);
        System.out.println(System.currentTimeMillis());
        return "hehehh";
    });
    new Thread(future).start();
    System.out.println("Start Get Result : " + System.currentTimeMillis());
    System.out.println("Get Result : " + future.get() + System.currentTimeMillis());
}

Future

Future接口除了提供获取返回值的接口,还提供了一些其他的接口,根据名字大概也可以猜到什么意思,不过多解释了。实在不行看看源码吧,这样子就很愉快了。

代码语言:javascript
复制
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

FutureTask

FutureTask同时实现了RunnableFuture接口,

任务状态

FutureTask中,任务的不同状态通过state变量来表示,状态有以下几种:

代码语言:javascript
复制
/*
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */

private volatile int state;

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
任务执行

因为FutureTask本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:

  1. 先判断状态,如果不为NEW 或者通过cas更新 runner 失败,则直接返回
  2. 执行Callable#call方法,根据执行结果,设置状态, 如果执行成功:先将state设置成COMPLETING,然后保存返回的结果保存到属性outcome,再将state设置成NORMAL,最后通过LockSupport.unpark(t)解除阻塞的线程; 如果执行失败:先将state设置成COMPLETING,然后异常信息保存到属性outcome,再将state设置成EXCEPTIONAL,最后通过LockSupport.unpark(t)解除阻塞的线程;
如何阻塞

当我们通过FutureTask#get方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过LockSupport阻塞的,这个推荐看看博客吧。我也是看博客的,自己也解释的没人家好,嗯,就是这样的

代码语言:javascript
复制
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;   // 这应该是help GC吧?
            return s;
        }
        // 如果正在收尾阶段,交出CPU, 等下次循环
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        // 通过UNSAFE 设置 waiters
        else if (!queued)
            // 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);   // 阻塞当前线程
    }
}

上面我们看到了有一个waiters,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用FutureTask#get的情况,每调用一次FutureTask#get就会生成一个WaitNode节点,然后将它添加到单向链表的头部

那什么时候用到这个链表呢?在任务执行完成的时候,会执行finishCompletion方法,主要就是从头节点依次往下遍历,获取节点的thread属性,然后执行LockSupport.unpark(thread)解除阻塞

回调如何处理

相对之前的那种方式来说,FutureTask已经很好用了,直接通过FutureTask#get方法就可以获取返回值了,确实蛮方便的。

不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的MyRunnable代码再扩展一下,例如

代码语言:javascript
复制
public MyRunnable addListener(Consumer c) {
    // 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池
        while (!finished) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        c.accept(result);
    }).start();
    return this;
}

我们给MyRunnable添加了一个addListener方法,接收一个Consumer作为入参,当任务执行完成之后就执行这段逻辑,如下:

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    MyRunnable<String> myRunnable = new MyRunnable(() -> {
        // 模拟耗时的业务操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "我是结果";
    });
    System.out.println("开始时间 " + LocalDateTime.now());
    new Thread(myRunnable).start();

    myRunnable.addListener(result -> {
        System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务");
        result = result + "   ggggg";
        System.out.println(result);
    });
}

ListenableFuture

ListenableFutureguava包里面的,对Future进行了增强,ListenableFuture继承了Future,新增了一个添加回调的方法

代码语言:javascript
复制
/**
 * @param listener the listener to run when the computation is complete     回调逻辑
 * @param executor the executor to run the listener in  回调在哪个线程池执行
 */
void addListener(Runnable listener, Executor executor);

ListenableFutureTask继承了FutureTask并且是实现了ListenableFuture接口,看一个简单例子

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
        System.out.println("执行任务开始  " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("执行任务完成  " + LocalDateTime.now());
        return "结果";
    });

    futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor());
    new Thread(futureTask).start();
}
源码分析

原理就是将所有回调维护在一个单向链表中,也就是ExecutionList,然后通过重写``FutureTask#done`方法,在任务完成之后执行回调逻辑

代码语言:javascript
复制
// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表
private final ExecutionList executionList = new ExecutionList();

// ListenableFutureTask#addListener
public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
}


// ExecutionList#add
public void add(Runnable runnable, Executor executor) {
    // 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true
    // 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失
    synchronized (this) {
        // 如果任务还没有执行完成,就将当前节点添加到头节点
        if (!executed) {
            runnables = new RunnableExecutorPair(runnable, executor, runnables);
            return;
        }
    }

    // 如果任务执行完成,就开始执行回调
    executeListener(runnable, executor);
}


// ExecutionList#executeListener
private static void executeListener(Runnable runnable, Executor executor) {
    try {
        // 直接将任务交给线程池
        executor.execute(runnable);
    } catch (RuntimeException e) {
        log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
    }
}

// ExecutionList.RunnableExecutorPair
private static final class RunnableExecutorPair {
    final Runnable runnable;
    final Executor executor;
    @Nullable RunnableExecutorPair next;

    RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
        this.runnable = runnable;
        this.executor = executor;
        this.next = next;
    }
}

ListenableFutureTask是怎么知道任务是否执行完成了呢?FutureTask#finishCompletion方法中,解除阻塞的线程之后,还会执行一个done方法,不过该方法在FutureTask没有任何逻辑,可以把它当作是一个模板方法,而ListenableFutureTask实现了该方法,如下:

代码语言:javascript
复制
// ListenableFutureTask#done
protected void done() {
    executionList.execute();
}


// ExecutionList#execute
public void execute() {
    RunnableExecutorPair list;
    synchronized (this) {
        if (executed) {
            return;
        }
        // 首先将executed置为true
        executed = true;
        // runnables代表链表的头节点
        list = runnables;
        runnables = null; // allow GC to free listeners even if this stays around for a while.
    }



    RunnableExecutorPair reversedList = null;
    // 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置
    while (list != null) {
        RunnableExecutorPair tmp = list;
        list = list.next;
        tmp.next = reversedList;
        reversedList = tmp;
    }

    // 遍历链表,依次执行回调逻辑
    while (reversedList != null) {
        executeListener(reversedList.runnable, reversedList.executor);
        reversedList = reversedList.next;
    }
}

FutureCallback

通过ListenableFutureTask,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现,回调方法无法使用任务的返回值,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实guava包中也给了我们相关的接口。先看一个例子:

代码语言:javascript
复制
public static void main(String[] args) throws InterruptedException {
    ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
        System.out.println("执行任务开始  " + LocalDateTime.now());
        Thread.sleep(3000);
        System.out.println("执行任务完成  " + LocalDateTime.now());
        return "结果";
    });

    Futures.addCallback(futureTask, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("执行成功: " + result);
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println("执行失败");
        }
    });

    new Thread(futureTask).start();
}
源码分析

FutureCallback接口里面有两个方法,分别对应任务执行成功逻辑和任务失败逻辑

代码语言:javascript
复制
void onSuccess(@Nullable V result);

void onFailure(Throwable t);

Futures可以堪称是一个门面类,里面封装了一些操作

代码语言:javascript
复制
// Futures#addCallback
public static <V> void addCallback(
    ListenableFuture<V> future, FutureCallback<? super V> callback) {
        // 这里使用了DirectExecutor线程池,即直接在当前线程执行
        addCallback(future, callback, directExecutor());
}

// Futures#addCallback
public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {
    Runnable callbackListener =
        new Runnable() {
            @Override
            public void run() {
                final V value;
                try {
                    value = getDone(future);
                } catch (ExecutionException e) {
                    callback.onFailure(e.getCause());
                    return;
                } catch (RuntimeException e) {
                    callback.onFailure(e);
                    return;
                } catch (Error e) {
                    callback.onFailure(e);
                    return;
                }
                callback.onSuccess(value);
            }
        };
    // 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法
    future.addListener(callbackListener, executor);
}

// Futures#getDone
public static <V> V getDone(Future<V> future) throws ExecutionException {
    checkState(future.isDone(), "Future was expected to be done: %s", future);
    return getUninterruptibly(future);
}
public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
}

本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法,不过在使用上却方便了好多。

与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用Future#get方法会阻塞当前线程,而guava是在回调中执行这部逻辑,类似于一种通知机制,所以不会阻塞当前线程。

ListenableFutureTask

其实Spring里面也有一个ListenableFutureTask,实现上和guava大同小异,也是继承了FutureTask并且实现了自己的ListenableFuture接口,通过重写FutureTask#done方法,在该方法中获取返回值然后执行回调逻辑

代码语言:javascript
复制
public static void main(String[] args) {
    ListenableFutureTask future = new ListenableFutureTask(() -> "结果");

    future.addCallback(new ListenableFutureCallback() {
        @Override
        public void onSuccess(Object result) {
            System.out.println("callback " + result);
        }

        @Override
        public void onFailure(Throwable ex) {
            System.out.println("执行失败 ");
        }
    });
    new Thread(future).start();
}

核心源码

它的Callback是保存在两个Queue里面的:successCallbacksfailureCallbacks,数据结构是LinkedList

代码语言:javascript
复制
private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();

private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();

重写的done方法如下,逻辑很简单,就不解释了

代码语言:javascript
复制
protected void done() {
    Throwable cause;
    try {
        T result = get();
        this.callbacks.success(result);
        return;
    }catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return;
    }catch (ExecutionException ex) {
        cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
    }
    catch (Throwable ex) {
        cause = ex;
    }
    this.callbacks.failure(cause);
}

CompletableFuture

可能是之前的Future功能太少了,所以Java8推出了CompletableFuture,功能强大,除了上面说的那些功能,还有很多其他的功能,反正就是吊炸天。而且从DUBBO 2.7开始异步处理都是通过CompletableFuture来实现。

CompletableFuture ForkJoinPoll

总结

总结下来就发现,那些很好用的API,真的是封装的好啊。所以,设计模式真的很重要啊,老铁。。。。。。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本使用
  • 获取返回值
    • 通过全局变量
      • 通过空轮询
      • 通过简单封装
      • Future
      • FutureTask
      • ListenableFuture
      • FutureCallback
      • ListenableFutureTask
      • 核心源码
  • Future模式
  • 回调如何处理
  • CompletableFuture
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档