Ceph monmap消息编解码过程




本文源码基于luminous-12.2.5版本分析

最近在分析H版本librbd client无法兼容L版本Ceph集群的问题,提前说明下最终结论是H版本被之前的同时修改过才导致的不兼容,官方版本应该是兼容的。这篇文章正是问题分析过程的一次整理总结。

需求描述

需求是想要在同一个OpenStack环境中同时使用H、L版本两个Ceph集群做云主机系统盘、云硬盘存储系统,因此计算节点(也即librbd client节点)就可能分为H、L两个版本,众所周知,升级librbd动态库,需要重启云主机才能生效(除非对librbd做过较大的改动),重启所有云主机这个操作用户显然是不能接受的(当然也可以执行热迁移操作,但大批量的热迁移也存在耗时长、风险大的问题,况且还有部分云主机不支持热迁移),因此如果要上线L版本集群就必须做到兼容H版本client,这样才能保证L版本集群的卷能被当前正使用H版本client的云主机挂载、读写。H版本是之前遗留的,不能升级到L版本(主要考虑2个问题,1是风险高,二是代价大。实际上还有一个小问题就是H版本client不兼容L版本server,这个正是我本次需要解决的问题)。

问题描述

有两个问题:
1. 不修改server的crush参数情况下,客户端执行ceph -s、rbd ls等命令报错:
2018-10-12 15:37:50.117973 7f58f4298700  0 -- 10.182.2.32:0/3576177365  10.185.0.97:6789/0 pipe(0x7f58e0000c80 sd=3 :34392 s=1 pgs=0 cs=0 l=1 c=0x7f58e0005160).connect protocol feature mismatch, my 883ffffffffffff  peer c81dff8eea4fffb missing 400000000000000
2. 通过ceph osd crush tunables hammer命令修改crush参数之后,执行命令直接抛异常,原因是decode monmap的created字段时发生越界异常,位置是:
  void buffer::list::iterator::copy(unsigned len, char *dest)
  {
    if (p == ls->end()) seek(off);
    while (len > 0) {
      if (p == ls->end())
	throw end_of_buffer(); // 这里抛异常
      assert(p->length() > 0); 
      
      unsigned howmuch = p->length() - p_off;
      if (len < howmuch) howmuch = len;
      p->copy_out(p_off, howmuch, dest);
      dest += howmuch;

      len -= howmuch;
      advance(howmuch);
    }
  }

本次重点分析第二个问题,bt查看调用栈(调试的是rbd ls命令):

Thread 6 "rbd" hit Breakpoint 4, ceph::buffer::list::iterator::copy (this=0x7fffee122800, len=4, dest=0x7fffee1225dc "I]\026\022") at common/buffer.cc:936
936         if (p == ls->end()) seek(off);
(gdb) bt
#0  ceph::buffer::list::iterator::copy (this=0x7fffee122800, len=4, dest=0x7fffee1225dc "I]\026\022") at common/buffer.cc:936
#1  0x00007ffff2824284 in decode_raw<ceph_le32> (t=..., p=...) at ./include/encoding.h:57
#2  0x00007ffff281c2da in decode (v=@0x555557d350a0: 0, p=...) at ./include/encoding.h:103
#3  0x00007ffff281d023 in utime_t::decode (this=0x555557d350a0, p=...) at ./include/utime.h:103
#4  0x00007ffff281d060 in decode (c=..., p=...) at ./include/utime.h:312
#5  0x00007ffff2a1fd49 in MonMap::decode (this=0x555557d35050, p=...) at mon/MonMap.cc:74
#6  0x00007ffff2a151fd in decode (c=..., p=...) at ./mon/MonMap.h:240
#7  0x00007ffff2a0c050 in MonClient::handle_monmap (this=0x555557d35040, m=0x7fffd8000bd0) at mon/MonClient.cc:317
#8  0x00007ffff2a0bd09 in MonClient::ms_dispatch (this=0x555557d35040, m=0x7fffd8000bd0) at mon/MonClient.cc:274
#9  0x00007ffff2b9c9db in Messenger::ms_deliver_dispatch (this=0x555557d35a80, m=0x7fffd8000bd0) at ./msg/Messenger.h:582
#10 0x00007ffff2b9be57 in DispatchQueue::entry (this=0x555557d35c70) at msg/simple/DispatchQueue.cc:185
#11 0x00007ffff2bcd560 in DispatchQueue::DispatchThread::entry (this=0x555557d35e10) at msg/simple/DispatchQueue.h:103
#12 0x00007ffff296608c in Thread::entry_wrapper (this=0x555557d35e10) at common/Thread.cc:61
#13 0x00007ffff2965ffe in Thread::_entry_func (arg=0x555557d35e10) at common/Thread.cc:45
#14 0x00007ffff2335494 in start_thread (arg=0x7fffee123700) at pthread_create.c:333
#15 0x00007ffff0c45acf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97

