前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java线程池(八):ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

java线程池(八):ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

作者头像
冬天里的懒猫
发布2020-10-29 10:43:51
1.2K0
发布2020-10-29 10:43:51
举报

1.类结构及其成员变量

1.1 类结构和注释

类结构代码如下:

代码语言:javascript
复制
public class ForkJoinWorkerThread extends Thread {
    
}
image.png
image.png

ForkJoinWorkerThread继承了Thread类,其注释大意如下:

ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

1.2 常量

主要有两个:

代码语言:javascript
复制
final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

注释大意: ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。 这是两个final修饰的常量,智能初始化一次。

2.构造函数

2.1 ForkJoinWorkerThread(ForkJoinPool pool)

代码语言:javascript
复制
/**
 * Creates a ForkJoinWorkerThread operating in the given pool.
 *
 * @param pool the pool this thread works in
 * @throws NullPointerException if pool is null
 */
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。 构造函数中最主要的方法就是registerWorker。

2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

代码语言:javascript
复制
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {
    super(threadGroup, null, "aForkJoinWorkerThread");
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    eraseThreadLocals(); // clear before registering
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。 之后调用方法eraseThreadLocals将threadLocals清除。 之后与同用的构造函数一致。 擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

代码语言:javascript
复制
/**
 * Erases ThreadLocals by nulling out Thread maps.
 */
final void eraseThreadLocals() {
    U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

3.重要的方法

3.1 run

做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

代码语言:javascript
复制
public void run() {
     //如果workQueue为空,则抛出异常
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
           //调用onStart方法
            onStart();
            //调用pool的runWorker方法,运行workQueue
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
           //操作完之后处理
            try {
               //调用onTermination方法
                onTermination(exception);
            } catch (Throwable ex) {
               //如果出现异常,则进行异常处理
                if (exception == null)
                    exception = ex;
            } finally {
               //最终需要指向deregister方法
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。 其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

3.2 定义的可扩展方法

由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

代码语言:javascript
复制
/**
 * Initializes internal state after construction but before
 * processing any tasks. If you override this method, you must
 * invoke {@code super.onStart()} at the beginning of the method.
 * Initialization requires care: Most fields must have legal
 * default values, to ensure that attempted accesses from other
 * threads work correctly even before this thread starts
 * processing tasks.
 */
protected void onStart() {
}

/**
 * Performs cleanup associated with termination of this worker
 * thread.  If you override this method, you must invoke
 * {@code super.onTermination} at the end of the overridden method.
 *
 * @param exception the exception causing this thread to abort due
 * to an unrecoverable error, or {@code null} if completed normally
 */
protected void onTermination(Throwable exception) {
}

这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

4.内部类InnocuousForkJoinWorkerThread

这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。

image.png
image.png

源码如下:

代码语言:javascript
复制
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
    /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();

    /** An AccessControlContext supporting no privileges */
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(
            new ProtectionDomain[] {
                new ProtectionDomain(null, null)
            });

    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override // to erase ThreadLocals
    void afterTopLevelExec() {
        eraseThreadLocals();
    }

    @Override // to always report system loader
    public ClassLoader getContextClassLoader() {
        return ClassLoader.getSystemClassLoader();
    }

    @Override // to silently fail
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    @Override // paranoically
    public void setContextClassLoader(ClassLoader cl) {
        throw new SecurityException("setContextClassLoader");
    }

    /**
     * Returns a new group with the system ThreadGroup (the
     * topmost, parent-less group) as parent.  Uses Unsafe to
     * traverse Thread.group and ThreadGroup.parent fields.
     */
    private static ThreadGroup createThreadGroup() {
        try {
            sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            Class<?> gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            while (group != null) {
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                group = parent;
            }
        } catch (Exception e) {
            throw new Error(e);
        }
        // fall through if null as cannot-happen safeguard
        throw new Error("Cannot create ThreadGroup");
    }
}

AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

5. ForkJoinPool中创建工作线程的过程

此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

5.1 makeCommonPool创建过程

再ForkJoinPool重的makeCommPool,有如下代码:

代码语言:javascript
复制
if (factory == null) {
    if (System.getSecurityManager() == null)
        factory = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        factory = new InnocuousForkJoinWorkerThreadFactory();
}

这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

5.2 createWorker创建过程

代码语言:javascript
复制
ForkJoinWorkerThreadFactory fac = factory;
try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }

也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

5.3 InnocuousForkJoinWorkerThreadFactory

代码语言:javascript
复制
static final class InnocuousForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {

    /**
     * An ACC to restrict permissions for the factory itself.
     * The constructed workers have no permissions set.
     */
    private static final AccessControlContext innocuousAcc;
    static {
        Permissions innocuousPerms = new Permissions();
        innocuousPerms.add(modifyThreadPermission);
        innocuousPerms.add(new RuntimePermission(
                               "enableContextClassLoaderOverride"));
        innocuousPerms.add(new RuntimePermission(
                               "modifyThreadGroup"));
        innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                new ProtectionDomain(null, innocuousPerms)
            });
    }

    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
            java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                public ForkJoinWorkerThread run() {
                    return new ForkJoinWorkerThread.
                        InnocuousForkJoinWorkerThread(pool);
                }}, innocuousAcc);
    }
}

5.4 DefaultForkJoinWorkerThreadFactory

代码语言:javascript
复制
static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}

6.总结

ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-10-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.类结构及其成员变量
    • 1.1 类结构和注释
      • 1.2 常量
      • 2.构造函数
        • 2.1 ForkJoinWorkerThread(ForkJoinPool pool)
          • 2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)
          • 3.重要的方法
            • 3.1 run
              • 3.2 定义的可扩展方法
              • 4.内部类InnocuousForkJoinWorkerThread
              • 5. ForkJoinPool中创建工作线程的过程
                • 5.1 makeCommonPool创建过程
                  • 5.2 createWorker创建过程
                    • 5.3 InnocuousForkJoinWorkerThreadFactory
                      • 5.4 DefaultForkJoinWorkerThreadFactory
                      • 6.总结
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档