CephFS用户IO流程




基于L版本代码(v12.2.12)分析。本人还在CephFs学习入门阶段,分析过程仅供参考,如有错误请谅解!

用户IO发送客户端

当前有3种方式可以与cephfs集群进行数据交互:
– libcephfs:提供与cephfs集群交互的C语言API,需要自己开发客户端,与ceph-fuse使用的下层接口相同
– ceph-fuse:sudo ceph-fuse -m 192.168.0.1:6789 /mnt/mycephfs
– kernel client:sudo mount -t ceph 192.168.0.1:6789:/ /mnt/mycephfs

ceph-fuse客户端启动流程

ceph-fuse依赖libfuse项目,很多都是调用的libfuse提供的接口,所以要对libfuse的API比较熟悉才能更好的理解启动过程。

// sudo ceph-fuse -m 192.168.0.1:6789 /mnt/mycephfs
ceph_fuse.cc:
    -> main()
        -> global_init()  // 生成ceph context
        -> fuse_parse_cmdline()  // 解析命令行参数
        -> forker.prefork(err)  // daemonize,成为守护进程
        -> new MonClient(g_ceph_context); mc->build_initial_monmap();  // 初始化monitor client及monmap
        -> Messenger::create_client_messenger()  // 创建client messenger
        -> client = new StandaloneClient(messenger, mc)  // 创建client,用来收发用户IO请求
        -> cfuse = new CephFuse(); cfuse->init()  // 创建CephFuse对象并初始化
            -> _handle->init()  // 初始化CephFuse::Handle对象,fuse_parse_cmdline是libfuse的api,用来解析libfuse需要的参数
        -> messenger->start()   // 启动messenger线程,开始接收消息
        -> init_async_signal_handler(); register_async_signal_handler(SIGHUP, sighup_handler)  // 注册SIGHUP信号处理函数
        -> client->init()  // 初始化client的定时器,启动objectcacher(对象缓存管理),初始化objecter并启动,objecter是跟osd打交道的client,添加dispatcher到messenger
        -> client->mount()  // ,与mds交互检查目录权限?
            -> authenticate()  // 通过monitor完成认证流程
            -> monclient->sub_want(want, 0, 0); monclient->renew_subs()  // 订阅mdsmap,并请求更新
            -> tick()  // 启动定时任务
            -> if (require_mds)  // 等待mds可用
            -> make_request()  // 发送请求给mds,循环检查mount的目录权限
        -> cfuse->start()
            -> fuse_mount(mountpoint, &args)  // 调用libfuse接口完成目录挂载
            -> fuse_lowlevel_new(&args, &fuse_ll_oper, sizeof(fuse_ll_oper), this)  // 创建lowlevel fuse session,其中fuse_ll_oper是定义好的各种posix接口的用户态实现,这里涉及到libfuse的两种用法,参考:https://www.lijiaocn.com/%E6%8A%80%E5%B7%A7/2019/01/21/linux-fuse-filesystem-in-userspace-usage.html
            -> fuse_set_signal_handlers(se)  // Exit session on HUP, TERM and INT signals and ignore PIPE signal
            -> fuse_session_add_chan(se, ch)  // Assign a channel to a session
            -> client->ll_register_callbacks(&args)  // 给client注册回调,包括inode invalidate callback、remount callback、dentry invalidate callback等
        -> tester.init(cfuse, client);  tester.create("tester");   // 初始化并启动remount的test线程(执行RemountTest::entry函数检查是否支持invalidate dentry,如果内核版本大于3.18并且配置项里设置了client_try_dentry_invalidate=true,则检查是否注册了dentry invalidate callback;反之则需要通过remount操作来强制invalidate dentry,执行的命令是"mount -i -o remount $mountpoint",如果remount失败并且配置项client_die_on_failed_dentry_invalidate=true则执行"fusermount -u -z $mountpoint"命令umount掉,下面的loop()就会失败,以达到退出进程的目的)
        -> cfuse->loop()
            -> fuse_session_loop_mt(se)  // Enter a multi-threaded event loop,开始处理IO请求
        -> tester.join(&tester_rp)  // loop()结束后,检查tester线程返回值

ceph-fuse用户IO流程

按数据类型可以分为两种IO,一种是操作metadata的,一种是操作文件内容的。

元数据IO流程

以mkdir操作为例进行说明。

client端