是在接收到服务端monmap消息后decode过程中抛的异常,具体是在decode created字段出错的。

void MonMap::decode(bufferlist::iterator &p)
{
  DECODE_START_LEGACY_COMPAT_LEN_16(3, 3, 3, p);
  ::decode_raw(fsid, p);
  ::decode(epoch, p); // 这里正常decode
  if (struct_v == 1) {
    vector<entity_inst_t> mon_inst;
    ::decode(mon_inst, p);
    for (unsigned i = 0; i < mon_inst.size(); i++) {
      char n[2];
      n[0] = '0' + i;
      n[1] = 0;
      string name = n;
      mon_addr[name] = mon_inst[i].addr;
    }
  } else {
    ::decode(mon_addr, p); // 这里开始出错
  }
  ::decode(last_changed, p);
  ::decode(created, p); // 这里抛的异常
  DECODE_FINISH(p);
  calc_ranks();
}

单步调试发现,在此之前last_changed字段decode出来就是空的,时间都是0,因此怀疑是更早之前就发生错误了,调试后发现最初的错误是在decode mon_addr时就发生了。

通过调试服务端ceph-mon encode过程,发现mon_addr和last_changed、created字段都是被添加到monmap消息体内的,并且通过调试L版本client,这几个字段也都能正常decode出来,因此确认是客户端问题或者服务端发送的monmap消息客户端不兼容。但看了下L版本服务端encode代码,当前版本是v5,并且最低兼容v3版本,而H版本decode是v3版本,也就是说社区代码在设计上兼容性应该是没问题的。因此需要继续分析问题出在哪里。

仔细对比了mon_addr的decode和encode流程(主要是H版本和L版本的差异),发现L版本encode过程检查连接的features,并走了不同的encode过程。因此问题聚焦在了客户端和服务端协商features过程上。

  void encode(bufferlist& bl, uint64_t features) const {
    if ((features & CEPH_FEATURE_MSG_ADDR2) == 0) { // 检查features 59标记位
      ::encode((__u32)0, bl); // 老版本编码流程
      ::encode(nonce, bl);
      sockaddr_storage ss = get_sockaddr_storage();
#if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
      ::encode(ss, bl);
#else
      ceph_sockaddr_storage wireaddr;
      ::memset(&wireaddr, '\0', sizeof(wireaddr));
      unsigned copysize = MIN(sizeof(wireaddr), sizeof(ss));
      // ceph_sockaddr_storage is in host byte order
      ::memcpy(&wireaddr, &ss, copysize);
      ::encode(wireaddr, bl);
#endif
      return;
    }
    ::encode((__u8)1, bl); // 新版本编码流程
    ENCODE_START(1, 1, bl);
    ::encode(type, bl);
    ::encode(nonce, bl);
    __u32 elen = get_sockaddr_len();
    ::encode(elen, bl);
    if (elen) {
      bl.append((char*)get_sockaddr(), elen);
    }
    ENCODE_FINISH(bl);
  }

也就是features的第59位是怎么设置上的(1<<59),看了下H版本的代码,发现了这个feature:

#define CEPH_FEATURE_MON_METADATA (1ULL<<50)
/* ... */
#define CEPH_FEATURE_HAMMER_0_94_4 (1ULL<<55)

#define CEPH_OSD_PARTIAL_RECOVERY  (1ULL<<59) /* recover partial extents for objects */

