ceph osd启动及peering过程中osdmap相关加载和更新流程




本文基于ceph H版本代码,目的是为了调研启动OSD之前vmtouch需要预加载哪些osdmap文件?以及预加载后对peering过程的影响。

OSD启动过程中osdmap加载流程

OSD启动入口是ceph_osd.cc的main函数,他会调用osd->init()进行osd启动前的初始化工作。

int OSD::init()
{
  ......
  int r = store->mount(); // 检查osd目录相关持久化数据,以及文件系统属性等,加载FileStore驱动。
  ......
  r = read_superblock(); // 这个是读取的current/meta目录下的osd_superblock_xxx文件,而不是osd根目录下的superblock文件(这个是在上面的mount函数里读取的)
  /*
  (gdb) p superblock 
  $1 = {cluster_fsid = {uuid = "#\214;EȄI\021\224+\244$\221\002P\277"}, osd_fsid = {
    uuid = "\216+\004F\354)B\033\263\023\320\304\022\220\374", <incomplete sequence \342>}, whoami = 1, current_epoch = 20, oldest_map = 1, newest_map = 20, 
  weight = 0, compat_features = {compat = {mask = 1, names = std::map with 0 elements}, ro_compat = {mask = 1, names = std::map with 0 elements}, incompat = {
      mask = 14335, names = std::map with 12 elements = {[1] = "initial feature set(~v.18)", [2] = "pginfo object", [3] = "object locator", 
        [4] = "last_epoch_clean", [5] = "categories", [6] = "hobjectpool", [7] = "biginfo", [8] = "leveldbinfo", [9] = "leveldblog", [10] = "snapmapper", 
        [12] = "transaction hints", [13] = "pg meta object"}}}, mounted = 12, clean_thru = 20, last_map_marked_full = 0}
  */

  ......
  osdmap = get_map(superblock.current_epoch); // 加载osd down之前保存的最新版本osdmap,具体过程见下面分析
  ......
  // load up pgs (as they previously existed)
  load_pgs(); // 加载OSD上已有的pg,具体见下面分析
  ......
  osd_tp.start(); // 启动osd的peering线程池
  ......
  consume_map(); // 消费osdmap,或者说使用osdmap,具体见下面分析
  ......
  set_state(STATE_BOOTING);  // 设置osd状态为STATE_BOOTING,OSD启动过程中共有STATE_INITIALIZING(默认值)、STATE_BOOTING、STATE_ACTIVE这几个状态阶段
  start_boot(); // 准备启动OSD,具体见下面分析
  ......
}

加载osdmap:

class OSD: {
  ......
  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  ......
}

class OSDService: {
  ......
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    assert(ret);
    return ret;
  }
  ......
}

OSDMapRef OSDService::try_get_map(epoch_t epoch)
{
  Mutex::Locker l(map_cache_lock);
  OSDMapRef retval = map_cache.lookup(epoch); // 从osdmap缓存查找该版本的map是否存在
  if (retval) {
    dout(30) < < "get_map " << epoch << " -cached" << dendl;
    return retval;
  }

  OSDMap *map = new OSDMap;
  if (epoch > 0) {
    dout(20) < < "get_map " << epoch << " - loading and decoding " << map << dendl;
    bufferlist bl;
    if (!_get_map_bl(epoch, bl)) { // 从osdmap的bufferlist缓存(map_bl_cache)中查找该版本map是否存在,如果不存在则从硬盘上加载,并加入map_bl_cache缓存
      delete map;
      return OSDMapRef();
    }
    map->decode(bl); // 解码bufferlist数据到osdmap
  } else {
    dout(20) < < "get_map " << epoch << " - return initial " << map << dendl;
  }
  return _add_map(map); // 把获取的osdmap加入map_cache缓存
}

上述osdmap加载过程中涉及到两个内存缓存:map_cache和map_bl_cache(还有一个map_bl_inc_cache是保存增量osdmap的bufferlist的缓存),这两个缓存都是基于LRU算法,在OSDService类的构造函数中初始化的,默认的缓存空间大小(缓存项最大数量)是由配置项osd_map_cache_size决定的,其默认值是500,因此在启动过程中缓存的osdmap数量是足够的(根据实际线程环境osdmap变化速度,有运维操作时版本变化量是150左右,osdmap变化数量跟osd状态变化次数强相关,没有操作时基本不变)。

加载OSD上已有的pg:

