Ceph中的信号处理、调用栈打印机制分析及超时未自杀情况下线程调用栈打印功能




本文基于H版本代码分析(为啥老是H?因为生产用的就是这个)。

我们想在osd io线程卡住的时候,知道卡在什么地方了(还未自杀的情况),因此想通过打印调用栈的方式获取该信息。

2019-05-10 16:54:48.995601 7f4252824700  1 heartbeat_map is_healthy 'OSD::osd_op_tp thread 0x7f424b015700' had timed out after 15
2019-05-10 15:58:49.071668 7f2584187700  1 heartbeat_map reset_timeout 'OSD::osd_op_tp thread 0x7f2584187700' had suicide timed out after 60

超出一定时间osd进程就自杀了(通过assert异常)。

相关配置项:
1. osd_op_thread_timeout = 15
2. osd_op_thread_suicide_timeout = 150

自杀的情况,社区在新版本已经加入了调用栈打印功能,具体参考commit c90ccbf227c。

信号处理

信号注册

信号注册分为两类,一类是杂项信号:

// src\ceph_osd.cc:
int main(int argc, const char **argv) 
{
  vector<const char*> args;
  argv_to_vec(argc, argv, args);
  env_to_vec(args);

  vector<const char*> def_args;
  // We want to enable leveldb's log, while allowing users to override this
  // option, therefore we will pass it as a default argument to global_init().
  def_args.push_back("--leveldb-log=");

  global_init(&def_args, args, CEPH_ENTITY_TYPE_OSD, CODE_ENVIRONMENT_DAEMON, 0);
  ......
}

// src\global\global_init.cc:
void global_init(std::vector < const char * > *alt_def_args,
         std::vector < const char* >& args,
         uint32_t module_type, code_environment_t code_env, int flags)
{
  global_pre_init(alt_def_args, args, module_type, code_env, flags);

  // signal stuff
  int siglist[] = { SIGPIPE, 0 };
  block_signals(siglist, NULL);

  if (g_conf->fatal_signal_handlers)  // 注意这个配置项默认为true
    install_standard_sighandlers();
  ......
}

