前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse 同步-异步 Executor

ClickHouse 同步-异步 Executor

原创
作者头像
jasong
发布2021-09-20 11:39:55
1K0
发布2021-09-20 11:39:55
举报
文章被收录于专栏:ClickHouseClickHouse
代码语言:javascript
复制
#pragma once

#include <future>
#include <functional>
#include <Common/ThreadPool.h>

namespace DB
{

/// Interface to run task asynchronously with possibility to wait for execution.
class Executor
{
public:
    virtual ~Executor() = default;
    virtual std::future<void> execute(std::function<void()> task) = 0;
};

/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
    SyncExecutor() = default;
    std::future<void> execute(std::function<void()> task) override
    {
        auto promise = std::make_shared<std::promise<void>>();
        try
        {
            task();
            promise->set_value();
        }
        catch (...)
        {
            try
            {
                promise->set_exception(std::current_exception());
            }
            catch (...) { }
        }
        return promise->get_future();
    }
};


/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
public:
    explicit AsyncExecutor(const String & name_, int thread_pool_size) : name(name_), pool(ThreadPool(thread_pool_size)) { }

    std::future<void> execute(std::function<void()> task) override
    {
        auto promise = std::make_shared<std::promise<void>>();
        pool.scheduleOrThrowOnError([promise, task]() {
            try
            {
                task();
                promise->set_value();
            }
            catch (...)
            {
                tryLogCurrentException("Failed to run async task");

                try
                {
                    promise->set_exception(std::current_exception());
                }
                catch (...)
                {
                }
            }
        });

        return promise->get_future();
    }

    void setMaxThreads(size_t threads) { pool.setMaxThreads(threads); }

private:
    String name;
    ThreadPool pool;
};


}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档