void OSD::load_pgs()
{
  assert(osd_lock.is_locked());
  dout(0) << "load_pgs" << dendl;
  {
    RWLock::RLocker l(pg_map_lock);
    assert(pg_map.empty());
  }

  vector<coll_t> ls;
  int r = store->list_collections(ls);//遍历current目录下所有文件夹,也即pg
  if (r < 0) {
    derr << "failed to list pgs: " << cpp_strerror(-r) << dendl;
  }
  ......
  for (map<spg_t, interval_set<snapid_t> >::iterator i = pgs.begin(); // pgs是从ls中加载的pg列表
       i != pgs.end();
       ++i) {
    spg_t pgid(i->first);
    ......
    bufferlist bl;
    epoch_t map_epoch = 0;
    int r = PG::peek_map_epoch(store, pgid, &map_epoch, &bl); // 从omap获取pg关联的osdmap版本,可以认为是osd down之前保存的最新osdmap版本
    ......
    PG *pg = NULL;
    if (map_epoch > 0) {
      OSDMapRef pgosdmap = service.try_get_map(map_epoch); // 参考上面的分析过程
      ......
      pg = _open_lock_pg(pgosdmap, pgid);
    } else {
      pg = _open_lock_pg(osdmap, pgid); //打开pg对象并加锁
    }
    ......
    // read pg state, log
    pg->read_state(store, bl); // 从omap中读取pg info和pg log
    ......
    pg->handle_loaded(&rctx); // 使pg状态机进入Reset状态,为进入peering状态做准备
    ......
}

PG *OSD::_open_lock_pg(
  OSDMapRef createmap,
  spg_t pgid, bool no_lockdep_check)
{
  assert(osd_lock.is_locked());

  PG* pg = _make_pg(createmap, pgid);
  {
    RWLock::WLocker l(pg_map_lock);
    pg->lock(no_lockdep_check);
    pg_map[pgid] = pg; // 把pg保存到pg_map
    pg->get("PGMap");  // because it's in pg_map
    service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
  }
  return pg;
}

使用osdmap:

void OSD::consume_map()
{
  ......
  // scan pg's
  {
    RWLock::RLocker l(pg_map_lock);
    for (ceph::unordered_map<spg_t ,PG*>::iterator it = pg_map.begin(); // pg_map是上面load_pgs函数初始化的,保存的是osd上承载的所有pg
        it != pg_map.end();
        ++it) {
      PG *pg = it->second;
      pg->lock();
      pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
      pg->unlock();
    }
  ......
}

void PG::queue_null(epoch_t msg_epoch,
            epoch_t query_epoch)
{
  dout(10) < < "null" << dendl;
  queue_peering_event( // 发送空事件给pg peering_queue,主要为了是让pg进入peering状态
    CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
                     NullEvt())));
}

需要说明的是,在osd调用start_boot(在回调_maybe_boot里)发送MOSDBoot给monitor之前,OSD仍然处于down状态,其上承载的pg也就处于degraded/undersized状态,这种情况下只要acting set里的osd数量(可用副本数)仍然大于等于pool的min_size值,pg进入peering状态也不会对客户端IO产生影响(不阻塞IO)。

OSD启动(UP):

struct C_OSD_GetVersion : public Context {
  OSD *osd;
  uint64_t oldest, newest;
  C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
  void finish(int r) {
    if (r >= 0)
      osd->_maybe_boot(oldest, newest);
  }
};

// OSD::init和OSD::handle_osd_map都会调用这个函数,从monitor查询osdmap版本信息,并在osd拥有的osdmap版本号与最新版本相差不大时发送启动消息给monitor
// 如果osd本地osdmap版本与最新版本相差较大(超过osd_map_message_max),则osdmap_subscribe并在OSD::handle_osd_map里再次调用这个函数检查版本号差距
void OSD::start_boot()
{
  dout(10) < < "start_boot - have maps " << superblock.oldest_map
       << ".." << superblock.newest_map << dendl;
  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
  monc->get_version("osdmap", &c->newest, &c->oldest, c);
}

void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
{
  ......
  // if our map within recent history, try to add ourselves to the osdmap.
  if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
    dout(5) < < "osdmap NOUP flag is set, waiting for it to clear" << dendl;
  } else if (is_waiting_for_healthy() || !_is_healthy()) {
    // if we are not healthy, do not mark ourselves up (yet)
    dout(1) << "not healthy; waiting to boot" << dendl;
    if (!is_waiting_for_healthy())
      start_waiting_for_healthy();
    // send pings sooner rather than later
    heartbeat_kick();
  } else if (osdmap->get_epoch() >= oldest - 1 &&
         osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest) {
    _send_boot(); // 在这里告诉monitor,osd已启动
    return;
  }

  // get all the latest maps
  // 如果OSD拥有的osdmap与集群的osdmap版本差距较大,则继续发送osdmap订阅消息给monitor,monitor会返回订阅的osdmap(批量发送osd_map_message_max),直到二者差距不大(小于osd_map_message_max)
  if (osdmap->get_epoch() + 1 >= oldest)
    osdmap_subscribe(osdmap->get_epoch() + 1, true);
  else
    osdmap_subscribe(oldest - 1, true);
}

