继续基于H版本代码分析。
主动触发
$ ceph osd deep-scrub 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
bool OSDMonitor::preprocess_command(MMonCommand *m) { ...... } else if ((prefix == "osd scrub" || prefix == "osd deep-scrub" || prefix == "osd repair")) { string whostr; cmd_getval(g_ceph_context, cmdmap, "who", whostr); vector<string> pvec; get_str_vec(prefix, pvec); if (whostr == "*") { ss << "osds "; int c = 0; for (int i = 0; i < osdmap.get_max_osd(); i++) if (osdmap.is_up(i)) { ss << (c++ ? "," : "") << i; mon->try_send_message(new MOSDScrub(osdmap.get_fsid(), pvec.back() == "repair", pvec.back() == "deep-scrub"), osdmap.get_inst(i)); } r = 0; ss << " instructed to " << pvec.back(); } else { long osd = parse_osd_id(whostr.c_str(), &ss); if (osd < 0) { r = -EINVAL; } else if (osdmap.is_up(osd)) { mon->try_send_message(new MOSDScrub(osdmap.get_fsid(), pvec.back() == "repair", pvec.back() == "deep-scrub"), osdmap.get_inst(osd)); ss << "osd." << osd << " instructed to " << pvec.back(); } else { ss << "osd." << osd << " is not up"; r = -EAGAIN; } } } ...... } |
或者针对单个pg触发:ceph pg scrub 0.1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
bool PGMonitor::preprocess_command(MMonCommand *m) { ...... } else if (prefix == "pg scrub" || prefix == "pg repair" || prefix == "pg deep-scrub") { string scrubop = prefix.substr(3, string::npos); pg_t pgid; string pgidstr; cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr); if (!pgid.parse(pgidstr.c_str())) { ss << "invalid pgid '" << pgidstr << "'"; r = -EINVAL; goto reply; } if (!pg_map.pg_stat.count(pgid)) { ss << "pg " << pgid << " dne"; r = -ENOENT; goto reply; } if (pg_map.pg_stat[pgid].acting_primary == -1) { ss << "pg " << pgid << " has no primary osd"; r = -EAGAIN; goto reply; } int osd = pg_map.pg_stat[pgid].acting_primary; if (!mon->osdmon()->osdmap.is_up(osd)) { ss << "pg " << pgid << " primary osd." << osd << " not up"; r = -EAGAIN; goto reply; } vector<pg_t> pgs(1); pgs[0] = pgid; mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs, scrubop == "repair", scrubop == "deep-scrub"), mon->osdmon()->osdmap.get_inst(osd)); ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop; r = 0; } ...... } |
pg级别的scrub命令是发给该pg的主osd(acting primary)的。
定时触发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
void OSD::tick() { ...... if (is_active()) { // periodically kick recovery work queue recovery_tp.wake(); if (!scrub_random_backoff()) { // 随机等待 sched_scrub(); // 调度scrub任务 } ...... tick_timer.add_event_after(1.0, new C_Tick(this)); // 每秒1次tick } void OSD::sched_scrub() { utime_t now = ceph_clock_now(cct); bool time_permit = scrub_time_permit(now); // 检查scrub配置项,看当前时间点是否允许scrub bool load_is_low = scrub_load_below_threshold(); // 检查服务器load值,只有比较低的情况下才允许scrub dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl; OSDService::ScrubJob scrub; if (service.first_scrub_stamp(&scrub)) { do { dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl; if (scrub.sched_time > now) { // save ourselves some effort dout(10) << "sched_scrub " << scrub.pgid << " schedued at " << scrub.sched_time << " > " << now << dendl; break; } PG *pg = _lookup_lock_pg(scrub.pgid); if (!pg) continue; if (pg->get_pgbackend()->scrub_supported() && pg->is_active() && // pg状态检查 (scrub.deadline < now || (time_permit && load_is_low))) { // deadline到了或者允许执行 dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time << (pg->scrubber.must_scrub ? ", explicitly requested" : (load_is_low ? ", load_is_low" : " deadline < now")) << dendl; if (pg->sched_scrub()) { // 交给pg处理 pg->unlock(); break; } } pg->unlock(); } while (service.next_scrub_stamp(scrub, &scrub)); } dout(20) << "sched_scrub done" << dendl; } // returns true if a scrub has been newly kicked off bool PG::sched_scrub() { ...... 
bool ret = true; if (!scrubber.reserved) { assert(scrubber.reserved_peers.empty()); if (osd->inc_scrubs_pending()) { dout(20) << "sched_scrub: reserved locally, reserving replicas" << dendl; scrubber.reserved = true; // 本地osd预留资源 scrubber.reserved_peers.insert(pg_whoami); scrub_reserve_replicas(); // 预留副本osd } else { dout(20) << "sched_scrub: failed to reserve locally" << dendl; ret = false; } } if (scrubber.reserved) { if (scrubber.reserve_failed) { // 从预留失败 dout(20) << "sched_scrub: failed, a peer declined" << dendl; clear_scrub_reserved(); scrub_unreserve_replicas(); ret = false; } else if (scrubber.reserved_peers.size() == acting.size()) { // 全部osd预留成功 dout(20) << "sched_scrub: success, reserved self and replicas" << dendl; if (time_for_deep) { dout(10) << "sched_scrub: scrub will be deep" << dendl; state_set(PG_STATE_DEEP_SCRUB); // 设置pg状态 } queue_scrub(); // 准备scrub } else { // 有副本还没预留成功 // none declined, since scrubber.reserved is set dout(20) << "sched_scrub: reserved " << scrubber.reserved_peers << ", waiting for replicas" << dendl; } } return ret; } bool PG::queue_scrub() { assert(_lock.is_locked()); if (is_scrubbing()) { return false; } scrubber.must_scrub = false; state_set(PG_STATE_SCRUBBING); // 设置状态 if (scrubber.must_deep_scrub) { state_set(PG_STATE_DEEP_SCRUB); // 设置状态 scrubber.must_deep_scrub = false; } if (scrubber.must_repair) { state_set(PG_STATE_REPAIR); // 设置状态 scrubber.must_repair = false; } osd->queue_for_scrub(this); // 丢队列OSDService的scrub_wq准备scrub return true; } |
主osd
scrub_wq是关联到disk_tp这个线程池的一个ThreadPool::WorkQueue
scrub_wq的具体初始化是在OSD::OSD构造函数中完成的,之后再传给OSDService的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// OSD::OSD scrub_wq( this, cct->_conf->osd_scrub_thread_timeout, cct->_conf->osd_scrub_thread_suicide_timeout, &disk_tp), // 关联的处理线程池 ...... disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"), // OSDService::OSDService(OSD *osd) : scrub_wq(osd->scrub_wq), |
接下来是disk_tp的初始化过程:在OSD::init()里调用了disk_tp.start(),之后的流程就是线程池初始化的通用流程了。
ThreadPool::start() -> ThreadPool::start_threads() -> new WorkThread -> wt->create() -> Thread::create(0) -> Thread::try_create(0) -> pthread_create(&thread_id, thread_attr, _entry_func, (void*)this)。
其中线程的入口函数_entry_func会通过Thread::entry_wrapper()去调用entry()函数,而entry()在WorkThread里做了定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// threads struct WorkThread : public Thread { ThreadPool *pool; WorkThread(ThreadPool *p) : pool(p) {} void *entry() { pool->worker(this); return 0; } }; void ThreadPool::worker(WorkThread *wt) // 线程池处理函数 { ...... while (!_stop) { ...... if (!_pause && !work_queues.empty()) { WorkQueue_* wq; int tries = work_queues.size(); bool did = false; while (tries--) { last_work_queue++; last_work_queue %= work_queues.size(); wq = work_queues[last_work_queue]; void *item = wq->_void_dequeue(); // 调用WorkQueue的_void_dequeue(),再调用ScrubWQ的_dequeue() if (item) { processing++; ...... wq->_void_process(item, tp_handle); // 同_void_dequeue流程,调用pg->scrub(handle) _lock.Lock(); wq->_void_process_finish(item); processing--; ...... } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void PG::scrub(ThreadPool::TPHandle &handle) { ...... chunky_scrub(handle); ...... } void PG::chunky_scrub(ThreadPool::TPHandle &handle) { ...... bool done = false; int ret; while (!done) { dout(20) << "scrub state " << Scrubber::state_string(scrubber.state) << dendl; ...... // 有限状态机,一坨状态变化过程及对应处理方法,直到PG::Scrubber::FINISH状态退出while循环 // PG::Scrubber::NEW_CHUNK状态会调用PG::_request_scrub_map函数找副本osd要scrub map,用来比较object的信息(大小、属性、校验和等) // PG::Scrubber::WAIT_REPLICAS 等待从osd返回scrub map(收到map的处理函数是sub_op_scrub_map),全部返回后进入COMPARE_MAPS状态 // PG::Scrubber::BUILD_MAP状态会调用build_scrub_map_chunk生成主osd的scrub map(会区分是否为deep) // PG::Scrubber::COMPARE_MAPS比较scrub map } |
整个scrub流程,主要包括确定时间,确定scrub对象(pg、object),获取从osd的对应信息进行比较,这些信息都是保存在scrub map里的。
副本osd
副本osd主要做两件事:生成scrub map(OSD::handle_rep_scrub->rep_scrub_wq.queue(m)->PG::replica_scrub)和预留scrub资源(PG::sub_op_scrub_reserve);scrub map的比较都是在主osd上进行的(PG::scrub_compare_maps())。
replica_scrub也是调用build_scrub_map_chunk生成的map(跟主osd一样)。
scrub map比较
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
void PG::scrub_compare_maps() { dout(10) << __func__ << " has maps, analyzing" << dendl; // construct authoritative scrub map for type specific scrubbing ScrubMap authmap(scrubber.primary_scrubmap); map<hobject_t, pair<uint32_t, uint32_t> > missing_digest; if (acting.size() > 1) { dout(10) << __func__ << " comparing replica scrub maps" << dendl; stringstream ss; // Map from object with errors to good peer map<hobject_t, list<pg_shard_t> > authoritative; map<pg_shard_t, ScrubMap *> maps; dout(2) << __func__ << " osd." << acting[0] << " has " << scrubber.primary_scrubmap.objects.size() << " items" << dendl; maps[pg_whoami] = &scrubber.primary_scrubmap; // 加入主osd scrub map for (set<pg_shard_t>::iterator i = actingbackfill.begin(); i != actingbackfill.end(); ++i) { if (*i == pg_whoami) continue; dout(2) << __func__ << " replica " << *i << " has " << scrubber.received_maps[*i].objects.size() << " items" << dendl; maps[*i] = &scrubber.received_maps[*i]; // 加入从osd scrub map } // can we relate scrub digests to oi digests? bool okseed = (get_min_peer_features() & CEPH_FEATURE_OSD_OBJECT_DIGEST); assert(okseed == (scrubber.seed == 0xffffffff)); get_pgbackend()->be_compare_scrubmaps( // PGBackend::be_select_auth_object选取权威objects,之后调用PGBackend::be_compare_scrub_objects比较scrub maps中的主从数据 maps, okseed, state_test(PG_STATE_REPAIR), scrubber.missing, scrubber.inconsistent, authoritative, missing_digest, scrubber.shallow_errors, scrubber.deep_errors, info.pgid, acting, ss); dout(2) << ss.str() << dendl; ...... } |
PGBackend::be_compare_scrubmaps会调用PGBackend::be_compare_scrub_objects进行objects级别的比较,如果有不一致则打印日志并修改pg状态为inconsistent。
scrub之后如果发现不一致,则需要进行repair:ceph pg repair xxx.yyy。repair可能要等很久,因为已有的scrub还在执行,repair需要排队。可以先把scrub关掉(ceph osd set noscrub、ceph osd set nodeep-scrub),过段时间repair就会开始;repair开始之后就可以再次开启scrub了(ceph osd unset noscrub、ceph osd unset nodeep-scrub)。