Ceph peering流程14问




本文基于H版本代码编写(对比L版本看了下,大致流程没啥变化)。

问题1:peering是如何启动的?

有两个场景会触发peering流程:
1. 在pg创建时
2. 在OSD启动、停止导致OSDMap变化进而导致pg的acting set发生变化时

场景1对应的代码流程是:

void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);  // 收到pg创建消息的处理函数
    break;
  ......
}

void OSD::handle_pg_create(OpRequestRef op)
{
  ......
  pg->handle_create(&rctx);
  ......
}

void PG::handle_create(RecoveryCtx *rctx)
{
  dout(10) << "handle_create" << dendl;
  Initialize evt;
  recovery_state.handle_event(evt, rctx);   // 发送Initialize事件
  ActMap evt2;
  recovery_state.handle_event(evt2, rctx);   // 发送ActMap事件
}

Initialize事件会使状态机(RecoveryMachine)进入Reset状态:

    struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
      Initial(my_context ctx);
      void exit();

      typedef boost::mpl::list <
        boost::statechart::transition< Initialize, Reset >,
      ......
    }

Reset状态定义了ActMap事件的反应函数:

    struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
      Reset(my_context ctx);
      void exit();

      typedef boost::mpl::list <
        ......
        boost::statechart::custom_reaction< ActMap >,
        ......
        > reactions;
      ......
      boost::statechart::result react(const ActMap&);
      ......
    }

boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
{
  ......
  return transit< Started >();  // 进入Started状态
}

Started状态的初始子状态是Start状态:

struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {}


/*-------Start---------*/
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());   // 当前osd是主
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl; 
    post_event(MakeStray());     // 当前osd是从
  }
}

这里只关注MakePrimary,Start状态里定义了MakePrimary事件的反应函数:

    struct Start : boost::statechart::state< Start, Started >, NamedState {
      ......
      typedef boost::mpl::list <
        boost::statechart::transition< MakePrimary, Primary >,   // 转到Primary状态,Peering状态是其初始子状态
        boost::statechart::transition< MakeStray, Stray >
        > reactions;
    };

struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState {}

由此进入了peering流程。

场景2对应的代码流程是:

$7 = std::map with 22 elements = {["arch"] = "x86_64", ["back_addr"] = "10.173.32.8:6805/35212", 
  ["ceph_version"] = "ceph version 157c86f0d4735011af4a2b46dc803af1878e251 (7157c86f0d4735011af4a2b46dc803af1878e251)", 
  ["cpu"] = "Intel Core Processor (Broadwell)", ["distro"] = "Debian", ["distro_codename"] = "jessie", ["distro_description"] = "Debian GNU/Linux 8.6 (jessie)", 
  ["distro_version"] = "8.6", ["filestore_backend"] = "xfs", ["filestore_f_type"] = "0x58465342", ["front_addr"] = "10.173.32.8:6804/35212", 
  ["hb_back_addr"] = "10.173.32.8:6806/35212", ["hb_front_addr"] = "10.173.32.8:6807/35212", ["hostname"] = "ceph-l", 
  ["kernel_description"] = "#1 SMP Debian 3.16.36-1+deb8u1 (2016-09-03)", ["kernel_version"] = "3.16.0-4-amd64", ["mem_swap_kb"] = "0", 
  ["mem_total_kb"] = "16465048", ["os"] = "Linux", ["osd_data"] = "/var/lib/ceph/osd/ceph-1", ["osd_journal"] = "/var/lib/ceph/osd/ceph-1/journal", 
  ["osd_objectstore"] = "filestore"}

OSD::_send_boot()发给monitor的MOSDBoot消息内容,monitor收到之后进行Paxos决议,之后就认为osd已经up了,并且会给osd发送osdmap信息(版本号是osd down之前的epoch+1),osd收到osdmap应该就触发了peering流程。

OSD::ms_dispatch(Message *m) -> OSD::_dispatch(Message *m) -> handle_osd_map(static_cast(m)) -> consume_map() -> pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch()):

void PG::queue_null(epoch_t msg_epoch,
            epoch_t query_epoch)
{
  dout(10) << "null" << dendl;
  queue_peering_event(
    CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
                     NullEvt())));
}

void PG::queue_peering_event(CephPeeringEvtRef evt)
{
  if (old_peering_evt(evt))
    return;
  peering_queue.push_back(evt);  // 入队NullEvt事件,这个空事件只为了让pg进入peering流程
  osd->queue_for_peering(this);
}

osd启动过程中OSD::init会调用load_pgs函数,进而调用handle_loaded:

void PG::handle_loaded(RecoveryCtx *rctx)
{
  dout(10) << "handle_loaded" << dendl;
  Load evt;
  recovery_state.handle_event(evt, rctx); // 发送Load事件给状态机,会使状态机从Initial状态进入Reset状态
}

之后OSD::process_peering_events函数(peering线程的实际处理函数)会调用OSD::advance_pg,再调用PG::handle_activate_map,发送ActMap事件给状态机(此时状态机处于Reset状态),状态机就进入了Started状态,之后就是MakePrimary/MakeStray了。

问题2:peering各个阶段是如何转换的?

peering主要分为GetInfo、GetLog、GetMissing、WaitUpThru这4个阶段(状态),4个阶段一般来说是串行的,他们的转换过程涉及到boost状态机的基础知识(参考问题7),总体来说状态转换图可以参考官方文档:
pg整体状态变化

这个图虽然看起来更清晰,但缺少了部分状态转换流程:peering状态变化

如在GetLog阶收到AdvMap事件后进入Reset状态的流程。

就代码层面来说,相关的转换过程举例如下:

进入peering初始状态

Peering状态的初始状态就是GetInfo(boost状态机基础知识,参考问题7):

struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {......}

