This article is based on the Ceph Hammer (H) code base. Its purpose is to investigate which osdmap files vmtouch would need to preload before starting an OSD, and what effect such preloading has on the peering process.
The osdmap loading flow during OSD startup
The OSD entry point is the main function in ceph_osd.cc, which calls osd->init() to perform the initialization work before the OSD starts.
```cpp
int OSD::init()
{
  ......
  int r = store->mount(); // Check the persisted data under the OSD directory and the
                          // filesystem attributes, and load the FileStore backend.
  ......
  r = read_superblock(); // Reads the osd_superblock_xxx object under current/meta,
                         // not the superblock file in the OSD root directory
                         // (that one was already read inside mount() above).
  /*
  (gdb) p superblock
  $1 = {cluster_fsid = {uuid = "#\214;EȄI\021\224+\244$\221\002P\277"},
        osd_fsid = {uuid = "\216+\004F\354)B\033\263\023\320\304\022\220\374",
                    <incomplete sequence \342>},
        whoami = 1, current_epoch = 20, oldest_map = 1, newest_map = 20, weight = 0,
        compat_features = {compat = {mask = 1, names = std::map with 0 elements},
          ro_compat = {mask = 1, names = std::map with 0 elements},
          incompat = {mask = 14335, names = std::map with 12 elements =
            {[1] = "initial feature set(~v.18)", [2] = "pginfo object",
             [3] = "object locator", [4] = "last_epoch_clean", [5] = "categories",
             [6] = "hobjectpool", [7] = "biginfo", [8] = "leveldbinfo",
             [9] = "leveldblog", [10] = "snapmapper", [12] = "transaction hints",
             [13] = "pg meta object"}}},
        mounted = 12, clean_thru = 20, last_map_marked_full = 0}
  */
  ......
  osdmap = get_map(superblock.current_epoch); // Load the newest osdmap saved before the
                                              // OSD went down; analyzed below.
  ......
  // load up pgs (as they previously existed)
  load_pgs(); // Load the PGs already on this OSD; analyzed below.
  ......
  osd_tp.start(); // Start the OSD peering thread pool.
  ......
  consume_map(); // Consume, i.e. start using, the osdmap; analyzed below.
  ......
  set_state(STATE_BOOTING); // Set the OSD state to STATE_BOOTING. During startup the
                            // OSD moves through STATE_INITIALIZING (the default),
                            // STATE_BOOTING and STATE_ACTIVE.
  start_boot(); // Prepare to bring the OSD up; analyzed below.
  ......
}
```
Loading the osdmap:
```cpp
class OSD {
  ......
  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  ......
};

class OSDService {
  ......
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    assert(ret);
    return ret;
  }
  ......
};

OSDMapRef OSDService::try_get_map(epoch_t epoch)
{
  Mutex::Locker l(map_cache_lock);
  OSDMapRef retval = map_cache.lookup(epoch); // Check whether this epoch is already
                                              // in the osdmap cache.
  if (retval) {
    dout(30) << "get_map " << epoch << " -cached" << dendl;
    return retval;
  }

  OSDMap *map = new OSDMap;
  if (epoch > 0) {
    dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
    bufferlist bl;
    if (!_get_map_bl(epoch, bl)) { // Check the osdmap bufferlist cache (map_bl_cache)
                                   // for this epoch; on a miss, load it from disk and
                                   // insert it into map_bl_cache.
      delete map;
      return OSDMapRef();
    }
    map->decode(bl); // Decode the bufferlist into the OSDMap.
  } else {
    dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
  }
  return _add_map(map); // Insert the freshly loaded osdmap into map_cache.
}
```
The osdmap loading path above involves two in-memory caches: map_cache and map_bl_cache (there is also map_bl_inc_cache, which holds the bufferlists of incremental osdmaps). Both are LRU caches initialized in the OSDService constructor. Their capacity (the maximum number of cached entries) is set by the config option osd_map_cache_size, whose default is 500, so during startup the cache normally holds enough osdmaps: judging from the rate of osdmap change in our production environment, maintenance operations advance the map by roughly 150 epochs, and the map barely changes otherwise, since the number of new osdmap epochs correlates strongly with the number of OSD state changes.
Loading the PGs already on the OSD:
```cpp
void OSD::load_pgs()
{
  assert(osd_lock.is_locked());
  dout(0) << "load_pgs" << dendl;
  {
    RWLock::RLocker l(pg_map_lock);
    assert(pg_map.empty());
  }

  vector<coll_t> ls;
  int r = store->list_collections(ls); // Enumerate every directory (i.e. every PG)
                                       // under current/.
  if (r < 0) {
    derr << "failed to list pgs: " << cpp_strerror(-r) << dendl;
  }
  ......
  for (map<spg_t, interval_set<snapid_t> >::iterator i = pgs.begin(); // pgs is the PG
       i != pgs.end();                                                // list built from ls.
       ++i) {
    spg_t pgid(i->first);
    ......
    bufferlist bl;
    epoch_t map_epoch = 0;
    int r = PG::peek_map_epoch(store, pgid, &map_epoch, &bl); // Get the PG's associated
                                      // osdmap epoch from the omap; effectively the
                                      // newest epoch saved before the OSD went down.
    ......
    PG *pg = NULL;
    if (map_epoch > 0) {
      OSDMapRef pgosdmap = service.try_get_map(map_epoch); // See the analysis above.
      ......
      pg = _open_lock_pg(pgosdmap, pgid);
    } else {
      pg = _open_lock_pg(osdmap, pgid); // Open the PG object and take its lock.
    }
    ......
    // read pg state, log
    pg->read_state(store, bl); // Read pg info and pg log from the omap.
    ......
    pg->handle_loaded(&rctx); // Drive the PG state machine into Reset, preparing it
                              // to enter Peering.
    ......
}

PG *OSD::_open_lock_pg(
  OSDMapRef createmap,
  spg_t pgid, bool no_lockdep_check)
{
  assert(osd_lock.is_locked());

  PG* pg = _make_pg(createmap, pgid);
  {
    RWLock::WLocker l(pg_map_lock);
    pg->lock(no_lockdep_check);
    pg_map[pgid] = pg; // Store the PG in pg_map.
    pg->get("PGMap");  // because it's in pg_map
    service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
  }
  return pg;
}
```
Consuming the osdmap:
```cpp
void OSD::consume_map()
{
  ......
  // scan pg's
  {
    RWLock::RLocker l(pg_map_lock);
    for (ceph::unordered_map<spg_t, PG*>::iterator it = pg_map.begin(); // pg_map was filled
         it != pg_map.end();                      // by load_pgs above and holds every PG
         ++it) {                                  // carried by this OSD.
      PG *pg = it->second;
      pg->lock();
      pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
      pg->unlock();
    }
  ......
}

void PG::queue_null(epoch_t msg_epoch,
                    epoch_t query_epoch)
{
  dout(10) << "null" << dendl;
  queue_peering_event( // Push an empty event onto the PG's peering_queue, mainly to
                       // drive the PG into the Peering state.
    CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
                                         NullEvt())));
}
```
Note that before the OSD calls start_boot (in the _maybe_boot callback) and sends MOSDBoot to the monitor, the OSD is still marked down, so the PGs it carries are degraded/undersized. In that situation, as long as the number of OSDs in the acting set (the available replicas) is still at least the pool's min_size, the PGs entering Peering has no effect on client IO (IO is not blocked).
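The availability rule above can be condensed into a small classifier. This is our own illustration (the function and state names are ours, not Ceph's API): with the OSD still down, a PG is degraded when the acting set is smaller than the pool's size, but client IO only blocks once it drops below min_size.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Illustrative sketch of the size/min_size rule; not a Ceph function.
std::string pg_io_state(std::size_t acting, std::size_t size, std::size_t min_size) {
  if (acting < min_size) return "io-blocked";          // too few replicas: IO stalls
  if (acting < size)     return "degraded-but-serving"; // undersized, but IO flows
  return "active-clean";                                // full replica count
}
```

For a typical pool with size=3 and min_size=2, losing one OSD leaves the PG degraded but serving; losing two blocks IO.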
Bringing the OSD up:
```cpp
struct C_OSD_GetVersion : public Context {
  OSD *osd;
  uint64_t oldest, newest;
  C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
  void finish(int r) {
    if (r >= 0)
      osd->_maybe_boot(oldest, newest);
  }
};

// Called from both OSD::init and OSD::handle_osd_map: queries the monitor for the
// osdmap version range and sends the boot message once the OSD's own osdmap epoch
// is close enough to the newest one.
// If the local osdmap lags far behind (by more than osd_map_message_max), the OSD
// calls osdmap_subscribe instead, and OSD::handle_osd_map invokes this function
// again to re-check the gap.
void OSD::start_boot()
{
  dout(10) << "start_boot - have maps " << superblock.oldest_map
           << ".." << superblock.newest_map << dendl;
  C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
  monc->get_version("osdmap", &c->newest, &c->oldest, c);
}

void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
{
  ......
  // if our map within recent history, try to add ourselves to the osdmap.
  if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
    dout(5) << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
  } else if (is_waiting_for_healthy() || !_is_healthy()) {
    // if we are not healthy, do not mark ourselves up (yet)
    dout(1) << "not healthy; waiting to boot" << dendl;
    if (!is_waiting_for_healthy())
      start_waiting_for_healthy();
    // send pings sooner rather than later
    heartbeat_kick();
  } else if (osdmap->get_epoch() >= oldest - 1 &&
             osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest) {
    _send_boot(); // Tell the monitor the OSD has started.
    return;
  }

  // get all the latest maps
  // If the OSD's osdmap is far behind the cluster's, keep sending subscription
  // requests; the monitor returns the subscribed maps in batches of
  // osd_map_message_max until the gap is small (less than osd_map_message_max).
  if (osdmap->get_epoch() + 1 >= oldest)
    osdmap_subscribe(osdmap->get_epoch() + 1, true);
  else
    osdmap_subscribe(oldest - 1, true);
}

void OSD::_send_boot()
{
  ......
  MOSDBoot *mboot = new MOSDBoot(superblock, service.get_boot_epoch(),
                                 hb_back_addr, hb_front_addr, cluster_addr,
                                 CEPH_FEATURES_ALL);
  dout(10) << " client_addr " << client_messenger->get_myaddr()
           << ", cluster_addr " << cluster_addr
           << ", hb_back_addr " << hb_back_addr
           << ", hb_front_addr " << hb_front_addr
           << dendl;
  _collect_metadata(&mboot->metadata);
  monc->send_mon_message(mboot); // Send the boot message to the monitor; from here on
                                 // the monitor considers the OSD started.
  // OSDMonitor::preprocess_boot / OSDMonitor::prepare_boot handle this message;
  // prepare_boot (via OSDMonitor::_booted) sends the OSD osdmaps starting at the
  // OSD's current epoch + 1.
}
```
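The boot-readiness condition in _maybe_boot can be isolated as a small predicate. This is a sketch of the test shown above, not a Ceph function; osd_map_message_max is passed in explicitly rather than assuming a default value.

```cpp
#include <cassert>

using epoch_t = unsigned;

// Sketch of _maybe_boot's epoch-gap check: the OSD may send MOSDBoot only when
// its map is within recent history of the cluster's (oldest..newest come from
// the monitor's get_version reply). Caller must ensure oldest >= 1 to avoid
// unsigned wrap-around in oldest - 1, as the real code effectively does.
bool can_send_boot(epoch_t my_epoch, epoch_t oldest, epoch_t newest,
                   epoch_t map_message_max) {
  return my_epoch >= oldest - 1 &&
         my_epoch + map_message_max > newest;
}
```

For example, with a batch size of 40: an OSD at epoch 100 may boot when the cluster is at 120, but must keep subscribing when the cluster is at 200 or when its own map predates the monitor's oldest retained epoch.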
Once the OSD has sent MOSDBoot to the monitor and the monitor has accepted its boot state through a Paxos decision, the OSD is considered up: it joins the acting/up sets and is taken into account by the CRUSH algorithm, so client IO starts being sent to it. If its PGs are still in Peering at that point, client IO may be blocked.
The osdmap update flow during peering
Whether the OSD calls osdmap_subscribe to send MMonSubscribe (before boot, or on demand), _send_boot to send MOSDBoot (at boot time), or send_alive to send MOSDAlive to the monitor (after boot, when peering completes, when reporting OSD state, and so on), in every case the monitor ends up sending osdmaps (full or incremental) to the OSD via OSDMonitor::send_latest:
```cpp
// Monitor::handle_subscribe handles MMonSubscribe; OSDMonitor::check_sub then
// sends the osdmaps to the OSD.
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
  OSDMapRef osdmap = service.get_osdmap();
  if (osdmap->get_epoch() >= epoch)
    return;

  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
      force_request) {
    monc->renew_subs();
  }
}

void OSD::handle_osd_map(MOSDMap *m)
{
  ......
  // missing some?
  bool skip_maps = false;
  if (first > osdmap->get_epoch() + 1) {
    dout(10) << "handle_osd_map message skips epochs "
             << osdmap->get_epoch() + 1 << ".." << (first-1) << dendl;
    if (m->oldest_map <= osdmap->get_epoch() + 1) {
      osdmap_subscribe(osdmap->get_epoch()+1, true); // The maps we received don't
                                                     // close our gap; re-subscribe.
      m->put();
      return;
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      osdmap_subscribe(m->oldest_map - 1, true);
      m->put();
      return;
    }
    skip_maps = true;
  }

  ObjectStore::Transaction *_t = new ObjectStore::Transaction;
  ObjectStore::Transaction &t = *_t;

  // store new maps: queue for disk and put in the osdmap cache
  epoch_t last_marked_full = 0;
  epoch_t start = MAX(osdmap->get_epoch() + 1, first);
  for (epoch_t e = start; e <= last; e++) {
    map<epoch_t,bufferlist>::iterator p;
    p = m->maps.find(e);
    if (p != m->maps.end()) { // Handle a full osdmap.
      dout(10) << "handle_osd_map  got full map for epoch " << e << dendl;
      OSDMap *o = new OSDMap;
      bufferlist& bl = p->second;

      o->decode(bl);
      if (o->test_flag(CEPH_OSDMAP_FULL))
        last_marked_full = e;

      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, bl.length(), bl);
      pin_map_bl(e, bl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    p = m->incremental_maps.find(e);
    if (p != m->incremental_maps.end()) { // Handle an incremental osdmap.
      dout(10) << "handle_osd_map  got inc map for epoch " << e << dendl;
      bufferlist& bl = p->second;
      hobject_t oid = get_inc_osdmap_pobject_name(e);
      t.write(META_COLL, oid, 0, bl.length(), bl);
      pin_map_inc_bl(e, bl);

      OSDMap *o = new OSDMap;
      if (e > 1) {
        bufferlist obl;
        get_map_bl(e - 1, obl);
        o->decode(obl);
      }

      OSDMap::Incremental inc;
      bufferlist::iterator p = bl.begin();
      inc.decode(p);
      if (o->apply_incremental(inc) < 0) {
        derr << "ERROR: bad fsid?  i have " << osdmap->get_fsid() << " and inc has "
             << inc.fsid << dendl;
        assert(0 == "bad fsid");
      }

      if (o->test_flag(CEPH_OSDMAP_FULL))
        last_marked_full = e;

      bufferlist fbl;
      o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
      ......
      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(META_COLL, fulloid, 0, fbl.length(), fbl);
      pin_map_bl(e, fbl);
      pinned_maps.push_back(add_map(o));
      continue;
    }

    assert(0 == "MOSDMap lied about what maps it had?");
  }

  if (superblock.oldest_map) { // Advance superblock.oldest_map, trimming old maps.
    int num = 0;
    epoch_t min(
      MIN(m->oldest_map, service.map_cache.cached_key_lower_bound()));
    for (epoch_t e = superblock.oldest_map; e < min; ++e) {
      dout(20) << " removing old osdmap epoch " << e << dendl;
      t.remove(META_COLL, get_osdmap_pobject_name(e));
      t.remove(META_COLL, get_inc_osdmap_pobject_name(e));
      superblock.oldest_map = e+1;
      num++;
      if (num >= cct->_conf->osd_target_transaction_size &&
          (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
        break;
    }
  }

  if (!superblock.oldest_map || skip_maps)
    superblock.oldest_map = first;
  superblock.newest_map = last; // Update superblock.newest_map.
  if (last_marked_full > superblock.last_map_marked_full)
    superblock.last_map_marked_full = last_marked_full; // Update the superblock.

  map_lock.get_write();

  C_Contexts *fin = new C_Contexts(cct);

  // advance through the new maps
  for (epoch_t cur = start; cur <= superblock.newest_map; cur++) {
    dout(10) << " advance to epoch " << cur
             << " (<= newest " << superblock.newest_map << ")" << dendl;

    OSDMapRef newmap = get_map(cur);
    assert(newmap);  // we just cached it above!

    // start blacklisting messages sent to peers that go down.
    service.pre_publish_map(newmap);

    // kill connections to newly down osds
    bool waited_for_reservations = false;
    set<int> old;
    osdmap->get_all_osds(old);
    for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
      if (*p != whoami &&
          osdmap->have_inst(*p) &&                        // in old map
          (!newmap->exists(*p) || !newmap->is_up(*p))) {  // but not the new one
        if (!waited_for_reservations) {
          service.await_reserved_maps();
          waited_for_reservations = true;
        }
        note_down_osd(*p);
      }
    }

    osdmap = newmap;
    superblock.current_epoch = cur; // Update superblock.current_epoch.
    advance_map(t, fin);
    had_map_since = ceph_clock_now(cct);
  }

  epoch_t _bind_epoch = service.get_bind_epoch();
  if (osdmap->is_up(whoami) &&
      osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
      _bind_epoch < osdmap->get_up_from(whoami)) {

    if (is_booting()) {
      dout(1) << "state: booting -> active" << dendl;
      set_state(STATE_ACTIVE); // Mark the OSD active.

      // set incarnation so that osd_reqid_t's we generate for our
      // objecter requests are unique across restarts.
      service.objecter->set_client_incarnation(osdmap->get_epoch());
    }
  }

  // note in the superblock that we were clean thru the prior epoch
  epoch_t boot_epoch = service.get_boot_epoch();
  if (boot_epoch && boot_epoch >= superblock.mounted) {
    superblock.mounted = boot_epoch;
    superblock.clean_thru = osdmap->get_epoch();
  }

  // superblock and commit: persist the superblock to disk (leveldb).
  write_superblock(t);
  store->queue_transaction(
    0,
    _t,
    new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
    0, fin);
  service.publish_superblock(superblock);
  ......
  // yay!
  consume_map(); // Analyzed above.

  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers(); // Refresh the set of peers for OSD heartbeats.
  ......
  if (m->newest_map && m->newest_map > last) {
    dout(10) << " msg say newest map is " << m->newest_map
             << ", requesting more" << dendl;
    osdmap_subscribe(osdmap->get_epoch()+1, true);
  }
  else if (is_booting()) {
    start_boot(); // retry: check whether the OSD can boot now, i.e. send
                  // MOSDBoot to the monitor and become up.
  }
  else if (do_restart)
    start_boot();

  if (do_shutdown)
    shutdown();

  m->put();
}
```
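The full/incremental branch above can be illustrated with a toy model. This is a deliberately simplified sketch, not Ceph's OSDMap: the "map" is reduced to an epoch plus an up/down flag per OSD, whereas the real apply_incremental also handles pools, CRUSH changes, addresses and much more.

```cpp
#include <cassert>
#include <map>

using epoch_t = unsigned;

// Toy model of advancing the map one epoch: either a full map replaces the
// current one, or an incremental is applied on top of the previous full map.
struct ToyMap {
  epoch_t epoch = 0;
  std::map<int, bool> up; // osd id -> up?
};

struct ToyInc {
  epoch_t epoch = 0;
  std::map<int, bool> new_up_state; // osd id -> new up/down state
};

// Analogue of "decode previous full map, then apply_incremental" in
// handle_osd_map's incremental branch.
ToyMap apply_incremental(ToyMap prev, const ToyInc& inc) {
  prev.epoch = inc.epoch;
  for (std::map<int, bool>::const_iterator it = inc.new_up_state.begin();
       it != inc.new_up_state.end(); ++it)
    prev.up[it->first] = it->second; // overlay the changed OSD states
  return prev;
}
```

This also shows why the OSD insists on receiving a full map first when epochs were skipped: an incremental is meaningless without the full map at the preceding epoch to apply it to.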
Conclusion
Before the OSD becomes up, none of the loading work affects peering latency. Only after the OSD has sent MOSDBoot to tell the monitor it has started, and the monitor has marked it UP in the osdmap via a Paxos decision, can any further loading of the relevant data from disk affect peering time.
Once the monitor sends osdmaps, the OSD caches them in memory. Given the default cache size of 500 entries, cached maps can still be evicted, but only when the OSD's osdmap at startup lags far behind the cluster's (on the Yiqiao private-cloud cluster we observed a gap of 2091 epochs when starting an OSD). In that case peering can be affected (OSD::process_peering_events -> OSD::advance_pg -> service.try_get_map), so avoid starting or stopping OSDs when the osdmap epoch gap with the cluster is large: with a large gap, not only may the in-memory cache be too small, but every epoch also has to be examined and consumed by every PG, which adds up to noticeable time.
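A back-of-the-envelope estimate of the cache pressure described above (a rough upper bound of our own, since actual access order and the bufferlist caches affect the real number):

```cpp
#include <cassert>

// If the OSD is epochs_behind versions behind and the LRU holds cache_size
// entries, roughly this many epochs no longer fit in the cache and may have
// to be re-read from disk during peering (illustrative, not a Ceph formula).
unsigned epochs_spilled(unsigned epochs_behind, unsigned cache_size) {
  return epochs_behind > cache_size ? epochs_behind - cache_size : 0;
}
```

With the observed gap of 2091 epochs and the default osd_map_cache_size of 500, roughly 1591 epochs fall outside the cache, which is why such a gap can slow peering noticeably.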
Therefore, preloading osdmap files into the page cache before startup brings little benefit.