// src\global\signal_handler.cc:
void install_standard_sighandlers(void)
{
  // 一共注册了这些信号,并且参数加入了SA_RESETHAND | SA_NODEFER,其中SA_RESETHAND表示当调用信号处理函数时,将信号处理器设置为缺省值SIG_DFL, 这句话的意思也就是说当signo信号出现第二次的时候信号处理终止进程,因为第一次接受的时候是将其设置为SIG_DFL,SA_NODEFER表示不阻塞与处理多个signo相同的信号
  install_sighandler(SIGSEGV, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGABRT, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGBUS, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGILL, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGFPE, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGXCPU, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGXFSZ, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
  install_sighandler(SIGSYS, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
}

第二类是3个重要信号,SIGHUP、SIGINT、SIGTERM:

// src\ceph_osd.cc:
int main(int argc, const char **argv) 
{
  ......
  // install signal handlers
  init_async_signal_handler();
  register_async_signal_handler(SIGHUP, sighup_handler);
  register_async_signal_handler_oneshot(SIGINT, handle_osd_signal);
  register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal);
  ......
}

SIGHUP是用来重新打开日志文件的,比如在定期滚动压缩日志文件之后刷新一下日志文件句柄。

另外两个是通知进程退出的,比如stop service的时候。

对于第二类信号来说但是上面的流程并不是真正的信号处理函数,或者说处理的并不是第一手的信号,而是经过一次传递之后的信号。

第一手的信号处理函数在这里注册:

// src\global\signal_handler.cc:
void SignalHandler::register_handler(int signum, signal_handler_t handler, bool oneshot)
{
  int r;

  assert(signum >= 0 && signum < 32);

  safe_handler *h = new safe_handler;

  r = pipe(h->pipefd);  // 创建pipe用来传递信号,每个信号一个pipe
  assert(r == 0);
  r = fcntl(h->pipefd[0], F_SETFL, O_NONBLOCK);
  assert(r == 0);

  h->handler = handler; // main函数传入的最终处理函数
  lock.Lock();
  handlers[signum] = h;
  lock.Unlock();

  // signal thread so that it sees our new handler
  signal_thread();

  // install our handler
  struct sigaction oldact;
  struct sigaction act;
  memset(&act, 0, sizeof(act));

  act.sa_handler = handler_hook;  // 第一层处理函数,处理第一手信号,主要是转发
  sigfillset(&act.sa_mask);  // mask all signals in the handler
  act.sa_flags = oneshot ? SA_RESETHAND : 0;

  int ret = sigaction(signum, &act, &oldact);
  assert(ret == 0);
}

信号传递

通过上面的代码分析可以看出,ceph中信号(SIGHUP、SIGINT、SIGTERM)并不是直接传递给信号处理函数的,二是经过了一个自定义的传递管道(pipe),相关传递过程如下:

// src\global\signal_handler.cc:
static void handler_hook(int signum)
{
  g_signal_handler->queue_signal(signum);
}

struct SignalHandler : public Thread {
  ......
  void queue_signal(int signum) {
    // If this signal handler is registered, the callback must be
    // defined.  We can do this without the lock because we will never
    // have the signal handler defined without the handlers entry also
    // being filled in.
    assert(handlers[signum]);
    int r = write(handlers[signum]->pipefd[1], " ", 1); // 通过写入一个空字符来传递信号
    assert(r == 1);
  }
  ......
}

有信息写入管道,就得有对应的线程从管道读取信息,才能传递给最终的处理函数,相关的线程就是:

struct SignalHandler : public Thread {
  ......
  SignalHandler()
    : stop(false), lock("SignalHandler::lock")
  {
    for (unsigned i = 0; i < 32; i++)
      handlers[i] = NULL;

    // create signal pipe
    int r = pipe(pipefd);
    assert(r == 0);
    r = fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
    assert(r == 0);

    // create thread
    create();  // 在这里启动线程
  }
  ......
  // thread entry point
  void *entry() {
    while (!stop) {
      // build fd list
      struct pollfd fds[33];

      lock.Lock();
      int num_fds = 0;
      fds[num_fds].fd = pipefd[0];
      fds[num_fds].events = POLLIN | POLLERR;
      fds[num_fds].revents = 0;
      ++num_fds;
      for (unsigned i=0; i<32; i++) {
        if (handlers[i]) {
          fds[num_fds].fd = handlers[i]->pipefd[0];
          fds[num_fds].events = POLLIN | POLLERR;
          fds[num_fds].revents = 0;
          ++num_fds;
        }
      }
      lock.Unlock();

      // wait for data on any of those pipes
      int r = poll(fds, num_fds, -1);   // 通过poll函数轮询pipe句柄
      if (stop)
        break;
      if (r > 0) {
        char v;

        // consume byte from signal socket, if any.
        r = read(pipefd[0], &v, 1);

        lock.Lock();
        for (unsigned signum=0; signum<32; signum++) {
          if (handlers[signum]) {
            r = read(handlers[signum]->pipefd[0], &v, 1);
            if (r == 1) {
              handlers[signum]->handler(signum); // 从pipe读取信息成功,则调用真正的信号处理函数,其中pipe的编号就是要传递的信号
            }
          }
        }
        lock.Unlock();
      } else {
        //cout << "no data, got r=" << r << " errno=" << errno << std::endl;
      }
    }
    return NULL;
  }
  ......
}

信号处理

杂项信号处理函数是handle_fatal_signal,这个是直接处理,不经过pipe转发:

static void handle_fatal_signal(int signum)
{
  // This code may itself trigger a SIGSEGV if the heap is corrupt. In that
  // case, SA_RESETHAND specifies that the default signal handler--
  // presumably dump core-- will handle it.
  char buf[1024];
  snprintf(buf, sizeof(buf), "*** Caught signal (%s) **\n "
        "in thread %llx\n", sys_siglist[signum], (unsigned long long)pthread_self());
  dout_emergency(buf);  // 打印接收的信号
  pidfile_remove();  // 清理pidfile

  // avoid recursion back into logging code if that is where
  // we got the SEGV.
  if (!g_ceph_context->_log->is_inside_log_lock()) {
    // TODO: don't use an ostringstream here. It could call malloc(), which we
    // don't want inside a signal handler.
    // Also fix the backtrace code not to allocate memory.
    BackTrace bt(0);
    ostringstream oss;
    bt.print(oss);
    dout_emergency(oss.str());   // 打印线程调用栈

    // dump to log.  this uses the heap extensively, but we're better
    // off trying than not.
    derr < < buf << std::endl;
    bt.print(*_dout);
    *_dout << " NOTE: a copy of the executable, or `objdump -rdS <executable>` "
       < < "is needed to interpret this.\n"
       << dendl;

    g_ceph_context->_log->dump_recent();  // 打印最近的一批日志
  }

  reraise_fatal(signum);  // 重新抛出信号,这里基本上是让默认处理函数处理了,一般都是kill进程,输出coredump文件
}

SIGTERM/SIGINT信号则经过一次转发,当然最终的处理函数还是main中指定的:

// src\ceph_osd.cc
void handle_osd_signal(int signum)
{
  if (osd)
    osd->handle_signal(signum);
}

void OSD::handle_signal(int signum)
{
  assert(signum == SIGINT || signum == SIGTERM);
  derr < < "*** Got signal " << sys_siglist[signum] << " ***" << dendl;
  //suicide(128 + signum);
  shutdown();   // OSD::shutdown()
}
// kill ${pid-of-ceph-osd}
2019-05-13 12:16:55.307757 7f73aa6b3700 -1 osd.0 61845 *** Got signal Terminated ***
2019-05-13 12:16:55.307804 7f73aa6b3700  0 osd.0 61845 prepare_to_stop telling mon we are shutting down
2019-05-13 12:16:55.376437 7f73be4e0700  0 osd.0 61845 got_stop_ack starting shutdown
2019-05-13 12:16:55.376468 7f73aa6b3700  0 osd.0 61845 prepare_to_stop starting shutdown
2019-05-13 12:16:55.376481 7f73aa6b3700 -1 osd.0 61845 shutdown

osd进程正常退出之前,有很多工作要做,比如:
1. 通知mon osd down、close pg
2. superblock、FileJournal刷盘
3. 清理各种线程、messenger、handler
4. 调高日志级别并将内存中保存日志到日志文件

上述操作都是在OSD::shutdown()中处理的。

调用栈打印

进程正常退出,如kill -SIGTERM/-SIGINT等,是不会打印调用栈的,除非退出过程中出现其他异常,例如遇到Segmentation fault。

进程异常退出打印

异常退出分为两种情况,1是遇到assert(*)错误,2是收到kill信号(上面讲到的杂项信号),这两种情况都会打印线程调用栈。

首先看assert错误:

// src\include\assert.h:
#define assert(expr)                           \
  ((expr)                               \
   ? __CEPH_ASSERT_VOID_CAST (0)                    \
   : __ceph_assert_fail (__STRING(expr), __FILE__, __LINE__, __CEPH_ASSERT_FUNCTION))


// src\common\assert.cc:
namespace ceph {
  ......
  void __ceph_assert_fail(const char *assertion, const char *file, int line, const char *func)
  {
    ostringstream tss;
    tss << ceph_clock_now(g_assert_context);

    char buf[8096];
    BackTrace *bt = new BackTrace(1);
    snprintf(buf, sizeof(buf),
         "%s: In function '%s' thread %llx time %s\n"
         "%s: %d: FAILED assert(%s)\n",
         file, func, (unsigned long long)pthread_self(), tss.str().c_str(),
         file, line, assertion);
    dout_emergency(buf);

    // TODO: get rid of this memory allocation.
    ostringstream oss;
    bt->print(oss);
    dout_emergency(oss.str());  // 打印调用栈

    dout_emergency(" NOTE: a copy of the executable, or `objdump -rdS <executable>` "
           "is needed to interpret this.\n");

    if (g_assert_context) {
      lderr(g_assert_context) < < buf << std::endl;
      bt->print(*_dout);
      *_dout < < " NOTE: a copy of the executable, or `objdump -rdS <executable>` "
         < < "is needed to interpret this.\n" << dendl;

      g_assert_context->_log->dump_recent();  // 打印最近一批日志
    }

    throw FailedAssertion(bt);  // 抛异常
  }
  ......
}

收到杂项信号之后的流程上面已经分析过,杂项信号处理函数是handle_fatal_signal,都是在这里面处理的。

打印其他线程的调用栈

Ceph OSD IO线程健康状态检查机制

线程超时导致进程自杀前会打印超时线程的调用栈(这个c90ccbf227c commit之前只是通过assert打印出心跳检查线程本身的调用栈,而不是IO线程osd_op_tp的,所以没有意义)。

参考上面的提交,我们在超时情况下,但还没有达到配置的osd进程自杀时间时,主动打印超时线程的调用栈:

commit 3c711c45d990b5fe3fbbedb32659af282de5b7d7
Author: XXX <xxx @yyy.com>
Date:   Fri May 10 17:51:20 2019 +0800

    log backtrace for unhealthy thread

    CLDNBS-1143

    Change-Id: I910fdbe41d361e143153c9c1fcce3122ff226db2

diff --git a/src/common/HeartbeatMap.cc b/src/common/HeartbeatMap.cc
index f0cdd73..5d2528a 100644
--- a/src/common/HeartbeatMap.cc
+++ b/src/common/HeartbeatMap.cc
@@ -32,7 +32,8 @@ namespace ceph {
 HeartbeatMap::HeartbeatMap(CephContext *cct)
   : m_cct(cct),
     m_rwlock("HeartbeatMap::m_rwlock"),
-    m_inject_unhealthy_until(0)
+    m_inject_unhealthy_until(0),
+    m_last_user_defined_signal(0)  // added by xxx
 {
 }

@@ -72,6 +73,17 @@ bool HeartbeatMap::_check(heartbeat_handle_d *h, const char *who, time_t now)
     ldout(m_cct, 1) < < who << " '" << h->name < < "'"
                    << " had timed out after " << h->grace < < dendl;
     healthy = false;
+    /* added by xxx begin */
+    if (m_cct->_conf->heartbeat_unhealthy_backtrace_interval > 0) {
+      string w(who), is_healthy("is_healthy");
+      time_t last = time(NULL) - m_last_user_defined_signal;
+      if (w == is_healthy &&
+          (last >= m_cct->_conf->heartbeat_unhealthy_backtrace_interval)) {
+        m_last_user_defined_signal = time(NULL);
+        pthread_kill(h->thread_id, SIGUSR1);
+      }
+    }
+    /* added by xxx end */
   }
   was = h->suicide_timeout.read();
   if (was && was < now) {
diff --git a/src/common/HeartbeatMap.h b/src/common/HeartbeatMap.h
index 9af3aae..ea31f5a 100644
--- a/src/common/HeartbeatMap.h
+++ b/src/common/HeartbeatMap.h
@@ -76,6 +76,7 @@ class HeartbeatMap {
   CephContext *m_cct;
   RWLock m_rwlock;
   time_t m_inject_unhealthy_until;
+  time_t m_last_user_defined_signal;  // added by xxx
   std::list<heartbeat_handle_d*> m_workers;

   bool _check(heartbeat_handle_d *h, const char *who, time_t now);
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 5dabd20..d5b07bb 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -127,6 +127,7 @@ OPTION(keyring, OPT_STR, "/etc/ceph/$cluster.$name.keyring,/etc/ceph/$cluster.ke
 OPTION(heartbeat_interval, OPT_INT, 5)
 OPTION(heartbeat_file, OPT_STR, "")
 OPTION(heartbeat_inject_failure, OPT_INT, 0)    // force an unhealthy heartbeat for N seconds
+OPTION(heartbeat_unhealthy_backtrace_interval, OPT_FLOAT, 10.0)    // (added by xxx) < =0 means disable
 OPTION(perf, OPT_BOOL, true)       // enable internal perf counters

 OPTION(ms_type, OPT_STR, "simple")   // messenger backend
diff --git a/src/global/signal_handler.cc b/src/global/signal_handler.cc
index edca694..c191525 100644
--- a/src/global/signal_handler.cc
+++ b/src/global/signal_handler.cc
@@ -109,6 +109,19 @@ static void handle_fatal_signal(int signum)
   reraise_fatal(signum);
 }

+/* added by xxx begin */
+static void handle_user_defined_signal(int signum)
+{
+  char buf[1024];
+  snprintf(buf, sizeof(buf), "*** Caught signal (%s) **\n "
+           "in thread %llx\n", sys_siglist[signum], (unsigned long long)pthread_self());
+  derr << buf << std::endl;
+  BackTrace bt(0);
+  bt.print(*_dout);
+  *_dout << dendl;
+}
+/* added by xxx end */
+
 void install_standard_sighandlers(void)
 {
   install_sighandler(SIGSEGV, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
@@ -119,6 +132,13 @@ void install_standard_sighandlers(void)
   install_sighandler(SIGXCPU, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
   install_sighandler(SIGXFSZ, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
   install_sighandler(SIGSYS, handle_fatal_signal, SA_RESETHAND | SA_NODEFER);
+  /* added by xxx begin */
+  // We always install this handler, because heartbeat_unhealthy_backtrace_interval may be changed online,
+  // which means SIGUSR1 may be sent to a thread anytime, if we don't install this, osd proc will be killed.
+  // We'd like to send SIGUSR1 to a thread more than once, so SA_RESETHAND is not set.
+  install_sighandler(SIGUSR1, handle_user_defined_signal, SA_NODEFER);
+  /* added by xxx end */
 }

另外通过kill -USR1 $pid,也可以主动触发打印线程调用栈。

需要注意的是,打印出的调用栈可能并不是线程当前实际阻塞的位置,例如第二次打印的时候可能是在等锁或者限流场景(其他位置阻塞导致的)。第一次打印可能是准确的。

效果:
“`
2019-05-13 17:31:35.045017 7f9de6c96700 -1 *** Caught signal (User defined signal 1) **
in thread 7f9de6c96700

ceph version 0.94.6-18-g3c711c4 (3c711c45d990b5fe3fbbedb32659af282de5b7d7)
1: /usr/bin/ceph-osd() [0xa21fa4]
2: (()+0xf890) [0x7f9e078bf890]
3: (ReplicatedPG::do_op(std::tr1::shared_ptr<oprequest>&)+0) [0x8510f0]
4: (ReplicatedPG::do_request(std::tr1::shared_ptr<oprequest>&, ThreadPool::TPHandle&)+0x604) [0x7ea074]
5: (OSD::dequeue_op(boost::intrusive_ptr<pg>, std::tr1::shared_ptr<oprequest>, ThreadPool::TPHandle&)+0x3cf) [0x64107f]
6: (OSD::ShardedOpWQ::_process(unsigned int, ceph::heartbeat_handle_d*)+0x338) [0x6415b8]
7: (ShardedThreadPool::shardedthreadpool_worker(unsigned int)+0x89e) [0xb10bce]
8: (ShardedThreadPool::WorkThreadSharded::entry()+0x10) [0xb12d30]
9: (()+0x8064) [0x7f9e078b8064]
10: (clone()+0x6d) [0x7f9e05e1062d]
“`