先看用例源码:
#include <assert.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <uv.h>
// Forward declaration: on_write() and on_open() pass on_read as a callback
// before its definition appears below.
void on_read(uv_fs_t *req);
// The request objects, one per kind of operation (open / read / write).
uv_fs_t open_req;
uv_fs_t read_req;
uv_fs_t write_req;
// Reads and writes both go through this single buffer; `iov` is the libuv
// buffer descriptor that on_open() points at it.
static char buffer[1024];
static uv_buf_t iov;
// Completion callback for the write request.
// On failure the error is reported and the read/write chain stops; on success
// the next chunk of the input file is requested.
void on_write(uv_fs_t *req) {
    if (req->result < 0) {
        fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
        return;
    }

    // on_read() shrinks iov.len to the number of bytes it received. Restore
    // the full buffer capacity so that one short read does not cap the size
    // of every later read.
    iov = uv_buf_init(buffer, sizeof(buffer));
    uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
}
// Completion callback for the read request. req->result decides what happens:
//   < 0  the read failed; report it and stop.
//   == 0 end of file; close the descriptor that on_open() obtained.
//   > 0  that many bytes are in `buffer`; submit a write of them to fd 1.
void on_read(uv_fs_t *req) {
    if (req->result < 0) {
        // Cast matches the other callbacks: uv_strerror() takes an int.
        fprintf(stderr, "Read error: %s\n", uv_strerror((int)req->result));
        return;
    }

    if (req->result == 0) {
        uv_fs_t close_req;
        // No callback is passed, so the close is executed synchronously and
        // its result is available as the return value.
        int rc = uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
        if (rc < 0) {
            fprintf(stderr, "Close error: %s\n", uv_strerror(rc));
        }
        uv_fs_req_cleanup(&close_req);
        return;
    }

    // Only the bytes actually read must be written out.
    iov.len = req->result;
    uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
}
// Completion callback for the open request.
// On success req->result is the file descriptor; the shared buffer is wired
// into `iov` and the first read is submitted. On failure the error is printed.
void on_open(uv_fs_t *req) {
    // libuv hands the callback the very request object that was passed to
    // uv_fs_open().
    assert(req == &open_req);

    if (req->result < 0) {
        fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
        return;
    }

    iov = uv_buf_init(buffer, sizeof(buffer));
    uv_fs_read(uv_default_loop(), &read_req, req->result, &iov, 1, -1, on_read);
}
// 以上3个任务都是提交为异步任务 由工作线程完成
int main(int argc, char **argv) {
uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_fs_req_cleanup(&open_req);
uv_fs_req_cleanup(&read_req);
uv_fs_req_cleanup(&write_req);
return 0;
}
我们以 uv_fs_open 为例,看看库内部是如何实现的:
/* Open request: records `flags` and `mode` on `req` and dispatches it.
 *
 * With a non-NULL `cb` the request is submitted via uv__work_submit and 0 is
 * returned; with a NULL `cb` the work runs inline and req->result is
 * returned (see the POST macro). Returns UV_EINVAL when `req` is NULL (see
 * the INIT macro).
 */
int uv_fs_open(uv_loop_t* loop,
uv_fs_t* req,
const char* path,
int flags,
int mode,
uv_fs_cb cb) {
/* Validate req, reset its fields and tag it as UV_FS_OPEN. */
INIT(OPEN);
/* NOTE(review): PATH is defined outside this excerpt; presumably it stores
 * `path` into req->path -- confirm against its definition. */
PATH;
req->flags = flags;
req->mode = mode;
/* Returns from this function on both the async and the sync path. */
POST;
}
/* Common prologue of the uv_fs_* entry points.
 *
 * Expects `req`, `loop` and `cb` to be in scope at the expansion site.
 * Makes the enclosing function return UV_EINVAL when `req` is NULL; otherwise
 * initialises the request as a UV_FS request of type UV_FS_<subtype>, binds
 * it to `loop` and `cb`, and resets the result and every pointer field.
 */
#define INIT(subtype)                                                         \
  do {                                                                        \
    if (req == NULL)                                                          \
      return UV_EINVAL;                                                       \
    UV_REQ_INIT(req, UV_FS);                                                  \
    req->loop = loop;                                                         \
    req->cb = cb;                                                             \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->bufs = NULL;                                                         \
    req->ptr = NULL;                                                          \
  }                                                                           \
  while (0)