GetInfo阶段经过以下几个主要步骤后进入下一个阶段(本阶段具体做的事情参考问题10):
1. pg->generate_past_intervals()
2. pg->build_prior(prior_set)
3. PG::RecoveryState::GetInfo::get_infos() // 如果prior_set为空,不需要从其他OSD获取信息,则直接进入GetLog阶段(post_event(GotInfo()))
4. PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt) // 获取其他OSD返回的pg info(接收到MNotifyRec事件)后进入GetLog阶段(post_event(GotInfo()))

另外一个问题:MNotifyRec(Message Notify Receive)事件是如何投递过来的?
首先我们要理解一点,根据peering线程模型(参考问题9)可知,所有peering相关操作都是在2个peering线程中执行的,因此事件投递要么直接通过这两个线程进行(无法异步),要么通过队列,然后由peering线程异步的从队列中获取事件并处理,Ceph中几乎所有耗时操作都是异步的,因此肯定是通过队列来投递事件,具体流程是:

void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);
    break;

  case MSG_OSD_PG_NOTIFY:
    handle_pg_notify(op); // 这里分发pg notify消息
    break;
  ......
}


/** PGNotify
 * from non-primary to primary
 * includes pg_info_t.
 * NOTE: called with opqueue active.
 */
void OSD::handle_pg_notify(OpRequestRef op)
{
  MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
  assert(m->get_type() == MSG_OSD_PG_NOTIFY);
  dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
  ......
  handle_pg_peering_evt(
      spg_t(it->first.info.pgid.pgid, it->first.to),
      it->first.info, it->second,
      it->first.query_epoch, pg_shard_t(from, it->first.from), true,
      PG::CephPeeringEvtRef(
    new PG::CephPeeringEvt(
      it->first.epoch_sent, it->first.query_epoch,
      PG::MNotifyRec(pg_shard_t(from, it->first.from), it->first, // 构造MNotifyRec事件
          op->get_req()->get_connection()->get_features())))
      );
}

/*
 * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
 * hasn't changed since the given epoch and we are the primary.
 */
void OSD::handle_pg_peering_evt(
  spg_t pgid,
  const pg_info_t& info,
  pg_interval_map_t& pi,
  epoch_t epoch,
  pg_shard_t from,
  bool primary,
  PG::CephPeeringEvtRef evt)
{
  ......
  pg->queue_peering_event(evt);  // 事件入队
}


void PG::queue_peering_event(CephPeeringEvtRef evt)
{
  if (old_peering_evt(evt))
    return;
  peering_queue.push_back(evt);  // 入的是peering_queue队列,注意这个队列是PG类的,和OSD::PeeringWQ类的那个同名队列不同
  osd->queue_for_peering(this);  // 这个是入队OSD::PeeringWQ::peering_queue,也即如果有新事件到来,就要继续被peering线程处理一次
}

class PG: {
  ......
  list<CephPeeringEvtRef> peering_queue;  // 这里保存的是pg收到状态机事件
  ......
}

class OSD: {
  ......
  // -- peering queue --
  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
    list<PG*> peering_queue;  // 这里保存的是正在执行peering流程的pg列表,对应的处理线程是peering线程池(参考问题11)
    ......
  }
  ......
}

PG::peering_queue队列是也是在peering线程池处理函数OSD::process_peering_events里出队的:

void OSD::process_peering_events(//OSD::PeeringWQ::_dequeue获取一批pg(默认20个),然后来这里处理,处理完进入Peering第一阶段GetInfo
  const list<PG*> &pgs,          // 发请求给其他osd获取信息:querying info from osd.0,然后线程就结束了,其他阶段是通过事件触发的
  ThreadPool::TPHandle &handle
  )
{
  ......
    if (!advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs)) {
      // we need to requeue the PG explicitly since we didn't actually
      // handle an event // 还会被线程池拿出来继续在这里处理
      peering_wq.queue(pg); // peering_wq就是类PeeringWQ的实例化对象,BatchWorkQueue::queue,PeeringWQ::_enqueue,重新加入到peering_queue
    } else {
      assert(!pg->peering_queue.empty());
      PG::CephPeeringEvtRef evt = pg->peering_queue.front();  // 取出第一个元素
      pg->peering_queue.pop_front();  // 出队
      pg->handle_peering_event(evt, &rctx);  // 投递状态机事件给状态机处理
    }
  ......
}

void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
{
  ......
  recovery_state.handle_event(evt, rctx);
}

进入GetLog状态

上面已经提到进入GetLog状态的两个途径,1是如果prior_set为空,不需要从其他OSD获取信息,则直接进入GetLog阶段;2是收到MNotifyRec事件并处理完之后。这里就不多说明。

进入GetMissing状态

GetLog阶段会给拥有权威日志的OSD发送pg_query_t::LOG获取pg log请求,对方回复的日志消息事件是MLogRec(Message Log Receive),GetLog状态的pg收到这个事件后就进入了GetMissing阶段。

MLogRec事件的分发流程与MNotifyRec类似,这里也不多说明。

GetMissing阶段会检查pg log是否完整,如不完整则需要继续从actingbackfill的OSD集合里拉取日志,生成缺失的对象列表,并在后续的recovery/backfill阶段根据这个列表进行数据恢复。

进入WaitUpThru状态

WaitUpThru状态不是所有pg都会进入的,条件是两种情况:
1. pg已经在GetLog阶段获取了所有日志,没有缺失对象,GetMissing阶段就可以在第一阶段退出(不必给actingbackfill的OSD发请求),这时如果pg不需要通知monitor更新up_thru值,则不进入WaitUpThru状态,否则进入
2. pg有缺失对象,在收到OSD回复的pg日志后(MLogRec事件),还会再次做条件1的判断

进入WaitUpThru状态都是通过post_event(NeedUpThru())进行的。

