前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >时序数据库Influx-IOx源码学习四(Run命令的执行)

时序数据库Influx-IOx源码学习四(Run命令的执行)

作者头像
刘涛华
发布2021-04-26 10:50:25
6430
发布2021-04-26 10:50:25
举报

上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858

这章记录一下Run命令的执行过程。

代码语言:javascript
复制
 //根据用户在命令行配置的num_threads参数
 //来选择创建一个多线程的模型,还是current_thread的模型
 //后面有时间深入研究tokio的时候再来分析有什么异同
 let tokio_runtime = get_runtime(config.num_threads)?;
 //block_on会让线程一直等待方法里的future执行完成
 //这是让闭包中的方法占有了io driver 和 timer context
 tokio_runtime.block_on(async move {
        let host = config.host;
        match config.command {
            // 省略其它command ... 

            Command::Run(config) => {
                //具体去子类型里执行,然后await一个结果
                if let Err(e) = commands::run::command(logging_level, *config).await {
                    eprintln!("Server command failed: {}", e);
                    std::process::exit(ReturnCode::Failure as _)
                }
            }
        }
});

influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracingCancellationToken等。

代码语言:javascript
复制
    //初始化对象存储
    let object_store = ObjectStore::try_from(&config)?;
    //可以看到,目前已经支持了
    //1.内存(在container环境运行时候使用)
    //2.Google
    //3.S3
    //4.Azure
    //5.File 本地文件,方便开发者调试运行在云上时候的文件变化
    fn try_from(config: &Config) -> Result<Self, Self::Error> {
        match config.object_store {
            Some(ObjStoreOpt::Memory) | None => {
           //创建一个btreemap用来缓存或者搜索
           Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
            }

            Some(ObjStoreOpt::Google) => {
                // 省略
            }

            Some(ObjStoreOpt::S3) => {
               // 省略
            }

            Some(ObjStoreOpt::Azure) => { 
              // 省略
            }

            Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
                Some(db_dir) => {
                    //去递归创建这个配置路径中的文件夹
                    //context也是使用的snafu来处理错误的
                    fs::create_dir_all(db_dir)
                        .context(CreatingDatabaseDirectory { path: db_dir })?;
                    //都创建完成,并且没出错误,把路径保存起来
                    Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
                }
                // 如果database_directory这个参数没有配置的时候
                //使用snafu这个crate来返回一个错误
                None => MissingObjectStoreConfig {
                    object_store: ObjStoreOpt::File,
                    missing: "data-dir",
                }
                .fail(),
            },
        }
    }

关于错误处理的代码:

代码语言:javascript
复制
 #[snafu(display("Unable to create database directory {:?}: {}", path, source))]
    CreatingDatabaseDirectory {
        path: PathBuf,
        source: std::io::Error,
    },

 #[snafu(display(
        "Specified {} for the object store, required configuration missing for {}",
        object_store,
        missing
    ))]
    MissingObjectStoreConfig {
        object_store: ObjStoreOpt,
        missing: String,
    },

我们来测试一下错误的场景,来看看是否符合代码的预期。

代码语言:javascript
复制
// 不传入路径
 cargo run run --object-store file
    Finished dev [unoptimized + debuginfo] target(s) in 0.42s
     Running `./influxdb_iox run --object-store file`
Apr 15 13:38:34.352  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Specified File for the object store, required configuration missing for data-dir

//传入一个创建不了的路径
cargo run run --object-store file --data-dir /root/1/1
    Finished dev [unoptimized + debuginfo] target(s) in 0.47s
     Running `./influxdb_iox run --object-store file --data-dir /root/1/1`
Apr 15 13:45:26.664  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)

可以看到是符合预期的,bingo


代码语言:javascript
复制
//创建一个空的结构体
let connection_manager = ConnectionManager {};
//创建AppServer结构体用来保存基本的信息
//server_config里就是保存的对象存储的信息及线程配置
//如果num_worker_threads没有填写,默认就使用cpu数量
let app_server = Arc::new(AppServer::new(connection_manager, server_config));
//不设置这个writer_id能启动,但是不能做任何操作
if let Some(id) = config.writer_id {
        //compare and set 一个非0的数值,错误就打印一个指定的panic
        app_server.set_id(id).expect("writer id already set");
        //校验所有的配置
        if let Err(e) = app_server.load_database_configs().await {
            error!(
                "unable to load database configurations from object storage: {}",
                e
            )
        }
    } else {
        warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
    }

接下来进入load_database_configs方法看看,

