ceph osd启动及peering过程中osdmap相关加载和更新流程

本文基于ceph H版本代码,目的是为了调研启动OSD之前vmtouch需要预加载哪些osdmap文件?以及预加载后对peering过程的影响。

OSD启动过程中osdmap加载流程

OSD启动入口是ceph_osd.cc的main函数,他会调用osd->init()进行osd启动前的初始化工作。

int OSD::init()
{
  ......
  int r = store->mount(); // 检查osd目录相关持久化数据,以及文件系统属性等,加载FileStore驱动。
  ......
  r = read_superblock(); // 这个是读取的current/meta目录下的osd_superblock_xxx文件,而不是osd根目录下的superblock文件(这个是在上面的mount函数里读取的)
  /*
  (gdb) p superblock 
  $1 = {cluster_fsid = {uuid = "#\214;EȄI\021\224+\244$\221\002P\277"}, osd_fsid = {
    uuid = "\216+\004F\354)B\033\263\023\320\304\022\220\374", <incomplete sequence \342>}, whoami = 1, current_epoch = 20, oldest_map = 1, newest_map = 20, 
  weight = 0, compat_features = {compat = {mask = 1, names = std::map with 0 elements}, ro_compat = {mask = 1, names = std::map with 0 elements}, incompat = {
      mask = 14335, names = std::map with 12 elements = {[1] = "initial feature set(~v.18)", [2] = "pginfo object", [3] = "object locator", 
        [4] = "last_epoch_clean", [5] = "categories", [6] = "hobjectpool", [7] = "biginfo", [8] = "leveldbinfo", [9] = "leveldblog", [10] = "snapmapper", 
        [12] = "transaction hints", [13] = "pg meta object"}}}, mounted = 12, clean_thru = 20, last_map_marked_full = 0}
  */

  ......
  osdmap = get_map(superblock.current_epoch); // 加载osd down之前保存的最新版本osdmap,具体过程见下面分析
  ......
  // load up pgs (as they previously existed)
  load_pgs(); // 加载OSD上已有的pg,具体见下面分析
  ......
  osd_tp.start(); // 启动osd的peering线程池
  ......
  consume_map(); // 消费osdmap,或者说使用osdmap,具体见下面分析
  ......
  set_state(STATE_BOOTING);  // 设置osd状态为STATE_BOOTING,OSD启动过程中共有STATE_INITIALIZING(默认值)、STATE_BOOTING、STATE_ACTIVE这几个状态阶段
  start_boot(); // 准备启动OSD,具体见下面分析
  ......
}

加载osdmap:

class OSD: {
  ......
  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  ......
}

class OSDService: {
  ......
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    assert(ret);
    return ret;
  }
  ......
}

OSDMapRef OSDService::try_get_map(epoch_t epoch)
{
  Mutex::Locker l(map_cache_lock);
  OSDMapRef retval = map_cache.lookup(epoch); // 从osdmap缓存查找该版本的map是否存在
  if (retval) {
    dout(30) < < "get_map " << epoch << " -cached" << dendl;
    return retval;
  }

  OSDMap *map = new OSDMap;
  if (epoch > 0) {
    dout(20) < < "get_map " << epoch << " - loading and decoding " << map << dendl;
    bufferlist bl;
    if (!_get_map_bl(epoch, bl)) { // 从osdmap的bufferlist缓存(map_bl_cache)中查找该版本map是否存在,如果不存在则从硬盘上加载,并加入map_bl_cache缓存
      delete map;
      return OSDMapRef();
    }
    map->decode(bl); // 解码bufferlist数据到osdmap
  } else {
    dout(20) < < "get_map " << epoch << " - return initial " << map << dendl;
  }
  return _add_map(map); // 把获取的osdmap加入map_cache缓存
}

上述osdmap加载过程中涉及到两个内存缓存:map_cache和map_bl_cache(还有一个map_bl_inc_cache是保存增量osdmap的bufferlist的缓存),这两个缓存都是基于LRU算法,在OSDService类的构造函数中初始化的,默认的缓存空间大小(缓存项最大数量)是由配置项osd_map_cache_size决定的,其默认值是500,因此在启动过程中缓存的osdmap数量是足够的(根据实际线程环境osdmap变化速度,有运维操作时版本变化量是150左右,osdmap变化数量跟osd状态变化次数强相关,没有操作时基本不变)。

加载OSD上已有的pg:

void OSD::load_pgs()
{
  assert(osd_lock.is_locked());
  dout(0) << "load_pgs" << dendl;
  {
    RWLock::RLocker l(pg_map_lock);
    assert(pg_map.empty());
  }

  vector<coll_t> ls;
  int r = store->list_collections(ls);//遍历current目录下所有文件夹,也即pg
  if (r < 0) {
    derr << "failed to list pgs: " << cpp_strerror(-r) << dendl;
  }
  ......
  for (map<spg_t, interval_set<snapid_t> >::iterator i = pgs.begin(); // pgs是从ls中加载的pg列表
       i != pgs.end();
       ++i) {
    spg_t pgid(i->first);
    ......
    bufferlist bl;
    epoch_t map_epoch = 0;
    int r = PG::peek_map_epoch(store, pgid, &map_epoch, &bl); // 从omap获取pg关联的osdmap版本,可以认为是osd down之前保存的最新osdmap版本
    ......
    PG *pg = NULL;
    if (map_epoch > 0) {
      OSDMapRef pgosdmap = service.try_get_map(map_epoch); // 参考上面的分析过程
      ......
      pg = _open_lock_pg(pgosdmap, pgid);
    } else {
      pg = _open_lock_pg(osdmap, pgid); //打开pg对象并加锁
    }
    ......
    // read pg state, log
    pg->read_state(store, bl); // 从omap中读取pg info和pg log
    ......
    pg->handle_loaded(&rctx); // 使pg状态机进入Reset状态,为进入peering状态做准备
    ......
}

PG *OSD::_open_lock_pg(
  OSDMapRef createmap,
  spg_t pgid, bool no_lockdep_check)
{
  assert(osd_lock.is_locked());

  PG* pg = _make_pg(createmap, pgid);
  {
    RWLock::WLocker l(pg_map_lock);
    pg->lock(no_lockdep_check);
    pg_map[pgid] = pg; // 把pg保存到pg_map
    pg->get("PGMap");  // because it's in pg_map
    service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
  }
  return pg;
}

使用osdmap:

void OSD::consume_map()
{
  ......
  // scan pg's
  {
    RWLock::RLocker l(pg_map_lock);
    for (ceph::unordered_map<spg_t ,PG*>::iterator it = pg_map.begin(); // pg_map是上面load_pgs函数初始化的,保存的是osd上承载的所有pg
        it != pg_map.end();
        ++it) {
      PG *pg = it->second;
      pg->lock();
      pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
      pg->unlock();
    }
  ......
}

void PG::queue_null(epoch_t msg_epoch,
            epoch_t query_epoch)
{
  dout(10) < < "null" << dendl;
  queue_peering_event( // 发送空事件给pg peering_queue,主要为了是让pg进入peering状态
    CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
                     NullEvt())));
}

需要说明的是,在osd调用start_boot(在回调_maybe_boot里)发送MOSDBoot给monitor之前,OSD仍然处于down状态,其上承载的pg也就处于degraded/undersized状态,这种情况下只要acting set里的osd数量(可用副本数)仍然大于等于pool的min_size值,pg进入peering状态也不会对客户端IO产生影响(不阻塞IO)。

OSD启动(UP):

struct C_OSD_GetVersion : public Context {
  OSD *osd;
  uint64_t oldest, newest;
  C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
  void finish(int r) {
    if (r >= 0)
      osd->_maybe_boot(oldest, newest);
  }
};

// OSD::init和OSD::handle_osd_map都会调用这个函数,从monitor查询osdmap版本信息,并在osd拥有的osdmap版本号与最新版本相差不大时发送启动消息给monitor
// 如果osd本地osdmap版本与最新版本相差较大(超过osd_map_message_max),则osdmap_subscribe并在OSD::handle_osd_map里再次调用这个函数检查版本号差距
void OSD::start_boot()
{
  dout(10) < < "start_boot - have maps " << superblock.oldest_map
       << ".." << superblock.newest_map << dendl;
  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
  monc->get_version("osdmap", &c->newest, &c->oldest, c);
}

void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
{
  ......
  // if our map within recent history, try to add ourselves to the osdmap.
  if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
    dout(5) < < "osdmap NOUP flag is set, waiting for it to clear" << dendl;
  } else if (is_waiting_for_healthy() || !_is_healthy()) {
    // if we are not healthy, do not mark ourselves up (yet)
    dout(1) << "not healthy; waiting to boot" << dendl;
    if (!is_waiting_for_healthy())
      start_waiting_for_healthy();
    // send pings sooner rather than later
    heartbeat_kick();
  } else if (osdmap->get_epoch() >= oldest - 1 &&
         osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest) {
    _send_boot(); // 在这里告诉monitor,osd已启动
    return;
  }

  // get all the latest maps
  // 如果OSD拥有的osdmap与集群的osdmap版本差距较大,则继续发送osdmap订阅消息给monitor,monitor会返回订阅的osdmap(批量发送osd_map_message_max),直到二者差距不大(小于osd_map_message_max)
  if (osdmap->get_epoch() + 1 >= oldest)
    osdmap_subscribe(osdmap->get_epoch() + 1, true);
  else
    osdmap_subscribe(oldest - 1, true);
}

void OSD::_send_boot()
{
  ......
  MOSDBoot *mboot = new MOSDBoot(superblock, service.get_boot_epoch(),
                                 hb_back_addr, hb_front_addr, cluster_addr,
                 CEPH_FEATURES_ALL);
  dout(10) < < " client_addr " << client_messenger->get_myaddr()
       < < ", cluster_addr " << cluster_addr
       << ", hb_back_addr " << hb_back_addr
       << ", hb_front_addr " << hb_front_addr
       << dendl;
  _collect_metadata(&mboot->metadata);
  monc->send_mon_message(mboot); // 发送osd boot消息给monitor,之后monitor就认为osd已经启动
  // 在OSDMonitor::preprocess_boot、OSDMonitor::prepare_boot处理这个消息,prepare_boot会发送osdmap给当前osd(OSDMonitor::_booted),版本号是osd当前拥有的osdmap的epoch+1
}

一旦osd发送了MOSDBoot消息给monitor,并且monitor经过Paxos决议之后接受了osd的boot状态,那么osd就被认为是up的,加入到acting/up set里,就会被crush算法考虑在内,客户端IO就会发送到这个osd上,如果此时osd上的pg处于peering状态,则可能会阻塞客户的IO。

peering过程中osdmap更新流程

无论是调用osdmap_subscribe发送MMonSubscribe消息(osd启动前或者按需发送),或者调用_send_boot发送MOSDBoot消息(osd启动时),或者调用send_alive发送MOSDAlive消息给monitor(osd启动后,peering结束,或者给monitor上报osd信息时等),monitor都会通过调用OSDMonitor::send_latest发生osdmap给osd(全量或增量):

// Monitor::handle_subscribe处理MMonSubscribe消息,之后由OSDMonitor::check_sub发送osdmap给osd
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
  OSDMapRef osdmap = service.get_osdmap();
  if (osdmap->get_epoch() >= epoch)
    return;

  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
      force_request) {
    monc->renew_subs();
  }
}

void OSD::handle_osd_map(MOSDMap *m)
{
  ......
  // missing some?
  bool skip_maps = false;
  if (first > osdmap->get_epoch() + 1) {
    dout(10) < < "handle_osd_map message skips epochs " << osdmap->get_epoch() + 1
         < < ".." << (first-1) << dendl;
    if (m->oldest_map < = osdmap->get_epoch() + 1) {
      osdmap_subscribe(osdmap->get_epoch()+1, true); // 发过来的osdmap版本太老,不符合需要,重新订阅
      m->put();
      return;
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      osdmap_subscribe(m->oldest_map - 1, true);
      m->put();
      return;
    }
    skip_maps = true;
  }

  ObjectStore::Transaction *_t = new ObjectStore::Transaction;
  ObjectStore::Transaction &t = *_t;

  // store new maps: queue for disk and put in the osdmap cache
  // 上面的原版注释已经写清楚了
  epoch_t last_marked_full = 0;
  epoch_t start = MAX(osdmap->get_epoch() + 1, first);
  for (epoch_t e = start; e < = last; e++) {
    map<epoch_t,bufferlist>::iterator p;
    p = m->maps.find(e);
    if (p != m->maps.end()) { // 处理全量osdmap
      dout(10) < < "handle_osd_map  got full map for epoch " << e << dendl;
      OSDMap *o = new OSDMap;
      bufferlist& bl = p->second;

      o->decode(bl);
      if (o->test_flag(CEPH_OSDMAP_FULL))
      last_marked_full = e;

      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, bl.length(), bl);
      pin_map_bl(e, bl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    p = m->incremental_maps.find(e);
    if (p != m->incremental_maps.end()) { // 处理增量osdmap
      dout(10) < < "handle_osd_map  got inc map for epoch " << e << dendl;
      bufferlist& bl = p->second;
      hobject_t oid = get_inc_osdmap_pobject_name(e);
      t.write(META_COLL, oid, 0, bl.length(), bl);
      pin_map_inc_bl(e, bl);

      OSDMap *o = new OSDMap;
      if (e > 1) {
        bufferlist obl;
        get_map_bl(e - 1, obl);
        o->decode(obl);
      }

      OSDMap::Incremental inc;
      bufferlist::iterator p = bl.begin();
      inc.decode(p);
      if (o->apply_incremental(inc) < 0) {
        derr << "ERROR: bad fsid?  i have " << osdmap->get_fsid() < < " and inc has " << inc.fsid << dendl;
        assert(0 == "bad fsid");
      }

      if (o->test_flag(CEPH_OSDMAP_FULL))
        last_marked_full = e;

      bufferlist fbl;
      o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
      ......
      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, fbl.length(), fbl);
      pin_map_bl(e, fbl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    assert(0 == "MOSDMap lied about what maps it had?");
  }


  if (superblock.oldest_map) { // 更新superblock中的oldest_map版本
    int num = 0;
    epoch_t min(
      MIN(m->oldest_map,
      service.map_cache.cached_key_lower_bound()));
    for (epoch_t e = superblock.oldest_map; e < min; ++e) {
      dout(20) << " removing old osdmap epoch " << e << dendl;
      t.remove(META_COLL, get_osdmap_pobject_name(e));
      t.remove(META_COLL, get_inc_osdmap_pobject_name(e));
      superblock.oldest_map = e+1;
      num++;
      if (num >= cct->_conf->osd_target_transaction_size &&
          (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
        break;
    }
  }

  if (!superblock.oldest_map || skip_maps)
    superblock.oldest_map = first;
  superblock.newest_map = last; // 更新superblock中的newest_map版本

  if (last_marked_full > superblock.last_map_marked_full)
    superblock.last_map_marked_full = last_marked_full; // 更新superblock

  map_lock.get_write();

  C_Contexts *fin = new C_Contexts(cct);

  // advance through the new maps
  for (epoch_t cur = start; cur < = superblock.newest_map; cur++) {
    dout(10) << " advance to epoch " << cur << " (<= newest " << superblock.newest_map << ")" << dendl;

    OSDMapRef newmap = get_map(cur);
    assert(newmap);  // we just cached it above!

    // start blacklisting messages sent to peers that go down.
    service.pre_publish_map(newmap);

    // kill connections to newly down osds
    bool waited_for_reservations = false;
    set<int> old;
    osdmap->get_all_osds(old);
    for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
      if (*p != whoami &&
      osdmap->have_inst(*p) &&                        // in old map
      (!newmap->exists(*p) || !newmap->is_up(*p))) {  // but not the new one
        if (!waited_for_reservations) {
          service.await_reserved_maps();
          waited_for_reservations = true;
        }
    note_down_osd(*p);
      }
    }

    osdmap = newmap;

    superblock.current_epoch = cur; // 更新superblock
    advance_map(t, fin);
    had_map_since = ceph_clock_now(cct);
  }

  epoch_t _bind_epoch = service.get_bind_epoch();
  if (osdmap->is_up(whoami) &&
      osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
      _bind_epoch < osdmap->get_up_from(whoami)) {

    if (is_booting()) {
      dout(1) < < "state: booting -> active" < < dendl;
      set_state(STATE_ACTIVE);  // 设置OSD为Active状态

      // set incarnation so that osd_reqid_t's we generate for our
      // objecter requests are unique across restarts.
      service.objecter->set_client_incarnation(osdmap->get_epoch());
    }
  }

  // note in the superblock that we were clean thru the prior epoch
  // 继续更新superblock
  epoch_t boot_epoch = service.get_boot_epoch();
  if (boot_epoch && boot_epoch >= superblock.mounted) {
    superblock.mounted = boot_epoch;
    superblock.clean_thru = osdmap->get_epoch();
  }

  // superblock and commit
  // 保存superblock到硬盘(leveldb)
  write_superblock(t);
  store->queue_transaction(
    0,
    _t,
    new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
    0, fin);
  service.publish_superblock(superblock);
  ......
  // yay!
  consume_map(); // 上面已经分析过

  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers(); // 更新OSD心跳互检的对端列表
  ......
  if (m->newest_map && m->newest_map > last) {
    dout(10) < < " msg say newest map is " << m->newest_map < < ", requesting more" << dendl;
    osdmap_subscribe(osdmap->get_epoch()+1, true);
  }
  else if (is_booting()) {
    start_boot();  // retry,检查osd是否可以启动(是否能发送MOSDBoot消息给monitor,使osd变为up状态)
  }
  else if (do_restart)
    start_boot();

  if (do_shutdown)
    shutdown();

  m->put();
}

结论

OSD变为up状态前,所有的加载操作,对peering流程耗时均没有影响,只有当osd发送MOSDBoot消息通知monitor他已经启动,并且monitor经过Paxos决议之后将其加入osdmap中变为UP状态之后,如果再有相关数据的(从硬盘)加载操作,才可能会影响peering耗时。

在monitor发送osdmap之后osd就会将其保存到缓存中,考虑到缓存大小默认500条,还是有可能会被冲掉的,只有在osd启动时的osdmap版本跟集群的版本差距很大的时候才有这种可能(义桥私有云集群观察到启动OSD操作时osd与集群的版本号差了2091个),此时就可能影响到peering流程(OSD::process_peering_events->OSD::advance_pg->service.try_get_map),因此要尽量避免启动、停止osd时与集群的osdmap版本号差距太大(差距太大,不仅内存缓存可能不够,每个版本的osdmap都要被每个pg检查并使用一遍,积少成多也会有一定的耗时)。

因此启动前预加载osdmap到内存pagecache中,带来的好处不大。

Ceph peering流程14问

本文基于H版本代码编写(对比L版本看了下,大致流程没啥变化)。

问题1:peering是如何启动的?

有两个场景会触发peering流程:
1. 在pg创建时
2. 在OSD启动、停止导致OSDMap变化进而导致pg的acting set发生变化时

场景1对应的代码流程是:

void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);  // 收到pg创建消息的处理函数
    break;
  ......
}

void OSD::handle_pg_create(OpRequestRef op)
{
  ......
  pg->handle_create(&rctx);
  ......
}

void PG::handle_create(RecoveryCtx *rctx)
{
  dout(10) << "handle_create" << dendl;
  Initialize evt;
  recovery_state.handle_event(evt, rctx);   // 发送Initialize事件
  ActMap evt2;
  recovery_state.handle_event(evt2, rctx);   // 发送ActMap事件
}

Initialize事件会使状态机(RecoveryMachine)进入Reset状态:

    struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
      Initial(my_context ctx);
      void exit();

      typedef boost::mpl::list <
        boost::statechart::transition< Initialize, Reset >,
      ......
    }

