
Understanding the nydusd Source Code (Part 2)

abin
Published 2023-03-21 20:33:07
This article is included in the column: abin在路上

This series tries to understand how nydus [1] works by reading its source code. This is the second part, which mainly covers the API Server and Backend flows.

1. DAEMON_CONTROLLER Initialization

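Continuing from Part 1 (Understanding the nydusd Source Code, Part 1) and returning to the process_fs_service function: once the daemon instance has been created, the daemon value held by DAEMON_CONTROLLER is replaced with the new daemon: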

DAEMON_CONTROLLER.set_daemon(daemon);

/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
    self.daemon.lock().unwrap().replace(daemon)
}
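The replace-and-return-old behavior of set_daemon comes from calling Option::replace on the value inside the Mutex. The following is a minimal, std-only sketch of that pattern. The Daemon trait, FakeDaemon and Controller are hypothetical stand-ins for NydusDaemon and DaemonController, not nydus types.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the `NydusDaemon` trait.
pub trait Daemon: Send + Sync {
    fn id(&self) -> u32;
}

/// Hypothetical daemon implementation used only for this sketch.
pub struct FakeDaemon(pub u32);

impl Daemon for FakeDaemon {
    fn id(&self) -> u32 {
        self.0
    }
}

/// Hypothetical stand-in for `DaemonController`, keeping only the daemon slot.
pub struct Controller {
    daemon: Mutex<Option<Arc<dyn Daemon>>>,
}

impl Controller {
    pub fn new() -> Self {
        Controller { daemon: Mutex::new(None) }
    }

    /// Store a new daemon and return the previous one, if there was one.
    pub fn set_daemon(&self, daemon: Arc<dyn Daemon>) -> Option<Arc<dyn Daemon>> {
        // Option::replace puts Some(daemon) into the slot and returns the old value.
        self.daemon.lock().unwrap().replace(daemon)
    }

    /// Return another handle (an Arc clone) to the current daemon, if any.
    pub fn get_daemon(&self) -> Option<Arc<dyn Daemon>> {
        self.daemon.lock().unwrap().clone()
    }
}
```

Because the old value is handed back rather than dropped in place, the caller decides what happens to a previously installed daemon.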

When is DAEMON_CONTROLLER initialized?

DAEMON_CONTROLLER is a global variable: a static ref to a DaemonController struct, declared with lazy_static!.

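“In Rust, global variables should be avoided wherever possible. Instead, create the object early (for example in main) and pass mutable references to it to the places that need it.”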

lazy_static! {
    static ref DAEMON_CONTROLLER: DaemonController = DaemonController::new();
}

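“lazy_static! is a macro for declaring lazily evaluated statics: every static declared with it is initialized the first time it is used, and only once. The initialization may include heap allocations, such as a vector or hash map, as well as calls to non-const functions.”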
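As an illustration of the first-use, run-once semantics described above, here is a sketch that uses std::sync::OnceLock (stable since Rust 1.70) in place of lazy_static!. This is a swapped-in technique, not what nydusd uses, and the trimmed-down Controller and the INIT_COUNT counter are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Counts how many times the initializer actually runs.
static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical, heavily trimmed stand-in for `DaemonController`.
pub struct Controller {
    pub active: AtomicBool,
}

/// The global slot stays empty until the first call to `controller()`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

/// Return the global controller, running the initializer on first use only.
pub fn controller() -> &'static Controller {
    CONTROLLER.get_or_init(|| {
        INIT_COUNT.fetch_add(1, Ordering::SeqCst);
        Controller { active: AtomicBool::new(true) }
    })
}

/// How many times the initializer has run so far.
pub fn init_count() -> usize {
    INIT_COUNT.load(Ordering::SeqCst)
}
```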

When DAEMON_CONTROLLER is initialized through DaemonController::new(), the poller and related instances are created too, and they serve as globals for the rest of the program's run:

fn new() -> Self {
    let poller = Poll::new().expect("Failed to create `ServiceController` instance");
    let waker = Waker::new(poller.registry(), Token(1))
        .expect("Failed to create waker for ServiceController");

    Self {
        active: AtomicBool::new(true),
        singleton_mode: AtomicBool::new(true),
        daemon: Mutex::new(None),
        blob_cache_mgr: Mutex::new(None),
        fs_service: Mutex::new(None),
        waker: Arc::new(waker),
        poller: Mutex::new(poller),
    }
}

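Poll is a struct from the mio [2] crate. Poll::new() creates a new Selector instance. Creating the Selector creates an epoll instance, and the returned epoll fd is stored in the Selector instance.

The Selector instance is stored in Poll.registry.selector.

Waker::new(poller.registry(), Token(1)) registers the waker and creates a sys::Waker instance. It calls the eventfd system call and registers the resulting fd with the given Selector: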

pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
    syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
        // Turn the file descriptor into a file first so we're ensured
        // it's closed when dropped, e.g. when register below fails.
        let file = unsafe { File::from_raw_fd(fd) };
        selector
            .register(fd, token, Interest::READABLE)
            .map(|()| Waker { fd: file })
    })
}

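Here, fd is an eventfd file descriptor, not a Unix socket. Writing to it makes it readable, which wakes up the thread blocked polling on the Selector.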
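Here is why an eventfd works as a waker. Per eventfd(2), it is a kernel-maintained 64-bit counter. write() adds to it. Without EFD_SEMAPHORE, read() returns the current value and resets it to zero. With EFD_NONBLOCK, a read on a zero counter fails with EAGAIN instead of blocking. The fd is readable whenever the counter is non-zero, and that is what the poller sees after a wake. The sketch below models those semantics in user space with a Mutex and a Condvar. EventCounter is hypothetical and is not how mio implements its Waker.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical user-space model of an eventfd counter (not a real eventfd).
pub struct EventCounter {
    count: Mutex<u64>,
    ready: Condvar,
}

impl EventCounter {
    pub fn new() -> Self {
        EventCounter { count: Mutex::new(0), ready: Condvar::new() }
    }

    /// Like write(2) on an eventfd: add `n` and wake a waiter.
    /// Overflow handling is omitted in this sketch.
    pub fn write(&self, n: u64) {
        let mut count = self.count.lock().unwrap();
        *count += n;
        self.ready.notify_one();
    }

    /// Like read(2) on a nonblocking eventfd: return and reset the counter,
    /// or None (the EAGAIN case) when it is zero.
    pub fn try_read(&self) -> Option<u64> {
        let mut count = self.count.lock().unwrap();
        if *count == 0 {
            None
        } else {
            Some(std::mem::replace(&mut *count, 0))
        }
    }

    /// Like a poller waiting for READABLE: block until the counter is
    /// non-zero or the timeout expires, then consume the value.
    pub fn wait_readable(&self, timeout: Duration) -> Option<u64> {
        let guard = self.count.lock().unwrap();
        let (mut count, _timed_out) = self
            .ready
            .wait_timeout_while(guard, timeout, |c| *c == 0)
            .unwrap();
        if *count == 0 {
            None
        } else {
            Some(std::mem::replace(&mut *count, 0))
        }
    }
}
```

Several wakes issued before the poller runs collapse into one readable state, so calling wake() more than once is harmless.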

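In addition, DaemonController initialization sets up the following fields:

(1) active: AtomicBool::new(true), an atomic bool indicating whether the DaemonController is active (shutdown() sets it to false to mark the exiting state)

(2) singleton_mode: AtomicBool::new(true), an atomic bool indicating whether singleton mode is enabled

(3) daemon: Mutex::new(None), a mutex-protected slot for the daemon (only one thread can access it at a time)

(4) blob_cache_mgr: Mutex::new(None), a mutex-protected slot for the BlobCacheMgr

(5) fs_service: Mutex::new(None), a mutex-protected slot for the FsService

(6) waker: Arc::new(waker), a shared, reference-counted handle to the Waker

(7) poller: Mutex::new(poller), a mutex-protected Poll instance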

At this point, DAEMON_CONTROLLER is fully initialized.

Back in main(), the next step sets the default fs service:

let daemon = DAEMON_CONTROLLER.get_daemon();
if let Some(fs) = daemon.get_default_fs_service() {
    DAEMON_CONTROLLER.set_fs_service(fs);
}

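This simply returns a clone of the Arc pointing to the daemon's FsService. The result is another reference to the same object, not a copy of it: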

fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
    Some(self.service.clone())
}
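Because service is an Arc, self.service.clone() only increments a reference count, so the daemon and DAEMON_CONTROLLER end up pointing at the same FsService object. Here is a minimal sketch that assumes hypothetical Service and Daemon types. The real code returns Arc<dyn FsService>, and a concrete type is used here for brevity.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an `FsService` implementation.
pub struct Service {
    pub name: String,
}

/// Hypothetical stand-in for the daemon that owns the service.
pub struct Daemon {
    service: Arc<Service>,
}

impl Daemon {
    pub fn new(name: &str) -> Self {
        Daemon { service: Arc::new(Service { name: name.to_string() }) }
    }

    /// Mirrors `get_default_fs_service`: hand out another pointer, not a copy.
    pub fn get_default_fs_service(&self) -> Option<Arc<Service>> {
        Some(self.service.clone())
    }

    /// Number of live strong references to the service.
    pub fn service_refs(&self) -> usize {
        Arc::strong_count(&self.service)
    }
}
```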

Next, the API Server thread is started:

// Start the HTTP Administration API server
let mut api_controller = ApiServerController::new(apisock);
api_controller.start()?;

2. API Server Thread

The code that starts the API Server:

let mut api_controller = ApiServerController::new(apisock);
api_controller.start()?;

Here apisock is passed in again, this time to initialize an ApiServerController instance:

pub fn new(sock: Option<&str>) -> Self {
    ApiServerController {
        sock: sock.map(|v| v.to_string()),
        http_handler_thread: None,
        http_router_thread: None,
        waker: None,
    }
}

Next, the API Server is started with api_controller.start():

/// Try to start the HTTP working thread.
pub fn start(&mut self) -> Result<()> {
    if self.sock.is_none() {
        return Ok(());
    }

    // Safe to unwrap() because self.sock is valid.
    let apisock = self.sock.as_ref().unwrap();
    let (to_handler, from_router) = channel();
    let (to_router, from_handler) = channel();
    let api_server = ApiServer::new(to_router)?;
    let api_handler = ApiServerHandler::new(api_server, from_router)?;
    let (router_thread, waker) = start_http_thread(apisock, None, to_handler, from_handler)?;
    let daemon_waker = DAEMON_CONTROLLER.waker.clone();

    info!("HTTP API server running at {}", apisock);
    let handler_thread = std::thread::Builder::new()
        .name("api-server".to_string())
        .spawn(move || {
            api_handler.handle_requests_from_router();
            info!("HTTP api-server handler thread exits");
            let _ = daemon_waker.wake();
            Ok(())
        })
        .map_err(|_e| einval!("Failed to start work thread for HTTP handler"))?;

    self.waker = Some(waker);
    self.http_handler_thread = Some(handler_thread);
    self.http_router_thread = Some(router_thread);

    Ok(())
}

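It first checks whether an API socket was configured and returns early if self.sock is None. It then creates two channels for communication between threads and creates the ApiServer and ApiServerHandler instances: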

// Safe to unwrap() because self.sock is valid.
let apisock = self.sock.as_ref().unwrap();
let (to_handler, from_router) = channel();
let (to_router, from_handler) = channel();
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;

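Together, the two channels form a bidirectional link between the HTTP router thread and the API handler. Requests flow from the router to ApiServerHandler through to_handler/from_router, and ApiServer sends responses back to the router through to_router/from_handler.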
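The channel wiring can be reproduced with std::sync::mpsc alone. This sketch uses hypothetical String-based Request and Response types and a hypothetical spawn_handler function. Requests travel as Option<Request> from the router to a handler thread, and responses come back on the second channel.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical simplified message types.
pub type Request = String;
pub type Response = String;

/// Wire up both channels and spawn a handler thread.
/// Returns the router's ends: (to_handler, from_handler, handler thread).
pub fn spawn_handler() -> (Sender<Option<Request>>, Receiver<Response>, JoinHandle<()>) {
    // Channel 1: router -> handler (requests).
    let (to_handler, from_router) = channel::<Option<Request>>();
    // Channel 2: handler -> router (responses).
    let (to_router, from_handler) = channel::<Response>();

    let handle = thread::spawn(move || {
        // Keep serving until the router sends None or hangs up.
        while let Ok(Some(req)) = from_router.recv() {
            let _ = to_router.send(format!("handled {}", req));
        }
    });

    (to_handler, from_handler, handle)
}
```

Each mpsc channel carries data in one direction only, so a request/response round trip needs two of them.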

Next, the HTTP server thread is started. start_http_thread() takes apisock plus one end of each channel: to_handler, for sending messages to ApiServerHandler, and from_handler, for receiving messages from ApiServerHandler:

/// Start a HTTP server to serve API requests.
///
/// Start a HTTP server parsing http requests and send to nydus API server a concrete
/// request to operate nydus or fetch working status.
/// The HTTP server sends request by `to_api` channel and wait for response from `from_api` channel.
pub fn start_http_thread(
    path: &str,
    api_notifier: Option<Arc<Waker>>,
    to_api: Sender<Option<ApiRequest>>,
    from_api: Receiver<ApiResponse>,
) -> Result<(thread::JoinHandle<Result<()>>, Arc<Waker>)> {}

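It first deletes any leftover apisock file, then creates new Poll and Waker instances. Creating the Poll instance creates an epoll instance, and creating the Waker registers the waker and returns the Waker instance.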

Then it creates an HttpServer instance:

let mut server = HttpServer::new(socket_path).map_err(|e| {
    if let ServerError::IOError(e) = e {
        e
    } else {
        Error::new(ErrorKind::Other, format!("{:?}", e))
    }
})?;

When an HttpServer (a struct from dbs-uhttp [3]) is created, it binds the socket, puts the socket into nonblocking mode, and returns the HttpServer instance:

pub fn new<P: AsRef<Path>>(path_to_socket: P) -> Result<Self> {
    let socket = UnixListener::bind(path_to_socket).map_err(ServerError::IOError)?;
    Self::new_from_socket(socket)
}

Next, the server's epoll fd is registered with poll under REQUEST_TOKEN, and listening on the socket begins:

poll.registry().register(
    &mut SourceFd(&server.epoll().as_raw_fd()),
    REQUEST_TOKEN,
    Interest::READABLE,
)?;

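Next, a thread named nydus-http-server is started to handle requests arriving on the Unix socket.

Inside the thread, server.start_server() first adds the listening Unix socket to the server's own epoll instance. start_server() is a method of the HttpServer struct: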

/// Starts the HTTP Server.
pub fn start_server(&mut self) -> Result<()> {
    // Add the socket on which we listen for new connections to the
    // `epoll` structure.
    Self::epoll_add(
        &self.poll,
        Token(self.socket.as_raw_fd() as usize),
        self.socket.as_raw_fd(),
    )
}

It then sets the capacity of the events buffer, which is the maximum number of events a single poll() call can return:

let mut events = Events::with_capacity(100);

loop循环中处理从 poll 接收到的 event

loop {
    // If polling for events fails, the thread exits
    match poll.poll(&mut events, None) {
        Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
        Err(e) => {
            error!("http server poll events failed, {}", e);
            exit_api_server(api_notifier, &to_api);
            return Err(e);
        }
        Ok(_) => {}
    }

    // Iterate over every event
    for event in &events {
        // Dispatch on event.token()
        match event.token() {
            // Exit the thread
            EXIT_TOKEN => do_exit = true,
            // For REQUEST_TOKEN, dispatch further on server.requests()
            REQUEST_TOKEN => match server.requests() {
                // No error: process the requests
                Ok(request_vec) => {
                    // Iterate over each server_request
                    for server_request in request_vec {
                        let reply = server_request.process(|request| {
                            // Handle the HTTP request
                            handle_http_request(
                                request,
                                api_notifier.clone(),
                                &to_api,
                                &from_api,
                            )
                        });
                        // Ignore error when sending response
                        server.respond(reply).unwrap_or_else(|e| {
                            error!("HTTP server error on response: {}", e)
                        });
                    }
                }
                Err(e) => {
                    error!("HTTP server error on retrieving incoming request: {}", e);
                }
            },
            _ => unreachable!("unknown poll token."),
        }
    }

    if do_exit {
        exit_api_server(api_notifier, &to_api);
        break;
    }
}

server_request.process() handles the request with the handler function handle_http_request and wraps the result in a ServerResponse instance:

/// Calls the function provided on the inner request to obtain the response.
/// The response is then wrapped in a `ServerResponse`.
///
/// Returns a `ServerResponse` ready for yielding to the server
pub fn process<F>(&self, mut callable: F) -> ServerResponse
where
    F: FnMut(&Request) -> Response,
{
    let http_response = callable(self.inner());
    ServerResponse::new(http_response, self.id)
}

The function that handles HTTP requests, handle_http_request:

fn handle_http_request(
    request: &Request,    // the HTTP request
    api_notifier: Option<Arc<Waker>>,
    to_api: &Sender<Option<ApiRequest>>,    // channel for sending requests to the API handler
    from_api: &Receiver<ApiResponse>,    // channel for receiving responses from the API handler
) -> Response {
    let begin_time = SystemTime::now();
    // Trace the request and log it
    trace_api_begin(request);

    // Micro http should ensure that req path is legal.
    // Parse the URI
    let uri_parsed = request.uri().get_abs_path().parse::<Uri>();
    // Build the response
    let mut response = match uri_parsed {
        Ok(uri) => match HTTP_ROUTES.routes.get(uri.path()) {
            // A matching route exists: let it handle the request
            Some(route) => route
                .handle_request(request, &|r| {
                    // The function that actually processes the request
                    kick_api_server(api_notifier.clone(), to_api, from_api, r)
                })
                .unwrap_or_else(|err| error_response(err, StatusCode::BadRequest)),
            None => error_response(HttpError::NoRoute, StatusCode::NotFound),
        },
        Err(e) => {
            error!("Failed parse URI, {}", e);
            error_response(HttpError::BadRequest, StatusCode::BadRequest)
        }
    };
    response.set_server("Nydus API");
    response.set_content_type(MediaType::ApplicationJson);

    trace_api_end(&response, request.method(), begin_time);

    response
}

Here, HTTP_ROUTES.routes.get(uri.path()) finds the handler object for the requested URL path. HTTP_ROUTES records the list of supported APIs:

lazy_static! {
    /// HTTP_ROUTES contain all the nydusd HTTP routes.
    pub static ref HTTP_ROUTES: HttpRoutes = {
        let mut r = HttpRoutes {
            routes: HashMap::new(),
        };

        // Common
        r.routes.insert(endpoint_v1!("/daemon/events"), Box::new(EventsHandler{}));
        r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{}));
        r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{}));
        r.routes.insert(endpoint_v1!("/daemon/fuse/sendfd"), Box::new(SendFuseFdHandler{}));
        r.routes.insert(endpoint_v1!("/daemon/fuse/takeover"), Box::new(TakeoverFuseFdHandler{}));
        r.routes.insert(endpoint_v1!("/mount"), Box::new(MountHandler{}));
        r.routes.insert(endpoint_v1!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
        r.routes.insert(endpoint_v1!("/metrics/blobcache"), Box::new(MetricsBlobcacheHandler{}));

        // Nydus API, v1
        r.routes.insert(endpoint_v1!("/daemon"), Box::new(InfoHandler{}));
        r.routes.insert(endpoint_v1!("/daemon/backend"), Box::new(FsBackendInfo{}));
        r.routes.insert(endpoint_v1!("/metrics"), Box::new(MetricsFsGlobalHandler{}));
        r.routes.insert(endpoint_v1!("/metrics/files"), Box::new(MetricsFsFilesHandler{}));
        r.routes.insert(endpoint_v1!("/metrics/inflight"), Box::new(MetricsFsInflightHandler{}));
        r.routes.insert(endpoint_v1!("/metrics/pattern"), Box::new(MetricsFsAccessPatternHandler{}));

        // Nydus API, v2
        r.routes.insert(endpoint_v2!("/daemon"), Box::new(InfoV2Handler{}));
        r.routes.insert(endpoint_v2!("/blobs"), Box::new(BlobObjectListHandlerV2{}));

        r
    };
}
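The routing table is essentially a HashMap from a path string to a boxed trait object, and any path missing from the map gets NotFound. This sketch rests on a few assumptions. Handler, InfoHandler and Routes are hypothetical. The endpoint_v1 function only imitates the idea behind the endpoint_v1! macro, and its /api/v1 prefix is an assumption that the code above does not confirm.

```rust
use std::collections::HashMap;

/// Hypothetical handler trait standing in for `EndpointHandler`.
pub trait Handler: Send + Sync {
    fn handle(&self, method: &str) -> (u16, String);
}

/// Hypothetical handler that only accepts GET.
pub struct InfoHandler;

impl Handler for InfoHandler {
    fn handle(&self, method: &str) -> (u16, String) {
        match method {
            "GET" => (200, "daemon info".to_string()),
            _ => (400, "bad request".to_string()),
        }
    }
}

/// Assumed version prefix; not claimed to match the real macro's output.
pub fn endpoint_v1(path: &str) -> String {
    format!("/api/v1{}", path)
}

pub struct Routes {
    routes: HashMap<String, Box<dyn Handler>>,
}

impl Routes {
    pub fn new() -> Self {
        let mut routes: HashMap<String, Box<dyn Handler>> = HashMap::new();
        routes.insert(endpoint_v1("/daemon"), Box::new(InfoHandler));
        Routes { routes }
    }

    /// Exact-path lookup; unknown paths map to 404, like HttpError::NoRoute.
    pub fn dispatch(&self, path: &str, method: &str) -> (u16, String) {
        match self.routes.get(path) {
            Some(handler) => handler.handle(method),
            None => (404, "no route".to_string()),
        }
    }
}
```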

Request is a standard HTTP request structure.

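Each handler object implements handle_request differently. kick_api_server is passed into handle_request as a closure. It sends the request message through the channel, wakes the API handler if a notifier is set, and then waits for the result via from_api.recv():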

fn kick_api_server(
    api_notifier: Option<Arc<Waker>>,
    to_api: &Sender<Option<ApiRequest>>,
    from_api: &Receiver<ApiResponse>,
    request: ApiRequest,    // the request payload
) -> ApiResponse {
    // Send the request through the channel
    to_api.send(Some(request)).map_err(ApiError::RequestSend)?;
    if let Some(waker) = api_notifier {
        waker.wake().map_err(ApiError::Wakeup)?;
    }
    // Wait for the result of processing the request
    from_api.recv().map_err(ApiError::ResponseRecv)?
}
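kick_api_server blocks the HTTP thread until the handler replies, and each failure point maps to its own error variant. With mpsc, send() fails only when the receiving side has been dropped, and recv() fails only when every sender is gone. A dead handler therefore produces an error instead of a hang. Here is a minimal sketch with a hypothetical kick function and KickError enum, with the optional waker left out:

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical error type mirroring ApiError::RequestSend / ResponseRecv.
#[derive(Debug, PartialEq)]
pub enum KickError {
    RequestSend,
    ResponseRecv,
}

/// Send one request, then block until its response arrives.
pub fn kick(
    to_api: &Sender<Option<String>>,
    from_api: &Receiver<String>,
    request: String,
) -> Result<String, KickError> {
    // Fails only if the handler side has dropped its Receiver.
    to_api.send(Some(request)).map_err(|_| KickError::RequestSend)?;
    // Fails only if every response Sender has been dropped.
    from_api.recv().map_err(|_| KickError::ResponseRecv)
}
```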

Each API has its own handler.

Take the handle_request method of MountHandler as an example:

pub struct MountHandler {}
impl EndpointHandler for MountHandler {
    fn handle_request(
        &self,
        req: &Request,
        kicker: &dyn Fn(ApiRequest) -> ApiResponse,
    ) -> HttpResult {
        let mountpoint = extract_query_part(req, "mountpoint").ok_or_else(|| {
            HttpError::QueryString("'mountpoint' should be specified in query string".to_string())
        })?;
        match (req.method(), req.body.as_ref()) {
            (Method::Post, Some(body)) => {
                // Parse the command from the request body
                let cmd = parse_body(body)?;
                // Send a Mount request through the channel, carrying mountpoint and cmd
                let r = kicker(ApiRequest::Mount(mountpoint, cmd));
                Ok(convert_to_response(r, HttpError::Mount))
            }
            (Method::Put, Some(body)) => {
                let cmd = parse_body(body)?;
                let r = kicker(ApiRequest::Remount(mountpoint, cmd));
                Ok(convert_to_response(r, HttpError::Mount))
            }
            (Method::Delete, None) => {
                let r = kicker(ApiRequest::Umount(mountpoint));
                Ok(convert_to_response(r, HttpError::Mount))
            }
            _ => Err(HttpError::BadRequest),
        }
    }
}

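Each handled request is also traced in the nydusd log by trace_api_begin and trace_api_end in handle_http_request.

At this point, the HTTP server thread (nydus-http-server) is up and running.

So where are the requests sent through kicker received?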

3. API Server Handler Thread

Back in the start method, the next step launches a thread named api-server, which does the actual request processing:

let handler_thread = std::thread::Builder::new()
    .name("api-server".to_string())
    .spawn(move || {
        api_handler.handle_requests_from_router();
        info!("HTTP api-server handler thread exits");
        let _ = daemon_waker.wake();
        Ok(())
    })
    .map_err(|_e| einval!("Failed to start work thread for HTTP handler"))?;

handle_requests_from_router() runs a loop that receives requests from nydus-http-server:

fn handle_requests_from_router(&self) {
    loop {
        match self.api_receiver.recv() {
            Ok(request) => {
                if let Some(req) = request {
                    self.server.process_request(req).unwrap_or_else(|e| {
                        error!("HTTP handler failed to process request, {}", e)
                    });
                } else {
                    debug!("Received exit notification from the HTTP router");
                    return;
                }
            }
            Err(_e) => {
                error!("Failed to receive request from the HTTP router");
                return;
            }
        }
    }
}
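handle_requests_from_router() can exit in two ways: through an explicit None sent by the router, or through a recv() error once the router side has hung up. The following hypothetical sketch (handle_requests and ExitReason are illustrative names) makes both cases observable:

```rust
use std::sync::mpsc::Receiver;

/// Why the request loop stopped.
#[derive(Debug, PartialEq)]
pub enum ExitReason {
    /// The router sent None as an explicit exit notification.
    Notified,
    /// Every Sender was dropped, so recv() returned an error.
    Disconnected,
}

/// Process requests until told to stop, recording each one in `processed`,
/// and report which exit path was taken.
pub fn handle_requests(rx: &Receiver<Option<String>>, processed: &mut Vec<String>) -> ExitReason {
    loop {
        match rx.recv() {
            Ok(Some(req)) => processed.push(req),
            Ok(None) => return ExitReason::Notified,
            Err(_) => return ExitReason::Disconnected,
        }
    }
}
```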

self.server.process_request() dispatches each request to the matching handler function according to its type:

fn process_request(&self, request: ApiRequest) -> Result<()> {
    let resp = match request {
        // Common (v1/v2)
        ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf),
        ApiRequest::GetDaemonInfo => self.daemon_info(true),
        ApiRequest::GetEvents => Self::events(),
        ApiRequest::Exit => self.do_exit(),
        ApiRequest::Start => self.do_start(),
        ApiRequest::SendFuseFd => self.send_fuse_fd(),
        ApiRequest::TakeoverFuseFd => self.do_takeover(),
        ApiRequest::Mount(mountpoint, info) => self.do_mount(mountpoint, info),
        ApiRequest::Remount(mountpoint, info) => self.do_remount(mountpoint, info),
        ApiRequest::Umount(mountpoint) => self.do_umount(mountpoint),
        ApiRequest::ExportBackendMetrics(id) => Self::export_backend_metrics(id),
        ApiRequest::ExportBlobcacheMetrics(id) => Self::export_blobcache_metrics(id),

        // Nydus API v1
        ApiRequest::ExportFsGlobalMetrics(id) => Self::export_global_metrics(id),
        ApiRequest::ExportFsFilesMetrics(id, latest_read_files) => {
            Self::export_files_metrics(id, latest_read_files)
        }
        ApiRequest::ExportFsAccessPatterns(id) => Self::export_access_patterns(id),
        ApiRequest::ExportFsBackendInfo(mountpoint) => self.backend_info(&mountpoint),
        ApiRequest::ExportFsInflightMetrics => self.export_inflight_metrics(),

        // Nydus API v2
        ApiRequest::GetDaemonInfoV2 => self.daemon_info(false),
        ApiRequest::GetBlobObject(_param) => todo!(),
        ApiRequest::CreateBlobObject(entry) => self.create_blob_cache_entry(&entry),
        ApiRequest::DeleteBlobObject(param) => self.remove_blob_cache_entry(&param),
    };

    self.respond(resp);

    Ok(())
}

3.1 do_mount
fn do_mount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
    // Parse the fs type from the request
    let fs_type = FsBackendType::from_str(&cmd.fs_type)
        .map_err(|e| ApiError::MountFilesystem(DaemonError::from(e).into()))?;
    let fs = self.get_default_fs_service()?;
    fs.mount(FsBackendMountCmd {
        fs_type,    // file system type
        mountpoint,    // mount point
        config: cmd.config,    // configuration
        source: cmd.source,    // source (bootstrap)
        prefetch_files: cmd.prefetch_files,    // list of files to prefetch
    })
    .map(|_| ApiResponsePayload::Empty)
    .map_err(|e| ApiError::MountFilesystem(e.into()))
}

It first parses the fs type from the arguments, then fetches the fs service stored in the DAEMON_CONTROLLER object (fs) and calls its mount method:

fn mount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
    if self.backend_from_mountpoint(&cmd.mountpoint)?.is_some() {
        return Err(DaemonError::AlreadyExists);
    }
    let backend = fs_backend_factory(&cmd)?;
    let index = self.get_vfs().mount(backend, &cmd.mountpoint)?;
    info!("{} filesystem mounted at {}", &cmd.fs_type, &cmd.mountpoint);
    self.backend_collection().add(&cmd.mountpoint, &cmd)?;

    // Add mounts opaque to UpgradeManager
    if let Some(mut mgr_guard) = self.upgrade_mgr() {
        upgrade::add_mounts_state(&mut mgr_guard, cmd, index)?;
    }

    Ok(())
}

From here on, the flow is the same as mounting the backend file system directly through daemon.service.mount(cmd), which the previous article analyzed.

3.2 do_remount
fn do_remount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
    let fs_type = FsBackendType::from_str(&cmd.fs_type)
        .map_err(|e| ApiError::MountFilesystem(DaemonError::from(e).into()))?;
    self.get_default_fs_service()?
        .remount(FsBackendMountCmd {
            fs_type,
            mountpoint,
            config: cmd.config,
            source: cmd.source,
            prefetch_files: cmd.prefetch_files,
        })
        .map(|_| ApiResponsePayload::Empty)
        .map_err(|e| ApiError::MountFilesystem(e.into()))
}

do_remount is similar to do_mount. Here is the remount method it calls:

fn remount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
    let rootfs = self
        .backend_from_mountpoint(&cmd.mountpoint)?
        .ok_or(DaemonError::NotFound)?;
    let rafs_config = RafsConfig::from_str(&cmd.config)?;
    let mut bootstrap = <dyn RafsIoRead>::from_file(&&cmd.source)?;
    let any_fs = rootfs.deref().as_any();
    let rafs = any_fs
        .downcast_ref::<Rafs>()
        .ok_or_else(|| DaemonError::FsTypeMismatch("to rafs".to_string()))?;

    rafs.update(&mut bootstrap, rafs_config)
        .map_err(|e| match e {
            RafsError::Unsupported => DaemonError::Unsupported,
            e => DaemonError::Rafs(e),
        })?;

    // To update mounted time and backend configurations.
    self.backend_collection().add(&cmd.mountpoint, &cmd)?;

    // Update mounts opaque from UpgradeManager
    if let Some(mut mgr_guard) = self.upgrade_mgr() {
        upgrade::update_mounts_state(&mut mgr_guard, cmd)?;
    }

    Ok(())
}

rafs.update uses the given bootstrap reader and config to update the superblock first and then the device:

/// Update storage backend for blobs.
pub fn update(&self, r: &mut RafsIoReader, conf: RafsConfig) -> RafsResult<()> {
    info!("update");
    if !self.initialized {
        warn!("Rafs is not yet initialized");
        return Err(RafsError::Uninitialized);
    }

    // TODO: seems no need to do self.sb.update()
    // step 1: update sb.
    // No lock is needed thanks to ArcSwap.
    self.sb.update(r).map_err(|e| {
        error!("update failed due to {:?}", e);
        e
    })?;
    info!("update sb is successful");

    let storage_conf = Self::prepare_storage_conf(&conf)?;
    let blob_infos = self.sb.superblock.get_blob_infos();

    // step 2: update device (only localfs is supported)
    self.device
        .update(&storage_conf, &blob_infos, self.fs_prefetch)
        .map_err(RafsError::SwapBackend)?;
    info!("update device is successful");

    Ok(())
}

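It then calls the device's update method. That method builds new blob objects from the given configuration, which is how the backend gets swapped: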

pub fn update(
    &self,
    config: &Arc<FactoryConfig>,
    blob_infos: &[Arc<BlobInfo>],
    fs_prefetch: bool,
) -> io::Result<()> {
    if self.blobs.load().len() != blob_infos.len() {
        return Err(einval!(
            "number of blobs doesn't match when update 'BlobDevice' object"
        ));
    }

    let mut blobs = Vec::with_capacity(blob_infos.len());
    for blob_info in blob_infos.iter() {
        let blob = BLOB_FACTORY.new_blob_cache(config, blob_info, blob_infos.len())?;
        blobs.push(blob);
    }
    // Stop any running prefetch work
    if fs_prefetch {
        // Stop prefetch if it is running before swapping backend since prefetch threads cloned
        // Arc<BlobCache>, the swap operation can't drop inner object completely.
        // Otherwise prefetch threads will be leaked.
        self.stop_prefetch();
    }
    // Swap in the new blobs
    self.blobs.store(Arc::new(blobs));
    if fs_prefetch {
        // Start prefetching again with the new configuration
        self.start_prefetch();
    }

    Ok(())
}
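The comment about prefetch threads is easier to follow with a small model. Readers hold Arc snapshots of the blob list, so storing a new list does not drop the old blobs until every snapshot is gone. This sketch substitutes RwLock<Arc<Vec<_>>> for ArcSwap, and Device and Blob are hypothetical types:

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical blob object; it only remembers which backend built it.
pub struct Blob {
    pub backend: String,
}

/// Hypothetical device whose blob list sits behind a swappable Arc
/// (RwLock<Arc<..>> stands in for ArcSwap here).
pub struct Device {
    blobs: RwLock<Arc<Vec<Arc<Blob>>>>,
}

impl Device {
    pub fn new(backends: &[&str]) -> Self {
        let blobs: Vec<Arc<Blob>> = backends
            .iter()
            .map(|b| Arc::new(Blob { backend: b.to_string() }))
            .collect();
        Device { blobs: RwLock::new(Arc::new(blobs)) }
    }

    /// Readers take a cheap snapshot: an Arc clone of the current list.
    pub fn load(&self) -> Arc<Vec<Arc<Blob>>> {
        self.blobs.read().unwrap().clone()
    }

    /// Rebuild all blobs for a new backend and publish them in one store.
    pub fn update(&self, backend: &str, blob_count: usize) -> Result<(), String> {
        if self.load().len() != blob_count {
            return Err("number of blobs doesn't match".to_string());
        }
        let new: Vec<Arc<Blob>> = (0..blob_count)
            .map(|_| Arc::new(Blob { backend: backend.to_string() }))
            .collect();
        // Existing snapshots keep the old Vec alive until they are dropped.
        *self.blobs.write().unwrap() = Arc::new(new);
        Ok(())
    }
}
```

A long-lived snapshot, such as one held by a prefetch thread, keeps the old blobs alive. Stopping prefetch before the swap avoids that.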

3.3 do_umount
fn do_umount(&self, mountpoint: String) -> ApiResponse {
    self.get_default_fs_service()?
        .umount(FsBackendUmountCmd { mountpoint })
        .map(|_| ApiResponsePayload::Empty)
        .map_err(|e| ApiError::MountFilesystem(e.into()))
}

It fetches fs and calls its umount method:

fn umount(&self, cmd: FsBackendUmountCmd) -> DaemonResult<()> {
    let _ = self
        .backend_from_mountpoint(&cmd.mountpoint)?
        .ok_or(DaemonError::NotFound)?;

    self.get_vfs().umount(&cmd.mountpoint)?;
    self.backend_collection().del(&cmd.mountpoint);
    if let Some(mut mgr_guard) = self.upgrade_mgr() {
        // Remove mount opaque from UpgradeManager
        upgrade::remove_mounts_state(&mut mgr_guard, cmd)?;
    }

    debug!("try to gc unused blobs");
    BLOB_FACTORY.gc(None);

    Ok(())
}

self.get_vfs().umount calls the umount method of the Vfs in fuse-backend-rs to unmount the file system mounted at path in FUSE:

/// Umount a backend file system at path
pub fn umount(&self, path: &str) -> VfsResult<()> {
    // Serialize mount operations. Do not expect poisoned lock here.
    let _guard = self.lock.lock().unwrap();
    // Walk the path to find the corresponding inode number
    let inode = self
        .root
        .path_walk(path)
        .map_err(VfsError::PathWalk)?
        .ok_or_else(|| VfsError::NotFound(path.to_string()))?;
    // Make a private copy of the stored mountpoints
    let mut mountpoints = self.mountpoints.load().deref().deref().clone();
    // Get the fs index
    let fs_idx = mountpoints
        .get(&inode)
        .map(Arc::clone)
        .map(|x| {
            // Do not remove pseudofs inode. We keep all pseudofs inode so that
            // 1. they can be reused later on
            // 2. during live upgrade, it is easier reconstruct pseudofs inodes since
            //    we do not have to track pseudofs deletions
            //self.root.evict_inode(inode);
            // Remove the mount point entry (pseudofs inodes are kept)
            mountpoints.remove(&inode);
            self.mountpoints.store(Arc::new(mountpoints));
            x.fs_idx
        })
        .ok_or_else(|| {
            error!("{} is not a mount point.", path);
            VfsError::NotFound(path.to_string())
        })?;

    trace!("fs_idx {}", fs_idx);
    let mut superblocks = self.superblocks.load().deref().deref().clone();
    // Take the file system out of its superblocks slot, leaving None behind
    if let Some(fs) = superblocks[fs_idx as usize].take() {
        fs.destroy();
    }
    self.superblocks.store(Arc::new(superblocks));

    Ok(())
}
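umount follows a copy-on-write pattern. It clones the mountpoints map, removes the entry, and publishes the new map. It then take()s the superblock slot, so that slot becomes None while every other index stays valid. Here is a hypothetical sketch that uses RwLock<Arc<...>> instead of ArcSwap, with u64 inodes, u8 fs indexes and String placeholders for file systems. The real code serializes mount operations with self.lock, and this sketch assumes the caller does the same.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical trimmed-down VFS: inode -> fs index, plus per-index slots.
/// Assumes mount/umount calls are serialized by the caller.
pub struct Vfs {
    mountpoints: RwLock<Arc<HashMap<u64, u8>>>,
    superblocks: RwLock<Arc<Vec<Option<String>>>>,
}

impl Vfs {
    pub fn new() -> Self {
        Vfs {
            mountpoints: RwLock::new(Arc::new(HashMap::new())),
            superblocks: RwLock::new(Arc::new(Vec::new())),
        }
    }

    /// Append a file system slot and map `inode` to its index.
    pub fn mount(&self, inode: u64, fs_name: &str) -> u8 {
        let mut sbs = (**self.superblocks.read().unwrap()).clone();
        sbs.push(Some(fs_name.to_string()));
        let idx = (sbs.len() - 1) as u8;
        *self.superblocks.write().unwrap() = Arc::new(sbs);

        let mut mps = (**self.mountpoints.read().unwrap()).clone();
        mps.insert(inode, idx);
        *self.mountpoints.write().unwrap() = Arc::new(mps);
        idx
    }

    /// Copy-on-write umount: clone, modify, then publish the new versions.
    pub fn umount(&self, inode: u64) -> Result<Option<String>, String> {
        let mut mps = (**self.mountpoints.read().unwrap()).clone();
        let fs_idx = mps.remove(&inode).ok_or("not a mount point")?;
        *self.mountpoints.write().unwrap() = Arc::new(mps);

        let mut sbs = (**self.superblocks.read().unwrap()).clone();
        // take() leaves None in the slot, so other indexes stay valid.
        let fs = sbs[fs_idx as usize].take();
        *self.superblocks.write().unwrap() = Arc::new(sbs);
        Ok(fs)
    }
}
```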

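There are also other functions triggered through the API, such as do_start, daemon_info and backend_info.

Once the API Server Handler thread has started, the waker and both thread handles are stored in the ApiServerController instance: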

self.waker = Some(waker);
self.http_handler_thread = Some(handler_thread);
self.http_router_thread = Some(router_thread);

Running top -Hp NYDUSD_PID shows the two threads that were started.

Back in main(), with api_controller started, the same signal handler, sig_exit, is registered for SIGINT and SIGTERM:

// Initialize and run the daemon controller event loop.
nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit);
nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit);

sig_exit calls DAEMON_CONTROLLER.shutdown():

extern "C" fn sig_exit(_sig: std::os::raw::c_int) {
    DAEMON_CONTROLLER.shutdown();
}

shutdown() first marks the exiting state and wakes run_loop(). It then calls the daemon's stop() method, which sends a DaemonStateMachineInput::Stop event to the state_machine thread:

fn shutdown(&self) {
    // Marking exiting state.
    self.active.store(false, Ordering::Release);
    // Signal the `run_loop()` working thread to exit.
    let _ = self.waker.wake();

    let daemon = self.daemon.lock().unwrap().take();
    if let Some(d) = daemon {
        /*
        // TODO: fix the behavior
        if cfg!(feature = "virtiofs") {
            // In case of virtiofs, mechanism to unblock recvmsg() from VMM is lacked.
            // Given the fact that we have nothing to clean up, directly exit seems fine.
            process::exit(0);
        }
         */
        if let Err(e) = d.stop() {
            error!("failed to stop daemon: {}", e);
        }
        if let Err(e) = d.wait() {
            error!("failed to wait daemon: {}", e)
        }
    }
}

Once the signal handlers are registered, main() runs DAEMON_CONTROLLER.run_loop():

// Run the main event loop
if DAEMON_CONTROLLER.is_active() {
    DAEMON_CONTROLLER.run_loop();
}

The run_loop() method:

fn run_loop(&self) {
    let mut events = Events::with_capacity(8);

    loop {
        match self.poller.lock().unwrap().poll(&mut events, None) {
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => error!("failed to receive notification from waker: {}", e),
            Ok(_) => {}
        }

        for event in events.iter() {
            if event.is_error() {
                error!("Got error on the monitored event.");
                continue;
            }

            if event.is_readable() && event.token() == Token(1) {
                if self.active.load(Ordering::Acquire) {
                    return;
                } else if self.singleton_mode.load(Ordering::Acquire) {
                    self.active.store(false, Ordering::Relaxed);
                    return;
                }
            }
        }
    }
}

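It first fetches events with self.poller.poll() and iterates over them. When an event is readable and carries the expected token, Token(1), it loads the value of active with AtomicBool::load: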

/// Loads a value from the bool.
///
/// `load` takes an [`Ordering`] argument which describes the memory ordering
/// of this operation. Possible values are [`SeqCst`], [`Acquire`] and [`Relaxed`].
///
/// # Panics
///
/// Panics if `order` is [`Release`] or [`AcqRel`].
///
/// # Examples
///
/// ```
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// let some_bool = AtomicBool::new(true);
///
/// assert_eq!(some_bool.load(Ordering::Relaxed), true);
/// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn load(&self, order: Ordering) -> bool {
    // SAFETY: any data races are prevented by atomic intrinsics and the raw
    // pointer passed in is valid because we got it from a reference.
    unsafe { atomic_load(self.v.get(), order) != 0 }
}

If active is true, run_loop() returns immediately. Otherwise it loads singleton_mode, and if that is true, it sets active to false and returns.
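The shutdown path depends on memory ordering. active is stored with Release before the waker fires, and run_loop() loads it with Acquire after waking, so the woken thread sees the new value. The sketch below shows only that store-then-wake handshake, with an mpsc channel standing in for the eventfd waker. spawn_loop and shutdown are hypothetical, and the exit condition is simplified rather than copied from run_loop's two-flag branch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical run_loop-style thread: block until woken, then check the flag.
/// Returns the number of wakeups it handled before exiting.
pub fn spawn_loop(active: Arc<AtomicBool>) -> (Sender<()>, JoinHandle<u32>) {
    let (waker, woken) = channel::<()>();
    let handle = thread::spawn(move || {
        let mut wakeups: u32 = 0;
        // A blocking recv() stands in for poll() on the eventfd-backed Waker.
        while woken.recv().is_ok() {
            wakeups += 1;
            // Acquire pairs with the Release store in shutdown().
            if !active.load(Ordering::Acquire) {
                break;
            }
        }
        wakeups
    });
    (waker, handle)
}

/// Mirrors the first two steps of shutdown(): mark exiting, then wake the loop.
pub fn shutdown(active: &AtomicBool, waker: &Sender<()>) {
    active.store(false, Ordering::Release);
    let _ = waker.send(());
}
```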

When main() finishes, it stops the HTTP server via api_controller.stop() and stops the daemon via DAEMON_CONTROLLER.shutdown():

api_controller.stop();
DAEMON_CONTROLLER.shutdown();

The api_controller.stop() method:

/// Stop the HTTP working thread.
pub fn stop(&mut self) {
    // Signal the HTTP router thread to exit, which will then notify the HTTP handler thread.
    if let Some(waker) = self.waker.take() {
        let _ = waker.wake();
    }
    if let Some(t) = self.http_handler_thread.take() {
        if let Err(e) = t.join() {
            error!(
                "Failed to join the HTTP handler thread, execution error. {:?}",
                e
            );
        }
    }
    if let Some(t) = self.http_router_thread.take() {
        if let Err(e) = t.join() {
            error!(
                "Failed to join the HTTP router thread, execution error. {:?}",
                e
            );
        }
    }
    if let Some(apisock) = self.sock.as_ref() {
        std::fs::remove_file(apisock).unwrap_or_default();
    }
}

It wakes the router thread, joins the http_handler_thread and http_router_thread threads, and deletes the apisock file.

4. Backend Processing Flow

Nydusd supports several backends (currently localfs, OSS and Registry) and serves requests from the VFS through FUSE. The BlobReader trait abstracts the operations on a backend. Its main parts are:

pub trait BlobReader: Send + Sync {
    /// Get size of the blob file.
    fn blob_size(&self) -> BackendResult<u64>;

    /// Try to read a range of data from the blob file into the provided buffer.
    ///
    /// Try to read data of range [offset, offset + buf.len()) from the blob file, and returns:
    /// - bytes of data read, which may be smaller than buf.len()
    /// - error code if error happens
    fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize>;

    /// Read a range of data from the blob file into the provided buffer.
    ///
    /// Read data of range [offset, offset + buf.len()) from the blob file, and returns:
    /// - bytes of data read, which may be smaller than buf.len()
    /// - error code if error happens
    ///
    /// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
    /// read data.
    fn read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
        let mut retry_count = self.retry_limit();
        let begin_time = self.metrics().begin();

        loop {
            match self.try_read(buf, offset) {
                Ok(size) => {
                    self.metrics().end(&begin_time, buf.len(), false);
                    return Ok(size);
                }
                Err(err) => {
                    if retry_count > 0 {
                        warn!(
                            "Read from backend failed: {:?}, retry count {}",
                            err, retry_count
                        );
                        retry_count -= 1;
                    } else {
                        self.metrics().end(&begin_time, buf.len(), true);
                        ERROR_HOLDER
                            .lock()
                            .unwrap()
                            .push(&format!("{:?}", err))
                            .unwrap_or_else(|_| error!("Failed when try to hold error"));
                        return Err(err);
                    }
                }
            }
        }
    }

    /// Read a range of data from the blob file into the provided buffers.
    ///
    /// Read data of range [offset, offset + max_size) from the blob file, and returns:
    /// - bytes of data read, which may be smaller than max_size
    /// - error code if error happens
    ///
    /// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
    /// read data.
    fn readv(
        &self,
        bufs: &[FileVolatileSlice],
        offset: u64,
        max_size: usize,
    ) -> BackendResult<usize> {
        if bufs.len() == 1 && max_size >= bufs[0].len() {
            let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), bufs[0].len()) };
            self.read(buf, offset)
        } else {
            // Use std::alloc to avoid zeroing the allocated buffer.
            let size = bufs.iter().fold(0usize, move |size, s| size + s.len());
            let size = std::cmp::min(size, max_size);
            let mut data = alloc_buf(size);

            let result = self.read(&mut data, offset)?;
            copyv(&[&data], bufs, 0, result, 0, 0)
                .map(|r| r.0)
                .map_err(BackendError::CopyData)
        }
    }

    /// Get metrics object.
    fn metrics(&self) -> &BackendMetrics;

    /// Get maximum number of times to retry when encountering IO errors.
    fn retry_limit(&self) -> u8 {
        0
    }
}

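Each backend implements try_read differently. The read method takes buf and offset and calls try_read; when a read fails it retries, up to retry_limit() times (0 by default), and once the retries are exhausted it records the error in ERROR_HOLDER and returns it.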
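To make the retry semantics concrete, here is a minimal, self-contained sketch of the same default-method pattern. It is not nydus code: `RetryingReader`, `ReadError` and `FlakyBackend` are hypothetical names, and metrics reporting and `ERROR_HOLDER` are left out. Note that `retry_limit()` counts retries after the first attempt, so a limit of 2 allows at most three calls to `try_read`.

```rust
use std::cell::Cell;

/// Hypothetical error type standing in for nydus' BackendError.
#[derive(Debug, PartialEq)]
pub enum ReadError {
    Transient(String),
}

/// Simplified stand-in for BlobReader: only the retry logic is kept.
pub trait RetryingReader {
    /// One read attempt, which may fail transiently.
    fn try_read(&self, buf: &mut [u8], offset: u64) -> Result<usize, ReadError>;

    /// Number of retries allowed after the first attempt (0 = no retry).
    fn retry_limit(&self) -> u8 {
        0
    }

    /// Default method: retry `try_read` up to `retry_limit()` extra times and
    /// return the first success, or the last error once retries run out.
    fn read(&self, buf: &mut [u8], offset: u64) -> Result<usize, ReadError> {
        let mut retry_count = self.retry_limit();
        loop {
            match self.try_read(buf, offset) {
                Ok(size) => return Ok(size),
                Err(err) => {
                    if retry_count > 0 {
                        retry_count -= 1;
                    } else {
                        return Err(err);
                    }
                }
            }
        }
    }
}

/// Hypothetical backend that fails its first `failures` attempts, then
/// serves reads from an in-memory blob.
pub struct FlakyBackend {
    pub data: Vec<u8>,
    pub failures: Cell<u32>,
    pub retries: u8,
    pub attempts: Cell<u32>,
}

impl RetryingReader for FlakyBackend {
    fn try_read(&self, buf: &mut [u8], offset: u64) -> Result<usize, ReadError> {
        self.attempts.set(self.attempts.get() + 1);
        if self.failures.get() > 0 {
            self.failures.set(self.failures.get() - 1);
            return Err(ReadError::Transient("simulated failure".into()));
        }
        // Short read near the end of the blob, like a real backend may do.
        let start = (offset as usize).min(self.data.len());
        let n = buf.len().min(self.data.len() - start);
        buf[..n].copy_from_slice(&self.data[start..start + n]);
        Ok(n)
    }

    fn retry_limit(&self) -> u8 {
        self.retries
    }
}
```

Because `read` is a default method, each backend only has to provide `try_read` (and optionally `retry_limit`), which is the same division of labour BlobReader relies on.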

4.1 Registry Backend

When Registry is the backend, try_read() is implemented as:

fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
    self._try_read(buf, offset, true)
        .map_err(BackendError::Registry)
}

It delegates to the _try_read() method implemented on the RegistryReader type:

/// Read data from registry server
///
/// Step:
///
/// Request:  GET /blobs/sha256:<blob_id>
/// Response: status: 307 Temporary Redirect
///           header: location: https://raw-blob-storage-host.com/signature=x
///
/// Request:  GET https://raw-blob-storage-host.com/signature=x
/// Response: status: 200 Ok / 403 Forbidden
/// If responding 403, we need to repeat step one
fn _try_read(
    &self,
    mut buf: &mut [u8],
    offset: u64,
    allow_retry: bool,
) -> RegistryResult<usize> {
    let url = format!("/blobs/sha256:{}", self.blob_id);
    // Build the full URL
    let url = self
        .state
        .url(url.as_str(), &[])
        .map_err(RegistryError::Url)?;
    // Create an empty header map
    let mut headers = HeaderMap::new();
    let end_at = offset + buf.len() as u64 - 1;
    let range = format!("bytes={}-{}", offset, end_at);
    headers.insert("Range", range.parse().unwrap());

    let mut resp;
    let cached_redirect = self.state.cached_redirect.get(&self.blob_id);

    // If a redirect URL is cached, send the request with Connection::call directly
    if let Some(cached_redirect) = cached_redirect {
        resp = self
            .connection
            .call::<&[u8]>(
                Method::GET,
                cached_redirect.as_str(),
                None,
                None,
                &mut headers,
                false,
            )
            .map_err(RegistryError::Request)?;

        // If allow_retry is set and the server returns UNAUTHORIZED or FORBIDDEN, retry once
        // The request has expired or has been denied, need to re-request
        if allow_retry
            && vec![StatusCode::UNAUTHORIZED, StatusCode::FORBIDDEN].contains(&resp.status())
        {
            warn!(
                "The redirected link has expired: {}, will retry read",
                cached_redirect.as_str()
            );
            self.state.cached_redirect.remove(&self.blob_id);
            // Try read again only once
            return self._try_read(buf, offset, false);
        }
    } else {
        // Otherwise, send the request through the request method
        resp =
            self.request::<&[u8]>(Method::GET, url.as_str(), None, headers.clone(), false)?;
        let status = resp.status();

        // Handle redirect request and cache redirect url
        if REDIRECTED_STATUS_CODE.contains(&status) {
            if let Some(location) = resp.headers().get("location") {
                let location = location.to_str().unwrap();
                let mut location = Url::parse(location).map_err(RegistryError::Url)?;
                // Note: Some P2P proxy server supports only scheme specified origin blob server,
                // so we need change scheme to `blob_url_scheme` here
                if !self.state.blob_url_scheme.is_empty() {
                    location
                        .set_scheme(&self.state.blob_url_scheme)
                        .map_err(|_| {
                            RegistryError::Scheme(self.state.blob_url_scheme.clone())
                        })?;
                }
                if !self.state.blob_redirected_host.is_empty() {
                    location
                        .set_host(Some(self.state.blob_redirected_host.as_str()))
                        .map_err(|e| {
                            error!(
                                "Failed to set blob redirected host to {}: {:?}",
                                self.state.blob_redirected_host.as_str(),
                                e
                            );
                            RegistryError::Url(e)
                        })?;
                    debug!("New redirected location {:?}", location.host_str());
                }
                let resp_ret = self
                    .connection
                    .call::<&[u8]>(
                        Method::GET,
                        location.as_str(),
                        None,
                        None,
                        &mut headers,
                        true,
                    )
                    .map_err(RegistryError::Request);
                match resp_ret {
                    Ok(_resp) => {
                        resp = _resp;
                        self.state
                            .cached_redirect
                            .set(self.blob_id.clone(), location.as_str().to_string())
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            };
        } else {
            resp = respond(resp, true).map_err(RegistryError::Request)?;
        }
    }

    resp.copy_to(&mut buf)
        .map_err(RegistryError::Transport)
        .map(|size| size as usize)
}
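The control flow of _try_read can be reduced to a small model. The sketch below is hypothetical: `RedirectCache`, `range_header` and the `fetch` closure (standing in for an HTTP GET that returns the status code and the Location header) are illustrative names, and the set of redirect codes is an assumption rather than nydus's `REDIRECTED_STATUS_CODE`. It keeps the three ideas of the excerpt: the inclusive Range header, caching the redirected URL, and evicting it and retrying exactly once when a cached link answers 401/403. Unlike the excerpt, `range_header` returns `None` for an empty buffer, because `offset + len - 1` is not a valid inclusive end for zero bytes (and underflows when `offset` is 0).

```rust
use std::collections::HashMap;

/// Assumed set of redirect status codes (not nydus's REDIRECTED_STATUS_CODE).
const REDIRECT_CODES: [u16; 5] = [301, 302, 303, 307, 308];

/// Inclusive HTTP Range value for `len` bytes at `offset`; None for an empty read.
pub fn range_header(offset: u64, len: usize) -> Option<String> {
    if len == 0 {
        return None;
    }
    Some(format!("bytes={}-{}", offset, offset + len as u64 - 1))
}

/// Hypothetical model of the redirect cache kept in the registry state.
pub struct RedirectCache {
    /// blob id -> redirected (signed) URL
    cached: HashMap<String, String>,
}

impl RedirectCache {
    pub fn new() -> Self {
        RedirectCache { cached: HashMap::new() }
    }

    pub fn cached_url(&self, blob_id: &str) -> Option<&str> {
        self.cached.get(blob_id).map(String::as_str)
    }

    /// `fetch(url)` stands in for an HTTP GET returning (status, Location header).
    pub fn read<F>(&mut self, blob_id: &str, fetch: &mut F, allow_retry: bool) -> u16
    where
        F: FnMut(&str) -> (u16, Option<String>),
    {
        if let Some(url) = self.cached.get(blob_id).cloned() {
            let (status, _) = fetch(url.as_str());
            // Expired or denied signed link: evict it and retry once without the cache.
            if allow_retry && (status == 401 || status == 403) {
                self.cached.remove(blob_id);
                return self.read(blob_id, fetch, false);
            }
            return status;
        }
        let registry_url = format!("/blobs/sha256:{}", blob_id);
        let (status, location) = fetch(registry_url.as_str());
        if REDIRECT_CODES.contains(&status) {
            if let Some(location) = location {
                let (status, _) = fetch(location.as_str());
                // Assumption: cache only when the follow-up request succeeds,
                // mirroring the Ok arm of resp_ret in the excerpt.
                if (200..300).contains(&status) {
                    self.cached.insert(blob_id.to_string(), location);
                }
                return status;
            }
        }
        status
    }
}
```

In this model the second read first hits the cached URL, gets 403, drops the entry and goes back through the registry, so the registry sees two requests and the storage host three.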

The request method sends the request to the registry and handles authorization along the way:

/// Request registry server with `authorization` header
///
/// Bearer token authenticate workflow:
///
/// Request:  POST https://my-registry.com/test/repo/blobs/uploads
/// Response: status: 401 Unauthorized
///           header: www-authenticate: Bearer realm="https://auth.my-registry.com/token",service="my-registry.com",scope="repository:test/repo:pull,push"
///
/// Request:  POST https://auth.my-registry.com/token
///           body: "service=my-registry.com&scope=repository:test/repo:pull,push&grant_type=password&username=x&password=x&client_id=nydus-registry-client"
/// Response: status: 200 Ok
///           body: { "token": "<token>" }
///
/// Request:  POST https://my-registry.com/test/repo/blobs/uploads
///           header: authorization: Bearer <token>
/// Response: status: 200 Ok
///
/// Basic authenticate workflow:
///
/// Request:  POST https://my-registry.com/test/repo/blobs/uploads
/// Response: status: 401 Unauthorized
///           header: www-authenticate: Basic
///
/// Request:  POST https://my-registry.com/test/repo/blobs/uploads
///           header: authorization: Basic base64(<username:password>)
/// Response: status: 200 Ok
fn request<R: Read + Send + 'static>(
    &self,
    method: Method,
    url: &str,
    data: Option<ReqBody<R>>,
    mut headers: HeaderMap,
    catch_status: bool,
) -> RegistryResult<Response> {
    // Try get authorization header from cache for this request
    let mut last_cached_auth = String::new();
    // Add the cached auth to headers
    let cached_auth = self.state.cached_auth.get();
    if !cached_auth.is_empty() {
        last_cached_auth = cached_auth.clone();
        headers.insert(
            HEADER_AUTHORIZATION,
            HeaderValue::from_str(cached_auth.as_str()).unwrap(),
        );
    }

    // For upload request with payload, the auth header should be cached
    // after create_upload(), so we can request registry server directly
    if let Some(data) = data {
        return self
            .connection
            .call(method, url, None, Some(data), &mut headers, catch_status)
            .map_err(RegistryError::Request);
    }

    // Call the call method
    // Try to request registry server with `authorization` header
    let resp = self
        .connection
        .call::<&[u8]>(method.clone(), url, None, None, &mut headers, false)
        .map_err(RegistryError::Request)?;
    // On UNAUTHORIZED, fetch a token, retry, and cache the auth header in state.cached_auth if the retry succeeds
    if resp.status() == StatusCode::UNAUTHORIZED {
        if let Some(resp_auth_header) = resp.headers().get(HEADER_WWW_AUTHENTICATE) {
            // Get token from registry authorization server
            if let Some(auth) = RegistryState::parse_auth(resp_auth_header, &self.state.auth) {
                let auth_header = self
                    .state
                    .get_auth_header(auth, &self.connection)
                    .map_err(|e| RegistryError::Common(e.to_string()))?;
                headers.insert(
                    HEADER_AUTHORIZATION,
                    HeaderValue::from_str(auth_header.as_str()).unwrap(),
                );

                // Try to request registry server with `authorization` header again
                let resp = self
                    .connection
                    .call(method, url, None, data, &mut headers, catch_status)
                    .map_err(RegistryError::Request)?;

                let status = resp.status();
                if is_success_status(status) {
                    // Cache authorization header for next request
                    self.state.cached_auth.set(&last_cached_auth, auth_header)
                }
                return respond(resp, catch_status).map_err(RegistryError::Request);
            }
        }
    }

    respond(resp, catch_status).map_err(RegistryError::Request)
}
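The 401 branch relies on parsing the `WWW-Authenticate` challenge (nydus does this in `RegistryState::parse_auth`, whose body is not shown here). As an illustration only, the hypothetical `parse_www_authenticate` below handles the two forms from the doc comment. The detail worth noticing is that a `scope` such as `repository:test/repo:pull,push` contains commas, so the parameters cannot be split on `,` naively. Escaped quotes inside values are not handled.

```rust
use std::collections::HashMap;

/// Hypothetical parsed form of a WWW-Authenticate challenge.
#[derive(Debug, PartialEq)]
pub enum Challenge {
    Basic,
    Bearer { realm: String, service: String, scope: String },
}

/// Split `k1="v1",k2=v2` into a map, keeping commas inside quoted values.
fn parse_params(s: &str) -> HashMap<String, String> {
    let mut out = HashMap::new();
    let mut rest = s.trim();
    while let Some(eq) = rest.find('=') {
        let key = rest[..eq].trim().to_ascii_lowercase();
        rest = &rest[eq + 1..];
        let value = if let Some(quoted) = rest.strip_prefix('"') {
            // Quoted value: read up to the closing quote.
            let end = match quoted.find('"') {
                Some(end) => end,
                None => break,
            };
            rest = &quoted[end + 1..];
            quoted[..end].to_string()
        } else {
            // Bare token: read up to the next comma.
            let end = rest.find(',').unwrap_or(rest.len());
            let v = rest[..end].trim().to_string();
            rest = &rest[end..];
            v
        };
        out.insert(key, value);
        // Skip the separator before the next parameter.
        rest = rest.trim_start_matches(|c: char| c == ',' || c.is_whitespace());
    }
    out
}

pub fn parse_www_authenticate(header: &str) -> Option<Challenge> {
    let header = header.trim();
    let (scheme, params) = match header.find(' ') {
        Some(i) => (&header[..i], &header[i + 1..]),
        None => (header, ""),
    };
    if scheme.eq_ignore_ascii_case("basic") {
        return Some(Challenge::Basic);
    }
    if !scheme.eq_ignore_ascii_case("bearer") {
        return None;
    }
    let mut p = parse_params(params);
    Some(Challenge::Bearer {
        // A Bearer challenge without a realm gives us nowhere to get a token.
        realm: p.remove("realm")?,
        service: p.remove("service").unwrap_or_default(),
        scope: p.remove("scope").unwrap_or_default(),
    })
}
```

With a Bearer challenge the client then asks `realm` for a token using `service` and `scope`; with Basic it can send `base64(username:password)` directly, as the doc comment above describes.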

request first adds the cached auth (if any) to headers and then sends the request through the call method:

/// Send a request to server and wait for response.
pub fn call<R: Read + Send + 'static>(
    &self,
    method: Method,
    url: &str,
    query: Option<&[(&str, &str)]>,
    data: Option<ReqBody<R>>,
    headers: &mut HeaderMap,
    catch_status: bool,
) -> ConnectionResult<Response> {
    if self.shutdown.load(Ordering::Acquire) {
        return Err(ConnectionError::Disconnected);
    }

    // Send the request through the proxy
    if let Some(proxy) = &self.proxy {
        if proxy.health.ok() {
            let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
                Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
                Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
                _ => None,
            };

            let http_url: Option<String>;
            let mut replaced_url = url;

            if proxy.use_http {
                http_url = proxy.try_use_http(url);
                if let Some(ref r) = http_url {
                    replaced_url = r.as_str();
                }
            }

            let result = self.call_inner(
                &proxy.client,
                method.clone(),
                replaced_url,
                &query,
                data_cloned,
                headers,
                catch_status,
                true,
            );

            match result {
                Ok(resp) => {
                    if !proxy.fallback || resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
                        return Ok(resp);
                    }
                }
                Err(err) => {
                    if !proxy.fallback {
                        return Err(err);
                    }
                }
            }
            // Proxy failed: fall back to the origin server
            // If proxy server responds invalid status code or http connection failed, we need to
            // fallback to origin server, the policy only applicable to non-upload operation
            warn!("Request proxy server failed, fallback to original server");
        } else {
            LAST_FALLBACK_AT.with(|f| {
                let current = SystemTime::now();
                if current.duration_since(*f.borrow()).unwrap().as_secs()
                    >= RATE_LIMITED_LOG_TIME as u64
                {
                    warn!("Proxy server is not healthy, fallback to original server");
                    f.replace(current);
                }
            })
        }
    }
    // Fallback: send the request directly to the origin registry
    self.call_inner(
        &self.client,
        method,
        url,
        &query,
        data,
        headers,
        catch_status,
        false,
    )
}
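The proxy handling in call boils down to a small decision. The sketch below uses hypothetical types (`ProxyOutcome`, `Route`) and a closure in place of `call_inner`; it encodes the rules visible in the excerpt: an unhealthy proxy is skipped entirely, a 5xx response or a transport error falls back to the origin only when `proxy.fallback` is set, and anything else is returned as-is.

```rust
/// Result of sending a request through the proxy (hypothetical, simplified).
#[derive(Debug, PartialEq)]
pub enum ProxyOutcome {
    Status(u16),
    ConnectError,
}

/// Where the final answer comes from.
#[derive(Debug, PartialEq)]
pub enum Route {
    UseProxyResult(ProxyOutcome),
    Origin,
}

pub fn route<F: FnOnce() -> ProxyOutcome>(healthy: bool, fallback: bool, send_via_proxy: F) -> Route {
    if !healthy {
        // Unhealthy proxy: go straight to the origin (the excerpt also logs a rate-limited warning).
        return Route::Origin;
    }
    let outcome = send_via_proxy();
    let keep = match &outcome {
        // Below 500 the proxy answer is kept; 5xx is kept only when fallback is disabled.
        ProxyOutcome::Status(code) => !fallback || *code < 500,
        // Transport error: surfaced to the caller unless fallback is enabled.
        ProxyOutcome::ConnectError => !fallback,
    };
    if keep {
        Route::UseProxyResult(outcome)
    } else {
        Route::Origin
    }
}
```

Taking the proxy call as a closure makes it explicit that an unhealthy proxy is never contacted at all.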

The call_inner method:

#[allow(clippy::too_many_arguments)]
fn call_inner<R: Read + Send + 'static>(
    &self,
    client: &Client,
    method: Method,
    url: &str,
    query: &Option<&[(&str, &str)]>,
    data: Option<ReqBody<R>>,
    headers: &HeaderMap,
    catch_status: bool,
    proxy: bool,
) -> ConnectionResult<Response> {
    // Avoid printing HEADER_AUTHORIZATION in the logs
    // Only clone header when debugging to reduce potential overhead.
    let display_headers = if max_level() >= Level::Debug {
        let mut display_headers = headers.clone();
        display_headers.remove(HEADER_AUTHORIZATION);
        Some(display_headers)
    } else {
        None
    };
    let has_data = data.is_some();
    let start = Instant::now();
    // Build the request with the headers
    let mut rb = client.request(method.clone(), url).headers(headers.clone());
    if let Some(q) = query.as_ref() {
        rb = rb.query(q);
    }

    let ret;
    if let Some(data) = data {
        match data {
            ReqBody::Read(body, total) => {
                let body = Body::sized(body, total as u64);
                ret = rb.body(body).send();
            }
            ReqBody::Buf(buf) => {
                ret = rb.body(buf).send();
            }
            ReqBody::Form(form) => {
                ret = rb.form(&form).send();
            }
        }
    } else {
        // Without data, send the request with an empty body
        ret = rb.body("").send();
    }

    debug!(
        "{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
        std::thread::current().name().unwrap_or_default(),
        method,
        url,
        display_headers,
        proxy,
        has_data,
        Instant::now().duration_since(start).as_millis(),
    );

    match ret {
        Err(err) => Err(ConnectionError::Common(err)),
        Ok(resp) => respond(resp, catch_status),
    }
}
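The logging trick in call_inner (clone the headers only when debug logging is enabled, then drop `Authorization` from the copy) can be sketched without the http crate. Here headers are plain `(name, value)` pairs and `display_headers` is a hypothetical helper; names are compared case-insensitively, as HTTP header names are.

```rust
/// Hypothetical helper: return a log-safe copy of the headers, or None when
/// debug logging is off so that no clone is made at all.
pub fn display_headers(debug_enabled: bool, headers: &[(String, String)]) -> Option<Vec<(String, String)>> {
    if !debug_enabled {
        return None;
    }
    Some(
        headers
            .iter()
            // Never let credentials reach the log output.
            .filter(|h| !h.0.eq_ignore_ascii_case("authorization"))
            .cloned()
            .collect(),
    )
}
```

The headers actually sent are untouched; only the copy used for logging loses the credential.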

4.2 OSS Backend

When OSS is the backend, try_read is implemented as:

fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
    let query = &[];
    // Build the resource and URL
    let (resource, url) = self.state.url(&self.blob_id, query);
    let mut headers = HeaderMap::new();
    let end_at = offset + buf.len() as u64 - 1;
    let range = format!("bytes={}-{}", offset, end_at);

    // Add the Range header
    headers.insert(
        "Range",
        range
            .as_str()
            .parse()
            .map_err(|e| OssError::ConstructHeader(format!("{}", e)))?,
    );
    self.state
        .sign(Method::GET, &mut headers, resource.as_str())
        .map_err(OssError::Auth)?;

    // Safe because the call() is a synchronous operation.
    let mut resp = self
        .connection
        .call::<&[u8]>(
            Method::GET,
            url.as_str(),
            None,
            None,
            &mut headers,
            true,
            false,
        )
        .map_err(OssError::Request)?;
    Ok(resp
        .copy_to(&mut buf)
        .map_err(OssError::Transport)
        .map(|size| size as usize)?)
}

self.state.sign generates the request signature:

/// generate oss request signature
fn sign(
    &self,
    verb: Method,
    headers: &mut HeaderMap,
    canonicalized_resource: &str,
) -> Result<()> {
    let content_md5 = "";
    let content_type = "";
    let mut canonicalized_oss_headers = vec![];
    let date = httpdate::fmt_http_date(SystemTime::now());
    // Fields of the string to sign
    let mut data = vec![
        verb.as_str(),
        content_md5,
        content_type,
        date.as_str(),
        // canonicalized_oss_headers,
        canonicalized_resource,
    ];
    // Collect the x-oss-* headers
    for (name, value) in headers.iter() {
        let name = name.as_str();
        let value = value.to_str().map_err(|e| einval!(e))?;
        if name.starts_with("x-oss-") {
            let header = format!("{}:{}", name.to_lowercase(), value);
            canonicalized_oss_headers.push(header);
        }
    }
    let canonicalized_oss_headers = canonicalized_oss_headers.join("\n");
    if !canonicalized_oss_headers.is_empty() {
        data.insert(4, canonicalized_oss_headers.as_str());
    }
    let data = data.join("\n");
    let digest = HMAC::mac(data.as_bytes(), self.access_key_secret.as_bytes());
    let signature = base64::encode(&digest);

    let authorization = format!("OSS {}:{}", self.access_key_id, signature);

    headers.insert(HEADER_DATE, date.as_str().parse().map_err(|e| einval!(e))?);
    headers.insert(
        HEADER_AUTHORIZATION,
        authorization.as_str().parse().map_err(|e| einval!(e))?,
    );

    Ok(())
}

The request is then sent through self.connection.call, the same method used for registry requests.
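The string to sign that sign assembles can be reproduced with a small helper. This is a sketch under assumptions: `string_to_sign` is a hypothetical name, headers are `(name, value)` pairs, and the HMAC and base64 steps are left out (the excerpt signs the result with `access_key_secret` and base64-encodes the digest). One difference is deliberate: the sketch sorts the `x-oss-*` headers by lower-cased name, which is how the OSS signing rules are usually described, whereas the excerpt simply follows `HeaderMap` iteration order. Without `x-oss-*` headers the result is the verb, Content-MD5, Content-Type, Date and resource joined by newlines, as in sign.

```rust
/// Hypothetical helper: build the OSS "string to sign" (signing itself omitted).
pub fn string_to_sign(
    verb: &str,
    content_md5: &str,
    content_type: &str,
    date: &str,
    headers: &[(&str, &str)],
    canonicalized_resource: &str,
) -> String {
    // Keep only x-oss-* headers, with lower-cased names, sorted by name.
    let mut oss: Vec<(String, &str)> = headers
        .iter()
        .map(|&(name, value)| (name.to_ascii_lowercase(), value.trim()))
        .filter(|(name, _)| name.starts_with("x-oss-"))
        .collect();
    oss.sort_by(|a, b| a.0.cmp(&b.0));

    let mut parts = vec![
        verb.to_string(),
        content_md5.to_string(),
        content_type.to_string(),
        date.to_string(),
    ];
    // Canonicalized headers go between Date and the resource, one per line.
    parts.extend(oss.iter().map(|(name, value)| format!("{}:{}", name, value)));
    parts.push(canonicalized_resource.to_string());
    parts.join("\n")
}
```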

4.3 LocalFS Backend

When LocalFS is the backend, try_read simply calls uio::pread:

fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
    debug!(
        "local blob file reading: offset={}, size={} from={}",
        offset,
        buf.len(),
        self.id,
    );

    uio::pread(self.file.as_raw_fd(), buf, offset as i64)
        .map_err(|e| LocalFsError::ReadBlob(e).into())
}
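For comparison, the same positional read can be written with only the Rust standard library: `std::os::unix::fs::FileExt::read_at` reads at a given offset without moving the file cursor, much like pread(2), so several readers can share one `File`. `read_blob_at` is a hypothetical helper; like `try_read`, it may return fewer bytes than requested near the end of the file, and `FileExt::read_exact_at` exists for callers that need the whole range.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

/// Hypothetical helper: positional read from a local blob file (Unix only).
/// Returns the number of bytes read, which may be short at end of file.
pub fn read_blob_at(file: &File, buf: &mut [u8], offset: u64) -> io::Result<usize> {
    file.read_at(buf, offset)
}
```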

References

[1] nydus: https://github.com/dragonflyoss/image-service.git
[2] mio: https://docs.rs/mio/latest/mio/struct.Poll.html
[3] dbs-uhttp: https://github.com/openanolis/dbs-uhttp

This article is part of the Tencent Cloud self-media syndication program and was shared from a WeChat official account.
Originally published: 2022-11-21. In case of infringement, contact cloudcommunity@tencent.com for removal.

Shared from the WeChat official account abin在路上.