void OSD::_send_boot()
{
  ......
  MOSDBoot *mboot = new MOSDBoot(superblock, service.get_boot_epoch(),
                                 hb_back_addr, hb_front_addr, cluster_addr,
                 CEPH_FEATURES_ALL);
  dout(10) < < " client_addr " << client_messenger->get_myaddr()
       < < ", cluster_addr " << cluster_addr
       << ", hb_back_addr " << hb_back_addr
       << ", hb_front_addr " << hb_front_addr
       << dendl;
  _collect_metadata(&mboot->metadata);
  monc->send_mon_message(mboot); // 发送osd boot消息给monitor,之后monitor就认为osd已经启动
  // 在OSDMonitor::preprocess_boot、OSDMonitor::prepare_boot处理这个消息,prepare_boot会发送osdmap给当前osd(OSDMonitor::_booted),版本号是osd当前拥有的osdmap的epoch+1
}

一旦osd发送了MOSDBoot消息给monitor,并且monitor经过Paxos决议之后接受了osd的boot状态,那么osd就被认为是up的,加入到acting/up set里,就会被crush算法考虑在内,客户端IO就会发送到这个osd上,如果此时osd上的pg处于peering状态,则可能会阻塞客户的IO。

peering过程中osdmap更新流程

无论是调用osdmap_subscribe发送MMonSubscribe消息(osd启动前或者按需发送),或者调用_send_boot发送MOSDBoot消息(osd启动时),或者调用send_alive发送MOSDAlive消息给monitor(osd启动后,peering结束,或者给monitor上报osd信息时等),monitor都会通过调用OSDMonitor::send_latest发生osdmap给osd(全量或增量):

// Monitor::handle_subscribe处理MMonSubscribe消息,之后由OSDMonitor::check_sub发送osdmap给osd
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
  OSDMapRef osdmap = service.get_osdmap();
  if (osdmap->get_epoch() >= epoch)
    return;

  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
      force_request) {
    monc->renew_subs();
  }
}

