pumaのthread数の伸縮がどのように行われているか

pumaが同時に建てるthread数はこちらで指定することが出来る

実際の定義を見るとこのように2つの値を入れることが出来る

threads 5, 16

第一引数が最小値、第二引数が最大値を表しているらしく、処理の際に自動でスレッド数をスケールしてくれるらしい

実際どのように動くか空気感を調べる必要があったためメモ

最初にまとめ

語弊がある気もするけどこんな感じ

  • 最初は最小値の数だけthreadを用意する (thread poolに格納される)
  • 既存のthreadが全てビジーな状態になった際、現在のthread poolのスレッド数が最大値に達していない場合は Thread.new して新しいthreadを確保
    • thread数が +1 される
  • 30秒おきにthread poolの状況を確認し、もし待ち状態のthreadが存在するのであればthreadを1つ削除するよう宣言
  • threadの処理に入る際、実際に待ち状態だった場合に自身を削除する

表題の関心事について調べた時に見たところ

以下のh4要素はそれぞれgithubへのリンクになっているので自分で実際のコードを見る場合はそちらも参考にしてください

Puma::Server#run

Puma::Server#run の中という如何にもなメソッドの中に如何にもな記述があり、ここで設定した最小値と最大値を渡している

@thread_pool = ThreadPool.new(@min_threads,
                              @max_threads,
                              IOBuffer) do |client, buffer|

#run メソッド内部で Puma::ThreadPool#auto_trim! というメソッドを呼んでいる

if @auto_trim_time # デフォルト値は 30秒
  @thread_pool.auto_trim!(@auto_trim_time)
end

その後 #handle_servers を呼び、実際にハンドリングされるっぽい (今回リクエストがどうハンドルされるかという点はちゃんと追ってない)

if background
  @thread = Thread.new { handle_servers }
  return @thread
else
  handle_servers
end

今回は先に #handle_servers の処理を追う

Puma::Server#handle_servers

無限ループになっていて、Puma::Client (実際にリクエスト/レスポンスを処理する実装があるっぽい) をthread poolに渡す箇所がある

pool = @thread_pool
# 無限ループの中
client = Client.new io, @binder.env(sock)
# 中略
pool << client

Puma::ThreadPool#<<

@todo (ただの配列) にclientを挿入する

ここで今回の関心事について重要っぽい条件分岐が存在していて、

  • 待ち状態のthreadの数(@waiting) が todoの要素数 より小さい かつ
  • 現在存在するthreadの数(@spawned) が 設定した最大値(@max) より小さい

という場合にのみ #spawn_thread を呼び出す

def <<(work) # work には前述した client が渡される
  @mutex.synchronize do
    # 略
    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end
    # 略
  end
end

Puma::ThreadPool#spawn_thread

実際にthreadを新しく作って @workers に溜めるところ

threadの内容的には無限ループしていて、処理内容は

  • todoに要素が残っている場合
    • その処理
  • todoに要素が残っていない場合
    • @trim_requested(threadを削除したいという要求の数が格納された変数) が 1以上の場合
      • ループから抜ける
    • @trim_requested が 0の場合
      • todoに要素が入ったことを受け取るまで待つ ( @waiting に +1 する)
      • 待ち終わったら @waiting -1

となっていて、ループから抜けるとworkersから自身を削除する、という動きをする

(todoの各要素は前述したがclientとなっていて、実際にhttpのリクエストを処理するものとなっている)

ここまでがthreadsを実際に増やすまでの流れと、減らす機構の実装で、ここからは減らす起因となる部分を追う

Puma::ThreadPool#auto_trim!

記事の前半 (Puma::Server#run のあたり) で追従せずにすっとばした処理

AutoTrimなるものを動かす timeoutに渡される値は Puma::Server#run で渡されたものとなっており、デフォルトだと30秒

見たままのことをしている

def auto_trim!(timeout=30)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end

Puma::ThreadPool::AutoTrim#start!

timeoutの分だけsleepを挟みつつ Puma::ThreadPool#trim を呼び出している

def start!
  @running = true

  @thread = Thread.new do
    while @running
      @pool.trim
      sleep @timeout
    end
  end
end

Puma::ThreadPool#trim

  • 待ち状態のスレッド(@waiting) が 1以上 かつ
  • 削除予定スレッド数(@trim_requested) を差し引いた現在のスレッド数 が 設定した最小値 より大きい

場合のみ @trim_requested を +1 する

def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

ここで +1 され、thread側のループが回ることでようやくthreadが削除される

わかること

ここまで追うと今回知りたかったことはだいたいわかる

以下2つの機構によってthreads数が伸縮する

  • リクエストを処理するところ
    • リクエストが来た場合はtodoに格納する
    • todoに入るリクエストは既存のthreadが処理していく
    • 既存のthreadがすべて処理中だった場合新規にthreadを作成
    • もし @trim_requested があった場合はthreadを削除する
  • threadを少なくするところ
    • 30秒おきにthreadsの状態を鑑みて @trim_requested を +1 する

こういうスレッドベースな処理を書いたこともコードを読んだこともなかったのでこの処理が良いのか悪いのか普通のことなのかわからないが、全体的に宣言的に動いているんだなというのがわかる (各処理ごとに現在のステータスを変更していき、各処理ごとにそのステータスに合わせて動くような実装だった)

しかもそういうものって非同期であることを意識して書かないといけないのでちょっと億劫になってしまうものだと思っていたが、そのあたりはruby標準ライブラリの良い感じな機構提供のおかげでわかりやすさが保たれているように感じた、Mutex と ConditionVariable すごい (Rubyもすごい)

今は別に非同期ななにかを書くつもりはないが、デザインのパターンとその実装例として大いに参考になるコードだった