如果不需要进入WaitUpThru状态,则pg直接进入Active状态(通过post_event(Activate(pg->get_osdmap()->get_epoch()))):
Peering状态里有定义事件转换:boost::statechart::transition< Activate, Active >

进入Active状态

要进入Active状态,必须是up_thru已经更新(或者原本就不需要更新)的情况,第一种很简单,进入WaitUpThru或者之前的GetMissing阶段就直接进入Active了。

这里讨论第二种up_thru需要更新的情况,在peering线程池处理函数OSD::process_peering_events中会轮询每个peering_queue中的pg,并调用OSD::advance_pg批量检查收到的增量osdmap(一般都是monitor发过来的),对每个增量版本都调用pg->handle_advance_map来更新到pg,并发送AdvMap(Advance Map)给状态机,该事件由PG::RecoveryState::Peering::react(const AdvMap& advmap) 反应函数处理,里面继续调用pg->adjust_need_up_thru(advmap.osdmap),这里会根据接收到的osdmap里的up_thru值,来检查是否已经更新到期望值,如果已更新,则把pg的need_up_thru值置为false,表示已更新,就可以OSD::advance_pg函数里pg->handle_advance_map之后通过调用pg->handle_activate_map(rctx)来发送ActMap(Active Map)事件,最终退出WaitUpThru阶段进入Active阶段。

问题3:peering为啥会阻塞客户端IO?

就原理层面简单来说是为了保证用户数据的完整性和一致性,试想一下如果pg的部分数据还没完全恢复或者各个副本之间还没达成一致,就接受客户端IO读写,这时将会发生读错误或者因写入新数据导致用户数据混乱不一致。

就代码层面来说,阻塞客户端IO的代码流程是在:

bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
{
  if (is_stopping()) {
    // we're shutting down, so drop the op
    return true;
  }

  epoch_t msg_epoch(op_required_epoch(op));
  if (msg_epoch > osdmap->get_epoch()) { // OSD启动后,peering之前如果client发IO请求到OSD,会在这里返回,因为客户端拿到的osdmap比osd的新
    Session *s = static_cast<Session*>(op->get_req()->
                       get_connection()->get_priv());
    if (s) {
      s->received_map_lock.Lock();
      epoch_t received_epoch = s->received_map_epoch;
      s->received_map_lock.Unlock();
      if (received_epoch < msg_epoch) {
        osdmap_subscribe(msg_epoch, false);
      }
      s->put();
    }
    return false;
  }
  ......
}

上面的dispatch_op_fast返回false之后,并不会真正的返回错误给客户端,而是会把这个op放入一个等待队列,等条件满足后会再次处理,也即在此过程中客户端IO是阻塞的。

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
  assert(session->session_dispatch_lock.is_locked());
  assert(session->osdmap == osdmap);
  for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
       i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
       session->waiting_on_map.erase(i++));  // dispatch_op_fast返回false,则waiting_on_map不会清空。waiting_on_map是在更上层的函数里把客户端op添加进来的

  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session); // waiting_on_map不为空则需要等待osdmap更新之后才能处理客户端IO
  }
}

在OSD::consume_map()函数中会调用OSD::dispatch_session_waiting,从而在OSDMap更新后继续处理执行阻塞的客户端IO,OSD::handle_pg_peering_evt(peering事件投递函数)中也会通过调用wake_pg_waiters(pg, pgid)来间接调用到OSD::dispatch_session_waiting。

问题4:peering怎么挑选OSD的?

PG::RecoveryState::GetInfo::GetInfo会调用PG::build_prior生成prior_set,prior_set里保存的就是peering阶段获取pg info要用到的OSD列表(可能包含当前OSD)。

挑选过程可以简单的理解为pg当前acting和up set,以及历史的acting set(需要OSD当前处于up状态)。在我们的使用场景下,一般一个OSD down了之后,不会out(坏盘场景除外),并且会很快的处理掉这种故障,在OSD down期间,他对应的副本OSD down的概率极低(我们一般不会容忍2个副本OSD同时down的场景),因此历史的acting set一般都用不到(一个OSD down期间可以认为他上面的pg的acting和up set不会变化),当前的acting和up set就可以获取peering所需的全部pg info信息。

问题5:为啥要有peering流程?

ceph社区官方解释:http://docs.ceph.com/docs/master/dev/peering/

简单来说就是,peering是为了让pg知道他应该关联到哪些OSD?哪个是主?谁保存了哪些用户数据、元数据?怎么汇集这些数据以便在各种OSD故障场景下,保证用户数据的完整性和一致性?所有额外的那些概念(PG或OSD类的成员)都是为了达到这个目的而添加/设置的。

为了保证数据的完整性和一致性,需要在peering未成功完成之前阻止客户端IO。因此如果peering无法正常完成,pg将处于incomplete状态,无法提供IO能力。无法完成peering的原因根据peering的功能可以推测出来,比如pg知道了他关联的OSD,但这些OSD都是down的;或者这些OSD保存的元数据汇集起来仍然无法还原故障期间用户所做的IO操作,也就是无法保证用户数据的完整性和一致性。

设想一下如果没有peering流程,3副本情况最简单的故障场景下:
1. A B C — up
2. A B — up, C — down // 我们期望AB继续对客户端提供IO能力,如果没有peering,C是主的情况下,AB就无法对客户端提供IO读写能力
3. A B C — up // C启动之后,它保存的数据可能就是不完整的,甚至是与AB不一致的

当然更复杂的故障场景下,需要添加更多的持久化变量(例如up_set、acting_set、up_thru等)来记录相关的信息,以便帮助OSD进行故障恢复的决策,也有一些临时性的变量在故障恢复过程中发挥作用(如prior_set等)。