#define CEPH_FEATURE_RESERVED2 (1ULL<<61)  /* slow down, we are almost out... */
#define CEPH_FEATURE_RESERVED  (1ULL<<62)  /* DO NOT USE THIS ... last bit! */
#define CEPH_FEATURE_RESERVED_BROKEN  (1ULL<<63)  /* DO NOT USE THIS; see below */

按道理在CEPH_FEATURE_HAMMER_0_94_4(1<<55)之后应该不会再有其他feature了才对,但这个CEPH_OSD_PARTIAL_RECOVERY为啥被加入了?去看了下官方代码,发现没有这个feature,然后git log看了下我们的代码仓库,果然是我们自己人加上的。至此问题原因已经清晰。

L版本中用CEPH_FEATURE_MSG_ADDR2(1<<59)来标记是否为新版本消息,这一位是0表示是老版本(未设置该bit表示老版本),但是H版本里面正好用到了这一位CEPH_OSD_PARTIAL_RECOVERY(1<<59),导致L版本服务端检测失败,误认为是新版本客户端,使用了新的编码方式对monmap中的monitor地址信息进行编码,导致客户端无法解码。

改动方案:

由于client端升级困难,因此对L版本服务端进行修改,使其兼容H版本client,增加对其他bit位的检查,当发现client是H版本后,就走老的encode流程,修改之后客户端可以正常使用(包括基本命令行和qemu启动云主机)。

 

此次问题分析的难点主要是:

  1. 不熟悉ceph源码,尤其是消息编解码过程
  2. 编译的第一个环境没有去掉编译优化(-O2),增加debug等级(-g),改成(-O0 -g3 -gdwarf-4)修改之后调试起来就很方便了,https://www-zeuthen.desy.de/unix/unixguide/infohtml/gdb/Inline-Functions.html
  3. 确定服务端是否把monitor地址信息编码到消息体过程被wireshark误导了一段时间

使用tcpdump+wireshark解析Ceph网络包