Reset状态定义了ActMap事件的反应函数:

    struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
      Reset(my_context ctx);
      void exit();

      typedef boost::mpl::list <
        ......
        boost::statechart::custom_reaction< ActMap >,
        ......
        > reactions;
      ......
      boost::statechart::result react(const ActMap&);
      ......
    }

boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
{
  ......
  return transit< Started >();  // 进入Started状态
}

Started状态的初始子状态是Start状态:

struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {}


/*-------Start---------*/
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());   // 当前osd是主
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl; 
    post_event(MakeStray());     // 当前osd是从
  }
}

这里只关注MakePrimary,Start状态里定义了MakePrimary事件的反应函数:

    struct Start : boost::statechart::state< Start, Started >, NamedState {
      ......
      typedef boost::mpl::list <
        boost::statechart::transition< MakePrimary, Primary >,   // 转到Primary状态,Peering状态是其初始子状态
        boost::statechart::transition< MakeStray, Stray >
        > reactions;
    };

struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState {}

由此进入了peering流程。

场景2对应的代码流程是:

$7 = std::map with 22 elements = {["arch"] = "x86_64", ["back_addr"] = "10.173.32.8:6805/35212", 
  ["ceph_version"] = "ceph version 157c86f0d4735011af4a2b46dc803af1878e251 (7157c86f0d4735011af4a2b46dc803af1878e251)", 
  ["cpu"] = "Intel Core Processor (Broadwell)", ["distro"] = "Debian", ["distro_codename"] = "jessie", ["distro_description"] = "Debian GNU/Linux 8.6 (jessie)", 
  ["distro_version"] = "8.6", ["filestore_backend"] = "xfs", ["filestore_f_type"] = "0x58465342", ["front_addr"] = "10.173.32.8:6804/35212", 
  ["hb_back_addr"] = "10.173.32.8:6806/35212", ["hb_front_addr"] = "10.173.32.8:6807/35212", ["hostname"] = "ceph-l", 
  ["kernel_description"] = "#1 SMP Debian 3.16.36-1+deb8u1 (2016-09-03)", ["kernel_version"] = "3.16.0-4-amd64", ["mem_swap_kb"] = "0", 
  ["mem_total_kb"] = "16465048", ["os"] = "Linux", ["osd_data"] = "/var/lib/ceph/osd/ceph-1", ["osd_journal"] = "/var/lib/ceph/osd/ceph-1/journal", 
  ["osd_objectstore"] = "filestore"}

OSD::_send_boot()发给monitor的MOSDBoot消息内容,monitor收到之后进行Paxos决议,之后就认为osd已经up了,并且会给osd发送osdmap信息(版本号是osd down之前的epoch+1),osd收到osdmap应该就触发了peering流程。

OSD::ms_dispatch(Message *m) -> OSD::_dispatch(Message *m) -> handle_osd_map(static_cast(m)) -> consume_map() -> pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch()):

void PG::queue_null(epoch_t msg_epoch,
            epoch_t query_epoch)
{
  dout(10) << "null" << dendl;
  queue_peering_event(
    CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
                     NullEvt())));
}

void PG::queue_peering_event(CephPeeringEvtRef evt)
{
  if (old_peering_evt(evt))
    return;
  peering_queue.push_back(evt);  // 入队NullEvt事件,这个空事件只为了让pg进入peering流程
  osd->queue_for_peering(this);
}

osd启动过程中OSD::init会调用load_pgs函数,进而调用handle_loaded:

void PG::handle_loaded(RecoveryCtx *rctx)
{
  dout(10) << "handle_loaded" << dendl;
  Load evt;
  recovery_state.handle_event(evt, rctx); // 发送Load事件给状态机,会使状态机从Initial状态进入Reset状态
}

之后OSD::process_peering_events函数(peering线程的实际处理函数)会调用OSD::advance_pg,再调用PG::handle_activate_map,发送ActMap事件给状态机(此时状态机处于Reset状态),状态机就进入了Started状态,之后就是MakePrimary/MakeStray了。

问题2:peering各个阶段是如何转换的?

peering主要分为GetInfo、GetLog、GetMissing、WaitUpThru这4个阶段(状态),4个阶段一般来说是串行的,他们的转换过程涉及到boost状态机的基础知识(参考问题7),总体来说状态转换图可以参考官方文档:
pg整体状态变化

这个图虽然看起来更清晰,但缺少了部分状态转换流程:peering状态变化

如在GetLog阶收到AdvMap事件后进入Reset状态的流程。

就代码层面来说,相关的转换过程举例如下:

进入peering初始状态

Peering状态的初始状态就是GetInfo(boost状态机基础知识,参考问题7):

struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {......}

GetInfo阶段经过以下几个主要步骤后进入下一个阶段(本阶段具体做的事情参考问题10):
1. pg->generate_past_intervals()
2. pg->build_prior(prior_set)
3. PG::RecoveryState::GetInfo::get_infos() // 如果prior_set为空,不需要从其他OSD获取信息,则直接进入GetLog阶段(post_event(GotInfo()))
4. PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt) // 获取其他OSD返回的pg info(接收到MNotifyRec事件)后进入GetLog阶段(post_event(GotInfo()))

另外一个问题:MNotifyRec(Message Notify Receive)事件是如何投递过来的?
首先我们要理解一点,根据peering线程模型(参考问题9)可知,所有peering相关操作都是在2个peering线程中执行的,因此事件投递要么直接通过这两个线程进行(无法异步),要么通过队列,然后由peering线程异步的从队列中获取事件并处理,Ceph中几乎所有耗时操作都是异步的,因此肯定是通过队列来投递事件,具体流程是:

void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);
    break;

  case MSG_OSD_PG_NOTIFY:
    handle_pg_notify(op); // 这里分发pg notify消息
    break;
  ......
}


/** PGNotify
 * from non-primary to primary
 * includes pg_info_t.
 * NOTE: called with opqueue active.
 */
void OSD::handle_pg_notify(OpRequestRef op)
{
  MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
  assert(m->get_type() == MSG_OSD_PG_NOTIFY);
  dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
  ......
  handle_pg_peering_evt(
      spg_t(it->first.info.pgid.pgid, it->first.to),
      it->first.info, it->second,
      it->first.query_epoch, pg_shard_t(from, it->first.from), true,
      PG::CephPeeringEvtRef(
    new PG::CephPeeringEvt(
      it->first.epoch_sent, it->first.query_epoch,
      PG::MNotifyRec(pg_shard_t(from, it->first.from), it->first, // 构造MNotifyRec事件
          op->get_req()->get_connection()->get_features())))
      );
}

/*
 * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
 * hasn't changed since the given epoch and we are the primary.
 */
void OSD::handle_pg_peering_evt(
  spg_t pgid,
  const pg_info_t& info,
  pg_interval_map_t& pi,
  epoch_t epoch,
  pg_shard_t from,
  bool primary,
  PG::CephPeeringEvtRef evt)
{
  ......
  pg->queue_peering_event(evt);  // 事件入队
}


void PG::queue_peering_event(CephPeeringEvtRef evt)
{
  if (old_peering_evt(evt))
    return;
  peering_queue.push_back(evt);  // 入的是peering_queue队列,注意这个队列是PG类的,和OSD::PeeringWQ类的那个同名队列不同
  osd->queue_for_peering(this);  // 这个是入队OSD::PeeringWQ::peering_queue,也即如果有新事件到来,就要继续被peering线程处理一次
}

class PG: {
  ......
  list<CephPeeringEvtRef> peering_queue;  // 这里保存的是pg收到状态机事件
  ......
}

class OSD: {
  ......
  // -- peering queue --
  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
    list<PG*> peering_queue;  // 这里保存的是正在执行peering流程的pg列表,对应的处理线程是peering线程池(参考问题11)
    ......
  }
  ......
}

PG::peering_queue队列是也是在peering线程池处理函数OSD::process_peering_events里出队的:

void OSD::process_peering_events(//OSD::PeeringWQ::_dequeue获取一批pg(默认20个),然后来这里处理,处理完进入Peering第一阶段GetInfo
  const list<PG*> &pgs,          // 发请求给其他osd获取信息:querying info from osd.0,然后线程就结束了,其他阶段是通过事件触发的
  ThreadPool::TPHandle &handle
  )
{
  ......
    if (!advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs)) {
      // we need to requeue the PG explicitly since we didn't actually
      // handle an event // 还会被线程池拿出来继续在这里处理
      peering_wq.queue(pg); // peering_wq就是类PeeringWQ的实例化对象,BatchWorkQueue::queue,PeeringWQ::_enqueue,重新加入到peering_queue
    } else {
      assert(!pg->peering_queue.empty());
      PG::CephPeeringEvtRef evt = pg->peering_queue.front();  // 取出第一个元素
      pg->peering_queue.pop_front();  // 出队
      pg->handle_peering_event(evt, &rctx);  // 投递状态机事件给状态机处理
    }
  ......
}

void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
{
  ......
  recovery_state.handle_event(evt, rctx);
}

进入GetLog状态

上面已经提到进入GetLog状态的两个途径,1是如果prior_set为空,不需要从其他OSD获取信息,则直接进入GetLog阶段;2是收到MNotifyRec事件并处理完之后。这里就不多说明。

进入GetMissing状态

GetLog阶段会给拥有权威日志的OSD发送pg_query_t::LOG获取pg log请求,对方回复的日志消息事件是MLogRec(Message Log Receive),GetLog状态的pg收到这个事件后就进入了GetMissing阶段。

MLogRec事件的分发流程与MNotifyRec类似,这里也不多说明。

GetMissing阶段会检查pg log是否完整,如不完整则需要继续从actingbackfill的OSD集合里拉取日志,生成缺失的对象列表,并在后续的recovery/backfill阶段根据这个列表进行数据恢复。

进入WaitUpThru状态

WaitUpThru状态不是所有pg都会进入的,条件是两种情况:
1. pg已经在GetLog阶段获取了所有日志,没有缺失对象,GetMissing阶段就可以在第一阶段退出(不必给actingbackfill的OSD发请求),这时如果pg不需要通知monitor更新up_thru值,则不进入WaitUpThru状态,否则进入
2. pg有缺失对象,在收到OSD回复的pg日志后(MLogRec事件),还会再次做条件1的判断

进入WaitUpThru状态都是通过post_event(NeedUpThru())进行的。

如果不需要进入WaitUpThru状态,则pg直接进入Active状态(通过post_event(Activate(pg->get_osdmap()->get_epoch()))):
Peering状态里有定义事件转换:boost::statechart::transition< Activate, Active >

进入Active状态

要进入Active状态,必须是up_thru已经更新(或者原本就不需要更新)的情况,第一种很简单,进入WaitUpThru或者之前的GetMissing阶段就直接进入Active了。

这里讨论第二种up_thru需要更新的情况,在peering线程池处理函数OSD::process_peering_events中会轮询每个peering_queue中的pg,并调用OSD::advance_pg批量检查收到的增量osdmap(一般都是monitor发过来的),对每个增量版本都调用pg->handle_advance_map来更新到pg,并发送AdvMap(Advance Map)给状态机,该事件由PG::RecoveryState::Peering::react(const AdvMap& advmap) 反应函数处理,里面继续调用pg->adjust_need_up_thru(advmap.osdmap),这里会根据接收到的osdmap里的up_thru值,来检查是否已经更新到期望值,如果已更新,则把pg的need_up_thru值置为false,表示已更新,就可以OSD::advance_pg函数里pg->handle_advance_map之后通过调用pg->handle_activate_map(rctx)来发送ActMap(Active Map)事件,最终退出WaitUpThru阶段进入Active阶段。

问题3:peering为啥会阻塞客户端IO?

就原理层面简单来说是为了保证用户数据的完整性和一致性,试想一下如果pg的部分数据还没完全恢复或者各个副本之间还没达成一致,就接受客户端IO读写,这时将会发生读错误或者因写入新数据导致用户数据混乱不一致。

就代码层面来说,阻塞客户端IO的代码流程是在:

bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
{
  if (is_stopping()) {
    // we're shutting down, so drop the op
    return true;
  }

  epoch_t msg_epoch(op_required_epoch(op));
  if (msg_epoch > osdmap->get_epoch()) { // OSD启动后,peering之前如果client发IO请求到OSD,会在这里返回,因为客户端拿到的osdmap比osd的新
    Session *s = static_cast<Session*>(op->get_req()->
                       get_connection()->get_priv());
    if (s) {
      s->received_map_lock.Lock();
      epoch_t received_epoch = s->received_map_epoch;
      s->received_map_lock.Unlock();
      if (received_epoch < msg_epoch) {
        osdmap_subscribe(msg_epoch, false);
      }
      s->put();
    }
    return false;
  }
  ......
}

