第19章 スレッド

概要

Rubyインターフェイス

そういえば実際にRubyでスレッドを使うコードをまだちゃんと見せていなかっ たような気がする。たいしたものではないが、いちおう紹介しておこう。

Thread.fork {
    while true
      puts 'forked thread'
    end
}
while true
  puts 'main thread'
end

このプログラムを実行すればちゃんと"forked thread""main thread"が ぐちゃまぜになって出力されてくる。

もちろんただ複数のスレッドを作る以外にもいろいろ制御の方法はある。 Javaのようにsynchronize予約語があったりはしないが、 MutexQueueMonitorといった常識的なプリミティブはもちろん 用意されているし、スレッド自体の操作のためには以下のようなAPIが使える。

▼スレッドAPI

Thread.pass誰か他のスレッドに実行を移す。
Thread.kill(th)スレッドthを終了させる。
Thread.exit自スレッドを終了させる。
Thread.stop自スレッドを一時停止する。
Thread#joinそのスレッドが終了するのを待つ。
Thread#wakeup一時停止しているスレッドを起こす。

rubyスレッド

スレッドは「みんな一斉に動く」というのが建前だが、実際には少しの時間ず つ順番に動いているわけだ。厳密に言えばマルチCPUのマシンでそれなりに工 夫すれば同時に二つ動いたりもできるが、それでもCPUの数以上のスレッドが あればやはり順番に動かなければならない。

つまりスレッドを作り出すためにはどこかで誰かがスレッドを切り替えてやら ないといけないわけだが、その手法には大きく分けて二種類ある。 カーネルレベルスレッドと ユーザレベルスレッドだ。これはどちらも読んで字の如く、ス レッドというものをカーネルで作るかユーザレベルで作るかという違いである。 カーネルレベルならマルチCPUを生かして複数のスレッドを同時に動かしたり もできる。

ではrubyのスレッドはどうなのかと言えば、これはユーザレベルスレッドで ある。 そして(それゆえ)同時に動けるスレッドは厳密にただ一つと制限されている。

プリエンプティブか

もう少し詳しくrubyスレッドの特徴について話そう。スレッドに関する別の視 点としてしては「プリエンプティブ(preemptive)かどうか」という点が ある。

「スレッド(機構)がプリエンプティブ(preemptive)である」と言った場合、 そのスレッドは使っているほうがスレッド切り替えを明示的に行わなくても勝 手にスレッドを切り替えてくれるということである。これを逆に見れば、スレッ ド切り替えのタイミングを制御できないということになる。

一方ノンプリエンプティブなスレッド機構では、スレッドを使っているほうが 明示的に「次のスレッドに制御権を渡してあげてもいいよ」と言わない限り スレッドが切り替わらない。また逆に見れば、スレッドがいつ切り替わ可能性 のある場所がはっきりわかるということにある。

この区別はプロセスにもあって、その場合はプリエンプティブなほうが「偉い」 ことになっている。例えばとあるプログラムにバグがあって無限ループに陥っ てしまうとプロセスが切り替わらなくなる。つまり一つのユーザプログラムが システム全体を停止できてしまうわけで、それはよろしくない。でもって、 Windows 3.1は基本がMS-DOSだったのでプロセス切り替えがノンプリエンプ ティブだったけどもWindows 95はプリエンプティブだ。よってシステムはより 堅牢である。従ってWindows 95は3.1より「偉い」という話になる。

そこでrubyのスレッドはどうかと言うと、Rubyレベルではプリエンプティブで、 Cレベルではノンプリエンプティブである。つまりCのコードを書いているときは スレッドが切り替わるタイミングをほぼ確実に特定できる。

どうしてこうなっているのだろうか。スレッドは確かに便利なものだが、使う ほうにもそれなりの心構えが必要になる。即ちコードがスレッドに対応してい なければならない(マルチスレッドセーフでなければいけない)。つまりCレ ベルでもプリエンプティブにするならば使うCのライブラリ全てがマルチスレッ ド対応になっていなければならないということである。

しかし現実にCのライブラリはまだまだスレッドセーフでないものも多い。せっ かく苦労して拡張ライブラリを書きやすくしたのに、スレッド対応を必須にし て使えるライブラリの数を減らしてしまったら意味がない。だからCレベルで はノンプリエンプティブ、というのはrubyにとっては合理的な選択なのである。

管理体制

rubyスレッドはCレベルではノンプリエンプティブだとわかった。つまりある 程度動いたら自発的に実行権を手離すわけだ。それでは今まさに実行中のスレッ ドが実行をやめようとしていると考えてほしい。次は誰に実行権を渡せばいい のだろうか。いやそもそもその前に、スレッドはrubyの内部ではどんなふうに 表現されているのか知らないとどうにもならない。スレッドを管理するための 変数とデータ型を見ておこう。

▼スレッドを管理する構造

 864  typedef struct thread * rb_thread_t;
 865  static rb_thread_t curr_thread = 0;
 866  static rb_thread_t main_thread;

