前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >固定QPS异步任务功能再探

固定QPS异步任务功能再探

作者头像
FunTester
发布2022-12-09 14:26:39
2380
发布2022-12-09 14:26:39
举报
文章被收录于专栏:FunTesterFunTester

前几天分享过固定QPS异步任务功能初探使用了缓存线程池,利用java.util.concurrent.Semaphore实现了固定QPS的异步任务。

昨天录制视频的时候,谈到第二种放弃的实现思路,就是通过将任务丢到一个队列中,然后通过一个线程从线程池取任务,丢到线程池里面执行。今天早上仔细想了想还是很值得实现了,能够很好地降低线程数,刚好我之前在Java自定义异步功能实践 2021-10-19中用到了守护线程,正好充当这个任务管理线程。

下面就是具体的实现:

入口方法

这里依然写到了com.funtester.frame.SourceCode工具类中,关键字我用看funer,区别去之前的funner

代码如下:

代码语言:javascript
复制
    /**
     * 以固定QPS,异步执行,默认16,调整方法{@link SourceCode#setMaxQps(int)}
     * 改种方式在main线程结束后会以QPS /5 +5执行等待任务
     *
     * @param f
     */
    public static void funer(Closure f) {
        fun(JToG.toClosure(() -> {
            ThreadPoolUtil.addQPSTask(f);
            return null;
        }));
    }

实现细节

这里队列我选用了java.util.concurrent.LinkedBlockingQueue,用来存储需要执行的异步任务。添加任务的方法:

代码语言:javascript
复制
    /**
     * 添加异步固定QPS的任务
     * {@link java.util.concurrent.LinkedBlockingQueue#offer(java.lang.Object)}是非阻塞,返回Boolean值
     * @param closure
     * @return
     */
    static def addQPSTask(Closure closure) {
        asyncQueue.offer(closure)
    }

守护线程添加新执行逻辑:

代码语言:javascript
复制
    static boolean daemon() {
        def thread = new Thread(new Runnable() {

            @Override
            void run() {
                SourceCode.noError {
                    while (checkMain()) {
                        SourceCode.sleep(1.0)
                        ASYNC_QPS.times {executeCacheSync()}
                    }
                    waitAsyncIdle()
                }
                ThreadPoolUtil.shutPool()
            }
        })
        thread.setDaemon(true)
        thread.setName("Deamon")
        thread.start()
    }

其中执行异步任务功能:

代码语言:javascript
复制
    /**
     * 使用缓存线程池执行任务,适配第二种方式{@link com.funtester.frame.SourceCode#funer(groovy.lang.Closure)}
     * @return
     */
    static def executeCacheSync() {
        def poll = asyncQueue.poll()
        if (poll != null) executeCacheSync({poll()})
    }

针对本地版本,还有一个关闭线程池的功能,这个主要是防止程序停不下来,跟固定线程的异步任务功能类似,所以写到了一起。

代码语言:javascript
复制
    /**
     * 等待异步线程池空闲
     */
    static void waitAsyncIdle() {
        if (asyncPool == null) return
        SourceCode.time({
            SourceCode.waitFor {
                ((int) (ASYNC_QPS / 5) + 1).times {executeCacheSync()}
                asyncPool.getQueue().size() == 0 && asyncPool.getActiveCount() == 0 && asyncQueue.size() == 0
            }
        }, "异步线程池等待")
    }

    /**
     * 关闭异步线程池,不然会停不下来*/
    static void shutPool() {
        if (!getFunPool().isShutdown()) {
            log.info(Output.rgb("异步线程池关闭!"))
            getFunPool().shutdown()
        }
        if (cachePool != null && !cachePool.isShutdown()) {
            cachePool.shutdown()
        }
    }

自测

用例:

代码语言:javascript
复制
    static void main(String[] args) {
        setMaxQps(1)
        10.times {
            funer {
                output(Time.getDate())
            }
        }
        sleep(5.0)
        output("FunTester")
    }

控制台输出:

代码语言:javascript
复制
22:02:36.770 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

22:02:38.300 C-1  2022-10-26 22:02:38
22:02:39.294 C-1  2022-10-26 22:02:39
22:02:40.298 C-1  2022-10-26 22:02:40
22:02:41.299 C-1  2022-10-26 22:02:41
22:02:42.261 main FunTester
22:02:42.301 C-1  2022-10-26 22:02:42
22:02:42.320 C-1  2022-10-26 22:02:42
22:02:42.525 C-1  2022-10-26 22:02:42
22:02:42.730 C-1  2022-10-26 22:02:42
22:02:42.931 C-1  2022-10-26 22:02:42
22:02:43.133 C-1  2022-10-26 22:02:43
22:02:43.133 Deamon 异步线程池等待执行耗时:825 ms
22:02:43.153 Deamon 异步线程池关闭!

进程已结束,退出代码0
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 入口方法
  • 实现细节
  • 自测
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档