上面的dispatch_op_fast返回false之后,并不会真正的返回错误给客户端,而是会把这个op放入一个等待队列,等条件满足后会再次处理,也即在此过程中客户端IO是阻塞的。

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
  assert(session->session_dispatch_lock.is_locked());
  assert(session->osdmap == osdmap);
  for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
       i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
       session->waiting_on_map.erase(i++));  // dispatch_op_fast返回false,则waiting_on_map不会清空。waiting_on_map是在更上层的函数里把客户端op添加进来的

  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session); // waiting_on_map不为空则需要等待osdmap更新之后才能处理客户端IO
  }
}

在OSD::consume_map()函数中会调用OSD::dispatch_session_waiting,从而在OSDMap更新后继续处理执行阻塞的客户端IO,OSD::handle_pg_peering_evt(peering事件投递函数)中也会通过调用wake_pg_waiters(pg, pgid)来间接调用到OSD::dispatch_session_waiting。

问题4:peering怎么挑选OSD的?

PG::RecoveryState::GetInfo::GetInfo会调用PG::build_prior生成prior_set,prior_set里保存的就是peering阶段获取pg info要用到的OSD列表(可能包含当前OSD)。

挑选过程可以简单的理解为pg当前acting和up set,以及历史的acting set(需要OSD当前处于up状态)。在我们的使用场景下,一般一个OSD down了之后,不会out(坏盘场景除外),并且会很快的处理掉这种故障,在OSD down期间,他对应的副本OSD down的概率极低(我们一般不会容忍2个副本OSD同时down的场景),因此历史的acting set一般都用不到(一个OSD down期间可以认为他上面的pg的acting和up set不会变化),当前的acting和up set就可以获取peering所需的全部pg info信息。

问题5:为啥要有peering流程?

ceph社区官方解释:http://docs.ceph.com/docs/master/dev/peering/

简单来说就是,peering是为了让pg知道他应该关联到哪些OSD?哪个是主?谁保存了哪些用户数据、元数据?怎么汇集这些数据以便在各种OSD故障场景下,保证用户数据的完整性和一致性?所有额外的那些概念(PG或OSD类的成员)都是为了达到这个目的而添加/设置的。

为了保证数据的完整性和一致性,需要在peering未成功完成之前阻止客户端IO。因此如果peering无法正常完成,pg将处于incomplete状态,无法提供IO能力。无法完成peering的原因根据peering的功能可以推测出来,比如pg知道了他关联的OSD,但这些OSD都是down的;或者这些OSD保存的元数据汇集起来仍然无法还原故障期间用户所做的IO操作,也就是无法保证用户数据的完整性和一致性。

设想一下如果没有peering流程,3副本情况最简单的故障场景下:
1. A B C — up
2. A B — up, C — down // 我们期望AB继续对客户端提供IO能力,如果没有peering,C是主的情况下,AB就无法对客户端提供IO读写能力
3. A B C — up // C启动之后,它保存的数据可能就是不完整的,甚至是与AB不一致的

当然更复杂的故障场景下,需要添加更多的持久化变量(例如up_set、acting_set、up_thru等)来记录相关的信息,以便帮助OSD进行故障恢复的决策,也有一些临时性的变量在故障恢复过程中发挥作用(如prior_set等)。

问题6:peering相关的可调参数有哪些?分别有啥用处?

  1. peering线程池中worker线程数:osd_op_threads,默认值2,即有几个线程并发处理peering pg,线程多了相对会加快peering处理速度,但可能对monitor节点或者OSD本地盘造成压力,如果一个OSD节点的pg数量较多,并且peering pg有堆积现象,可以考虑增加线程数。
  2. 线程处理pg时批量获取pg的数量:osd_peering_wq_batch_size,默认值20,即每个peering worker线程每次从peering_queue队列中获取多少pg来处理,每批pg都是串行处理的,如果有一个pg比较耗时,则可能影响同一批的其他pg
  3. pg检查OSDMap增量版本时每次检查的最大版本数量:osd_map_max_advance,默认值200,每个pg每次被peering线程处理时,都会检查最多这么多个OSDMap增量版本,多了单次处理相对会更耗时,少了就要多被peering线程调度几次才能处理完
  4. monitor Paxos决议间隔时间:paxos_propose_interval,默认值1.0,monitor的多个主从节点之间进行Paxos决议的间隔,这个间隔是为了防止OSDMap的版本号变化太快导致overflow,同时缓解monitor节点压力,这个间隔时间段内的所有osd状态变化可以一次性处理掉

问题7:peering使用的boost状态机是怎么回事?

boost状态机的参考文档:http://sns.hwcrazy.com/boost_1_41_0/libs/statechart/doc/tutorial.html

下面将结合peering相关代码来说明状态机的常用操作。

常用操作:

  1. 定义一个状态机
    状态机都是继承自boost::statechart::state_machine:
class PG {
  class RecoveryState {
    /* States */
    struct Initial; // 提前声明初始的状态,因为它只能在定义了状态机之后再定义状态详情,而定义状态机必须有一个初始状态
    class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
      ......
    }
    // 上述类RecoveryMachine就是一个状态机,Initial是其初始状态
  ......
  RecoveryMachine machine;
  ......
  public:
    RecoveryState(PG *pg)
      : machine(this, pg), pg(pg), orig_ctx(0) {
      machine.initiate();   // 状态机初始化
    }
    ......
  } recovery_state;  // recovery_state在PG构造函数中初始化
  ......
}
  1. 定义一个状态
    状态继承自boost::statechart::state(也可以继承自boost::statechart::simple_state):
    struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
      Initial(my_context ctx);
      void exit();
      ......
    }
    // Initial就是一个状态,他属于状态机RecoveryMachine(每个状态都必须属于一个状态机)。
  1. 定义一个子状态
    子状态和状态定义上没有区别,只是在定义的时候可以为一个状态指定子状态,表示其从属关系,子状态也可以有自己的子状态,也可以没有。
    struct Start;
    struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
      Started(my_context ctx);
      void exit();
      ......
    }
    // Started也是一个状态,他属于状态机RecoveryMachine,初始子状态是Start。
    ......

    struct GetInfo;
    struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
      std::auto_ptr< PriorSet > prior_set;
      Peering(my_context ctx);
      void exit();
      ......
    };
    // Peering也是一个状态,其父状态是Primary(由父状态间接确定所属状态机),初始子状态是GetInfo。

    ......
    struct Start : boost::statechart::state< Start, Started >, NamedState {
      Start(my_context ctx);
      void exit();
      ......
    };
    // Start状态就没有子状态,只有父状态Started。
  1. 定义状态机事件
    事件是用来驱动状态机在状态间进行转换的,其继承自boost::statechart::event:
    struct MakePrimary : boost::statechart::event< MakePrimary > {
      MakePrimary() : boost::statechart::event< MakePrimary >() {}
    };
    // MakePrimary就是一个事件
  1. 定义事件反应
    事件反应是指状态机在某个状态下,接收到指定事件后,会把状态机转换到另外一个状态,或者执行指定的操作:
    struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
      ......
      typedef boost::mpl::list <
        boost::statechart::custom_reaction< QueryState >,
        boost::statechart::transition< Activate, Active >,
        boost::statechart::custom_reaction< AdvMap >
        > reactions;
      ......
    };
    // 上述reactions就是定制化的事件反应,声明了QueryState事件和AdvMap事件的反应,transition则是定义了接收到Activate事件的状态变化,会把Peering状态转换到Active状态(也是一种事件反应的定义)
  1. 实现事件反应函数
    定制化的事件反应需要实现对应的反应函数,如上面定义的QueryState事件和AdvMap事件的custom_reaction反应,都需要实现具体的反应函数才行:
// 事件反应函数的名称统一为react,通过重载(参数不同)对应不同的事件
boost::statechart::result PG::RecoveryState::GetLog::react(const AdvMap& advmap)
{
  ......
  if (!advmap.osdmap->is_up(auth_log_shard.osd)) {
    dout(10) << "GetLog: auth_log_shard osd."
         << auth_log_shard.osd << " went down" << dendl;
    post_event(advmap);
    return transit< Reset >();
  }

  // let the Peering state do its checks.
  return forward_event();
}
// 上述反应函数对应的是AdvMap事件
  1. 丢弃事件
boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  assert(!msg);
  if (logevt.from != auth_log_shard) {
    dout(10) << "GetLog: discarding log from "
         << "non-auth_log_shard osd." << logevt.from << dendl;
    return discard_event();
  }
  ......
}
  1. 投递事件
    投递事件有两种方式,一种是调用状态机的process_event()函数,一种是在自定义的事件反应函数里调用post_event(),两种方式都是向状态机投递相应的事件:
  class RecoveryState {
    ......
    void handle_event(const boost::statechart::event_base &evt,
              RecoveryCtx *rctx) {
      start_handle(rctx);
      machine.process_event(evt);
      end_handle();
    }
    ......
  }
boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  ......
  dout(10) << "GetLog: received master log from osd" << logevt.from << dendl;
  msg = logevt.msg;
  post_event(GotLog());
  ......
}

// 上述post_event(GotLog())投递GotLog事件后会调用这个事件反应函数
boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&)
{
  dout(10) << "leaving GetLog" << dendl;
  ......
  return transit< GetMissing >();
}

投递事件之后,状态机会根据当前所处的状态以及当前状态下定义的事件反应函数对事件做出反应(也即调用对应的事件反应函数)。

  1. 转发事件
boost::statechart::result PG::RecoveryState::GetLog::react(const AdvMap& advmap)
{
  ......
  return forward_event(); // 会把AdvMap事件转发给GetLog的上层状态(Started/Primary/Peering/GetLog,因此上层状态是Peering)进行处理,如果上层状态没有实现对应的事件反应函数,则继续向上层转发直到状态机的最顶层
}
  1. 强制状态转换
boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&)
{
  dout(10) << "leaving GetLog" << dendl;
  ......
  return transit< GetMissing >();
}

需要注意的是,强制状态转换时必须搭配return关键词使用,否则可能导致未定义行为。原因是transit之后会调用当前状态的析构函数释放相关资源,如果在transit之后还有要执行的代码,则会导致未知行为。

  1. 从外部获取状态机内部信息
    可以通过定义事件反应函数来通过特定事件获取状态机内部信息:
// QueryState事件的自定义事件反应函数
boost::statechart::result PG::RecoveryState::Peering::react(const QueryState& q)
{
  PG *pg = context< RecoveryMachine >().pg;

  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  ......
  q.f->close_section();
  return forward_event();
}
// 上述事件反应是为perf dump服务的,用来通过OSD的UNIX domain socket接口查询状态机内部信息

通过ceph daemon osd.0 perf dump recoverystate_perf命令可以从OSD的UNIX domain socket接口查询状态机内部信息:

        "peering_latency": { // peering阶段总耗时统计,包含了下面4个阶段
            "avgcount": 11,  // 多少个pg进入过该阶段
            "sum": 13.586214755 // 阶段总耗时,除以avgcount就是每个pg的平均耗时,下同
        },
--
        "getinfo_latency": { // GetInfo阶段耗时统计
            "avgcount": 11,
            "sum": 0.233014045
        },
        "getlog_latency": { // GetLog阶段耗时统计
            "avgcount": 11,
            "sum": 0.000715519
        },
--
        "getmissing_latency": { // GetMissing阶段耗时统计
            "avgcount": 8,
            "sum": 0.000222941
        },
        "waitupthru_latency": { // WaitUpThru阶段耗时统计
            "avgcount": 8,
            "sum": 13.351894826
        }

perf dump相关数据记录过程(以GetInfo阶段为例):

void PG::RecoveryState::GetInfo::exit()
{
  context< RecoveryMachine >().log_exit(state_name, enter_time); // enter_time是在NamedState里定义,并在其构造函数里初始化的
  PG *pg = context< RecoveryMachine >().pg;
  utime_t dur = ceph_clock_now(pg->cct) - enter_time;
  pg->osd->recoverystate_perf->tinc(rs_getinfo_latency, dur);  // 通过ceph通用的perfcounter机制来累计次数和耗时
  pg->blocked_by.clear();
}

// GetInfo继承了NamedState
  struct NamedState {
    const char *state_name;  // 状态名称
    utime_t enter_time;  // 进入状态的时间
    const char *get_state_name() { return state_name; }
    NamedState(CephContext *cct_, const char *state_name_)
      : state_name(state_name_),
        enter_time(ceph_clock_now(cct_)) {}  // 初始化enter_time为当前时间
    virtual ~NamedState() {}
  };

问题8:peering有哪些阶段可能比较耗时?

准备阶段

之所以要讨论这个阶段,是因为在线下复现过程中,发现准备阶段也会影响peering耗时,其影响的peering阶段主要是GetInfo,原因如下:

// OSD::process_peering_events函数是peering线程池执行体,负责处理OSD的pg的peering流程,默认有两个线程,每次批量处理20个pg,其主要流程伪代码如下:
for pg in pgs:
    pg->lock();
    advance_pg(pg) or pg->handle_peering_event();
    pg->write_if_dirty();
    dispatch_context_transaction(pg);
    pg->unlock();

每个pg执行到pg->handle_peering_event()之后就进入了peering的各个阶段(根据事件不同状态机转换到不同状态也即peering的不同阶段),而peering线程池线程数量就2个(也即2个线程并发执行OSD::process_peering_events函数),如果两个线程都阻塞到OSD::process_peering_events函数的某一步(例如advance_pg或dispatch_context_transaction),则就会导致之前已经进入peering流程的pg耗时变长,磁盘限速复现场景下,可以通过日志看到dispatch_context_transaction是耗时较长的流程(在FileStore::op_queue_reserve_throttle函数里有限流操作,队列中op数量超出50个则需要等待),其影响的peering阶段主要是GetInfo。

GetInfo

GetInfo阶段本身可能耗时的地方在于从其他OSD(同属一个pg或者曾经同属一个,up+acting)获取pg_query_t::INFO信息过程,与每个OSD的交互相当于有两次的网络传输,和一次OSD读取数据。在收到OSD回复的MNotifyRec消息后会退出GetInfo阶段进入GetLog阶段。

其他主要流程如pg->generate_past_intervals()、pg->build_prior()、pg->proc_replica_info等都是OSD进程内的内存操作,正常情况下都是非常快速的。

GetLog

GetLog阶段本身耗时的地方也是与其他OSD(同属一个pg或者曾经同属一个,up+acting)交互流程,获取pg_query_t::LOG信息,在收到其他OSD回复的MLogRec消息后会退出GetLog流程,进入GetMissing阶段。其他流程如pg->proc_master_log也是OSD内部的内存操作。

GetMissing

GetMissing阶段本身耗时的地方也是与其他OSD(同属一个pg或者曾经同属一个,actingbackfill)交互流程,获取pg_query_t::LOG或pg_query_t::FULLLOG信息,在收到其他OSD回复的MLogRec消息后会退出GetMissing流程,根据是否需要更新up_thru决定进入WaitUpThru阶段或者Activate阶段。其他流程如pg->proc_replica_log也是OSD内部的内存操作。

WaitUpThru

通常情况下这个阶段都是要走的,因为OSD启动或者停止后都会进入peering流程(本身OSD或者其他OSD),流程结束后,都要更新OSD的up_thru值,以记录OSD上次peering正常结束的epoch版本。

该阶段主要是发送MOSDAlive消息给monitor(包含期望更新到的up_thru版本号),并等待monitor回复更新后的OSDMap,并没有其他额外的流程。

OSD::process_peering_events函数在遍历完所有pg后,会根据是否需要更新up_thru来进入queue_want_up_thru函数,这个函数会发送MOSDAlive消息给monitor。之后由monitor通过Paxos协议决议出新的OSDMap(包含OSD期望更新到的up_thru版本号),然后通过回调回复给OSD。这个决议过程需要写入磁盘进行持久化,并且还需要最长等待1s来防止OSDMap变更过于频繁导致epoch值变化太快(可能发生越界)、OSDMap数量太多占用更多的存储空间。