7301  struct thread {
7302      struct thread *next, *prev;

(eval.c)

struct threadはとある理由からとても大きいので今重要な部分だけに絞った。 そんなわけで二つしかないのだが、このnextprevというメンバ名、そしてそ の型がrb_thread_tであることからrb_thread_tは双方向リンクリストでつなが れていると考えられる。しかも実はただの双方向リストではなく、両端がつな がっている。つまり環状なのだ。ここは大きなポイントである。スタティック 変数のmain_threadcurr_threadも加えるとデータ構造全体は 図図1のようになる。

(thread)
図1: スレッドを管理するデータ構造

main_thread(メインスレッド)とはプログラムが起動したときに存在してい るスレッド、つまり「最初の」スレッドのことである。curr_threadは当然 current thread、つまり現在動いているスレッドのことである。 main_threadの値はプロセス稼働中は変わらないが、curr_threadの値はどんどん 変わっていく。

こうしてリストが輪になっていると「次のスレッド」を選ぶ作業が簡単になる。 単にnextリンクをたぐればいいのだ。それだけである程度全てのスレッドを 平等に動かすことができる。

スレッドを切り替えるとは

ところで、そもそもスレッドとは何なのだろう。 あるいは、どうすればスレッドが切り替わったと言えるのだろうか。

これはとても難しい問題だ。プログラムとは何か、オブジェクトとは何か、と いうのと似たようなもので、普段適当に「感じ」で理解しているものについて 問われるとスッキリとは答えられない。特に、スレッドとプロセスはどこがど う違うのか、なんて聞かれると困ってしまう。

それでも現実的な範囲で言えばある程度は言える。スレッドに必要なものは実 行のコンテキストだ。rubyにおいてコンテキストと言えば、これまで見てきた ように、ruby_frameruby_scoperuby_classなどであった。 またrubyはマシ ンスタックの上にruby_frameの実体を確保しているし、拡張ライブラリが使っ ているスタック領域もあるだろうから、マシンスタックもRubyプログラムのコ ンテキストとして必要である。それと最後に、CPUのレジスタも欠かせない。 これら様々なコンテキストがスレッドを成り立たせる要素であり、これを切り 替えることがまた、スレッドの切り替えであるわけだ。あるいは コンテキストスイッチ(context switch)とも言う。

コンテキストスイッチの方法

あとはどうやってコンテキストを切り替えるかという話になる。ruby_scoperuby_classを変更するのは簡単だ。ヒープにでも領域を確保しておいて地道に 退避すればよい。CPUのレジスタもなんとかなる。setjmp()を使うと 保存・書き戻しができるからだ。そのための領域はどちらもrb_thread_tに 用意されている。

struct thread(一部)

7301  struct thread {
7302      struct thread *next, *prev;
7303      jmp_buf context;

7315      struct FRAME *frame;        /* ruby_frame */
7316      struct SCOPE *scope;        /* ruby_scope */
7317      struct RVarmap *dyna_vars;  /* ruby_dyna_vars */
7318      struct BLOCK *block;        /* ruby_block */
7319      struct iter *iter;          /* ruby_iter */
7320      struct tag *tag;            /* prot_tag */
7321      VALUE klass;                /* ruby_class */
7322      VALUE wrapper;              /* ruby_wrapper */
7323      NODE *cref;                 /* ruby_cref */
7324
7325      int flags;  /* scope_vmode / rb_trap_immediate / raised */
7326
7327      NODE *node;                 /* rb_current_node */
7328
7329      int tracing;                /* tracing */
7330      VALUE errinfo;              /* $! */
7331      VALUE last_status;          /* $? */
7332      VALUE last_line;            /* $_ */
7333      VALUE last_match;           /* $~ */
7334
7335      int safe;                   /* ruby_safe_level */

(eval.c)

このようにruby_frameruby_scopeに対応しているらしいメンバがある。 レジスタを保存するためのjmp_bufもある。

さて、問題はマシンスタックである。これをすりかえるにはどうしたらいいだろう。

仕組みに対して最も素直なのはスタックの位置(先端)を指定しているポイン タを直接書き換えることである。普通はCPUのレジスタにそれがある。専用レ ジスタがあることもあれば汎用レジスタを一つそのために確保するという場合 もあるのだが、とにかくどこかにはある。面倒なので以下このポインタのこと をスタックポインタと呼んでおこう。これを変更してしまえば別の領域を スタックにできるのは当然だ。しかし当然ながらこの方法はCPUとOSごとに対 処する必要があるわけで、移植性を確保するのは非常に大変である。

そこでrubyではかなり暴力的な手段でマシンスタックのすり替えを実装してい る。スタックポインタがだめなら、スタックポインタが指す先を変更してしま おうというのである。スタックを直接いじることができるのは既にガーベージコ レクタのところで見てきたから、あとはやることを少し変えるだけだ。 スタックを保存しておく場所もstruct threadにちゃんとある。

struct thread(一部)

7310      int   stk_len;      /* スタックの長さ */
7311      int   stk_max;      /* stk_ptrに割り当てたメモリのサイズ */
7312      VALUE*stk_ptr;      /* スタックのコピー */
7313      VALUE*stk_pos;      /* スタックの位置 */

(eval.c)

解説の進めかた

以上、いろいろ話したが要点は三点にまとめられる。

コンテキストを切り替えるか、である。それがそのまま本章のポイントにも なる。以下ではこの三点それぞれについて一節を使って話してゆく。

トリガー

まずは第一点、いつスレッドは切り替わるかだ。言い換えれば、 スレッドが切り替わる原因は何かということになる。

I/O待ち

例えばIO#getsIO#readを呼んで何か読み込もうとする場合、 読み込みにはか なり時間がかかると思われるので、その間は他のスレッドを動かしておいたほ うがいいはずだ。つまりここで強制切り替えが必要になる。以下はgetcのCイ ンターフェイスである。

rb_getc()

1185  int
1186  rb_getc(f)
1187      FILE *f;
1188  {
1189      int c;
1190
1191      if (!READ_DATA_PENDING(f)) {
1192          rb_thread_wait_fd(fileno(f));
1193      }
1194      TRAP_BEG;
1195      c = getc(f);
1196      TRAP_END;
1197
1198      return c;
1199  }

(io.c)

READ_DATA_PENDING(f)はそのファイルのバッファの中身がまだあるかチェック するマクロである。バッファの中身があったら、待ち時間ゼロで動けるわけだ から、すぐに読む。空だったら時間がかかるのでrb_thread_wait_fd()を呼ぶ。 これがスレッド切り替えの間接的な要因である。

rb_thread_wait_fd()が「間接的」と言うなら「直接的」な要因もないとまず い。それは何か。rb_thread_wait_fd()の中を見てみよう。

rb_thread_wait_fd()

8047  void
8048  rb_thread_wait_fd(fd)
8049      int fd;
8050  {
8051      if (rb_thread_critical) return;
8052      if (curr_thread == curr_thread->next) return;
8053      if (curr_thread->status == THREAD_TO_KILL) return;
8054
8055      curr_thread->status = THREAD_STOPPED;
8056      curr_thread->fd = fd;
8057      curr_thread->wait_for = WAIT_FD;
8058      rb_thread_schedule();
8059  }

(eval.c)

最後の一行にrb_thread_schedule()というのがある。この関数が「直接的な原 因」だ。rubyのスレッドの実装の核となる関数であり、次のスレッドの選定と 切り替えを行っている。

なぜこの関数がそういう役割なのかわかるかと言うと、筆者の場合は、スレッ ドのスケジューリング(scheduling)という言葉があるのを前もって知ってい たからである。もし知らなくても今覚えたので次からは気付くことができる というわけだ。

それでこの場合は単に他のスレッドに制御を移すだけでなく自分は停止してし まう。しかも「読み込みが終わるまで」という明確な期限付きだ。だからその 要望をrb_thread_schedule()に伝えなくてはならない。それがcurr_threadの メンバにいろいろ代入しているあたりだ。停止の理由をwait_forに、起こすと きに使う情報をfdに、それぞれ入れる。

別スレッド待ち

rb_thread_schedule()のタイミングでスレッドが切り替わるとわかれば、今度 は逆にrb_thread_schedule()のあるところからスレッドが切り替わる地点を見 付けることができる。そこでスキャンしてみると、rb_thread_join()という関 数で発見した。

rb_thread_join()(一部)

8227  static int
8228  rb_thread_join(th, limit)
8229      rb_thread_t th;
8230      double limit;
8231  {

8243          curr_thread->status = THREAD_STOPPED;
8244          curr_thread->join = th;
8245          curr_thread->wait_for = WAIT_JOIN;
8246          curr_thread->delay = timeofday() + limit;
8247          if (limit < DELAY_INFTY) curr_thread->wait_for |= WAIT_TIME;
8248          rb_thread_schedule();

(eval.c)

この関数はThread#joinの実体で、Thread#joinはレシーバのスレッドが終了す るのを待つメソッドだ。確かに、待ち時間があるなら他のスレッドを動かした ほうがお得である。これで二つめの切り替え理由が見付かった。

時間待ち

さらにrb_thread_wait_for()という関数でもrb_thread_schedule()が見付かっ た。これは(Rubyの)sleepなどの実体である。

rb_thread_wait_for(簡約版)

8080  void
8081  rb_thread_wait_for(time)
8082      struct timeval time;
8083  {
8084      double date;

8124      date = timeofday() +
                 (double)time.tv_sec + (double)time.tv_usec*1e-6;
8125      curr_thread->status = THREAD_STOPPED;
8126      curr_thread->delay = date;
8127      curr_thread->wait_for = WAIT_TIME;
8128      rb_thread_schedule();
8129  }

(eval.c)

timeofday()は今現在の時刻を返す。それにtimeの値を加えるから、 dateは待ち時間が切れる時刻を示している。つまりこれは 「特定の時刻になるまで停止したい」という指定だ。

時間切れによる切り替え

以上はどれもなんらかの操作がRubyレベルから行われて、その結果スレッド切 り替えの原因となっていた。つまりここまでだとRubyレベルでもノンプリエン プティブになってしまっている。これだけでは、もしひたすら計算し続けるよ うなプログラムだったら一つのスレッドが永遠に走り続けることになってしま う。そこである程度動いたら自発的に実行権を捨てさせるようにしな ければならない。ではどの程度動いたら止まらなくてはいけないのだ ろうか。それを次に話そう。

setitimer

毎度毎度同じで芸がないような気もするが、さらにrb_thread_schedule()を 呼んでいるところを探してみた。すると今回は変なところで見付かる。 それはここだ。

catch_timer()

8574  static void
8575  catch_timer(sig)
8576      int sig;
8577  {
8578  #if !defined(POSIX_SIGNAL) && !defined(BSD_SIGNAL)
8579      signal(sig, catch_timer);
8580  #endif
8581      if (!rb_thread_critical) {
8582          if (rb_trap_immediate) {
8583              rb_thread_schedule();
8584          }
8585          else rb_thread_pending = 1;
8586      }
8587  }

(eval.c)

なにやらシグナル関係らしいのだが、これは一体なんだろう。 この関数catch_timer()を使っているところを追ってみると このあたりで使われていた。

rb_thread_start_0()(部分)

8620  static VALUE
8621  rb_thread_start_0(fn, arg, th_arg)
8622      VALUE (*fn)();
8623      void *arg;
8624      rb_thread_t th_arg;
8625  {

8632  #if defined(HAVE_SETITIMER)
8633      if (!thread_init) {
8634  #ifdef POSIX_SIGNAL
8635          posix_signal(SIGVTALRM, catch_timer);
8636  #else
8637          signal(SIGVTALRM, catch_timer);
8638  #endif
8639
8640          thread_init = 1;
8641          rb_thread_start_timer();
8642      }
8643  #endif

(eval.c)

つまりcatch_timer()SIGVTALRMのシグナルハンドラらしい。

ここでSIGVTALRMというのがどういうシグナルか、というのが問題になる。 これは実はsetitimerというシステムコールを使うと送られてくるシグナル なのである。それゆえ直前でHAVE_SETITIMERのチェックが入っているわけだ。 setitimerというのはSET Interval TIMERの略で、一定時間ごとにシグナルを 送るようOSに伝えるシステムコールである。

ではそのsetitimerを呼んでいるところはと言うと、偶然にもこのリストの 最後にあるrb_thread_start_timer()である。

全部まとめると次のような筋書きになる。setitimerで一定時間ごとにシグナ ルを送らせる。それをcatch_timer()でキャッチする。そこで rb_thread_schedule()を呼びスレッドを切り替える。完璧だ。

ただしシグナルはいついかなる時でも発生してしまうものだから、これだけ だとCレベルでもプリエンプティブということになってしまう。そこで catch_timer()のコードをもう一度見てほしい。

if (rb_trap_immediate) {
    rb_thread_schedule();
}
else rb_thread_pending = 1;

rb_thread_schedule()するのはrb_trap_immediateのときだけ、という条件が 付いている。これがポイントだ。rb_trap_immediateは名前の通り「シグナル を即座に処理するかどうか」を表しており、普段は偽になっている。これが真 になるのは単一スレッドでI/Oを行っている間など、ごく限られた期間だけで ある。ソースコード上ではTRAP_BEGTRAP_ENDで囲まれているところだ。

一方それが偽のときはrb_thread_pendingをセットしているので、これを 追ってみよう。この変数は次のところで使っている。

CHECK_INTSHAVE_SETITIMER

  73  #if defined(HAVE_SETITIMER) && !defined(__BOW__)
  74  EXTERN int rb_thread_pending;
  75  # define CHECK_INTS do {\
  76      if (!rb_prohibit_interrupt) {\
  77          if (rb_trap_pending) rb_trap_exec();\
  78          if (rb_thread_pending && !rb_thread_critical)\
  79              rb_thread_schedule();\
  80      }\
  81  } while (0)

(rubysig.h)

このようにCHECK_INTSの中でrb_thread_pendingをチェックし、 rb_thread_schedule()している。つまりSIGVTALRMを受けると rb_thread_pendingが真になり、その次にCHECK_INTSを通ったときにスレッド が切り替わるというわけだ。

このCHECK_INTSはこれまでもいろいろなところで登場していた。 例えばrb_eval()rb_call0()rb_yield_0()である。CHECK_INTSは定期 的に通るところに置いておかないと意味がないから、自然と重要関数に 集まるのだろう。

tick

setitimerがある場合についてはこれでわかった。しかしsetitimerがないとき はどうするのだろう。実は今見たばかりのCHECK_INTS#else側の定義が答え だ。

CHECK_INTSnot HAVE_SETITIMER

  84  EXTERN int rb_thread_tick;
  85  #define THREAD_TICK 500
  86  #define CHECK_INTS do {\
  87      if (!rb_prohibit_interrupt) {\
  88          if (rb_trap_pending) rb_trap_exec();\
  89          if (!rb_thread_critical) {\
  90              if (rb_thread_tick-- <= 0) {\
  91                  rb_thread_tick = THREAD_TICK;\
  92                  rb_thread_schedule();\
  93              }\
  94          }\
  95      }\
  96  } while (0)

(rubysig.h)

CHECK_INTSを通るたびにrb_thread_tickが減る。 0になったらrb_thread_schedule()する。 つまりTHREAD_TICK(=500)回CHECK_INTSを通ったら スレッドが切り替わるという仕組みだ。

スケジューリング

ポイントの第二点はどのスレッドに切り替えるかだ。 この決定を一手に担うのがrb_thread_schedule()である。

rb_thread_schedule()

rubyの重要関数はどいつもこいつもデカい。 このrb_thread_schedule()は220行以上ある。 徹底的に切り分けていこう。

rb_thread_schedule()(概形)

7819  void
7820  rb_thread_schedule()
7821  {
7822      rb_thread_t next;           /* OK */
7823      rb_thread_t th;
7824      rb_thread_t curr;
7825      int found = 0;
7826
7827      fd_set readfds;
7828      fd_set writefds;
7829      fd_set exceptfds;
7830      struct timeval delay_tv, *delay_ptr;
7831      double delay, now;  /* OK */
7832      int n, max;
7833      int need_select = 0;
7834      int select_timeout = 0;
7835
7836      rb_thread_pending = 0;
7837      if (curr_thread == curr_thread->next
7838          && curr_thread->status == THREAD_RUNNABLE)
7839          return;
7840
7841      next = 0;
7842      curr = curr_thread;         /* starting thread */
7843
7844      while (curr->status == THREAD_KILLED) {
7845          curr = curr->prev;
7846      }

          /* ……selectで使う変数を用意する…… */
          /* ……必要ならselectする        …… */
          /* ……次に起動するスレッドを決定…… */
          /* ……コンテキストスイッチ      …… */
8045  }

(eval.c)

(A)スレッドが一つしかない場合は何もしないですぐに戻るように なっている。従ってこれ以降の話は必ずスレッドが複数存在するという 前提に立って考えられる。

(B)続いて変数の初期化。whileまで含めて初期化と思っていいだろう。 currprevを辿っているので、生きている(status != THREAD_KILLED) 最後のスレッドがセットされる。なんで「最初の」でないかと言うと、 「currの次から始めてcurrを扱って終わる」というループがたくさん 出てくるからである。

そのあとにはselectがなんたら、という文が見える。 rubyのスレッド切り替えはselectに非常に依存しているので、 ここでまずselectについて予習しておこう。

select

selectというのは、とあるファイルの読み書きの準備ができるまで 待つためのシステムコールである。プロトタイプはこうだ。

int select(int max,
           fd_set *readset, fd_set *writeset, fd_set *exceptset,
           struct timeval *timeout);

fd_set型の変数にはチェックしたいfdの集合を入れる。 第一引数maxは「(fd_setに入っているfdの最大値)+1」。 timeoutselectの最大待ち時間である。 timeoutNULLなら無期限に待つ。timeoutの値が0なら一秒も待たずに チェックだけしてすぐ戻る。返り値は使うときに話す。

fd_setについて詳しく話す。fd_setは以下のようなマクロで操作できる。

fd_setの操作

fd_set set;

FD_ZERO(&set)       /* 初期化 */
FD_SET(fd, &set)    /* ファイルディスクリプタfdを集合に加える */
FD_ISSET(fd, &set)  /* fdが集合にあるなら真 */

fd_setは典型的にはビット配列であり、 n 番のファイルディスクリプタを チェックしたいときには n ビットめが立つ(図2)。

(fdset)
図2: fd_set

簡単なselectの使用例を見せよう。

selectの使用例

#include <stdio.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

int
main(int argc, char **argv)
{
    char *buf[1024];
    fd_set readset;

    FD_ZERO(&readset);              /* readsetを初期化 */
    FD_SET(STDIN_FILENO, &readset); /* stdinを集合に入れる */
    select(STDIN_FILENO + 1, &readset, NULL, NULL,  NULL);
    read(STDIN_FILENO, buf, 1024);  /* 遅延なしで成功する */
    exit(0);
}

このコードではシステムコールは必ず成功するものと仮定し エラーチェックは全くやっていない。FD_ZEROFD_SETselectという 流れだけ見てほしい。selectの第五引数timeoutNULLにしているので このselect呼び出しは永遠にstdinの読み込みを待つ。そのselectが 終わったということは、次のreadでは全く待たずに読み込めるということ である。途中にprintをはさんでみればよりよく動きがわかるだろう。 またもう少し詳しいコード例も添付CD-ROMに入れてあ る\footnote{selectのコード例:添付CD-ROMのdoc/select.htmlを参照}。

selectの準備

ではrb_thread_schedule()のコードに戻ることにする。 ここのコードはスレッドの待ち理由ごとに分岐しているので、 その中身は短縮して示す。

rb_thread_schedule()selectの準備

7848    again:
          /* select関連変数の初期化 */
7849      max = -1;
7850      FD_ZERO(&readfds);
7851      FD_ZERO(&writefds);
7852      FD_ZERO(&exceptfds);
7853      delay = DELAY_INFTY;
7854      now = -1.0;
7855
7856      FOREACH_THREAD_FROM(curr, th) {
7857          if (!found && th->status <= THREAD_RUNNABLE) {
7858              found = 1;
7859          }
7860          if (th->status != THREAD_STOPPED) continue;
7861          if (th->wait_for & WAIT_JOIN) {
                  /* ……join待ち…… */
7866          }
7867          if (th->wait_for & WAIT_FD) {
                  /* ……I/O待ち…… */
7871          }
7872          if (th->wait_for & WAIT_SELECT) {
                  /* ……select待ち…… */
7882          }
7883          if (th->wait_for & WAIT_TIME) {
                  /* ……時間待ち…… */
7899          }
7900      }
7901      END_FOREACH_FROM(curr, th);

(eval.c)

嫌でも目立つのがFOREACHなんとかというマクロだ。 この二つは次にように定義されている。

FOREACH_THREAD_FROM

7360  #define FOREACH_THREAD_FROM(f,x) x = f; do { x = x->next;
7361  #define END_FOREACH_FROM(f,x) } while (x != f)

(eval.c)

わかりやすく展開してみよう。

th = curr;
do {
    th = th->next;
    {
        .....
    }
} while (th != curr);

スレッドの環状リストをcurrの次からたぐり最後にcurrを処理して終わる、 そのときに変数thを使う、という意味らしい。ややRubyのイテレータを 思わせるものがある……というのは想像力がありすぎだろうか。

ここでコードの続きに戻ると、この微妙に変なループを使ってselectの必要な スレッドがないかチェックする。先程見たとおりselectは読み・書き・例外・ 時刻を全部一気に待てるので、I/O待ちと時間待ちがselect一つに統合できる のはわかると思う。また前節では説明しなかったがselect待ちというのもある。 RubyのライブラリにもIO.selectというメソッドがあるし、Cレベルだと rb_thread_select()というのが使える。だからそのselectも同時に 実行しないといけない。fd_setを合成すれば複数のselectを同時に 済ますことができる。

残るはjoin待ちだけだ。このコードはいちおう見ておこう。

rb_thread_schedule()selectの準備−join待ち

7861          if (th->wait_for & WAIT_JOIN) {
7862              if (rb_thread_dead(th->join)) {
7863                  th->status = THREAD_RUNNABLE;
7864                  found = 1;
7865              }
7866          }

(eval.c)

rb_thread_dead()の意味は名前から明らかだ。引数のスレッドが終了している かどうか判定する。

selectを呼ぶ

ここまででselectが必要かどうかが判明し、必要ならそのfd_setも準備 できている。そこで必要ならselectを呼ぶ。例えすぐに起動できる (THREAD_RUNNABLE)スレッドがあってもselectは呼ばなくてはいけない。 実はもうとっくにI/O待ちなどが終わっているスレッドがあり、そちらのほうが 優先順位が高いかもしれないからだ。ただしその場合はselectにはすぐに 戻るよう指定し、I/Oが完了しているかのチェックだけさせる。

rb_thread_schedule()select

7904      if (need_select) {
7905          /* delayをtimevalに変換する。 */
7906          /* すぐに起動可能なスレッドがあったらI/Oチェックだけやる */
7907          if (found) {
7908              delay_tv.tv_sec = 0;
7909              delay_tv.tv_usec = 0;
7910              delay_ptr = &delay_tv;
7911          }
7912          else if (delay == DELAY_INFTY) {
7913              delay_ptr = 0;
7914          }
7915          else {
7916              delay_tv.tv_sec = delay;
7917              delay_tv.tv_usec = (delay - (double)delay_tv.tv_sec)*1e6;
7918              delay_ptr = &delay_tv;
7919          }
7920
7921          n = select(max+1, &readfds, &writefds, &exceptfds, delay_ptr);
7922          if (n < 0) {
                  /* ……シグナルなどに割り込まれた…… */
7944          }
7945          if (select_timeout && n == 0) {
                  /* ……タイムアウトした…… */
7960          }
7961          if (n > 0) {
                  /* ……正常終了…… */
7989          }
7990          /* どこかのスレッドで時間待ちが終了した。
7991             スレッドを特定するためにもう一度ループを回す */
7992          if (!found && delay != DELAY_INFTY)
7993              goto again;
7994      }

(eval.c)

ブロック前半はコメントに書いてあるとおり。 delayは次にいずれかのスレッドが起動可能になるまでのusecなので、 それをtimeval形式に変換する。

後半では実際にselectを呼び、その結果で分岐する。このコードが 長いのでまた分割した。シグナルに割り込まれた場合は最初に戻って やりなおすかエラーかどちらかなので、意味があるのは残りの二つだ。

タイムアウト

selectがタイムアウトした場合は時間待ちまたはselect待ちの スレッドが起動可能になっているかもしれない。それをチェックして 走れるスレッドを探す。見付かったらTHREAD_RUNNABLEを付ける。

正常終了

selectが正常終了したということはI/Oの準備ができたか、select待ちが 終わったかのどちらかだ。fd_setをチェックして待ちが終わったスレッドを 探す。見付かったらTHREAD_RUNNABLEを付ける。

次のスレッドを決定

今までの情報を全て勘案して、最終的に次に起動するスレッドを決定する。 元から起動可能なもの、待ちが終わったものなどはどれもRUNNABLEになって いるはずなので、その中から適当に選べばいい。

rb_thread_schedule()−次のスレッドを決定

7996      FOREACH_THREAD_FROM(curr, th) {
7997          if (th->status == THREAD_TO_KILL) {              /*(A)*/
7998              next = th;
7999              break;
8000          }
8001          if (th->status == THREAD_RUNNABLE && th->stk_ptr) {
8002              if (!next || next->priority < th->priority)  /*(B)*/
8003                 next = th;
8004          }
8005      }
8006      END_FOREACH_FROM(curr, th);

(eval.c)

(A)いまにも終了しようとしているスレッドがあったら優先的に 順位をまわして終了させる。

(B)走れそうな奴を見付ける。ただしpriorityの値を考慮するらしい。 このメンバはRubyレベルからもThread#priority Thread#priority=で 変更できる。特にruby自体が変更するようなことはしていない。

もしここまでが終わっても次のスレッドが見付からなかったら、つまりnextが セットされていなかったら、どうなるのだろうか。既にselectをしたの だから時間待ちやI/O待ちのスレッドのどれかは待ちが終わっているはずだ。 それがないということは残るは他のスレッド待ちだけで、しかももう起動でき るスレッドはないからその待ちが終わることはない。つまり デッドロック(dead lock)である。

もちろんこれ以外にもデッドロックは起こりうるのだが、一般的にデッドロッ クを検出するというのはとても難しい。特にrubyの場合はMutexなども Rubyレベルで実装されているので完全な検出は不可能に近い。

スレッドを切り替える

次に起動すべきスレッドは決定した。 I/Oやselectのチェックもした。 あとは目指すスレッドに制御を移してやるだけだ。 しかしrb_thread_schedule()の最後とスレッド切り替えのコードは 節を改めて始めることにしよう。

コンテキストスイッチ

最後の第三点はスレッド切り替え、 コンテキストスイッチ(context switch)である。 ここがrubyのスレッドの一番面白いところだ。

基本路線

ではrb_thread_schedule()末尾から行こう。 ここの節の話は厄介なので思い切り簡約版で行く。

rb_thread_schedule()(コンテキストスイッチ)

if (THREAD_SAVE_CONTEXT(curr)) {
    return;
}
rb_thread_restore_context(next, RESTORE_NORMAL);

THREAD_SAVE_CONTEXT()のところは その中身をいくつか展開して考えないとわからない。

THREAD_SAVE_CONTEXT()

7619  #define THREAD_SAVE_CONTEXT(th) \
7620      (rb_thread_save_context(th),thread_switch(setjmp((th)->context)))

7587  static int
7588  thread_switch(n)
7589      int n;
7590  {
7591      switch (n) {
7592        case 0:
7593          return 0;
7594        case RESTORE_FATAL:
7595          JUMP_TAG(TAG_FATAL);
7596          break;
7597        case RESTORE_INTERRUPT:
7598          rb_interrupt();
7599          break;
            /* ……異常系をいろいろ処理…… */
7612        case RESTORE_NORMAL:
7613        default:
7614          break;
7615      }
7616      return 1;
7617  }

(eval.c)

つまり三つ合わせて展開するとこうなる。

rb_thread_save_context(curr);
switch (setjmp(curr->context)) {
  case 0:
    break;
  case RESTORE_FATAL:
    ....
  case RESTORE_INTERRUPT:
    ....
  /* ……異常系の処理…… */
  case RESTORE_NORMAL:
  default:
    return;
}
rb_thread_restore_context(next, RESTORE_NORMAL);

setjmp()の返り値とrb_thread_restore_context()の両方で RESTORE_NORMALが現れているのは明らかに怪しい。 rb_thread_restore_context()の中でlongjmp()していて、 setjmp()longjmp()が対応するのだろうと予想できる。 そして関数の名前からも意味を想像すると、

現在のスレッドのコンテキストを保存する
setjmp
次のスレッドのコンテキストを復帰
longjmp

というのが大筋の流れだろう。ただしここで注意しなければならない のは、このsetjmp()longjmp()の組はこのスレッドの中で完結している のではないということだ。setjmp()は自分のコンテキストを保存するために 使い、longjmp()は次のスレッドのコンテキストに復帰するときに使う。つ まり次のようなsetjmp()/longjmp()の連環ができていることになる (図3)。

(setjmploop)
図3: setjmpの連鎖による本返し縫い

setjmp()/longjmp()でCPUまわりの復帰はできるから、 残るコンテキストはRubyスタックにマシンスタックである。 その退避がrb_thread_save_context()、 復帰がrb_thread_restore_context()だ。順番に見ていこう。

rb_thread_save_context()

ではまずコンテキストを保存するrb_thread_save_context()から。

rb_thread_save_context()(簡約版)

7539  static void
7540  rb_thread_save_context(th)
7541      rb_thread_t th;
7542  {
7543      VALUE *pos;
7544      int len;
7545      static VALUE tval;
7546
7547      len = ruby_stack_length(&pos);
7548      th->stk_len = 0;
7549      th->stk_pos = (rb_gc_stack_start<pos)?rb_gc_stack_start
7550                                           :rb_gc_stack_start - len;
7551      if (len > th->stk_max) {
7552          REALLOC_N(th->stk_ptr, VALUE, len);
7553          th->stk_max = len;
7554      }
7555      th->stk_len = len;
7556      FLUSH_REGISTER_WINDOWS;
7557      MEMCPY(th->stk_ptr, th->stk_pos, VALUE, th->stk_len);

          /* …………省略………… */
      }

(eval.c)

後半はthruby_scopeなどグローバル変数をひたすら代入しまくっているだ けで面白くないので省略した。残りの、ここに載せた部分ではマシンスタック をまるごとth->stk_ptrの先にコピーしようとしている。

まずruby_stack_length()だが、引数のposにスタックの先端アドレスを書き込 み、長さを返す。この値を使ってスタックの範囲を特定し、下端側のアドレス をth->stk_ptrにセットする。なにやら分岐しているのは上にのびるスタック 下にのびるスタックとがあるからだ(図4)。

(twodirection)
図4: 上にのびるスタック、下にのびるスタック

そうしたらあとはth->stk_ptrの先にメモリを確保しスタックをコピーすれば よい。th->stk_max分のメモリを確保して長さlenだけコピーする。

FLUSH_REGISTER_WINDOWSは第5章『ガ−ベージコレクション』で説明したのでもういいだろう。 スタック領域のキャッシュをメモリに落とすマクロ(実体はアセンブラ)だ。 スタック全体を対象にするときには必ず呼ばなければいけない。

rb_thread_restore_context()

そして最後にスレッドを復帰するための関数、 rb_thread_restore_context()だ。

rb_thread_restore_context()

7635  static void
7636  rb_thread_restore_context(th, exit)
7637      rb_thread_t th;
7638      int exit;
7639  {
7640      VALUE v;
7641      static rb_thread_t tmp;
7642      static int ex;
7643      static VALUE tval;
7644
7645      if (!th->stk_ptr) rb_bug("unsaved context");
7646
7647      if (&v < rb_gc_stack_start) {
7648          /* マシンスタックは下にのびる */
7649          if (&v > th->stk_pos) stack_extend(th, exit);
7650      }
7651      else {
7652          /* マシンスタックは上にのびる */
7653          if (&v < th->stk_pos + th->stk_len) stack_extend(th, exit);
7654      }

          /* 省略……グローバル変数を戻す */

7677      tmp = th;
7678      ex = exit;
7679      FLUSH_REGISTER_WINDOWS;
7680      MEMCPY(tmp->stk_pos, tmp->stk_ptr, VALUE, tmp->stk_len);
7681
7682      tval = rb_lastline_get();
7683      rb_lastline_set(tmp->last_line);
7684      tmp->last_line = tval;
7685      tval = rb_backref_get();
7686      rb_backref_set(tmp->last_match);
7687      tmp->last_match = tval;
7688
7689      longjmp(tmp->context, ex);
7690  }

(eval.c)

引数thが実行を戻す相手である。核となるのは後半のMEMCPY()longjmp()だ。MEMCPY()は最後に近ければ近いほどよい。この操作から longjmp()の間はスタックが壊れた状態になっているからである。

それなのに、rb_lastline_set()rb_backref_set()が入っている。 これは$_$~の復帰だ。この二つの変数はローカル変数のくせにスレッド ローカルでも あるので、一つのローカル変数スロットでもスレッドの数だけ存在するのであ る。この場所になくてはならないのは実際に戻す先がスタックだからである。 ローカル変数だからスロット領域はalloca()で確保されている。

と、基本は以上でよいのだが、単純にスタックを書き戻してしまうと切り替え 先のスレッドより現在のスレッドのスタックのほうが長さが短かかった場合、 コピーした瞬間に今まさに実行中の関数(rb_thread_restore_context)のス タックフレームを上書きしてしまう。即ち引数thの内容が壊れる。だからそう いうことがないようにまずスタックを伸ばさないといけない。それをやっている のが前半のstack_extend()だ。

stack_extend()

7624  static void
7625  stack_extend(th, exit)
7626      rb_thread_t th;
7627      int exit;
7628  {
7629      VALUE space[1024];
7630
7631      memset(space, 0, 1);        /* prevent array from optimization */
7632      rb_thread_restore_context(th, exit);
7633  }

(eval.c)

1Kバイト分のローカル変数(マシンスタック領域に置かれる)を確保してスタック をむりやり延ばす。しかし、当然のことだが、stack_extend()からreturnして しまったら伸びたスタックがまた縮んでしまう。だからその場ですぐに rb_thread_restore_context()を呼び直す。

ところでrb_thread_restore_context()の仕事が完了するとはlongjmp()の呼び 出しに至ることであり、一度呼び出したら絶対に戻ってこない。当然 stack_extend()の呼び出しも絶対に戻らない。それゆえ rb_thread_restore_context()ではstack_extend()から戻ったあとの処理 その他いろいろを考える必要はない。

問題点

以上がrubyのスレッド切り替えの実装だ。どう考えても軽くはない。大量に malloc() realloc()して大量にmemcpy()してsetjmp() longjmp()した 挙句スタックをのばすために関数を呼びまくるのだから「死ぬほど重い」と 表現しても問題あるまい。しかしその代わりにOS依存のシステムコール呼び 出しもなければアセンブラもSparcのレジスタウィンドウ関連のみだ。これ ならば確かに移植性は高そうである。

問題点は他にもある。それは、全部のスレッドのスタックが同じアドレスに割 り当てられるために、スタック領域を指すポインタを使っているコードが動か ない可能性があることだ。実はTcl/Tkが見事にこれにハマってしまっており、 RubyのTcl/Tkインターフェイスでは仕方なくメインスレッドから限定アクセス することで回避している。

もちろんネイティブスレッドとの相性もよろしくない。 特定のネイティブスレッドの上でだけrubyスレッドを動かすようにしないと うまく動かないだろう。UNIXだとスレッドを使いまくるライブラリはまだ 少ないがWin32ではなにかとスレッドが動くので注意が必要だ。


御意見・御感想・誤殖の指摘などは 青木峰郎 <aamine@loveruby.net> までお願いします。

『Rubyソースコード完全解説』 はインプレスダイレクトで御予約・御購入いただけます (書籍紹介ページへ飛びます)。

Copyright (c) 2002-2004 Minero Aoki, All rights reserved.