问题6:peering相关的可调参数有哪些?分别有啥用处?

  1. peering线程池中worker线程数:osd_op_threads,默认值2,即有几个线程并发处理peering pg,线程多了相对会加快peering处理速度,但可能对monitor节点或者OSD本地盘造成压力,如果一个OSD节点的pg数量较多,并且peering pg有堆积现象,可以考虑增加线程数。
  2. 线程处理pg时批量获取pg的数量:osd_peering_wq_batch_size,默认值20,即每个peering worker线程每次从peering_queue队列中获取多少pg来处理,每批pg都是串行处理的,如果有一个pg比较耗时,则可能影响同一批的其他pg
  3. pg检查OSDMap增量版本时每次检查的最大版本数量:osd_map_max_advance,默认值200,每个pg每次被peering线程处理时,都会检查最多这么多个OSDMap增量版本,多了单次处理相对会更耗时,少了就要多被peering线程调度几次才能处理完
  4. monitor Paxos决议间隔时间:paxos_propose_interval,默认值1.0,monitor的多个主从节点之间进行Paxos决议的间隔,这个间隔是为了防止OSDMap的版本号变化太快导致overflow,同时缓解monitor节点压力,这个间隔时间段内的所有osd状态变化可以一次性处理掉

问题7:peering使用的boost状态机是怎么回事?

boost状态机的参考文档:http://sns.hwcrazy.com/boost_1_41_0/libs/statechart/doc/tutorial.html

下面将结合peering相关代码来说明状态机的常用操作。

常用操作:

  1. 定义一个状态机
    状态机都是继承自boost::statechart::state_machine:
class PG {
  class RecoveryState {
    /* States */
    struct Initial; // 提前声明初始的状态,因为它只能在定义了状态机之后再定义状态详情,而定义状态机必须有一个初始状态
    class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
      ......
    }
    // 上述类RecoveryMachine就是一个状态机,Initial是其初始状态
  ......
  RecoveryMachine machine;
  ......
  public:
    RecoveryState(PG *pg)
      : machine(this, pg), pg(pg), orig_ctx(0) {
      machine.initiate();   // 状态机初始化
    }
    ......
  } recovery_state;  // recovery_state在PG构造函数中初始化
  ......
}
  1. 定义一个状态
    状态继承自boost::statechart::state(也可以继承自boost::statechart::simple_state):
    struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
      Initial(my_context ctx);
      void exit();
      ......
    }
    // Initial就是一个状态,他属于状态机RecoveryMachine(每个状态都必须属于一个状态机)。
  1. 定义一个子状态
    子状态和状态定义上没有区别,只是在定义的时候可以为一个状态指定子状态,表示其从属关系,子状态也可以有自己的子状态,也可以没有。
    struct Start;
    struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
      Started(my_context ctx);
      void exit();
      ......
    }
    // Started也是一个状态,他属于状态机RecoveryMachine,初始子状态是Start。
    ......

    struct GetInfo;
    struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
      std::auto_ptr< PriorSet > prior_set;
      Peering(my_context ctx);
      void exit();
      ......
    };
    // Peering也是一个状态,其父状态是Primary(由父状态间接确定所属状态机),初始子状态是GetInfo。

    ......
    struct Start : boost::statechart::state< Start, Started >, NamedState {
      Start(my_context ctx);
      void exit();
      ......
    };
    // Start状态就没有子状态,只有父状态Started。
  1. 定义状态机事件
    事件是用来驱动状态机在状态间进行转换的,其继承自boost::statechart::event:
    struct MakePrimary : boost::statechart::event< MakePrimary > {
      MakePrimary() : boost::statechart::event< MakePrimary >() {}
    };
    // MakePrimary就是一个事件
  1. 定义事件反应
    事件反应是指状态机在某个状态下,接收到指定事件后,会把状态机转换到另外一个状态,或者执行指定的操作:
    struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
      ......
      typedef boost::mpl::list <
        boost::statechart::custom_reaction< QueryState >,
        boost::statechart::transition< Activate, Active >,
        boost::statechart::custom_reaction< AdvMap >
        > reactions;
      ......
    };
    // 上述reactions就是定制化的事件反应,声明了QueryState事件和AdvMap事件的反应,transition则是定义了接收到Activate事件的状态变化,会把Peering状态转换到Active状态(也是一种事件反应的定义)
  1. 实现事件反应函数
    定制化的事件反应需要实现对应的反应函数,如上面定义的QueryState事件和AdvMap事件的custom_reaction反应,都需要实现具体的反应函数才行:
// 事件反应函数的名称统一为react,通过重载(参数不同)对应不同的事件
boost::statechart::result PG::RecoveryState::GetLog::react(const AdvMap& advmap)
{
  ......
  if (!advmap.osdmap->is_up(auth_log_shard.osd)) {
    dout(10) << "GetLog: auth_log_shard osd."
         << auth_log_shard.osd << " went down" << dendl;
    post_event(advmap);
    return transit< Reset >();
  }

  // let the Peering state do its checks.
  return forward_event();
}
// 上述反应函数对应的是AdvMap事件
  1. 丢弃事件
boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  assert(!msg);
  if (logevt.from != auth_log_shard) {
    dout(10) << "GetLog: discarding log from "
         << "non-auth_log_shard osd." << logevt.from << dendl;
    return discard_event();
  }
  ......
}
  1. 投递事件
    投递事件有两种方式,一种是调用状态机的process_event()函数,一种是在自定义的事件反应函数里调用post_event(),两种方式都是向状态机投递相应的事件:
  class RecoveryState {
    ......
    void handle_event(const boost::statechart::event_base &evt,
              RecoveryCtx *rctx) {
      start_handle(rctx);
      machine.process_event(evt);
      end_handle();
    }
    ......
  }
boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  ......
  dout(10) << "GetLog: received master log from osd" << logevt.from << dendl;
  msg = logevt.msg;
  post_event(GotLog());
  ......
}

// 上述post_event(GotLog())投递GotLog事件后会调用这个事件反应函数
boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&)
{
  dout(10) << "leaving GetLog" << dendl;
  ......
  return transit< GetMissing >();
}

