ceph scrub流程分析




本文继续基于H版本(Hammer)的代码进行分析。

主动触发

$ ceph osd deep-scrub 1

// Excerpt from the H (Hammer) release; `......` marks upstream code elided here.
// Monitor-side handling of `ceph osd scrub|deep-scrub|repair <who>`: build an
// MOSDScrub message and send it to the target OSD(s).
bool OSDMonitor::preprocess_command(MMonCommand *m)
{
  ......
  } else if ((prefix == "osd scrub" ||
          prefix == "osd deep-scrub" ||
          prefix == "osd repair")) {
    string whostr;
    cmd_getval(g_ceph_context, cmdmap, "who", whostr);
    vector<string> pvec;
    // Split the prefix into words; pvec.back() is then the verb
    // ("scrub", "deep-scrub" or "repair") that selects the MOSDScrub flags below.
    get_str_vec(prefix, pvec);

    if (whostr == "*") {
      // "*": instruct every OSD that is currently up.
      ss << "osds ";
      int c = 0;
      for (int i = 0; i < osdmap.get_max_osd(); i++)
    if (osdmap.is_up(i)) {
      ss << (c++ ? "," : "") << i;  // comma-separated list of OSD ids for the reply
      // Flags: repair?, deep? -- both false means a plain "osd scrub".
      mon->try_send_message(new MOSDScrub(osdmap.get_fsid(),
                          pvec.back() == "repair",
                          pvec.back() == "deep-scrub"),
                osdmap.get_inst(i));
    }
      r = 0;
      ss << " instructed to " << pvec.back();
    } else {
      // A single OSD id: the message is only sent if that OSD is up.
      long osd = parse_osd_id(whostr.c_str(), &ss);
      if (osd < 0) {
    r = -EINVAL;
      } else if (osdmap.is_up(osd)) {
    mon->try_send_message(new MOSDScrub(osdmap.get_fsid(),
                        pvec.back() == "repair",
                        pvec.back() == "deep-scrub"),
                  osdmap.get_inst(osd));
    ss << "osd." << osd << " instructed to " << pvec.back();
      } else {
    // Target OSD is down: report it and ask the caller to retry later.
    ss << "osd." << osd << " is not up";
    r = -EAGAIN;
      }
    }
  }
  ......
}

或者针对单个pg触发:$ ceph pg scrub 0.1

// Excerpt from the H (Hammer) release; `......` marks upstream code elided here.
// Monitor-side handling of `ceph pg scrub|deep-scrub|repair <pgid>`: validate the
// pg and send an MOSDScrub for it to the pg's acting primary OSD only.
bool PGMonitor::preprocess_command(MMonCommand *m)
{
  ......
  } else if (prefix == "pg scrub" || 
         prefix == "pg repair" || 
         prefix == "pg deep-scrub") {
    // Drop the leading "pg " (3 chars): scrubop is "scrub", "repair" or "deep-scrub".
    string scrubop = prefix.substr(3, string::npos);
    pg_t pgid;
    string pgidstr;
    cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
    if (!pgid.parse(pgidstr.c_str())) {
      ss << "invalid pgid '" << pgidstr << "'";
      r = -EINVAL;
      goto reply;
    }
    // The pg must be known to the PGMap...
    if (!pg_map.pg_stat.count(pgid)) {
      ss << "pg " << pgid << " dne";
      r = -ENOENT;
      goto reply;
    }
    // ...have an acting primary...
    if (pg_map.pg_stat[pgid].acting_primary == -1) {
      ss << "pg " << pgid << " has no primary osd";
      r = -EAGAIN;
      goto reply;
    }
    // ...and that primary must be up.
    int osd = pg_map.pg_stat[pgid].acting_primary;
    if (!mon->osdmon()->osdmap.is_up(osd)) {
      ss << "pg " << pgid << " primary osd." << osd << " not up";
      r = -EAGAIN;
      goto reply;
    }
    // Single-pg MOSDScrub; flags: repair?, deep?
    vector<pg_t> pgs(1);
    pgs[0] = pgid;
    mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs,
                    scrubop == "repair",
                    scrubop == "deep-scrub"),
              mon->osdmon()->osdmap.get_inst(osd));
    ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop;
    r = 0;
  }
  ......
}

可见,pg级别的scrub命令只发给该pg的主osd(acting_primary),由主osd来发起scrub。

定时触发

// Excerpt: the OSD's periodic tick; `......` marks elided upstream code.
// While the OSD is active it wakes the recovery pool and may schedule scrubs.
void OSD::tick()
{
  ......
  if (is_active()) {
    // periodically kick recovery work queue
    recovery_tp.wake();

    if (!scrub_random_backoff()) {  // randomized backoff: when it returns true, no scrub scheduling this tick
      sched_scrub();  // schedule scrub jobs
    }
  ......
  tick_timer.add_event_after(1.0, new C_Tick(this));  // re-arm the timer: one tick per second (1.0 s)
}



// Walk the OSD's registered scrub jobs and try to start a scrub on the first
// eligible PG. At most one PG's PG::sched_scrub() returning true ends the scan.
void OSD::sched_scrub()
{
  utime_t now = ceph_clock_now(cct);
  bool time_permit = scrub_time_permit(now); // check scrub config options: is scrubbing allowed at the current time?
  bool load_is_low = scrub_load_below_threshold();  // check host load; normally scrub is allowed only when load is low
  dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;

  OSDService::ScrubJob scrub;
  if (service.first_scrub_stamp(&scrub)) {
    do {
      dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl;

      // Presumably jobs are visited in sched_time order, so once one is in the
      // future the rest are too and the scan can stop (TODO confirm in OSDService).
      if (scrub.sched_time > now) {
        // save ourselves some effort
        dout(10) << "sched_scrub " << scrub.pgid << " schedued at " << scrub.sched_time
             << " > " << now << dendl;
        break;
      }

      PG *pg = _lookup_lock_pg(scrub.pgid);
      // In a do-while, `continue` jumps to the loop condition, i.e. advances to the next job.
      if (!pg)
        continue;
      if (pg->get_pgbackend()->scrub_supported() && pg->is_active() && // PG state checks
      (scrub.deadline < now || (time_permit && load_is_low))) {  // deadline passed, or both time window and load permit
        dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time
             << (pg->scrubber.must_scrub ? ", explicitly requested" :
                 (load_is_low ? ", load_is_low" : " deadline < now"))
             << dendl;
        if (pg->sched_scrub()) { // hand off to the PG; true ends this scan
          pg->unlock();
          break;
        }
      }
      pg->unlock();
    } while (service.next_scrub_stamp(scrub, &scrub));
  }    
  dout(20) << "sched_scrub done" << dendl;
}

// returns true if a scrub has been newly kicked off
// Reservation handshake: reserve a scrub slot on this OSD, ask the replicas to
// reserve, and queue the scrub once every acting OSD has reserved.
// NOTE(review): ret is false only when the local reservation fails or a peer
// declines; it stays true while replica replies are still pending, so the
// caller (OSD::sched_scrub) also stops scanning in that case.
bool PG::sched_scrub()
{
  ......
  bool ret = true;
  if (!scrubber.reserved) {
    assert(scrubber.reserved_peers.empty());
    if (osd->inc_scrubs_pending()) {
      dout(20) << "sched_scrub: reserved locally, reserving replicas" << dendl;
      scrubber.reserved = true;  // local OSD scrub slot reserved
      scrubber.reserved_peers.insert(pg_whoami);
      scrub_reserve_replicas();  // ask the replica OSDs to reserve as well
    } else {
      dout(20) << "sched_scrub: failed to reserve locally" << dendl;
      ret = false;
    }
  }
  if (scrubber.reserved) {
    if (scrubber.reserve_failed) { // a replica declined the reservation
      dout(20) << "sched_scrub: failed, a peer declined" << dendl;
      clear_scrub_reserved();
      scrub_unreserve_replicas();
      ret = false;
    } else if (scrubber.reserved_peers.size() == acting.size()) {  // every acting OSD (self + replicas) has reserved
      dout(20) << "sched_scrub: success, reserved self and replicas" << dendl;
      // time_for_deep is computed in the elided code above.
      if (time_for_deep) {
        dout(10) << "sched_scrub: scrub will be deep" << dendl;
        state_set(PG_STATE_DEEP_SCRUB); // mark the PG for a deep scrub
      }
      queue_scrub();  // queue the actual scrub work
    } else {  // some replicas have not replied yet
      // none declined, since scrubber.reserved is set
      dout(20) << "sched_scrub: reserved " << scrubber.reserved_peers << ", waiting for replicas" << dendl;
    }
  }
  return ret;
}


// Mark the PG as scrubbing (plus deep/repair if explicitly requested) and hand
// it to the OSD's scrub queue. Caller must hold the PG lock.
// Returns false if a scrub is already in progress, true if it was queued.
bool PG::queue_scrub()
{
  assert(_lock.is_locked());
  if (is_scrubbing()) {
    return false;
  }
  // Consume the one-shot "requested" flags as they are turned into PG state.
  scrubber.must_scrub = false;
  state_set(PG_STATE_SCRUBBING); // mark the PG as scrubbing
  if (scrubber.must_deep_scrub) {
    state_set(PG_STATE_DEEP_SCRUB);  // explicitly requested deep scrub
    scrubber.must_deep_scrub = false;
  }
  if (scrubber.must_repair) {
    state_set(PG_STATE_REPAIR);  // explicitly requested repair
    scrubber.must_repair = false;
  }
  osd->queue_for_scrub(this); // enqueue onto OSDService's scrub_wq; a disk_tp thread will later run the scrub
  return true;
}

主osd

scrub_wq是挂在disk_tp线程池上的一个ThreadPool::WorkQueue,线程池中的工作线程从队列取任务并处理时,执行的是ThreadPool::worker函数。

scrub_wq在OSD::OSD构造函数中初始化,OSDService的构造函数再通过osd->scrub_wq拿到它:

// OSD::OSD -- member-initializer fragments (other initializers elided as `......`)
  scrub_wq(
    this,
    cct->_conf->osd_scrub_thread_timeout,
    cct->_conf->osd_scrub_thread_suicide_timeout,
    &disk_tp),  // thread pool whose workers service this queue
  ......  
  disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),

// OSDService::OSDService -- takes its scrub_wq from the owning OSD
OSDService::OSDService(OSD *osd) :
  scrub_wq(osd->scrub_wq),

接下来是disk_tp的启动过程:OSD::init()中调用了disk_tp.start(),之后就是线程池启动的通用流程:
ThreadPool::start() -> ThreadPool::start_threads() -> new WorkThread -> wt->create() -> Thread::create(0) -> Thread::try_create(0) -> pthread_create(&thread_id, thread_attr, _entry_func, (void*)this)。

其中,传给pthread_create的线程入口_entry_func会调用Thread::entry_wrapper(),entry_wrapper()最终调用并返回虚函数entry()的结果;WorkThread重写了entry(),让它执行pool->worker(this):

  // threads
  // One pool thread: its entry() just runs the owning pool's worker loop
  // and returns 0 when that loop exits.
  struct WorkThread : public Thread {
    ThreadPool *pool;  // owning pool, set at construction
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() {
      pool->worker(this);
      return 0;
    }
  };

// Excerpt: main loop of every pool thread; `......` marks elided upstream code.
// Until the pool is stopped, it visits the registered work queues round-robin,
// dequeues one item and processes it.
void ThreadPool::worker(WorkThread *wt)  // thread-pool worker loop
{
  ......
  while (!_stop) {
    ......
    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      int tries = work_queues.size();
      bool did = false;
      while (tries--) {
    // Round-robin: continue from the queue after the one used last time.
    last_work_queue++;
    last_work_queue %= work_queues.size();
    wq = work_queues[last_work_queue];

    void *item = wq->_void_dequeue();  // WorkQueue::_void_dequeue() forwards to the concrete queue's _dequeue() (ScrubWQ::_dequeue() for scrub_wq)
    if (item) {
      processing++;
      ......
      wq->_void_process(item, tp_handle);  // dispatched the same way; for scrub_wq this ends up calling pg->scrub(handle)
      _lock.Lock();
      wq->_void_process_finish(item);
      processing--;
      ......
}
// Excerpt: scrub work-item handler reached from scrub_wq via ThreadPool::worker;
// in the visible part it delegates to chunky_scrub().
void PG::scrub(ThreadPool::TPHandle &handle)
{
  ......
  chunky_scrub(handle);
  ......
}


// Excerpt: the primary's chunked scrub state machine (body elided upstream).
void PG::chunky_scrub(ThreadPool::TPHandle &handle)
{
  ......
  bool done = false;
  int ret;

  while (!done) {
    dout(20) << "scrub state " << Scrubber::state_string(scrubber.state) << dendl;
  ......  // finite state machine: a series of state transitions and their handlers, looping until PG::Scrubber::FINISH exits the while loop
          // PG::Scrubber::NEW_CHUNK calls PG::_request_scrub_map to ask replica OSDs for their scrub maps, used to compare object info (size, attributes, checksums, ...)
          // PG::Scrubber::WAIT_REPLICAS waits for the replicas' scrub maps (received maps are handled by sub_op_scrub_map); once all have arrived it moves to COMPARE_MAPS
          // PG::Scrubber::BUILD_MAP calls build_scrub_map_chunk to build the primary's own scrub map (deep or shallow)
          // PG::Scrubber::COMPARE_MAPS compares the scrub maps
          // NOTE(review): in the upstream state machine BUILD_MAP runs before WAIT_REPLICAS; confirm the exact order against the H-release source
}

整个scrub流程主要包括:确定scrub时间,确定scrub对象(pg、object),从副本osd获取对应object的信息并与主osd进行比较;这些信息都保存在scrub map里。

副本osd

副本osd上主要做两件事:一是生成scrub map(OSD::handle_rep_scrub -> rep_scrub_wq.queue(m) -> PG::replica_scrub),二是预留scrub资源(PG::sub_op_scrub_reserve)。比较工作都在主osd上进行(PG::scrub_compare_maps())。

replica_scrub同样调用build_scrub_map_chunk来生成scrub map(与主osd相同)。

scrub map比较

// Excerpt (primary side): gather the primary's scrub map and every replica's
// received map, then let the PG backend compare them and record missing /
// inconsistent objects and shallow/deep error counts in the scrubber.
void PG::scrub_compare_maps() 
{
  dout(10) << __func__ << " has maps, analyzing" << dendl;

  // construct authoritative scrub map for type specific scrubbing
  ScrubMap authmap(scrubber.primary_scrubmap);
  map<hobject_t, pair<uint32_t, uint32_t> > missing_digest;

  // Comparison only makes sense with at least one replica.
  if (acting.size() > 1) {
    dout(10) << __func__ << "  comparing replica scrub maps" << dendl;

    stringstream ss;

    // Map from object with errors to good peer
    map<hobject_t, list<pg_shard_t> > authoritative;
    map<pg_shard_t, ScrubMap *> maps;

    dout(2) << __func__ << "   osd." << acting[0] << " has "
        << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
    maps[pg_whoami] = &scrubber.primary_scrubmap;  // add the primary's own scrub map

    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
     i != actingbackfill.end();
     ++i) {
      if (*i == pg_whoami) continue;
      dout(2) << __func__ << " replica " << *i << " has "
          << scrubber.received_maps[*i].objects.size()
          << " items" << dendl;
      maps[*i] = &scrubber.received_maps[*i];  // add the scrub map received from this replica shard
    }

    // can we relate scrub digests to oi digests?
    bool okseed = (get_min_peer_features() & CEPH_FEATURE_OSD_OBJECT_DIGEST);
    assert(okseed == (scrubber.seed == 0xffffffff));

    get_pgbackend()->be_compare_scrubmaps( // internally: be_select_auth_object picks the authoritative copy of each object, then be_compare_scrub_objects compares primary and replica data from the scrub maps
      maps,
      okseed,
      state_test(PG_STATE_REPAIR),
      scrubber.missing,
      scrubber.inconsistent,
      authoritative,
      missing_digest,
      scrubber.shallow_errors,
      scrubber.deep_errors,
      info.pgid, acting,
      ss);
    dout(2) << ss.str() << dendl;
    ......
}

PGBackend::be_compare_scrubmaps会调用PGBackend::be_compare_scrub_objects进行object级别的比较;如果发现不一致,就打印日志并将pg状态标记为inconsistent。

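scrub之后如果发现不一致,就需要执行repair:ceph pg repair xxx.yyy。repair可能要等很久才真正开始,因为repair同样走scrub流程(见queue_scrub中的PG_STATE_REPAIR),需要排队等待正在执行的scrub释放资源。此时可以先关闭scrub(ceph osd set noscrub、ceph osd set nodeep-scrub),过段时间repair就会开始;repair开始之后,再执行ceph osd unset noscrub、ceph osd unset nodeep-scrub重新开启scrub即可。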