pumaが同時に建てるthread数はこちらで指定することが出来る
実際の定義を見るとこのように2つの値を入れることが出来る
threads 5, 16
第一引数が最小値、第二引数が最大値を表しているらしく、処理の際に自動でスレッド数をスケールしてくれるらしい
実際どのように動くか空気感を調べる必要があったためメモ
最初にまとめ
語弊がある気もするけどこんな感じ
- 最初は最小値の数だけthreadを用意する (thread poolに格納される)
- 既存のthreadが全てビジーな状態になった際、現在のthread poolのスレッド数が最大値に達していない場合は
Thread.new
して新しいthreadを確保- thread数が +1 される
- 30秒おきにthread poolの状況を確認し、もし待ち状態のthreadが存在するのであればthreadを1つ削除するよう宣言
- threadの処理に入る際、実際に待ち状態だった場合に自身を削除する
表題の関心事について調べた時に見たところ
以下のh4要素はそれぞれgithubへのリンクになっているので自分で実際のコードを見る場合はそちらも参考にしてください
Puma::Server#run
Puma::Server#run
の中という如何にもなメソッドの中に如何にもな記述があり、ここで設定した最小値と最大値を渡している
@thread_pool = ThreadPool.new(@min_threads, @max_threads, IOBuffer) do |client, buffer|
#run
メソッド内部で Puma::ThreadPool#auto_trim!
というメソッドを呼んでいる
if @auto_trim_time # デフォルト値は 30秒 @thread_pool.auto_trim!(@auto_trim_time) end
その後 #handle_servers
を呼び、実際にハンドリングされるっぽい (今回リクエストがどうハンドルされるかという点はちゃんと追ってない)
if background @thread = Thread.new { handle_servers } return @thread else handle_servers end
今回は先に #handle_servers
の処理を追う
Puma::Server#handle_servers
無限ループになっていて、Puma::Client
(実際にリクエスト/レスポンスを処理する実装があるっぽい) をthread poolに渡す箇所がある
pool = @thread_pool
# 無限ループの中 client = Client.new io, @binder.env(sock) # 中略 pool << client
Puma::ThreadPool#<<
@todo
(ただの配列) にclientを挿入する
ここで今回の関心事について重要っぽい条件分岐が存在していて、
- 待ち状態のthreadの数(
@waiting
) が todoの要素数 より小さい かつ - 現在存在するthreadの数(
@spawned
) が 設定した最大値(@max
) より小さい
という場合にのみ #spawn_thread
を呼び出す
def <<(work) # work には前述した client が渡される @mutex.synchronize do # 略 @todo << work if @waiting < @todo.size and @spawned < @max spawn_thread end # 略 end end
Puma::ThreadPool#spawn_thread
実際にthreadを新しく作って @workers
に溜めるところ
threadの内容的には無限ループしていて、処理内容は
- todoに要素が残っている場合
- その処理
- todoに要素が残っていない場合
@trim_requested
(threadを削除したいという要求の数が格納された変数) が 1以上の場合- ループから抜ける
@trim_requested
が 0の場合- todoに要素が入ったことを受け取るまで待つ (
@waiting
に +1 する) - 待ち終わったら
@waiting
-1
- todoに要素が入ったことを受け取るまで待つ (
となっていて、ループから抜けるとworkersから自身を削除する、という動きをする
(todoの各要素は前述したがclientとなっていて、実際にhttpのリクエストを処理するものとなっている)
ここまでがthreadsを実際に増やすまでの流れと、減らす機構の実装で、ここからは減らす起因となる部分を追う
Puma::ThreadPool#auto_trim!
記事の前半 (Puma::Server#run
のあたり) で追従せずにすっとばした処理
AutoTrimなるものを動かす timeoutに渡される値は Puma::Server#run
で渡されたものとなっており、デフォルトだと30秒
見たままのことをしている
def auto_trim!(timeout=30) @auto_trim = AutoTrim.new(self, timeout) @auto_trim.start! end
Puma::ThreadPool::AutoTrim#start!
timeoutの分だけsleepを挟みつつ Puma::ThreadPool#trim
を呼び出している
def start! @running = true @thread = Thread.new do while @running @pool.trim sleep @timeout end end end
Puma::ThreadPool#trim
- 待ち状態のスレッド(
@waiting
) が 1以上 かつ - 削除予定スレッド数(
@trim_requested
) を差し引いた現在のスレッド数 が 設定した最小値 より大きい
場合のみ @trim_requested
を +1 する
def trim(force=false) @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end
ここで +1 され、thread側のループが回ることでようやくthreadが削除される
わかること
ここまで追うと今回知りたかったことはだいたいわかる
以下2つの機構によってthreads数が伸縮する
- リクエストを処理するところ
- threadを少なくするところ
- 30秒おきにthreadsの状態を鑑みて
@trim_requested
を +1 する
- 30秒おきにthreadsの状態を鑑みて
こういうスレッドベースな処理を書いたこともコードを読んだこともなかったのでこの処理が良いのか悪いのか普通のことなのかわからないが、全体的に宣言的に動いているんだなというのがわかる (各処理ごとに現在のステータスを変更していき、各処理ごとにそのステータスに合わせて動くような実装だった)
しかもそういうものって非同期であることを意識して書かないといけないのでちょっと億劫になってしまうものだと思っていたが、そのあたりはruby標準ライブラリの良い感じな機構提供のおかげでわかりやすさが保たれているように感じた、Mutex と ConditionVariable すごい (Rubyもすごい)
今は別に非同期ななにかを書くつもりはないが、デザインのパターンとその実装例として大いに参考になるコードだった