投递事件之后,状态机会根据当前所处的状态以及当前状态下定义的事件反应函数对事件做出反应(也即调用对应的事件反应函数)。

  1. 转发事件
boost::statechart::result PG::RecoveryState::GetLog::react(const AdvMap& advmap)
{
  ......
  return forward_event(); // 会把AdvMap事件转发给GetLog的上层状态(Started/Primary/Peering/GetLog,因此上层状态是Peering)进行处理,如果上层状态没有实现对应的事件反应函数,则继续向上层转发直到状态机的最顶层
}
  1. 强制状态转换
boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&)
{
  dout(10) << "leaving GetLog" << dendl;
  ......
  return transit< GetMissing >();
}

需要注意的是,强制状态转换时必须搭配return关键词使用,否则可能导致未定义行为。原因是transit之后会调用当前状态的析构函数释放相关资源,如果在transit之后还有要执行的代码,则会导致未知行为。

  1. 从外部获取状态机内部信息
    可以通过定义事件反应函数来通过特定事件获取状态机内部信息:
// QueryState事件的自定义事件反应函数
boost::statechart::result PG::RecoveryState::Peering::react(const QueryState& q)
{
  PG *pg = context< RecoveryMachine >().pg;

  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  ......
  q.f->close_section();
  return forward_event();
}
// 上述事件反应是为perf dump服务的,用来通过OSD的UNIX domain socket接口查询状态机内部信息

通过ceph daemon osd.0 perf dump recoverystate_perf命令可以从OSD的UNIX domain socket接口查询状态机内部信息:

        "peering_latency": { // peering阶段总耗时统计,包含了下面4个阶段
            "avgcount": 11,  // 多少个pg进入过该阶段
            "sum": 13.586214755 // 阶段总耗时,除以avgcount就是每个pg的平均耗时,下同
        },
--
        "getinfo_latency": { // GetInfo阶段耗时统计
            "avgcount": 11,
            "sum": 0.233014045
        },
        "getlog_latency": { // GetLog阶段耗时统计
            "avgcount": 11,
            "sum": 0.000715519
        },
--
        "getmissing_latency": { // GetMissing阶段耗时统计
            "avgcount": 8,
            "sum": 0.000222941
        },
        "waitupthru_latency": { // WaitUpThru阶段耗时统计
            "avgcount": 8,
            "sum": 13.351894826
        }

perf dump相关数据记录过程(以GetInfo阶段为例):

void PG::RecoveryState::GetInfo::exit()
{
  context< RecoveryMachine >().log_exit(state_name, enter_time); // enter_time是在NamedState里定义,并在其构造函数里初始化的
  PG *pg = context< RecoveryMachine >().pg;
  utime_t dur = ceph_clock_now(pg->cct) - enter_time;
  pg->osd->recoverystate_perf->tinc(rs_getinfo_latency, dur);  // 通过ceph通用的perfcounter机制来累计次数和耗时
  pg->blocked_by.clear();
}

// GetInfo继承了NamedState
  struct NamedState {
    const char *state_name;  // 状态名称
    utime_t enter_time;  // 进入状态的时间
    const char *get_state_name() { return state_name; }
    NamedState(CephContext *cct_, const char *state_name_)
      : state_name(state_name_),
        enter_time(ceph_clock_now(cct_)) {}  // 初始化enter_time为当前时间
    virtual ~NamedState() {}
  };

问题8:peering有哪些阶段可能比较耗时?

准备阶段

之所以要讨论这个阶段,是因为在线下复现过程中,发现准备阶段也会影响peering耗时,其影响的peering阶段主要是GetInfo,原因如下:

// OSD::process_peering_events函数是peering线程池执行体,负责处理OSD的pg的peering流程,默认有两个线程,每次批量处理20个pg,其主要流程伪代码如下:
for pg in pgs:
    pg->lock();
    advance_pg(pg) or pg->handle_peering_event();
    pg->write_if_dirty();
    dispatch_context_transaction(pg);
    pg->unlock();

每个pg执行到pg->handle_peering_event()之后就进入了peering的各个阶段(根据事件不同状态机转换到不同状态也即peering的不同阶段),而peering线程池线程数量就2个(也即2个线程并发执行OSD::process_peering_events函数),如果两个线程都阻塞到OSD::process_peering_events函数的某一步(例如advance_pg或dispatch_context_transaction),则就会导致之前已经进入peering流程的pg耗时变长,磁盘限速复现场景下,可以通过日志看到dispatch_context_transaction是耗时较长的流程(在FileStore::op_queue_reserve_throttle函数里有限流操作,队列中op数量超出50个则需要等待),其影响的peering阶段主要是GetInfo。

GetInfo

GetInfo阶段本身可能耗时的地方在于从其他OSD(同属一个pg或者曾经同属一个,up+acting)获取pg_query_t::INFO信息过程,与每个OSD的交互相当于有两次的网络传输,和一次OSD读取数据。在收到OSD回复的MNotifyRec消息后会退出GetInfo阶段进入GetLog阶段。

其他主要流程如pg->generate_past_intervals()、pg->build_prior()、pg->proc_replica_info等都是OSD进程内的内存操作,正常情况下都是非常快速的。

GetLog

GetLog阶段本身耗时的地方也是与其他OSD(同属一个pg或者曾经同属一个,up+acting)交互流程,获取pg_query_t::LOG信息,在收到其他OSD回复的MLogRec消息后会退出GetLog流程,进入GetMissing阶段。其他流程如pg->proc_master_log也是OSD内部的内存操作。

GetMissing