如果monitor存储元数据所用的磁盘IO能力较低(尤其是IOPS能力),就会导致磁盘写入等待时间过长,无法及时进行决议的持久化,从而影响OSD的waitupthru耗时。

问题9:peering相关的线程模型和数据结构是什么样的?

数据结构:

  // -- peering queue --
  // struct PeeringWQ是OSD类的一个成员,实例化了peering_wq成员
  struct PeeringWQ : public ThreadPool::BatchWorkQueue<pg> {
    list</pg><pg *> peering_queue;  // 正在peering的PG列表
    OSD *osd;    // osd对象指针
    set</pg><pg *> in_use;     // 正在peering线程处理的pg集合,在_dequeue(list</pg><pg *> *out)中添加pg,在_process_finish中清理
    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) // 构造函数
      : ThreadPool::BatchWorkQueue</pg><pg>(  // 继承了线程池父类,从而关联到线程池
    "OSD::PeeringWQ", ti, si, tp), osd(o) {}

    void _dequeue(PG *pg); // 出队单个pg,带下划线前缀表示是内部接口,只被父类BatchWorkQueue提供的相关外部接口封装使用,下同

    bool _enqueue(PG *pg) {  // 入队函数,被调用流程:main,OSD::init,OSD::consume_map,PG::queue_null,PG::queue_peering_event,
      pg->get("PeeringWQ");  // OSDService::queue_for_peering,ThreadPool::BatchWorkQueue</pg><pg>::queue
      peering_queue.push_back(pg);
      return true;
    }
    bool _empty(); // peering_queue队列是否为空
    void _dequeue(list</pg><pg *> *out);  // 批量出队pg,被ThreadPool::worker里的_void_dequeue调用
    void _process(  // peering线程处理函数,被ThreadPool::worker里的_void_process调用
      const list</pg><pg *> &pgs,  // pgs是OSD::PeeringWQ::_dequeue批量获取的
      ThreadPool::TPHandle &handle) {
      osd->process_peering_events(pgs, handle);   // peering主处理流程,批处理方式进行
      for (list</pg><pg *>::const_iterator i = pgs.begin();
       i != pgs.end();
       ++i) {
        (*i)->put("PeeringWQ");
      }
    }
    void _process_finish(const list</pg><pg *> &pgs); // 清理peering线程使用完毕的pg,并不意味着这些pg的peering过程已经结束,被ThreadPool::worker里的_void_process_finish调用
    void _clear() {
      assert(peering_queue.empty());
    }
  } peering_wq;

线程模型:

peering使用的也是ceph的通用线程池模型(PeeringWQ绑定了ThreadPool),实现了自己的队列出队、入队,及线程处理函数(struct PeeringWQ::_process),线程池初始化是在OSD类的构造函数里初始化的:

OSD::OSD(CephContext *cct_, ObjectStore *store_,
     ......
     const std::string &dev, const std::string &jdev) :
  ......
  osd_tp(cct, "OSD::osd_tp", cct->_conf->osd_op_threads, "osd_op_threads"), // 可以看出peering是有自己独立的线程池的,默认线程实例数量是2个,没有共用IO线程池
  peering_wq(
    this,
    cct->_conf->osd_op_thread_timeout,  // 线程处理超时告警时间,默认15s,注意这个15s是对单个pg来说的,每操作一个pg会reset一次
    cct->_conf->osd_op_thread_suicide_timeout,  // 线程处理超时自杀时间,默认150s
    &osd_tp),

整个线程池调用的最关键的处理函数就是osd->process_peering_events(pgs, handle)。

问题10:GetInfo阶段到底get的是哪些info?info用来干啥的?

GetInfo阶段主要做了4件事:
1. 计算past_interval
2. 构造prior_set
3. 给prior_set里的OSD发送查询pg info消息
4. 处理OSD返回的pg info

past_interval

pg->generate_past_intervals()生成past_interval的列表,保存在map<epoch_t,pg_interval_t> past_intervals中。

简单来说,past_interval就是pg的acting set和up set保持不变的一个时间段,这个时间段的开始和结束时间点都是用epoch来表示的(如果把epoch理解为时间戳则更容易理解),一旦acting和up set发生了变化,则就会生成一个新的interval,所谓past,顾名思义就是过去的,有过去的就有现在或者说当前的,当前的interval就是pg当前acting和up set一致的时间段,对应的名称是current_interval。

past_interval也不是全部的历史记录,也没有必要全部,实际上只关注OSD down掉的那部分时间段的即可,(为了各种异常会比较OSD的superblock里持久化了之前已经保存好的最老osdmap的epoch),在我们场景下,osd down了之后,pg会在degraded模式下运行(acting和up set中有2个osd),因此其past_intervals一般就一个interval,从osd down的那个epoch开始到osd up之前的那个epoch结束(从up那个epoch开始到当前最新的epoch就是current_interval)。

prior_set

由于我们的场景下past_intervals很少,因此prior_set也比较简单,基本可以认为是osd down掉之后另外两个osd就是prior_set,也即找这两个osd就可以恢复全部元数据和数据(这也不难理解,另外两个正常的osd确实保存了全部用户数据和pg元数据信息)。

pg info

pg info结构体定义如下:

struct pg_info_t {
  spg_t pgid;
  eversion_t last_update;    // last object version applied to store.
  eversion_t last_complete;  // last version pg was complete through.
  epoch_t last_epoch_started;// last epoch at which this pg started on this osd

  version_t last_user_version; // last user object version applied to store

  eversion_t log_tail;     // oldest log entry.

  hobject_t last_backfill;   // objects >= this and < last_complete may be missing

  interval_set<snapid_t> purged_snaps;

  pg_stat_t stats;

  pg_history_t history;
  pg_hit_set_history_t hit_set;
  ......
}

这些信息主要是记录pg的元数据的,并没有用户数据,通过对从prior_set里其他OSD获取的pg info和本地OSD的pg info进行比对,就能知道谁的pg保存了哪些(时间段或者说epoch版本序列)用户数据(pg log),从而在后续阶段利用对应的OSD进行恢复。

问题11:GetLog阶段到底get的是哪些log?log用来干啥的?

choose_acting

因为新的osd up了,pg的分布就可能发生变化,所以要生成新的acting set,但是acting set里面可能有需要backfill的osd,所以这个set就叫acting_backfill。
在根据pool的类型来调用PG::calc_replicated_acting(副本)或calc_ec_acting(ec)计算acting_backfill之前,需要先找出具有权威日志的osd(可能包含多个osd,会进行排序),在我们关注的场景下(启动、停止osd,并且noout),降级状态的2个副本其中之一为主,因此根据选择规则基本上选择的权威日志OSD就是当前的主OSD。

这个过程中还涉及到pg_temp(临时主)的选择过程,简单理解就是crush算法选择的新主不具备全部用户数据时,就需要先通过临时主对新主进行数据恢复,恢复完毕之后再切换到真正的新主。

get olog

如果选择出来的权威日志OSD就是当前OSD,则直接进入GetMissing阶段,也即不需要从权威日志OSD节点上获取pg log用于恢复数据(自己本身就有这些日志了)。

如果权威日志OSD不是自己,就需要给他发送pg_query_t::LOG查询消息,而对方回复的消息里包含的数据是:

class MOSDPGLog : public Message {

  static const int HEAD_VERSION = 4;
  static const int COMPAT_VERSION = 2;

  epoch_t epoch;
  /// query_epoch is the epoch of the query being responded to, or
  /// the current epoch if this is not being sent in response to a
  /// query. This allows the recipient to disregard responses to old
  /// queries.
  epoch_t query_epoch;

public:
  shard_id_t to;   // 目标OSD
  shard_id_t from; // 源OSD
  pg_info_t info;  // pg info
  pg_log_t log;    // pg log
  pg_missing_t missing; // 权威日志OSD当前pg下缺少的对象列表,后面用来恢复对端的数据
  pg_interval_map_t past_intervals;
  ......
}

收到上述信息之后,会调用pg->proc_master_log()对其进行处理,这个函数主要是把收到的权威pg log与本地pg log进行合并,从而让当前pg所在的OSD变成主OSD(可能还需要经历pg_temp临时主阶段)。

问题12:GetMissing阶段?

GetMissing的正常流程是会从actingbackfill里的OSD上获取pg log,然后将其与权威日志比较,从而计算出各个OSD上缺少的对象,在后面的recovery/backfill阶段进行恢复。但在我们关注的场景下(启动、停止osd,并且noout),只有启动OSD的时候可能在当前OSD有缺少的对象,另外两个OSD一般来说都是包含全部对象的,因此只需要在当前OSD上进行数据恢复即可(也就是说不需要从其他OSD获取任何日志了)。

PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetMissing")
{
  ......
    // 下面这两个条件在我们关注的场景下都是满足的:
    if (pi.last_update == pi.last_complete &&  // peer has no missing
        pi.last_update == pg->info.last_update) {  // peer is up to date
      // replica has no missing and identical log as us.  no need to
      // pull anything.
      // FIXME: we can do better here.  if last_update==last_complete we
      //        can infer the rest!
      dout(10) < < " osd." << *i << " has no missing, identical log" << dendl;
      pg->peer_missing[*i];
      continue;
    }
  ......
}

问题13:为啥要有WaitUpThru阶段?到底wait的是啥?

首先要理解up_thru引入的场景,简单来说就是up_thru是用来确认OSD上次正常完成peering流程的epoch版本,完成了peering流程就意味着OSD可能已经开始对外提供IO服务,就有可能有数据写入,有数据写入就意味着要检查pg上数据的完整性和一致性。如果没有这个字段,那么在下面的场景下(size=3,min_size=1),我们就无法判断OSD B上是否有写入过用户数据(如果直接认定B未写入过,就可能导致用户数据丢失):

epoch OSD up/down
101 A B C all up
102 B C A down
103 B C down
104 A C B down, AC up

上述假设场景下,epoch 103这个阶段,B可能是真的活着,也可能down了但还没报给monitor,如果B真的活着,那就有可能有数据写入,则在epoch 104的时候,AC虽然up了,也不能完成peering并接收客户端IO请求,否则在103到104这个时间段内用户数据可能丢失。

而如果引入up_thru这个概念,就能解决这个场景的困扰,只需要在每次peering完成时上报一次pg的up_thru版本号即可,如果103时B上报了up_thru为103,则表示他自己在单副本场景下运行过,可能有数据写入,否则up_thru就是102,表示B从来没自己单独对外提供IO服务过,因此在104阶段就AC可以完成peering并对外提供服务(C有最新的数据)。

有了上述理论基础,WaitUpThru阶段所做的工作和必要性就容易理解了。这个阶段就是更新pg的up_thru版本号并交给monitor进行决议并持久化保存,收到monitor回复的更新完毕的确认消息后,才可以表示peering真正的完成了。对应的代码流程是:

// OSD发送更up_thru请求给monitor
void OSD::process_peering_events(//OSD::PeeringWQ::_dequeue获取一批pg(默认20个),然后来这里处理,处理完进入Peering第一阶段GetInfo
  const list</pg><pg *> &pgs,          // 发请求给其他osd获取信息:querying info from osd.0,然后线程就结束了,其他阶段是通过事件触发的
  ThreadPool::TPHandle &handle
  )
{
  bool need_up_thru = false;
  ......
    need_up_thru = pg->need_up_thru || need_up_thru;
  ......
  if (need_up_thru)
    queue_want_up_thru(same_interval_since);
  ......
}

void OSD::queue_want_up_thru(epoch_t want)
{
  map_lock.get_read();
  epoch_t cur = osdmap->get_up_thru(whoami);
  if (want > up_thru_wanted) {
    ......
    send_alive();
  } else {  // 已经发送过更高版本的up_thru值,就不能再发送低版本的了,否则可能导致错误
    dout(10) < < "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted 
         << ", currently " << cur << dendl;
  }
  map_lock.put_read();
}

void OSD::send_alive()
{
  if (!osdmap->exists(whoami))
    return;
  epoch_t up_thru = osdmap->get_up_thru(whoami);
  dout(10) < < "send_alive up_thru currently " << up_thru << " want " << up_thru_wanted << dendl;
  if (up_thru_wanted > up_thru) { // 再次确认发送的是最新版本
    up_thru_pending = up_thru_wanted;
    dout(10) < < "send_alive want " << up_thru_wanted << dendl;
    monc->send_mon_message(new MOSDAlive(osdmap->get_epoch(), up_thru_wanted)); // 发送的是MOSDAlive消息
  }
}

monitor端收到MOSDAlive消息的处理过程:

bool PaxosService::dispatch(PaxosServiceMessage *m)
{
  ......
  // preprocess
  if (preprocess_query(m)) 
    return true;  // easy!
  ......
  // update
  if (prepare_update(m)) {
    double delay = 0.0;
    if (should_propose(delay)) { // 这里主要是检查上次决议时间,如果已经超过paxos_propose_interval时长,则等待g_conf->paxos_min_wait(默认0.05s)之后开始决议,否则就等待够paxos_propose_interval(默认1s)之后再决议
      if (delay == 0.0) {
        propose_pending();
      } else {
        // delay a bit
        if (!proposal_timer) {
          proposal_timer = new C_Propose(this);
          dout(10) < < " setting proposal_timer " << proposal_timer << " with delay of " << delay << dendl;
          mon->timer.add_event_after(delay, proposal_timer);
        } else { 
          dout(10) < < " proposal_timer already set" << dendl;
        }
      }
    } else {
      dout(10) << " not proposing" << dendl;
    }
  }     
  return true;
}


bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
{
  ......
  case MSG_OSD_ALIVE:
    return preprocess_alive(static_cast<MOSDAlive*>(m));
  ......
}

bool OSDMonitor::preprocess_alive(MOSDAlive *m)
{
  ......
  dout(10) < < "preprocess_alive want up_thru " << m->want
       < < " from " << m->get_orig_source_inst() < < dendl;
  return false;
  ......
}

bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
{
  ......
  case MSG_OSD_ALIVE:
    return prepare_alive(static_cast<MOSDAlive*>(m));
  ......
}

bool OSDMonitor::prepare_alive(MOSDAlive *m)
{
  ......
  dout(7) < < "prepare_alive want up_thru " << m->want < < " have " << m->version
      < < " from " << m->get_orig_source_inst() < < dendl;
  pending_inc.new_up_thru[from] = m->version;  // set to the latest map the OSD has
  wait_for_finished_proposal(new C_ReplyMap(this, m, m->version));  // 等待决议完成后通过回调函数_reply_map回复OSDMap
  return true;
}


void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e)
{
  dout(7) < < "_reply_map " << e
      << " from " << m->get_orig_source_inst()
      < < dendl;
  send_latest(m, e); // 最终调用OSDMonitor::send_incremental
}

void OSDMonitor::send_incremental(PaxosServiceMessage *req, epoch_t first)
{
  ......
  // send some maps.  it may not be all of them, but it will get them
  // started.
  epoch_t last = MIN(first + g_conf->osd_map_message_max, osdmap.get_epoch());
  MOSDMap *m = build_incremental(first, last);
  m->oldest_map = get_first_committed();
  m->newest_map = osdmap.get_epoch();
  mon->send_reply(req, m);
  ......
}

WaitUpThru状态收到monitor回复的消息后就会进入Active状态。

问题14:日志合并是怎么做的?

简单来说,要先看本地和权威日志是否有重叠,没有重叠就没办法合并,只能通过backfill全量恢复本地数据。
如果有重叠,就类似两个保存k-v对的有序链表的合并及去重操作(复杂的地方在于同一个对象两份日志可能有冲突,即k
ey相同value不同),大致流程应该是:
对最老的日志,权威日志的更老,就以权威日志为准,把最老的那段时间补上(尾巴接上);
对最新的日志,权威日志如果更新,说明他保存的数据更全更新,因此以他为准,把本地OSD缺少的对象记录下来,后续的流程进行恢复(recovery);
对冲突的日志,仍然以权威日志为准,算出本地需要更新的对象,后续流程进行恢复(recovery)。

rbd-nbd map卷到222个的时候会失败

控制台报错信息:fork: retry: Resource temporarily unavailable

