Trying to understand the workflow through the nydus[1] source code. This is part two, covering the API Server and Backend flows.
Continuing from the previous post, nydusd source code walkthrough (part one): back in the process_fs_service function, once the daemon instance has been created, the daemon value held in DAEMON_CONTROLLER is replaced with the new daemon.
DAEMON_CONTROLLER.set_daemon(daemon);
/// Set the daemon service object.
pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
self.daemon.lock().unwrap().replace(daemon)
}
When was DAEMON_CONTROLLER initialized?
DAEMON_CONTROLLER is a global variable, a reference to a DaemonController struct.
“In Rust, global variables should be avoided whenever possible. Instead, create the object as early as possible (for example in main) and pass mutable references to it wherever they are needed.”
lazy_static! {
static ref DAEMON_CONTROLLER: DaemonController = DaemonController::new();
}
“lazy_static! is a macro for lazily initializing static variables: every static declared inside it is initialized on first use, and only once. Initialization can include heap allocations, such as a vector or hash map, and non-const function calls.”
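lazy_static! lives in an external crate; since Rust 1.70 the standard library's std::sync::OnceLock offers the same "initialize on first use, exactly once" behavior. A minimal std-only sketch of the idea (the Controller type here is illustrative, not nydusd's):

```rust
use std::sync::OnceLock;

struct Controller {
    name: String,
}

// Global slot: empty until first access, then initialized exactly once.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

fn controller() -> &'static Controller {
    // The closure runs only on the first call; later calls reuse the stored value.
    CONTROLLER.get_or_init(|| Controller {
        name: "daemon-controller".to_string(),
    })
}

fn main() {
    let a = controller() as *const Controller;
    let b = controller() as *const Controller;
    // Every call returns the same instance.
    assert_eq!(a, b);
    assert_eq!(controller().name, "daemon-controller");
}
```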
When the DAEMON_CONTROLLER instance is initialized via DaemonController::new(), the corresponding poller and related objects are created; these serve as globals for the lifetime of the process.
fn new() -> Self {
let poller = Poll::new().expect("Failed to create `ServiceController` instance");
let waker = Waker::new(poller.registry(), Token(1))
.expect("Failed to create waker for ServiceController");
Self {
active: AtomicBool::new(true),
singleton_mode: AtomicBool::new(true),
daemon: Mutex::new(None),
blob_cache_mgr: Mutex::new(None),
fs_service: Mutex::new(None),
waker: Arc::new(waker),
poller: Mutex::new(poller),
}
}
Poll is a struct from the mio[2] crate. Poll::new() creates a Selector instance; creating the Selector also creates an epoll instance, whose fd is stored in the Selector. The Selector instance itself is stored in Poll.registry.selector.
Waker::new(poller.registry(), Token(1)) registers a waker and creates a sys::Waker instance. The waker is registered with the fd obtained from the eventfd system call and the Selector passed in:
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
// Turn the file descriptor into a file first so we're ensured
// it's closed when dropped, e.g. when register below fails.
let file = unsafe { File::from_raw_fd(fd) };
selector
.register(fd, token, Interest::READABLE)
.map(|()| Waker { fd: file })
})
}
Here, fd is the file descriptor returned by the eventfd system call; it is used to wake up the poll loop.
In addition, DaemonController initializes the following fields:
(1) active: AtomicBool::new(true), an atomic bool indicating whether the DaemonController is ready
(2) singleton_mode: AtomicBool::new(true), an atomic bool indicating whether singleton mode is enabled
(3) daemon: Mutex::new(None), a mutex-guarded reference to the daemon (held by at most one user at a time)
(4) blob_cache_mgr: Mutex::new(None), a mutex-guarded reference to the BlobCacheMgr
(5) fs_service: Mutex::new(None), a mutex-guarded reference to the FsService
(6) waker: Arc::new(waker), a reference to the Waker
(7) poller: Mutex::new(poller), a mutex-guarded reference to the Poll instance
At this point, DAEMON_CONTROLLER initialization is complete.
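The daemon slot above is a Mutex<Option<Arc<T>>> whose replace returns the previous value, exactly the shape set_daemon relies on. A simplified std-only sketch of that pattern (the Daemon type here is a stand-in, not nydusd's Arc<dyn NydusDaemon>):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the daemon trait object.
struct Daemon {
    id: u32,
}

struct Controller {
    daemon: Mutex<Option<Arc<Daemon>>>,
}

impl Controller {
    fn new() -> Self {
        Controller {
            daemon: Mutex::new(None),
        }
    }

    // Mirrors set_daemon(): stores the new value, returns the previous one (if any).
    fn set_daemon(&self, d: Arc<Daemon>) -> Option<Arc<Daemon>> {
        self.daemon.lock().unwrap().replace(d)
    }

    fn get_daemon(&self) -> Option<Arc<Daemon>> {
        self.daemon.lock().unwrap().clone()
    }
}

fn main() {
    let c = Controller::new();
    // First set: the slot was empty, so nothing is returned.
    assert!(c.set_daemon(Arc::new(Daemon { id: 1 })).is_none());
    // Second set: replace hands back the old daemon.
    let old = c.set_daemon(Arc::new(Daemon { id: 2 }));
    assert_eq!(old.unwrap().id, 1);
    assert_eq!(c.get_daemon().unwrap().id, 2);
}
```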
Back in main(), the next step sets the default fs service:
let daemon = DAEMON_CONTROLLER.get_daemon();
if let Some(fs) = daemon.get_default_fs_service() {
DAEMON_CONTROLLER.set_fs_service(fs);
}
This is effectively just a clone of the corresponding FsService handle:
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
Some(self.service.clone())
}
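Since self.service is an Arc, the clone above only bumps a reference count; both handles point at the same FsService. A small demonstration of that Arc behavior:

```rust
use std::sync::Arc;

// Clone an Arc and report whether both handles share one allocation,
// plus the resulting strong reference count.
fn clone_and_inspect() -> (bool, usize) {
    let service = Arc::new(String::from("fs-service"));
    // Cloning an Arc copies the pointer and increments the refcount; no deep copy.
    let handle = Arc::clone(&service);
    (Arc::ptr_eq(&service, &handle), Arc::strong_count(&service))
}

fn main() {
    let (same, count) = clone_and_inspect();
    assert!(same); // both handles share the same underlying allocation
    assert_eq!(count, 2); // two strong references now exist
}
```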
Next, start the API Server thread:
// Start the HTTP Administration API server
let mut api_controller = ApiServerController::new(apisock);
api_controller.start()?;
Here, apisock is passed in again as a parameter to initialize an ApiServerController instance:
pub fn new(sock: Option<&str>) -> Self {
ApiServerController {
sock: sock.map(|v| v.to_string()),
http_handler_thread: None,
http_router_thread: None,
waker: None,
}
}
Next, the API Server is started via the api_controller.start() method:
/// Try to start the HTTP working thread.
pub fn start(&mut self) -> Result<()> {
if self.sock.is_none() {
return Ok(());
}
// Safe to unwrap() because self.sock is valid.
let apisock = self.sock.as_ref().unwrap();
let (to_handler, from_router) = channel();
let (to_router, from_handler) = channel();
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;
let (router_thread, waker) = start_http_thread(apisock, None, to_handler, from_handler)?;
let daemon_waker = DAEMON_CONTROLLER.waker.clone();
info!("HTTP API server running at {}", apisock);
let handler_thread = std::thread::Builder::new()
.name("api-server".to_string())
.spawn(move || {
api_handler.handle_requests_from_router();
info!("HTTP api-server handler thread exits");
let _ = daemon_waker.wake();
Ok(())
})
.map_err(|_e| einval!("Failed to start work thread for HTTP handler"))?;
self.waker = Some(waker);
self.http_handler_thread = Some(handler_thread);
self.http_router_thread = Some(router_thread);
Ok(())
}
start() first checks that the sock parameter is present; it then creates two channels for communication between threads and builds the ApiServer and ApiServerHandler instances:
// Safe to unwrap() because self.sock is valid.
let apisock = self.sock.as_ref().unwrap();
let (to_handler, from_router) = channel();
let (to_router, from_handler) = channel();
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;
These two channels give ApiServer and ApiServerHandler a full-duplex request/response path.
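The router/handler pairing can be sketched with two std mpsc channels and a worker thread; the request/response strings here are illustrative stand-ins for nydusd's ApiRequest/ApiResponse types:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Round-trip one request through a handler thread over a channel pair.
fn round_trip(request: &str) -> String {
    // One channel per direction, as in ApiServerController::start().
    let (to_handler, from_router) = channel::<String>();
    let (to_router, from_handler) = channel::<String>();

    // Handler thread: receive requests, send back responses.
    let handler = thread::spawn(move || {
        while let Ok(req) = from_router.recv() {
            if to_router.send(format!("handled: {}", req)).is_err() {
                break;
            }
        }
    });

    to_handler.send(request.to_string()).unwrap();
    let resp = from_handler.recv().unwrap();

    drop(to_handler); // dropping the sender lets the handler's recv loop end
    handler.join().unwrap();
    resp
}

fn main() {
    assert_eq!(round_trip("mount"), "handled: mount");
}
```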
Next, the HTTP server thread is started. The parameters passed to start_http_thread() include apisock and the two channel endpoints to_handler (for sending messages to ApiServerHandler) and from_handler (for receiving messages from ApiServerHandler):
/// Start a HTTP server to serve API requests.
///
/// Start a HTTP server parsing http requests and send to nydus API server a concrete
/// request to operate nydus or fetch working status.
/// The HTTP server sends request by `to_api` channel and wait for response from `from_api` channel.
pub fn start_http_thread(
path: &str,
api_notifier: Option<Arc<Waker>>,
to_api: Sender<Option<ApiRequest>>,
from_api: Receiver<ApiResponse>,
) -> Result<(thread::JoinHandle<Result<()>>, Arc<Waker>)> {}
First, any leftover apisock file is removed, then new Poll and Waker instances are created: the Poll instance creates an epoll, and the Waker registers itself and is returned.
Then, an HttpServer instance is created:
let mut server = HttpServer::new(socket_path).map_err(|e| {
if let ServerError::IOError(e) = e {
e
} else {
Error::new(ErrorKind::Other, format!("{:?}", e))
}
})?;
HttpServer (a struct from dbs-uhttp[3]) binds the socket on creation, sets it to nonblocking mode, and returns the HttpServer instance:
pub fn new<P: AsRef<Path>>(path_to_socket: P) -> Result<Self> {
let socket = UnixListener::bind(path_to_socket).map_err(ServerError::IOError)?;
Self::new_from_socket(socket)
}
Then it registers with poll and starts listening on the socket:
poll.registry().register(
&mut SourceFd(&server.epoll().as_raw_fd()),
REQUEST_TOKEN,
Interest::READABLE,
)?;
Next, a thread named nydus-http-server is started to handle requests from the unix socket.
Inside the thread, server.start_server() first adds the unix socket to the poll; start_server() is a method of the HttpServer struct:
/// Starts the HTTP Server.
pub fn start_server(&mut self) -> Result<()> {
// Add the socket on which we listen for new connections to the
// `epoll` structure.
Self::epoll_add(
&self.poll,
Token(self.socket.as_raw_fd() as usize),
self.socket.as_raw_fd(),
)
}
Set the capacity, i.e. the maximum number of events handled per poll:
let mut events = Events::with_capacity(100);
Events received from poll are then processed in a loop:
loop {
// If polling for events fails, the thread exits
match poll.poll(&mut events, None) {
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
error!("http server poll events failed, {}", e);
exit_api_server(api_notifier, &to_api);
return Err(e);
}
Ok(_) => {}
}
// Iterate over each event
for event in &events {
// Match on event.token()
match event.token() {
// Terminate the thread
EXIT_TOKEN => do_exit = true,
// For REQUEST_TOKEN, further match on server.requests()
REQUEST_TOKEN => match server.requests() {
// On success, process the requests
Ok(request_vec) => {
// Iterate over each server_request
for server_request in request_vec {
let reply = server_request.process(|request| {
// Handle the http request
handle_http_request(
request,
api_notifier.clone(),
&to_api,
&from_api,
)
});
// Ignore error when sending response
server.respond(reply).unwrap_or_else(|e| {
error!("HTTP server error on response: {}", e)
});
}
}
Err(e) => {
error!("HTTP server error on retrieving incoming request: {}", e);
}
},
_ => unreachable!("unknown poll token."),
}
}
if do_exit {
exit_api_server(api_notifier, &to_api);
break;
}
}
server_request.process() runs the handler function handle_http_request over the request and returns a ServerResponse instance:
/// Calls the function provided on the inner request to obtain the response.
/// The response is then wrapped in a `ServerResponse`.
///
/// Returns a `ServerResponse` ready for yielding to the server
pub fn process<F>(&self, mut callable: F) -> ServerResponse
where
F: FnMut(&Request) -> Response,
{
let http_response = callable(self.inner());
ServerResponse::new(http_response, self.id)
}
The function that handles http requests, handle_http_request:
fn handle_http_request(
request: &Request, // the http request
api_notifier: Option<Arc<Waker>>,
to_api: &Sender<Option<ApiRequest>>, // channel for sending requests to the api server
from_api: &Receiver<ApiResponse>, // channel for receiving responses from the api server
) -> Response {
let begin_time = SystemTime::now();
// Trace the request headers and log them
trace_api_begin(request);
// Micro http should ensure that req path is legal.
// Parse the url
let uri_parsed = request.uri().get_abs_path().parse::<Uri>();
// Build the response
let mut response = match uri_parsed {
Ok(uri) => match HTTP_ROUTES.routes.get(uri.path()) {
// Handle the request when a matching route exists
Some(route) => route
.handle_request(request, &|r| {
// The actual handler for the request
kick_api_server(api_notifier.clone(), to_api, from_api, r)
})
.unwrap_or_else(|err| error_response(err, StatusCode::BadRequest)),
None => error_response(HttpError::NoRoute, StatusCode::NotFound),
},
Err(e) => {
error!("Failed parse URI, {}", e);
error_response(HttpError::BadRequest, StatusCode::BadRequest)
}
};
response.set_server("Nydus API");
response.set_content_type(MediaType::ApplicationJson);
trace_api_end(&response, request.method(), begin_time);
response
}
Here, HTTP_ROUTES.routes.get(uri.path()) looks up the handler object for the requested URL; HTTP_ROUTES records the list of supported APIs:
lazy_static! {
/// HTTP_ROUTES contain all the nydusd HTTP routes.
pub static ref HTTP_ROUTES: HttpRoutes = {
let mut r = HttpRoutes {
routes: HashMap::new(),
};
// Common
r.routes.insert(endpoint_v1!("/daemon/events"), Box::new(EventsHandler{}));
r.routes.insert(endpoint_v1!("/daemon/exit"), Box::new(ExitHandler{}));
r.routes.insert(endpoint_v1!("/daemon/start"), Box::new(StartHandler{}));
r.routes.insert(endpoint_v1!("/daemon/fuse/sendfd"), Box::new(SendFuseFdHandler{}));
r.routes.insert(endpoint_v1!("/daemon/fuse/takeover"), Box::new(TakeoverFuseFdHandler{}));
r.routes.insert(endpoint_v1!("/mount"), Box::new(MountHandler{}));
r.routes.insert(endpoint_v1!("/metrics/backend"), Box::new(MetricsBackendHandler{}));
r.routes.insert(endpoint_v1!("/metrics/blobcache"), Box::new(MetricsBlobcacheHandler{}));
// Nydus API, v1
r.routes.insert(endpoint_v1!("/daemon"), Box::new(InfoHandler{}));
r.routes.insert(endpoint_v1!("/daemon/backend"), Box::new(FsBackendInfo{}));
r.routes.insert(endpoint_v1!("/metrics"), Box::new(MetricsFsGlobalHandler{}));
r.routes.insert(endpoint_v1!("/metrics/files"), Box::new(MetricsFsFilesHandler{}));
r.routes.insert(endpoint_v1!("/metrics/inflight"), Box::new(MetricsFsInflightHandler{}));
r.routes.insert(endpoint_v1!("/metrics/pattern"), Box::new(MetricsFsAccessPatternHandler{}));
// Nydus API, v2
r.routes.insert(endpoint_v2!("/daemon"), Box::new(InfoV2Handler{}));
r.routes.insert(endpoint_v2!("/blobs"), Box::new(BlobObjectListHandlerV2{}));
r
};
}
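The routing table maps paths to boxed trait objects and dispatches on lookup. A minimal std-only sketch of the same pattern (Handler and the two handlers here are simplified stand-ins for EndpointHandler):

```rust
use std::collections::HashMap;

// Simplified stand-in for EndpointHandler.
trait Handler {
    fn handle(&self, body: &str) -> String;
}

struct MountHandler;
impl Handler for MountHandler {
    fn handle(&self, body: &str) -> String {
        format!("mount: {}", body)
    }
}

struct InfoHandler;
impl Handler for InfoHandler {
    fn handle(&self, _body: &str) -> String {
        "daemon info".to_string()
    }
}

fn dispatch(routes: &HashMap<String, Box<dyn Handler>>, path: &str, body: &str) -> String {
    match routes.get(path) {
        Some(h) => h.handle(body),          // route found: delegate to its handler
        None => "404 no route".to_string(), // mirrors the HttpError::NoRoute branch
    }
}

fn main() {
    let mut routes: HashMap<String, Box<dyn Handler>> = HashMap::new();
    routes.insert("/api/v1/mount".to_string(), Box::new(MountHandler));
    routes.insert("/api/v1/daemon".to_string(), Box::new(InfoHandler));

    assert_eq!(dispatch(&routes, "/api/v1/daemon", ""), "daemon info");
    assert_eq!(dispatch(&routes, "/api/v1/mount", "cfg"), "mount: cfg");
    assert_eq!(dispatch(&routes, "/nope", ""), "404 no route");
}
```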
Request is a standard HTTP request struct.
Different handler objects implement handle_request differently. kick_api_server is passed into handle_request as a closure: it sends the request message over the channel and receives the processing result via from_api.recv():
fn kick_api_server(
api_notifier: Option<Arc<Waker>>,
to_api: &Sender<Option<ApiRequest>>,
from_api: &Receiver<ApiResponse>,
request: ApiRequest, // the request payload
) -> ApiResponse {
// Send the request over the channel
to_api.send(Some(request)).map_err(ApiError::RequestSend)?;
if let Some(waker) = api_notifier {
waker.wake().map_err(ApiError::Wakeup)?;
}
// Receive the result of processing the request
from_api.recv().map_err(ApiError::ResponseRecv)?
}
Each API has a corresponding handler function. Take MountHandler's handle_request as an example:
pub struct MountHandler {}
impl EndpointHandler for MountHandler {
fn handle_request(
&self,
req: &Request,
kicker: &dyn Fn(ApiRequest) -> ApiResponse,
) -> HttpResult {
let mountpoint = extract_query_part(req, "mountpoint").ok_or_else(|| {
HttpError::QueryString("'mountpoint' should be specified in query string".to_string())
})?;
match (req.method(), req.body.as_ref()) {
(Method::Post, Some(body)) => {
// Parse the command from the request body
let cmd = parse_body(body)?;
// Send a Mount message over the channel, carrying mountpoint and cmd
let r = kicker(ApiRequest::Mount(mountpoint, cmd));
Ok(convert_to_response(r, HttpError::Mount))
}
(Method::Put, Some(body)) => {
let cmd = parse_body(body)?;
let r = kicker(ApiRequest::Remount(mountpoint, cmd));
Ok(convert_to_response(r, HttpError::Mount))
}
(Method::Delete, None) => {
let r = kicker(ApiRequest::Umount(mountpoint));
Ok(convert_to_response(r, HttpError::Mount))
}
_ => Err(HttpError::BadRequest),
}
}
}
Logs are printed while handling each Request.
At this point, the API Server handler is up and running.
So where are the requests issued by kicker actually received?
Back in the start method: next, a thread named api-server is spawned to do the actual request processing:
let handler_thread = std::thread::Builder::new()
.name("api-server".to_string())
.spawn(move || {
api_handler.handle_requests_from_router();
info!("HTTP api-server handler thread exits");
let _ = daemon_waker.wake();
Ok(())
})
.map_err(|_e| einval!("Failed to start work thread for HTTP handler"))?;
handle_requests_from_router() is a loop that receives requests from nydus-http-server:
fn handle_requests_from_router(&self) {
loop {
match self.api_receiver.recv() {
Ok(request) => {
if let Some(req) = request {
self.server.process_request(req).unwrap_or_else(|e| {
error!("HTTP handler failed to process request, {}", e)
});
} else {
debug!("Received exit notification from the HTTP router");
return;
}
}
Err(_e) => {
error!("Failed to receive request from the HTTP router");
return;
}
}
}
}
The self.server.process_request() method picks the handler according to the request type:
fn process_request(&self, request: ApiRequest) -> Result<()> {
let resp = match request {
// Common (v1/v2)
ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf),
ApiRequest::GetDaemonInfo => self.daemon_info(true),
ApiRequest::GetEvents => Self::events(),
ApiRequest::Exit => self.do_exit(),
ApiRequest::Start => self.do_start(),
ApiRequest::SendFuseFd => self.send_fuse_fd(),
ApiRequest::TakeoverFuseFd => self.do_takeover(),
ApiRequest::Mount(mountpoint, info) => self.do_mount(mountpoint, info),
ApiRequest::Remount(mountpoint, info) => self.do_remount(mountpoint, info),
ApiRequest::Umount(mountpoint) => self.do_umount(mountpoint),
ApiRequest::ExportBackendMetrics(id) => Self::export_backend_metrics(id),
ApiRequest::ExportBlobcacheMetrics(id) => Self::export_blobcache_metrics(id),
// Nydus API v1
ApiRequest::ExportFsGlobalMetrics(id) => Self::export_global_metrics(id),
ApiRequest::ExportFsFilesMetrics(id, latest_read_files) => {
Self::export_files_metrics(id, latest_read_files)
}
ApiRequest::ExportFsAccessPatterns(id) => Self::export_access_patterns(id),
ApiRequest::ExportFsBackendInfo(mountpoint) => self.backend_info(&mountpoint),
ApiRequest::ExportFsInflightMetrics => self.export_inflight_metrics(),
// Nydus API v2
ApiRequest::GetDaemonInfoV2 => self.daemon_info(false),
ApiRequest::GetBlobObject(_param) => todo!(),
ApiRequest::CreateBlobObject(entry) => self.create_blob_cache_entry(&entry),
ApiRequest::DeleteBlobObject(param) => self.remove_blob_cache_entry(¶m),
};
self.respond(resp);
Ok(())
}
fn do_mount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
// Parse the fs type from the input
let fs_type = FsBackendType::from_str(&cmd.fs_type)
.map_err(|e| ApiError::MountFilesystem(DaemonError::from(e).into()))?;
let fs = self.get_default_fs_service()?;
fs.mount(FsBackendMountCmd {
fs_type, // filesystem type
mountpoint, // mount point
config: cmd.config, // configuration
source: cmd.source, // source
prefetch_files: cmd.prefetch_files, // list of files to prefetch
})
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
First, the fs type is parsed from the input parameters; then the fs service stored in the DAEMON_CONTROLLER object is obtained as fs, and its mount method is called:
fn mount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
if self.backend_from_mountpoint(&cmd.mountpoint)?.is_some() {
return Err(DaemonError::AlreadyExists);
}
let backend = fs_backend_factory(&cmd)?;
let index = self.get_vfs().mount(backend, &cmd.mountpoint)?;
info!("{} filesystem mounted at {}", &cmd.fs_type, &cmd.mountpoint);
self.backend_collection().add(&cmd.mountpoint, &cmd)?;
// Add mounts opaque to UpgradeManager
if let Some(mut mgr_guard) = self.upgrade_mgr() {
upgrade::add_mounts_state(&mut mgr_guard, cmd, index)?;
}
Ok(())
}
From here on, the flow is the same as mounting the backend filesystem directly via the daemon.service.mount(cmd) method, which was analyzed in the previous post.
fn do_remount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
let fs_type = FsBackendType::from_str(&cmd.fs_type)
.map_err(|e| ApiError::MountFilesystem(DaemonError::from(e).into()))?;
self.get_default_fs_service()?
.remount(FsBackendMountCmd {
fs_type,
mountpoint,
config: cmd.config,
source: cmd.source,
prefetch_files: cmd.prefetch_files,
})
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
do_remount is similar to do_mount; its remount method:
fn remount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
let rootfs = self
.backend_from_mountpoint(&cmd.mountpoint)?
.ok_or(DaemonError::NotFound)?;
let rafs_config = RafsConfig::from_str(&cmd.config)?;
let mut bootstrap = <dyn RafsIoRead>::from_file(&cmd.source)?;
let any_fs = rootfs.deref().as_any();
let rafs = any_fs
.downcast_ref::<Rafs>()
.ok_or_else(|| DaemonError::FsTypeMismatch("to rafs".to_string()))?;
rafs.update(&mut bootstrap, rafs_config)
.map_err(|e| match e {
RafsError::Unsupported => DaemonError::Unsupported,
e => DaemonError::Rafs(e),
})?;
// To update mounted time and backend configurations.
self.backend_collection().add(&cmd.mountpoint, &cmd)?;
// Update mounts opaque from UpgradeManager
if let Some(mut mgr_guard) = self.upgrade_mgr() {
upgrade::update_mounts_state(&mut mgr_guard, cmd)?;
}
Ok(())
}
The rafs.update method updates the device information based on the given bootstrap reader and config:
/// Update storage backend for blobs.
pub fn update(&self, r: &mut RafsIoReader, conf: RafsConfig) -> RafsResult<()> {
info!("update");
if !self.initialized {
warn!("Rafs is not yet initialized");
return Err(RafsError::Uninitialized);
}
// TODO: seems no need to do self.sb.update()
// step 1: update sb.
// No lock is needed thanks to ArcSwap.
self.sb.update(r).map_err(|e| {
error!("update failed due to {:?}", e);
e
})?;
info!("update sb is successful");
let storage_conf = Self::prepare_storage_conf(&conf)?;
let blob_infos = self.sb.superblock.get_blob_infos();
// step 2: update device (only localfs is supported)
self.device
.update(&storage_conf, &blob_infos, self.fs_prefetch)
.map_err(RafsError::SwapBackend)?;
info!("update device is successful");
Ok(())
}
It calls the device's update method, which creates new backend objects from the given configuration:
pub fn update(
&self,
config: &Arc<FactoryConfig>,
blob_infos: &[Arc<BlobInfo>],
fs_prefetch: bool,
) -> io::Result<()> {
if self.blobs.load().len() != blob_infos.len() {
return Err(einval!(
"number of blobs doesn't match when update 'BlobDevice' object"
));
}
let mut blobs = Vec::with_capacity(blob_infos.len());
for blob_info in blob_infos.iter() {
let blob = BLOB_FACTORY.new_blob_cache(config, blob_info, blob_infos.len())?;
blobs.push(blob);
}
// Stop any running prefetch first
if fs_prefetch {
// Stop prefetch if it is running before swapping backend since prefetch threads cloned
// Arc<BlobCache>, the swap operation can't drop inner object completely.
// Otherwise prefetch threads will be leaked.
self.stop_prefetch();
}
// Swap in the new blobs
self.blobs.store(Arc::new(blobs));
if fs_prefetch {
// Start prefetching with the new configuration
self.start_prefetch();
}
Ok(())
}
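self.blobs in nydus is an arc_swap::ArcSwap, which lets in-flight readers keep the old list while a fully built new list is swapped in atomically. With only the standard library, the same swap-the-whole-list idea can be sketched with RwLock<Arc<Vec<_>>> (Device here is a simplified stand-in, not nydusd's BlobDevice):

```rust
use std::sync::{Arc, RwLock};

struct Device {
    // nydus uses arc_swap::ArcSwap here; RwLock<Arc<_>> is a std-only stand-in.
    blobs: RwLock<Arc<Vec<String>>>,
}

impl Device {
    fn update(&self, new_blobs: Vec<String>) {
        // Build the complete new list first, then swap it in as one unit,
        // so readers always observe either the old set or the new set.
        *self.blobs.write().unwrap() = Arc::new(new_blobs);
    }

    fn snapshot(&self) -> Arc<Vec<String>> {
        Arc::clone(&self.blobs.read().unwrap())
    }
}

fn main() {
    let dev = Device {
        blobs: RwLock::new(Arc::new(vec!["blob-a".to_string()])),
    };
    let before = dev.snapshot();
    dev.update(vec!["blob-a".to_string(), "blob-b".to_string()]);
    assert_eq!(before.len(), 1); // an old snapshot stays intact after the swap
    assert_eq!(dev.snapshot().len(), 2); // new readers see the new list
}
```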
fn do_umount(&self, mountpoint: String) -> ApiResponse {
self.get_default_fs_service()?
.umount(FsBackendUmountCmd { mountpoint })
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
do_umount obtains fs and calls its umount method:
fn umount(&self, cmd: FsBackendUmountCmd) -> DaemonResult<()> {
let _ = self
.backend_from_mountpoint(&cmd.mountpoint)?
.ok_or(DaemonError::NotFound)?;
self.get_vfs().umount(&cmd.mountpoint)?;
self.backend_collection().del(&cmd.mountpoint);
if let Some(mut mgr_guard) = self.upgrade_mgr() {
// Remove mount opaque from UpgradeManager
upgrade::remove_mounts_state(&mut mgr_guard, cmd)?;
}
debug!("try to gc unused blobs");
BLOB_FACTORY.gc(None);
Ok(())
}
self.get_vfs().umount unmounts the path mounted on FUSE via the umount method in fuse-backend-rs:
/// Umount a backend file system at path
pub fn umount(&self, path: &str) -> VfsResult<()> {
// Serialize mount operations. Do not expect poisoned lock here.
let _guard = self.lock.lock().unwrap();
// Walk the tree to get the inode number for the path
let inode = self
.root
.path_walk(path)
.map_err(VfsError::PathWalk)?
.ok_or_else(|| VfsError::NotFound(path.to_string()))?;
// Load the stored mountpoints map
let mut mountpoints = self.mountpoints.load().deref().deref().clone();
// Look up the fs index
let fs_idx = mountpoints
.get(&inode)
.map(Arc::clone)
.map(|x| {
// Do not remove pseudofs inode. We keep all pseudofs inode so that
// 1. they can be reused later on
// 2. during live upgrade, it is easier reconstruct pseudofs inodes since
// we do not have to track pseudofs deletions
//self.root.evict_inode(inode);
// Remove the mountpoint entry (pseudofs inodes are kept)
mountpoints.remove(&inode);
self.mountpoints.store(Arc::new(mountpoints));
x.fs_idx
})
.ok_or_else(|| {
error!("{} is not a mount point.", path);
VfsError::NotFound(path.to_string())
})?;
trace!("fs_idx {}", fs_idx);
let mut superblocks = self.superblocks.load().deref().deref().clone();
// Remove the entry from superblocks by setting it to None
if let Some(fs) = superblocks[fs_idx as usize].take() {
fs.destroy();
}
self.superblocks.store(Arc::new(superblocks));
Ok(())
}
In addition, there is a series of functions triggered via the API: do_start, daemon_info, backend_info, and so on.
After the API Server handler thread starts successfully, the two thread handles are stored in the ApiServerController instance:
self.waker = Some(waker);
self.http_handler_thread = Some(handler_thread);
self.http_router_thread = Some(router_thread);
The two newly started threads can be observed with top -Hp NYDUSD_PID.
Back in main(), starting api_controller is now complete; next, two signal handlers are registered:
// Initialize and run the daemon controller event loop.
nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit);
nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit);
sig_exit calls the DAEMON_CONTROLLER.shutdown() method:
extern "C" fn sig_exit(_sig: std::os::raw::c_int) {
DAEMON_CONTROLLER.shutdown();
}
In shutdown(), the daemon's stop() method sends a DaemonStateMachineInput::Stop event to the state_machine thread:
fn shutdown(&self) {
// Marking exiting state.
self.active.store(false, Ordering::Release);
// Signal the `run_loop()` working thread to exit.
let _ = self.waker.wake();
let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
/*
// TODO: fix the behavior
if cfg!(feature = "virtiofs") {
// In case of virtiofs, mechanism to unblock recvmsg() from VMM is lacked.
// Given the fact that we have nothing to clean up, directly exit seems fine.
process::exit(0);
}
*/
if let Err(e) = d.stop() {
error!("failed to stop daemon: {}", e);
}
if let Err(e) = d.wait() {
error!("failed to wait daemon: {}", e)
}
}
}
After the signal handlers are registered, main() runs the DAEMON_CONTROLLER.run_loop() method:
// Run the main event loop
if DAEMON_CONTROLLER.is_active() {
DAEMON_CONTROLLER.run_loop();
}
The run_loop() method:
fn run_loop(&self) {
let mut events = Events::with_capacity(8);
loop {
match self.poller.lock().unwrap().poll(&mut events, None) {
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => error!("failed to receive notification from waker: {}", e),
Ok(_) => {}
}
for event in events.iter() {
if event.is_error() {
error!("Got error on the monitored event.");
continue;
}
if event.is_readable() && event.token() == Token(1) {
if self.active.load(Ordering::Acquire) {
return;
} else if self.singleton_mode.load(Ordering::Acquire) {
self.active.store(false, Ordering::Relaxed);
return;
}
}
}
}
}
First, events are fetched via self.poller.poll(). Each batch of events is iterated; if an event is readable and carries the expected Token, the value of active is loaded:
/// Loads a value from the bool.
///
/// `load` takes an [`Ordering`] argument which describes the memory ordering
/// of this operation. Possible values are [`SeqCst`], [`Acquire`] and [`Relaxed`].
///
/// # Panics
///
/// Panics if `order` is [`Release`] or [`AcqRel`].
///
/// # Examples
///
/// ```
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// let some_bool = AtomicBool::new(true);
///
/// assert_eq!(some_bool.load(Ordering::Relaxed), true);
/// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn load(&self, order: Ordering) -> bool {
// SAFETY: any data races are prevented by atomic intrinsics and the raw
// pointer passed in is valid because we got it from a reference.
unsafe { atomic_load(self.v.get(), order) != 0 }
}
If active is true, run_loop returns immediately; otherwise singleton_mode is checked, and if that is true, active is set to false before returning.
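The Acquire loads in run_loop pair with the Release store performed in shutdown(); the pairing can be sketched as:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Flip an AtomicBool from another thread with Release, read it back with Acquire.
fn release_acquire_flip() -> bool {
    let active = Arc::new(AtomicBool::new(true));
    let flag = Arc::clone(&active);

    // The other thread stores with Release ordering, as shutdown() does.
    let t = thread::spawn(move || {
        flag.store(false, Ordering::Release);
    });
    t.join().unwrap();

    // An Acquire load pairs with the Release store (join() also synchronizes here).
    active.load(Ordering::Acquire)
}

fn main() {
    // After the "shutdown" thread runs, the flag is observed as false.
    assert!(!release_acquire_flip());
}
```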
When main() ends, the HTTP server is stopped via the api_controller.stop() method, and the daemon is stopped via the DAEMON_CONTROLLER.shutdown() method:
api_controller.stop();
DAEMON_CONTROLLER.shutdown();
The api_controller.stop() method:
/// Stop the HTTP working thread.
pub fn stop(&mut self) {
// Signal the HTTP router thread to exit, which will then notify the HTTP handler thread.
if let Some(waker) = self.waker.take() {
let _ = waker.wake();
}
if let Some(t) = self.http_handler_thread.take() {
if let Err(e) = t.join() {
error!(
"Failed to join the HTTP handler thread, execution error. {:?}",
e
);
}
}
if let Some(t) = self.http_router_thread.take() {
if let Err(e) = t.join() {
error!(
"Failed to join the HTTP router thread, execution error. {:?}",
e
);
}
}
if let Some(apisock) = self.sock.as_ref() {
std::fs::remove_file(apisock).unwrap_or_default();
}
}
stop() joins the http_handler_thread and http_router_thread threads and removes the apisock file.
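The wake-then-join shutdown order in stop() can be sketched with a channel standing in for the waker:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Wake a blocked worker thread, then join it, mirroring stop()'s shutdown order.
fn wake_and_join() -> &'static str {
    // The sender plays the role of ApiServerController's waker.
    let (waker, events) = channel::<()>();

    let worker = thread::spawn(move || {
        // Blocks like the HTTP router's poll loop until woken or the sender is gone.
        let _ = events.recv();
        "exited cleanly"
    });

    waker.send(()).unwrap(); // wake the thread so it can exit...
    worker.join().unwrap() // ...then join it, as stop() does
}

fn main() {
    assert_eq!(wake_and_join(), "exited cleanly");
}
```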
Nydusd supports multiple backends (currently localfs, OSS, and Registry) and serves vfs requests through fuse. The BlobReader trait abstracts the operations on a backend; its main content:
pub trait BlobReader: Send + Sync {
/// Get size of the blob file.
fn blob_size(&self) -> BackendResult<u64>;
/// Try to read a range of data from the blob file into the provided buffer.
///
/// Try to read data of range [offset, offset + buf.len()) from the blob file, and returns:
/// - bytes of data read, which may be smaller than buf.len()
/// - error code if error happens
fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize>;
/// Read a range of data from the blob file into the provided buffer.
///
/// Read data of range [offset, offset + buf.len()) from the blob file, and returns:
/// - bytes of data read, which may be smaller than buf.len()
/// - error code if error happens
///
/// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
/// read data.
fn read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
let mut retry_count = self.retry_limit();
let begin_time = self.metrics().begin();
loop {
match self.try_read(buf, offset) {
Ok(size) => {
self.metrics().end(&begin_time, buf.len(), false);
return Ok(size);
}
Err(err) => {
if retry_count > 0 {
warn!(
"Read from backend failed: {:?}, retry count {}",
err, retry_count
);
retry_count -= 1;
} else {
self.metrics().end(&begin_time, buf.len(), true);
ERROR_HOLDER
.lock()
.unwrap()
.push(&format!("{:?}", err))
.unwrap_or_else(|_| error!("Failed when try to hold error"));
return Err(err);
}
}
}
}
}
/// Read a range of data from the blob file into the provided buffers.
///
/// Read data of range [offset, offset + max_size) from the blob file, and returns:
/// - bytes of data read, which may be smaller than max_size
/// - error code if error happens
///
/// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
/// read data.
fn readv(
&self,
bufs: &[FileVolatileSlice],
offset: u64,
max_size: usize,
) -> BackendResult<usize> {
if bufs.len() == 1 && max_size >= bufs[0].len() {
let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), bufs[0].len()) };
self.read(buf, offset)
} else {
// Use std::alloc to avoid zeroing the allocated buffer.
let size = bufs.iter().fold(0usize, move |size, s| size + s.len());
let size = std::cmp::min(size, max_size);
let mut data = alloc_buf(size);
let result = self.read(&mut data, offset)?;
copyv(&[&data], bufs, 0, result, 0, 0)
.map(|r| r.0)
.map_err(BackendError::CopyData)
}
}
/// Get metrics object.
fn metrics(&self) -> &BackendMetrics;
/// Get maximum number of times to retry when encountering IO errors.
fn retry_limit(&self) -> u8 {
0
}
}
The implementation of try_read differs per backend. The read method takes buf and offset and, on failure, retries up to retry_count times.
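The retry loop in BlobReader::read can be written as a generic helper; the signature and error type below are illustrative, not nydusd's:

```rust
// Generic sketch of the retry loop in BlobReader::read (names are illustrative).
fn read_with_retry<F>(mut try_read: F, retry_limit: u8) -> Result<usize, String>
where
    F: FnMut() -> Result<usize, String>,
{
    let mut retry_count = retry_limit;
    loop {
        match try_read() {
            Ok(size) => return Ok(size),
            Err(err) if retry_count > 0 => {
                // Same shape as nydusd: log, decrement, and try again.
                eprintln!("read failed: {}, retries left {}", err, retry_count);
                retry_count -= 1;
            }
            // Retries exhausted: surface the last error.
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    // Fails twice, then succeeds: with retry_limit = 3 the read goes through.
    let mut attempts = 0;
    let result = read_with_retry(
        || {
            attempts += 1;
            if attempts < 3 {
                Err("transient".to_string())
            } else {
                Ok(4096)
            }
        },
        3,
    );
    assert_eq!(result.unwrap(), 4096);
    assert_eq!(attempts, 3);
}
```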
The try_read() method for the Registry backend:
fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
self._try_read(buf, offset, true)
.map_err(BackendError::Registry)
}
It calls the _try_read() method implemented on the RegistryReader type:
/// Read data from registry server
///
/// Step:
///
/// Request: GET /blobs/sha256:<blob_id>
/// Response: status: 307 Temporary Redirect
/// header: location: https://raw-blob-storage-host.com/signature=x
///
/// Request: GET https://raw-blob-storage-host.com/signature=x
/// Response: status: 200 Ok / 403 Forbidden
/// If responding 403, we need to repeat step one
fn _try_read(
&self,
mut buf: &mut [u8],
offset: u64,
allow_retry: bool,
) -> RegistryResult<usize> {
let url = format!("/blobs/sha256:{}", self.blob_id);
// Build the url
let url = self
.state
.url(url.as_str(), &[])
.map_err(RegistryError::Url)?;
// Create an empty header map
let mut headers = HeaderMap::new();
let end_at = offset + buf.len() as u64 - 1;
let range = format!("bytes={}-{}", offset, end_at);
headers.insert("Range", range.parse().unwrap());
let mut resp;
let cached_redirect = self.state.cached_redirect.get(&self.blob_id);
// If a cached_redirect exists, issue the request via Connection's call method
if let Some(cached_redirect) = cached_redirect {
resp = self
.connection
.call::<&[u8]>(
Method::GET,
cached_redirect.as_str(),
None,
None,
&mut headers,
false,
)
.map_err(RegistryError::Request)?;
// Retry if allow_retry is set and the status is UNAUTHORIZED or FORBIDDEN
// The request has expired or has been denied, need to re-request
if allow_retry
&& vec![StatusCode::UNAUTHORIZED, StatusCode::FORBIDDEN].contains(&resp.status())
{
warn!(
"The redirected link has expired: {}, will retry read",
cached_redirect.as_str()
);
self.state.cached_redirect.remove(&self.blob_id);
// Try read again only once
return self._try_read(buf, offset, false);
}
} else {
// Otherwise, issue the request directly via the request method
resp =
self.request::<&[u8]>(Method::GET, url.as_str(), None, headers.clone(), false)?;
let status = resp.status();
// Handle redirect request and cache redirect url
if REDIRECTED_STATUS_CODE.contains(&status) {
if let Some(location) = resp.headers().get("location") {
let location = location.to_str().unwrap();
let mut location = Url::parse(location).map_err(RegistryError::Url)?;
// Note: Some P2P proxy server supports only scheme specified origin blob server,
// so we need change scheme to `blob_url_scheme` here
if !self.state.blob_url_scheme.is_empty() {
location
.set_scheme(&self.state.blob_url_scheme)
.map_err(|_| {
RegistryError::Scheme(self.state.blob_url_scheme.clone())
})?;
}
if !self.state.blob_redirected_host.is_empty() {
location
.set_host(Some(self.state.blob_redirected_host.as_str()))
.map_err(|e| {
error!(
"Failed to set blob redirected host to {}: {:?}",
self.state.blob_redirected_host.as_str(),
e
);
RegistryError::Url(e)
})?;
debug!("New redirected location {:?}", location.host_str());
}
let resp_ret = self
.connection
.call::<&[u8]>(
Method::GET,
location.as_str(),
None,
None,
&mut headers,
true,
)
.map_err(RegistryError::Request);
match resp_ret {
Ok(_resp) => {
resp = _resp;
self.state
.cached_redirect
.set(self.blob_id.clone(), location.as_str().to_string())
}
Err(err) => {
return Err(err);
}
}
};
} else {
resp = respond(resp, true).map_err(RegistryError::Request)?;
}
}
resp.copy_to(&mut buf)
.map_err(RegistryError::Transport)
.map(|size| size as usize)
}
The request method's concrete steps for issuing a read against the registry:
/// Request registry server with `authorization` header
///
/// Bearer token authenticate workflow:
///
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// Response: status: 401 Unauthorized
/// header: www-authenticate: Bearer realm="https://auth.my-registry.com/token",service="my-registry.com",scope="repository:test/repo:pull,push"
///
/// Request: POST https://auth.my-registry.com/token
/// body: "service=my-registry.com&scope=repository:test/repo:pull,push&grant_type=password&username=x&password=x&client_id=nydus-registry-client"
/// Response: status: 200 Ok
/// body: { "token": "<token>" }
///
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// header: authorization: Bearer <token>
/// Response: status: 200 Ok
///
/// Basic authenticate workflow:
///
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// Response: status: 401 Unauthorized
/// header: www-authenticate: Basic
///
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// header: authorization: Basic base64(<username:password>)
/// Response: status: 200 Ok
fn request<R: Read + Send + 'static>(
&self,
method: Method,
url: &str,
data: Option<ReqBody<R>>,
mut headers: HeaderMap,
catch_status: bool,
) -> RegistryResult<Response> {
// Try get authorization header from cache for this request
let mut last_cached_auth = String::new();
// Add the cached auth header to the request headers
let cached_auth = self.state.cached_auth.get();
if !cached_auth.is_empty() {
last_cached_auth = cached_auth.clone();
headers.insert(
HEADER_AUTHORIZATION,
HeaderValue::from_str(cached_auth.as_str()).unwrap(),
);
}
// For upload request with payload, the auth header should be cached
// after create_upload(), so we can request registry server directly
if let Some(data) = data {
return self
.connection
.call(method, url, None, Some(data), &mut headers, catch_status)
.map_err(RegistryError::Request);
}
// Invoke the call method
// Try to request registry server with `authorization` header
let resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false)
.map_err(RegistryError::Request)?;
// If the response status is UNAUTHORIZED, fetch a token and store it in state.cached_auth
if resp.status() == StatusCode::UNAUTHORIZED {
if let Some(resp_auth_header) = resp.headers().get(HEADER_WWW_AUTHENTICATE) {
// Get token from registry authorization server
if let Some(auth) = RegistryState::parse_auth(resp_auth_header, &self.state.auth) {
let auth_header = self
.state
.get_auth_header(auth, &self.connection)
.map_err(|e| RegistryError::Common(e.to_string()))?;
headers.insert(
HEADER_AUTHORIZATION,
HeaderValue::from_str(auth_header.as_str()).unwrap(),
);
// Try to request registry server with `authorization` header again
let resp = self
.connection
.call(method, url, None, data, &mut headers, catch_status)
.map_err(RegistryError::Request)?;
let status = resp.status();
if is_success_status(status) {
// Cache authorization header for next request
self.state.cached_auth.set(&last_cached_auth, auth_header)
}
return respond(resp, catch_status).map_err(RegistryError::Request);
}
}
}
respond(resp, catch_status).map_err(RegistryError::Request)
}
First, the cached auth token is added to headers; then the call method is invoked to send the request:
/// Send a request to server and wait for response.
pub fn call<R: Read + Send + 'static>(
&self,
method: Method,
url: &str,
query: Option<&[(&str, &str)]>,
data: Option<ReqBody<R>>,
headers: &mut HeaderMap,
catch_status: bool,
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
}
// Send the request through the proxy
if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let http_url: Option<String>;
let mut replaced_url = url;
if proxy.use_http {
http_url = proxy.try_use_http(url);
if let Some(ref r) = http_url {
replaced_url = r.as_str();
}
}
let result = self.call_inner(
&proxy.client,
method.clone(),
replaced_url,
&query,
data_cloned,
headers,
catch_status,
true,
);
match result {
Ok(resp) => {
if !proxy.fallback || resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
if !proxy.fallback {
return Err(err);
}
}
}
// Fall back to the origin server after the proxy fails
// If proxy server responds invalid status code or http connection failed, we need to
// fallback to origin server, the policy only applicable to non-upload operation
warn!("Request proxy server failed, fallback to original server");
} else {
LAST_FALLBACK_AT.with(|f| {
let current = SystemTime::now();
if current.duration_since(*f.borrow()).unwrap().as_secs()
>= RATE_LIMITED_LOG_TIME as u64
{
warn!("Proxy server is not healthy, fallback to original server");
f.replace(current);
}
})
}
}
// Fallback path: request the origin registry directly
self.call_inner(
&self.client,
method,
url,
&query,
data,
headers,
catch_status,
false,
)
}
The call_inner method:
#[allow(clippy::too_many_arguments)]
fn call_inner<R: Read + Send + 'static>(
&self,
client: &Client,
method: Method,
url: &str,
query: &Option<&[(&str, &str)]>,
data: Option<ReqBody<R>>,
headers: &HeaderMap,
catch_status: bool,
proxy: bool,
) -> ConnectionResult<Response> {
// Avoid printing the HEADER_AUTHORIZATION value in logs
// Only clone header when debugging to reduce potential overhead.
let display_headers = if max_level() >= Level::Debug {
let mut display_headers = headers.clone();
display_headers.remove(HEADER_AUTHORIZATION);
Some(display_headers)
} else {
None
};
let has_data = data.is_some();
let start = Instant::now();
// Build the request with the headers attached
let mut rb = client.request(method.clone(), url).headers(headers.clone());
if let Some(q) = query.as_ref() {
rb = rb.query(q);
}
let ret;
if let Some(data) = data {
match data {
ReqBody::Read(body, total) => {
let body = Body::sized(body, total as u64);
ret = rb.body(body).send();
}
ReqBody::Buf(buf) => {
ret = rb.body(buf).send();
}
ReqBody::Form(form) => {
ret = rb.form(&form).send();
}
}
} else {
// Without data, send the request directly with an empty body
ret = rb.body("").send();
}
debug!(
"{} Request: {} {} headers: {:?}, proxy: {}, data: {}, duration: {}ms",
std::thread::current().name().unwrap_or_default(),
method,
url,
display_headers,
proxy,
has_data,
Instant::now().duration_since(start).as_millis(),
);
match ret {
Err(err) => Err(ConnectionError::Common(err)),
Ok(resp) => respond(resp, catch_status),
}
}
The try_read method when OSS is used as the backend:
fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
let query = &[];
// Generate the resource path and request URL
let (resource, url) = self.state.url(&self.blob_id, query);
let mut headers = HeaderMap::new();
let end_at = offset + buf.len() as u64 - 1;
let range = format!("bytes={}-{}", offset, end_at);
// Add the Range header to headers
headers.insert(
"Range",
range
.as_str()
.parse()
.map_err(|e| OssError::ConstructHeader(format!("{}", e)))?,
);
self.state
.sign(Method::GET, &mut headers, resource.as_str())
.map_err(OssError::Auth)?;
// Safe because the call() is a synchronous operation.
let mut resp = self
.connection
.call::<&[u8]>(
Method::GET,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
Ok(resp
.copy_to(&mut buf)
.map_err(OssError::Transport)
.map(|size| size as usize)?)
}
The self.state.sign method generates the signature for the request:
/// generate oss request signature
fn sign(
&self,
verb: Method,
headers: &mut HeaderMap,
canonicalized_resource: &str,
) -> Result<()> {
let content_md5 = "";
let content_type = "";
let mut canonicalized_oss_headers = vec![];
let date = httpdate::fmt_http_date(SystemTime::now());
// Components of the string to sign
let mut data = vec![
verb.as_str(),
content_md5,
content_type,
date.as_str(),
// canonicalized_oss_headers,
canonicalized_resource,
];
// Collect the x-oss- headers
for (name, value) in headers.iter() {
let name = name.as_str();
let value = value.to_str().map_err(|e| einval!(e))?;
if name.starts_with("x-oss-") {
let header = format!("{}:{}", name.to_lowercase(), value);
canonicalized_oss_headers.push(header);
}
}
let canonicalized_oss_headers = canonicalized_oss_headers.join("\n");
if !canonicalized_oss_headers.is_empty() {
data.insert(4, canonicalized_oss_headers.as_str());
}
let data = data.join("\n");
let digest = HMAC::mac(data.as_bytes(), self.access_key_secret.as_bytes());
let signature = base64::encode(&digest);
let authorization = format!("OSS {}:{}", self.access_key_id, signature);
headers.insert(HEADER_DATE, date.as_str().parse().map_err(|e| einval!(e))?);
headers.insert(
HEADER_AUTHORIZATION,
authorization.as_str().parse().map_err(|e| einval!(e))?,
);
Ok(())
}
Then the request is issued via self.connection.call, the same path used for registry requests.
When LocalFS is the backend, try_read directly calls the uio::pread method:
fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
debug!(
"local blob file reading: offset={}, size={} from={}",
offset,
buf.len(),
self.id,
);
uio::pread(self.file.as_raw_fd(), buf, offset as i64)
.map_err(|e| LocalFsError::ReadBlob(e).into())
}
[1] nydus: https://github.com/dragonflyoss/image-service.git
[2] mio: https://docs.rs/mio/latest/mio/struct.Poll.html
[3] dbs-uhttp: https://github.com/openanolis/dbs-uhttp