GetMissing阶段本身耗时的地方也是与其他OSD(同属一个pg或者曾经同属一个,actingbackfill)交互流程,获取pg_query_t::LOG或pg_query_t::FULLLOG信息,在收到其他OSD回复的MLogRec消息后会退出GetMissing流程,根据是否需要更新up_thru决定进入WaitUpThru阶段或者Activate阶段。其他流程如pg->proc_replica_log也是OSD内部的内存操作。

WaitUpThru

通常情况下这个阶段都是要走的,因为OSD启动或者停止后都会进入peering流程(本身OSD或者其他OSD),流程结束后,都要更新OSD的up_thru值,以记录OSD上次peering正常结束的epoch版本。

该阶段主要是发送MOSDAlive消息给monitor(包含期望更新到的up_thru版本号),并等待monitor回复更新后的OSDMap,并没有其他额外的流程。

OSD::process_peering_events函数在遍历完所有pg后,会根据是否需要更新up_thru来进入queue_want_up_thru函数,这个函数会发送MOSDAlive消息给monitor。之后由monitor通过Paxos协议决议出新的OSDMap(包含OSD期望更新到的up_thru版本号),然后通过回调回复给OSD。这个决议过程需要写入磁盘进行持久化,并且还需要最长等待1s来防止OSDMap变更过于频繁导致epoch值变化太快(可能发生越界)、OSDMap数量太多占用更多的存储空间。

如果monitor存储元数据所用的磁盘IO能力较低(尤其是IOPS能力),就会导致磁盘写入等待时间过长,无法及时进行决议的持久化,从而影响OSD的waitupthru耗时。

问题9:peering相关的线程模型和数据结构是什么样的?

数据结构:

  // -- peering queue --
  // struct PeeringWQ是OSD类的一个成员,实例化了peering_wq成员
  struct PeeringWQ : public ThreadPool::BatchWorkQueue<pg> {
    list</pg><pg *> peering_queue;  // 正在peering的PG列表
    OSD *osd;    // osd对象指针
    set</pg><pg *> in_use;     // 正在peering线程处理的pg集合,在_dequeue(list</pg><pg *> *out)中添加pg,在_process_finish中清理
    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) // 构造函数
      : ThreadPool::BatchWorkQueue</pg><pg>(  // 继承了线程池父类,从而关联到线程池
    "OSD::PeeringWQ", ti, si, tp), osd(o) {}

    void _dequeue(PG *pg); // 出队单个pg,带下划线前缀表示是内部接口,只被父类BatchWorkQueue提供的相关外部接口封装使用,下同

    bool _enqueue(PG *pg) {  // 入队函数,被调用流程:main,OSD::init,OSD::consume_map,PG::queue_null,PG::queue_peering_event,
      pg->get("PeeringWQ");  // OSDService::queue_for_peering,ThreadPool::BatchWorkQueue</pg><pg>::queue
      peering_queue.push_back(pg);
      return true;
    }
    bool _empty(); // peering_queue队列是否为空
    void _dequeue(list</pg><pg *> *out);  // 批量出队pg,被ThreadPool::worker里的_void_dequeue调用
    void _process(  // peering线程处理函数,被ThreadPool::worker里的_void_process调用
      const list</pg><pg *> &pgs,  // pgs是OSD::PeeringWQ::_dequeue批量获取的
      ThreadPool::TPHandle &handle) {
      osd->process_peering_events(pgs, handle);   // peering主处理流程,批处理方式进行
      for (list</pg><pg *>::const_iterator i = pgs.begin();
       i != pgs.end();
       ++i) {
        (*i)->put("PeeringWQ");
      }
    }
    void _process_finish(const list</pg><pg *> &pgs); // 清理peering线程使用完毕的pg,并不意味着这些pg的peering过程已经结束,被ThreadPool::worker里的_void_process_finish调用
    void _clear() {
      assert(peering_queue.empty());
    }
  } peering_wq;

线程模型:

peering使用的也是ceph的通用线程池模型(PeeringWQ绑定了ThreadPool),实现了自己的队列出队、入队,及线程处理函数(struct PeeringWQ::_process),线程池初始化是在OSD类的构造函数里初始化的:

OSD::OSD(CephContext *cct_, ObjectStore *store_,
     ......
     const std::string &dev, const std::string &jdev) :
  ......
  osd_tp(cct, "OSD::osd_tp", cct->_conf->osd_op_threads, "osd_op_threads"), // 可以看出peering是有自己独立的线程池的,默认线程实例数量是2个,没有共用IO线程池
  peering_wq(
    this,
    cct->_conf->osd_op_thread_timeout,  // 线程处理超时告警时间,默认15s,注意这个15s是对单个pg来说的,每操作一个pg会reset一次
    cct->_conf->osd_op_thread_suicide_timeout,  // 线程处理超时自杀时间,默认150s
    &osd_tp),

整个线程池调用的最关键的处理函数就是osd->process_peering_events(pgs, handle)。

问题10:GetInfo阶段到底get的是哪些info?info用来干啥的?

GetInfo阶段主要做了4件事:
1. 计算past_interval
2. 构造prior_set
3. 给prior_set里的OSD发送查询pg info消息
4. 处理OSD返回的pg info

past_interval

pg->generate_past_intervals()生成past_interval的列表,保存在map<epoch_t,pg_interval_t> past_intervals中。

简单来说,past_interval就是pg的acting set和up set保持不变的一个时间段,这个时间段的开始和结束时间点都是用epoch来表示的(如果把epoch理解为时间戳则更容易理解),一旦acting和up set发生了变化,则就会生成一个新的interval,所谓past,顾名思义就是过去的,有过去的就有现在或者说当前的,当前的interval就是pg当前acting和up set一致的时间段,对应的名称是current_interval。

past_interval也不是全部的历史记录,也没有必要全部,实际上只关注OSD down掉的那部分时间段的即可,(为了各种异常会比较OSD的superblock里持久化了之前已经保存好的最老osdmap的epoch),在我们场景下,osd down了之后,pg会在degraded模式下运行(acting和up set中有2个osd),因此其past_intervals一般就一个interval,从osd down的那个epoch开始到osd up之前的那个epoch结束(从up那个epoch开始到当前最新的epoch就是current_interval)。