-> 用户在cephfs挂载目录下执行mkdir命令
    -> ceph-fuse进程调用fuse_lowlevel_new函数注册的用户态mkdir实现:fuse_ll_mkdir(ll应该是lowlevel的缩写,因为调用的是libfuse的lowlevel api)
        -> fuse_ll_req_prepare(req)
            -> fuse_req_userdata(req)  // 提取用户数据
        -> fuse_req_ctx(req)    // Get the context from the request
        -> UserPerm perm(ctx->uid, ctx->gid)  // 初始化用户权限
        -> get_fuse_groups(perm, req)  // 从req中获取用户组信息
            -> getgroups()
                -> fuse_req_getgroups(req)  // Get the current supplementary group IDs for the specified request, Similar to the getgroups(2) system call, except the return value is always the total number of group IDs, even if it is larger than the specified size.
            -> perms.init_gids(gids, count)  // 把用户组信息设置到perms
        -> i1 = cfuse->iget(parent);  // 获取父目录的inode
            -> client->get_root() 或 client->ll_get_inode(vino)   // 增加相关inode引用计数
        -> cfuse->client->ll_mkdir(i1, name, mode, &fe.attr, &i2, perm)
            -> _mkdir(parent, name, mode, perm, &in)
                -> is_quota_files_exceeded()  // 检查文件数量配额
                -> _posix_acl_create(dir, &mode, xattrs_bl, perm)   // 创建acl
                -> get_or_create(dir, name, &de)  // 创建新建目录的dentry
                -> make_request(req, perm, inp)  // 发送请求给mds执行创建目录操作,应该是同步请求
                -> trim_cache()  // 清理lru缓存中的dentry
            -> fill_stat(in, attr);    _ll_get(in.get());  // 填充stat信息,增加inode引用计数
        -> fuse_reply_entry(req, &fe)  // Reply with a directory entry

server端

void Server::dispatch_client_request(MDRequestRef& mdr)
{
  ...
    case CEPH_MDS_OP_MKDIR:
      handle_client_mkdir(mdr);
      break;
  ...
}

// MKDIR
/* This function takes responsibility for the passed mdr*/
-> Server::handle_client_mkdir(MDRequestRef& mdr)
    -> is_last_dot_or_dotdot()  // 检查末级目录是否为.或..
    -> rdlock_path_xlock_dentry()  // 遍历并创建新文件夹的dentry,获取非新建目录的rdlock,以及新目录的xlock
    -> check_access()  // 检查目录操作权限
    -> check_fragment_space()  // 检查mds上的目录分片大小是否超出限制
    -> prepare_new_inode()  // 创建新inode,填充信息后存入mdcache
    -> push_projected_linkage()  // 没看懂,猜测是链接新目录到父目录链表?
    -> mdlog->start_entry(le)   // 准备mdlog相关操作,新增一条log
    -> mds->locker->issue_new_caps()  // 新建目录的cap,并且清空它(初始化操作)
    -> journal_and_reply()  // 写入mdlog
        -> early_reply()  // 在提交mdlog之前先返回给client结果,应该是为了加速请求返回
        -> submit_mdlog_entry()
        -> mdlog->flush()

文件数据IO流程

以write操作为例进行说明。

client端

-> 用户在cephfs挂载目录下写入文件
    -> fuse_ll_write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi)
        -> fuse_ll_req_prepare(req)  // 提取handle
        -> cfuse->client->ll_write(fh, off, size, buf)
            -> Client::_write()
                -> mdsmap->get_max_filesize()  // 检查文件大小是否越界
                -> objecter->osdmap_pool_full(in->layout.pool_id)  // 检查后的存储池是否满了
                -> f->mode & CEPH_FILE_MODE_WR  // 检查句柄是否可写
                -> is_quota_bytes_exceeded()  // 检查配额
                -> bl.append()  // 把写入的内容转存到bufferlist
                -> get_caps()  // 获取操作文件的cap,这个流程比较长,可以参考上面的元数据client端操作流程
                -> 根据写入的字节数判断是否可以执行inline写入,inline应该是指写入inode里面,也就是写入元数据池
                -> cct->_conf->client_oc  // 是否可以执行buffered write
                    -> objectcacher->file_write()  // async, caching, non-blocking
                    -> if O_SYNC||O_DSYNC; _flush_range()  // 刷数据
                -> 否则执行
                    -> if O_DIRECT; _flush_range   // 刷之前的数据
                    -> filer->write_trunc()
                        -> Striper::file_to_extents()   // 根据文件的偏移量和写入的长度找到对应的后端对象
                        -> objecter->sg_write_trunc()  // 把数据写入后端对象
                            -> write_trunc()
                                -> o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver)
                                -> op_submit(o, &tid)
                                    -> _send_op()
                                        -> send_message()  // 发送对象写消息给osd
        -> fuse_reply_write(req, r)  // Reply with number of bytes written

server端

与rbd场景下osd端处理IO写请求的流程相同,不再分析。