Ceph MDS startup and failover flow




This analysis is based on the Luminous code (v12.2.12). I am still a CephFS beginner, so treat it as a reference only and please forgive any mistakes.

MDS startup process

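Startup by itself is fairly simple. The complicated part is how a daemon joins the MDS cluster, especially in multi-active mode, which is not analyzed here yet.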

So only single-active startup is discussed. It splits into two cases: starting as the active MDS and starting as a standby.

Common flow

-> ceph_mds.cc:main()
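    -> global_init()  // create the CephContext (cct)
    -> ceph_argparse_witharg  // parse daemon startup options such as hot-standby
    -> pick_addresses  // resolve the listening address
    -> Messenger::create  // create the messenger, set a number of options on it, and finally bind the IP and port: msgr->bind(g_conf->public_addr)
    -> global_init_daemonize  // daemonize the process
    -> mc.build_initial_monmap()  // create the monitor client and build the initial monmap from the mon addresses in the config
    -> msgr->start()  // start the messenger threads
    -> mds = new MDSDaemon()  // create the MDSDaemon instance; mds->init() is called afterwards to initialize it
    -> init_async_signal_handler, *_signal_handler  // register the signal handlers
    -> msgr->wait()   // block until a stop signal arrives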


-> MDSDaemon::init()
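    -> messenger->add_dispatcher_tail(&beacon);  messenger->add_dispatcher_tail(this);  // add dispatchers: one for the beacon (heartbeat), one for the MDS daemon itself
    -> monc->set_messenger(messenger)  // the mon client sends and receives over the same messenger
    -> monc->init()  // initialize the monitor client: add its dispatcher to the messenger and prepare the authentication parameters; the monc->authenticate() call that follows completes authentication
    -> mgrc.init(); messenger->add_dispatcher_head(&mgrc)  // initialize the mgr client
    -> monc->sub_want("mdsmap", 0, 0);  monc->sub_want("mgrmap", 0, 0);  monc->renew_subs();  // subscribe to the mdsmap and mgrmap, then send the subscription renewal request
    -> set_up_admin_socket()  // set up the admin socket
    -> timer.init()  // the MDS timer; its main job is to drive MDSRankDispatcher::tick()
    -> beacon.init(mdsmap)  // initialize the beacon (heartbeat) service
    -> reset_tick()   // start the MDSDaemon periodic task (which periodically calls MDSRankDispatcher::tick())
// This function matters: it periodically checks the running state of the MDS
-> MDSRankDispatcher::tick()
    -> heartbeat_reset()  // reset the heartbeat timeout; usually called before a long-running task
    -> check_ops_in_flight()  // check in-flight ops; this is where slow requests are detected
    -> mdlog->flush()  // flush the mdlog, i.e. write it to the OSDs; this ends up in Journaler::flush()
    -> in the is_active() || is_stopping() states, run:
        -> server->recall_client_state(nullptr, Server::RecallFlags::ENFORCE_MAX)  // ask clients to release caps and unpin some inodes in the mdcache, mainly to free memory; ENFORCE_MAX seems to mean recalling caps from sessions that hold more than the maximum number of caps (not sure)
        -> mdcache->trim();  // trim the mdcache
        -> mdcache->trim_client_leases();  // trim expired client leases
        -> mdcache->check_memory_usage();  // check whether memory needs to be freed; if so, call recall_client_state (without ENFORCE_MAX, so presumably best effort?); with tcmalloc it also calls ceph_heap_release_free_memory to release more memory
        -> mdlog->trim();  // trim the mdlog
    -> in the is_clientreplay() || is_active() || is_stopping() states, run:
        -> server->find_idle_sessions()  // clean up idle sessions; if g_conf->mds_session_blacklist_on_timeout is true this calls mds->evict_client(), otherwise kill_session(); the former also blacklists the client, the latter only removes the session
        -> server->evict_cap_revoke_non_responders()  // also calls evict_client
        -> locker->tick()  // not understood yet
    -> in the is_reconnect() state, server->reconnect_tick() runs; once mds_reconnect_timeout has passed it checks which sessions have reconnected and calls evict_client or kill_session on those that timed out (decided by the same config option as above)
    -> in the is_active() state, run (these all appear to be multi-MDS operations):
        -> balancer->tick()  // multi-MDS load balancing
        -> mdcache->find_stale_fragment_freeze()  // not fully understood; roughly, it unfreezes directory fragments whose freeze has timed out
        -> mdcache->migrator->find_stale_export_freeze()  // not fully understood; it seems to deal with directory migrations (exports) that have stalled
    -> in the is_active() || is_stopping() states, update_targets() runs. I did not fully understand this one: according to the code comment it updates this MDS's export_targets in the mdsmap, but I do not know what export_targets are. Judging by the type it looks like a set of per-rank counters, where a larger count might mean a more important MDS?
    -> beacon.notify_health(this)  // push the MDS's internal health state to the beacon service, which reports it to the monitor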
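
The state gating in tick() is easier to see in a reduced form. The following is only a toy sketch, not Ceph code: the State enum is a hypothetical subset of MDSMap::DaemonState, and the step names are plain labels that mirror the call tree above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, reduced state set; the real MDSMap::DaemonState has more values.
enum class State { Standby, Reconnect, ClientReplay, Active, Stopping };

// Returns the labels of the steps one tick would run in state `s`,
// in the order of the call tree above. Nothing is actually executed.
std::vector<std::string> tick_steps(State s) {
  const bool active = (s == State::Active);
  const bool stopping = (s == State::Stopping);
  const bool clientreplay = (s == State::ClientReplay);

  std::vector<std::string> steps;
  auto add = [&steps](const char* name) { steps.push_back(name); };

  // Steps that run in every state.
  add("heartbeat_reset");
  add("check_ops_in_flight");
  add("mdlog_flush");

  if (active || stopping) {  // memory and journal trimming
    add("recall_client_state");
    add("mdcache_trim");
    add("trim_client_leases");
    add("check_memory_usage");
    add("mdlog_trim");
  }
  if (clientreplay || active || stopping) {  // session housekeeping
    add("find_idle_sessions");
    add("evict_cap_revoke_non_responders");
    add("locker_tick");
  }
  if (s == State::Reconnect) {
    add("reconnect_tick");
  }
  if (active) {  // multi-MDS related work
    add("balancer_tick");
    add("find_stale_fragment_freeze");
    add("find_stale_export_freeze");
  }
  if (active || stopping) {
    add("update_targets");
  }

  // The health report to the beacon always comes last.
  add("notify_health");
  assert(steps.back() == "notify_health");
  return steps;
}
```

In this sketch a standby-like state runs only the three common steps plus notify_health, while the active state runs every group except reconnect_tick.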

Standby MDS startup

class Beacon : public Dispatcher
{
  ...
  MDSMap::DaemonState want_state = MDSMap::STATE_BOOT; // the default state is BOOT
  ...
}

After startup, the Beacon reports the MDS state to the monitor; the default is BOOT:
/**
 * Call periodically, or when you have updated the desired state
 */
bool Beacon::_send()
{
  ...
    MMDSBeacon *beacon = new MMDSBeacon(
      monc->get_fsid(), mds_gid_t(monc->get_global_id()),
      name,
      epoch,
      want_state,   // STATE_BOOT by default
      last_seq,
      CEPH_FEATURES_SUPPORTED_DEFAULT);

  beacon->set_standby_for_rank(standby_for_rank);
  beacon->set_standby_for_name(standby_for_name);
  beacon->set_standby_for_fscid(standby_for_fscid);
  beacon->set_standby_replay(standby_replay);
  beacon->set_health(health);
  beacon->set_compat(compat);
  // piggyback the sys info on beacon msg
  if (want_state == MDSMap::STATE_BOOT) {
    map<string, string> sys_info;
    collect_sys_info(&sys_info, cct);
    sys_info["addr"] = stringify(monc->get_myaddr());
    beacon->set_sys_info(sys_info);
  }
  monc->send_mon_message(beacon);
  last_send = now;
  return true;
}

When an MDS first reports itself to the monitor through a beacon after startup, the monitor assigns it the STATE_STANDBY role if it is not yet in the existing mds_roles list.

bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
{
  ...
  // boot?
  if (state == MDSMap::STATE_BOOT) {
    // zap previous instance of this name?
    if (g_conf->mds_enforce_unique_name) {
      bool failed_mds = false;
      while (mds_gid_t existing = pending.find_mds_gid_by_name(m->get_name())) {
        if (!mon->osdmon()->is_writeable()) {
          mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
          return false;
        }
        const MDSMap::mds_info_t &existing_info =
          pending.get_info_gid(existing);
        mon->clog->info() << existing_info.human_name() << " restarted";
        fail_mds_gid(pending, existing);
        failed_mds = true;
      }
      if (failed_mds) {
        assert(mon->osdmon()->is_writeable());
        request_proposal(mon->osdmon());
      }
    }

    // Add this daemon to the map
    if (pending.mds_roles.count(gid) == 0) {
      MDSMap::mds_info_t new_info;
      new_info.global_id = gid;
      new_info.name = m->get_name();
      new_info.addr = addr;
      new_info.mds_features = m->get_mds_features();
      new_info.state = MDSMap::STATE_STANDBY;
      new_info.state_seq = seq;
      new_info.standby_for_rank = m->get_standby_for_rank();
      new_info.standby_for_name = m->get_standby_for_name();
      new_info.standby_for_fscid = m->get_standby_for_fscid();
      new_info.standby_replay = m->get_standby_replay();
      pending.insert(new_info);
    }
  ...
}
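
The two steps of the BOOT branch above (drop an older instance with the same name, then register the new gid as a standby) can be modelled with a small map. This is a toy sketch under stated assumptions: PendingMap, DaemonInfo and handle_boot_beacon are hypothetical names, and failing an old instance is reduced to erasing its entry.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

enum class DaemonState { Boot, Standby, Active };

struct DaemonInfo {
  std::string name;
  DaemonState state;
};

// Hypothetical stand-in for the pending map the monitor edits before proposing.
struct PendingMap {
  std::map<std::uint64_t, DaemonInfo> roles;  // gid -> daemon info

  // Handles a BOOT beacon. Returns true if the map changed, i.e. a
  // proposal would be needed.
  bool handle_boot_beacon(std::uint64_t gid, const std::string& name,
                          bool enforce_unique_name) {
    bool changed = false;
    if (enforce_unique_name) {
      // Drop older instances that used the same daemon name.
      // Simplification: an entry with the same gid is left alone.
      for (auto it = roles.begin(); it != roles.end();) {
        if (it->second.name == name && it->first != gid) {
          it = roles.erase(it);
          changed = true;
        } else {
          ++it;
        }
      }
    }
    // An unknown gid always enters the map as a standby.
    if (roles.count(gid) == 0) {
      roles[gid] = DaemonInfo{name, DaemonState::Standby};
      changed = true;
    }
    assert(roles.count(gid) == 1);
    return changed;
  }
};
```

In this model, a restarted daemon that comes back with a new gid but the same name replaces its previous entry and is again registered as a standby.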

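// The monitor then runs a Paxos round to persist the pending mdsmap and distributes it to the MDS daemons. When an MDS receives the mdsmap, MDSDaemon::handle_core_message() dispatches on the message type and calls MDSDaemon::handle_mds_map() to process it.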

void MDSDaemon::handle_mds_map(MMDSMap *m)
{
  ...
  if (whoami == MDS_RANK_NONE) {
    if (mds_rank != NULL) {
      const auto myid = monc->get_global_id();
      // We have entered a rank-holding state, we shouldn't be back
      // here!
      if (g_conf->mds_enforce_unique_name) {
        if (mds_gid_t existing = mdsmap->find_mds_gid_by_name(name)) {
          const MDSMap::mds_info_t& i = mdsmap->get_info_gid(existing);
          if (i.global_id > myid) {
            ...
            return;
          }
        }
      }

      dout(1) << "Map removed me (mds." << whoami << " gid:"
              << myid << ") from cluster due to lost contact; respawning" << dendl;
      respawn();
    }
    // MDSRank not active: process the map here to see if we have
    // been assigned a rank.
    dout(10) <<  __func__ << ": handling map in rankless mode" << dendl;
    _handle_mds_map(oldmap);
  } 
  ...
}

void MDSDaemon::_handle_mds_map(MDSMap *oldmap)
{
  MDSMap::DaemonState new_state = mdsmap->get_state_gid(mds_gid_t(monc->get_global_id()));

  // Normal rankless case, we're marked as standby
  if (new_state == MDSMap::STATE_STANDBY) {
    beacon.set_want_state(mdsmap, new_state);
    dout(1) << "Map has assigned me to become a standby" << dendl;

    return;
  }
  ...
}

As the code above shows, starting an MDS in the STANDBY state is simple: it has almost nothing to do.

Active MDS startup

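On the monitor side there are two cases for starting an active MDS. One is expanding the MDS cluster, for example when the first MDS starts and moves from BOOT into the STARTING state. The other, visible in maybe_replace_gid further down, is replacing a failed active MDS with a standby: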

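// After the beacons have reported the MDS states, MDSMonitor checks them periodically in its tick function and decides the next step from the state of the MDS cluster, e.g. whether the cluster needs expanding, whether an MDS needs replacing, or whether failure recovery is needed.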
void MDSMonitor::tick()
{
  ...
    // expand mds cluster (add new nodes to @in)?
  for (auto &p : pending.filesystems) {
    do_propose |= maybe_expand_cluster(pending, p.second->fscid);  // expand the MDS cluster; with a single active MDS this is usually the first MDS, with multiple actives it may be another new MDS
  }
  ...
  if (since_last.count() >= g_conf->mds_beacon_grace) {
      auto &info = pending.get_info_gid(gid);
      dout(1) << "no beacon from mds." << info.rank << "." << info.inc
              << " (gid: " << gid << " addr: " << info.addr
              << " state: " << ceph_mds_state_name(info.state) << ")"
              << " since " << since_last.count() << "s" << dendl;
      // If the OSDMap is writeable, we can blacklist things, so we can
      // try failing any laggy MDS daemons.  Consider each one for failure.
      if (osdmap_writeable) {
        maybe_replace_gid(pending, gid, info, &do_propose, &propose_osdmap);   // the active MDS timed out: fail over by replacing it with a standby
      }
    }
  ...
  for (auto &p : pending.filesystems) {
    auto &fs = p.second;
    if (!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
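      do_propose |= maybe_promote_standby(pending, fs);  // I did not fully understand this case: it also turns a standby into an active, but I am not sure in which scenario, or how it differs from maybe_replace_gid above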
    }
  }
  ...
  if (do_propose) {
    propose_pending();  // trigger a Paxos proposal for the mdsmap
  }
}
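
The grace check in the middle of tick() boils down to comparing the age of each daemon's last beacon against mds_beacon_grace. A minimal sketch of that comparison, assuming a hypothetical find_laggy helper and a plain map of last-beacon timestamps (the real monitor keeps more per-daemon state):

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Returns the gids whose last beacon is at least `grace` old at time `now`.
// These are the candidates a monitor would consider for replacement.
std::vector<std::uint64_t> find_laggy(
    const std::map<std::uint64_t, Clock::time_point>& last_beacon,
    Clock::time_point now,
    std::chrono::duration<double> grace) {
  assert(grace.count() >= 0.0);
  std::vector<std::uint64_t> laggy;
  for (const auto& entry : last_beacon) {
    // Age of the most recent beacon, in seconds.
    const std::chrono::duration<double> since_last = now - entry.second;
    if (since_last.count() >= grace.count()) {
      laggy.push_back(entry.first);
    }
  }
  return laggy;
}
```

Passing `now` in as a parameter keeps the check deterministic, which makes it easy to exercise with fixed time points instead of the real clock.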

void MDSMonitor::maybe_replace_gid(FSMap &fsmap, mds_gid_t gid,
    const MDSMap::mds_info_t& info, bool *mds_propose, bool *osd_propose)
{
  ...
  mds_gid_t sgid;
  if (info.rank >= 0 &&
      info.state != MDSMap::STATE_STANDBY &&
      info.state != MDSMap::STATE_STANDBY_REPLAY &&
      may_replace &&
      !fsmap.get_filesystem(fscid)->mds_map.test_flag(CEPH_MDSMAP_DOWN) &&
      (sgid = fsmap.find_replacement_for({fscid, info.rank}, info.name,
                g_conf->mon_force_standby_active)) != MDS_GID_NONE)
  {
    ...
    // Promote the replacement
    auto fs = fsmap.filesystems.at(fscid);
    fsmap.promote(sgid, fs, info.rank);

    *mds_propose = true;
  }
  ...
}


void FSMap::promote(
    mds_gid_t standby_gid,
    const std::shared_ptr<Filesystem> &filesystem,
    mds_rank_t assigned_rank)
{
  ...
  if (mds_map.stopped.erase(assigned_rank)) {
    // The cluster is being expanded with a stopped rank  // my understanding: this is the case where the first MDS starts
    info.state = MDSMap::STATE_STARTING;
  } else if (!mds_map.is_in(assigned_rank)) {
    // The cluster is being expanded with a new rank  // this is the MDS cluster expansion case (multiple active MDS)
    info.state = MDSMap::STATE_CREATING;
  } else {
    // An existing rank is being assigned to a replacement  // this is failover (a standby becomes the active)
    info.state = MDSMap::STATE_REPLAY;    // note this state; the MDS process side uses it
    mds_map.failed.erase(assigned_rank);
  }
  ...
}
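
The three-way branch in FSMap::promote depends only on which rank sets contain the assigned rank. The sketch below isolates that decision; ToyMdsMap and state_for_promoted are hypothetical names, and the final insert into `in` is an assumption about the elided part of the function, added so that repeated calls behave sensibly in the toy model.

```cpp
#include <cassert>
#include <set>

enum class DaemonState { Starting, Creating, Replay };

// Hypothetical reduction of the rank bookkeeping in an MDSMap.
struct ToyMdsMap {
  std::set<int> in;       // ranks that are part of the cluster
  std::set<int> stopped;  // ranks that were stopped earlier
  std::set<int> failed;   // ranks whose holder has failed
};

// Picks the initial state for a daemon promoted to `rank`,
// following the same three branches as the snippet above.
DaemonState state_for_promoted(ToyMdsMap& m, int rank) {
  assert(rank >= 0);
  DaemonState s;
  if (m.stopped.erase(rank) > 0) {
    s = DaemonState::Starting;   // a stopped rank is brought back
  } else if (m.in.count(rank) == 0) {
    s = DaemonState::Creating;   // a brand-new rank
  } else {
    s = DaemonState::Replay;     // an existing rank gets a replacement
    m.failed.erase(rank);
  }
  m.in.insert(rank);  // assumption: the rank is part of the cluster afterwards
  return s;
}
```

In this model, promoting into a rank that has never existed yields Creating, and promoting into the same rank again after it failed yields Replay.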

On the MDS process side, the difference between active and standby startup lies in MDSDaemon::handle_mds_map(MMDSMap *m):

void MDSDaemon::handle_mds_map(MMDSMap *m)
{
  ...
  if (whoami == MDS_RANK_NONE) {
    ...
  } else {
    ...
    // Did I previously not hold a rank?  Initialize!
    if (mds_rank == NULL) {
      mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
          timer, beacon, mdsmap, messenger, monc,
          new FunctionContext([this](int r){respawn();}),
          new FunctionContext([this](int r){suicide();}));
      dout(10) <<  __func__ << ": initializing MDS rank "
               << mds_rank->get_nodeid() << dendl;
      mds_rank->init();
    }

    // MDSRank is active: let him process the map, we have no say.
    dout(10) <<  __func__ << ": handling map as rank "
             << mds_rank->get_nodeid() << dendl;
    mds_rank->handle_mds_map(m, oldmap);
  }
  ...
}

void MDSRankDispatcher::handle_mds_map(
    MMDSMap *m,
    MDSMap *oldmap)
{
  ...
  // did it change?
  if (oldstate != state) {
    dout(1) << "handle_mds_map state change "
        << ceph_mds_state_name(oldstate) << " --> "
        << ceph_mds_state_name(state) << dendl;
    beacon.set_want_state(mdsmap, state);

    if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
        dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
        assert (state == MDSMap::STATE_REPLAY);
    } else {
      // did i just recover?
      if ((is_active() || is_clientreplay()) &&
          (oldstate == MDSMap::STATE_CREATING ||
       oldstate == MDSMap::STATE_REJOIN ||
       oldstate == MDSMap::STATE_RECONNECT))
        recovery_done(oldstate);

      if (is_active()) {
        active_start();
      } else if (is_any_replay()) {   // this is where the STATE_REPLAY state set in FSMap::promote is picked up
        replay_start();   // through a series of state requests and callbacks this completes the boot stages MDS_BOOT_INITIAL, MDS_BOOT_OPEN_ROOT, MDS_BOOT_PREPARE_LOG and MDS_BOOT_REPLAY_DONE, then enters MDSRank::replay_done(), which finally calls request_state(MDSMap::STATE_RECONNECT) (single active MDS) or request_state(MDSMap::STATE_RESOLVE) (multiple active MDS)
      } else if (is_resolve()) {  // once the mdsmap comes back with STATE_RESOLVE, enter resolve_start
        resolve_start();
      } else if (is_reconnect()) {  // once the mdsmap comes back with STATE_RECONNECT, enter reconnect_start; the later states work the same way
        reconnect_start();
      } else if (is_rejoin()) {
        rejoin_start();
      } else if (is_clientreplay()) {
        clientreplay_start();
      } else if (is_creating()) {
        boot_create();
      } else if (is_starting()) {
        boot_start();
      } else if (is_stopping()) {
        assert(oldstate == MDSMap::STATE_ACTIVE);
        stopping_start();
      }
    }
  }
  ...
}

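The analysis above shows that an MDS state transition is completed across several mdsmap changes: at each stage the MDS requests a new state, and only after the resulting mdsmap has been agreed on does it move on to the next stage.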
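
The request-then-wait pattern can be shown with a toy loop. This is a simplified sketch, not Ceph code: ToyMonitor and ToyMds are hypothetical, the monitor accepts every request immediately, and the path is reduced to replay, reconnect, rejoin, active for a single active MDS (clientreplay and resolve are left out).

```cpp
#include <cassert>
#include <vector>

enum class State { Replay, Reconnect, Rejoin, Active };

// The state an MDS asks for once it has finished the work of `s`.
State next_wanted(State s) {
  switch (s) {
    case State::Replay:    return State::Reconnect;
    case State::Reconnect: return State::Rejoin;
    default:               return State::Active;
  }
}

// Hypothetical monitor: every accepted request yields a new map epoch.
struct ToyMonitor {
  unsigned epoch = 1;
  State published = State::Replay;  // state assigned by the promotion
  void request_state(State wanted) {
    published = wanted;
    ++epoch;
  }
};

// Hypothetical MDS: it changes state only when a new map says so.
struct ToyMds {
  State state = State::Replay;
  unsigned seen_epoch = 0;
  std::vector<State> history;
  void handle_map(const ToyMonitor& mon) {
    if (mon.epoch == seen_epoch) return;  // nothing new
    seen_epoch = mon.epoch;
    state = mon.published;
    history.push_back(state);
  }
};

// Drives the MDS until it is active; returns how many maps it consumed.
unsigned run_to_active(ToyMonitor& mon, ToyMds& mds) {
  unsigned maps = 0;
  for (;;) {
    mds.handle_map(mon);
    ++maps;
    if (mds.state == State::Active) break;
    mon.request_state(next_wanted(mds.state));
  }
  assert(!mds.history.empty());
  return maps;
}
```

With these simplifications the MDS needs four map epochs to go from replay to active, one per state, which is the point of the paragraph above: no state is entered locally without a map that confirms it.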

MDS state list

See: https://github.com/ceph/ceph/blob/master/doc/cephfs/mds-states.rst#mds-states