prior_set

由于我们的场景下past_intervals很少,因此prior_set也比较简单,基本可以认为是osd down掉之后另外两个osd就是prior_set,也即找这两个osd就可以恢复全部元数据和数据(这也不难理解,另外两个正常的osd确实保存了全部用户数据和pg元数据信息)。

pg info

pg info结构体定义如下:

struct pg_info_t {
  spg_t pgid;
  eversion_t last_update;    // last object version applied to store.
  eversion_t last_complete;  // last version pg was complete through.
  epoch_t last_epoch_started;// last epoch at which this pg started on this osd

  version_t last_user_version; // last user object version applied to store

  eversion_t log_tail;     // oldest log entry.

  hobject_t last_backfill;   // objects >= this and < last_complete may be missing

  interval_set<snapid_t> purged_snaps;

  pg_stat_t stats;

  pg_history_t history;
  pg_hit_set_history_t hit_set;
  ......
}

这些信息主要是记录pg的元数据的,并没有用户数据,通过对从prior_set里其他OSD获取的pg info和本地OSD的pg info进行比对,就能知道谁的pg保存了哪些(时间段或者说epoch版本序列)用户数据(pg log),从而在后续阶段利用对应的OSD进行恢复。

问题11:GetLog阶段到底get的是哪些log?log用来干啥的?

choose_acting

因为新的osd up了,pg的分布就可能发生变化,所以要生成新的acting set,但是acting set里面可能有需要backfill的osd,所以这个set就叫acting_backfill。
在根据pool的类型来调用PG::calc_replicated_acting(副本)或calc_ec_acting(ec)计算acting_backfill之前,需要先找出具有权威日志的osd(可能包含多个osd,会进行排序),在我们关注的场景下(启动、停止osd,并且noout),降级状态的2个副本其中之一为主,因此根据选择规则基本上选择的权威日志OSD就是当前的主OSD。

这个过程中还涉及到pg_temp(临时主)的选择过程,简单理解就是crush算法选择的新主不具备全部用户数据时,就需要先通过临时主对新主进行数据恢复,恢复完毕之后再切换到真正的新主。

get olog

如果选择出来的权威日志OSD就是当前OSD,则直接进入GetMissing阶段,也即不需要从权威日志OSD节点上获取pg log用于恢复数据(自己本身就有这些日志了)。

如果权威日志OSD不是自己,就需要给他发送pg_query_t::LOG查询消息,而对方回复的消息里包含的数据是:

class MOSDPGLog : public Message {

  static const int HEAD_VERSION = 4;
  static const int COMPAT_VERSION = 2;

  epoch_t epoch;
  /// query_epoch is the epoch of the query being responded to, or
  /// the current epoch if this is not being sent in response to a
  /// query. This allows the recipient to disregard responses to old
  /// queries.
  epoch_t query_epoch;

public:
  shard_id_t to;   // 目标OSD
  shard_id_t from; // 源OSD
  pg_info_t info;  // pg info
  pg_log_t log;    // pg log
  pg_missing_t missing; // 权威日志OSD当前pg下缺少的对象列表,后面用来恢复对端的数据
  pg_interval_map_t past_intervals;
  ......
}

收到上述信息之后,会调用pg->proc_master_log()对其进行处理,这个函数主要是把收到的权威pg log与本地pg log进行合并,从而让当前pg所在的OSD变成主OSD(可能还需要经历pg_temp临时主阶段)。

问题12:GetMissing阶段?

GetMissing的正常流程是会从actingbackfill里的OSD上获取pg log,然后将其与权威日志比较,从而计算出各个OSD上缺少的对象,在后面的recovery/backfill阶段进行恢复。但在我们关注的场景下(启动、停止osd,并且noout),只有启动OSD的时候可能在当前OSD有缺少的对象,另外两个OSD一般来说都是包含全部对象的,因此只需要在当前OSD上进行数据恢复即可(也就是说不需要从其他OSD获取任何日志了)。

PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetMissing")
{
  ......
    // 下面这两个条件在我们关注的场景下都是满足的:
    if (pi.last_update == pi.last_complete &&  // peer has no missing
        pi.last_update == pg->info.last_update) {  // peer is up to date
      // replica has no missing and identical log as us.  no need to
      // pull anything.
      // FIXME: we can do better here.  if last_update==last_complete we
      //        can infer the rest!
      dout(10) < < " osd." << *i << " has no missing, identical log" << dendl;
      pg->peer_missing[*i];
      continue;
    }
  ......
}

问题13:为啥要有WaitUpThru阶段?到底wait的是啥?

首先要理解up_thru引入的场景,简单来说就是up_thru是用来确认OSD上次正常完成peering流程的epoch版本,完成了peering流程就意味着OSD可能已经开始对外提供IO服务,就有可能有数据写入,有数据写入就意味着要检查pg上数据的完整性和一致性。如果没有这个字段,那么在下面的场景下(size=3,min_size=1),我们就无法判断OSD B上是否有写入过用户数据(如果直接认定B未写入过,就可能导致用户数据丢失):

epoch OSD up/down
101 A B C all up
102 B C A down
103 B C down
104 A C B down, AC up

上述假设场景下,epoch 103这个阶段,B可能是真的活着,也可能down了但还没报给monitor,如果B真的活着,那就有可能有数据写入,则在epoch 104的时候,AC虽然up了,也不能完成peering并接收客户端IO请求,否则在103到104这个时间段内用户数据可能丢失。