// rbd client日志:
2019-01-30 08:50:14.949238 7f772824fec0 -1 /home/nbs/jenkins/ceph-build/release/ceph-deb-stretch-x86_64-basic/sha1/dc18f441ea142687ac152894b689d59170a47301/WORKDIR/ceph-12.2.5+netease+stretch+1.1-19-gdc18f44/src/common/Thread.cc: In function 'void Thread::create(const char*, size_t)' thread 7f772824fec0 time 2019-01-30 08:50:14.948047
/home/nbs/jenkins/ceph-build/release/ceph-deb-stretch-x86_64-basic/sha1/dc18f441ea142687ac152894b689d59170a47301/WORKDIR/ceph-12.2.5+netease+stretch+1.1-19-gdc18f44/src/common/Thread.cc: 152: FAILED assert(ret == 0)

ceph version 12.2.5+netease+stretch+1.1-19-gdc18f44 (dc18f441ea142687ac152894b689d59170a47301) luminous (stable)
1: (ceph::__ceph_assert_fail(char const*, char const*, int, char const*)+0x102) [0x7f771ed1fab2]
2: (()+0x50ae55) [0x7f771ef8fe55]
3: (init_async_signal_handler()+0xe8) [0x557258dd9af8]
4: (()+0x15061) [0x557258dc6061]
5: (main()+0x9) [0x557258dc1f59]
6: (__libc_start_main()+0xf1) [0x7f771bf702e1]
7: (_start()+0x2a) [0x557258dc205a]
NOTE: a copy of the executable, or `objdump -rdS <executable>` is needed to interpret this.

首先怀疑是ulimit限制到了,但看了下没问题。

后来在syslog日志中看到了一条错误:

kernel: [ 6039.287966] cgroup: fork rejected by pids controller in /system.slice/ssh.service

所以就找到了原因,问题原因是我是用ssh过去在节点上执行的rbd-nbd map命令,而sshd进程可以fork的进程数量是有限制的。

尝试修改cgroup限制可以解决本问题:

$ sudo cat /sys/fs/cgroup/pids/system.slice/ssh.service/pids.max
4915 # 默认的4915数量太少了

$ sudo tee /sys/fs/cgroup/pids/system.slice/ssh.service/pids.max
32768 # 输入
32768 # ctrl+d退出输入即可

也可在root用户下直接执行:
echo 32768 > /sys/fs/cgroup/pids/system.slice/ssh.service/pids.max

修改后即可创建更多的进程。

Ceph OSD IO线程健康状态检查机制

目前看到H版本OSD IO线程健康状态有3种心跳机制(L版本对比看了下,基本没变化):
1. service线程:

CephContextServiceThread::entry() {
    while (1) {
      Mutex::Locker l(_lock);

      if (_cct->_conf->heartbeat_interval) {
        utime_t interval(_cct->_conf->heartbeat_interval, 0);
        _cond.WaitInterval(_cct, _lock, interval);
      } else
        _cond.Wait(_lock);
      ......
      _cct->_heartbeat_map->check_touch_file(); // 这里检查完只是修改一下心跳文件的时间戳,没有其他操作,目前我们心跳文件路径(heartbeat_file配置项)没有配置,所以这一步其实啥也不做
      _cct->refresh_perf_values(); // 这个是第4种隐藏机制,下面有说明
    }
    return NULL;
}
  1. OSD tick:
void OSD::tick()
{
  ......
  if (is_waiting_for_healthy()) {// 先判断OSD状态是否为STATE_WAITING_FOR_HEALTHY,会在start_waiting_for_healthy()这里更改为这个状态
    if (_is_healthy()) {//这里不常走到,一般是在启动流程才走
      dout(1) << "healthy again, booting" << dendl;
      set_state(STATE_BOOTING);
      start_boot();
    }
  }
  ......
}
  1. 其他OSD发过来的ping请求:
