首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse Lambda 组合拳

ClickHouse Lambda 组合拳

原创
作者头像
jasong
发布2022-03-08 20:32:57
1K0
发布2022-03-08 20:32:57
举报
文章被收录于专栏:ClickHouseClickHouse

本文主要讲解 ClickHouse 源码中 Lambda 表达式的使用方式,作为学习记录。

Cpp reference

https://en.cppreference.com/w/cpp/language/lambda

ClickHouse Lambda

1 定义 结构定义 FormatFactory

/// Shared-ownership pointer to an IInputFormat.
using InputFormatPtr = std::shared_ptr<IInputFormat>;
    /// Type-erased callable (typically holding a lambda) that returns an InputFormatPtr
    /// given a read buffer, a header block, row-input parameters and format settings.
    using InputCreator = std::function<InputFormatPtr(
            ReadBuffer & buf,
            const Block & header,
            const RowInputFormatParams & params,
            const FormatSettings & settings)>;

    /// Output-side counterpart: a callable that returns an OutputFormatPtr
    /// given a write buffer, a sample block, row-output parameters and format settings.
    using OutputCreator = std::function<OutputFormatPtr(
            WriteBuffer & buf,
            const Block & sample,
            const RowOutputFormatParams & params,
            const FormatSettings & settings)>;

    /// Per-format record stored in the factory dictionary: the callables and
    /// capability flags registered for one format name.
    struct Creators
    {
        /// Callables of the InputCreator / OutputCreator alias types; they stay
        /// empty until the corresponding register function assigns them.
        InputCreator input_creator;
        OutputCreator output_creator;
        /// The remaining callback types are declared outside this excerpt.
        FileSegmentationEngine file_segmentation_engine;
        SchemaReaderCreator schema_reader_creator;
        ExternalSchemaReaderCreator external_schema_reader_creator;
        /// Both capability flags default to false.
        bool supports_parallel_formatting{false};
        bool is_column_oriented{false};
        NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
        AppendSupportChecker append_support_checker;
    };

    /// Hash map from a String key (the format name used by the register/get functions) to its Creators record.
    using FormatsDictionary = std::unordered_map<String, Creators>;

    /// Member variable holding every registered format.
    FormatsDictionary dict;

    /// Registration entry points: each takes the format name and the creator callable by value.
    void registerInputFormat(const String & name, InputCreator input_creator);
    void registerOutputFormat(const String & name, OutputCreator output_creator);

2 FormatFactory 注册接口实现 dict 成员变量注册
/// Registers `input_creator` as the input-format factory callable for `name`.
///
/// @param name           Format name used as the dictionary key.
/// @param input_creator  Callable to store; taken by value and moved into the dictionary.
/// @throws Exception (ErrorCodes::LOGICAL_ERROR) if an input creator is already stored under `name`.
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
    /// operator[] creates an empty entry on first access, so an unset callable means the name is still free.
    auto & entry = dict[name];
    if (entry.input_creator)
        throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);

    entry.input_creator = std::move(input_creator);

    /// The format name is also passed as the file extension.
    registerFileExtension(name, name);
}

3 调用注册接口 CSVRowInputFormat.h

/// Invokes `register_func` once for each of the three variants of a format:
/// the base name, "<base>WithNames" and "<base>WithNamesAndTypes".
///
/// @param base_format_name  Name of the plain format (for example "CSV").
/// @param register_func     Callback receiving (format name, with_names, with_types).
void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWithNamesAndTypesFunc register_func)
{
    struct Variant
    {
        const char * suffix;
        bool with_names;
        bool with_types;
    };

    /// Order matters: it is the order in which the variants get registered.
    static constexpr Variant variants[] = {
        {"", false, false},
        {"WithNames", true, false},
        {"WithNamesAndTypes", true, true},
    };

    for (const auto & variant : variants)
        register_func(base_format_name + variant.suffix, variant.with_names, variant.with_types);
}

/// Registers the CSV input format and its WithNames / WithNamesAndTypes variants in `factory`.
void registerInputFormatCSV(FormatFactory & factory)
{
    /// Outer lambda: called once per variant by registerWithNamesAndTypes.
    auto register_variant = [&factory](const String & format_name, bool with_names, bool with_types)
    {
        /// Inner lambda: this is the callable that ends up stored as the InputCreator.
        /// The two flags are captured by value, so each variant keeps its own copy.
        auto creator = [with_names, with_types](
            ReadBuffer & buf,
            const Block & sample,
            IRowInputFormat::Params params,
            const FormatSettings & settings)
        {
            return std::make_shared<CSVRowInputFormat>(sample, buf, std::move(params), with_names, with_types, settings);
        };

        factory.registerInputFormat(format_name, std::move(creator));
    };

    /// "CSV" is the base name from which the variant names are derived.
    registerWithNamesAndTypes("CSV", register_variant);
}

4 CSVRowInputFormat 定义

class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
{
public:
    /** with_names - in the first line the header with column names
      * with_types - on the next line header with type names
      */
    CSVRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
                      bool with_names_, bool with_types_, const FormatSettings & format_settings_);

5 registerInputFormat 的 factory 参数是哪里来的

/// Returns a reference to the single FormatFactory object.
/// The object is a function-local static: it is constructed on the first call
/// and every later call returns a reference to that same object.
FormatFactory & FormatFactory::instance()
{
    static FormatFactory factory;
    return factory;
}

- main 函数调用

int Server::main(const std::vector<std::string> & /*args*/)
{
    Poco::Logger * log = &logger();

    UseSSL use_ssl;

    MainThreadStatus::getInstance();

    registerFunctions();
    registerAggregateFunctions();
    registerTableFunctions();
    registerStorages();
    registerDictionaries();
    registerDisks();
    registerFormats();
    registerRemoteFileMetadatas();

  • 调用 registerInputFormat
Format
Format

6 Lambda 变形 ParallelParsingInputFormat.h

    /// Callable that returns an InputFormatPtr from nothing but a read buffer.
    using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;

    /// Constructor arguments for ParallelParsingInputFormat, bundled into one aggregate.
    struct Params
    {
        /// Held by reference: Params does not own the buffer.
        ReadBuffer & in;
        Block header;
        InternalParserCreator internal_parser_creator;
        FormatFactory::FileSegmentationEngine file_segmentation_engine;
        String format_name;
        /// Passed to the thread pool and used (plus two) to size processing_units in the constructor.
        size_t max_threads;
        size_t min_chunk_bytes;
        bool is_server;
    };

    /// Member variable: the parser-creating callable; const, so it is fixed at construction.
    const InternalParserCreator internal_parser_creator;

    /// Builds the parallel-parsing format from a Params bundle.
    /// The header is moved into the IInputFormat base together with a reference to the input buffer;
    /// the remaining fields are copied into members, and the pool is constructed with max_threads.
    explicit ParallelParsingInputFormat(Params params)
        : IInputFormat(std::move(params.header), params.in)
        , internal_parser_creator(params.internal_parser_creator)
        , file_segmentation_engine(params.file_segmentation_engine)
        , format_name(params.format_name)
        , min_chunk_bytes(params.min_chunk_bytes)
        , is_server(params.is_server)
        , pool(params.max_threads)
    {
        // One unit for each thread, including segmentator and reader, plus a
        // couple more units so that the segmentation thread doesn't spuriously
        // bump into reader thread on wraparound.
        processing_units.resize(params.max_threads + 2);

        LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
    }

7 注册完成如何使用

通过调用 FormatFactory::getInput 来使用已注册的格式,这里第一个参数 name 即为 CSV

/// Returns an input format for the format registered under `name`.
/// NOTE(review): this is an abridged excerpt. Only the parallel-parsing branch is shown, and
/// `parallel_parsing`, `format_settings`, `settings` and `file_segmentation_engine` are defined
/// in code that is not visible here.
InputFormatPtr FormatFactory::getInput(
    const String & name,
    ReadBuffer & buf,
    const Block & sample,
    ContextPtr context,
    UInt64 max_block_size,
    const std::optional<FormatSettings> & _format_settings) const
{

    if (parallel_parsing)
    {
      
        /// The InputCreator stored for `name` at registration time.
        const auto & input_getter = getCreators(name).input_creator;

        RowInputFormatParams row_input_format_params;
        row_input_format_params.max_block_size = max_block_size;
        row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
        row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
        row_input_format_params.max_execution_time = settings.max_execution_time;
        row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;

        /// Const reference is copied to lambda.
        // This is the adaptation step: the four-argument InputCreator is wrapped in a lambda that
        // takes only a ReadBuffer, which is the shape ParallelParsingInputFormat::Params expects.
        auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
            (ReadBuffer & input) -> InputFormatPtr
            {return input_getter(input, sample, row_input_format_params, format_settings); };
        // Calling input_getter runs the registered creator; for CSV that is the lambda that
        // returns std::make_shared<CSVRowInputFormat>.
      
      
        //file_segmentation_engine
        ParallelParsingInputFormat::Params params{
            buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
               context->getApplicationType() == Context::ApplicationType::SERVER};
      
      
        return std::make_shared<ParallelParsingInputFormat>(params);
    }
}


/// Looks up the Creators record registered under `name`.
///
/// @param name  Format name used as the dictionary key.
/// @return Reference to the record stored in the dictionary.
/// @throws Exception (ErrorCodes::UNKNOWN_FORMAT) if nothing is registered under `name`.
const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
{
    const auto found = dict.find(name);
    if (found == dict.end())
        throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);

    return found->second;
}

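为何这里的语法能够通过编译呢?因为 parser_creator 这个 Lambda 通过捕获列表保存了 input_getter、sample、row_input_format_params 和 format_settings,对外只暴露 (ReadBuffer &) -> InputFormatPtr 这一签名,与 InternalParserCreator 对应的 std::function 签名一致,所以可以直接赋值。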

InputFormat->InputFormatParser
InputFormat->InputFormatParser

8 最终的频繁调用

void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
    if (thread_group)
        CurrentThread::attachToIfDetached(thread_group);

    const auto parser_unit_number = current_ticket_number % processing_units.size();
    auto & unit = processing_units[parser_unit_number];

    try
    {
        setThreadName("ChunkParser");

        /*
         * This is kind of suspicious -- the input_process_creator contract with
         * respect to multithreaded use is not clear, but we hope that it is
         * just a 'normal' factory class that doesn't have any state, and so we
         * can use it from multiple threads simultaneously.
         */
        ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);

        InputFormatPtr input_format = internal_parser_creator(read_buffer);
        input_format->setCurrentUnitNumber(current_ticket_number);
        InternalParser parser(input_format);

        unit.chunk_ext.chunk.clear();
        unit.chunk_ext.block_missing_values.clear();

        /// Propagate column_mapping to other parsers.
        /// Note: column_mapping is used only for *WithNames types
        if (current_ticket_number != 0)
            input_format->setColumnMapping(column_mapping);

        // We don't know how many blocks will be. So we have to read them all
        // until an empty block occurred.
        Chunk chunk;
        while (!parsing_finished && (chunk = parser.getChunk()) != Chunk())
        {
            /// Variable chunk is moved, but it is not really used in the next iteration.
            /// NOLINTNEXTLINE(bugprone-use-after-move)
            unit.chunk_ext.chunk.emplace_back(std::move(chunk));
            unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
        }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Cpp reference
  • ClickHouse Lambda
    • 1 定义 结构定义 FormatFactory
      • 3 调用注册接口 CSVRowInputFormat.h
        • 4 CSVRowInputFormat 定义
          • 5 registerInputFormat 的 factory 参数是哪里来的
            • 6 Lambda 变形 ParallelParsingInputFormat.h
              • 7 注册完成如何使用
                • 8 最终的频繁调用
                相关产品与服务
                数据库
                云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档