而如果引入up_thru这个概念,就能解决这个场景的困扰,只需要在每次peering完成时上报一次pg的up_thru版本号即可,如果103时B上报了up_thru为103,则表示他自己在单副本场景下运行过,可能有数据写入,否则up_thru就是102,表示B从来没自己单独对外提供IO服务过,因此在104阶段就AC可以完成peering并对外提供服务(C有最新的数据)。

有了上述理论基础,WaitUpThru阶段所做的工作和必要性就容易理解了。这个阶段就是更新pg的up_thru版本号并交给monitor进行决议并持久化保存,收到monitor回复的更新完毕的确认消息后,才可以表示peering真正的完成了。对应的代码流程是:

// OSD发送更up_thru请求给monitor
void OSD::process_peering_events(//OSD::PeeringWQ::_dequeue获取一批pg(默认20个),然后来这里处理,处理完进入Peering第一阶段GetInfo
  const list</pg><pg *> &pgs,          // 发请求给其他osd获取信息:querying info from osd.0,然后线程就结束了,其他阶段是通过事件触发的
  ThreadPool::TPHandle &handle
  )
{
  bool need_up_thru = false;
  ......
    need_up_thru = pg->need_up_thru || need_up_thru;
  ......
  if (need_up_thru)
    queue_want_up_thru(same_interval_since);
  ......
}

void OSD::queue_want_up_thru(epoch_t want)
{
  map_lock.get_read();
  epoch_t cur = osdmap->get_up_thru(whoami);
  if (want > up_thru_wanted) {
    ......
    send_alive();
  } else {  // 已经发送过更高版本的up_thru值,就不能再发送低版本的了,否则可能导致错误
    dout(10) < < "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted 
         << ", currently " << cur << dendl;
  }
  map_lock.put_read();
}

void OSD::send_alive()
{
  if (!osdmap->exists(whoami))
    return;
  epoch_t up_thru = osdmap->get_up_thru(whoami);
  dout(10) < < "send_alive up_thru currently " << up_thru << " want " << up_thru_wanted << dendl;
  if (up_thru_wanted > up_thru) { // 再次确认发送的是最新版本
    up_thru_pending = up_thru_wanted;
    dout(10) < < "send_alive want " << up_thru_wanted << dendl;
    monc->send_mon_message(new MOSDAlive(osdmap->get_epoch(), up_thru_wanted)); // 发送的是MOSDAlive消息
  }
}

monitor端收到MOSDAlive消息的处理过程:

bool PaxosService::dispatch(PaxosServiceMessage *m)
{
  ......
  // preprocess
  if (preprocess_query(m)) 
    return true;  // easy!
  ......
  // update
  if (prepare_update(m)) {
    double delay = 0.0;
    if (should_propose(delay)) { // 这里主要是检查上次决议时间,如果已经超过paxos_propose_interval时长,则等待g_conf->paxos_min_wait(默认0.05s)之后开始决议,否则就等待够paxos_propose_interval(默认1s)之后再决议
      if (delay == 0.0) {
        propose_pending();
      } else {
        // delay a bit
        if (!proposal_timer) {
          proposal_timer = new C_Propose(this);
          dout(10) < < " setting proposal_timer " << proposal_timer << " with delay of " << delay << dendl;
          mon->timer.add_event_after(delay, proposal_timer);
        } else { 
          dout(10) < < " proposal_timer already set" << dendl;
        }
      }
    } else {
      dout(10) << " not proposing" << dendl;
    }
  }     
  return true;
}


bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
{
  ......
  case MSG_OSD_ALIVE:
    return preprocess_alive(static_cast<MOSDAlive*>(m));
  ......
}

bool OSDMonitor::preprocess_alive(MOSDAlive *m)
{
  ......
  dout(10) < < "preprocess_alive want up_thru " << m->want
       < < " from " << m->get_orig_source_inst() < < dendl;
  return false;
  ......
}

bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
{
  ......
  case MSG_OSD_ALIVE:
    return prepare_alive(static_cast<MOSDAlive*>(m));
  ......
}

bool OSDMonitor::prepare_alive(MOSDAlive *m)
{
  ......
  dout(7) < < "prepare_alive want up_thru " << m->want < < " have " << m->version
      < < " from " << m->get_orig_source_inst() < < dendl;
  pending_inc.new_up_thru[from] = m->version;  // set to the latest map the OSD has
  wait_for_finished_proposal(new C_ReplyMap(this, m, m->version));  // 等待决议完成后通过回调函数_reply_map回复OSDMap
  return true;
}


void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e)
{
  dout(7) < < "_reply_map " << e
      << " from " << m->get_orig_source_inst()
      < < dendl;
  send_latest(m, e); // 最终调用OSDMonitor::send_incremental
}

void OSDMonitor::send_incremental(PaxosServiceMessage *req, epoch_t first)
{
  ......
  // send some maps.  it may not be all of them, but it will get them
  // started.
  epoch_t last = MIN(first + g_conf->osd_map_message_max, osdmap.get_epoch());
  MOSDMap *m = build_incremental(first, last);
  m->oldest_map = get_first_committed();
  m->newest_map = osdmap.get_epoch();
  mon->send_reply(req, m);
  ......
}

WaitUpThru状态收到monitor回复的消息后就会进入Active状态。

问题14:日志合并是怎么做的?

简单来说,要先看本地和权威日志是否有重叠,没有重叠就没办法合并,只能通过backfill全量恢复本地数据。
如果有重叠,就类似两个保存k-v对的有序链表的合并及去重操作(复杂的地方在于同一个对象两份日志可能有冲突,即k
ey相同value不同),大致流程应该是:
对最老的日志,权威日志的更老,就以权威日志为准,把最老的那段时间补上(尾巴接上);
对最新的日志,权威日志如果更新,说明他保存的数据更全更新,因此以他为准,把本地OSD缺少的对象记录下来,后续的流程进行恢复(recovery);
对冲突的日志,仍然以权威日志为准,算出本地需要更新的对象,后续流程进行恢复(recovery)。