void OSD::handle_osd_map(MOSDMap *m)
{
  ......
  // missing some?
  bool skip_maps = false;
  if (first > osdmap->get_epoch() + 1) {
    dout(10) < < "handle_osd_map message skips epochs " << osdmap->get_epoch() + 1
         < < ".." << (first-1) << dendl;
    if (m->oldest_map < = osdmap->get_epoch() + 1) {
      osdmap_subscribe(osdmap->get_epoch()+1, true); // 发过来的osdmap版本太老,不符合需要,重新订阅
      m->put();
      return;
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      osdmap_subscribe(m->oldest_map - 1, true);
      m->put();
      return;
    }
    skip_maps = true;
  }

  ObjectStore::Transaction *_t = new ObjectStore::Transaction;
  ObjectStore::Transaction &t = *_t;

  // store new maps: queue for disk and put in the osdmap cache
  // 上面的原版注释已经写清楚了
  epoch_t last_marked_full = 0;
  epoch_t start = MAX(osdmap->get_epoch() + 1, first);
  for (epoch_t e = start; e < = last; e++) {
    map<epoch_t,bufferlist>::iterator p;
    p = m->maps.find(e);
    if (p != m->maps.end()) { // 处理全量osdmap
      dout(10) < < "handle_osd_map  got full map for epoch " << e << dendl;
      OSDMap *o = new OSDMap;
      bufferlist& bl = p->second;

      o->decode(bl);
      if (o->test_flag(CEPH_OSDMAP_FULL))
      last_marked_full = e;

      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, bl.length(), bl);
      pin_map_bl(e, bl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    p = m->incremental_maps.find(e);
    if (p != m->incremental_maps.end()) { // 处理增量osdmap
      dout(10) < < "handle_osd_map  got inc map for epoch " << e << dendl;
      bufferlist& bl = p->second;
      hobject_t oid = get_inc_osdmap_pobject_name(e);
      t.write(META_COLL, oid, 0, bl.length(), bl);
      pin_map_inc_bl(e, bl);

      OSDMap *o = new OSDMap;
      if (e > 1) {
        bufferlist obl;
        get_map_bl(e - 1, obl);
        o->decode(obl);
      }

      OSDMap::Incremental inc;
      bufferlist::iterator p = bl.begin();
      inc.decode(p);
      if (o->apply_incremental(inc) < 0) {
        derr << "ERROR: bad fsid?  i have " << osdmap->get_fsid() < < " and inc has " << inc.fsid << dendl;
        assert(0 == "bad fsid");
      }

      if (o->test_flag(CEPH_OSDMAP_FULL))
        last_marked_full = e;

      bufferlist fbl;
      o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
      ......
      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, fbl.length(), fbl);
      pin_map_bl(e, fbl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    assert(0 == "MOSDMap lied about what maps it had?");
  }


  if (superblock.oldest_map) { // 更新superblock中的oldest_map版本
    int num = 0;
    epoch_t min(
      MIN(m->oldest_map,
      service.map_cache.cached_key_lower_bound()));
    for (epoch_t e = superblock.oldest_map; e < min; ++e) {
      dout(20) << " removing old osdmap epoch " << e << dendl;
      t.remove(META_COLL, get_osdmap_pobject_name(e));
      t.remove(META_COLL, get_inc_osdmap_pobject_name(e));
      superblock.oldest_map = e+1;
      num++;
      if (num >= cct->_conf->osd_target_transaction_size &&
          (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
        break;
    }
  }

  if (!superblock.oldest_map || skip_maps)
    superblock.oldest_map = first;
  superblock.newest_map = last; // 更新superblock中的newest_map版本

  if (last_marked_full > superblock.last_map_marked_full)
    superblock.last_map_marked_full = last_marked_full; // 更新superblock

  map_lock.get_write();

  C_Contexts *fin = new C_Contexts(cct);

  // advance through the new maps
  for (epoch_t cur = start; cur < = superblock.newest_map; cur++) {
    dout(10) << " advance to epoch " << cur << " (<= newest " << superblock.newest_map << ")" << dendl;

    OSDMapRef newmap = get_map(cur);
    assert(newmap);  // we just cached it above!

    // start blacklisting messages sent to peers that go down.
    service.pre_publish_map(newmap);

    // kill connections to newly down osds
    bool waited_for_reservations = false;
    set<int> old;
    osdmap->get_all_osds(old);
    for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
      if (*p != whoami &&
      osdmap->have_inst(*p) &&                        // in old map
      (!newmap->exists(*p) || !newmap->is_up(*p))) {  // but not the new one
        if (!waited_for_reservations) {
          service.await_reserved_maps();
          waited_for_reservations = true;
        }
    note_down_osd(*p);
      }
    }

    osdmap = newmap;

    superblock.current_epoch = cur; // 更新superblock
    advance_map(t, fin);
    had_map_since = ceph_clock_now(cct);
  }

  epoch_t _bind_epoch = service.get_bind_epoch();
  if (osdmap->is_up(whoami) &&
      osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
      _bind_epoch < osdmap->get_up_from(whoami)) {

    if (is_booting()) {
      dout(1) < < "state: booting -> active" < < dendl;
      set_state(STATE_ACTIVE);  // 设置OSD为Active状态

      // set incarnation so that osd_reqid_t's we generate for our
      // objecter requests are unique across restarts.
      service.objecter->set_client_incarnation(osdmap->get_epoch());
    }
  }

  // note in the superblock that we were clean thru the prior epoch
  // 继续更新superblock
  epoch_t boot_epoch = service.get_boot_epoch();
  if (boot_epoch && boot_epoch >= superblock.mounted) {
    superblock.mounted = boot_epoch;
    superblock.clean_thru = osdmap->get_epoch();
  }

  // superblock and commit
  // 保存superblock到硬盘(leveldb)
  write_superblock(t);
  store->queue_transaction(
    0,
    _t,
    new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
    0, fin);
  service.publish_superblock(superblock);
  ......
  // yay!
  consume_map(); // 上面已经分析过

  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers(); // 更新OSD心跳互检的对端列表
  ......
  if (m->newest_map && m->newest_map > last) {
    dout(10) < < " msg say newest map is " << m->newest_map < < ", requesting more" << dendl;
    osdmap_subscribe(osdmap->get_epoch()+1, true);
  }
  else if (is_booting()) {
    start_boot();  // retry,检查osd是否可以启动(是否能发送MOSDBoot消息给monitor,使osd变为up状态)
  }
  else if (do_restart)
    start_boot();

  if (do_shutdown)
    shutdown();

  m->put();
}

结论

OSD变为up状态前,所有的加载操作,对peering流程耗时均没有影响,只有当osd发送MOSDBoot消息通知monitor他已经启动,并且monitor经过Paxos决议之后将其加入osdmap中变为UP状态之后,如果再有相关数据的(从硬盘)加载操作,才可能会影响peering耗时。

在monitor发送osdmap之后osd就会将其保存到缓存中,考虑到缓存大小默认500条,还是有可能会被冲掉的,只有在osd启动时的osdmap版本跟集群的版本差距很大的时候才有这种可能(义桥私有云集群观察到启动OSD操作时osd与集群的版本号差了2091个),此时就可能影响到peering流程(OSD::process_peering_events->OSD::advance_pg->service.try_get_map),因此要尽量避免启动、停止osd时与集群的osdmap版本号差距太大(差距太大,不仅内存缓存可能不够,每个版本的osdmap都要被每个pg检查并使用一遍,积少成多也会有一定的耗时)。

因此启动前预加载osdmap到内存pagecache中,带来的好处不大。