void OSD::handle_osd_ping(MOSDPing *m)
{
  ......
  switch (m->op) {

  case MOSDPing::PING:
  {
    ......
    if (!cct->get_heartbeat_map()->is_healthy()) {//检查IO线程健康状态
        dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl;
        break; //如果不健康则不给ping请求方回包
    }
    // IO线程正常,回包给请求方
    Message *r = new MOSDPing(monc->get_fsid(),
                curmap->get_epoch(),
                MOSDPing::PING_REPLY,
                m->stamp);
    m->get_connection()->send_message(r);

在L版本中其实还有一种机制,基于perfcounter实现,在service线程里(第一种机制代码位置相同)会更新健康IO线程数量和总IO线程数量到perfcounter中的l_cct_total_workers、l_cct_unhealthy_workers两个计数器上,可惜OSD启动时没有enable这两个counter(使用CephContext::enable_perf_counter),搜索代码可以看到rgw和rbd_mirror两个模块enable了,如果有需要我们可以自己在OSD启动过程中enable起来(g_ceph_context->enable_perf_counter()即可)。之后就可以通过ceph daemon osd.0 perf dump来查看相关counter信息了。

心跳超时默认15s打告警日志,超过150s会导致OSD自杀。

2018-12-23 13:27:09.538268 7fca5cc91700  1 heartbeat_map is_healthy 'OSD::osd_op_tp thread 0x7fca0f5e9700' had timed out after 15
2018-12-23 13:27:09.538283 7fca5cc91700  1 heartbeat_map is_healthy 'OSD::osd_op_tp thread 0x7fca0f5e9700' had suicide timed out after 150

Ceph CPU&MEMORY profiling

环境信息

  • OS:debian 9 with kernel-4.9.65
  • Ceph:luminous-12.2.5

CPU profiling

有两个工具,Linux常用的是perf,这个工具比较通用,功能也非常强大,debian提供安装包,另一个是oprofile,debian没有安装包,需要自己编译,并且在虚拟机里面无法使用。

use perf

参考:
– http://docs.ceph.com/docs/master/dev/perf/
– https://www.ibm.com/developerworks/cn/linux/l-cn-perf1/index.html (推荐这篇,各种常用命令解释比较清楚)

安装非常简单,直接apt-get install linux-perf-4.9即可,其中4.9是内核大版本号。

主要用到的命令有:
– perf top/perf top -p 1234/perf top -e cpu-clock:u(用户态CPU时钟周期采样统计):实时观察进程CPU时钟周期采样计数信息
– perf stat/perf stat -p 1234:进程基础统计信息,用于高层次的分析进程情况,比如是IO密集还是CPU密集,或者先看下问题发生在哪个方面
– perf record -p 1234 -F 99 –call-graph dwarf — sleep 60:捕获进程CPU采样周期并且保存调用关系图
– perf report –call-graph caller/callee:报告展示调用关系,caller和callee是顺序相反的两个展示参数(调用者在上还是被调用者在上)
– perf list:查看所有perf支持的event列表,默认是cpu-cycles,这个是硬件事件,也可以用cpu-clock,这个是软件事件
– perf help xxx:查看命令帮助文档

配合FlameGraph脚本生成火焰图(需要先用perf record采集数据):
1. git clone https://github.com/brendangregg/FlameGraph
2. perf script | FlameGraph/stackcollapse-perf.pl > perf-fg
3. ./FlameGraph/flamegraph.pl perf-fg > perf.svg

perf-flamegraph

上图中条带越宽表示函数被采样到的次数占总采样次数比例越高(占用的CPU时间片越多),也就是越耗费CPU资源。

该工具的好处是不需要特殊的编译选项,实际测试加不加-fno-omit-frame-pointer这个CFLAGS看起来对结果没啥影响。

use oprofile

官方的文档已经太老了,新版本的oprofile已经没有opcontrol命令了:
– http://docs.ceph.com/docs/master/rados/troubleshooting/cpu-profiling/
– http://docs.ceph.com/docs/master/dev/cpu-profiler/

因此自己编译了一个新版本的,过程如下:
1. wget https://sourceforge.net/projects/oprofile/files/oprofile/oprofile-1.3.0/oprofile-1.3.0.tar.gz
2. tar xzf oprofile-1.3.0.tar.gz
3. cd oprofile-1.3.0
4. apt install libpopt-dev libiberty-dev
5. useradd oprofile
6. ./configure
7. make && make install

然后就可以使用operf命令了,但是我在虚拟机里面使用报错:

root@ceph-l oprofile-1.3.0 $ operf -h
Your kernel's Performance Events Subsystem does not support your processor type.

考虑到物理机上使用也要编译,因此也不再深究。

MEMORY profiling with google-perftools

参考:
– http://docs.ceph.com/docs/master/rados/troubleshooting/memory-profiling/
– http://goog-perftools.sourceforge.net/doc/heap_profiler.html (官方帮助文档)

我们L版本使用的是tcmalloc,因此可以直接使用google-perftools,该工具安装也是apt-get install google-perftools即可。

常用命令:
– ceph tell osd.0 heap start_profiler:开启内存使用统计
– ceph tell osd.0 heap dump:dump内存使用情况(需要先start_profiler),默认输出到日志目录
– google-pprof –text /usr/bin/ceph-osd /var/log/ceph/ceph-osd.0.profile.0001.heap:查看dump出来的内存使用情况
– google-pprof –text –base osd.1.profile.0002.heap /usr/bin/ceph-osd osd.1.profile.0003.heap:对比两次dump处理的内存堆使用情况,会把base里的内存减掉,方便查看内存增量
– ceph tell osd.0 heap stats:基础统计信息,不需要start_profiler就能使用
– ceph tell osd.2 heap release:释放tcmalloc的缓存,归还给OS,也不需要start_profiler
– ceph tell osd.0 heap stop_profiler:停止profiler

root@blkin ceph $ google-pprof --text /usr/bin/ceph-osd osd.1.profile.0002.heap                                       
Using local file /usr/bin/ceph-osd.
Using local file osd.1.profile.0002.heap.
Total: 6.9 MB
     3.6  52.8%  52.8%      3.6  52.8% ceph::logging::Log::create_entry
     1.2  17.1%  70.0%      1.2  17.1% ceph::buffer::raw_posix_aligned::raw_posix_aligned
     0.9  12.7%  82.7%      0.9  12.7% mempool::pool_allocator::allocate
     0.7   9.9%  92.6%      0.7   9.9% std::__cxx11::basic_string::_M_mutate
     0.2   3.6%  96.1%      0.2   3.6% __gnu_cxx::new_allocator::allocate
     0.1   1.9%  98.1%      0.1   1.9% ceph::buffer::raw_combined::create
     0.1   0.8%  98.8%      0.1   0.8% std::__cxx11::basic_string::_M_construct
     0.0   0.3%  99.1%      0.0   0.3% AsyncConnection::AsyncConnection
     0.0   0.3%  99.4%      0.0   0.3% decode_message
     0.0   0.2%  99.6%      0.0   0.5% OpTracker::create_request
     0.0   0.2%  99.8%      0.0   0.5% AsyncMessenger::add_accept
     0.0   0.1%  99.9%      0.0   0.2% BlueStore::_deferred_queue
     0.0   0.1%  99.9%      0.0   0.2% BlueStore::_get_deferred_op
     0.0   0.0% 100.0%      0.0   0.0% std::__cxx11::basic_string::reserve
     0.0   0.0% 100.0%      0.0   0.0% OSD::ms_verify_authorizer
     0.0   0.0% 100.0%      0.0   0.0% ceph::Formatter::create@277566a
     0.0   0.0% 100.0%      0.0   0.0% get_auth_session_handler
     0.0   0.0% 100.0%      0.0   0.1% AuthNoneClientHandler::build_authorizer
     0.0   0.0% 100.0%      0.0   0.0% OSD::handle_command

其中第一列是函数使用的内存量(MB),第二列是当前函数内存使用量占总内存使用量的百分比(也即第一列的占比),第三列是第二列的累加值,也即TopN函数使用内存占比,第四列是当前函数和所有他调用到的子函数的内存使用量(MB),第五列是第四列和内存使用总量的百分比,最后一列是函数名。

官方解释:
– The first column contains the direct memory use in MB.
– The fourth column contains memory use by the procedure and all of its callees.
– The second and fifth columns are just percentage representations of the numbers in the first and fifth columns.
– The third column is a cumulative sum of the second column (i.e., the kth entry in the third column is the sum of the first k entries in the second column.)

Ceph-blkin+lttng+zipkin性能追踪工具

部署

参考:
– http://docs.ceph.com/docs/master/dev/blkin/
– https://zipkin.io/pages/quickstart.html
– https://github.com/openzipkin/zipkin/blob/master/zipkin-server/README.md#cassandra-storage
– https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/mysql-v1
– https://github.com/openzipkin/zipkin/blob/master/zipkin-storage/mysql-v1/src/main/resources/mysql.sql
– http://cassandra.apache.org/download/
– https://blog.csdn.net/vbirdbest/article/details/77802031

环境要求

  1. ceph-luminous版本编译环境
  2. Debian9.1 + 4.9.65内核
  3. 已部署好ceph集群

部署ceph+blkin

步骤

  1. 修改do_cmake.sh,在cmake那行加上”-DWITH_BLKIN=ON”打开blkin编译选项
  2. ./do_cmake.sh; cd build; make; make install
  3. 修改ceph.conf,打开blkin相关配置项:
[global]
osd_blkin_trace_all = true
rbd_blkin_trace_all = true
osdc_blkin_trace_all = true

部署babeltrace-zipkin

这个项目的用途是把blkin配合lttng收集的trace数据转换并发送给zipkin的数据采集器,zipkin聚合后存储起来供后续web查询使用。

步骤

  1. git clone https://github.com/vears91/babeltrace-zipkin
  2. cd babeltrace-zipkin/setup; 找到 ubuntu.sh,注意这个脚本里面会安装依赖包,以及编译并安装blkin-lib,下载zipkin.jar,最后还会git pull更新下babeltrace-zipkin,由于github clone项目时快时慢,因此建议直接在PC上下载项目的压缩包,单独手工安装,这个很快
  3. 提取出ubuntu.sh里面的pip3 install和apt-get install相关命令,直接手工安装即可
sudo apt-get install -y git
sudo apt-get install -y python3-pip
sudo apt-get install -y default-jre

sudo apt-get install -y libboost-thread-dev

sudo pip3 install --upgrade pip
sudo pip3 install scribe
sudo pip3 install thrift3babeltrace
sudo pip3 install facebook-scribe-py3
sudo pip3 install thriftpy
sudo pip3 install scribe_logger

sudo apt-get install -y babeltrace
sudo apt-get install -y python3-babeltrace

部署lttng

直接apt安装即可:

sudo apt-get install -y lttng-tools
sudo apt-get install -y lttng-modules-dkms
sudo apt-get install -y liblttng-ust-dev

安装完毕后有如下几个包:

ii  liblttng-ctl0:amd64                  2.9.3-1                                 amd64        LTTng control and utility library
ii  liblttng-ust-ctl2:amd64              2.9.0-2+deb9u1                          amd64        LTTng 2.0 Userspace Tracer (trace control library)
ii  liblttng-ust-dev:amd64               2.9.0-2+deb9u1                          amd64        LTTng 2.0 Userspace Tracer (development files)
ii  liblttng-ust-python-agent0:amd64     2.9.0-2+deb9u1                          amd64        LTTng 2.0 Userspace Tracer (Python agent native library)
ii  liblttng-ust0:amd64                  2.9.0-2+deb9u1                          amd64        LTTng 2.0 Userspace Tracer (tracing libraries)
ii  lttng-modules-dkms                   2.9.0-1                                 all          Linux Trace Toolkit (LTTng) kernel modules (DKMS)
ii  lttng-tools                          2.9.3-1                                 amd64        LTTng control and utility programs

具体哪几个有用我也不完全确定,lttng-tools这个肯定用到的,lttng-modules-dkms这个内核态的应该用不到,ceph都是用户态的。

部署zipkin

zipkin部署非常简单,只需要有jre环境,就可以通过下载可独立执行的zipkin.jar包即可运行。最新稳定版下载链接:
– https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec

zipkin的存储后端有多种:
– memory(默认):内存存储,非持久化,重启zipkin进程后数据丢失,需要重新导入
– MySQL:不解释
– Cassandra(官方推荐,最初支持的方案):个人理解是一种类似influxdb的时序数据库
– Elasticsearch:参考官网介绍,是一个分布式、RESTful 风格的搜索和数据分析引擎

我这边验证了前面3种方式,因此需要增加部署MySQL(mariadb)或者Cassandra数据库。

memory方式

直接执行java -jar zipkin.jar即可,web端口默认是9411,浏览器打开http://$IP:9411即可访问zipkin web页面。

注意内存方式下trace数据是非持久化的,重启后丢失。

mariadb方式

部署mariadb:
1. 使用apt安装包:apt-get install mariadb-common mariadb-server
2. 创建用户并添加权限(默认的root用户会连接失败):GRANT ALL PRIVILEGES ON *.* TO 'zipkin'@'%' IDENTIFIED BY 'admin123' WITH GRANT OPTION; FLUSH PRIVILEGES;
3. 创建数据库和schema:直接使用官方的sql脚本创建,https://github.com/openzipkin/zipkin/blob/master/zipkin-storage/mysql-v1/src/main/resources/mysql.sql ,保存脚本到zipkin.sql并执行mysql -Dzipkin < zipkin.sql
4. 如果数据库和zipkin不在一个节点上,还需要修改mariadb的监听地址段并重启服务,/etc/mysql/mariadb.conf.d/50-server.cnf的bind-address配置项

启动zipkin+mariadb:STORAGE_TYPE=mysql MYSQL_USER=zipkin MYSQL_HOST=127.0.0.1 MYSQL_PASS=admin123 java -jar zipkin.jar

相关环境变量:https://github.com/openzipkin/zipkin/blob/master/zipkin-server/README.md#mysql-storage

导入数据后,可以用mysql命令查看数据:

MariaDB [(none)]> use zipkin; 
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MariaDB [zipkin]> show tables;
+---------------------+
| Tables_in_zipkin    |
+---------------------+
| zipkin_annotations  |
| zipkin_dependencies |
| zipkin_spans        |
+---------------------+
3 rows in set (0.00 sec)

MariaDB [zipkin]> select count(*) from zipkin_spans;
+----------+
| count(*) |
+----------+
|   130437 |
+----------+
1 row in set (0.00 sec)

Cassandra方式

部署Cassandra:

echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra

启动及相关默认路径:

You can start Cassandra with sudo service cassandra start and stop it with sudo service cassandra stop. However, normally the service will start automatically. For this reason be sure to stop it if you need to make any configuration changes.
Verify that Cassandra is running by invoking nodetool status from the command line.
The default location of configuration files is /etc/cassandra.
The default location of log and data directories is /var/log/cassandra/ and /var/lib/cassandra.
Start-up options (heap size, etc) can be configured in /etc/default/cassandra.

时序数据库一般不需要特殊的schema配置,直接使用即可:STORAGE_TYPE=cassandra3 java -jar zipkin.jar

导入数据后可以用cqlsh命令查看数据库内容:

cqlsh> DESCRIBE KEYSPACES;                         

system_schema  system_auth  system  zipkin2  system_distributed  system_traces

cqlsh> use zipkin2;
cqlsh:zipkin2> DESCRIBE TABLES; 

dependency  span  trace_by_service_span  span_by_service

cqlsh:zipkin2> select * from span limit 1;

 trace_id         | ts_uuid                              | id               | annotation_query | annotations                           | debug | duration | kind | l_ep                                                            | l_service    | parent_id | r_ep | shared | span    | tags | trace_id_high | ts
------------------+--------------------------------------+------------------+------------------+---------------------------------------+-------+----------+------+-----------------------------------------------------------------+--------------+-----------+------+--------+---------+------+---------------+------------------
 7836119a76da5f76 | ac9912d0-032f-11e9-899e-f156728a4881 | 7836119a76da5f76 |         ░finish░ | [{ts: 1545183890557972, v: 'finish'}] |  null |     null | null | {service: 'objectcacher', ipv4: '0.0.0.0', ipv6: null, port: 0} | objectcacher |      null | null |   null | flusher | null |          null | 1545183890557972

(1 rows)

参考官网:http://cassandra.apache.org/download/

最终部署架构

ceph-blkin-zipkin

使用

  1. 确保lttng服务正常运行:systemctl status lttng-sessiond.service
  2. 停止ceph服务,包括所有服务端进程以及客户端进程(如果是客户端与服务端不再一个节点,也要单独部署lttng和使用打开了blkin的ceph)
  3. 创建lttng session,并添加trace event:
lttng create blkin-test
lttng enable-event --userspace zipkin:timestamp
lttng enable-event --userspace zipkin:keyval_integer
lttng enable-event --userspace zipkin:keyval_string
lttng start
  1. 启动所有ceph进程,以及客户端进程,并进行相关测试
  2. lttng stop停止session,其他可用命令:lttng list/status/view/destroy
  3. 使用babeltrace-zipkin工具导入数据到zipkin:cd /mnt/babeltrace-zipkin-master/; python3 babeltrace_zipkin.py ~/lttng-traces/blkin-wp2-20181218-160516/ust/uid/0/64-bit/ -p 9411 -s 127.0.0.1,其中-p 9411是zipkin的数据接收端口(也是web端口),-s是zipkin服务监听地址。数据导入过程比较耗时,需要等待,开始导入和导入完成都会有日志提示:
Sending traces to 127.0.0.1:9411 using http
Done sending
  1. 导入过程中如果出错可以查看zipkin的日志(前台执行的话看控制台输出就可以了)。
  2. 使用web进行数据查询:浏览器打开http://$IP:9411即可访问zipkin web页面。需要注意的是,查询页面的时间参数是指lttng日志文件里的采集时间,而非数据导入到zipkin的时间,另外还要注意数量参数,经过实测仅搜索指定数量的trace event数据,然后对这些数据进行排序后展示,而不是对所有数据排序后进行截取展示,因此建议时间采用自定义时间段方式,缩小时间段,减少数据量之后再适当设置较大的数量进行查找。

zipkin

zipkin

自定义扩展

相关名词

  1. trace event:跟踪事件类型如zipkin:timestamp、zipkin:keyval_integer、zipkin:keyval_string
  2. service:跟踪服务类型如osd.1、pg 1.0、messenger、objecter、librbd-855346e482f5-test-blkin-vol1等,对应代码中的trace_endpoint,ZTracer::Endpoint类型
  3. span:service的子项,标记一个跨度,跨函数使用,例如librbd里面有多个跨度,从ImageRequestWQ的“wq: write”到“writeback”等,osd默认只有一个“osd op”,pg有“pg op”和“replicated op”,对应代码中的osd_trace、pg_trace、journal_trace等,ZTracer::Trace类型,可以用ZTracer::Trace::init函数来初始化,通过在初始化时指定parent span来构建调用链关系,parent可以跨service,顶级span的id跟traceId相同,可以理解为第一个span的ID就是traceId
  4. annotation:span的子项,基本上是在同一个函数里面使用,ZTracer::Trace::event即可创建

参考:https://blog.csdn.net/manzhizhen/article/details/52811600、https://blog.csdn.net/manzhizhen/article/details/53865368

ceph扩展相关trace类型

service类型一般不需要扩展,已经基本全部包含了,这里重点介绍span和annotation两种类型的扩展方法。
span:

src\common\TrackedOp.h::class TrackedOp类里有如下定义:

public:
  ZTracer::Trace osd_trace;
  ZTracer::Trace pg_trace;
  ZTracer::Trace store_trace;
  ZTracer::Trace journal_trace;

我们只需要模仿它们来定义我们自己的span名称即可。

src\osd\OSD.cc::void OSD::ms_fast_dispatch(Message *m)中会对osd_trace进行初始化,可以参考他的初始化过程来初始化我们自己的span:

  if (m->trace) // m->trace是通过消息里面传递过来的,用来作为parent span来初始化当前span,parent span非常重要,如果不指定就无法生成调用链关系,当然如果你不关心前后流程,也可以不指定,作为一个单独的span来查询分析即可
    op->osd_trace.init("osd op", &trace_endpoint, &m->trace);

实际上我们需要扩展span的场景也很少,因为这几个span已经差不多可以包含所有关键流程了。

扩展完span就可以通过往span中添加annotation来进行实际的时间戳记录了,添加annotation也非常简单:

  op->osd_trace.event("enqueue op");
  op->osd_trace.keyval("priority", op->get_req()->get_priority()); // 这两个是keyval类型的annotation,前面是key,后面是value(支持string和integer类型数据的存储)
  op->osd_trace.keyval("cost", op->get_req()->get_cost());

keyval类型的annotation可以用来观察系统运行过程中某些关键变量的值,如关键运行参数等。

librbd库的使用

qemu、rbd-nbd等客户端都是使用librbd进行ceph rbd卷的IO访问,如果要深入理解librbd,那么尝试自己写一个client来访问rbd卷(控制操作、IO操作),肯定是一个不错的途径。

写了个C的(异步IO模式),C++的可以参考:https://blog.csdn.net/JDPlus/article/details/76522298

/* 注释就不详细写了,代码比较简单 */
#include <rados/librados.h>
#include <rbd/librbd.h>
#include <stdio.h>
#include <stdlib.h>


rados_t init_rados() {
    // we will use all of these below
    int ret = 0;
    rados_t rados = NULL;

    // 1. init rados object
    ret = rados_create(&rados, "admin"); // just use the client.admin keyring
    if (ret < 0) { // let's handle any error that might have come back
        printf("couldn't initialize rados! err %d\n", ret);
        return NULL;
    } else {
        printf("inited rados cluster object\n");
    }
    return rados;
}

rados_ioctx_t init_ioctx(rados_t rados) {
    int ret = 0;
    rados_ioctx_t io_ctx = NULL;

    // 2. read ceph config file
    ret = rados_conf_read_file(rados, "/etc/ceph/ceph.conf");
    if (ret < 0) {
        // This could fail if the config file is malformed, but it'd be hard.
        printf("failed to parse config file! err %d\n", ret);
        return NULL;
    }

    // 3. connect to ceph cluster
    ret = rados_connect(rados);
    if (ret < 0) {
        printf("couldn't connect to cluster! err %d\n", ret);
        return NULL;
    } else {
        printf("connected to the rados cluster\n");
    }

    // 4. init io context for rbd pool
    const char *pool_name = "rbd";
    ret = rados_ioctx_create(rados, pool_name, &io_ctx);
    if (ret < 0) {
        printf("couldn't setup ioctx! err %d\n", ret);
        rados_shutdown(rados);
        return NULL;
    } else {
        printf("created an ioctx for pool: rbd\n");
    }

    return io_ctx;
}

rbd_image_t init_image(rados_ioctx_t io_ctx) {
    int ret = 0;

    // 5. open rbd image
    rbd_image_t image;
    const char *image_name = "sotest";
    ret = rbd_open(io_ctx, image_name, &image, NULL);
    if (ret < 0) {
        printf("couldn't open rbd image! err %d\n", ret);
        return NULL;
    } else {
        printf("opened an rbd image: sotest\n");
    }

    return image;
}

int get_rbd_size(rbd_image_t image) {
    int ret = 0;
    uint64_t size = 0;

    // 6. get rbd image size
    ret = rbd_get_size(image, &size);
    if (ret < 0) {
        printf("couldn't get image size! err %d\n", ret);
        return EXIT_FAILURE;
    } else {
        printf("The size of the image is: %dMB\n", size/1024/1024);
    }

    return size;
}

void rbd_finish_aiocb(rbd_completion_t c, void *arg)
{
    // int ret = rbd_aio_wait_for_complete(c);
    int ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);

    // for aio read callback, the read data should be copied here to caller
    printf("aio callback: %d, %s\n", ret, (const char*)arg);
}

int aio_write(rbd_image_t image, const char *buff) {
    int off = 128;
    rbd_completion_t c;
    int ret = rbd_aio_create_completion((void *)buff, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (ret < 0) {
        printf("create callback failed %s\n", ret);
        return ret;
    }

    int len = strlen(buff);
    ret = rbd_aio_write(image, off, len, buff, c);
    if (ret < 0) {
        printf("write to image failed %s\n", ret);
        return ret;
    }
    printf("write %s to image end\n", buff);

    return ret;
}

int aio_read(rbd_image_t image, char *buff) {
    int off = 128;
    int len = 10;
    rbd_completion_t c;
    int ret = rbd_aio_create_completion(buff, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (ret < 0) {
        printf("create callback failed %s\n", ret);
        return ret;
    }
    memset(buff, 0, 128);
    ret = rbd_aio_read(image, off, len, buff, c);
    if (ret < 0) {
        printf("read from image failed %s\n", ret);
        return ret;
    }
    printf("read from image end\n");

    return ret;
}

int main() {
	int ret;
	char buff[128] = {0};
	int len;

    rados_t rados = init_rados();
    if (!rados) {
        perror("init_rados");
        return EXIT_FAILURE;
    }
    rados_ioctx_t io_ctx = init_ioctx(rados);
    if (!io_ctx) {
        perror("init_ioctx");
        rados_shutdown(rados);
        return EXIT_FAILURE;
    }

    rbd_image_t image = init_image(io_ctx);
    if (!image) {
        perror("init_image");
        rados_ioctx_destroy(io_ctx);
        rados_shutdown(rados);
        return EXIT_FAILURE;
    }

    int size = get_rbd_size(image);
    printf("image size: %d\n", size);

    sprintf(buff, "%s", "abcd123efg");
    aio_write(image, buff);

    aio_read(image, buff);

    // 7. close image, io context and rados object
    ret = rbd_close(image);
    if (ret < 0) {
        printf("couldn't close rbd image! err %d\n", ret);
        return EXIT_FAILURE;
    } else {
        printf("closed rbd image: sotest\n");
    }
    rados_ioctx_destroy(io_ctx);
    rados_shutdown(rados);

    return 0;

}

 

编译:gcc -g3 -O0 librbdtest.c -o librbdtest -lrados -lrbd

执行(需要先创建”rbd” pool和”sotest”卷):

$ ./librbdtest 
inited rados cluster object
connected to the rados cluster
created an ioctx for pool: rbd
opened an rbd image: sotest
The size of the image is: 128MB
image size: 134217728
write abcd123efg to image end
read  from image end
aio callback: 0, 
aio callback: 10, abcd123efg
closed rbd image: sotest

 

官方example:https://github.com/ceph/ceph/blob/master/examples/librbd/hello_world.cc

Ceph monmap消息编解码过程

本文源码基于luminous-12.2.5版本分析

最近在分析H版本librbd client无法兼容L版本Ceph集群的问题,提前说明下最终结论是H版本被之前的同时修改过才导致的不兼容,官方版本应该是兼容的。这篇文章正是问题分析过程的一次整理总结。

需求描述

需求是想要在同一个OpenStack环境中同时使用H、L版本两个Ceph集群做云主机系统盘、云硬盘存储系统,因此计算节点(也即librbd client节点)就可能分为H、L两个版本,众所周知,升级librbd动态库,需要重启云主机才能生效(除非对librbd做过较大的改动),重启所有云主机这个操作用户显然是不能接受的(当然也可以执行热迁移操作,但大批量的热迁移也存在耗时长、风险大的问题,况且还有部分云主机不支持热迁移),因此如果要上线L版本集群就必须做到兼容H版本client,这样才能保证L版本集群的卷能被当前正使用H版本client的云主机挂载、读写。H版本是之前遗留的,不能升级到L版本(主要考虑2个问题,1是风险高,二是代价大。实际上还有一个小问题就是H版本client不兼容L版本server,这个正是我本次需要解决的问题)。

问题描述

有两个问题:
1. 不修改server的crush参数情况下,客户端执行ceph -s、rbd ls等命令报错:
2018-10-12 15:37:50.117973 7f58f4298700  0 -- 10.182.2.32:0/3576177365  10.185.0.97:6789/0 pipe(0x7f58e0000c80 sd=3 :34392 s=1 pgs=0 cs=0 l=1 c=0x7f58e0005160).connect protocol feature mismatch, my 883ffffffffffff  peer c81dff8eea4fffb missing 400000000000000
2. 通过ceph osd crush tunables hammer命令修改crush参数之后,执行命令直接抛异常,原因是decode monmap的created字段时发生越界异常,位置是:
  void buffer::list::iterator::copy(unsigned len, char *dest)
  {
    if (p == ls->end()) seek(off);
    while (len > 0) {
      if (p == ls->end())
	throw end_of_buffer(); // 这里抛异常
      assert(p->length() > 0); 
      
      unsigned howmuch = p->length() - p_off;
      if (len < howmuch) howmuch = len;
      p->copy_out(p_off, howmuch, dest);
      dest += howmuch;

      len -= howmuch;
      advance(howmuch);
    }
  }

本次重点分析第二个问题,bt查看调用栈(调试的是rbd ls命令):

Thread 6 "rbd" hit Breakpoint 4, ceph::buffer::list::iterator::copy (this=0x7fffee122800, len=4, dest=0x7fffee1225dc "I]\026\022") at common/buffer.cc:936
936         if (p == ls->end()) seek(off);
(gdb) bt
#0  ceph::buffer::list::iterator::copy (this=0x7fffee122800, len=4, dest=0x7fffee1225dc "I]\026\022") at common/buffer.cc:936
#1  0x00007ffff2824284 in decode_raw<ceph_le32> (t=..., p=...) at ./include/encoding.h:57
#2  0x00007ffff281c2da in decode (v=@0x555557d350a0: 0, p=...) at ./include/encoding.h:103
#3  0x00007ffff281d023 in utime_t::decode (this=0x555557d350a0, p=...) at ./include/utime.h:103
#4  0x00007ffff281d060 in decode (c=..., p=...) at ./include/utime.h:312
#5  0x00007ffff2a1fd49 in MonMap::decode (this=0x555557d35050, p=...) at mon/MonMap.cc:74
#6  0x00007ffff2a151fd in decode (c=..., p=...) at ./mon/MonMap.h:240
#7  0x00007ffff2a0c050 in MonClient::handle_monmap (this=0x555557d35040, m=0x7fffd8000bd0) at mon/MonClient.cc:317
#8  0x00007ffff2a0bd09 in MonClient::ms_dispatch (this=0x555557d35040, m=0x7fffd8000bd0) at mon/MonClient.cc:274
#9  0x00007ffff2b9c9db in Messenger::ms_deliver_dispatch (this=0x555557d35a80, m=0x7fffd8000bd0) at ./msg/Messenger.h:582
#10 0x00007ffff2b9be57 in DispatchQueue::entry (this=0x555557d35c70) at msg/simple/DispatchQueue.cc:185
#11 0x00007ffff2bcd560 in DispatchQueue::DispatchThread::entry (this=0x555557d35e10) at msg/simple/DispatchQueue.h:103
#12 0x00007ffff296608c in Thread::entry_wrapper (this=0x555557d35e10) at common/Thread.cc:61
#13 0x00007ffff2965ffe in Thread::_entry_func (arg=0x555557d35e10) at common/Thread.cc:45
#14 0x00007ffff2335494 in start_thread (arg=0x7fffee123700) at pthread_create.c:333
#15 0x00007ffff0c45acf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97

是在接收到服务端monmap消息后decode过程中抛的异常,具体是在decode created字段出错的。

void MonMap::decode(bufferlist::iterator &p)
{
  DECODE_START_LEGACY_COMPAT_LEN_16(3, 3, 3, p);
  ::decode_raw(fsid, p);
  ::decode(epoch, p); // 这里正常decode
  if (struct_v == 1) {
    vector<entity_inst_t> mon_inst;
    ::decode(mon_inst, p);
    for (unsigned i = 0; i < mon_inst.size(); i++) {
      char n[2];
      n[0] = '0' + i;
      n[1] = 0;
      string name = n;
      mon_addr[name] = mon_inst[i].addr;
    }
  } else {
    ::decode(mon_addr, p); // 这里开始出错
  }
  ::decode(last_changed, p);
  ::decode(created, p); // 这里抛的异常
  DECODE_FINISH(p);
  calc_ranks();
}

单步调试发现,在此之前last_changed字段decode出来就是空的,时间都是0,因此怀疑是更早之前就发生错误了,调试后发现最初的错误是在decode mon_addr时就发生了。

通过调试服务端ceph-mon encode过程,发现mon_addr和last_changed、created字段都是被添加到monmap消息体内的,并且通过调试L版本client,这几个字段也都能正常decode出来,因此确认是客户端问题或者服务端发送的monmap消息客户端不兼容。但看了下L版本服务端encode代码,当前版本是v5,并且最低兼容v3版本,而H版本decode是v3版本,也就是说社区代码在设计上兼容性应该是没问题的。因此需要继续分析问题出在哪里。

仔细对比了mon_addr的decode和encode流程(主要是H版本和L版本的差异),发现L版本encode过程检查连接的features,并走了不同的encode过程。因此问题聚焦在了客户端和服务端协商features过程上。

  void encode(bufferlist& bl, uint64_t features) const {
    if ((features & CEPH_FEATURE_MSG_ADDR2) == 0) { // 检查features 59标记位
      ::encode((__u32)0, bl); // 老版本编码流程
      ::encode(nonce, bl);
      sockaddr_storage ss = get_sockaddr_storage();
#if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
      ::encode(ss, bl);
#else
      ceph_sockaddr_storage wireaddr;
      ::memset(&wireaddr, '\0', sizeof(wireaddr));
      unsigned copysize = MIN(sizeof(wireaddr), sizeof(ss));
      // ceph_sockaddr_storage is in host byte order
      ::memcpy(&wireaddr, &ss, copysize);
      ::encode(wireaddr, bl);
#endif
      return;
    }
    ::encode((__u8)1, bl); // 新版本编码流程
    ENCODE_START(1, 1, bl);
    ::encode(type, bl);
    ::encode(nonce, bl);
    __u32 elen = get_sockaddr_len();
    ::encode(elen, bl);
    if (elen) {
      bl.append((char*)get_sockaddr(), elen);
    }
    ENCODE_FINISH(bl);
  }

也就是features的第59位是怎么设置上的(1<<59),看了下H版本的代码,发现了这个feature:

#define CEPH_FEATURE_MON_METADATA (1ULL<<50)
/* ... */
#define CEPH_FEATURE_HAMMER_0_94_4 (1ULL<<55)

#define CEPH_OSD_PARTIAL_RECOVERY  (1ULL<<59) /* recover partial extents for objects */

#define CEPH_FEATURE_RESERVED2 (1ULL<<61)  /* slow down, we are almost out... */
#define CEPH_FEATURE_RESERVED  (1ULL<<62)  /* DO NOT USE THIS ... last bit! */
#define CEPH_FEATURE_RESERVED_BROKEN  (1ULL<<63)  /* DO NOT USE THIS; see below */

按道理在CEPH_FEATURE_HAMMER_0_94_4(1<<55)之后应该不会再有其他feature了才对,但这个CEPH_OSD_PARTIAL_RECOVERY为啥被加入了?去看了下官方代码,发现没有这个feature,然后git log看了下我们的代码仓库,果然是我们自己人加上的。至此问题原因已经清晰。

L版本中用CEPH_FEATURE_MSG_ADDR2(1<<59)来标记是否为新版本消息,这一位是0表示是老版本(未设置该bit表示老版本),但是H版本里面正好用到了这一位CEPH_OSD_PARTIAL_RECOVERY(1<<59),导致L版本服务端检测失败,误认为是新版本客户端,使用了新的编码方式对monmap中的monitor地址信息进行编码,导致客户端无法解码。

改动方案:

由于client端升级困难,因此对L版本服务端进行修改,使其兼容H版本client,增加对其他bit位的检查,当发现client是H版本后,就走老的encode流程,修改之后客户端可以正常使用(包括基本命令行和qemu启动云主机)。

 

此次问题分析的难点主要是:

  1. 不熟悉ceph源码,尤其是消息编解码过程
  2. 编译的第一个环境没有去掉编译优化(-O2),增加debug等级(-g),改成(-O0 -g3 -gdwarf-4)修改之后调试起来就很方便了,https://www-zeuthen.desy.de/unix/unixguide/infohtml/gdb/Inline-Functions.html
  3. 确定服务端是否把monitor地址信息编码到消息体过程被wireshark误导了一段时间

使用tcpdump+wireshark解析Ceph网络包

记得之前在通读docs.ceph.com上的文档时(http://docs.ceph.com/docs/luminous/dev/wireshark/),有提到过wireshark支持对Ceph网络数据包进行解析,于是试着用tcpdump抓了monitor和client的数据包,然后导入到wireshark进行解析,对H版本来说确实好用,但对L版本来说,monmap解析貌似支持不太好,monitor address地址解析不出来,估计也是只支持低版本的编码协议。参考:https://www.wireshark.org/docs/dfref/c/ceph.html

tcpdump命令(在client节点执行数据包比较少,服务端对接的客户端比较多因此数据包也比较多,当然也可以加入更多的过滤条件来精确抓包):tcpdump -i eth0 host 192.168.0.2 and port 6789 -w ceph.cap

之后把抓到的ceph.cap数据导入到windows系统的wireshark软件中,我下载的是2.6.2版本(官网有2.6.4版本,但是下载困难,就在国内找了软件站下载了2.6.2的)。导入之后就可以自动分析出结果了。

L版本抓包结果(monmap包解析不太好,认为包有问题“malformed packet”,monitor地址信息解析失败,跟我遇到的问题一样,因此误导了我2天,我一直认为服务端就是没有把地址编码进来,客户端才解析不出来,最后在服务端和客户端分别单步调试才发现是编码进去了的):

 

H版本数据包解析就比较完美了:

Mon Map:

OSD Map:

 

Mon Map编解码过程分析

关键数据结构都定义在src\include\buffer.h中,主要包括:

  • buffer::ptr
    •  _raw:保存实际的编码消息数据
    • buffer::ptr::iterator :遍历ptr的迭代器类,提供各种函数用来寻址ptr中的数据
  • buffer::list
    • _buffers:保存ptr的list,也即保存多条编码数据
    • append_buffer:ptr类型,4K对齐,保存编码数据,,其append操作实际是append到ptr._raw,之后会把它push_back到_buffers中,push_back之前会通过ptr构造函数填充ptr的_raw、_off、_len等数据,也即把_raw中保存的数据的偏移量和长度也保存起来,解码时使用
    • buffer::list::iterator:遍历_buffers的迭代器,继承自buffer::list::iterator_impl,主要是对外提供接口,对内封装了iterator_impl的相关接口
    • buffer::list::iterator_impl:遍历_buffers的迭代器实际实现类,主要实现有advance、seek、copy等函数,用来从_buffers里取出数据,advance函数用来在copy函数执行过程中进行ptr内或_buffers的多个ptr前后跳转,有两种场景,一中是跳转还在当前ptr内(ptr内取数据),另外一种是_buffers list中一个ptr已经copy完(跨ptr取数据),需要跳转到下一个ptr对象继续copy。seek函数也是利用advance函数完成数据寻址操作。

以L版本代码为例,编解码代码如下:

void MonMap::encode(bufferlist& blist, uint64_t con_features) const
{
  /* we keep the mon_addr map when encoding to ensure compatibility
   * with clients and other monitors that do not yet support the 'mons'
   * map. This map keeps its original behavior, containing a mapping of
   * monitor id (i.e., 'foo' in 'mon.foo') to the monitor's public
   * address -- which is obtained from the public address of each entry
   * in the 'mons' map.
   */ // mon_addr是为了兼容老版本,新版本直接用mon_info了
  map<string,entity_addr_t> mon_addr;
  for (map<string,mon_info_t>::const_iterator p = mon_info.begin();
       p != mon_info.end();
       ++p) {
    mon_addr[p->first] = p->second.public_addr;
  }

  // con_features是client和server建立connection时协商的,也就是二者支持的feature进行&操作
  if ((con_features & CEPH_FEATURE_MONNAMES) == 0) {
    __u16 v = 1; // v1版本编码过程
    ::encode(v, blist);
    ::encode_raw(fsid, blist);
    ::encode(epoch, blist);
    vector<entity_inst_t> mon_inst(mon_addr.size());
    for (unsigned n = 0; n < mon_addr.size(); n++)
      mon_inst[n] = get_inst(n);
    ::encode(mon_inst, blist, con_features);
    ::encode(last_changed, blist);
    ::encode(created, blist);
    return;
  }

  if ((con_features & CEPH_FEATURE_MONENC) == 0) {
    __u16 v = 2; // v2版本编码过程
    ::encode(v, blist);
    ::encode_raw(fsid, blist);
    ::encode(epoch, blist);
    ::encode(mon_addr, blist, con_features);
    ::encode(last_changed, blist);
    ::encode(created, blist);
  }

  ENCODE_START(5, 3, blist);  // v5版本编码过程,最低兼容版本是v3,blist是编码输出缓冲区
  ::encode_raw(fsid, blist);  // 编码fsid
  ::encode(epoch, blist);  // 编码epoch
  ::encode(mon_addr, blist, con_features);  // 编码mon_addr,注意con_features参数
  ::encode(last_changed, blist);  // 编码last_changed时间
  ::encode(created, blist);  // 编码created时间
  ::encode(persistent_features, blist);  // v5版本新增字段,v3版本decode时忽略,下同
  ::encode(optional_features, blist);
  // this superseeds 'mon_addr'
  ::encode(mon_info, blist, con_features);
  ENCODE_FINISH(blist);  // 结束编码
}

void MonMap::decode(bufferlist::iterator &p)
{ // 解码过程是编码过程的反向操作,也是按编码顺序来的
  map<string,entity_addr_t> mon_addr;
  DECODE_START_LEGACY_COMPAT_LEN_16(5, 3, 3, p);
  ::decode_raw(fsid, p);
  ::decode(epoch, p);
  if (struct_v == 1) {
    vector<entity_inst_t> mon_inst;
    ::decode(mon_inst, p);
    for (unsigned i = 0; i < mon_inst.size(); i++) {
      char n[2];
      n[0] = '0' + i;
      n[1] = 0;
      string name = n;
      mon_addr[name] = mon_inst[i].addr;
    }
  } else {
    ::decode(mon_addr, p);
  }
  ::decode(last_changed, p);
  ::decode(created, p);
  if (struct_v >= 4) {
    ::decode(persistent_features, p);
    ::decode(optional_features, p);
  }
  if (struct_v >= 5) {
    ::decode(mon_info, p);
  } else {
    // we may be decoding to an existing monmap; if we do not
    // clear the mon_info map now, we will likely incur in problems
    // later on MonMap::sanitize_mons()
    mon_info.clear();
  }
  DECODE_FINISH(p);
  sanitize_mons(mon_addr);
  calc_ranks();
}

encode调用栈:

Thread 15 "ms_dispatch" hit Breakpoint 1, MonMap::encode (this=0x7fff39218470, blist=..., con_features=2305244844532236283)
    at /mnt/ceph-Luminous/src/mon/MonMap.cc:138
138     {
(gdb) bt
#0  MonMap::encode (this=0x7fff39218470, blist=..., con_features=2305244844532236283) at /mnt/ceph-Luminous/src/mon/MonMap.cc:138
#1  0x00005560771f483f in Monitor::send_latest_monmap (this=0x556081f94800, con=0x556082b50800) at /mnt/ceph-Luminous/src/mon/Monitor.cc:5116
#2  0x00005560772c04a5 in AuthMonitor::prep_auth (this=0x556081fbb800, op=..., paxos_writable=false) at /mnt/ceph-Luminous/src/mon/AuthMonitor.cc:482
#3  0x00005560772be9d0 in AuthMonitor::preprocess_query (this=0x556081fbb800, op=...) at /mnt/ceph-Luminous/src/mon/AuthMonitor.cc:292
#4  0x000055607741e850 in PaxosService::dispatch (this=0x556081fbb800, op=...) at /mnt/ceph-Luminous/src/mon/PaxosService.cc:74
#5  0x00005560771ebf54 in Monitor::dispatch_op (this=0x556081f94800, op=...) at /mnt/ceph-Luminous/src/mon/Monitor.cc:4234
#6  0x00005560771ebb3d in Monitor::_ms_dispatch (this=0x556081f94800, m=0x5560828ad900) at /mnt/ceph-Luminous/src/mon/Monitor.cc:4209
#7  0x0000556077228a9e in Monitor::ms_dispatch (this=0x556081f94800, m=0x5560828ad900) at /mnt/ceph-Luminous/src/mon/Monitor.h:899
#8  0x0000556077a3aab9 in Messenger::ms_deliver_dispatch (this=0x556082797000, m=0x5560828ad900) at /mnt/ceph-Luminous/src/msg/Messenger.h:668
#9  0x0000556077a39a8a in DispatchQueue::entry (this=0x556082797180) at /mnt/ceph-Luminous/src/msg/DispatchQueue.cc:197
#10 0x000055607773443e in DispatchQueue::DispatchThread::entry (this=0x556082797328) at /mnt/ceph-Luminous/src/msg/DispatchQueue.h:101
#11 0x0000556077873079 in Thread::entry_wrapper (this=0x556082797328) at /mnt/ceph-Luminous/src/common/Thread.cc:79
#12 0x0000556077872fae in Thread::_entry_func (arg=0x556082797328) at /mnt/ceph-Luminous/src/common/Thread.cc:59
#13 0x00007fee9dfe1494 in start_thread (arg=0x7fee90a88700) at pthread_create.c:333
#14 0x00007fee9a0ccacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97

decode调用栈:

// ceph消息decode过程(L版本)
Thread 9 "ms_dispatch" hit Breakpoint 3, ceph::buffer::ptr::copy_out (this=0x7fffdc002e20, o=82, l=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1035
1035        assert(_raw);
(gdb) bt
#0  ceph::buffer::ptr::copy_out (this=0x7fffdc002e20, o=82, l=8, dest=0x555555e6a464 "\vK\311[)\335Q\032") at /mnt/ceph/src/common/buffer.cc:1035
#1  0x00007ffff72eb1d6 in ceph::buffer::list::iterator_impl<false>::copy (this=0x7fffe50f9e10, len=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1248
#2  0x00007ffff72e14cc in ceph::buffer::list::iterator::copy (this=0x7fffe50f9e10, len=8, dest=0x555555e6a464 "\vK\311[)\335Q\032")
    at /mnt/ceph/src/common/buffer.cc:1434
#3  0x0000555555942ee9 in utime_t::decode (this=0x555555e6a464, p=...) at /mnt/ceph/src/include/utime.h:133
#4  0x0000555555942f0f in decode (c=..., p=...) at /mnt/ceph/src/include/utime.h:390
#5  0x00007fffedca7b88 in MonMap::decode (this=0x555555e6a448, p=...) at /mnt/ceph/src/mon/MonMap.cc:210
#6  0x00007fffedc8f676 in decode (c=..., p=...) at /mnt/ceph/src/mon/MonMap.h:346
#7  0x00007fffedc82a2e in MonClient::handle_monmap (this=0x555555e6a438, m=0x7fffdc000f60) at /mnt/ceph/src/mon/MonClient.cc:331
#8  0x00007fffedc8253b in MonClient::ms_dispatch (this=0x555555e6a438, m=0x7fffdc000f60) at /mnt/ceph/src/mon/MonClient.cc:272
#9  0x00007fffedccfac9 in Messenger::ms_deliver_dispatch (this=0x555555e6b520, m=0x7fffdc000f60) at /mnt/ceph/src/msg/Messenger.h:668
#10 0x00007fffedcce884 in DispatchQueue::entry (this=0x555555e6b6a0) at /mnt/ceph/src/msg/DispatchQueue.cc:197
#11 0x00007fffedea5c54 in DispatchQueue::DispatchThread::entry (this=0x555555e6b848) at /mnt/ceph/src/msg/DispatchQueue.h:101
#12 0x00007fffee04376d in Thread::entry_wrapper (this=0x555555e6b848) at /mnt/ceph/src/common/Thread.cc:79
#13 0x00007fffee0436a2 in Thread::_entry_func (arg=0x555555e6b848) at /mnt/ceph/src/common/Thread.cc:59
#14 0x00007fffeb626494 in start_thread (arg=0x7fffe50fe700) at pthread_create.c:333
#15 0x00007fffea07bacf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
(gdb) n
2018-10-19 12:18:12.443495 7fffe7903700 -1 asok(0x555555d8c810) AdminSocket: error writing response length (32) Broken pipe
1036        if (o+l > _len)
(gdb) 
1038        char* src =  _raw->data + _off + o;
(gdb) 
1039        maybe_inline_memcpy(dest, src, l, 8);
(gdb) 
1040      }
(gdb) p *(utime_t*)dest // 可以通过强制类型转换打印出decode的数据
$35 = {tv = {tv_sec = 1534409040, tv_nsec = 303455561}}
(gdb) c
Continuing.

 

接下来要说的是具体的encode和decode流程,以我调试的mon_addr为例:

encode

// ::encode(mon_addr, blist, con_features);调用这个,但没理解为啥是std::map<T,U,Comp,Alloc>& m
// mon_addr定义是map<string,entity_addr_t> mon_addr;
template<class T, class U, class Comp, class Alloc,
	 typename t_traits=denc_traits<T>, typename u_traits=denc_traits<U>>
inline typename std::enable_if<!t_traits::supported ||
				 !u_traits::supported>::type
  encode(const std::map<T,U,Comp,Alloc>& m, bufferlist& bl, uint64_t features)
{
  __u32 n = (__u32)(m.size()); // 获取map长度,也即接下来要编码几个monitor信息
  encode(n, bl);  // 先编码消息体长度,防止解码的时候越界
  for (auto p = m.begin(); p != m.end(); ++p) { // 遍历map,逐项编码,每一项对应一个monitor
    encode(p->first, bl, features); // monitor名称,string类型
    encode(p->second, bl, features); // monitor地址,struct entity_addr_t类型
  }
}
// encode(n, bl)进入实际的编码过程是(n是__u32类型也就是uint32_t)
WRITE_INTTYPE_ENCODER(uint32_t, le32) // 这是个宏定义如下:

// -----------------------------------
// int types

#define WRITE_INTTYPE_ENCODER(type, etype)				\
  inline void encode(type v, bufferlist& bl, uint64_t features=0) {	\
    ceph_##etype e;	// 类型转换,转换为ceph_le(little endian)				                \
    e = v;    // 这里并没有进行大小端转换,都是小端,src/include/byteorder.h:71 -> src/include/byteorder.h:63                                                         \
    encode_raw(e, bl);	// 裸数据编码,-> src/include/encoding.h:69						\
  }									\
  inline void decode(type &v, bufferlist::iterator& p) {		\
    ceph_##etype e;							\
    decode_raw(e, p);							\
    v = e;								\
  }


// --------------------------------------
// base types

template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{
  bl.append((char*)&t, sizeof(t)); // -> src/common/buffer.cc:1935
}
  void buffer::list::append(const char *data, unsigned len)
  {
    while (len > 0) {
      // put what we can into the existing append_buffer.
      unsigned gap = append_buffer.unused_tail_length();
      if (gap > 0) {
        if (gap > len) gap = len;
    //cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl;
        append_buffer.append(data, gap); // buffer::list::append_buffer,buffer::ptr类型
        append(append_buffer, append_buffer.length() - gap, gap); // -> buffer::list::append	// add segment to the list
        len -= gap;
        data += gap;
      }
      if (len == 0)
        break;  // done!
      
      // make a new append_buffer.  fill out a complete page, factoring in the
      // raw_combined overhead.
      size_t need = ROUND_UP_TO(len, sizeof(size_t)) + sizeof(raw_combined);
      size_t alen = ROUND_UP_TO(need, CEPH_BUFFER_ALLOC_UNIT) -
	sizeof(raw_combined);
      append_buffer = raw_combined::create(alen, 0, get_mempool());
      append_buffer.set_length(0);   // unused, so far.
    }
  }
......
  void buffer::list::append(const ptr& bp, unsigned off, unsigned len)
  {
    assert(len+off <= bp.length());
    if (!_buffers.empty()) {
      ptr &l = _buffers.back();
      if (l.get_raw() == bp.get_raw() &&
	  l.end() == bp.start() + off) {
	// yay contiguous with tail bp!
	l.set_length(l.length()+len);
	_len += len;
	return;
      }
    }
    // add new item to list
    push_back(ptr(bp, off, len)); // -> src/include/buffer.h:768
  }
    void push_back(ptr&& bp) {
      if (bp.length() == 0)
	return;
      _len += bp.length();
      _buffers.push_back(std::move(bp)); // buffer::list::_buffers类型为std::list<ptr>
    }

其他几个字段也是类似过程,就不做分析,差别就是编码的数据类型不一样,比如string、struct等,string属于基础数据类型,encode.h有对应的encode函数,struct数据类型则需要在对应的struct结构体定义其自己的encode函数,如entity_addr_t::encode (src\msg\msg_types.h)。

decode过程也是类似,根据decode的数据类型,找到对应的decode函数,按段解码即可。数据类型是预先定义好的,如decode(mon_addr, p),mon_addr的类型是已知的,因此其decode函数也可以找到(一般都是跟encode放在一起),只不过由于encode.h中有很多的宏,不好找到源码而已,配合gdb单步调试应该容易很多(记得在do_cmake.sh里的cmake命令那行加上-O0 -g3 -gdwarf-4这几个CXX_FLAGS编译选项: -DCMAKE_CXX_FLAGS="-O0 -g3 -gdwarf-4" -DCMAKE_BUILD_TYPE=Debug)。一般是先解码出消息体长度,然后再逐条解码。总之一切过程都是预先定义好的,一旦收到的消息内容与预设的解码方案不匹配,就会导致各种错误。这就是编解码协议存在的原因。(话说这也是我第一次接触编解码协议,看完这些流程还有点小激动)。

encode是把数据存入消息体,decode是从消息体取出数据,最底层的编码解码都是按字节完成的,编码时会强转为char *类型,底层解码也不区分数据类型,由上层使用方负责进行转换,对应的结构体都定义在src\include\buffer.h中,相关的函数实现在src\common\buffer.cc中。

decode的核心是ceph::buffer::ptr::copy_out () (/mnt/ceph/src/common/buffer.cc:1035),最终都是由它把数据从消息体里取出来的。相关的调用栈可以参考上面贴出来的decode调用栈。

下面附上我在调试过程中手绘的协议字段表格:

L版本:

H版本:

 

任意整数以内的加减法口算练习题生成web服务源码及搭建过程

提示:写这个web服务是为了练手玩,实际上已经有很多web(基于html或其他)、桌面应用、手机APP等支持题目生成功能了,并且手机APP还支持OCR识别判断答案是否正确,比如我试了一个小猿口算APP就挺好:http://kousuan.yuanfudao.com/

先上服务链接:http://aspirer.wang:3389/kousuan/7

链接的最后一个数字是可以修改的,改成几就是生成几以内的加减法练习题(比如上面的链接就是生成7以内的加减法口算题目,每次刷新都是新的题目不会重复)。

儿子上一年级经常有口算练习题,老师发的是一张习题纸,一共100道题,需要家长复印,但是存在三个小问题:一是复印出来的题目完全一样(有一次我发现儿子做题居然在参考前面一张。。。);二是打印不方便,必须得复印,有些家长是没有复印机的(复印还要带上原件,老师发下来的时候家长不一定能及时拿到原件);三是想自己提前给孩子出题练习其他更大数字的加减法不方便。

有了这个web站,后面还可以稍微修改下,支持生成乘法、除法的口算题。

整体部署架构:nginx+uwsgi+bottle,python编写的web后台服务。

部署过程参考资料:

  1. 使用bottle.py体验WSGI服务
  2. Nginx 部署Bottle + uwsgi

使用nginx的原因是我的博客就是用的它,跟博客部署在一起了,只是端口不同。

源码:

root@myblog:~/kousuan# ls *.py
bottle.py  gen.py  kousuan.py

共3个文件:bottle.py是bottle wsgi框架,可以pip install安装,也可以直接拷贝源文件过来,非常方便。gen.py是为了方便后台测试用的,python执行它可以直接打印出题目。kousuan.py是给uwsgi用的,算是wsgi配置文件,当然里面也有一些其他代码,主要是生成html模板文件,以及配置wsgi router。

文件都很短,这里直接贴出来,不放github了:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import random
import sys


def add(a, b):
    return a + b

def sub(a, b):
    return a - b

FUNC = {'+': add, '-': sub}

def gen_one_question(upper):
    f = random.randint(0, len(FUNC.keys()) - 1)
    i = random.randint(0, upper)
    j = random.randint(0, upper)

    return f, i, j

def gen(upper):
    content = ''
    for row in range(0, 25):
        line = ''
        for col in range(0, 4):
            r = -1
            while (r < 0 or r > upper):
                f, a, b = gen_one_question(upper)
                fv = FUNC.values()[f]
                r = fv(a, b)
                fk = FUNC.keys()[f]
            line += ' '.join([str(a), fk, str(b), '=', ', '])
        line += '\n'
        content += line

    return content


if __name__ == '__main__':
    num = sys.argv[1]
    head = num + '以内加减法口算练习\n学号:, 姓名:, 用时:,\n'
    content = gen(int(num))
    print head, content

 

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import bottle
from bottle import route, run, template

from gen import gen

mybottle = bottle.Bottle()

@mybottle.route('/kousuan')
@mybottle.route('/kousuan/')
@mybottle.route('/')
def index():
    return '''网址有误,请使用"/kousuan/数字",如"/kousuan/7"'''


@mybottle.route('/kousuan/')
def index(num):
    try:
        return _gen(num)
    except Exception as ex:
        print ex

def _gen(num):
    if not num.isdigit():
        return '''网址有误,最后一位必须是数字!'''

    content = gen(int(num))
    space = ' '
    html_content = '''
        
        
        

%s以内加减法口算练习

学号:%s 姓名:%s 用时:%s
''' % (num, space*6, space*16, space*12) for l in content.splitlines(): l = l.strip()[:-2] line = ' \n' html_content += line html_content += '''
' + l.replace(',', '     ') line += '    
''' return html_content application=mybottle #run(host='0.0.0.0', port=3389)

html格式很简单,就没用专门的模板渲染框架如jinjia2等。

uwsgi配置文件:

[uwsgi]
socket = 127.0.0.1:10059
chdir = /root/kousuan
master = true
plugins = python
file = kousuan.py
uid = root # 我这边用www-data报权限错误,就改成了root
gid = root

nginx配置文件:

server {
    listen 3389;
    server_name aspirer.wang;
    root /root/kousuan;

    location / {
        try_files $uri @uwsgi;
    }

    location @uwsgi {
        include uwsgi_params;
        uwsgi_pass 127.0.0.1:10059;
    }
}

上面两个文件需要在对应的enabled目录下建立软链接,具体参考上面的部署过程参考资料(第二个链接)。

uwsgi和nginx的安装就不说了,apt就行。

部署好之后重启uwsgi和nginx服务就可以了。

 

监控:

为了及时发现web故障,用监控宝给3389 tcp端口和网站都加了监控,出现不可用会发短信和邮件通知。(我的博客用他们免费版用了这么久,也给人家打个广告)。

 

10.22更新:

  1.  修改了题目生成方法,大幅减少了包含0的题目的数量
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import random
import sys


def add(a, b):
    return a + b

def sub(a, b):
    return a - b

FUNC = {'+': add, '-': sub}

def gen_one_question(lower, upper):
    f = random.randint(0, len(FUNC.keys()) - 1)
    i = random.randint(lower, upper)
    j = random.randint(lower, upper)

    return f, i, j

def gen(upper):
    content = ''
    for row in range(0, 25):
        line = ''
        for col in range(0, 4):
            r = -1
            while (r < 0 or r > upper):
                lower = random.randint(0, upper/2)
                f, a, b = gen_one_question(lower, upper)
                fv = FUNC.values()[f]
                r = fv(a, b)
                if ((a + b == 0) and (upper > 0)):
                    r = -1
                    continue
                fk = FUNC.keys()[f]
            line += ' '.join([str(a), fk, str(b), '=', ', '])
        line += '\n'
        content += line

    return content


if __name__ == '__main__':
    num = sys.argv[1]
    head = num + '以内加减法口算练习\n学号:, 姓名:, 用时:,\n'
    content = gen(int(num))
    print head, content

 

其他:

生成题目时是暴力穷举符合条件的题目,其实可以根据每个题目的类型(加法或减法)以及生成的第一个数字,来限定第二个数字的随机范围,保证一次就可以生成符合条件的题目,可以很大程度减少计算量,不过对于这么小的程序和用户量的场景来说,这一点点计算量也就无所谓了。