Ceph iSCSI solutions and environment setup

Solution 1: Ceph iSCSI gateway and tcmu-runner deployment procedure

This deployment procedure was verified on CentOS 7.5 cloud hosts.

Introduction to how TCMU works: Linux LIO 与 TCMU 用户空间透传 – Lixiubo_Liuyuan.pdf

Environment preparation

  1. A working Ceph Luminous (L) cluster
  2. At least two CentOS 7.5 hosts (cloud hosts or physical machines) to act as iSCSI gateway nodes, able to reach the Ceph cluster's public network; other distributions also work, but then the kernel must be 4.16 or newer

 

RHEL/CentOS 7.5; Linux kernel v4.16 or newer; or the Ceph iSCSI client test kernel

If not using a distro kernel that contains the required Ceph iSCSI patches, then Linux kernel v4.16 or newer or the ceph-client ceph-iscsi-test branch must be used.

> git clone https://github.com/ceph/ceph-client.git
> git checkout ceph-iscsi-test

Warning: ceph-iscsi-test is not for production use. It should only be used for proof of concept setups and testing. The kernel is only updated with Ceph iSCSI patches. General security and bug fixes from upstream are not applied.

# OSD settings recommended by the official documentation for the Ceph cluster:
[osd]
osd heartbeat grace = 20
osd heartbeat interval = 5

 

Deployment procedure

Tip: if the two iSCSI gateway nodes are cloud hosts, you can deploy only one first, take a custom image of it once it works, create the second host from that image, and then just adjust the hostname, /etc/hosts, and the iSCSI gateway configuration to get a new gateway node.

Overall steps:

  1. Configure the hostnames of the two CentOS 7.5 cloud hosts and edit /etc/hosts so that they can reach each other by hostname. In this deployment the hostnames are tcmu and tcmu2, with IPs 192.168.0.6 and 192.168.0.7.
  2. Install the Ceph packages on the gateway nodes (with ceph-deploy or manually), see: http://docs.ceph.com/docs/master/start/quick-rbd/#install-ceph
  3. Download, build, and install tcmu-runner from source
  4. Download and install ceph-iscsi-config (rbd-target-gw) and ceph-iscsi-cli (rbd-target-api and the gwcli command), plus their dependencies

Install the Ceph packages

# On the gateway nodes, add the in-house Ceph repo, or add the official one: yum install centos-release-ceph-luminous.noarch -y
$ ceph-deploy install --release luminous tcmu  # and tcmu2; the ceph-deploy node also needs the /etc/hosts entries
$ ceph-deploy admin tcmu   # and tcmu2, or manually copy the /etc/ceph directory to the gateway nodes
# Verify that the ceph commands work, e.g. ceph -s

Install tcmu-runner

$ git clone https://github.com/open-iscsi/tcmu-runner.git
$ yum -y install epel-release python-pip python-rbd python-devel python-crypto # install dependencies
$ cd tcmu-runner
$ cmake -Dwith-glfs=false -Dwith-qcow=false -DSUPPORT_SYSTEMD=ON -DCMAKE_INSTALL_PREFIX=/usr
$ make && make install
$ systemctl daemon-reload
$ systemctl enable tcmu-runner   # if this errors out, copy the service file manually: cp tcmu-runner.service /lib/systemd/system, see: https://github.com/open-iscsi/tcmu-runner#running-tcmu-runner
$ systemctl start tcmu-runner

Install ceph-iscsi-config

$ git clone https://github.com/open-iscsi/targetcli-fb; git clone https://github.com/open-iscsi/configshell-fb; git clone https://github.com/open-iscsi/rtslib-fb  ## download the dependencies; targetcli itself is optional, ceph-iscsi-cli replaces it
$ python setup.py install  # run this in each of the three dependency directories; if packages are reported missing, install them manually with pip install
$ git clone https://github.com/ceph/ceph-iscsi-config.git
$ python setup.py install # run in the ceph-iscsi-config directory; pip install any missing packages manually
$ systemctl daemon-reload
$ systemctl enable rbd-target-gw  # if this errors out, copy the service file to /lib/systemd/system manually
$ systemctl start rbd-target-gw  # note: the configuration file must be created first, otherwise restart this service after creating it

Create the configuration file on every gateway node:

[root@tcmu ~]# cat /etc/ceph/iscsi-gateway.cfg
[config]
# name of the *.conf file. A suitable conf file allowing access to the ceph
# cluster from the gateway node is required.
cluster_name = ceph
# Place a copy of the ceph cluster's admin keyring in the gateway's /etc/ceph
# directory and reference the filename here
gateway_keyring = ceph.client.admin.keyring
# API settings.
# The api supports a number of options that allow you to tailor it to your
# local environment. If you want to run the api under https, you will need to
# create crt/key files that are compatible for each gateway node (i.e. not
# locked to a specific node). SSL crt and key files *must* be called
# iscsi-gateway.crt and iscsi-gateway.key and placed in /etc/ceph on *each*
# gateway node. With the SSL files in place, you can use api_secure = true
# to switch to https mode.
# To support the api, the bare minimum settings are:
api_secure = false
# Additional API configuration options are as follows (defaults shown);
api_user = admin
api_password = admin
api_port = 5001
trusted_ip_list = 192.168.0.6, 192.168.0.7

Install ceph-iscsi-cli

$ git clone https://github.com/ceph/ceph-iscsi-cli.git
$ cd ceph-iscsi-cli
$ python setup.py install --install-scripts=/usr/bin # run in the ceph-iscsi-cli directory; pip install any missing packages manually
$ cp usr/lib/systemd/system/rbd-target-api.service /lib/systemd/system
$ systemctl daemon-reload
$ systemctl enable rbd-target-api
$ systemctl start rbd-target-api

iSCSI target configuration

The target is configured entirely with the gwcli command; for the full procedure see: http://www.zphj1987.com/2018/04/11/ceph-ISCSI-GATEWAY/ (a rough gwcli session is sketched below)
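A rough outline of the gwcli session (a sketch based on the upstream ceph-iscsi-cli documentation, not verified here; the target IQN, the client IQN, the disk_1 image and its size are placeholders, and the exact subcommand syntax differs between ceph-iscsi-cli versions, so follow the linked guide for the authoritative steps):

# run gwcli on one gateway node, e.g. tcmu (sketch only)
$ gwcli
/> cd /iscsi-target
/iscsi-target> create iqn.2018-10.com.netease:ceph-igw
/iscsi-target> cd iqn.2018-10.com.netease:ceph-igw/gateways
/iscsi-target...ceph-igw/gateways> create tcmu 192.168.0.6       # add both gateway nodes
/iscsi-target...ceph-igw/gateways> create tcmu2 192.168.0.7
/iscsi-target...ceph-igw/gateways> cd /disks
/disks> create pool=rbd image=disk_1 size=10G                    # create/register an rbd image
/disks> cd /iscsi-target/iqn.2018-10.com.netease:ceph-igw/hosts
/iscsi-target...ceph-igw/hosts> create iqn.1994-05.com.redhat:rh7-client   # the initiator's IQN
/iscsi-target...ceph-igw/hosts> disk add rbd.disk_1              # export the disk to that initiator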

iSCSI initiator configuration

To keep the setup simple, the gateway nodes themselves can be used as initiator nodes for functional testing. For installing and configuring the relevant packages and the iscsiadm workflow, see: http://www.zphj1987.com/2018/04/11/ceph-ISCSI-GATEWAY/ (a minimal package-install sketch follows)
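For reference, the initiator-side installation on CentOS 7.5 boils down to roughly the following (a minimal sketch; see the linked guide for the multipath.conf contents and the iscsiadm discovery/login steps):

# on the initiator node
$ yum install -y iscsi-initiator-utils device-mapper-multipath
$ mpathconf --enable                  # generates a default /etc/multipath.conf
$ systemctl enable iscsid multipathd
$ systemctl start iscsid multipathd
$ cat /etc/iscsi/initiatorname.iscsi  # this IQN is what gets added under hosts/ in gwcli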

References:

  1. http://docs.ceph.com/docs/master/rbd/iscsi-requirements/
  2. http://docs.ceph.com/docs/master/rbd/iscsi-target-cli-manual-install/
  3. http://docs.ceph.com/docs/master/rbd/iscsi-target-cli/
  4. http://www.zphj1987.com/2018/04/11/ceph-ISCSI-GATEWAY/

 

Solution 2: TGT + rbd backing store deployment procedure

This test was done on two CentOS 7.5 cloud hosts, tcmu (192.168.0.6) and tcmu2 (192.168.0.7).

Prerequisite: a Ceph Luminous (L) cluster on which volumes can be created normally, reachable from the TGT nodes.

Install the TGT service (target side)

On CentOS 7.5 you can add the EPEL repo and install the scsi-target-utils package directly (yum --enablerepo=epel -y install scsi-target-utils), but the TGT in that package does not support the rbd backing store, so it still has to be built by hand.

Debian 9 (stretch) appears to provide a tgt-rbd package: https://packages.debian.org/stretch/tgt, https://packages.debian.org/stretch/tgt-rbd; installing these two packages should be enough.

 

Build TGT

  1. Download the source: wget https://github.com/fujita/tgt/archive/v1.0.73.tar.gz
  2. tar xzf v1.0.73.tar.gz; cd tgt-1.0.73; make; make install (if make complains "xsltproc command not found", install the dependency first with yum install libxslt -y)
  3. Start the tgtd service, either manually with /usr/sbin/tgtd -f or with systemctl start tgtd (in practice the tgtd.service file under the source tree tgt-1.0.73/scripts did not work well and hung on startup; I first installed scsi-target-utils from EPEL and then replaced the binaries with make install, after which the tgtd service with rbd backing store support started normally)
  4. Check that the rbd backing store is supported: tgtadm --lld iscsi --op show --mode system | grep rbd; the output "rbd (bsoflags sync:direct)" means it is supported.

 

Deploy the iSCSI initiator (initiator side)

For convenience the initiator software is deployed directly on node tcmu2 (shared with the target side).

Follow the Linux client connection part of http://www.zphj1987.com/2018/04/11/ceph-ISCSI-GATEWAY/; it mainly comes down to installing the iscsi-initiator-utils client package and the device-mapper-multipath multipathing package.

 

Create the target and LUN (target side)

First create an rbd volume:

$ rbd create disk2 --size 1G

Then run the same commands on both tcmu and tcmu2:

$ tgtadm --lld iscsi --mode target --op new --tid 1 --targetname iqn.2018-10.com.netease:cephtgt.target0  # create the target
$ tgtadm --lld iscsi --mode logicalunit --op new --tid 1 --lun 1 --backing-store disk2 --bstype rbd  # add the rbd volume to the target as a LUN; note LUN ids start at 1, 0 is reserved by tgt
$ tgtadm --lld iscsi --op bind --mode target --tid 1 -I ALL  # configure the ACL; ALL lets every node access the target, a CIDR can also be used to restrict access to a subnet

 

Connect the initiator to the target (initiator side)

Run on the tcmu2 node:

$ iscsiadm -m discovery -t st -p tcmu  # discover targets
$ iscsiadm -m discovery -t st -p tcmu2
$ iscsiadm -m node -T iqn.2018-10.com.netease:cephtgt.target0 -l -p tcmu # log in to the target
$ iscsiadm -m node -T iqn.2018-10.com.netease:cephtgt.target0 -l -p tcmu2

This connects to the target; check the iSCSI volumes mapped on this host:

[root@tcmu2 ~]# lsblk
NAME     MAJ:MIN RM  SIZE RO TYPE  MOUNTPOINT
sda        8:0    0    1G  0 disk 
└─mpathc 252:0    0    1G  0 mpath
sdb        8:16   0    1G  0 disk 
└─mpathc 252:0    0    1G  0 mpath
[root@tcmu2 ~]# multipath -ll   # show multipath device info
mpathc (360000000000000000e00000000020001) dm-0 IET     ,VIRTUAL-DISK   
size=1.0G features='0' hwhandler='0' wp=rw
|-+- policy='service-time 0' prio=0 status=enabled
| `- 10:0:0:1 sda 8:0  failed faulty running
`-+- policy='service-time 0' prio=1 status=active
  `- 11:0:0:1 sdb 8:16 active ready running

The rbd volume disk2 can now be accessed through the /dev/mapper/mpathc device, with multipathing: if the tgtd service on the tcmu node fails, the network breaks, or the node goes down, IO automatically fails over to tgtd on tcmu2 and continues normally (a quick failover check is sketched below).
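One quick way to sanity-check the failover (a sketch; device and path names will differ on your setup):

# on tcmu: take one path down
$ systemctl stop tgtd            # or kill tgtd / shut the node down

# on tcmu2: IO on the multipath device keeps going over the surviving path
$ dd if=/dev/zero of=/dev/mapper/mpathc bs=1M count=64 oflag=direct
$ multipath -ll                  # the path via tcmu shows failed/faulty, the one via tcmu2 stays active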

 

Add a new LUN to the target (target side)

First create an rbd volume:

$ rbd create disk3 --size 2G

Run on both tcmu and tcmu2:

$ tgtadm --lld iscsi --mode logicalunit --op new --tid 1 --lun 2 --backing-store disk3 --bstype rbd  # add the rbd volume to the target as a LUN

 

Discover the new LUN on the initiator (initiator side)

 

$ iscsiadm -m session -R  # rescan all established target sessions to pick up new LUNs, LUN resizes, and other changes

Check the mapped iSCSI volumes the same way as above.

 

Initiator logout

$ iscsiadm -m node -T iqn.2018-10.com.netease:cephtgt.target0 --logout # make sure the volume is no longer in use first
$ iscsiadm -m node -T iqn.2018-10.com.netease:cephtgt.target0 -o delete
$ iscsiadm -m node  # list all saved target records (possibly not logged in)

 

Target-side cleanup

$ tgtadm --lld iscsi --mode logicalunit --op delete --tid 1 --lun 2  # delete LUN 2 from target id 1; make sure the initiator has logged out first
$ tgtadm --lld iscsi --mode target --op delete --tid 1  # delete the target with id 1
$ tgt-admin --show   # show all target information

Configuration persistence

Use configuration files so that the target and initiator sides automatically restore their configuration and connections after a reboot.

To be completed (a rough sketch is given below).
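As a starting point, something along the following lines should work. This is an untested sketch; the exact targets.conf directives depend on the tgt version, so compare with the output of tgt-admin --dump:

# target side: persist the target definition so it is restored when tgtd starts
# /etc/tgt/targets.conf
<target iqn.2018-10.com.netease:cephtgt.target0>
    driver iscsi
    bs-type rbd
    backing-store disk2
    backing-store disk3
</target>

# initiator side: make the recorded node log in automatically on boot
$ iscsiadm -m node -T iqn.2018-10.com.netease:cephtgt.target0 -o update -n node.startup -v automatic
$ systemctl enable iscsid iscsi multipathd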

References:

  1. http://www.zphj1987.com/2018/04/11/ceph-ISCSI-GATEWAY/
  2. https://jerry.red/300/%E5%88%9B%E5%BB%BA-iscsi-target-%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%92%8C-iscsi-initiator-%E5%AE%A2%E6%88%B7%E7%AB%AF%E8%BF%9E%E6%8E%A5
  3. http://linux.vbird.org/linux_server/0460iscsi.php#initiator_exam

 

Solution 3: iSCSI target with LIO + krbd/rbd-nbd

LIO is the in-kernel iSCSI target implementation. It supports several backing stores but not rbd directly, so an rbd volume first has to be mapped to a block device with krbd or rbd-nbd; the mapped device (/dev/rbdX or /dev/nbdX) is then given to LIO as a block backing store and finally exported as a target to the initiators.

TCMU is the userspace side of LIO and supports an rbd backend directly; see Solution 1: Ceph iSCSI gateway and tcmu-runner deployment procedure.

Introduction to how LIO works: Linux LIO 与 TCMU 用户空间透传 – Lixiubo_Liuyuan.pdf

This test was done on two CentOS 7.5 cloud hosts: tcmu (192.168.0.6) as the target node and tcmu2 (192.168.0.7) as the initiator node.

Prerequisite: a Ceph Luminous (L) cluster on which volumes can be created normally, reachable from the target node.

The procedure is as follows:

Kernel module check

These modules are usually loaded by default; if not, load them manually with modprobe (a sketch follows the listing below):

# lsmod | grep target_core_mod               
target_core_mod       340809  13 target_core_iblock,target_core_pscsi,iscsi_target_mod,target_core_file,target_core_user
crc_t10dif             12912  2 target_core_mod,sd_mod
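If they are missing, loading them manually is just (a sketch; module names as in the lsmod output above):

$ modprobe target_core_mod
$ modprobe iscsi_target_mod
$ modprobe target_core_iblock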

 

Install the targetcli client

# run on the target node (tcmu)
$ git clone https://github.com/open-iscsi/targetcli-fb; git clone https://github.com/open-iscsi/configshell-fb; git clone https://github.com/open-iscsi/rtslib-fb  ## download the package and its dependencies
$ python setup.py install  # run in each of the three directories; targetcli-fb depends on the other two; if dependencies are reported missing, pip install them manually or fetch them from GitHub (the pip command requires the python-pip rpm)

 

Configure the target

First an rbd volume is needed; here vol3 (1G, in the rbd pool) has already been created.

Then map this rbd volume on the target (tcmu) node:

# run on the target node (tcmu)
$ rbd map vol3  # mapping with rbd-nbd also works
$ rbd showmapped
id pool image snap device   
0  rbd  vol3  -    /dev/rbd0

/dev/rbd0 is then handed to LIO as a block device.

Run the targetcli command on the target (tcmu) node:

$ targetcli
targetcli shell version 2.1.fb49
Copyright 2011-2013 by Datera, Inc and others.
For help on commands, type 'help'.
/> cd /backstores/block
/backstores/block> ls
o- block ...................................................................................................... [Storage Objects: 0]
/backstores/block> create name=rbd0 dev=/dev/rbd0 # add the block backstore
Created block storage object rbd0 using /dev/rbd0.
/backstores/block> ls
o- block ...................................................................................................... [Storage Objects: 1]
  o- rbd0 .............................................................................. [/dev/rbd0 (1.0GiB) write-thru deactivated]
    o- alua ....................................................................................................... [ALUA Groups: 1]
      o- default_tg_pt_gp ........................................................................... [ALUA state: Active/optimized]
/> cd /iscsi
/iscsi> create
Created target iqn.2003-01.org.linux-iscsi.tcmu.x8664:sn.546f452bcfe2.
Created TPG 1.
Global pref auto_add_default_portal=true
Created default portal listening on all IPs (0.0.0.0), port 3260.
/iscsi> ls
o- iscsi .............................................................................................................. [Targets: 1]
  o- iqn.2003-01.org.linux-iscsi.tcmu.x8664:sn.546f452bcfe2 .............................................................. [TPGs: 1]
    o- tpg1 ................................................................................................. [no-gen-acls, no-auth]
      o- acls ............................................................................................................ [ACLs: 0]
      o- luns ............................................................................................................ [LUNs: 0]
      o- portals ...................................................................................................... [Portals: 1]
        o- 0.0.0.0:3260 ....................................................................................................... [OK]
/iscsi> cd iqn.2003-01.org.linux-iscsi.tcmu.x8664:sn.546f452bcfe2/tpg1/luns
/iscsi/iqn.20...fe2/tpg1/luns> create /backstores/block/rbd0
Created LUN 0.
/iscsi/iqn.20...fe2/tpg1/luns> ls
o- luns .................................................................................................................. [LUNs: 1]
  o- lun0 .............................................................................. [block/rbd0 (/dev/rbd0) (default_tg_pt_gp)]
/iscsi/iqn.20...fe2/tpg1/luns> cd ..
/iscsi/iqn.20...452bcfe2/tpg1> set attribute authentication=0 demo_mode_write_protect=0 generate_node_acls=1 cache_dynamic_acls=1
Parameter demo_mode_write_protect is now '0'.
Parameter authentication is now '0'.
Parameter generate_node_acls is now '1'.
Parameter cache_dynamic_acls is now '1'.
/iscsi/iqn.20...452bcfe2/tpg1> cd /
/> saveconfig
Last 10 configs saved in /etc/target/backup/.
Configuration saved to /etc/target/saveconfig.json
/> exit

 

Initiator operations

Same as with the other target implementations: use the iscsiadm tool to connect to the target; for details see the workflow in the "TGT + rbd backing store deployment procedure" document above.

# run on the tcmu2 node
$ iscsiadm -m discovery -t st -p tcmu
$ iscsiadm -m node -T iqn.2003-01.org.linux-iscsi.tcmu.x8664:sn.546f452bcfe2 -l -p tcmu
$ lsblk
NAME   MAJ:MIN RM SIZE RO TYPE  MOUNTPOINT
sda      8:0    0   1G  0 disk

 

Multipath

The RBD exclusive lock feature affects multipathing; see https://www.sebastien-han.fr/blog/2017/01/05/Ceph-RBD-and-iSCSI/ (section 5). It seems only active/passive (failover) mode is possible? This part still needs to be filled in (a hedged multipath.conf sketch is given below).
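For reference, forcing active/passive behaviour on the initiator usually comes down to a failover path-grouping policy in multipath.conf. The following is only a sketch: the vendor/product strings must match what multipath -ll reports for the LIO-exported LUNs, and whether anything more than active/passive is safe depends on the exclusive-lock discussion above.

# /etc/multipath.conf (sketch)
defaults {
    user_friendly_names yes
}
devices {
    device {
        vendor  "LIO-ORG"               # as reported by multipath -ll for LIO targets
        product ".*"
        path_grouping_policy failover   # active/passive: one active path, the rest standby
        path_checker tur
        no_path_retry queue
    }
}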

 

 

 

A brief introduction to librbd threads, callbacks, and the IO write path in QEMU

Concepts

Image

Analogous to an LVM Logical Volume; it is the entity that can be attached to / detached from a VM. In RBD, an image's data is made up of multiple objects.

Snapshot

The state of an image at one particular point in time. It is read-only, but the image can be rolled back to a snapshot. A snapshot always belongs to some image.

Clone

Copying the state of one of an image's snapshots into a new image. For example, if ImageA has Snapshot-1, cloning ImageA's Snapshot-1 produces ImageB. ImageB starts out in exactly the same state as Snapshot-1; the difference is that ImageB is writable and has all the capabilities of an image.

Metadata

striping

  • order: 22. The size of objects we stripe over is a power of two, specifically 2^[order] bytes. The default is 22, or 4 MB.
  • stripe_unit: 4M. Each [stripe_unit] contiguous bytes are stored adjacently in the same object, before we move on to the next object.
  • stripe_count: 1. After we write [stripe_unit] bytes to [stripe_count] objects, we loop back to the initial object and write another stripe, until the object reaches its maximum size (as specified by [order]). At that point, we move on to the next [stripe_count] objects. (A quick worked example follows this list.)
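A quick worked example of the mapping (a sketch; <image id> is a placeholder): with the defaults above (order = 22, i.e. 4 MiB objects, stripe_unit = 4M, stripe_count = 1), the byte at offset OFF simply lands in object number OFF >> 22, and the object name encodes that number as zero-padded hex:

$ OFF=$((37 * 1024 * 1024))                      # a write at offset 37 MiB ...
$ printf 'rbd_data.<image id>.%016x\n' $((OFF >> 22))
rbd_data.<image id>.0000000000000009             # ... goes to object number 9 (0x9)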

root@ceph1 ~ $ rados -p rbd ls

  • rbd_header.1bdfd6b8b4567: stores the image metadata (what rbd info shows)
  • rbd_directory: stores the list of all image ids and names
  • rbd_info: contains "overwrite validated"; used for EC pools?
  • rbd_id.vol1: stores the image's id
  • rbd_data.233546b8b4567.0000000000000025: an object holding image data, allocated on demand; 233546b8b4567 is the image id and 0000000000000025 is the object number (in hex), counting up from 0

References:

  1. http://hustcat.github.io/rbd-image-internal-in-ceph/
  2. http://tracker.ceph.com/issues/19081

Callbacks

Callback classes

They have three characteristics:

  1. The class name starts with C_
  2. They implement the finish member function
  3. They are subclasses of Context

Example:

struct C_AioComplete : public Context {
  AioCompletionImpl *c;

  explicit C_AioComplete(AioCompletionImpl *cc) : c(cc) {
    c->_get();
  }

  void finish(int r) override {
    rados_callback_t cb_complete = c->callback_complete;
    void *cb_complete_arg = c->callback_complete_arg;
    if (cb_complete)
      cb_complete(c, cb_complete_arg);

    rados_callback_t cb_safe = c->callback_safe;
    void *cb_safe_arg = c->callback_safe_arg;
    if (cb_safe)
      cb_safe(c, cb_safe_arg);

    c->lock.Lock();
    c->callback_complete = NULL;
    c->callback_safe = NULL;
    c->cond.Signal();
    c->put_unlock();
  }
};

There is also a callback adapter class: a class template that implements a generic callback class and can turn various classes into callback classes:

template <typename T, void (T::*MF)(int)>
class C_CallbackAdapter : public Context {
  T *obj;
public:
  C_CallbackAdapter(T *obj) : obj(obj) {
  }

protected:
  void finish(int r) override {
    (obj->*MF)(r);
  }
};

Callback objects are then created through the create_xxx_callback factory functions (create_context_callback, create_async_context_callback) and registered for later use.

Callback adapter functions

Function templates that turn an arbitrary member function into a callback function.

Why not register the original function directly as the callback?

template <typename T>
void rados_callback(rados_completion_t c, void *arg) {
  reinterpret_cast<T*>(arg)->complete(rados_aio_get_return_value(c));
}

template <typename T, void(T::*MF)(int)>
void rados_callback(rados_completion_t c, void *arg) {
  T *obj = reinterpret_cast<T*>(arg);
  int r = rados_aio_get_return_value(c);
  (obj->*MF)(r);
}

template <typename T, Context*(T::*MF)(int*), bool destroy>
void rados_state_callback(rados_completion_t c, void *arg) {
  T *obj = reinterpret_cast<T*>(arg);
  int r = rados_aio_get_return_value(c);
  Context *on_finish = (obj->*MF)(&r);
  if (on_finish != nullptr) {
    on_finish->complete(r);
    if (destroy) {
      delete obj;
    }
  }
}

Callback factory functions

create_context_callback and create_async_context_callback were covered above; the focus here is create_rados_callback:

template <typename T>
librados::AioCompletion *create_rados_callback(T *obj) {
  return librados::Rados::aio_create_completion(
    obj, &detail::rados_callback<T>, nullptr);
}

template <typename T, void(T::*MF)(int)> // MF is the real callback function
librados::AioCompletion *create_rados_callback(T *obj) {
  return librados::Rados::aio_create_completion(
    obj, &detail::rados_callback<T, MF>, nullptr);
}
/* These two create_rados_callback overloads create the indirect callbacks rados_callback/rados_state_callback; MF is the real callback. */

// Overload: note the type of the second template parameter (i.e. the callback),
// which determines whether this overload or the one above is chosen.
// e.g. Context *handle_v2_get_mutable_metadata(int *result) uses this one,
// while void RewatchRequest::handle_unwatch(int r) uses the one above.
template <typename T, Context*(T::*MF)(int*), bool destroy=true> // MF is the real callback
librados::AioCompletion *create_rados_callback(T *obj) {
  return librados::Rados::aio_create_completion(
    obj, &detail::rados_state_callback<T, MF, destroy>, nullptr);
}

This function does only one thing: it creates the AioCompletion callback object that a rados operation needs (as above); the callback function inside it is produced by the adapter functions described earlier, which turn an ordinary member function into a callback.

Callback registration

There are several ways to register a callback:

  1. Direct registration: usually at the outermost layer, in the public API; generally wrapped once more inside librbd
  2. Via the callback factory functions: used widely inside librbd
  3. Via the callback adapter functions: used widely inside librbd

Callbacks and the Finisher thread

Why must callback classes inherit from Context?

Because all callbacks are executed by a finisher thread (whose body is Finisher::finisher_thread_entry), and that thread calls the callback object's complete member function. The Context class implements this function and serves as the common callback base class. This is purely for convenience and uniformity, not a hard requirement: you could implement the complete member function in your own callback class without inheriting from Context.

See the enqueue path of the finisher thread's associated queues, finisher_queue and finisher_queue_rval, further below to understand how callbacks get queued.

void *Finisher::finisher_thread_entry()
{
  ......
  while (!finisher_stop) {
    while (!finisher_queue.empty()) {
      vector<Context*> ls;
      list<pair<Context*,int> > ls_rval;
      ls.swap(finisher_queue);
      ls_rval.swap(finisher_queue_rval);
      ......
      // Now actually process the contexts.
      for (vector<Context*>::iterator p = ls.begin(); p != ls.end(); ++p) {
    	if (*p) {
    	  (*p)->complete(0); // call the callback object's complete member function
    	} else {
    	  // When an item is NULL in the finisher_queue, it means
    	  // we should instead process an item from finisher_queue_rval,
    	  // which has a parameter for complete() other than zero.
    	  // This preserves the order while saving some storage.
    	  assert(!ls_rval.empty());
    	  Context *c = ls_rval.front().first;
    	  c->complete(ls_rval.front().second); // call the callback object's complete member function
    	  ls_rval.pop_front();
    	}
    ......
}

Callback chains

When an rbd image is opened, many steps run to fetch the image's various pieces of metadata (see the comments in OpenRequest for the flow, mainly V2_DETECT_HEADER, V2_GET_ID|NAME, V2_GET_IMMUTABLE_METADATA, V2_GET_STRIPE_UNIT_COUNT, V2_GET_CREATE_TIMESTAMP, V2_GET_DATA_POOL, etc.). All of the metadata could of course be fetched in a single call, but that would make the single operation take too long and would couple the individual metadata fetch functions tightly; this is my personal guess, and there may be other considerations I have not understood yet.

librbd uses a chain of callbacks to call each metadata request function and its response handler in turn. The entry point is rbd_open, and the first metadata request function executed is send_v2_detect_header (which sends the request that checks whether this is a v2 image header). The QEMU call stack is:

Thread 1 "qemu-system-x86" hit Breakpoint 4, librbd::image::OpenRequest<librbd::ImageCtx>::send_v2_detect_header (this=this@entry=0x5555568d1520)
    at /mnt/ceph/src/librbd/image/OpenRequest.cc:84
84      void OpenRequest<I>::send_v2_detect_header() {
(gdb) bt
#0  librbd::image::OpenRequest<librbd::ImageCtx>::send_v2_detect_header (this=this@entry=0x5555568d1520) at /mnt/ceph/src/librbd/image/OpenRequest.cc:84
#1  0x00007fffdf0f1895 in librbd::image::OpenRequest<librbd::ImageCtx>::send (this=this@entry=0x5555568d1520) at /mnt/ceph/src/librbd/image/OpenRequest.cc:42
#2  0x00007fffdf058030 in librbd::ImageState<librbd::ImageCtx>::send_open_unlock (this=0x5555568cf750) at /mnt/ceph/src/librbd/ImageState.cc:592
#3  0x00007fffdf05b9b9 in librbd::ImageState<librbd::ImageCtx>::execute_next_action_unlock (this=this@entry=0x5555568cf750) at /mnt/ceph/src/librbd/ImageState.cc:521
#4  0x00007fffdf05ca39 in librbd::ImageState<librbd::ImageCtx>::execute_action_unlock (this=this@entry=0x5555568cf750, action=..., 
    on_finish=on_finish@entry=0x7fffffffd1a0) at /mnt/ceph/src/librbd/ImageState.cc:546
#5  0x00007fffdf05cbdd in librbd::ImageState<librbd::ImageCtx>::open (this=this@entry=0x5555568cf750, skip_open_parent=skip_open_parent@entry=false, 
    on_finish=on_finish@entry=0x7fffffffd1a0) at /mnt/ceph/src/librbd/ImageState.cc:271
#6  0x00007fffdf05ccfd in librbd::ImageState<librbd::ImageCtx>::open (this=0x5555568cf750, skip_open_parent=skip_open_parent@entry=false)
    at /mnt/ceph/src/librbd/ImageState.cc:250
#7  0x00007fffdf042116 in rbd_open (p=<optimized out>, name=name@entry=0x555556749fd8 "vol1", image=image@entry=0x555556749fd0, snap_name=<optimized out>)
    at /mnt/ceph/src/librbd/librbd.cc:2508
#8  0x00007fffdf534dd3 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd68) at ./block/rbd.c:565
#9  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd58, options=0x555556757190, file=0x0, bs=0x555556701880) at ./block.c:1104
#10 bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=<optimized out>, options=0x555556757190, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffdeb8) at ./block.c:1833
#11 0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffdeb8) at ./block.c:1588
#12 0x0000555555b0e24c in bdrv_open_inherit (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=0x5555566ff670, options@entry=0x5555566f90b0, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x0, 
    child_role=child_role@entry=0x0, errp=0x7fffffffe190) at ./block.c:1794
#13 0x0000555555b0f7b1 in bdrv_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block.c:1924
#14 0x0000555555b4890b in blk_new_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block/block-backend.c:160
#15 0x000055555580c90f in blockdev_init (file=file@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    bs_opts=bs_opts@entry=0x5555566f90b0, errp=errp@entry=0x7fffffffe190) at ./blockdev.c:582
#16 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#17 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#18 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
    at ./util/qemu-option.c:1116
#19 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481
template <typename I>  // entry point for opening an rbd image
void OpenRequest<I>::send_v2_detect_header() {
  if (m_image_ctx->id.empty()) {
    CephContext *cct = m_image_ctx->cct;
    ldout(cct, 10) << this << " " << __func__ << dendl;

    librados::ObjectReadOperation op;
    op.stat(NULL, NULL, NULL);

    using klass = OpenRequest<I>;
    librados::AioCompletion *comp =
      create_rados_callback<klass, &klass::handle_v2_detect_header>(this); // create the callback; handle_v2_detect_header is invoked when the response arrives
    m_out_bl.clear();
    m_image_ctx->md_ctx.aio_operate(util::id_obj_name(m_image_ctx->name),
                                   comp, &op, &m_out_bl);  // send the request to the Ceph cluster
    comp->release();
  } else {
    send_v2_get_name();
  }
}
template <typename I>
Context *OpenRequest<I>::handle_v2_detect_header(int *result) {
  CephContext *cct = m_image_ctx->cct;
  ldout(cct, 10) << __func__ << ": r=" << *result << dendl;

  if (*result == -ENOENT) {
    send_v1_detect_header();
  } else if (*result < 0) {
    lderr(cct) << "failed to stat v2 image header: " << cpp_strerror(*result)
               << dendl;
    send_close_image(*result);
  } else {
    m_image_ctx->old_format = false;
    send_v2_get_id();    // directly call the next metadata request function
  }
  return nullptr;
}

The chain is formed by direct calls plus "register a callback, then send" steps; it finally reaches send_v2_apply_metadata, which registers the last callback, handle_v2_apply_metadata.

Control path

  • Requests: handled by RadosClient, MgrClient and their member functions, generally via the ordinary dispatch path, and ultimately sent out by AsyncMessenger
  • Responses: handled by AsyncMessenger-related methods

Data path

Handled by the Objecter class and its member functions, generally via the fast dispatch path, and ultimately sent out by AsyncMessenger.

Data structures and IO flow

Control path

Context

Base class of all callbacks.

CephContext

Needed by every operation; stores all kinds of global state; one per client (librbd counts as one client).

ImageCtx

Stores the per-image global state; one per image.

ContextWQ

Work queue class for the IO control path (queue plus processing methods); the op_work_queue object.

librados::IoCtx, IoCtxImpl

Global state needed to talk to rados; one is the external interface, the other is used internally; one per pool.

Finisher, Finisher::FinisherThread

Callback execution class; manages the callback queues and invokes the callbacks from its thread.

Data path

AsyncConnection

Connection state to the Ceph server side, maintained by AsyncMessenger; all requests are sent through it; see AsyncConnection::process.

librbd::io::AioCompletion

The librbd-internal callback invoked when a user-initiated asynchronous IO completes; mainly records perf counter data and holds the external callback passed in by the user who issued the IO request.

librbd::ThreadPoolSingleton

Wraps ThreadPool and implements the tp_librbd singleton thread.

ThreadPool

Base class of all thread pools.

ThreadPool::PointerWQ

Common base class of the IO data-path and control-path work queues.

librbd::io::ImageRequestWQ

Work queue class for the IO data path (queue plus processing methods); the io_work_queue object.

librbd::io::ImageRequest

Base class of image-level IO requests, corresponding to user IO requests.

librbd::io::AbstractImageWriteRequest

Abstract class for write requests, derived from ImageRequest.

librbd::io::ImageWriteRequest

Write request class, derived from AbstractImageWriteRequest.

Thread

Base class of all threads and thread pools; subclasses call start to launch their own entry function, which becomes the thread body that does the actual work.

Objecter

Upper-layer object for a single IO operation, corresponding to a user IO request.

Objecter::Op

An upper-layer IO operation may span multiple objects and is split into multiple Ops, each mapping to a rados object.

Dispatcher

Base class of the dispatch methods used to talk to the server side; MgrClient, Objecter, and RadosClient all derive from the Dispatcher class.

Striper

IO packing/unpacking; converts between IOs and objects during reads and writes.

librbd::io::ObjectRequest, librbd::io::ObjectReadRequest, librbd::io::AbstractObjectWriteRequest, librbd::io::ObjectWriteRequest

Object-level IO requests produced by splitting a user IO request.

Thread pools and queues

tp_librbd (librbd::thread_pool)

tp_thread startup (services io_work_queue and op_work_queue): ThreadPool::start → ThreadPool::start_threads → new WorkThread(this) → Thread::create → Thread::try_create → pthread_create → Thread::_entry_func → Thread::entry_wrapper → ThreadPool::WorkThread::entry → the thread is up and the worker starts working.

(gdb) bt
#0  0x00007fffdf019af0 in ThreadPool::start()@plt () from /usr/local/lib/librbd.so.1   //----- for what follows, see the startup chain above
#1  0x00007fffdf04b475 in librbd::(anonymous namespace)::ThreadPoolSingleton::ThreadPoolSingleton (cct=0x555556752f30, this=0x5555568cdf50)
    at /mnt/ceph/src/librbd/ImageCtx.cc:66
#2  CephContext::lookup_or_create_singleton_object<librbd::(anonymous namespace)::ThreadPoolSingleton> (name="librbd::thread_pool", 
    p=<synthetic pointer>: <optimized out>, this=0x555556752f30) at /mnt/ceph/src/common/ceph_context.h:130
#3  librbd::ImageCtx::get_thread_pool_instance (cct=0x555556752f30, thread_pool=thread_pool@entry=0x7fffffffcfc8, op_work_queue=op_work_queue@entry=0x5555568cdc60)
    at /mnt/ceph/src/librbd/ImageCtx.cc:1159
#4  0x00007fffdf04c0f9 in librbd::ImageCtx::ImageCtx (this=0x5555568cd300, image_name=..., image_id=..., snap=0x0, p=..., ro=<optimized out>)
    at /mnt/ceph/src/librbd/ImageCtx.cc:213
#5  0x00007fffdf0420d7 in rbd_open (p=<optimized out>, name=name@entry=0x555556749fd8 "vol1", image=image@entry=0x555556749fd0, snap_name=0x0)
    at /mnt/ceph/src/librbd/librbd.cc:2505
#6  0x00007fffdf534dd3 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd78) at ./block/rbd.c:565
#7  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd68, options=0x555556757190, file=0x0, bs=0x555556701880) at ./block.c:1104
#8  bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=<optimized out>, options=0x555556757190, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffdec8) at ./block.c:1833
#9  0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffdec8) at ./block.c:1588
#10 0x0000555555b0e24c in bdrv_open_inherit (filename=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", reference=<optimized out>, 
    options=0x5555566ff670, flags=<optimized out>, parent=parent@entry=0x0, child_role=child_role@entry=0x0, errp=0x7fffffffe1a0) at ./block.c:1794
#11 0x0000555555b0f7b1 in bdrv_open (filename=<optimized out>, reference=<optimized out>, options=<optimized out>, flags=<optimized out>, errp=<optimized out>)
    at ./block.c:1924
#12 0x0000555555b4890b in blk_new_open (filename=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", reference=0x0, 
    options=0x5555566f90b0, flags=0, errp=0x7fffffffe1a0) at ./block/block-backend.c:160
#13 0x000055555580c90f in blockdev_init (file=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", bs_opts=0x5555566f90b0, 
    errp=0x7fffffffe1a0) at ./blockdev.c:582
#14 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#15 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#16 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
    at ./util/qemu-option.c:1116
#17 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481

Associated queue 1: io_work_queue

// io_work_queue: the main queue for all rbd IO operations, used to handle asynchronous IO; initialized in the ImageCtx constructor
// ictx->io_work_queue->aio_write/ictx->io_work_queue->aio_discard/ictx->io_work_queue->aio_read/ictx->io_work_queue->aio_flush/...
ImageCtx::ImageCtx() {
    .....
    io_work_queue = new io::ImageRequestWQ<>(
      this, "librbd::io_work_queue",
      cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
      thread_pool);
      // ImageRequestWQ derives from ThreadPool::PointerWQ. During initialization (in its constructor) it registers itself into thread_pool.work_queues. thread_pool is also bound to op_work_queue, which runs the callbacks after IO operations finish; thread_pool processes all IO operations of ImageRequestWQ, i.e. ictx->io_work_queue->aio_write is ultimately handled by thread_pool's worker function, which calls the corresponding callback when the work is done. thread_pool's worker is started by the tp_thread startup chain above.
      // ThreadPoolSingleton's op_work_queue is a ContextWQ*; there is one per thread pool, and one IO thread pool per rbd image. ThreadPool's work_queues is a vector<WorkQueue_*> that holds an rbd volume's op_work_queue and io_work_queue (both registered when they are new'ed).
    ......
}

Enqueue path: see the main code flow section below, from ImageRequestWQ::aio_write() to enqueueing into io_work_queue.

Associated queue 2: op_work_queue

// op_work_queue is used to invoke the callbacks of IO operations asynchronously; related to Finisher (how exactly?)
// op_work_queue == ThreadPoolSingleton->op_work_queue == new ContextWQ("librbd::op_work_queue",
//                                                                      cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
//                                                                      this)
//               --> derives from ThreadPool::PointerWQ<Context>
// after creation it adds itself to ThreadPool's work_queues via ThreadPool::PointerWQ<Context>::register_work_queue
ImageCtx::ImageCtx() {
    .....
    get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
    .....
}

void ImageCtx::get_thread_pool_instance(CephContext *cct,
                                          ThreadPool **thread_pool,
                                          ContextWQ **op_work_queue) {
    librbd::ThreadPoolSingleton *thread_pool_singleton;
    cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
      thread_pool_singleton, "librbd::thread_pool");
    *thread_pool = thread_pool_singleton;
    *op_work_queue = thread_pool_singleton->op_work_queue;
  }
  
    template<typename T>
  void lookup_or_create_singleton_object(T*& p, const std::string &name) {
    ceph_spin_lock(&_associated_objs_lock);
    if (!_associated_objs.count(name)) {
      p = new T(this); // p = new librbd::ThreadPoolSingleton(this);
      _associated_objs[name] = new TypedSingletonWrapper<T>(p);
     ...... 
    }  
    explicit ThreadPoolSingleton(CephContext *cct)
    : ThreadPool(cct, "librbd::thread_pool", "tp_librbd", 1,
                 "rbd_op_threads"),
      op_work_queue(new ContextWQ("librbd::op_work_queue",
                                  cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
                                  this)) {
    start();
  }


  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp), // tp = ThreadPoolSingleton
      m_lock("ContextWQ::m_lock") {
    this->register_work_queue();   // register op_work_queue into ThreadPool's work_queues
  }

Enqueue path: search for op_work_queue->queue(); it is used mainly when executing the various rbd image control operations.

Relationship between the two queues and the dequeue path

Both queues are held in the work_queues member of tp_librbd (ThreadPool): work_queues[0] == op_work_queue, work_queues[1] == io_work_queue. ThreadPool::worker loops forever, processing the two queues alternately.

io_work_queue dequeue path: ThreadPool::worker → ThreadPool::PointerWQ::_void_dequeue/_void_process/_void_process_finish → ThreadPool::PointerWQ<librbd::io::ImageRequest<librbd::ImageCtx>>::_void_process → librbd::io::ImageRequestWQ<librbd::ImageCtx>::process

The op_work_queue dequeue path is similar, except that what finally gets called is ContextWQ::process.

Thread 1 "qemu-system-x86" hit Breakpoint 5, librbd::ImageCtx::ImageCtx (this=0x5555568cd3a0, image_name=..., image_id=..., snap=<optimized out>, p=..., 
    ro=<optimized out>) at /mnt/ceph/src/librbd/ImageCtx.cc:219 
219         if (cct->_conf->get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
(gdb) l
214         io_work_queue = new io::ImageRequestWQ<>(
215           this, "librbd::io_work_queue",
216           cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
217           thread_pool);
218
219         if (cct->_conf->get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
220           exclusive_lock_policy = new exclusive_lock::AutomaticPolicy(this);
221         } else {
222           exclusive_lock_policy = new exclusive_lock::StandardPolicy(this);
223         }
(gdb) p io_work_queue
$38 = (librbd::io::ImageRequestWQ<librbd::ImageCtx> *) 0x5555568cfc90
(gdb) p io_work_queue.work_queues
There is no member or method named work_queues.
(gdb) p io_work_queue.
ImageRequestWQ            aio_read                  front                     m_on_shutdown             process_finish            unblock_writes
PointerWQ                 aio_write                 get_pool_lock             m_pool                    queue                     write
WorkQueue_                aio_writesame             handle_acquire_lock       m_processing              read                      writes_blocked
_clear                    block_writes              handle_blocked_writes     m_queued_reads            register_work_queue       writes_empty
_empty                    compare_and_write         handle_refreshed          m_queued_writes           requeue                   writesame
_void_dequeue             discard                   is_lock_required          m_require_lock_on_read    require_lock_on_read      ~ImageRequestWQ
_void_process             drain                     m_image_ctx               m_require_lock_on_write   set_require_lock          ~PointerWQ
_void_process_finish      empty                     m_in_flight_ios           m_shutdown                shut_down                 ~WorkQueue_
_vptr.WorkQueue_          fail_in_flight_io         m_in_flight_writes        m_write_blocker_contexts  signal                    
aio_compare_and_write     finish_in_flight_io       m_io_blockers             m_write_blockers          start_in_flight_io        
aio_discard               finish_in_flight_write    m_items                   name                      suicide_interval          
aio_flush                 finish_queued_io          m_lock                    process                   timeout_interval          
(gdb) p io_work_queue.m_pool 
$39 = (ThreadPool *) 0x5555568cdff0
(gdb) p io_work_queue.m_pool.work_queues
$40 = std::vector of length 2, capacity 2 = {0x5555568ce290, 0x5555568cfc90}
(gdb) p io_work_queue.m_pool.next_work_queue 
$41 = 1
(gdb) p op_work_queue.m_pool.next_work_queue   
$42 = 1
(gdb) p op_work_queue.m_pool
$43 = (ThreadPool *) 0x5555568cdff0
(gdb) p io_work_queue.m_pool.work_queues[0]  
$44 = (ThreadPool::WorkQueue_ *) 0x5555568ce290
(gdb) p io_work_queue 
$45 = (librbd::io::ImageRequestWQ<librbd::ImageCtx> *) 0x5555568cfc90
(gdb) p op_work_queue 
$46 = (ContextWQ *) 0x5555568ce290
(gdb) p io_work_queue.m_pool.work_queues[1]
$47 = (ThreadPool::WorkQueue_ *) 0x5555568cfc90
(gdb) p op_work_queue.m_pool.work_queues[1]
$48 = (ThreadPool::WorkQueue_ *) 0x5555568cfc90
(gdb) p op_work_queue.m_pool.work_queues[0]
$49 = (ThreadPool::WorkQueue_ *) 0x5555568ce290

finisher thread

Thread body

Finisher::finisher_thread_entry

Thread 1: fn-radosclient

  • Startup and purpose: the finisher thread started in librados::RadosClient::connect; it serves the rados client and runs the related callbacks

Thread 2: fn_anonymous

  • Startup and purpose: the finisher thread started in MonClient::init; it serves the monitor client and runs the related callbacks
  • Difference from fn-radosclient: the anonymous finisher does not record queue length (queue_len) or processing latency (complete_latency) in perf counters, while fn-radosclient does

Thread 3: taskfin_librbd

  • Startup and purpose: mainly runs the various tasks of the ImageWatcher object (either SafeTimer-based timed tasks or finisher_queue-based ones); ImageWatcher mainly sends notifications to the interested parties when image properties change
  • The enqueue path is similar to the other two; just look at where the queue method is called
// Startup: handle_v2_apply_metadata is the callback registered when the rbd image is opened; it in turn initializes the ImageWatcher object
Thread 16 "fn-radosclient" hit Breakpoint 18, librbd::ImageWatcher<librbd::ImageCtx>::ImageWatcher (this=0x7fffb405ae50, image_ctx=...)
    at /mnt/ceph/src/librbd/ImageWatcher.cc:67
67          m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
(gdb) bt
#0  librbd::ImageWatcher<librbd::ImageCtx>::ImageWatcher (this=0x7fffb405ae50, image_ctx=...) at /mnt/ceph/src/librbd/ImageWatcher.cc:67
#1  0x00007fffdf0485bc in librbd::ImageCtx::register_watch (this=0x5555568c9b80, on_finish=0x7fffb40020f0) at /mnt/ceph/src/librbd/ImageCtx.cc:875
#2  0x00007fffdf0ef10d in librbd::image::OpenRequest<librbd::ImageCtx>::send_register_watch (this=this@entry=0x5555568cdd90)
    at /mnt/ceph/src/librbd/image/OpenRequest.cc:490
#3  0x00007fffdf0f6697 in librbd::image::OpenRequest<librbd::ImageCtx>::handle_v2_apply_metadata (this=this@entry=0x5555568cdd90, result=result@entry=0x7fffc17f97f4)
    at /mnt/ceph/src/librbd/image/OpenRequest.cc:471
#4  0x00007fffdf0f6b6f in librbd::util::detail::rados_state_callback<librbd::image::OpenRequest<librbd::ImageCtx>, &librbd::image::OpenRequest<librbd::ImageCtx>::handle_v2_apply_metadata, true> (c=<optimized out>, arg=0x5555568cdd90) at /mnt/ceph/src/librbd/Utils.h:39
#5  0x00007fffded2abcd in librados::C_AioComplete::finish (this=0x7fffc4000aa0, r=<optimized out>) at /mnt/ceph/src/librados/AioCompletionImpl.h:169
#6  0x00007fffded0b109 in Context::complete (this=0x7fffc4000aa0, r=<optimized out>) at /mnt/ceph/src/include/Context.h:70
#7  0x00007fffd61f6ce0 in Finisher::finisher_thread_entry (this=0x5555567e2100) at /mnt/ceph/src/common/Finisher.cc:72
#8  0x00007ffff2a7d494 in start_thread (arg=0x7fffc17fa700) at pthread_create.c:333
#9  0x00007ffff27bfacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
  TaskFinisher(CephContext &cct) : m_cct(cct) {
    TaskFinisherSingleton *singleton;
    cct.lookup_or_create_singleton_object<TaskFinisherSingleton>(
      singleton, "librbd::TaskFinisher::m_safe_timer");
    m_lock = &singleton->m_lock;
    m_safe_timer = singleton->m_safe_timer;
    m_finisher = singleton->m_finisher;
  }
  
  explicit TaskFinisherSingleton(CephContext *cct)
    : m_lock("librbd::TaskFinisher::m_lock") {
    m_safe_timer = new SafeTimer(cct, m_lock, false);
    m_safe_timer->init();  // start a SafeTimer thread
    m_finisher = new Finisher(cct, "librbd::TaskFinisher::m_finisher", "taskfin_librbd");
    m_finisher->start(); // start the taskfin_librbd thread
  }

Associated queues: Finisher::finisher_queue, finisher_queue_rval

See the code comments for the difference between the two:

  /// Queue for contexts for which complete(0) will be called.
  /// NULLs in this queue indicate that an item from finisher_queue_rval
  /// should be completed in that place instead.
  vector<Context*> finisher_queue;
  
  /// Queue for contexts for which the complete function will be called
  /// with a parameter other than 0.
  list<pair<Context*,int> > finisher_queue_rval;
  • Enqueue: everywhere Finisher::queue is called (usually via a finisher.queue member, e.g. c->io->client->finisher.queue)
  • Dequeue: inside the thread body Finisher::finisher_thread_entry

Enqueue example (fn-radosclient thread):

Thread 8 "msgr-worker-2" hit Breakpoint 17, Objecter::handle_osd_op_reply (this=this@entry=0x5555568bda60, m=m@entry=0x7fffc8390ba0)
    at /mnt/ceph/src/osdc/Objecter.cc:3558
(gdb) bt
#0  librados::IoCtxImpl::C_aio_Complete::finish (this=0x7fffb00027b0, r=0) at /mnt/ceph/src/librados/IoCtxImpl.cc:2030
#1  0x00007fffded0b109 in Context::complete (this=0x7fffb00027b0, r=<optimized out>) at /mnt/ceph/src/include/Context.h:70
#2  0x00007fffded6dcae in Objecter::handle_osd_op_reply (this=this@entry=0x5555568bda60, m=m@entry=0x7fffc8390ba0) at /mnt/ceph/src/osdc/Objecter.cc:3558
#3  0x00007fffded7887b in Objecter::ms_dispatch (this=0x5555568bda60, m=0x7fffc8390ba0) at /mnt/ceph/src/osdc/Objecter.cc:970
#4  0x00007fffded7dbca in Objecter::ms_fast_dispatch (this=<optimized out>, m=0x7fffc8390ba0) at /mnt/ceph/src/osdc/Objecter.h:2099
#5  0x00007fffd627296e in Messenger::ms_fast_dispatch (m=0x7fffc8390ba0, this=0x555556830c90) at /mnt/ceph/src/msg/Messenger.h:639
#6  DispatchQueue::fast_dispatch (this=0x555556830e10, m=m@entry=0x7fffc8390ba0) at /mnt/ceph/src/msg/DispatchQueue.cc:71
#7  0x00007fffd638c533 in AsyncConnection::process (this=0x7fffb8007cd0) at /mnt/ceph/src/msg/async/AsyncConnection.cc:792
#8  0x00007fffd639d208 in EventCenter::process_events (this=this@entry=0x55555688bc80, timeout_microseconds=<optimized out>, timeout_microseconds@entry=30000000, 
    working_dur=working_dur@entry=0x7fffd1c1a868) at /mnt/ceph/src/msg/async/Event.cc:409
#9  0x00007fffd63a1e98 in NetworkStack::<lambda()>::operator()(void) const (__closure=0x5555568b8ee8) at /mnt/ceph/src/msg/async/Stack.cc:51
#10 0x00007fffd5a82e6f in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#11 0x00007ffff2a7d494 in start_thread (arg=0x7fffd1c1b700) at pthread_create.c:333
#12 0x00007ffff27bfacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
2015    void librados::IoCtxImpl::C_aio_Complete::finish(int r)
2016    {
(gdb) 
2028      if (c->callback_complete ||
(gdb) 
2030        c->io->client->finisher.queue(new C_AioComplete(c));
(gdb) p c->io
$10 = (librados::IoCtxImpl *) 0x5555568ca660
(gdb) p c
$11 = (librados::AioCompletionImpl *) 0x7fffb0011750
(gdb) p c->io->client
$12 = (librados::RadosClient *) 0x5555567e1520
(gdb) p c->callback_complete
$14 = (rados_callback_t) 0x7fffdf12a1a0 <librbd::util::detail::rados_callback<librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>, &librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>::handle_write_object>(void*, void*)>

handle_write_object is the callback registered by the write_object function; it belongs to the tp_librbd thread, i.e. the IO-processing thread.

template <typename I>
void AbstractObjectWriteRequest<I>::write_object() {
  I *image_ctx = this->m_ictx;
  ldout(image_ctx->cct, 20) << dendl;

  librados::ObjectWriteOperation write;
  if (m_copyup_enabled) {
    ldout(image_ctx->cct, 20) << "guarding write" << dendl;
    write.assert_exists();
  }

  add_write_hint(&write);
  add_write_ops(&write);
  assert(write.size() != 0);

  librados::AioCompletion *rados_completion = librbd::util::create_rados_callback<  // radosclient write callback
    AbstractObjectWriteRequest<I>,
    &AbstractObjectWriteRequest<I>::handle_write_object>(this);
  int r = image_ctx->data_ctx.aio_operate(  // librados::IoCtx::aio_operate
    this->m_oid, rados_completion, &write, m_snap_seq, m_snaps,
    (this->m_trace.valid() ? this->m_trace.get_info() : nullptr));
  assert(r == 0);
  rados_completion->release();
}

The rados_completion callback is ultimately handed to Objecter::Op::onfinish (after one more wrapping as C_aio_Complete(c)). Execution thus moves from the tp_librbd thread to a msgr-worker-* thread and then to the fn-radosclient thread (i.e. a Finisher thread), which is why (almost) all callbacks end up being invoked by a Finisher thread.

msgr-worker-*

  • Not analyzed in depth yet
  • Startup and purpose: asynchronous message send/receive threads; they mainly interact with the ms_dispatch and ms_local threads
  • Associated queues: used to process various kinds of events
  • Thread body: the lambda returned from NetworkStack::add_thread, started by PosixNetworkStack::spawn_worker
  • Count: controlled by the cct->_conf->ms_async_op_threads option, default 3, hard-capped at 24 in the code (a larger value is forced back to 24); judging from the code it cannot be changed online
// startup
Thread 1 "qemu-system-x86" hit Breakpoint 5, NetworkStack::add_thread (this=this@entry=0x555556831dc0, i=i@entry=0) at /mnt/ceph/src/msg/async/Stack.cc:37
37        Worker *w = workers[i];
(gdb) bt
#0  NetworkStack::add_thread (this=this@entry=0x555556831dc0, i=i@entry=0) at /mnt/ceph/src/msg/async/Stack.cc:37
#1  0x00007fffd63a2dd5 in NetworkStack::start (this=0x555556831dc0) at /mnt/ceph/src/msg/async/Stack.cc:135
#2  0x00007fffd6396704 in AsyncMessenger::AsyncMessenger (this=0x555556830bf0, cct=0x5555567522b0, name=..., type=..., mname=..., _nonce=11119027854570673215)
    at /mnt/ceph/src/msg/async/AsyncMessenger.cc:265
#3  0x00007fffd634409f in Messenger::create (cct=cct@entry=0x5555567522b0, type="async+posix", name=..., lname="", nonce=<optimized out>, cflags=0)
    at /mnt/ceph/src/msg/Messenger.cc:43
#4  0x00007fffd634476a in Messenger::create_client_messenger (cct=0x5555567522b0, lname="") at /mnt/ceph/src/msg/Messenger.cc:23
#5  0x00007fffded35ff5 in librados::RadosClient::connect (this=this@entry=0x5555567e1480) at /mnt/ceph/src/librados/RadosClient.cc:257
#6  0x00007fffdece268f in rados_connect (cluster=0x5555567e1480) at /mnt/ceph/src/librados/librados.cc:2851
#7  0x00007fffdf534d96 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd68) at ./block/rbd.c:553
#8  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd58, options=0x555556747120, file=0x0, bs=0x555556701880) at ./block.c:1104
#9  bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=<optimized out>, options=0x555556747120, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffdeb8) at ./block.c:1833
#10 0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffdeb8) at ./block.c:1588
#11 0x0000555555b0e24c in bdrv_open_inherit (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=0x5555566ff670, options@entry=0x5555566f90b0, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x0, 
    child_role=child_role@entry=0x0, errp=0x7fffffffe190) at ./block.c:1794
#12 0x0000555555b0f7b1 in bdrv_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block.c:1924
#13 0x0000555555b4890b in blk_new_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block/block-backend.c:160
#14 0x000055555580c90f in blockdev_init (file=file@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    bs_opts=bs_opts@entry=0x5555566f90b0, errp=errp@entry=0x7fffffffe190) at ./blockdev.c:582
#15 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#16 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#17 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
    at ./util/qemu-option.c:1116
#18 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481

admin_socket

  • Purpose: creates ceph-client.admin.2840389.94310395876384.asok; the socket path is controlled by [client] admin_socket = /var/run/ceph/qemu/$cluster-$type.$id.$pid.$cctid.asok in the ceph.conf configuration file. Once created it acts as the server side of a UNIX domain socket, accepting client requests and answering them. A client can send requests with ceph --admin-daemon ceph-client.admin.2840389.94310395876384.asok <command>; configuration changes, perf dump and other commands are supported, and the full list can be shown with the help subcommand (see the example after this list).
  • Initialization and startup: initialized in the CephContext constructor and started in CephContext::start_service_thread.
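For example (a sketch; the .asok filename varies per QEMU process):

$ ceph --admin-daemon /var/run/ceph/qemu/ceph-client.admin.2840389.94310395876384.asok help
$ ceph --admin-daemon /var/run/ceph/qemu/ceph-client.admin.2840389.94310395876384.asok perf dump
$ ceph --admin-daemon /var/run/ceph/qemu/ceph-client.admin.2840389.94310395876384.asok config show
$ ceph --admin-daemon /var/run/ceph/qemu/ceph-client.admin.2840389.94310395876384.asok config set debug_rbd 20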
// initialization
#0  CephContext::CephContext (this=0x555556752f30, module_type_=8, code_env=CODE_ENVIRONMENT_LIBRARY, init_flags_=0) at /mnt/ceph/src/common/ceph_context.cc:558
#1  0x00007fffd64525f1 in common_preinit (iparams=..., code_env=code_env@entry=CODE_ENVIRONMENT_LIBRARY, flags=flags@entry=0)
				 at /mnt/ceph/src/common/common_init.cc:34
#2  0x00007fffded093f0 in rados_create_cct (clustername=clustername@entry=0x7fffded9effd "", iparams=iparams@entry=0x7fffffffd2b0)
				 at /mnt/ceph/src/librados/librados.cc:2769
#3  0x00007fffded0996e in rados_create (pcluster=pcluster@entry=0x555556749fc0, id=0x0) at /mnt/ceph/src/librados/librados.cc:2785
#4  0x00007fffdf534d0e in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd48) at ./block/rbd.c:507
#5  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd38, options=0x555556757190, file=0x0, bs=0x555556701880) at ./block.c:1104
#6  bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 reference=<optimized out>, options=0x555556757190, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
				 child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffde98) at ./block.c:1833
#7  0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
				 child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffde98) at ./block.c:1588
#8  0x0000555555b0e24c in bdrv_open_inherit (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 reference=reference@entry=0x0, options=0x5555566ff670, options@entry=0x5555566f90b0, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x0, 
				 child_role=child_role@entry=0x0, errp=0x7fffffffe170) at ./block.c:1794
#9  0x0000555555b0f7b1 in bdrv_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe170) at ./block.c:1924
#10 0x0000555555b4890b in blk_new_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe170) at ./block/block-backend.c:160
#11 0x000055555580c90f in blockdev_init (file=file@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
				 bs_opts=bs_opts@entry=0x5555566f90b0, errp=errp@entry=0x7fffffffe170) at ./blockdev.c:582
#12 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#13 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#14 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
				 at ./util/qemu-option.c:1116
#15 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481
// startup
Thread 1 "qemu-system-x86" hit Breakpoint 2, 0x00007fffd6182600 in CephContext::start_service_thread()@plt () from /usr/local/lib/ceph/libceph-common.so.0
(gdb) bt
#0  0x00007fffd6182600 in CephContext::start_service_thread()@plt () from /usr/local/lib/ceph/libceph-common.so.0
#1  0x00007fffd645b3cc in common_init_finish (cct=0x5555567522b0) at /mnt/ceph/src/common/common_init.cc:95
#2  0x00007fffded35fa0 in librados::RadosClient::connect (this=this@entry=0x5555567e1480) at /mnt/ceph/src/librados/RadosClient.cc:240
#3  0x00007fffdece268f in rados_connect (cluster=0x5555567e1480) at /mnt/ceph/src/librados/librados.cc:2851
#4  0x00007fffdf534d96 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd68) at ./block/rbd.c:553
#5  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd58, options=0x555556747120, file=0x0, bs=0x555556701880) at ./block.c:1104
#6  bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=<optimized out>, options=0x555556747120, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffdeb8) at ./block.c:1833
#7  0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffdeb8) at ./block.c:1588
#8  0x0000555555b0e24c in bdrv_open_inherit (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=0x5555566ff670, options@entry=0x5555566f90b0, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x0, 
    child_role=child_role@entry=0x0, errp=0x7fffffffe190) at ./block.c:1794
#9  0x0000555555b0f7b1 in bdrv_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block.c:1924
#10 0x0000555555b4890b in blk_new_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block/block-backend.c:160
#11 0x000055555580c90f in blockdev_init (file=file@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    bs_opts=bs_opts@entry=0x5555566f90b0, errp=errp@entry=0x7fffffffe190) at ./blockdev.c:582
#12 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#13 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#14 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
    at ./util/qemu-option.c:1116
#15 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481

ms_dispatch, ms_local

ms_dispatch

  • Purpose: not analyzed in depth yet; receives the ordinary-dispatch messages forwarded by the ms_local thread and hands them to the ordinary dispatchers registered with the Messenger (the dispatchers are MgrClient, Objecter and RadosClient, all derived from the Dispatcher class)
  • Associated queue: the priority queue PrioritizedQueue<QueueItem, uint64_t> mqueue
  • Enqueue: via DispatchQueue::enqueue
  • Dequeue: in the thread body DispatchQueue::entry

ms_local

  • Purpose: my initial understanding is that it receives requests from the librbd client side and either forwards them to the ms_dispatch thread (ordinary dispatch, enqueued into mqueue) or fast-dispatches them directly through the Messenger's fast dispatchers (the messenger here is AsyncMessenger; the dispatchers are MgrClient, Objecter and RadosClient, all derived from the Dispatcher class)
  • Associated queue: list<pair<Message *, int> > local_messages
  • Enqueue: via DispatchQueue::local_delivery
  • Dequeue: in the thread body DispatchQueue::run_local_delivery

Startup

// DispatchQueue::start starts two threads:
// dispatch_thread.create("ms_dispatch");
// local_delivery_thread.create("ms_local");
Thread 1 "qemu-system-x86" hit Breakpoint 2, DispatchQueue::start (this=this@entry=0x555556830d10) at /mnt/ceph/src/msg/DispatchQueue.cc:229
229     {
(gdb) bt
#0  DispatchQueue::start (this=this@entry=0x555556830d10) at /mnt/ceph/src/msg/DispatchQueue.cc:229
#1  0x00007fffd639242e in AsyncMessenger::ready (this=0x555556830b90) at /mnt/ceph/src/msg/async/AsyncMessenger.cc:306
#2  0x00007fffded373a6 in Messenger::add_dispatcher_head (d=<optimized out>, this=0x555556830b90) at /mnt/ceph/src/msg/Messenger.h:397
#3  librados::RadosClient::connect (this=this@entry=0x5555567e1420) at /mnt/ceph/src/librados/RadosClient.cc:282
#4  0x00007fffdece268f in rados_connect (cluster=0x5555567e1420) at /mnt/ceph/src/librados/librados.cc:2851
#5  0x00007fffdf534d96 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd68) at ./block/rbd.c:553
#6  0x0000555555b0e658 in bdrv_open_common (errp=0x7fffffffdd58, options=0x555556747120, file=0x0, bs=0x555556701880) at ./block.c:1104
#7  bdrv_open_inherit (filename=<optimized out>, filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=<optimized out>, options=0x555556747120, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, errp=0x7fffffffdeb8) at ./block.c:1833
#8  0x0000555555b0f68f in bdrv_open_child (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    options=options@entry=0x5555566ff670, bdref_key=bdref_key@entry=0x555555c24c69 "file", parent=parent@entry=0x5555566fb2c0, 
    child_role=child_role@entry=0x555556152c80 <child_file>, allow_none=allow_none@entry=true, errp=0x7fffffffdeb8) at ./block.c:1588
#9  0x0000555555b0e24c in bdrv_open_inherit (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=0x5555566ff670, options@entry=0x5555566f90b0, flags=<optimized out>, flags@entry=0, parent=parent@entry=0x0, 
    child_role=child_role@entry=0x0, errp=0x7fffffffe190) at ./block.c:1794
#10 0x0000555555b0f7b1 in bdrv_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block.c:1924
#11 0x0000555555b4890b in blk_new_open (filename=filename@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    reference=reference@entry=0x0, options=options@entry=0x5555566f90b0, flags=flags@entry=0, errp=errp@entry=0x7fffffffe190) at ./block/block-backend.c:160
#12 0x000055555580c90f in blockdev_init (file=file@entry=0x5555566f0e00 "rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789", 
    bs_opts=bs_opts@entry=0x5555566f90b0, errp=errp@entry=0x7fffffffe190) at ./blockdev.c:582
#13 0x0000555555936f88 in drive_new (all_opts=0x5555566883a0, block_default_type=<optimized out>) at ./blockdev.c:1080
#14 0x00005555559473d1 in drive_init_func (opaque=<optimized out>, opts=<optimized out>, errp=<optimized out>) at ./vl.c:1191
#15 0x0000555555bbcf7a in qemu_opts_foreach (list=<optimized out>, func=0x5555559473c0 <drive_init_func>, opaque=0x5555566a6b30, errp=0x0)
    at ./util/qemu-option.c:1116
#16 0x000055555580ffdf in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at ./vl.c:4481

safe_timer

  • 用途:管理及触发定时任务事件,librbd中主要用来跟monitor保持心跳(MonClient::schedule_tick),以及ImageWatcher的定时事件。
  • 初始化及启动:qemu进程中一共启动了3个safe_timer线程,其中一处是在librados::RadosClient::RadosClient构造函数中初始化,在librados::RadosClient::connect中调用SafeTimer::init启动。SafeTimer类负责管理定时任务并对外提供接口,它包含一个SafeTimerThread类型的成员thread(SafeTimerThread继承自Thread类),safe_timer线程由SafeTimer::init通过该thread成员创建并启动,线程执行体是SafeTimer::timer_thread(在SafeTimerThread::entry里调用),用来轮询检查是否有新的定时任务事件需要触发。另一处是在ImageWatcher对象初始化时启动,第三处未分析,在SafeTimer构造函数处加断点调试即可知晓。
  • 与ceph::timer_detail::timer的关系:二者都有定时器功能,但ceph::timer_detail::timer更轻量(参考该类的注释),IO卡顿预警功能使用的是ceph::timer_detail::timer。
// 建立连接的时候触发心跳tick流程,一次tick结束后会在回调函数里设置下次tick事件,无限循环
#0  SafeTimer::add_event_after (this=0x5555567e16a8, seconds=10, callback=0x5555568c4b90) at /mnt/ceph/src/common/Timer.cc:118
#1  0x00007fffd6244100 in MonClient::init (this=this@entry=0x5555567e14c8) at /mnt/ceph/src/mon/MonClient.cc:404
#2  0x00007fffded36cfa in librados::RadosClient::connect (this=this@entry=0x5555567e1480) at /mnt/ceph/src/librados/RadosClient.cc:292
#3  0x00007fffdece268f in rados_connect (cluster=0x5555567e1480) at /mnt/ceph/src/librados/librados.cc:2851
#4  0x00007fffdf534d96 in qemu_rbd_open (bs=0x555556701880, options=<optimized out>, flags=24578, errp=0x7fffffffdd68) at ./block/rbd.c:553

关联的队列:SafeTimer::schedule

  • 入队过程:SafeTimer::add_event_after、SafeTimer::add_event_at
  • 出队过程:SafeTimer::cancel_event、SafeTimer::cancel_all_events,以及SafeTimer::timer_thread中正常的事件触发。
void MonClient::schedule_tick()
{
  struct C_Tick : public Context {
    MonClient *monc;
    explicit C_Tick(MonClient *m) : monc(m) {}
    void finish(int r) override { // 事件回调
      monc->tick();
    }
  };

  if (_hunting()) {
    timer.add_event_after(cct->_conf->mon_client_hunt_interval
			  * reopen_interval_multiplier,
			  new C_Tick(this));
  } else
    // 参数1表示事件触发延时,参数2是事件回调类,继承自Context,事件触发时SafeTimer::timer_thread会调用C_Tick的complete函数,也即Context->complete,它又调用了finish函数,也即实际的事件回调。
    timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
}

service

  • 用途:线程执行体是CephContextServiceThread::entry,负责3项工作:1)检查是否需要重新打开log文件;2)检查心跳;3)更新perf counter中的记录值。默认配置下,后两项任务不会执行。
  • 初始化及启动:过程与admin_socket的启动过程相同,都在CephContext::start_service_thread中完成

log

  • 初始化及启动:在CephContext构造函数中初始化和启动。
  • 用途:负责文件日志打印和内存日志的存储和dump(通过admin socket)。

主要代码流程分析

// qemu 到 ImageRequestWQ<I>::aio_write():

Thread 37 "CPU 0/TCG" hit Breakpoint 3, librbd::io::ImageRequestWQ<librbd::ImageCtx>::aio_write(librbd::io::AioCompletion*, unsigned long, unsigned long, ceph::buffer::list&&, int, bool) (this=0x55cbc3898890, c=0x7facc57b9b70, off=off@entry=26629120, len=len@entry=1024, 
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1dbf9f7, DIE 0x1e87755>, op_flags=op_flags@entry=0, native_async=true)
    at /mnt/ceph/src/librbd/io/ImageRequestWQ.cc:239
239     void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
(gdb) bt
#0  librbd::io::ImageRequestWQ<librbd::ImageCtx>::aio_write(librbd::io::AioCompletion*, unsigned long, unsigned long, ceph::buffer::list&&, int, bool) (
    this=0x55cbc3898890, c=0x7facc57b9b70, off=off@entry=26629120, len=len@entry=1024, 
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1dbf9f7, DIE 0x1e87755>, op_flags=op_flags@entry=0, native_async=true)
    at /mnt/ceph/src/librbd/io/ImageRequestWQ.cc:239
#1  0x00007fad47414310 in rbd_aio_write (image=<optimized out>, off=off@entry=26629120, len=len@entry=1024, buf=buf@entry=0x7facc57ba000 "\300;9\230", 
    c=<optimized out>) at /mnt/ceph/src/librbd/librbd.cc:3536
#2  0x00007fad4791633a in rbd_start_aio (bs=<optimized out>, off=26629120, qiov=<optimized out>, size=1024, cb=<optimized out>, opaque=<optimized out>, 
    cmd=RBD_AIO_WRITE) at ./block/rbd.c:697
#3  0x00007fad47916426 in qemu_rbd_aio_writev (bs=<optimized out>, sector_num=<optimized out>, qiov=<optimized out>, nb_sectors=<optimized out>, cb=<optimized out>, 
    opaque=<optimized out>) at ./block/rbd.c:746
#4  0x000055cbc23b7c3c in bdrv_driver_pwritev (bs=bs@entry=0x55cbc36c9890, offset=offset@entry=26629120, bytes=bytes@entry=1024, qiov=qiov@entry=0x7facc57b8970, 
    flags=flags@entry=0) at ./block/io.c:901
#5  0x000055cbc23b8ed0 in bdrv_aligned_pwritev (bs=bs@entry=0x55cbc36c9890, req=req@entry=0x7facc93d5bc0, offset=offset@entry=26629120, bytes=bytes@entry=1024, 
    align=align@entry=512, qiov=qiov@entry=0x7facc57b8970, flags=0) at ./block/io.c:1360
#6  0x000055cbc23b9ba7 in bdrv_co_pwritev (child=<optimized out>, offset=<optimized out>, offset@entry=26629120, bytes=bytes@entry=1024, 
    qiov=qiov@entry=0x7facc57b8970, flags=flags@entry=0) at ./block/io.c:1610
#7  0x000055cbc237b469 in raw_co_pwritev (bs=0x55cbc36c35e0, offset=26629120, bytes=1024, qiov=<optimized out>, flags=<optimized out>) at ./block/raw_bsd.c:243
#8  0x000055cbc23b7b21 in bdrv_driver_pwritev (bs=bs@entry=0x55cbc36c35e0, offset=offset@entry=26629120, bytes=bytes@entry=1024, qiov=qiov@entry=0x7facc57b8970, 
    flags=flags@entry=0) at ./block/io.c:875
#9  0x000055cbc23b8ed0 in bdrv_aligned_pwritev (bs=bs@entry=0x55cbc36c35e0, req=req@entry=0x7facc93d5e90, offset=offset@entry=26629120, bytes=bytes@entry=1024, 
    align=align@entry=1, qiov=qiov@entry=0x7facc57b8970, flags=0) at ./block/io.c:1360
#10 0x000055cbc23b9ba7 in bdrv_co_pwritev (child=<optimized out>, offset=<optimized out>, offset@entry=26629120, bytes=bytes@entry=1024, 
    qiov=qiov@entry=0x7facc57b8970, flags=0) at ./block/io.c:1610
#11 0x000055cbc23ab90d in blk_co_pwritev (blk=0x55cbc36bd690, offset=26629120, bytes=1024, qiov=0x7facc57b8970, flags=<optimized out>) at ./block/block-backend.c:848
#12 0x000055cbc23aba2b in blk_aio_write_entry (opaque=0x7facc58a9b70) at ./block/block-backend.c:1036
#13 0x000055cbc242452a in coroutine_trampoline (i0=<optimized out>, i1=<optimized out>) at ./util/coroutine-ucontext.c:79
#14 0x00007fad5b0a2000 in ?? () from /lib/x86_64-linux-gnu/libc.so.6
#15 0x00007facf9ff98c0 in ?? ()
#16 0x0000000000000000 in ?? ()

// 从ImageRequestWQ<I>::aio_write()到入队io_work_queue
ImageRequestWQ<I>::aio_write--ImageRequestWQ<I>::queue--ThreadPool::PointerWQ<ImageRequest<I> >::queue(req)
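
下面是一段简化的C++示意(非Ceph源码,类名为杜撰),说明这种“调用方入队即返回、线程池工作线程出队处理”的PointerWQ模式;真实的ThreadPool/PointerWQ还有暂停、drain、优先级等逻辑,这里全部省略:

// 简化示意:io_work_queue入队后由tp_librbd工作线程出队,并调用process()真正下发IO
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct ImageRequest { int id; };

class MiniPointerWQ {
  std::mutex lock;
  std::condition_variable cond;
  std::deque<ImageRequest*> items;
  std::vector<std::thread> workers;
  bool stop = false;

  void worker() {                      // 对应ThreadPool::worker
    std::unique_lock<std::mutex> l(lock);
    while (true) {
      cond.wait(l, [this] { return stop || !items.empty(); });
      if (stop && items.empty())
        return;
      ImageRequest *req = items.front(); items.pop_front();
      l.unlock();
      process(req);                    // 对应ImageRequestWQ::process -> ImageRequest::send
      l.lock();
    }
  }
  void process(ImageRequest *req) {
    std::cout << "process request " << req->id << std::endl;
    delete req;
  }

public:
  explicit MiniPointerWQ(int nthreads) {
    for (int i = 0; i < nthreads; ++i)
      workers.emplace_back([this] { worker(); });
  }
  ~MiniPointerWQ() {                   // 析构时等待剩余请求处理完毕
    { std::lock_guard<std::mutex> l(lock); stop = true; }
    cond.notify_all();
    for (auto &t : workers) t.join();
  }
  void queue(ImageRequest *req) {      // 对应aio_write里的入队动作,调用方立即返回
    std::lock_guard<std::mutex> l(lock);
    items.push_back(req);
    cond.notify_one();
  }
};

int main() {
  MiniPointerWQ wq(2);                 // 类似tp_librbd线程池,这里用2个线程便于演示
  for (int i = 0; i < 4; ++i)
    wq.queue(new ImageRequest{i});
}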

// io_work_queue出队(ThreadPool::worker)到 Objecter::_send_op:

Thread 17 "tp_librbd" hit Breakpoint 1, Objecter::_send_op (this=this@entry=0x55cbc3888600, op=op@entry=0x7fad18004410, m=m@entry=0x7fad180089a0)
    at /mnt/ceph/src/osdc/Objecter.cc:3208
3208    {
(gdb) bt
#0  Objecter::_send_op (this=this@entry=0x55cbc3888600, op=op@entry=0x7fad18004410, m=m@entry=0x7fad180089a0) at /mnt/ceph/src/osdc/Objecter.cc:3208
#1  0x00007fad47143160 in Objecter::_op_submit (this=this@entry=0x55cbc3888600, op=op@entry=0x7fad18004410, sul=..., ptid=ptid@entry=0x7fad18008168)
    at /mnt/ceph/src/osdc/Objecter.cc:2486
#2  0x00007fad47148760 in Objecter::_op_submit_with_budget (this=this@entry=0x55cbc3888600, op=op@entry=0x7fad18004410, sul=..., ptid=ptid@entry=0x7fad18008168, 
    ctx_budget=ctx_budget@entry=0x0) at /mnt/ceph/src/osdc/Objecter.cc:2307
#3  0x00007fad471489de in Objecter::op_submit (this=0x55cbc3888600, op=0x7fad18004410, ptid=0x7fad18008168, ctx_budget=0x0) at /mnt/ceph/src/osdc/Objecter.cc:2274
#4  0x00007fad470fda93 in librados::IoCtxImpl::aio_operate (this=0x55cbc3895440, oid=..., o=0x7fad18004390, c=0x7fad180080a0, snap_context=..., flags=flags@entry=0, 
    trace_info=0x0) at /mnt/ceph/src/librados/IoCtxImpl.cc:826
#5  0x00007fad470e1eb0 in librados::IoCtx::aio_operate (this=this@entry=0x55cbc3894980, oid="rbd_data.fad56b8b4567.", '0' <repeats 15 times>, "a", 
    c=c@entry=0x7fad18001d60, o=o@entry=0x7fad297f8b80, snap_seq=0, snaps=std::vector of length 0, capacity 0, trace_info=0x0)
    at /mnt/ceph/src/librados/librados.cc:1544
#6  0x00007fad4750730b in librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>::write_object (this=this@entry=0x7fad180083b0)
    at /mnt/ceph/src/librbd/io/ObjectRequest.cc:528   // radosclient回调在这里创建,并传递给Objecter::handle_osd_op_reply里的onfinish->complete
#7  0x00007fad4750af66 in librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>::pre_write_object_map_update (this=this@entry=0x7fad180083b0)
    at /mnt/ceph/src/librbd/io/ObjectRequest.cc:496
#8  0x00007fad4750b837 in librbd::io::AbstractObjectWriteRequest<librbd::ImageCtx>::send (this=0x7fad180083b0) at /mnt/ceph/src/librbd/io/ObjectRequest.cc:459
#9  0x00007fad474f7571 in librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>::send_object_requests (this=0x7facc41c0950, 
    object_extents=std::vector of length 1, capacity 1 = {...}, snapc=..., object_requests=0x0) at /mnt/ceph/src/librbd/io/ImageRequest.cc:450
#10 0x00007fad474fcf55 in librbd::io::AbstractImageWriteRequest<librbd::ImageCtx>::send_request (this=0x7facc41c0950) at /mnt/ceph/src/librbd/io/ImageRequest.cc:408
#11 0x00007fad474f8f91 in librbd::io::ImageRequest<librbd::ImageCtx>::send (this=this@entry=0x7facc41c0950) at /mnt/ceph/src/librbd/io/ImageRequest.cc:219                    
#12 0x00007fad474ff9b5 in librbd::io::ImageRequestWQ<librbd::ImageCtx>::process (this=0x55cbc3898890, req=0x7facc41c0950)
    at /mnt/ceph/src/librbd/io/ImageRequestWQ.cc:610
#13 0x00007fad3e5d9a68 in ThreadPool::worker (this=0x55cbc3895640, wt=<optimized out>) at /mnt/ceph/src/common/WorkQueue.cc:120
#14 0x00007fad3e5dac10 in ThreadPool::WorkThread::entry (this=<optimized out>) at /mnt/ceph/src/common/WorkQueue.h:448
#15 0x00007fad5b404494 in start_thread (arg=0x7fad297fa700) at pthread_create.c:333
#16 0x00007fad5b146acf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97

块设备IO到rados对象映射过程(Striper)

void Striper::file_to_extents(
  CephContext *cct, const char *object_format,
  const file_layout_t *layout,
  uint64_t offset, uint64_t len,
  uint64_t trunc_size,
  map<object_t,vector<ObjectExtent> >& object_extents,
  uint64_t buffer_offset)
{
  ldout(cct, 10) << "file_to_extents " << offset << "~" << len
		 << " format " << object_format
		 << dendl;
  assert(len > 0);

  /*
   * we want only one extent per object!  this means that each extent
   * we read may map into different bits of the final read
   * buffer.. hence ObjectExtent.buffer_extents
   */

  // layout = {stripe_unit = 4194304, stripe_count = 1, object_size = 4194304, pool_id = 5, pool_ns = ""}
  __u32 object_size = layout->object_size;
  __u32 su = layout->stripe_unit;
  __u32 stripe_count = layout->stripe_count;
  assert(object_size >= su);
  if (stripe_count == 1) {
    ldout(cct, 20) << " sc is one, reset su to os" << dendl;
    su = object_size;
  }
  uint64_t stripes_per_object = object_size / su;  // 1
  ldout(cct, 20) << " su " << su << " sc " << stripe_count << " os "
		 << object_size << " stripes_per_object " << stripes_per_object
		 << dendl;

  uint64_t cur = offset; // 26596352
  uint64_t left = len;   // 8192
  while (left > 0) {
    // layout into objects
    uint64_t blockno = cur / su; // which block  // 6
    // which horizontal stripe (Y)
    uint64_t stripeno = blockno / stripe_count; // stripe_count = 1
    // which object in the object set (X)
    uint64_t stripepos = blockno % stripe_count;   // 6 % 1 = 0, always == 0
    // which object set
    uint64_t objectsetno = stripeno / stripes_per_object; // 6
    // object id
    uint64_t objectno = objectsetno * stripe_count + stripepos;  // 6 * 1 + 0 = 6

    /*
	rbd image: [obj1(4M) | obj2(4M) | obj3(4M) | ...] =
	           [rbd_data.fad56b8b4567.0000000000000000 | ... | rbd_data.fad56b8b4567.0000000000000006 | ...]
    */

    // find oid, extent
    // object_format = "rbd_data.fad56b8b4567.%016llx"
    char buf[strlen(object_format) + 32];
    snprintf(buf, sizeof(buf), object_format, (long long unsigned)objectno);
    object_t oid = buf;  //  oid = "rbd_data.fad56b8b4567.0000000000000006"

    // map range into object
    uint64_t block_start = (stripeno % stripes_per_object) * su; // 0
    uint64_t block_off = cur % su;	// 26596352 % 4194304 = 1430528
    uint64_t max = su - block_off;	// 4194304 - 1430528 = 2763776

    uint64_t x_offset = block_start + block_off; // 0 + 1430528
    uint64_t x_len;
    if (left > max)	// 8192 > 2763776
      x_len = max;
    else
      x_len = left;	// 8192

    ldout(cct, 20) << " off " << cur << " blockno " << blockno << " stripeno "
		   << stripeno << " stripepos " << stripepos << " objectsetno "
		   << objectsetno << " objectno " << objectno
		   << " block_start " << block_start << " block_off "
		   << block_off << " " << x_offset << "~" << x_len
		   << dendl;

    ObjectExtent *ex = 0;
    vector<ObjectExtent>& exv = object_extents[oid];
    if (exv.empty() || exv.back().offset + exv.back().length != x_offset) {
      exv.resize(exv.size() + 1);
      ex = &exv.back();
      ex->oid = oid;
      ex->objectno = objectno;
      ex->oloc = OSDMap::file_to_object_locator(*layout);  // 封装对象pool信息

      ex->offset = x_offset;
      ex->length = x_len;
      ex->truncate_size = object_truncate_size(cct, layout, objectno,
					       trunc_size); // trunc_size = 0

      ldout(cct, 20) << " added new " << *ex << dendl;
    } else {
      // add to extent
      ex = &exv.back();
      ldout(cct, 20) << " adding in to " << *ex << dendl;
      ex->length += x_len;
    }
    ex->buffer_extents.push_back(make_pair(cur - offset + buffer_offset,  // buffer_offset = 0
					   x_len));

    ldout(cct, 15) << "file_to_extents  " << *ex << " in " << ex->oloc
		   << dendl;
    // ldout(cct, 0) << "map: ino " << ino << " oid " << ex.oid << " osd "
    //		  << ex.osd << " offset " << ex.offset << " len " << ex.len
    //		  << " ... left " << left << dendl;

    left -= x_len;
    cur += x_len;
  }
  // object_extents = std::map with 1 elements = 
  //   {[{name = "rbd_data.fad56b8b4567.", '0' <repeats 15 times>, "6"}] = std::vector of length 1, capacity 1 = {{oid = {
  //      name = "rbd_data.fad56b8b4567.", '0' <repeats 15 times>, "6"}, objectno = 6, offset = 1430528, length = 8192, truncate_size = 0, oloc = {pool = 5, key = "", 
  //      nspace = "", hash = -1}, buffer_extents = std::vector of length 1, capacity 1 = {{first = 0, second = 8192}}}}}
}
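
对照上面断点里的数值,可以用一段独立的小程序验证stripe_count=1时“卷内偏移 → 对象编号/对象内偏移”的计算(仅为示意,省略了条带化、跨对象拆分和truncate逻辑):

// 验证:offset=26596352, len=8192, object_size=4MiB, stripe_count=1
#include <cinttypes>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t object_size = 4194304;        // 4MiB,即layout->object_size
  const uint64_t offset = 26596352;            // 卷内偏移
  const uint64_t len = 8192;

  uint64_t objectno   = offset / object_size;  // 26596352 / 4194304 = 6
  uint64_t in_obj_off = offset % object_size;  // 1430528
  uint64_t in_obj_len = len;                   // 本例未跨对象边界,无需拆分

  char oid[64];
  snprintf(oid, sizeof(oid), "rbd_data.fad56b8b4567.%016llx",
           (unsigned long long)objectno);
  printf("oid=%s offset=%" PRIu64 " len=%" PRIu64 "\n", oid, in_obj_off, in_obj_len);
  // 输出:oid=rbd_data.fad56b8b4567.0000000000000006 offset=1430528 len=8192
  return 0;
}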

object到osd的crush计算过程

Objecter::_op_submit --> Objecter::_calc_target(&op->target, nullptr) --> osdmap->object_locator_to_pg(t->target_oid, t->target_oloc, pgid);
                     \
                      \ --> _send_op(op, m)
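
补充:拿到对象名之后,可以用ceph osd map命令直接查看object → PG → OSD的映射结果做对照(对象名取自上面Striper一节的调试输出,vstart环境下命令为bin/ceph):

$ ceph osd map rbd rbd_data.fad56b8b4567.0000000000000006
## 输出中会给出该对象所属的pgid,以及up/acting的OSD列表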

遗留问题

  • 整体IO流程图
  • IO到object到op的拆分过程,以及op执行完毕后如何判断用户层单次IO全部执行完毕
  • object到osd的crush计算过程
  • IO请求发送过程及响应处理过程

perf counter机制

每个image一个perf counter,初始化过程:

Thread 16 "fn-radosclient" hit Breakpoint 3, librbd::ImageCtx::perf_start (this=this@entry=0x5555568cd300, name="librbd-fad56b8b4567-rbd-vol1")
    at /mnt/ceph/src/librbd/ImageCtx.cc:365
365       void ImageCtx::perf_start(string name) {
(gdb) bt
#0  librbd::ImageCtx::perf_start (this=this@entry=0x5555568cd300, name="librbd-fad56b8b4567-rbd-vol1") at /mnt/ceph/src/librbd/ImageCtx.cc:365
#1  0x00007fffdf047f14 in librbd::ImageCtx::init (this=0x5555568cd300) at /mnt/ceph/src/librbd/ImageCtx.cc:276
#2  0x00007fffdf0ee07f in librbd::image::OpenRequest<librbd::ImageCtx>::send_register_watch (this=this@entry=0x5555568c8c00)
    at /mnt/ceph/src/librbd/image/OpenRequest.cc:477
#3  0x00007fffdf0f57a7 in librbd::image::OpenRequest<librbd::ImageCtx>::handle_v2_apply_metadata (this=this@entry=0x5555568c8c00, result=result@entry=0x7fffc17f97f4)
    at /mnt/ceph/src/librbd/image/OpenRequest.cc:471
    
// send_v2_apply_metadata里通过create_rados_callback创建间接回调rados_state_callback(handle_v2_apply_metadata作为模板参数传递给rados_state_callback),rados_state_callback里会调用实际的回调handle_v2_apply_metadata
// handle_v2_apply_metadata是send_v2_apply_metadata的回调,而send_v2_apply_metadata被handle_v2_get_data_pool直接调用,handle_v2_get_data_pool又是send_v2_get_data_pool的回调(注册方法跟上面一样)。整个OpenRequest就是这样逐级“调用+回调”串起来的:每个handle_xxx是其对应send_xxx的回调,并在其中直接调用下一步的send_yyy
// send_v2_detect_header是最开始的入口,打开rbd镜像时从rbd_open调过来
#4  0x00007fffdf0f5c7f in librbd::util::detail::rados_state_callback<librbd::image::OpenRequest<librbd::ImageCtx>, &librbd::image::OpenRequest<librbd::ImageCtx>::handle_v2_apply_metadata, true> (c=<optimized out>, arg=0x5555568c8c00) at /mnt/ceph/src/librbd/Utils.h:39
#5  0x00007fffded2a8dd in librados::C_AioComplete::finish (this=0x7fffc8001470, r=<optimized out>) at /mnt/ceph/src/librados/AioCompletionImpl.h:169
#6  0x00007fffded0ae59 in Context::complete (this=0x7fffc8001470, r=<optimized out>) at /mnt/ceph/src/include/Context.h:70
#7  0x00007fffd61ecb80 in Finisher::finisher_thread_entry (this=0x5555567e59d0) at /mnt/ceph/src/common/Finisher.cc:72
#8  0x00007ffff2a7d494 in start_thread (arg=0x7fffc17fa700) at pthread_create.c:333
#9  0x00007ffff27bfacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
 void ImageCtx::perf_start(string name) {
    auto perf_prio = PerfCountersBuilder::PRIO_DEBUGONLY;
    if (child == nullptr) {
      // ensure top-level IO stats are exported for librbd daemons
      perf_prio = PerfCountersBuilder::PRIO_USEFUL;
    }
    // 创建PerfCounters
    PerfCountersBuilder plb(cct, name, l_librbd_first, l_librbd_last);

    // 添加实际的counter,支持多种类型,如计数器,时间记录器,平均计数器
    plb.add_u64_counter(l_librbd_rd, "rd", "Reads", "r", perf_prio);
    plb.add_time_avg(l_librbd_rd_latency, "rd_latency", "Latency of reads",
                     "rl", perf_prio);
    plb.add_time(l_librbd_opened_time, "opened_time", "Opened time",
                 "ots", perf_prio);
    // 创建实际的perfcounter,并添加到image context的perfcounter集合中
    perfcounter = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perfcounter);
    // 记录时间
    perfcounter->tset(l_librbd_opened_time, ceph_clock_now());
  }

使用过程:

/* added by wangpan */
// tsetp: time set pair, record slowest io start time and elapsed
// 自定义了一个counter类型的记录函数,沿用了社区的记录数据结构,但改变了数据结构保存的内容
void PerfCounters::tsetp(int idx, utime_t start, utime_t elapsed)
{
  if (!m_cct->_conf->perf)
    return;

  assert(idx > m_lower_bound);
  assert(idx < m_upper_bound);

  Mutex::Locker lck(m_lock);  // we should modify two params synchronously

  perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
  if (!(data.type & PERFCOUNTER_TIME))
    return;

  if (data.type & PERFCOUNTER_LONGRUNAVG) {
    if (data.u64 < elapsed.to_nsec()) {
      data.u64 = elapsed.to_nsec();         // use u64(sum in dump) as io elapsed
      data.avgcount = start.to_msec();      // use avgcount as io start timestamp
      data.avgcount2.store(data.avgcount);  // useless but for read_avg func run as usual
    }
  }
}
/* added end */
template <typename I>
void ImageWriteRequest<I>::update_stats(size_t length) {
  I &image_ctx = this->m_image_ctx;
  image_ctx.perfcounter->inc(l_librbd_wr); // 累积计数
  image_ctx.perfcounter->inc(l_librbd_wr_bytes, length); // 累积计数
}
void AioCompletion::complete() {
  assert(lock.is_locked());
  assert(ictx != nullptr);
  CephContext *cct = ictx->cct;

  tracepoint(librbd, aio_complete_enter, this, rval);
  utime_t elapsed;
  elapsed = ceph_clock_now() - start_time;
  switch (aio_type) {
  case AIO_TYPE_GENERIC:
  case AIO_TYPE_OPEN:
  case AIO_TYPE_CLOSE:
    break;
  case AIO_TYPE_READ:
    ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break;
  case AIO_TYPE_WRITE:
    ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break;
  case AIO_TYPE_DISCARD:
    ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break;
  case AIO_TYPE_FLUSH:
    ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
  case AIO_TYPE_WRITESAME:
    ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break;
  case AIO_TYPE_COMPARE_AND_WRITE:
    ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break;
  default:
    lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
    break;
  }

  /* added by wangpan */
  switch (aio_type) {
  case AIO_TYPE_NONE:
  case AIO_TYPE_GENERIC:
  case AIO_TYPE_OPEN:
  case AIO_TYPE_CLOSE:
    break;  // ignore above io type
  case AIO_TYPE_READ:
  case AIO_TYPE_WRITE:
  case AIO_TYPE_DISCARD:
  case AIO_TYPE_FLUSH:
  case AIO_TYPE_WRITESAME:
  case AIO_TYPE_COMPARE_AND_WRITE:
    {
      // record all slow io in count, and store the slowest one
      auto threshold = cct->_conf->get_val<double>("rbd_slow_io_threshold");
      if (threshold > 0) {
        utime_t thr;
        thr.set_from_double(threshold);
        if (elapsed >= thr) {
          ldout(cct, 20) << "elapsed(ms): " << elapsed.to_msec() << dendl;
          ictx->perfcounter->inc(l_librbd_all_slow_io_count);
          ictx->perfcounter->tsetp(l_librbd_slowest_io, start_time, elapsed);
        }
      }
    }
    break;
  }
  /* added end */
  ......
  state = AIO_STATE_CALLBACK;
  if (complete_cb) { // qemu/block/rbd.c:rbd_finish_aiocb
    lock.Unlock();
    complete_cb(rbd_comp, complete_arg);
    lock.Lock();
  }
  ......
}
  

ceph::timer_detail::timer机制

与SafeTimer类似,由一个专门的线程检查定时任务是否需要触发;支持取消定时任务,取消时如果任务已经触发则忽略,未触发则直接移除。

线程未命名,仍然叫qemu-system-x86,在Objecter对象构造的时候启动:

class Objecter {
  private:
    ceph::timer<ceph::mono_clock> timer;
};


class timer {
  public:
    timer() {
	  lock_guard l(lock);
	  suspended = false;
	  thread = std::thread(&timer::timer_thread, this); // 启动线程
    }
  ......
  void timer_thread() {
    ...... // 执行体,定时检查是否有任务需要触发
  }
};
void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  ......
  /* added by wangpan */
  auto timeout_warning = cct->_conf->get_val<double>("rados_osd_op_timeout_warning");
  if (timeout_warning > 0) {
    ceph::timespan tw = ceph::make_timespan(timeout_warning);
    op->onslowop_warning = timer.add_event(tw, [this, op, timeout_warning]() {
                    ldout(cct, 0) << "[slow op] warning(>" << timeout_warning << "s), object name: "
                                  << op->target.base_oid.name << ", pool: "
                                  << op->target.base_oloc.pool << dendl; } );
    ldout(cct, 20) << "added slow op warning timer event: " << op->onslowop_warning
                  << ", threshold: "<< timeout_warning << dendl;
  }

  auto timeout_critical = cct->_conf->get_val<double>("rados_osd_op_timeout_critical");
  if (timeout_critical > 0) {
    ceph::timespan tc = ceph::make_timespan(timeout_critical);
    op->onslowop_critical = timer.add_event(tc, [this, op, timeout_critical]() {
                    ldout(cct, 0) << "[slow op] critical(>" << timeout_critical << "s), object name: "
                                  << op->target.base_oid.name << ", pool: "
                                  << op->target.base_oloc.pool << dendl; } );
    ldout(cct, 20) << "added slow op critical timer event: " << op->onslowop_critical
                  << ", threshold: "<< timeout_critical << dendl;
  }
  /* added end */
  ......
}


void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << "finish_op " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budgeted)
    put_op_budget(op);

  /* added by wangpan */
  if (op->onslowop_warning) {
    timer.cancel_event(op->onslowop_warning);
	ldout(cct, 20) << "cancel slow op warning timer event: " << op->onslowop_warning << dendl;
  }
  if (op->onslowop_critical) {
    timer.cancel_event(op->onslowop_critical);
	ldout(cct, 20) << "cancel slow op critical timer event: " << op->onslowop_critical << dendl;
  }
  /* added end */
  ......
}
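
上面的rados_osd_op_timeout_warning、rados_osd_op_timeout_critical是本文自行添加的配置项,假设它们已按常规方式注册,客户端侧的ceph.conf可以这样启用IO卡顿预警(数值仅为示例):

[client]
rados_osd_op_timeout_warning = 5      # op超过5秒未完成则打印warning级别慢op日志
rados_osd_op_timeout_critical = 30    # op超过30秒未完成则打印critical级别慢op日志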

参考:Ceph动态更新参数机制浅析 http://t.cn/EPQE1tt

 

使用vstart搭建ceph开发环境

准备工作

  1. 准备代码使用的目录,注意目录要足够大,保证有100G以上可用空间(编译过程占用很多磁盘)
  2. clone源码,切换分支
$ git clone https://github.com/ceph/ceph.git
$ git checkout v12.2.5 -b v12.2.5
$ git submodule update --init --recursive

 

编译vstart所需二进制文件

$ ./run-make-check.sh    ## ceph源码根目录执行,默认如果有2个以上的CPU,只使用一半数量的CPU进行编译,可以编辑下这个脚本文件,把get_processors里面的“expr $(nproc) / 2”改成“expr $(nproc) / 1”,使用全部CPU进行编译
$ cd ceph/build
$ make vstart

等待编译结束后,即可执行vstart命令启动ceph集群。

启动集群

$ ../src/vstart.sh -d -n -X --bluestore --mon_num 1 --osd_num 3 --mgr_num 1 --mds_num 1   ### build目录下执行
## 各参数意义
$ ../src/vstart.sh -h
usage: ../src/vstart.sh [option]...
ex: ../src/vstart.sh -n -d --mon_num 3 --osd_num 3 --mds_num 1 --rgw_num 1
options:
        -d, --debug
        -s, --standby_mds: Generate standby-replay MDS for each active
        -l, --localhost: use localhost instead of hostname
        -i <ip>: bind to specific ip
        -n, --new                                                           ####### 注意这个参数,首次启动新集群要加,二次启动不要加
        -N, --not-new: reuse existing cluster config (default)
        --valgrind[_{osd,mds,mon,rgw}] 'toolname args...'
        --nodaemon: use ceph-run as wrapper for mon/osd/mds
        --smallmds: limit mds cache size
        -m ip:port              specify monitor address
        -k keep old configuration files
        -x enable cephx (on by default)
        -X disable cephx
        --hitset <pool> <hit_set_type>: enable hitset tracking
        -e : create an erasure pool
        -o config                add extra config parameters to all sections
        --mon_num specify ceph monitor count
        --osd_num specify ceph osd count
        --mds_num specify ceph mds count
        --rgw_num specify ceph rgw count
        --mgr_num specify ceph mgr count
        --rgw_port specify ceph rgw http listen port
        --rgw_frontend specify the rgw frontend configuration
        --rgw_compression specify the rgw compression plugin
        -b, --bluestore use bluestore as the osd objectstore backend         ######## 使用bluestore后端
        --memstore use memstore as the osd objectstore backend
        --cache <pool>: enable cache tiering on pool
        --short: short object names only; necessary for ext4 dev
        --nolockdep disable lockdep
        --multimds <count> allow multimds with maximum active count

 

查看集群状态

### build目录下执行
$ bin/ceph -s
*** DEVELOPER MODE: setting PATH, PYTHONPATH and LD_LIBRARY_PATH ***
2018-08-21 14:25:48.651800 7f35d4418700 -1 WARNING: all dangerous and experimental features are enabled.
2018-08-21 14:25:48.659410 7f35d4418700 -1 WARNING: all dangerous and experimental features are enabled.
  cluster:
    id:     376aef1c-90a7-4dc1-ba3f-d0ce00490988
    health: HEALTH_OK
 
  services:
    mon: 1 daemons, quorum a
    mgr: x(active)
    mds: cephfs_a-1/1/1 up  {0=a=up:active}
    osd: 3 osds: 3 up, 3 in
 
  data:
    pools:   3 pools, 24 pgs
    objects: 21 objects, 2246 bytes
    usage:   3081 MB used, 27638 MB / 30720 MB avail
    pgs:     24 active+clean

 

停止集群

$ ../src/stop.sh         ### build目录下执行

 

清理集群

$ ../src/stop.sh           ### build目录下执行
$ rm -rf out dev           ### build目录下执行

注意清理后,如果要再次启动集群,vstart.sh参数里要加上-n。

编译rbd命令

默认情况下,vstart并不编译rbd相关命令和库,需要手工编译。编译方法和普通编译过程没有区别,编译好的二进制文件都在build/bin目录下,跟vstart编译的其他二进制文件一样。

make rbd -j4       ##### build目录下执行

之后就可以进行rbd卷相关操作了:

### build目录下执行
$ bin/ceph osd pool create rbd 8 replicated      #### 创建rbd pool
$ bin/rbd ls
2018-08-21 14:26:49.086272 7f887ac2c0c0 -1 WARNING: all dangerous and experimental features are enabled.
2018-08-21 14:26:49.086515 7f887ac2c0c0 -1 WARNING: all dangerous and experimental features are enabled.
2018-08-21 14:26:49.089905 7f887ac2c0c0 -1 WARNING: all dangerous and experimental features are enabled.
$ bin/rbd create vol1 --size 10M
$ bin/rbd ls
vol1
$ bin/rbd info vol1
rbd image 'vol1':
        size 10240 kB in 3 objects
        order 22 (4096 kB objects)
        block_name_prefix: rbd_data.10346b8b4567
        format: 2
        features: layering, exclusive-lock, object-map, fast-diff, deep-flatten
        flags:
        create_timestamp: Tue Aug 21 14:27:13 2018
$ bin/rbd create vol2 --size 10M --image-feature layering
$ bin/rbd info vol2
rbd image 'vol2':
        size 10240 kB in 3 objects
        order 22 (4096 kB objects)
        block_name_prefix: rbd_data.10386b8b4567
        format: 2
        features: layering
        flags:
        create_timestamp: Tue Aug 21 14:28:24 2018
$ bin/rbd map vol2
/dev/rbd0
 

 

参考资料:

http://docs.ceph.com/docs/luminous/dev/quick_guide/

QEMU+librbd调试环境搭建

本次环境搭建在云主机上完成。如在物理机上搭建,可能需要修改虚拟机xml配置文件。

前提

ceph环境已搭建完毕,并可以创建rbd卷。

安装qemu及libvirt

 

apt-get update
apt-get install qemu qemu-block-extra libvirt-daemon-system libvirt-daemon libvirt-clients

 

其中qemu-block-extra包是为了给qemu提供rbd协议存储后端扩展支持。

下载虚拟机镜像

wget https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img  ## 这个是最精简的虚拟机镜像,只有14M

 

创建虚拟机使用的rbd卷(虚拟机系统盘)

qemu-img convert -f qcow2 -O raw cirros-0.4.0-x86_64-disk.img rbd:rbd/vol1
# 注意这条命令会在转换镜像格式的同时在ceph rbd池里创建rbd卷vol1,因此vol1不能已存在
# 如有需要可以resize扩容rbd卷,扩容卷之后还需要扩容文件系统,建议单独挂载一块rbd卷到虚拟机用于测试

 

准备虚拟机xml配置文件

保存成xxx.xml,比如libvirt.xml:

<domain type="qemu">  <!-- 注意这里的type,如果是物理机上启动虚拟机,需要改为kvm  -->
  <uuid>5d1289be-50e1-47b7-86de-1de0ff16a9d4</uuid>  <!-- 虚拟机uuid  -->
  <name>ceph</name>    <!-- 虚拟机名称  -->
  <memory>524288</memory>   <!-- 虚拟机内存大小,这里是配置的512M  -->
  <vcpu>1</vcpu>   <!-- 虚拟机CPU数量  -->
  <os>
    <type>hvm</type>
    <boot dev="hd"/>
  </os>
  <features>
    <acpi/>
    <apic/>
  </features>
  <clock offset="utc">
    <timer name="pit" tickpolicy="delay"/>
    <timer name="rtc" tickpolicy="catchup"/>
  </clock>
  <cpu mode="host-model" match="exact"/>
  <devices>       <!-- 虚拟机磁盘配置,一般vda是系统盘  -->
      <disk type="network" device="disk">
      <driver type="raw" cache="none"/>
      <source protocol="rbd" name="rbd/vol1">      <!-- 一般需要修改name,也就是$pool/$volume  -->
        <host name="192.168.0.2" port="6789"/>        <!-- mon地址  -->
      </source>
      <target bus="virtio" dev="vda"/>         <!-- 虚拟机内设备  -->
    </disk>
    <serial type="file">
      <source path="/var/log/libvirt/qemu/ceph-console.log"/>        <!-- 把虚拟机控制台输出到文件,可选  -->
    </serial>
    <serial type="pty"/>
    <input type="tablet" bus="usb"/>
    <graphics type="vnc" autoport="yes" keymap="en-us" listen="0.0.0.0"/>      <!-- 虚拟机VNC监听地址,一般不需要修改  -->
  </devices>
</domain>

 

之后执行virsh define libvirt.xml(定义虚拟机并持久化虚拟机配置到libvirt),virsh list --all(查看所有状态的虚拟机,包含关机状态),virsh start ceph(启动虚拟机),virsh destroy/shutdown ceph(强制/正常关机),virsh undefine ceph(清理虚拟机)。

 

挂载卷到虚拟机上

准备挂载卷的xml配置,基本上就是从虚拟机配置里面摘出来的磁盘配置:

<disk type="network" device="disk">
  <driver type="raw" cache="none"/>
  <source protocol="rbd" name="rbd/vol2">   <!-- 卷名称要改下 -->
    <host name="192.168.0.2" port="6789"/>
  </source>
  <target bus="virtio" dev="vdb"/>   <!-- 主要是这里的虚拟机设备要改下,不能是虚拟机xml配置文件里面已有的 -->
</disk>

 

之后执行virsh attach-device ceph vdb.xml,在虚拟机里面sudo fdisk -l即可看到新挂载的磁盘。注意:virsh destroy之后再start虚拟机,动态挂载的卷会消失,可以在attach-device命令后加上--config参数进行持久化,或者直接把这段xml放到libvirt.xml里面(<devices></devices>段里面即可)再启动虚拟机。

卸载磁盘设备执行virsh detach-device ceph vdb.xml即可。

 

ceph-client socket

如果你在ceph.conf里面配置了admin_socket,并且相关目录的权限也放开(虚拟机对应的qemu进程是libvirt-qemu用户组),

[client]
admin_socket = /var/run/ceph/$cluster-$type.$id.$pid.$cctid.asok
log_file = /var/log/ceph/qemu/qemu-guest-$pid.log

那么你就可以看到相应的ceph-client socket生成。

$ ll /var/run/ceph/
total 0
srwxrwxr-x 1 libvirt-qemu libvirt-qemu 0 Aug 21 11:30 ceph-client.admin.70781.94809655383520.asok
srwxrwxrwx 1 ceph         ceph         0 Aug 17 13:51 ceph-mgr.ceph1.asok
srwxrwxrwx 1 ceph         ceph         0 Aug 17 16:06 ceph-mon.ceph1.asok
srwxrwxrwx 1 ceph         ceph         0 Aug 17 13:51 ceph-osd.0.asok
srwxrwxrwx 1 ceph         ceph         0 Aug 17 13:51 ceph-osd.1.asok
srwxrwxrwx 1 ceph         ceph         0 Aug 17 13:51 ceph-osd.2.asok
$ ceph --admin-daemon /var/run/ceph/ceph-client.admin.70781.94809655383520.asok perf dump | grep librbd-fad56b8b4567-rbd-vol1 -A20                   
    "librbd-fad56b8b4567-rbd-vol1": {
        "rd": 1227,
        "rd_bytes": 25809408,
        "rd_latency": {
            "avgcount": 1227,
            "sum": 3.044197946,
            "avgtime": 0.002481008
        },
        "wr": 65,
        "wr_bytes": 159744,
        "wr_latency": {
            "avgcount": 65,
            "sum": 40.068453646,
            "avgtime": 0.616437748
        },
        "discard": 0,
        "discard_bytes": 0,
        "discard_latency": {
            "avgcount": 0,
            "sum": 0.000000000,
            "avgtime": 0.000000000

 

通过qemu调试librbd

注意:需要先手工编译并安装debug版本的librbd;qemu是使用上面提到的apt-get方式安装的,没有本地编译,因此调试时看不到qemu相应的源码,如有需要可以自行编译qemu,或者安装对应的debug包(安装方法见下文)。

虚拟机启动后,可以用gdb调试qemu进程,qemu进程通过调用librbd.so来进行rbd卷的IO读写:

## 首先找到qemu进程pid
ps -ef | grep qemu | grep mon_host
libvirt+   70781       1  6 11:30 ?        00:15:18 /usr/bin/qemu-system-x86_64 -name guest=ceph,debug-threads=on -S -object secret,id=masterKey0,format=raw,file=/var/lib/libvirt/qemu/domain-6-ceph/master-key.aes -machine pc-i440fx-2.8,accel=tcg,usb=off,dump-guest-core=off -cpu Broadwell,+vme,+ss,+osxsave,+f16c,+rdrand,+hypervisor,+arat,+tsc_adjust,+xsaveopt,+pdpe1gb,+abm -m 512 -realtime mlock=off -smp 1,sockets=1,cores=1,threads=1 -uuid 5d1289be-50e1-47b7-86de-1de0ff16a9d4 -no-user-config -nodefaults -chardev socket,id=charmonitor,path=/var/lib/libvirt/qemu/domain-6-ceph/monitor.sock,server,nowait -mon chardev=charmonitor,id=monitor,mode=control -rtc base=utc,driftfix=slew -global kvm-pit.lost_tick_policy=delay -no-shutdown -boot strict=on -device piix3-usb-uhci,id=usb,bus=pci.0,addr=0x1.0x2 -drive file=rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\:6789,format=raw,if=none,id=drive-virtio-disk0,cache=none -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x3,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1 -add-fd set=0,fd=27 -chardev file,id=charserial0,path=/dev/fdset/0,append=on -device isa-serial,chardev=charserial0,id=serial0 -chardev pty,id=charserial1 -device isa-serial,chardev=charserial1,id=serial1 -device usb-tablet,id=input0,bus=usb.0,port=1 -vnc 0.0.0.0:0 -k en-us -device cirrus-vga,id=video0,bus=pci.0,addr=0x2 -device virtio-balloon-pci,id=balloon0,bus=pci.0,addr=0x4 -msg timestamp=on
 
## 之后用gdb挂载到pid
$ gdb -p 70781
GNU gdb (Debian 7.12-6) 7.12.0.20161007-git
......
Attaching to process 70781
0x00007f1cb4f51741 in __GI_ppoll (fds=0x563a98ec8690, nfds=8, timeout=<optimized out>, sigmask=0x0) at ../sysdeps/unix/sysv/linux/ppoll.c:39
39      ../sysdeps/unix/sysv/linux/ppoll.c: No such file or directory.
(gdb) b librbd::io::ImageRequest<librbd::ImageCtx>::create_write_request     ### 添加librbd断点
(gdb) c
Continuing.       #### 虚拟机里面执行IO操作
[Switching to Thread 0x7f1c84ff9700 (LWP 70800)]
Thread 20 "CPU 0/TCG" hit Breakpoint 3, librbd::io::ImageRequest<librbd::ImageCtx>::create_write_request(librbd::ImageCtx&, librbd::io::AioCompletion*, std::vector<std::pair<unsigned long, unsigned long>, std::allocator<std::pair<unsigned long, unsigned long> > >&&, ceph::buffer::list&&, int, ZTracer::Trace const&) (
    image_ctx=..., aio_comp=aio_comp@entry=0x7f1c417c9400,
    image_extents=image_extents@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71f9f>,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71fb7>, op_flags=op_flags@entry=0, parent_trace=...)
    at /mnt/ceph/src/librbd/io/ImageRequest.cc:96
96      ImageRequest<I>* ImageRequest<I>::create_write_request(
(gdb) bt
#0  librbd::io::ImageRequest<librbd::ImageCtx>::create_write_request(librbd::ImageCtx&, librbd::io::AioCompletion*, std::vector<std::pair<unsigned long, unsigned long>, std::allocator<std::pair<unsigned long, unsigned long> > >&&, ceph::buffer::list&&, int, ZTracer::Trace const&) (image_ctx=...,
    aio_comp=aio_comp@entry=0x7f1c417c9400, image_extents=image_extents@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71f9f>,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71fb7>, op_flags=op_flags@entry=0, parent_trace=...)
    at /mnt/ceph/src/librbd/io/ImageRequest.cc:96
#1  0x00007f1ca1318f59 in librbd::io::ImageRequestWQ<librbd::ImageCtx>::aio_write(librbd::io::AioCompletion*, unsigned long, unsigned long, ceph::buffer::list&&, int, bool) (this=0x563a97ee5090, c=0x7f1c417c9400, off=off@entry=43303936, len=len@entry=1024,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1dbf9f7, DIE 0x1e87755>, op_flags=op_flags@entry=0, native_async=true)
    at /mnt/ceph/src/librbd/io/ImageRequestWQ.cc:264
#2  0x00007f1ca1228310 in rbd_aio_write (image=<optimized out>, off=43303936, len=1024, buf=<optimized out>, c=<optimized out>)
    at /mnt/ceph/src/librbd/librbd.cc:3536
#3  0x00007f1ca172a33a in ?? () from /usr/lib/x86_64-linux-gnu/qemu/block-rbd.so
#4  0x00007f1ca172a426 in ?? () from /usr/lib/x86_64-linux-gnu/qemu/block-rbd.so
#5  0x0000563a965aac3c in ?? ()
#6  0x0000563a965abed0 in ?? ()
#7  0x0000563a965acba7 in bdrv_co_pwritev ()
#8  0x0000563a9656e469 in ?? ()
#9  0x0000563a965aab21 in ?? ()
#10 0x0000563a965abed0 in ?? ()
#11 0x0000563a965acba7 in bdrv_co_pwritev ()
#12 0x0000563a9659e90d in blk_co_pwritev ()
#13 0x0000563a9659ea2b in ?? ()
#14 0x0000563a9661752a in ?? ()
#15 0x00007f1cb4eb6000 in ?? () from /lib/x86_64-linux-gnu/libc.so.6
#16 0x00007ffda22a0ff0 in ?? ()
#17 0x0000000000000000 in ?? ()
(gdb) l
91                                       std::move(read_result), op_flags,
92                                       parent_trace);
93      }
94
95      template <typename I>
96      ImageRequest<I>* ImageRequest<I>::create_write_request(
97          I &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
98          bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
99        return new ImageWriteRequest<I>(image_ctx, aio_comp, std::move(image_extents),
100                                       std::move(bl), op_flags, parent_trace);
(gdb) l
101     }
102
103     template <typename I>
104     ImageRequest<I>* ImageRequest<I>::create_discard_request(
105         I &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
106         bool skip_partial_discard, const ZTracer::Trace &parent_trace) {
107       return new ImageDiscardRequest<I>(image_ctx, aio_comp, off, len,
108                                         skip_partial_discard, parent_trace);
109     }
110

 

安装qemu debug symbols

 

### 添加debug symbols源
cat <<EOF | sudo tee /etc/apt/sources.list.d/dbgsym.list
> deb http://debug.mirrors.debian.org/debian-debug/ stretch-debug main
> EOF
deb http://debug.mirrors.debian.org/debian-debug/ stretch-debug main
### 更新源
$ apt update -y
$ apt install qemu-system-x86-dbgsym qemu-block-extra-dbgsym qemu-system-common-dbgsym qemu-utils-dbgsym -y

之后再用gdb调试就可以看到全部的调用栈信息及源码位置了:

Thread 37 "CPU 0/TCG" hit Breakpoint 1, librbd::io::ImageRequest<librbd::ImageCtx>::create_write_request(librbd::ImageCtx&, librbd::io::AioCompletion*, std::vector<std::pair<unsigned long, unsigned long>, std::allocator<std::pair<unsigned long, unsigned long> > >&&, ceph::buffer::list&&, int, ZTracer::Trace const&) (
    image_ctx=..., aio_comp=aio_comp@entry=0x7facc4278200,
    image_extents=image_extents@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71f9f>,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71fb7>, op_flags=op_flags@entry=0, parent_trace=...)
    at /mnt/ceph/src/librbd/io/ImageRequest.cc:96
96      ImageRequest<I>* ImageRequest<I>::create_write_request(
(gdb) bt
#0  librbd::io::ImageRequest<librbd::ImageCtx>::create_write_request(librbd::ImageCtx&, librbd::io::AioCompletion*, std::vector<std::pair<unsigned long, unsigned long>, std::allocator<std::pair<unsigned long, unsigned long> > >&&, ceph::buffer::list&&, int, ZTracer::Trace const&) (image_ctx=...,
    aio_comp=aio_comp@entry=0x7facc4278200, image_extents=image_extents@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71f9f>,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1c9367c, DIE 0x1d71fb7>, op_flags=op_flags@entry=0, parent_trace=...)
    at /mnt/ceph/src/librbd/io/ImageRequest.cc:96
#1  0x00007fad47504f59 in librbd::io::ImageRequestWQ<librbd::ImageCtx>::aio_write(librbd::io::AioCompletion*, unsigned long, unsigned long, ceph::buffer::list&&, int, bool) (this=0x55cbc3898890, c=0x7facc4278200, off=off@entry=17248256, len=len@entry=1024,
    bl=bl@entry=<unknown type in /usr/local/lib/librbd.so.1, CU 0x1dbf9f7, DIE 0x1e87755>, op_flags=op_flags@entry=0, native_async=true)
    at /mnt/ceph/src/librbd/io/ImageRequestWQ.cc:264
#2  0x00007fad47414310 in rbd_aio_write (image=<optimized out>, off=off@entry=17248256, len=len@entry=1024,
    buf=buf@entry=0x7facc4279000 " Opts: (null)\nAug 22 04:20:32 cirros kern.info kernel: [    4.859555] EXT4-fs (sda1): re-mounted. Opts: data=ordered\nAug 22 04:20:32 cirros kern.notice kernel: [    5.703121] random: dd urandom read w"..., c=<optimized out>) at /mnt/ceph/src/librbd/librbd.cc:3536
#3  0x00007fad4791633a in rbd_start_aio (bs=<optimized out>, off=17248256, qiov=<optimized out>, size=1024, cb=<optimized out>, opaque=<optimized out>,
    cmd=RBD_AIO_WRITE) at ./block/rbd.c:697
#4  0x00007fad47916426 in qemu_rbd_aio_writev (bs=<optimized out>, sector_num=<optimized out>, qiov=<optimized out>, nb_sectors=<optimized out>, cb=<optimized out>,
    opaque=<optimized out>) at ./block/rbd.c:746
#5  0x000055cbc23b7c3c in bdrv_driver_pwritev (bs=bs@entry=0x55cbc36c9890, offset=offset@entry=17248256, bytes=bytes@entry=1024, qiov=qiov@entry=0x7facc4277c60,
    flags=flags@entry=0) at ./block/io.c:901
#6  0x000055cbc23b8ed0 in bdrv_aligned_pwritev (bs=bs@entry=0x55cbc36c9890, req=req@entry=0x7facf82fbbc0, offset=offset@entry=17248256, bytes=bytes@entry=1024,
    align=align@entry=512, qiov=qiov@entry=0x7facc4277c60, flags=0) at ./block/io.c:1360
#7  0x000055cbc23b9ba7 in bdrv_co_pwritev (child=<optimized out>, offset=<optimized out>, offset@entry=17248256, bytes=bytes@entry=1024,
    qiov=qiov@entry=0x7facc4277c60, flags=flags@entry=0) at ./block/io.c:1610
#8  0x000055cbc237b469 in raw_co_pwritev (bs=0x55cbc36c35e0, offset=17248256, bytes=1024, qiov=<optimized out>, flags=<optimized out>) at ./block/raw_bsd.c:243
#9  0x000055cbc23b7b21 in bdrv_driver_pwritev (bs=bs@entry=0x55cbc36c35e0, offset=offset@entry=17248256, bytes=bytes@entry=1024, qiov=qiov@entry=0x7facc4277c60,
    flags=flags@entry=0) at ./block/io.c:875
#10 0x000055cbc23b8ed0 in bdrv_aligned_pwritev (bs=bs@entry=0x55cbc36c35e0, req=req@entry=0x7facf82fbe90, offset=offset@entry=17248256, bytes=bytes@entry=1024,
    align=align@entry=1, qiov=qiov@entry=0x7facc4277c60, flags=0) at ./block/io.c:1360
#11 0x000055cbc23b9ba7 in bdrv_co_pwritev (child=<optimized out>, offset=<optimized out>, offset@entry=17248256, bytes=bytes@entry=1024,
    qiov=qiov@entry=0x7facc4277c60, flags=0) at ./block/io.c:1610
#12 0x000055cbc23ab90d in blk_co_pwritev (blk=0x55cbc36bd690, offset=17248256, bytes=1024, qiov=0x7facc4277c60, flags=<optimized out>) at ./block/block-backend.c:848
#13 0x000055cbc23aba2b in blk_aio_write_entry (opaque=0x7facc574eb50) at ./block/block-backend.c:1036
#14 0x000055cbc242452a in coroutine_trampoline (i0=<optimized out>, i1=<optimized out>) at ./util/coroutine-ucontext.c:79
#15 0x00007fad5b0a2000 in ?? () from /lib/x86_64-linux-gnu/libc.so.6
#16 0x00007facf9ff98c0 in ?? ()
#17 0x0000000000000000 in ?? ()

 

从gdb启动qemu进程

如果需要用gdb直接启动qemu进程,可以使用qemu命令行方式启动虚拟机:

$ gdb qemu-system-x86_64
GNU gdb (Debian 7.12-6) 7.12.0.20161007-git
Copyright (C) 2016 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
<http://www.gnu.org/software/gdb/documentation/>.
For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from qemu-system-x86_64...(no debugging symbols found)...done.
(gdb) set args -m 512 -smp 1 -drive format=raw,file=rbd:rbd/vol1:auth_supported=none:mon_host=192.168.0.2\\:6789 -vnc :0
(gdb) b main

 

注意事项:

这里的虚拟机配置没有配置网络(如果需要网络,则需要先配置网桥),如果要连接到虚拟机内部,可以使用VNC客户端连接,下载地址:https://www.realvnc.com/en/connect/download/viewer/

vnc端口号查看:virsh vncdisplay ceph,输出:0表示5900端口。VNC客户端比较智能,会自动把0映射到5900,也即在客户端里输入192.168.0.2、192.168.0.2:0、192.168.0.2:5900是一样的效果。

也可以通过ps -ef|grep qemu | grep vnc查看,如-vnc 0.0.0.0:0,表示vnc server监听端口5900。

公有云云主机,由于有安全组限制,需要打开对应的VNC server端口才能连接。

或者通过xshell/secureCRT客户端的端口映射转发来映射过去(secureCRT如下图),之后在VNC客户端输入127.0.0.1:5900即可访问虚拟机。

QEMU/KVM USB设备直通/透传虚拟机

英文应该是叫qemu kvm usb device passthrough,有些人翻译成直通,有些人叫透传,总之就是passthrough。

下面的内容是综合整理了网上的教程和咨询前华为同事的结果,以及自己试验的一些结论。

准备工作:

  1. 确认宿主机BIOS里面打开了VT-d/VT-x/VT-i等所有硬件虚拟化支持开关
  2. 打开Linux操作系统的iommu开关,在grub启动命令行里面配置,Intel CPU和AMD CPU配置参数有区别:Intel CPU: intel_iommu=on;AMD CPU: amd_iommu=on
  3. 重启服务器,检查iommu配置是否生效(dmesg | grep -i iommu,输出“Intel-IOMMU: enabled”表示生效)

grub配置iommu参考资料:https://www.jianshu.com/p/035287ba9acb 《CentOS7 minimal kvm iommu 辅助虚拟化 vt-x (用于pci透传)》
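
以Intel CPU、CentOS 7为例,大致做法是在/etc/default/grub的GRUB_CMDLINE_LINUX里追加iommu开关,然后重新生成grub配置并重启(以下仅为示意,具体文件路径和生成命令以发行版为准):

GRUB_CMDLINE_LINUX="... intel_iommu=on"        ### AMD CPU则追加amd_iommu=on
### 重新生成配置:grub2-mkconfig -o /boot/grub2/grub.cfg,重启后用dmesg | grep -i iommu确认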

在宿主机上通过lsusb命令获取USB设备信息(yum install usbutils -y安装该工具):

Bus 002 Device 002: ID 8087:8002 Intel Corp. 
Bus 002 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub
Bus 001 Device 028: ID 413c:a001 Dell Computer Corp. Hub
Bus 001 Device 032: ID 0781:5581 SanDisk Corp. Ultra
Bus 001 Device 002: ID 8087:800a Intel Corp. 
Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub

字段信息解释:Bus:001,Device:032,vendor id:0781,product id:5581

libvirt配置修改:

在虚拟机的libvirt xml配置文件的<devices></devices>段内添加如下配置:

<hostdev mode='subsystem' type='usb' managed='yes'>
  <source>
    <vendor id='0x1780'/>
    <product id='0x0401'/>
  </source>
</hostdev>

vendor id和product id就是lsusb获取的需要直通的usb设备信息,之后启动虚拟机,正常情况下就可以在虚拟机里看到usb设备了。
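
这段hostdev配置也可以像前面挂载rbd磁盘那样保存成单独的xml文件(比如usb.xml,文件名仅为示例),用virsh attach-device ceph usb.xml动态挂载到运行中的虚拟机(同样可加--config持久化),不需要时用virsh detach-device ceph usb.xml卸载。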

注意事项:

有些usb设备不在windows设备管理器的“通用串行总线控制器”里面(U盘一般属于这个),比如我试过的USB无线网卡,是属于“网络适配器”,U盾则属于“DVD/CD-ROM驱动器”,而加密狗设备,则属于“人体学输入设备”(我用的加密狗是这个,不同的加密狗可能有区别)。要区分设备类型,可以用lsusb -t命令查看(比如我用的加密狗设备信息如下):

/:  Bus 07.Port 1: Dev 1, Class=root_hub, Driver=uhci_hcd/2p, 12M
    |__ Port 2: Dev 2, If 0, Class=Human Interface Device, Driver=usbfs, 12M

我就是通过Class=Human Interface Device,才想到它属于“人体学输入设备”的。

工行U盾直通效果:

另外nova里面(Mitaka版本)支持PCI设备的直通,但是usb设备好像还不支持,还没仔细研究过代码。

因此先临时用libvirt管理这台虚拟机了。注意如果手工修改了libvirt的xml配置,通过nova对虚拟机做操作,如reboot、stop/start、rebuild、resize等等会重置虚拟机的xml文件,相关usb配置都会丢失。(我是通过复制一份xml,修改掉uuid,让nova管理不了我这台虚拟机来解决的,用的系统盘、虚拟网卡还是nova创建的,nova看到的虚拟机永远是关机状态的就好了,它是用来为我手工管理的虚拟机占坑用的)。

 

OpenStack Trove&manila的网络依赖

这俩项目架构都差不多,

Trove是trove-api(接收用户请求)、trove-taskmanager(用户管理操作逻辑适配层)、trove-conductor(通过RPC接收数据库操作)、trove-guestagent(运行在虚拟机里面,实际管理数据库实例,具有多种类型数据库driver以驱动不同数据库类型)。

https://docs.openstack.org/trove/latest/install/get_started.html

Manila是manila-api(接收用户请求)、manila-data(处理备份、迁移等数据相关逻辑)、manila-scheduler(调度文件共享服务节点,也即share service节点)、manila-share(文件共享服务节点,提供实际的共享文件服务,可以运行在物理机上或者虚拟机里)。

https://docs.openstack.org/manila/pike/install/get-started-with-shared-file-systems.html

这两个服务有一个共同点,多个子服务同时运行在物理机和虚拟机里面,这种场景下,就得考虑物理网络到虚拟网络的连通性问题,否则服务之间不能互通,肯定没法正常运行。2种服务的解决方案也比较类似,都是通过L2或者L3来打通物理和虚拟网络:

https://wiki.openstack.org/wiki/Manila/Networking

L2方式下,需要使用FLAT网络,所有物理机和虚拟机都在一个2层下,业务和管理数据都在一个平面,性能好,但是不安全,大规模环境下也存在网络广播风暴问题。

L3方式下,也有两种方法,虚拟路由和物理路由。虚拟路由模式下,服务提供节点(数据库的guestagent节点和共享文件服务节点)需要跟物理机上的管理服务互通,以便接收用户管理操作请求,但实际的业务面数据(客户端虚拟机到服务节点虚拟机,如读写数据库、读写共享文件)仍然是走的同一个私有网络(同一个network的subnet)。如果服务节点(提供数据库或共享文件服务的虚拟机)上有2个port,可以配置为1个租户私有网port,用来提供业务面的网络数据服务,另一个配置为service port(也即FLAT模式网络),用来提供跟物理机网络互通的控制面网络数据服务。如果服务节点上只有一个port,那就需要两个虚拟路由器,一个是租户私有网的,一个是服务网络,二者之间要通过一个interface来打通。

https://www.jianshu.com/p/d04f829e3330

http://ju.outofmemory.cn/entry/113174

我们用的是VLAN网络模式,这种模式下需要用到物理路由器,来打通各个VLAN的子网,这样就可以做到物理网络和虚拟网络(分属不同VLAN,可避免广播风暴问题),这种方案在中小规模私有云下,非常稳定可靠,性能也更好,也更接近传统IDC的网络模型,对传统企业的IT运维人员比较友好。

 

Gerrit删除一个review提交记录

举例:我想删除这个review提交,http://10.0.30.120/#/c/3018/,也即gerrit数据库change_id=3018的提交记录

目标效果:gerrit web页面上看不到这个提交,历史记录也没有它,上面的链接也打不开,但是如果是已经merge的提交,则git库中仍然还是有这个commit的。

网上找了好久,自己把数据库里面相关的几个表都清理了(删除了所有change_id=3018的记录),结果gerrit web页面上还是能看到它,只不过打开就报错了,唯一能想到的就是缓存问题了。

最后找到一篇这个:https://stackoverflow.com/questions/29575600/fully-delete-abandoned-commit-from-gerrit-db-and-query

关键是重建gerrit索引这一步:java -jar path/to/gerrit.war reindex -d path/to/gerrit-site-dir

kube-apiserver RestFul API route创建流程分析

看完《kubernetes权威指南》和《Go程序设计语言》两本书之后,终于可以进入实际的代码分析阶段,根据之前熟悉其他开源项目源码(如libvirt、OpenStack等)的经验,首先从接口/API开始分析。

k8s开发环境搭建:使用kubeasz快速搭建k8s集群all-in-one开发测试环境

Golang调试环境搭建:kubernetes源码调试体验

k8s源码版本:v1.10

分析API:GET 127.0.0.1:8080/api/v1/nodes/10.0.90.22(10.0.90.22是我环境中的一个node,8080为kubectl proxy API代理端口)

分析目标:搞清楚用户发送请求到这个API之后kube-apiserver的处理流程

参考资料:http://www.wklken.me/posts/2017/09/23/source-apiserver-04.html(还有前面几篇,这里面讲的是老版本的,有很多代码已经改了,但是主要流程值得参考)

源码流程很绕(至少目前我是这么认为,可能是因为我刚开始看源码),需要多看、多动手调试,反复琢磨,应该都能看明白。

route注册({path} to {handler})

要搞清楚route注册流程,就必须先把go-restful框架的用法搞明白,官方Readme文档有说明,也有示例代码。这里给出上面提到的参考资料:http://www.wklken.me/posts/2017/09/23/source-apiserver-01.html

我们只需要记住,go-restful框架中route注册需要经过如下几个步骤:

  1. 创建一个container(默认使用default)
  2. 创建一个web service
  3. 创建handler(也就是API的请求处理函数)
  4. 把API path和handler绑定到web service
  5. 把web service(可多个,但root Path不能相同)绑定到container
  6. 启动包含container(可多个)的http server

我这边试验的一个简单的示例代码:

// https://github.com/emicklei/go-restful/blob/master/examples/restful-multi-containers.go
// GET http://localhost:9080/hello0 和 http://localhost:9080/hello1
// GET http://localhost:9081/hello2
package main

import (
    "github.com/emicklei/go-restful"
    "io"
    "log"
    "net/http"
)

func main() {
    // add two web services to default container
    ws0 := new(restful.WebService)
    ws0.Path("/hello0")  // 这里/hello0就是ws0的root Path
    ws0.Route(ws0.GET("/").To(hello0)) // curl 127.0.0.1:9080/hello0
    restful.Add(ws0)

    ws1 := new(restful.WebService)
    ws1.Path("/hello1")
    ws1.Route(ws1.GET("/").To(hello1)) // curl 127.0.0.1:9080/hello1
    restful.Add(ws1)
    go func() {
        log.Fatal(http.ListenAndServe(":9080", nil))
    }()

    // container 2
    container2 := restful.NewContainer()
    ws2 := new(restful.WebService)
    ws2.Route(ws2.GET("/hello2").To(hello2)) // curl 127.0.0.1:9081/hello2
    container2.Add(ws2)
    server := &http.Server{Addr: ":9081", Handler: container2}
    log.Fatal(server.ListenAndServe())
}

func hello0(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "default world 0")
}

func hello1(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "default world 1")
}

func hello2(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "second world")
}

 

我们下面所有的流程都以这个为基础进行分析。

kube-apiserver的main入口:

func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	command := app.NewAPIServerCommand()

	......
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}

这里用到了github.com/spf13/cobra这个包来解析启动参数并启动可执行程序。

具体注册流程就不一步一步的分析了,直接根据断点的bt输出跟着代码查看吧:

(dlv) b k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
(dlv) r
(dlv) c
(dlv) bt
 0  0x0000000003f1373b in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
 1  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
 2  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
 3  0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
 4  0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
 5  0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137
 6  0x00000000040acd57 in k8s.io/kubernetes/cmd/kube-apiserver/app.NewAPIServerCommand.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:121
 7  0x0000000003c5c4e8 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).execute
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:757
 8  0x0000000003c5cf76 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).ExecuteC
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:843
 9  0x0000000003c5c7cf in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).Execute
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:791
10  0x00000000040b16e5 in main.main
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go:51

NewLegacyRESTStorage这个方法(注意是LegacyRESTStorageProvider类型的方法),返回了3个参数,restStorage, apiGroupInfo, nil,最后一个是错误信息可忽略,第一个对我们流程分析没啥影响(应该是),中间这个apiGroupInfo是重点,InstallLegacyAPIGroup就是注册/api这个path的,apiPrefix这个参数是DefaultLegacyAPIPrefix = "/api",apiGroupInfo.PrioritizedVersions目前就"v1"一个版本。

// called by the InstallLegacyAPI method
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
		return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
	}
	if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
		return err
	}

	// setup discovery
	apiVersions := []string{}
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		apiVersions = append(apiVersions, groupVersion.Version)
	}
	// Install the version handler.
	// Add a handler at /<apiPrefix> to enumerate the supported api versions.
        // This step is straightforward and not covered further: it registers the GET /api path,
        // which returns all supported API versions; the handler is the handle method in
        // k8s.io/apiserver/pkg/endpoints/discovery/legacy.go.
	s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions).WebService())

	return nil
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions { // only v1 here
		if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
			glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
			continue
		}
                // note: this is where apiGroupInfo is wrapped into an apiGroupVersion
		apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		if apiGroupInfo.OptionsExternalVersion != nil {
			apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
		}

		if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
			return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
		}
	}

	return nil
}

Note the getAPIGroupVersion method: it wraps apiGroupInfo into the apiGroupVersion struct, specifically into apiGroupVersion.Storage, which is used below:

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
	storage := make(map[string]rest.Storage)
        // copy restStorageMap into storage; as the types show, both are map[string]rest.Storage
	for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] { // groupVersion.Version is v1
		storage[strings.ToLower(k)] = v // k is a key of the restStorageMap built in NewLegacyRESTStorage, v is its value
	}
	version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
	version.Root = apiPrefix
	version.Storage = storage
	return version
}
    ......
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,

		"podTemplates": podTemplateStorage,

		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,

		"services":        serviceRest,
		"services/proxy":  serviceRestProxy,
		"services/status": serviceStatusStorage,

		"endpoints": endpointsStorage,

		"nodes":        nodeStorage.Node,
		"nodes/status": nodeStorage.Status,
		"nodes/proxy":  nodeStorage.Proxy,

		"events": eventStorage,

		"limitRanges":                   limitRangeStorage,
		"resourceQuotas":                resourceQuotaStorage,
		"resourceQuotas/status":         resourceQuotaStatusStorage,
		"namespaces":                    namespaceStorage,
		"namespaces/status":             namespaceStatusStorage,
		"namespaces/finalize":           namespaceFinalizeStorage,
		"secrets":                       secretStorage,
		"serviceAccounts":               serviceAccountStorage,
		"persistentVolumes":             persistentVolumeStorage,
		"persistentVolumes/status":      persistentVolumeStatusStorage,
		"persistentVolumeClaims":        persistentVolumeClaimStorage,
		"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
		"configMaps":                    configMapStorage,

		"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
	}
    ......
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
    ......

 

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:                        g, // g is the apiGroupVersion above; it carries Storage, so g.Storage is the restStorageMap
		prefix:                       prefix,
		minRequestTimeout:            g.MinRequestTimeout,
		enableAPIResponseCompression: g.EnableAPIResponseCompression,
	}

	apiResources, ws, registrationErrors := installer.Install() // build the web service
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws) // bind the discovery path and handler to the web service
	container.Add(ws) // add the web service to the container
	return utilerrors.NewAggregate(registrationErrors)
}
(dlv) p installer
*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIInstaller {
        group: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIGroupVersion {
                Storage: map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [...],
                Root: "/api",
                GroupVersion: (*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion)(0xc42047a5b8),
                OptionsExternalVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
                MetaGroupVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
                Mapper: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.RESTMapper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.DefaultRESTMapper) ...,
                Serializer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/serializer.CodecFactory) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer")(0xc42047a5f8),
                ParameterCodec: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ParameterCodec(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.parameterCodec) ...,
                Typer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectTyper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Creater: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectCreater(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Convertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Defaulter: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectDefaulter(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Linker: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.resourceAccessor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker")(0xc42047a658),
                UnsafeConvertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.unsafeObjectConvertor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor")(0xc42047a668),
                Admit: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission.Interface(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission/metrics.pluginHandlerWithMetrics) ...,
                Context: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.RequestContextMapper(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.requestContextMap) ...,
                MinRequestTimeout: 1800000000000,
                EnableAPIResponseCompression: false,},
        prefix: "/api/v1",
        minRequestTimeout: 1800000000000,
        enableAPIResponseCompression: false,}
(dlv) p g.GroupVersion
k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion {Group: "", Version: "v1"}
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}
(dlv) p a.group.Storage // this is the restStorageMap from NewLegacyRESTStorage
map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [
        "replicationcontrollers/status": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116be00),}, 
        "services": *k8s.io/kubernetes/pkg/registry/core/service/storage.REST {
                services: k8s.io/kubernetes/pkg/registry/core/service/storage.ServiceStorage(*k8s.io/kubernetes/pkg/registry/core/service/storage.GenericREST) ...,
                endpoints: k8s.io/kubernetes/pkg/registry/core/service/storage.EndpointsStorage(*k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST) ...,
                serviceIPs: k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Range) ...,
                serviceNodePorts: k8s.io/kubernetes/pkg/registry/core/service/portallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/portallocator.PortAllocator) ...,
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,
                pods: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Getter(*k8s.io/kubernetes/pkg/registry/core/pod/storage.REST) ...,}, 
        "serviceaccounts": *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7a00),
                Token: *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.TokenREST nil,}, 
        "events": *k8s.io/kubernetes/pkg/registry/core/event/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb700),}, 
        "persistentvolumeclaims/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2a00),}, 
        "resourcequotas": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3d00),}, 
        "pods/exec": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ExecREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "persistentvolumes/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2300),}, 
        "componentstatuses": *k8s.io/kubernetes/pkg/registry/core/componentstatus.REST {GetServersToValidate: k8s.io/kubernetes/pkg/registry/core/rest.(componentStatusStorage).(k8s.io/kubernetes/pkg/registry/core/rest.serversToValidate)-fm}, 
        "pods/status": *k8s.io/kubernetes/pkg/registry/core/pod/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7900),}, 
        "pods/binding": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "pods/portforward": *k8s.io/kubernetes/pkg/registry/core/pod/rest.PortForwardREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "namespaces/finalize": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.FinalizeREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b900),}, 
        "resourcequotas/status": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116af00),}, 
        "nodes": *k8s.io/kubernetes/pkg/registry/core/node/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00),
                connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "namespaces": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.REST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd3100),
                status: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),}, 
        "podtemplates": *k8s.io/kubernetes/pkg/registry/core/podtemplate/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb100),}, 
        "persistentvolumeclaims": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2400),}, 
        "replicationcontrollers": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a8300),}, 
        "nodes/status": *k8s.io/kubernetes/pkg/registry/core/node/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7200),}, 
        "configmaps": *k8s.io/kubernetes/pkg/registry/core/configmap/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2b00),}, 
        "nodes/proxy": *k8s.io/kubernetes/pkg/registry/core/node/rest.ProxyREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00),
                Connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "pods/eviction": *k8s.io/kubernetes/pkg/registry/core/pod/storage.EvictionREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                podDisruptionBudgetClient: k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PodDisruptionBudgetsGetter(*k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PolicyClient) ...,}, 
        "secrets": *k8s.io/kubernetes/pkg/registry/core/secret/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b000),}, 
        "pods/proxy": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ProxyREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "limitranges": *k8s.io/kubernetes/pkg/registry/core/limitrange/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3200),}, 
        "bindings": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "services/status": *k8s.io/kubernetes/pkg/registry/core/service/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4213acf00),}, 
        "endpoints": *k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116ba00),}, 
        "pods/log": *k8s.io/kubernetes/pkg/registry/core/pod/rest.LogREST {
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "services/proxy": *k8s.io/kubernetes/pkg/registry/core/service.ProxyREST {
                Redirector: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Redirector(*k8s.io/kubernetes/pkg/registry/core/service/storage.REST) ...,
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "namespaces/status": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),}, 
        "pods/attach": *k8s.io/kubernetes/pkg/registry/core/pod/rest.AttachREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "pods": *k8s.io/kubernetes/pkg/registry/core/pod/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "replicationcontrollers/scale": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.ScaleREST {
                registry: k8s.io/kubernetes/pkg/registry/core/replicationcontroller.Registry(*k8s.io/kubernetes/pkg/registry/core/replicationcontroller.storage) ...,}, 
        "persistentvolumes": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a9d00),}, 
]

The paths variable holds the keys of the restStorageMap built in NewLegacyRESTStorage, i.e. paths such as "pods" and "nodes". a.registerResourceHandlers() registers a handler for each path, backed by the corresponding restStorageMap value.
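To make the Install() loop above concrete, here is a small standalone sketch, not the real APIInstaller, that registers the sorted keys of a storage-like map as go-restful routes; the handlers and port are made up:

package main

import (
	"fmt"
	"log"
	"net/http"
	"sort"

	restful "github.com/emicklei/go-restful"
)

func main() {
	// stand-in for a.group.Storage: resource path -> handler instead of rest.Storage
	storage := map[string]restful.RouteFunction{
		"pods":  func(req *restful.Request, resp *restful.Response) { fmt.Fprintln(resp, "pods") },
		"nodes": func(req *restful.Request, resp *restful.Response) { fmt.Fprintln(resp, "nodes") },
	}

	ws := new(restful.WebService)
	ws.Path("/api/v1")

	// register paths in sorted order, like APIInstaller.Install does for a deterministic spec
	paths := make([]string, 0, len(storage))
	for p := range storage {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	for _, p := range paths {
		ws.Route(ws.GET("/" + p).To(storage[p]))
	}

	container := restful.NewContainer()
	container.Add(ws)
	log.Fatal(http.ListenAndServe(":9083", container)) // curl 127.0.0.1:9083/api/v1/pods
}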

// the storage parameter is a value of restStorageMap and path is its key
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	......
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
        ......
	default:
		namespaceParamName := "namespaces"
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
		namespacedPath := namespaceParamName + "/{" + "namespace" + "}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}

		resourcePath := namespacedPath
		resourceParams := namespaceParams
		itemPath := namespacedPath + "/{name}"
		nameParams := append(namespaceParams, nameParam)
		proxyParams := append(nameParams, pathParam)
		itemPathSuffix := ""
		if isSubresource {
			itemPathSuffix = "/" + subresource
			itemPath = itemPath + itemPathSuffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = true
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			SelfLinker:         a.group.Linker,
			ClusterScoped:      false,
			SelfLinkPathPrefix: gpath.Join(a.prefix, namespaceParamName) + "/",
			SelfLinkPathSuffix: itemPathSuffix,
		}

		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

		// list or post across namespace.
		// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
		// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
		if !isSubresource {
			actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
			actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
		}
		break
	}

        ......
	for _, action := range actions {
		producedObject := storageMeta.ProducesObject(action.Verb)
		if producedObject == nil {
			producedObject = defaultVersionedObject
		}
		reqScope.Namer = action.Namer

		requestScope := "cluster"
		var namespaced string
		var operationSuffix string
		if apiResource.Namespaced {
			requestScope = "namespace"
			namespaced = "Namespaced"
		}
		if strings.HasSuffix(action.Path, "/{path:*}") {
			requestScope = "resource"
			operationSuffix = operationSuffix + "WithPath"
		}
		if action.AllNamespaces {
			requestScope = "cluster"
			operationSuffix = operationSuffix + "ForAllNamespaces"
			namespaced = ""
		}

		if kubeVerb, found := toDiscoveryKubeVerb[action.Verb]; found {
			if len(kubeVerb) != 0 {
				kubeVerbs[kubeVerb] = struct{}{}
			}
		} else {
			return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb)
		}

		routes := []*restful.RouteBuilder{}

		// If there is a subresource, kind should be the parent's kind.
		if isSubresource {
			parentStorage, ok := a.group.Storage[resource]
			if !ok {
				return nil, fmt.Errorf("missing parent storage: %q", resource)
			}

			fqParentKind, err := a.getResourceKind(resource, parentStorage)
			if err != nil {
				return nil, err
			}
			kind = fqParentKind.Kind
		}

		verbOverrider, needOverride := storage.(StorageMetricsOverride)

		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction // build the handler from the getter obtained above (= storage.(rest.Getter))
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, exporter, reqScope)
			}

			if needOverride {
				// need change the reported verb
				handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
			} else {
				handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
			}

			if a.enableAPIResponseCompression {
				handler = genericfilters.RestfulWithCompression(handler)
			}
			doc := "read the specified " + kind
			if isSubresource {
				doc = "read " + subresource + " of the specified " + kind
			}
			route := ws.GET(action.Path).To(handler). // bind the path and handler to the web service
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			if isGetterWithOptions {
				if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, err
				}
			}
			if isExporter {
				if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
					return nil, err
				}
			}
			addParams(route, action.Params)
			routes = append(routes, route)
		......
	return &apiResource, nil
}
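The route := ws.GET(action.Path).To(handler)... chain above is go-restful's fluent RouteBuilder API. A tiny standalone sketch of the same chaining, with hypothetical doc strings and no generated models:

package main

import (
	"fmt"
	"log"
	"net/http"

	restful "github.com/emicklei/go-restful"
)

func readPod(req *restful.Request, resp *restful.Response) {
	fmt.Fprintf(resp, "read pod %s\n", req.PathParameter("name"))
}

func main() {
	ws := new(restful.WebService)
	ws.Path("/api/v1")

	// same builder chaining as in registerResourceHandlers, greatly trimmed down
	route := ws.GET("/namespaces/{namespace}/pods/{name}").To(readPod).
		Doc("read the specified Pod").
		Param(ws.PathParameter("namespace", "object name and auth scope").DataType("string")).
		Param(ws.PathParameter("name", "name of the Pod").DataType("string")).
		Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
		Returns(http.StatusOK, "OK", nil)
	ws.Route(route)

	container := restful.NewContainer()
	container.Add(ws)
	log.Fatal(http.ListenAndServe(":9085", container))
}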

Next, what exactly is the handler? Take restfulGetResource as an example:

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
	} // the returned function is the handler that gets registered (the two parameters also confirm this);
          // it calls handlers.GetResource, which returns a function that is immediately invoked with (res.ResponseWriter, req.Request).
          // Rewritten like this it may be easier to follow:
          //    myGetHandler := handlers.GetResource(r, e, scope)
          //    myGetHandler(res.ResponseWriter, req.Request) // this call is what actually handles the GET request
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
// per the comment above, the function returned by GetResource is what actually serves the GET request, i.e. the myGetHandler mentioned above
func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
	return getResourceHandler(scope, // note the nesting: the anonymous function below is passed as an argument to getResourceHandler,
                                         // getResourceHandler runs first and then invokes this anonymous function
		func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
			// check for export
			options := metav1.GetOptions{}
			if values := req.URL.Query(); len(values) > 0 {
				exports := metav1.ExportOptions{}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
				if exports.Export {
					if e == nil {
						return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
					}
					return e.Export(ctx, name, exports)
				}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
			}
			if trace != nil {
				trace.Step("About to Get from storage")
			}
                        // this is the key call: it dispatches to the resource object's own Get handler
			return r.Get(ctx, name, &options)
		})
}
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		trace := utiltrace.New("Get " + req.URL.Path)
		defer trace.LogIfLong(500 * time.Millisecond)

		namespace, name, err := scope.Namer.Name(req)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		ctx := req.Context()
		ctx = request.WithNamespace(ctx, namespace)
                // invoke the anonymous function passed in as a parameter (see above) to obtain the result
		result, err := getter(ctx, name, req, trace)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		requestInfo, ok := request.RequestInfoFrom(ctx)
		if !ok {
			scope.err(fmt.Errorf("missing requestInfo"), w, req)
			return
		}
		if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
			scope.err(err, w, req)
			return
		}

		trace.Step("About to write a response")
		transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
	}
}
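The restfulGetResource / GetResource / getResourceHandler chain above is simply functions returning functions. The following minimal sketch reproduces the same pattern with plain net/http; the names and the map-backed "storage" are hypothetical:

package main

import (
	"fmt"
	"log"
	"net/http"
)

// getterFunc plays the role of the anonymous function passed to getResourceHandler.
type getterFunc func(name string) (string, error)

// makeGetHandler plays the role of getResourceHandler: it captures a getter
// and returns the handler function that is actually registered.
func makeGetHandler(getter getterFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		name := req.URL.Query().Get("name")
		result, err := getter(name)
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		fmt.Fprintln(w, result)
	}
}

func main() {
	// the "storage backend" here is just a map lookup
	store := map[string]string{"nginx": "Running"}
	handler := makeGetHandler(func(name string) (string, error) {
		if v, ok := store[name]; ok {
			return v, nil
		}
		return "", fmt.Errorf("%q not found", name)
	})

	http.Handle("/pods", handler) // curl '127.0.0.1:9084/pods?name=nginx'
	log.Fatal(http.ListenAndServe(":9084", nil))
}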

So r.Get is the function that actually serves the request in the end. Where does it come from?

r is the getter, i.e. storage.(rest.Getter). As noted above, the storage parameter is a value of restStorageMap and path is its key; for the API being analyzed here that is "nodes": nodeStorage.Node, so storage == nodeStorage.Node. rest.Getter is an interface defined in the apiserver's registry rest package (not in go-restful). In Go an interface is itself a type and acts as a contract: any type (int, string, struct, and so on) in any package satisfies it simply by implementing the methods it declares, with the specified parameter and return types. Here is the definition of this interface:

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	// Get finds a resource in the storage by name and returns it.
	// Although it can return an arbitrary error value, IsNotFound(err) is true for the
	// returned error value err when the specified resource is not found.
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
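The storage.(rest.Getter) expressions seen earlier in registerResourceHandlers are Go's comma-ok type assertions against exactly this kind of interface. A minimal sketch of the idiom, using hypothetical types rather than the real rest.Storage:

package main

import "fmt"

// Getter mirrors the shape of the interface above, trimmed down for the example.
type Getter interface {
	Get(name string) (string, error)
}

// fakeStorage stands in for a rest.Storage implementation.
type fakeStorage struct{}

func (fakeStorage) Get(name string) (string, error) { return "value of " + name, nil }

func main() {
	var storage interface{} = fakeStorage{}

	// comma-ok type assertion, the same pattern as getter, isGetter := storage.(rest.Getter)
	getter, isGetter := storage.(Getter)
	fmt.Println("implements Getter:", isGetter)
	if isGetter {
		v, _ := getter.Get("nodes")
		fmt.Println(v)
	}
}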

Now check whether storage, i.e. nodeStorage.Node, implements it. Frustratingly, no Get method is defined on it directly. Do not give up, though: structs can be embedded (similar to parent/child inheritance), and Go supports anonymous fields whose methods are promoted, so they can be called without naming the intermediate struct at all (see The Go Programming Language if this is rusty). So look for a Get implementation in the structs that Node embeds:

// NewStorage returns a NodeStorage object that will work against nodes.
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{ // note: this embedded struct is the key
        ......
	// Set up REST handlers
	nodeREST := &REST{Store: store, proxyTransport: proxyTransport} // look for the Get implementation on this struct and the structs it embeds
        ......
	return &NodeStorage{
		Node:   nodeREST,  // note: this field holds a struct that embeds genericregistry.Store
		Status: statusREST,
		Proxy:  proxyREST,
		KubeletConnectionInfo: connectionInfoGetter,
	}, nil
}

And indeed, the Get implementation is found in the package that defines the genericregistry.Store struct:

// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
        // yet another interface; in any case, a GET request sent to the apiserver ends up being handled here
	if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}
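The reason nodeStorage.Node satisfies rest.Getter without defining Get itself is method promotion through the embedded *genericregistry.Store. A minimal sketch of the mechanism with hypothetical types:

package main

import "fmt"

type Getter interface {
	Get(name string) (string, error)
}

// Store plays the role of genericregistry.Store and provides the Get method.
type Store struct{}

func (s *Store) Get(name string) (string, error) { return "object " + name, nil }

// NodeREST plays the role of node/storage.REST: it embeds *Store anonymously,
// so Store's Get is promoted and NodeREST satisfies Getter automatically.
type NodeREST struct {
	*Store
}

func main() {
	var g Getter = &NodeREST{Store: &Store{}}
	obj, _ := g.Get("10.0.90.22")
	fmt.Println(obj)
}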

Next, track down e.Storage.Get. Following the interface definition, the real backend implementation turns out to be k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get. Why etcd3? That requires looking at how the etcd storage backend type (the store for Kubernetes metadata) is registered; etcd3 is the default:

=>  30: func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    31:         switch c.Type {
    32:         case storagebackend.StorageTypeETCD2:
    33:                 return newETCD2Storage(c)
    34:         case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
    35:                 // TODO: We have the following features to implement:
(dlv) bt
0 0x0000000001af76eb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
1 0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
2 0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
3 0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
// note: Create is hit once per path, so distinguish which resource this is; here it is node
4 0x0000000003eb072d in k8s.io/kubernetes/pkg/registry/core/node/storage.NewStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:91
5 0x0000000003f13f77 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:130
6 0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
7 0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
8 0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
9 0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
10 0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137
(dlv) p c.Type
""  // corresponds to storagebackend.StorageTypeUnset, so the default etcd3 backend is chosen

But how do we work backwards to the handler, that is, to e.Storage.Get? The simplest way is of course to set a breakpoint and debug; the output is pasted in the request-handling section below, so it is omitted here. Even so, one question remains: how does e.Storage.Get end up in etcd3's Get method?

(dlv) bt
 0  0x0000000001ae5358 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:99
 1  0x0000000001ae521d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:90
 2  0x0000000001af7541 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.newETCD3Storage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:78
 3  0x0000000001af77d7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:39
 // note this frame; it is used below when tracing the Get path from cacher.go to etcd3.go
 4  0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
 5  0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
 6  0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
 7  0x0000000003f0f875 in k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.NewREST
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage/storage.go:52
 8  0x0000000003f16aa3 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:146
 9  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
10  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1292 (hits goroutine(1):2 total:2) (PC: 0x1d25303)
1287: GetAttrs: attrFunc,
1288: }
1289: }
1290: }
1291: // the key is the line below: it creates opts, which is where e.Storage later comes from
=>1292: opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
1293: if err != nil {
1294: return err
1295: }
1296:
1297: // ResourcePrefix must come from the underlying factory
(dlv) whatis options.RESTOptions // whatis prints both the static interface type (RESTOptionsGetter) and the concrete type stored in it, *storageFactoryRestOptionsFactory, which was assigned earlier when the server's storage options were applied to the config
k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.RESTOptionsGetter
Concrete type: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/options.storageFactoryRestOptionsFactory

(dlv) 
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1361 (PC: 0x1d25808)
1356: }
1357: return accessor.GetName(), nil
1358: }
1359: }
1360:
=>1361: if e.Storage == nil {
1362: e.Storage, e.DestroyFunc = opts.Decorator( // e.Storage is produced by opts.Decorator()
1363: opts.StorageConfig,
1364: e.NewFunc(),
1365: prefix,
1366: keyFunc,
// Why is this particular GetRESTOptions implementation called when there are several? Because an interface
// method call always dispatches to the concrete (dynamic) type stored in the interface value, which, as the
// whatis output above shows, is *storageFactoryRestOptionsFactory here.
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}
	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		cacheSize, ok := sizes[resource]
		if !ok {
			cacheSize = f.Options.DefaultWatchCacheSize
		}
                // here is where opts.Decorator comes from
		ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
	}

	return ret, nil
}
// Creates a cacher based given storageConfig.
func StorageWithCacher(capacity int) generic.StorageDecorator {
	return func( // this anonymous function is the opts.Decorator that produces e.Storage
		storageConfig *storagebackend.Config,
		objectType runtime.Object,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
                // note this line; s is used below
		s, d := generic.NewRawStorage(storageConfig)
		if capacity == 0 {
			glog.V(5).Infof("Storage caching is disabled for %T", objectType)
			return s, d
		}
		glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)

		// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
		// Currently it has two layers of same storage interface -- cacher and low level kv.
		cacherConfig := storage.CacherConfig{
			CacheCapacity:        capacity,
			Storage:              s,  // used below
			Versioner:            etcdstorage.APIObjectVersioner{},
			Type:                 objectType,
			ResourcePrefix:       resourcePrefix,
			KeyFunc:              keyFunc,
			NewListFunc:          newListFunc,
			GetAttrsFunc:         getAttrsFunc,
			TriggerPublisherFunc: triggerFunc,
			Codec:                storageConfig.Codec,
		}
                // this cacher is what ends up as e.Storage, typed as storage.Interface;
                // NewCacherFromConfig returns a *Cacher (a struct pointer), and because *Cacher
                // implements every method of storage.Interface it can be returned as that interface type
		cacher := storage.NewCacherFromConfig(cacherConfig)
		destroyFunc := func() {
			cacher.Stop()
			d()
		}

		// TODO : Remove RegisterStorageCleanup below when PR
		// https://github.com/kubernetes/kubernetes/pull/50690
		// merges as that shuts down storage properly
		RegisterStorageCleanup(destroyFunc)

		return cacher, destroyFunc
	}
}
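StorageWithCacher is essentially a decorator: it wraps the raw etcd storage with a cache while still exposing the same storage.Interface, which is also why a *Cacher can be handed back where an interface value is expected. A minimal sketch of the pattern with a hypothetical KV interface:

package main

import "fmt"

// KV is a stripped-down stand-in for storage.Interface.
type KV interface {
	Get(key string) (string, error)
}

// rawStore stands in for the raw etcd3 store created by generic.NewRawStorage.
type rawStore struct{ data map[string]string }

func (r *rawStore) Get(key string) (string, error) {
	v, ok := r.data[key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

// cachedStore stands in for Cacher: it wraps another KV and also implements KV.
type cachedStore struct {
	storage KV                // like Cacher.storage == config.Storage
	cache   map[string]string // toy cache
}

func (c *cachedStore) Get(key string) (string, error) {
	if v, ok := c.cache[key]; ok {
		return v, nil
	}
	v, err := c.storage.Get(key) // fall through to the underlying storage, like Cacher.Get
	if err == nil {
		c.cache[key] = v
	}
	return v, err
}

// withCache mirrors the shape of StorageWithCacher: it decorates a KV with a cache.
func withCache(s KV) KV {
	return &cachedStore{storage: s, cache: map[string]string{}}
}

func main() {
	raw := &rawStore{data: map[string]string{"/registry/minions/10.0.90.22": "node object"}}
	store := withCache(raw) // a *cachedStore returned as the KV interface
	v, _ := store.Get("/registry/minions/10.0.90.22")
	fmt.Println(v)
}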

NewCacherFromConfig returns a pointer to the Cacher struct, which implements each method declared by storage.Interface:

// Create a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage, // note this field; it is used below

So e.Storage.Get is the Get method in k8s.io/apiserver/pkg/storage/cacher.go, which in turn calls the etcd3 backend's Get:

// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
                // what c.storage is was covered above
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

c.storage == config.Storage is produced by NewRawStorage, and the debugging output above shows that this ultimately goes through k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore, so c.storage.Get calls:

// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
	key = path.Join(s.pathPrefix, key) // s.pathPrefix is "/registry"
        // fetch the value for this key from etcd via the etcd client
	getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
	if err != nil {
		return err
	}

	if len(getResp.Kvs) == 0 {
		if ignoreNotFound {
			return runtime.SetZeroValue(out)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]

	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

This Get is where a user's GET API request is ultimately served.
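For reference, s.client.KV here is an etcd v3 client. A minimal sketch of reading a key the same way; the endpoint is assumed and the import path matches the pre-module etcd client of that era:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// same shape as s.client.KV.Get(ctx, key, s.getOps...) in the store above
	resp, err := cli.Get(ctx, "/registry/minions/10.0.90.22")
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s -> %d bytes of encoded object\n", kv.Key, len(kv.Value))
	}
}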

> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127 (hits goroutine(2632):1 total:12) (PC: 0x1ae584b)
   122: func (s *store) Versioner() storage.Versioner {
   123:         return s.versioner
   124: }
   125:
   126: // Get implements storage.Interface.Get.
=> 127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:         key = path.Join(s.pathPrefix, key)
   129:         getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:         if err != nil {
   131:                 return err
   132:         }
(dlv) p key
"/minions/10.0.90.22"
(dlv) p s.pathPrefix
"/registry"
......
(dlv) n
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:129 (PC: 0x1ae5977)
   124: }
   125:
   126: // Get implements storage.Interface.Get.
   127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:         key = path.Join(s.pathPrefix, key)
=> 129:         getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:         if err != nil {
   131:                 return err
   132:         }
   133:
   134:         if len(getResp.Kvs) == 0 {
(dlv) p key    // the key under which the node object is stored in etcd
"/registry/minions/10.0.90.22"

 

Request handling (HTTP RESTful request to {handler})

Not much to add here; the call stack speaks for itself:

(dlv) bt
0 0x0000000001ae584b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127
1 0x00000000016cbcdc in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage.(*Cacher).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/cacher.go:357
2 0x0000000001d1eecb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:674
3 0x0000000001c9b304 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.GetResource.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:106
4 0x0000000001c9a6f9 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.getResourceHandler.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:60
5 0x0000000001cc290b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.restfulGetResource.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go:1058
6 0x00000000016dfc55 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics.InstrumentRouteFunc.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go:199
7 0x0000000001511e53 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).dispatch
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:277
8 0x0000000001510e35 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).Dispatch
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:199
9 0x0000000001d022de in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.director.ServeHTTP
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/handler.go:152
10 0x0000000001d09ea2 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.(*director).ServeHTTP
at /root/k8s/kubernetes/<autogenerated>:1

 

Open questions

  • The two Go language points (both about interfaces) that were not fully clear during the code analysis above
  • How the go-restful container is wired up to the HTTP server; probably not complicated, but outside the scope of this analysis, so it was not examined
  • Registration and use of the various filters; likewise not examined
  • etcd deployment and usage, and how Kubernetes uses etcd; extended topics to revisit when time permits

Comparison with the OpenStack API handling flow

The biggest difference is of course the language: Python and Go differ enough that the RESTful HTTP server frameworks differ too, and route definition and registration look quite different. Ultimately it is a matter of familiarity: after staring at this code and these frameworks every day, the strong sense of strangeness from the first reading fades. I still remember being completely lost for a long time when first reading the OpenStack nova-api route registration and dispatch flow...

Beyond that, the interaction flows differ. GET is roughly the same in both: receive the user's API request, dispatch it to the concrete handler (route to a controller or handler), have that controller or handler query the backend database (MySQL or etcd) for the requested data, and wrap the result up for the user. Before the controller or handler sees the request, it also passes through several filters, which reject illegal or unwanted requests so that only legitimate ones (for example, not rate-limited) are processed and answered.

Create is where the two differ most. OpenStack generally passes messages over a message queue: the API service RPCs the concrete action to other services (the scheduler, the compute node manager, and so on), and state updates during and after execution also go back to the database via RPC (originally the database was written directly; this was later changed to go through RPC for security). Kubernetes instead does all asynchronous interaction between components through etcd: each component watches the keys it cares about, which serves as both message passing and asynchronous invocation, and state updates are likewise writes to etcd.
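As an illustration of the watch mechanism mentioned above, a minimal sketch with the same assumed etcd v3 client and endpoint as the earlier Get example; the key prefix is only an example:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// watch every key under /registry/minions/, roughly how a component would
	// observe node objects changing instead of receiving an RPC
	watchCh := cli.Watch(context.Background(), "/registry/minions/", clientv3.WithPrefix())
	for wresp := range watchCh {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %s (%d bytes)\n", ev.Type, ev.Kv.Key, len(ev.Kv.Value))
		}
	}
}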

 

nova/cinder snapshot interactions with Ceph

I have read these two areas of code many times, but keep forgetting the details, and colleagues keep asking about them, which means digging through the code again each time. So here is a write-up of the interaction flows for future reference. (Note: the analysis is still based on the Mitaka source; a quick look at Queens suggests little has changed.)

Starting with nova: nova snapshots mainly target the system disk (root disk). The relevant CLI commands are:

[root@vs2-compute-84 ~]# nova help backup
usage: nova backup <server> <name> <backup-type> <rotation>

Backup a server by creating a 'backup' type snapshot.

Positional arguments:
  <server>       Name or ID of server.
  <name>         Name of the backup image.
  <backup-type>  The backup type, like "daily" or "weekly".
  <rotation>     Int parameter representing how many backups to keep around.
[root@vs2-compute-84 ~]# nova help image-create
usage: nova image-create [--metadata <key=value>] [--show] [--poll]
                         <server> <name>

Create a new image by taking a snapshot of a running server.

Positional arguments:
  <server>                Name or ID of server.
  <name>                  Name of snapshot.

Optional arguments:
  --metadata <key=value>  Record arbitrary key/value metadata to
                          /meta_data.json on the metadata server. Can be
                          specified multiple times.
  --show                  Print image info.
  --poll                  Report the snapshot progress and poll until image
                          creation is complete.

Looking at the nova source, the only difference between backup and image (snapshot) is the rotation concept, which limits how many backups are kept (backups beyond that number are rotated out, oldest first). Set rotation large enough and backup is effectively the same as image; the backend code is shared as well (the API entry points differ, but by the nova compute manager layer there is no difference):

    @wrap_exception()
    @reverts_task_state
    @wrap_instance_fault
    def backup_instance(self, context, image_id, instance, backup_type,
                        rotation):
        """Backup an instance on this host.

        :param backup_type: daily | weekly
        :param rotation: int representing how many backups to keep around
        """
        self._do_snapshot_instance(context, image_id, instance, rotation)
        self._rotate_backups(context, instance, backup_type, rotation)

    @delete_image_on_error
    def _do_snapshot_instance(self, context, image_id, instance, rotation):
        self._snapshot_instance(context, image_id, instance,
                                task_states.IMAGE_BACKUP)
    @wrap_exception()
    @reverts_task_state
    @wrap_instance_fault
    @delete_image_on_error
    def snapshot_instance(self, context, image_id, instance):
        """Snapshot an instance on this host.

        :param context: security context
        :param instance: a nova.objects.instance.Instance object
        :param image_id: glance.db.sqlalchemy.models.Image.Id
        """
        ......

        self._snapshot_instance(context, image_id, instance,
                                task_states.IMAGE_SNAPSHOT)

As the source above shows, both end up in the same underlying implementation.

_snapshot_instance calls the libvirt driver's snapshot method, which distinguishes live from cold snapshots as well as direct snapshots from external snapshots. With a Ceph backend the direct snapshot path is used, i.e. the snapshot is taken through Ceph's RBD image APIs.

        try:
            update_task_state(task_state=task_states.IMAGE_UPLOADING,
                              expected_state=task_states.IMAGE_PENDING_UPLOAD)
            metadata['location'] = snapshot_backend.direct_snapshot(
                context, snapshot_name, image_format, image_id,
                instance.image_ref)
            self._snapshot_domain(context, live_snapshot, virt_dom, state,
                                  instance)
            self._image_api.update(context, image_id, metadata,
                                   purge_props=False)
        except (NotImplementedError, exception.ImageUnacceptable,
                exception.Forbidden) as e:
    def direct_snapshot(self, context, snapshot_name, image_format,
                        image_id, base_image_id):
        """Creates an RBD snapshot directly.
        """
        fsid = self.driver.get_fsid()
        # NOTE(nic): Nova has zero comprehension of how Glance's image store
        # is configured, but we can infer what storage pool Glance is using
        # by looking at the parent image.  If using authx, write access should
        # be enabled on that pool for the Nova user
        parent_pool = self._get_parent_pool(context, base_image_id, fsid)

        # Snapshot the disk and clone it into Glance's storage pool.  librbd
        # requires that snapshots be set to "protected" in order to clone them
        self.driver.create_snap(self.rbd_name, snapshot_name, protect=True)
        location = {'url': 'rbd://%(fsid)s/%(pool)s/%(image)s/%(snap)s' %
                           dict(fsid=fsid,
                                pool=self.pool,
                                image=self.rbd_name,
                                snap=snapshot_name)}
        try:
            self.driver.clone(location, image_id, dest_pool=parent_pool)
            # Flatten the image, which detaches it from the source snapshot
            self.driver.flatten(image_id, pool=parent_pool)
        finally:
            # all done with the source snapshot, clean it up
            self.cleanup_direct_snapshot(location)

        # Glance makes a protected snapshot called 'snap' on uploaded
        # images and hands it out, so we'll do that too.  The name of
        # the snapshot doesn't really matter, this just uses what the
        # glance-store rbd backend sets (which is not configurable).
        self.driver.create_snap(image_id, 'snap', pool=parent_pool,
                                protect=True)
        return ('rbd://%(fsid)s/%(pool)s/%(image)s/snap' %
                dict(fsid=fsid, pool=parent_pool, image=image_id))

The flow is: create a temporary snapshot (still in the nova system-disk pool); clone that snapshot into a new RBD volume in the glance pool (a cross-pool clone); flatten it (detach the clone from its source snapshot); delete the temporary snapshot (clean up); and finally create a snapshot named 'snap' on the RBD image in the glance pool. That snapshot is the resulting image of the instance's system disk (the new image, also called a custom image, captured image, or image template; in short, whatever nova image-create produces, visible via glance image-list). In other words, an image in glance (whether uploaded by an administrator or produced by nova image-create) corresponds to an RBD snap rather than an actual volume. When a new instance is created, its system disk is simply cloned from that snap; thanks to copy-on-write very little data is copied, so creating the system-disk volume is very fast (which is also why a glance image cannot be deleted while instances are still using it).

For how RBD snapshots work internally, see a former colleague's article: http://www.sysnote.org/2016/02/28/ceph-rbd-snap/

In the direct + live snapshot case, the instance keeps running while the temporary snapshot is taken, so some data may still sit in the in-memory disk cache without having been flushed to disk; there is therefore some probability that the resulting system-disk snapshot is corrupted.

The above is the Ceph-backend flow; for the local-storage snapshot flow, see the earlier article: Mitaka Nova在线快照数据丢失问题及解决方法

nova also has one more feature that interacts with cinder (Ceph): boot-from-volume, i.e. booting an instance from a volume, in which case the volume shown in cinder list is bootable. This is not used much with a Ceph backend, so it is not covered here.

Next, the cinder side. The relevant CLI commands should be create, backup-create, and snapshot-create (there may be others, but probably not):

[root@vs2-compute-84 ~]# cinder help create
usage: cinder create [--consisgroup-id <consistencygroup-id>]
                     [--snapshot-id <snapshot-id>]
                     [--source-volid <source-volid>]
                     [--source-replica <source-replica>]
                     [--image-id <image-id>] [--image <image>] [--name <name>]
                     [--description <description>]
                     [--volume-type <volume-type>]
                     [--availability-zone <availability-zone>]
                     [--metadata [<key=value> [<key=value> ...]]]
                     [--hint <key=value>] [--allow-multiattach]
                     [<size>]

Creates a volume.

Positional arguments:
  <size>                Size of volume, in GiBs. (Required unless snapshot-id
                        /source-volid is specified).

Optional arguments:
  --consisgroup-id <consistencygroup-id>
                        ID of a consistency group where the new volume belongs
                        to. Default=None.
  --snapshot-id <snapshot-id>
                        Creates volume from snapshot ID. Default=None.
  --source-volid <source-volid>
                        Creates volume from volume ID. Default=None.
  --source-replica <source-replica>
                        Creates volume from replicated volume ID.
                        Default=None.
  --image-id <image-id>
                        Creates volume from image ID. Default=None.
  --image <image>       Creates a volume from image (ID or name).
                        Default=None.
  --name <name>         Volume name. Default=None.
  --description <description>
                        Volume description. Default=None.
  --volume-type <volume-type>
                        Volume type. Default=None.
  --availability-zone <availability-zone>
                        Availability zone for volume. Default=None.
  --metadata [<key=value> [<key=value> ...]]
                        Metadata key and value pairs. Default=None.
  --hint <key=value>    Scheduler hint, like in nova.
  --allow-multiattach   Allow volume to be attached more than once.
                        Default=False
[root@vs2-compute-84 ~]# cinder help backup-create
usage: cinder backup-create [--container <container>] [--name <name>]
                            [--description <description>] [--incremental]
                            [--force] [--snapshot-id <snapshot-id>]
                            <volume>

Creates a volume backup.

Positional arguments:
  <volume>              Name or ID of volume to backup.

Optional arguments:
  --container <container>
                        Backup container name. Default=None.
  --name <name>         Backup name. Default=None.
  --description <description>
                        Backup description. Default=None.
  --incremental         Incremental backup. Default=False.
  --force               Allows or disallows backup of a volume when the volume
                        is attached to an instance. If set to True, backs up
                        the volume whether its status is "available" or "in-
                        use". The backup of an "in-use" volume means your data
                        is crash consistent. Default=False.
  --snapshot-id <snapshot-id>
                        ID of snapshot to backup. Default=None.
[root@vs2-compute-84 ~]# cinder help snapshot-create
usage: cinder snapshot-create [--force [<True|False>]] [--name <name>]
                              [--description <description>]
                              [--metadata [<key=value> [<key=value> ...]]]
                              <volume>

Creates a snapshot.

Positional arguments:
  <volume>              Name or ID of volume to snapshot.

Optional arguments:
  --force [<True|False>]
                        Allows or disallows snapshot of a volume when the
                        volume is attached to an instance. If set to True,
                        ignores the current status of the volume when
                        attempting to snapshot it rather than forcing it to be
                        available. Default=False.
  --name <name>         Snapshot name. Default=None.
  --description <description>
                        Snapshot description. Default=None.
  --metadata [<key=value> [<key=value> ...]]
                        Snapshot metadata key and value pairs. Default=None.

Start with create, which creates a volume and supports several modes, e.g. a plain empty volume, a volume created from a snapshot, a volume created from an existing volume, and so on.

    def execute(self, context, volume_ref, volume_spec):
        ......
        if create_type == 'raw':
            model_update = self._create_raw_volume(volume_ref=volume_ref,
                                                   **volume_spec)
        elif create_type == 'snap':  ## create a volume from a snapshot
            model_update = self._create_from_snapshot(context,
                                                      volume_ref=volume_ref,
                                                      **volume_spec)
        elif create_type == 'source_vol':  ## create a new volume from an existing volume
            model_update = self._create_from_source_volume(
                context, volume_ref=volume_ref, **volume_spec)
        elif create_type == 'source_replica':
            model_update = self._create_from_source_replica(
                context, volume_ref=volume_ref, **volume_spec)
        elif create_type == 'image':
            model_update = self._create_from_image(context,
                                                   volume_ref=volume_ref,
                                                   **volume_spec)
        else:
            raise exception.VolumeTypeNotFound(volume_type_id=create_type)

        ......
        return volume_ref

Many taskflow steps are skipped above; we jump straight to cinder.volume.flows.manager.create_volume.CreateVolumeFromSpecTask#execute. The taskflows used in cinder are generally of the linear type and run in order, so you can simply read the tasks one by one. There is usually a parameter-extraction task such as cinder.volume.flows.manager.create_volume.ExtractVolumeSpecTask whose output is handed to the next task; when the flow runs, each task's execute() is called, and revert() is called if an exception occurs. For an introduction to OpenStack taskflow see https://docs.openstack.org/taskflow/latest/; a minimal example of the pattern is sketched below.
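
This is only a rough illustration of the execute/revert pattern, not cinder's actual flow; the task names and payload are invented for the example.

from taskflow import engines, task
from taskflow.patterns import linear_flow

class ExtractSpec(task.Task):
    # provides 'spec' to later tasks in the flow
    default_provides = 'spec'

    def execute(self):
        return {'size_gb': 1}

class CreateVolume(task.Task):
    def execute(self, spec):
        print('creating volume from spec: %s' % spec)

    def revert(self, spec, **kwargs):
        # called if this task or a later one in the flow fails
        print('reverting volume creation for spec: %s' % spec)

flow = linear_flow.Flow('create_volume_demo')
flow.add(ExtractSpec(), CreateVolume())
engines.run(flow)   # runs tasks in order; reverts on failure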

API documentation for volume creation (v2; v3 is similar): https://developer.openstack.org/api-ref/block-storage/v2/index.html#create-volume

The snapshot-related paths are mainly _create_from_snapshot and _create_from_source_volume. The first one:

    def _create_from_snapshot(self, context, volume_ref, snapshot_id,
                              **kwargs):
        volume_id = volume_ref['id']
        snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
        model_update = self.driver.create_volume_from_snapshot(volume_ref,
                                                               snapshot)
        ......
    def create_volume_from_snapshot(self, volume, snapshot):
        """Creates a volume from a snapshot."""
        self._clone(volume, self.configuration.rbd_pool,
                    snapshot.volume_name, snapshot.name)
        if self.configuration.rbd_flatten_volume_from_snapshot:
            self._flatten(self.configuration.rbd_pool, volume.name)
        if int(volume.size):
            self._resize(volume)

    def _flatten(self, pool, volume_name):
        LOG.debug('flattening %(pool)s/%(img)s',
                  dict(pool=pool, img=volume_name))
        with RBDVolumeProxy(self, volume_name, pool) as vol:
            vol.flatten()

    def _clone(self, volume, src_pool, src_image, src_snap):
        LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
                  dict(pool=src_pool, img=src_image, snap=src_snap,
                       dst=volume.name))

        chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
        order = int(math.log(chunk_size, 2))

        with RADOSClient(self, src_pool) as src_client:
            with RADOSClient(self) as dest_client:
                self.RBDProxy().clone(src_client.ioctx,
                                      utils.convert_str(src_image),
                                      utils.convert_str(src_snap),
                                      dest_client.ioctx,
                                      utils.convert_str(volume.name),
                                      features=src_client.features,
                                      order=order)

    def _resize(self, volume, **kwargs):
        size = kwargs.get('size', None)
        if not size:
            size = int(volume.size) * units.Gi

        with RBDVolumeProxy(self, volume.name) as vol:
            vol.resize(size)

Three steps in total: clone a new volume from the snapshot, flatten, resize; the last two are optional. The config option rbd_flatten_volume_from_snapshot:

    cfg.BoolOpt('rbd_flatten_volume_from_snapshot',
                default=False,
                help='Flatten volumes created from snapshots to remove '
                     'dependency from volume to snapshot'),

It controls whether a volume created from a snapshot is flattened; the default is False, i.e. not flattened.

Now look at _create_from_source_volume, which calls create_cloned_volume:

    def create_cloned_volume(self, volume, src_vref):
        """Create a cloned volume from another volume.

        Since we are cloning from a volume and not a snapshot, we must first
        create a snapshot of the source volume.

        The user has the option to limit how long a volume's clone chain can be
        by setting rbd_max_clone_depth. If a clone is made of another clone
        and that clone has rbd_max_clone_depth clones behind it, the source
        volume will be flattened.
        """
        src_name = utils.convert_str(src_vref.name)
        dest_name = utils.convert_str(volume.name)
        flatten_parent = False

        # Do full copy if requested
        if self.configuration.rbd_max_clone_depth <= 0:
            with RBDVolumeProxy(self, src_name, read_only=True) as vol:
                vol.copy(vol.ioctx, dest_name)

            return

        # Otherwise do COW clone.
        with RADOSClient(self) as client:
            depth = self._get_clone_depth(client, src_name)
            # If source volume is a clone and rbd_max_clone_depth reached,
            # flatten the source before cloning. Zero rbd_max_clone_depth means
            # infinite is allowed.
            if depth == self.configuration.rbd_max_clone_depth:
                LOG.debug("maximum clone depth (%d) has been reached - "
                          "flattening source volume",
                          self.configuration.rbd_max_clone_depth)
                flatten_parent = True

            src_volume = self.rbd.Image(client.ioctx, src_name)
            try:
                # First flatten source volume if required.
                if flatten_parent:
                    _pool, parent, snap = self._get_clone_info(src_volume,
                                                               src_name)
                    # Flatten source volume
                    LOG.debug("flattening source volume %s", src_name)
                    src_volume.flatten()
                    # Delete parent clone snap
                    parent_volume = self.rbd.Image(client.ioctx, parent)
                    try:
                        parent_volume.unprotect_snap(snap)
                        parent_volume.remove_snap(snap)
                    finally:
                        parent_volume.close()

                # Create new snapshot of source volume
                clone_snap = "%s.clone_snap" % dest_name
                LOG.debug("creating snapshot='%s'", clone_snap)
                src_volume.create_snap(clone_snap)
                src_volume.protect_snap(clone_snap)
            except Exception:
                # Only close if exception since we still need it.
                src_volume.close()
                raise

            # Now clone source volume snapshot
            try:
                LOG.debug("cloning '%(src_vol)s@%(src_snap)s' to "
                          "'%(dest)s'",
                          {'src_vol': src_name, 'src_snap': clone_snap,
                           'dest': dest_name})
                self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
                                      client.ioctx, dest_name,
                                      features=client.features)
            except Exception:
                src_volume.unprotect_snap(clone_snap)
                src_volume.remove_snap(clone_snap)
                raise
            finally:
                src_volume.close()

        if volume.size != src_vref.size:
            LOG.debug("resize volume '%(dst_vol)s' from %(src_size)d to "
                      "%(dst_size)d",
                      {'dst_vol': volume.name, 'src_size': src_vref.size,
                       'dst_size': volume.size})
            self._resize(volume)

        LOG.debug("clone created successfully")

This flow is longer, since it first has to take a snapshot of the source volume and then clone the new volume from it, effectively embedding the create-from-snapshot flow. The config option rbd_max_clone_depth:

    cfg.IntOpt('rbd_max_clone_depth',
               default=5,
               help='Maximum number of nested volume clones that are '
                    'taken before a flatten occurs. Set to 0 to disable '
                    'cloning.'),

The default maximum clone depth is 5; once the chain reaches 5, the source is flattened. Both options go into the RBD backend section of cinder.conf, as in the sketch below.
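
For reference only; the backend section name ('ceph-rbd') and pool name are examples, not values taken from any particular deployment.

[ceph-rbd]
volume_driver = cinder.volume.drivers.rbd.RBDDriver
rbd_pool = volumes
rbd_ceph_conf = /etc/ceph/ceph.conf
# flatten volumes created from snapshots (default: False)
rbd_flatten_volume_from_snapshot = True
# flatten the source once the clone chain reaches this depth (default: 5)
rbd_max_clone_depth = 5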

Next, the backup operation (which actually has little to do with rbd snapshots). cinder.backup.drivers.ceph.CephBackupDriver#_backup_rbd is where the work finally happens; it is not analyzed in detail here. The main point is the incremental backup path:

    def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
                           src_user, src_conf, dest_user, dest_conf,
                           src_snap=None, from_snap=None):
        """Copy only extents changed between two points.

        If no snapshot is provided, the diff extents will be all those changed
        since the rbd volume/base was created, otherwise it will be those
        changed since the snapshot was created.
        """
        LOG.debug("Performing differential transfer from '%(src)s' to "
                  "'%(dest)s'",
                  {'src': src_name, 'dest': dest_name})

        # NOTE(dosaboy): Need to be tolerant of clusters/clients that do
        # not support these operations since at the time of writing they
        # were very new.

        src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
        dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)

        cmd1 = ['rbd', 'export-diff'] + src_ceph_args
        if from_snap is not None:
            cmd1.extend(['--from-snap', from_snap])
        if src_snap:
            path = utils.convert_str("%s/%s@%s"
                                     % (src_pool, src_name, src_snap))
        else:
            path = utils.convert_str("%s/%s" % (src_pool, src_name))
        cmd1.extend([path, '-'])

        cmd2 = ['rbd', 'import-diff'] + dest_ceph_args
        rbd_path = utils.convert_str("%s/%s" % (dest_pool, dest_name))
        cmd2.extend(['-', rbd_path])

        ret, stderr = self._piped_execute(cmd1, cmd2)
        if ret:
            msg = (_("RBD diff op failed - (ret=%(ret)s stderr=%(stderr)s)") %
                   {'ret': ret, 'stderr': stderr})
            LOG.info(msg)
            raise exception.BackupRBDOperationFailed(msg)

The incremental part is first exported with 'rbd export-diff' and then applied with 'rbd import-diff'. See: https://ceph.com/geen-categorie/incremental-snapshots-with-rbd/ A standalone sketch of the same pipe follows below.
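
The same piped transfer can be reproduced outside of cinder with two rbd processes; in this sketch the pool/image/snapshot spec strings are placeholders.

import subprocess

# Placeholder spec strings -- adjust pools/images/snapshots to your setup.
src_spec = 'volumes/volume-1234@backup.snap.2'     # what to export
from_snap = 'backup.snap.1'                        # previous backup snapshot
dst_spec = 'backups/volume-1234.backup.base'       # existing backup base image

export = subprocess.Popen(
    ['rbd', 'export-diff', '--from-snap', from_snap, src_spec, '-'],
    stdout=subprocess.PIPE)
imprt = subprocess.Popen(
    ['rbd', 'import-diff', '-', dst_spec],
    stdin=export.stdout)
export.stdout.close()          # let export see SIGPIPE if import exits early
imprt.communicate()
export.wait()
if export.returncode or imprt.returncode:
    raise RuntimeError('rbd diff transfer failed')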

The first backup of a volume goes through the full-backup path:

    def _full_backup(self, backup_id, volume_id, src_volume, src_name, length):
        """Perform a full backup of src volume.

        First creates a base backup image in our backup location then performs
        an chunked copy of all data from source volume to a new backup rbd
        image.
        """
        backup_name = self._get_backup_base_name(volume_id, backup_id)

        with rbd_driver.RADOSClient(self, self._ceph_backup_pool) as client:
            # First create base backup image
            old_format, features = self._get_rbd_support()
            LOG.debug("Creating backup base image='%(name)s' for volume "
                      "%(volume)s.",
                      {'name': backup_name, 'volume': volume_id})
            self.rbd.RBD().create(ioctx=client.ioctx,
                                  name=backup_name,
                                  size=length,
                                  old_format=old_format,
                                  features=features,
                                  stripe_unit=self.rbd_stripe_unit,
                                  stripe_count=self.rbd_stripe_count)

            LOG.debug("Copying data from volume %s.", volume_id)
            dest_rbd = self.rbd.Image(client.ioctx, backup_name)
            try:
                rbd_meta = rbd_driver.RBDImageMetadata(dest_rbd,
                                                       self._ceph_backup_pool,
                                                       self._ceph_backup_user,
                                                       self._ceph_backup_conf)
                rbd_fd = rbd_driver.RBDImageIOWrapper(rbd_meta)
                self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
                                    length)
            finally:
                dest_rbd.close()

 

Finally, snapshot-create. Its flow is similar to volume create, so just look at the final rbd call:

    def create_snapshot(self, snapshot):
        """Creates an rbd snapshot."""
        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
            snap = utils.convert_str(snapshot.name)
            volume.create_snap(snap)
            volume.protect_snap(snap)

Very simple: create the snap, then protect it.

 

 

To My Sons (18)

Today is a special day: your older brother's kindergarten graduation, and your little brother's first birthday.

Today is also an unremarkable day: Mom is not home, and neither is your little brother.

The whole family should have happily attended the graduation together and then celebrated the birthday, but that did not happen...

In a way, Dad brought all of this about with his own hands, and he is full of guilt and self-reproach.

The thing Dad least wanted to happen has actually happened: your little brother has become a left-behind child...

Mom has said it many times, crying that Dad is too heartless: how could he bear to leave the little one back in our hometown alone, and if so, why have him in the first place? At the time Dad was still stubborn, still arguing, still twisting reason, saying the little one is small and doesn't understand yet, that staying there for a while is no big deal and he will come back before long.

In truth Dad feels awful too, but what can he say? If the little one comes back, who will look after him? Mom is away, Grandpa can barely manage cooking for your older brother (he really has done his best), and Dad is often working overtime...

Over the past year, Dad foresaw the difficulties at work; although there have been plenty of problems, they were resolved or worked around one by one, and while progress has not been great, it is not far off what was expected.

A year ago, when Dad decided to start a business, his thinking was simple: as long as the family is together, there is nothing to worry about.

But now the family is no longer together, and Dad cannot help but worry.

The problems in life and in the family, though, have strayed completely from Dad's expectations; Dad feels life has slipped somewhat out of control.

You could say Dad is now heading in the opposite direction of what he has been striving for: a good life for Mom, your older brother and your little brother is the goal and the source of Dad's motivation.

Lately the phrase Mom cried out to Dad keeps ringing in his ears: "a family torn apart."

Mom works in another city, your little brother went back to our hometown, and only your older brother and Dad are still at home.

Yes, that phrase describes Dad's current life all too aptly. How sad...

Whenever that phrase comes to mind, Dad feels a crushing sense of defeat: working this hard only to end up here, all the effort not only coming to nothing but bearing bitter fruit. Sad, lamentable, laughable...

Dad has been asking himself lately: why did things turn out this way? Where exactly did he go wrong?

One reason he can think of is that Mom's and Dad's mindsets changed. When Dad decided to strike out on his own, he had prepared himself for hard times; Mom, meanwhile, felt the family's financial footing was no longer solid and that she needed to work hard too in case the household lost its income, so she went to work in another city, because that looked like her best and most promising option.

If Dad's job were stable, Mom probably would not have made such a hard decision to leave your little brother in our hometown and go away from your older brother and Dad to work.

So the root of all these problems may well be Dad's decision last year to leave and start a business.

And now, Dad regrets it.

Not because of the unstable work, the company's disappointing progress, the drop in income and so on.

It is over the family that Dad truly feels regret...

Dad wants to return to how things were a year ago, and has decided to do so.

Money, Dad needs; a family living happily together, Dad needs even more.

In this one year your mother has shed more tears than in the previous ten.

And the root of it all lies with Dad.

What more is there for Dad to say? Nothing...

Dad does not care how anyone else sees him; but Mom's tears, your little brother's crying, your older brother murmuring in his sleep about missing Mom and his little brother, those Dad cares about, cannot help caring about, must care about.

Dad has always hoped that the road he walks on your behalf will not turn into a pit dug for you.

The money we cannot earn, we will stop chasing; all I hope is that our family is never apart again, and that matters more than anything.

I hope it is not too late, and that everything can still be brought back.