代码语言:javascript
复制
let list_result = self
            .store
            //把write_id和配置的文件路径组合一下,作为一个目录
            //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹
            //用Vec存下所有的文件信息,包括路径、修改时间、大小等
            .list_with_delimiter(&self.root_path()?)
            .await
            .context(StoreError)?;
        //拿到配置的server的write_id
        let server_id = self.require_id()?;

        let handles: Vec<_> = list_result
            //配置的文件夹下的所有文件夹
            .common_prefixes
            .into_iter()
            //全部进行map转换
            .map(|mut path| {
                let store = Arc::clone(&self.store);
                let config = Arc::clone(&self.config);
                let exec = Arc::clone(&self.exec);
                //先找database的相关信息文件,名字叫rules.pb
                path.set_file_name(DB_RULES_FILE_NAME);
                //感觉是需要io来读取文件内容,所以开一个异步
                tokio::task::spawn(async move {
                    let mut res = get_store_bytes(&path, &store).await;

                    //省略错误处理。。

                    let res = res.unwrap().freeze();
                    //解析文件内容,根据文件名可以看出是个pb文件。
                    match DatabaseRules::decode(res) {
                        Err(e) => {
                            //省略错误。。
                        }
                        //根据解析出来的文件内容,在内存中恢复回来db的相关信息
                        Ok(rules) => match config.create_db(rules) {
                            Err(e) => error!("error adding database to config: {}", e),
                            //提交一个后台任务,用来不断的检测chunks的状态
                            //比如达到了某个大小,然后写入到存储等
                            Ok(handle) => handle.commit(server_id, store, exec),
                        },
                    }
                })
            })
            .collect();
        //等待所有任务完成
        futures::future::join_all(handles).await;

这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。


接下来就是启动连接相关的了。

代码语言:javascript
复制
    //从启动命令行中读取grpc的地址
    let grpc_bind_addr = config.grpc_bind_address;
    //绑定这个地址
    let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
        .await
        .context(StartListeningGrpc { grpc_bind_addr })?;
    //真正的协议启动
    let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //同样的启动http相关的服务,使用的hyper库
    let bind_addr = config.http_bind_address;
    let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
    let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //省略后面的停止流程。。。

然后看grpc的启动的服务

代码语言:javascript
复制
    //启动起来健康检查的服务
    let stream = TcpListenerStream::new(socket);
    let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
    //标识相对应的服务已经是可以提供服务的状态了
    let services = [
        generated_types::STORAGE_SERVICE,
        generated_types::IOX_TESTING_SERVICE,
        generated_types::ARROW_SERVICE,
    ];
    for service in &services {
        health_reporter
            .set_service_status(service, tonic_health::ServingStatus::Serving)
            .await;
    }

   //增加一堆使用grpc的服务,并启动起来
    tonic::transport::Server::builder()
        .add_service(health_service)
        .add_service(testing::make_server())
        .add_service(storage::make_server(Arc::clone(&server)))
        .add_service(flight::make_server(Arc::clone(&server)))
        .add_service(write::make_server(Arc::clone(&server)))
        .add_service(management::make_server(Arc::clone(&server)))
        .add_service(operations::make_server(server))
        .serve_with_incoming_shutdown(stream, shutdown.cancelled())
        .await

然后是http相关的启动

代码语言:javascript
复制
pub async fn serve<M>(
    addr: AddrIncoming,
    server: Arc<AppServer<M>>,
    shutdown: CancellationToken,
) -> Result<(), hyper::Error>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //初始化路由相关的信息
    let router = router(server);
    let service = RouterService::new(router).unwrap();
    //启动服务
    hyper::Server::builder(addr)
        .serve(service)
        .with_graceful_shutdown(shutdown.cancelled())
        .await
}

顺便看一下都提供了哪些地址可以被访问的:

代码语言:javascript
复制
 Router::builder()
        .data(server)
        //写了一个拦截,打印请求参数和返回结果
        .middleware(Middleware::pre(|req| async move {
            debug!(request = ?req, "Processing request");
            Ok(req)
        }))
        .middleware(Middleware::post(|res| async move {
            debug!(response = ?res, "Successfully processed request");
            Ok(res)
        })) // this endpoint is for API backward compatibility with InfluxDB 2.x
        .post("/api/v2/write", write::<M>)
        .get("/health", health)
        .get("/metrics", handle_metrics)
        .get("/iox/api/v1/databases/:name/query", query::<M>)
        .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
        .get("/api/v1/partitions", list_partitions::<M>)
        .post("/api/v1/snapshot", snapshot_partition::<M>)
        //错误的时候调用的处理拦截
        .err_handler_with_info(error_handler)
        .build()
        .unwrap()

做一个/health的测试:

代码语言:javascript
复制
curl localhost:8080/health
OK%

可以看到成功返回了值。


到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如PanicsLog等等吧,欢迎持续关注。

祝玩儿的开心

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据库技术研究 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档