/* Common epilogue of the uv_fs_* entry points; always returns from the
 * enclosing function. Expects `req`, `loop` and `cb` to be in scope.
 *
 * cb == NULL: run uv__fs_work() inline and return req->result.
 * cb != NULL: register the request with the loop, submit uv__fs_work as a
 *             UV__WORK_FAST_IO work item with uv__fs_done as its completion
 *             handler, and return 0.
 */
#define POST                                                                  \
  do {                                                                        \
    if (cb == NULL) {                                                         \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
    uv__req_register(loop, req);                                              \
    uv__work_submit(loop,                                                     \
                    &req->work_req,                                           \
                    UV__WORK_FAST_IO,                                         \
                    uv__fs_work,                                              \
                    uv__fs_done);                                             \
    return 0;                                                                 \
  }                                                                           \
  while (0)
POST 宏在传入回调时通过 uv__work_submit 把 uv__fs_work 提交为工作线程任务,没有回调时则直接同步调用 uv__fs_work——这就是大家很熟悉的工作线程任务了。
看下清理工作:
/* Release everything a finished fs request may still own and reset the
 * corresponding fields to NULL. A NULL `req` is ignored.
 */
void uv_fs_req_cleanup(uv_fs_t* req) {
  int owns_path;

  if (req == NULL)
    return;

  /* The path is only a private copy for asynchronous requests, i.e. requests
   * with a callback. Synchronous ones don't copy their arguments and leave
   * req->path and req->new_path pointing at user-owned memory. UV_FS_MKDTEMP
   * and UV_FS_MKSTEMP are the exception: they always allocate.
   */
  owns_path = req->cb != NULL ||
              req->fs_type == UV_FS_MKDTEMP ||
              req->fs_type == UV_FS_MKSTEMP;
  if (owns_path && req->path != NULL) {
    /* One allocation backs both req->path and req->new_path. */
    uv__free((void*) req->path);
  }
  req->path = NULL;
  req->new_path = NULL;

  /* Directory listings need their dedicated teardown before the generic
   * req->ptr release further down. */
  if (req->ptr != NULL) {
    switch (req->fs_type) {
      case UV_FS_READDIR:
        uv__fs_readdir_cleanup(req);
        break;
      case UV_FS_SCANDIR:
        uv__fs_scandir_cleanup(req);
        break;
      default:
        break;
    }
  }

  /* req->bufsml is storage embedded in the request and must not be freed. */
  if (req->bufs != req->bufsml) {
    uv__free(req->bufs);
  }
  req->bufs = NULL;

  /* Same for req->statbuf; UV_FS_OPENDIR results are not released here. */
  if (req->fs_type != UV_FS_OPENDIR && req->ptr != &req->statbuf) {
    uv__free(req->ptr);
  }
  req->ptr = NULL;
}
/* Deallocator for scandir results: uv__free on Windows, the C library's
 * free() everywhere else.
 * NOTE(review): presumably the non-Windows entries come from the system
 * scandir(), which allocates with malloc() -- confirm.
 */
#ifdef _WIN32
# define uv__fs_scandir_free uv__free
#else
# define uv__fs_scandir_free free
#endif
/* Free the directory entries still owned by a scandir request, then the
 * entry array itself, and set req->ptr to NULL.
 *
 * req->ptr is the array of entries, req->result their count, and the counter
 * returned by uv__get_nbufs() the number of entries already handed out.
 *
 * NOTE(review): this assumes uv_fs_scandir_next() (not part of this excerpt)
 * frees an entry only when the *following* one is requested, so entry
 * `*nbufs - 1` is still allocated whenever *nbufs > 0 -- confirm. Under that
 * contract the previous condition `*nbufs != req->result` leaked the last
 * entry when every entry had been consumed but UV_EOF had not been reached.
 */
void uv__fs_scandir_cleanup(uv_fs_t* req) {
  uv__dirent_t** dents;
  unsigned int* nbufs;
  unsigned int total;
  unsigned int i;

  dents = req->ptr;
  nbufs = uv__get_nbufs(req);

  /* A negative result must not be turned into a huge unsigned bound. */
  if (dents != NULL && req->result >= 0) {
    total = (unsigned int) req->result;

    /* Start at the entry most recently handed out, if any. */
    i = 0;
    if (*nbufs > 0)
      i = *nbufs - 1;

    for (; i < total; i++)
      uv__fs_scandir_free(dents[i]);

    /* Record that nothing is left, so a repeated call is harmless. */
    *nbufs = total;
  }

  uv__fs_scandir_free(req->ptr);
  req->ptr = NULL;
}
都是比较常规的代码。
总结:把同步阻塞的文件操作封装成 work 请求,交给工作线程(多线程)去执行,对调用方就表现为异步接口。很妙。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。