记得之前在通读docs.ceph.com上的文档时(http://docs.ceph.com/docs/luminous/dev/wireshark/),有提到过wireshark支持对Ceph网络数据包进行解析,于是试着用tcpdump抓了monitor和client的数据包,然后导入到wireshark进行解析,对H版本来说确实好用,但对L版本来说,monmap解析貌似支持不太好,monitor address地址解析不出来,估计也是只支持低版本的编码协议。参考:https://www.wireshark.org/docs/dfref/c/ceph.html

tcpdump命令(在client节点执行数据包比较少,服务端对接的客户端比较多因此数据包也比较多,当然也可以加入更多的过滤条件来精确抓包):tcpdump -i eth0 host 192.168.0.2 and port 6789 -w ceph.cap

之后把抓到的ceph.cap数据导入到windows系统的wireshark软件中,我下载的是2.6.2版本(官网有2.6.4版本,但是下载困难,就在国内找了软件站下载了2.6.2的)。导入之后就可以自动分析出结果了。

L版本抓包结果(monmap包解析不太好,认为包有问题“malformed packet”,monitor地址信息解析失败,跟我遇到的问题一样,因此误导了我2天,我一直认为服务端就是没有把地址编码进来,客户端才解析不出来,最后在服务端和客户端分别单步调试才发现是编码进去了的):

 

H版本数据包解析就比较完美了:

Mon Map:

OSD Map:

 

Mon Map编解码过程分析

关键数据结构都定义在src\include\buffer.h中,主要包括:

  • buffer::ptr
    •  _raw:保存实际的编码消息数据
    • buffer::ptr::iterator :遍历ptr的迭代器类,提供各种函数用来寻址ptr中的数据
  • buffer::list
    • _buffers:保存ptr的list,也即保存多条编码数据
    • append_buffer:ptr类型,4K对齐,保存编码数据,,其append操作实际是append到ptr._raw,之后会把它push_back到_buffers中,push_back之前会通过ptr构造函数填充ptr的_raw、_off、_len等数据,也即把_raw中保存的数据的偏移量和长度也保存起来,解码时使用
    • buffer::list::iterator:遍历_buffers的迭代器,继承自buffer::list::iterator_impl,主要是对外提供接口,对内封装了iterator_impl的相关接口
    • buffer::list::iterator_impl:遍历_buffers的迭代器实际实现类,主要实现有advance、seek、copy等函数,用来从_buffers里取出数据,advance函数用来在copy函数执行过程中进行ptr内或_buffers的多个ptr前后跳转,有两种场景,一中是跳转还在当前ptr内(ptr内取数据),另外一种是_buffers list中一个ptr已经copy完(跨ptr取数据),需要跳转到下一个ptr对象继续copy。seek函数也是利用advance函数完成数据寻址操作。

以L版本代码为例,编解码代码如下:

void MonMap::encode(bufferlist& blist, uint64_t con_features) const
{
  /* we keep the mon_addr map when encoding to ensure compatibility
   * with clients and other monitors that do not yet support the 'mons'
   * map. This map keeps its original behavior, containing a mapping of
   * monitor id (i.e., 'foo' in 'mon.foo') to the monitor's public
   * address -- which is obtained from the public address of each entry
   * in the 'mons' map.
   */ // mon_addr是为了兼容老版本,新版本直接用mon_info了
  map<string,entity_addr_t> mon_addr;
  for (map<string,mon_info_t>::const_iterator p = mon_info.begin();
       p != mon_info.end();
       ++p) {
    mon_addr[p->first] = p->second.public_addr;
  }

  // con_features是client和server建立connection时协商的,也就是二者支持的feature进行&操作
  if ((con_features & CEPH_FEATURE_MONNAMES) == 0) {
    __u16 v = 1; // v1版本编码过程
    ::encode(v, blist);
    ::encode_raw(fsid, blist);
    ::encode(epoch, blist);
    vector<entity_inst_t> mon_inst(mon_addr.size());
    for (unsigned n = 0; n < mon_addr.size(); n++)
      mon_inst[n] = get_inst(n);
    ::encode(mon_inst, blist, con_features);
    ::encode(last_changed, blist);
    ::encode(created, blist);
    return;
  }

  if ((con_features & CEPH_FEATURE_MONENC) == 0) {
    __u16 v = 2; // v2版本编码过程
    ::encode(v, blist);
    ::encode_raw(fsid, blist);
    ::encode(epoch, blist);
    ::encode(mon_addr, blist, con_features);
    ::encode(last_changed, blist);
    ::encode(created, blist);
  }

  ENCODE_START(5, 3, blist);  // v5版本编码过程,最低兼容版本是v3,blist是编码输出缓冲区
  ::encode_raw(fsid, blist);  // 编码fsid
  ::encode(epoch, blist);  // 编码epoch
  ::encode(mon_addr, blist, con_features);  // 编码mon_addr,注意con_features参数
  ::encode(last_changed, blist);  // 编码last_changed时间
  ::encode(created, blist);  // 编码created时间
  ::encode(persistent_features, blist);  // v5版本新增字段,v3版本decode时忽略,下同
  ::encode(optional_features, blist);
  // this superseeds 'mon_addr'
  ::encode(mon_info, blist, con_features);
  ENCODE_FINISH(blist);  // 结束编码
}

void MonMap::decode(bufferlist::iterator &p)
{ // 解码过程是编码过程的反向操作,也是按编码顺序来的
  map<string,entity_addr_t> mon_addr;
  DECODE_START_LEGACY_COMPAT_LEN_16(5, 3, 3, p);
  ::decode_raw(fsid, p);
  ::decode(epoch, p);
  if (struct_v == 1) {
    vector<entity_inst_t> mon_inst;
    ::decode(mon_inst, p);
    for (unsigned i = 0; i < mon_inst.size(); i++) {
      char n[2];
      n[0] = '0' + i;
      n[1] = 0;
      string name = n;
      mon_addr[name] = mon_inst[i].addr;
    }
  } else {
    ::decode(mon_addr, p);
  }
  ::decode(last_changed, p);
  ::decode(created, p);
  if (struct_v >= 4) {
    ::decode(persistent_features, p);
    ::decode(optional_features, p);
  }
  if (struct_v >= 5) {
    ::decode(mon_info, p);
  } else {
    // we may be decoding to an existing monmap; if we do not
    // clear the mon_info map now, we will likely incur in problems
    // later on MonMap::sanitize_mons()
    mon_info.clear();
  }
  DECODE_FINISH(p);
  sanitize_mons(mon_addr);
  calc_ranks();
}

encode调用栈:

Thread 15 "ms_dispatch" hit Breakpoint 1, MonMap::encode (this=0x7fff39218470, blist=..., con_features=2305244844532236283)
    at /mnt/ceph-Luminous/src/mon/MonMap.cc:138
138     {
(gdb) bt
#0  MonMap::encode (this=0x7fff39218470, blist=..., con_features=2305244844532236283) at /mnt/ceph-Luminous/src/mon/MonMap.cc:138
#1  0x00005560771f483f in Monitor::send_latest_monmap (this=0x556081f94800, con=0x556082b50800) at /mnt/ceph-Luminous/src/mon/Monitor.cc:5116
#2  0x00005560772c04a5 in AuthMonitor::prep_auth (this=0x556081fbb800, op=..., paxos_writable=false) at /mnt/ceph-Luminous/src/mon/AuthMonitor.cc:482
#3  0x00005560772be9d0 in AuthMonitor::preprocess_query (this=0x556081fbb800, op=...) at /mnt/ceph-Luminous/src/mon/AuthMonitor.cc:292
#4  0x000055607741e850 in PaxosService::dispatch (this=0x556081fbb800, op=...) at /mnt/ceph-Luminous/src/mon/PaxosService.cc:74
#5  0x00005560771ebf54 in Monitor::dispatch_op (this=0x556081f94800, op=...) at /mnt/ceph-Luminous/src/mon/Monitor.cc:4234
#6  0x00005560771ebb3d in Monitor::_ms_dispatch (this=0x556081f94800, m=0x5560828ad900) at /mnt/ceph-Luminous/src/mon/Monitor.cc:4209
#7  0x0000556077228a9e in Monitor::ms_dispatch (this=0x556081f94800, m=0x5560828ad900) at /mnt/ceph-Luminous/src/mon/Monitor.h:899
#8  0x0000556077a3aab9 in Messenger::ms_deliver_dispatch (this=0x556082797000, m=0x5560828ad900) at /mnt/ceph-Luminous/src/msg/Messenger.h:668
#9  0x0000556077a39a8a in DispatchQueue::entry (this=0x556082797180) at /mnt/ceph-Luminous/src/msg/DispatchQueue.cc:197
#10 0x000055607773443e in DispatchQueue::DispatchThread::entry (this=0x556082797328) at /mnt/ceph-Luminous/src/msg/DispatchQueue.h:101
#11 0x0000556077873079 in Thread::entry_wrapper (this=0x556082797328) at /mnt/ceph-Luminous/src/common/Thread.cc:79
#12 0x0000556077872fae in Thread::_entry_func (arg=0x556082797328) at /mnt/ceph-Luminous/src/common/Thread.cc:59
#13 0x00007fee9dfe1494 in start_thread (arg=0x7fee90a88700) at pthread_create.c:333
#14 0x00007fee9a0ccacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97

decode调用栈:

// ceph消息decode过程(L版本)
Thread 9 "ms_dispatch" hit Breakpoint 3, ceph::buffer::ptr::copy_out (this=0x7fffdc002e20, o=82, l=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1035
1035        assert(_raw);
(gdb) bt
#0  ceph::buffer::ptr::copy_out (this=0x7fffdc002e20, o=82, l=8, dest=0x555555e6a464 "\vK\311[)\335Q\032") at /mnt/ceph/src/common/buffer.cc:1035
#1  0x00007ffff72eb1d6 in ceph::buffer::list::iterator_impl<false>::copy (this=0x7fffe50f9e10, len=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1248
#2  0x00007ffff72e14cc in ceph::buffer::list::iterator::copy (this=0x7fffe50f9e10, len=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1434
#3  0x0000555555942ee9 in utime_t::decode (this=0x555555e6a464, p=...) at /mnt/ceph/src/include/utime.h:133
#4  0x0000555555942f0f in decode (c=..., p=...) at /mnt/ceph/src/include/utime.h:390
#5  0x00007fffedca7b88 in MonMap::decode (this=0x555555e6a448, p=...) at /mnt/ceph/src/mon/MonMap.cc:210
#6  0x00007fffedc8f676 in decode (c=..., p=...) at /mnt/ceph/src/mon/MonMap.h:346
#7  0x00007fffedc82a2e in MonClient::handle_monmap (this=0x555555e6a438, m=0x7fffdc000f60) at /mnt/ceph/src/mon/MonClient.cc:331
#8  0x00007fffedc8253b in MonClient::ms_dispatch (this=0x555555e6a438, m=0x7fffdc000f60) at /mnt/ceph/src/mon/MonClient.cc:272
#9  0x00007fffedccfac9 in Messenger::ms_deliver_dispatch (this=0x555555e6b520, m=0x7fffdc000f60) at /mnt/ceph/src/msg/Messenger.h:668
#10 0x00007fffedcce884 in DispatchQueue::entry (this=0x555555e6b6a0) at /mnt/ceph/src/msg/DispatchQueue.cc:197
#11 0x00007fffedea5c54 in DispatchQueue::DispatchThread::entry (this=0x555555e6b848) at /mnt/ceph/src/msg/DispatchQueue.h:101
#12 0x00007fffee04376d in Thread::entry_wrapper (this=0x555555e6b848) at /mnt/ceph/src/common/Thread.cc:79
#13 0x00007fffee0436a2 in Thread::_entry_func (arg=0x555555e6b848) at /mnt/ceph/src/common/Thread.cc:59
#14 0x00007fffeb626494 in start_thread (arg=0x7fffe50fe700) at pthread_create.c:333
#15 0x00007fffea07bacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
(gdb) n
2018-10-19 12:18:12.443495 7fffe7903700 -1 asok(0x555555d8c810) AdminSocket: error writing response length (32) Broken pipe
1036        if (o+l > _len)
(gdb) 
1038        char* src =  _raw->data + _off + o;
(gdb) 
1039        maybe_inline_memcpy(dest, src, l, 8);
(gdb) 
1040      }
(gdb) p *(utime_t*)dest // 可以通过强制类型转换打印出decode的数据
$35 = {tv = {tv_sec = 1534409040, tv_nsec = 303455561}}
(gdb) c
Continuing.

 

接下来要说的是具体的encode和decode流程,以我调试的mon_addr为例:

encode

// ::encode(mon_addr, blist, con_features);调用这个,但没理解为啥是std::map<T,U,Comp,Alloc>& m
// mon_addr定义是map<string,entity_addr_t> mon_addr;
template<class T, class U, class Comp, class Alloc,
	 typename t_traits=denc_traits<T>, typename u_traits=denc_traits<U>>
inline typename std::enable_if<!t_traits::supported ||
				 !u_traits::supported>::type
  encode(const std::map<T,U,Comp,Alloc>& m, bufferlist& bl, uint64_t features)
{
  __u32 n = (__u32)(m.size()); // 获取map长度,也即接下来要编码几个monitor信息
  encode(n, bl);  // 先编码消息体长度,防止解码的时候越界
  for (auto p = m.begin(); p != m.end(); ++p) { // 遍历map,逐项编码,每一项对应一个monitor
    encode(p->first, bl, features); // monitor名称,string类型
    encode(p->second, bl, features); // monitor地址,struct entity_addr_t类型
  }
}
// encode(n, bl)进入实际的编码过程是(n是__u32类型也就是uint32_t)
WRITE_INTTYPE_ENCODER(uint32_t, le32) // 这是个宏定义如下:

// -----------------------------------
// int types

#define WRITE_INTTYPE_ENCODER(type, etype)				\
  inline void encode(type v, bufferlist& bl, uint64_t features=0) {	\
    ceph_##etype e;	// 类型转换,转换为ceph_le(little endian)				                \
    e = v;    // 这里并没有进行大小端转换,都是小端,src/include/byteorder.h:71 -> src/include/byteorder.h:63                                                         \
    encode_raw(e, bl);	// 裸数据编码,-> src/include/encoding.h:69						\
  }									\
  inline void decode(type &v, bufferlist::iterator& p) {		\
    ceph_##etype e;							\
    decode_raw(e, p);							\
    v = e;								\
  }


// --------------------------------------
// base types

template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{
  bl.append((char*)&t, sizeof(t)); // -> src/common/buffer.cc:1935
}
  void buffer::list::append(const char *data, unsigned len)
  {
    while (len > 0) {
      // put what we can into the existing append_buffer.
      unsigned gap = append_buffer.unused_tail_length();
      if (gap > 0) {
        if (gap > len) gap = len;
    //cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl;
        append_buffer.append(data, gap); // buffer::list::append_buffer,buffer::ptr类型
        append(append_buffer, append_buffer.length() - gap, gap); // -> buffer::list::append	// add segment to the list
        len -= gap;
        data += gap;
      }
      if (len == 0)
        break;  // done!
      
      // make a new append_buffer.  fill out a complete page, factoring in the
      // raw_combined overhead.
      size_t need = ROUND_UP_TO(len, sizeof(size_t)) + sizeof(raw_combined);
      size_t alen = ROUND_UP_TO(need, CEPH_BUFFER_ALLOC_UNIT) -
	sizeof(raw_combined);
      append_buffer = raw_combined::create(alen, 0, get_mempool());
      append_buffer.set_length(0);   // unused, so far.
    }
  }
......
  void buffer::list::append(const ptr& bp, unsigned off, unsigned len)
  {
    assert(len+off <= bp.length());
    if (!_buffers.empty()) {
      ptr &l = _buffers.back();
      if (l.get_raw() == bp.get_raw() &&
	  l.end() == bp.start() + off) {
	// yay contiguous with tail bp!
	l.set_length(l.length()+len);
	_len += len;
	return;
      }
    }
    // add new item to list
    push_back(ptr(bp, off, len)); // -> src/include/buffer.h:768
  }
    void push_back(ptr&& bp) {
      if (bp.length() == 0)
	return;
      _len += bp.length();
      _buffers.push_back(std::move(bp)); // buffer::list::_buffers类型为std::list<ptr>
    }

其他几个字段也是类似过程,就不做分析,差别就是编码的数据类型不一样,比如string、struct等,string属于基础数据类型,encode.h有对应的encode函数,struct数据类型则需要在对应的struct结构体定义其自己的encode函数,如entity_addr_t::encode (src\msg\msg_types.h)。

decode过程也是类似,根据decode的数据类型,找到对应的decode函数,按段解码即可。数据类型是预先定义好的,如decode(mon_addr, p),mon_addr的类型是已知的,因此其decode函数也可以找到(一般都是跟encode放在一起),只不过由于encode.h中有很多的宏,不好找到源码而已,配合gdb单步调试应该容易很多(记得在do_cmake.sh里的cmake命令那行加上-O0 -g3 -gdwarf-4这几个CXX_FLAGS编译选项: -DCMAKE_CXX_FLAGS="-O0 -g3 -gdwarf-4" -DCMAKE_BUILD_TYPE=Debug)。一般是先解码出消息体长度,然后再逐条解码。总之一切过程都是预先定义好的,一旦收到的消息内容与预设的解码方案不匹配,就会导致各种错误。这就是编解码协议存在的原因。(话说这也是我第一次接触编解码协议,看完这些流程还有点小激动)。

encode是把数据存入消息体,decode是从消息体取出数据,最底层的编码解码都是按字节完成的,编码时会强转为char *类型,底层解码也不区分数据类型,由上层使用方负责进行转换,对应的结构体都定义在src\include\buffer.h中,相关的函数实现在src\common\buffer.cc中。

decode的核心是ceph::buffer::ptr::copy_out () (/mnt/ceph/src/common/buffer.cc:1035),最终都是由它把数据从消息体里取出来的。相关的调用栈可以参考上面贴出来的decode调用栈。

下面附上我在调试过程中手绘的协议字段表格:

L版本:

H版本: