Class: Puma::ThreadPool
| Relationships & Source Files | |
| Namespace Children | |
|
Classes:
| |
|
Exceptions:
| |
| Inherits: | Object |
| Defined in: | lib/puma/thread_pool.rb |
Overview
Internal Docs for A simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in Server. It is wrapped in a Client instance and then passed to the Reactor to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the #<< operator where it is stored in a @todo array.
Each thread in the pool has an internal loop where it pulls a request from the @todo array and processes it.
Constant Summary
-
SHUTDOWN_GRACE_TIME =
# File 'lib/puma/thread_pool.rb', line 76
How long, after raising the
ThreadPool::ForceShutdownof a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.5
Class Method Summary
Instance Attribute Summary
- #busy_threads readonly
- #max rw
- #min rw
-
#out_of_band_running
readonly
seconds.
- #pool_capacity readonly
- #spawned readonly
- #trim_requested readonly
- #waiting readonly
-
#can_spawn_processor? ⇒ Boolean
readonly
Internal use only
Must be called with @mutex held!
Instance Method Summary
-
#<<(work)
Add
workto the todo list for a Thread to pickup and process. - #auto_reap!(timeout = @reaping_time)
- #auto_trim!(timeout = @auto_trim_time)
-
#backlog
How many objects have yet to be processed by the pool?
-
#backlog_max
The maximum size of the backlog.
-
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
- #reset_max
-
#shutdown(timeout)
Tell all threads in the pool to exit and wait for them to finish.
-
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks.
-
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit.
- #wait_while_out_of_band_running
-
#with_force_shutdown
Allows
ForceShutdownto be raised within the provided block if the thread is forced to shutdown during execution. - #with_mutex(&block)
- #shutdown_debug(message) private
- #trigger_before_thread_exit_hooks private
- #trigger_before_thread_start_hooks private
- #trigger_out_of_band_hook private
- #spawn_thread_if_needed Internal use only
-
#spawn_thread
private
Internal use only
Must be called with @mutex held!
Constructor Details
.new(name, options = {}, server: nil, &block) ⇒ ThreadPool
# File 'lib/puma/thread_pool.rb', line 86
def initialize(name, = {}, server: nil, &block) @server = server @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = Queue.new @backlog_max = 0 @spawned = 0 @waiting = 0 @name = name @min = Integer([:min_threads]) @max = Integer([:max_threads]) @max_io_threads = Integer([:max_io_threads] || 0) # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI # makes stubbing constants difficult. @shutdown_grace_time = Float([:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME) @shutdown_debug = [:shutdown_debug] @block = block @out_of_band = [:out_of_band] @out_of_band_running = false @out_of_band_condvar = ConditionVariable.new @before_thread_start = [:before_thread_start] @before_thread_exit = [:before_thread_exit] @reaping_time = [:reaping_time] @auto_trim_time = [:auto_trim_time] @shutdown = false @trim_requested = 0 @out_of_band_pending = false @processors = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times do spawn_thread @not_full.wait(@mutex) end end @force_shutdown = false @shutdown_mutex = Mutex.new end
Instance Attribute Details
#busy_threads (readonly)
# File 'lib/puma/thread_pool.rb', line 180
def busy_threads with_mutex { @spawned - @waiting + @todo.size } end
#can_spawn_processor? ⇒ Boolean (readonly)
Must be called with @mutex held!
# File 'lib/puma/thread_pool.rb', line 328
def can_spawn_processor? io_processors_count = @processors.count(&:marked_as_io_thread?) extra_io_processors_count = io_processors_count > @max_io_threads ? io_processors_count - @max_io_threads : 0 (@spawned - io_processors_count) < (@max - extra_io_processors_count) end
#max (rw)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 139
attr_accessor :min, :max
#min (rw)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 139
attr_accessor :min, :max
#out_of_band_running (readonly)
seconds
# File 'lib/puma/thread_pool.rb', line 78
attr_reader :out_of_band_running
#pool_capacity (readonly)
[ GitHub ]#spawned (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 138
attr_reader :spawned, :trim_requested, :waiting
#trim_requested (readonly)
[ GitHub ]#waiting (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 138
attr_reader :spawned, :trim_requested, :waiting
Instance Method Details
#<<(work)
Add work to the todo list for a Thread to pickup and process.
# File 'lib/puma/thread_pool.rb', line 335
def <<(work) with_mutex do if @shutdown raise "Unable to add work while shutting down" end @todo << work t = @todo.size @backlog_max = t if t > @backlog_max if @waiting < @todo.size and can_spawn_processor? spawn_thread end @not_empty.signal end self end
#auto_reap!(timeout = @reaping_time)
[ GitHub ]#auto_trim!(timeout = @auto_trim_time)
[ GitHub ]#backlog
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 163
def backlog with_mutex { @todo.size } end
#backlog_max
The maximum size of the backlog
# File 'lib/puma/thread_pool.rb', line 169
def backlog_max with_mutex { @backlog_max } end
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
# File 'lib/puma/thread_pool.rb', line 378
def reap with_mutex do @processors, dead_processors = @processors.partition(&:alive?) dead_processors.each do |processor| processor.kill @spawned -= 1 end end end
#reset_max
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 157
def reset_max with_mutex { @backlog_max = 0 } end
#shutdown(timeout)
Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ThreadPool::ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
# File 'lib/puma/thread_pool.rb', line 444
def shutdown(timeout) threads = with_mutex do @shutdown = true @trim_requested = @spawned @not_empty.broadcast @not_full.broadcast @auto_trim&.stop @reaper&.stop # dup processors so that we join them all safely @processors.dup end if @shutdown_debug == true shutdown_debug("Shutdown initiated") end if timeout == -1 # Wait for threads to finish without force shutdown. threads.each(&:join) else join = ->(inner_timeout) do start = Process.clock_gettime(Process::CLOCK_MONOTONIC) threads.reject! do |t| remaining = inner_timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) remaining > 0 && t.join(remaining) end end # Wait timeout seconds for threads to finish. join.call(timeout) if @shutdown_debug == :on_force && !threads.empty? shutdown_debug("Shutdown timeout exceeded") end # If threads are still running, raise ForceShutdown and wait to finish. @shutdown_mutex.synchronize do @force_shutdown = true threads.each do |t| t.raise ForceShutdown if t[:with_force_shutdown] end end join.call(@shutdown_grace_time) if @shutdown_debug == :on_force && !threads.empty? shutdown_debug("Shutdown grace timeout exceeded") end # If threads are _still_ running, forcefully kill them and wait to finish. threads.each(&:kill) join.call(1) end @spawned = 0 @processors = [] end
#shutdown_debug(message) (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 502
def shutdown_debug() pid = Process.pid threads = Thread.list $stdout.syswrite "#{pid}: #{}\n" $stdout.syswrite "#{pid}: === Begin thread backtrace dump ===\n" threads.each_with_index do |thread, index| $stdout.syswrite "#{pid}: Thread #{index + 1}/#{threads.size}: #{thread.inspect}\n" $stdout.syswrite "#{pid}: #{(thread.backtrace || []).join("\n#{pid}: ")}\n\n" end $stdout.syswrite "#{pid}: === End thread backtrace dump ===\n" end
#spawn_thread (private)
Must be called with @mutex held!
# File 'lib/puma/thread_pool.rb', line 188
def spawn_thread @spawned += 1 trigger_before_thread_start_hooks processor = ProcessorThread.new(self) processor.thread = Thread.new(processor, @spawned) do |processor, spawned| Puma.set_thread_name '%s tp %03i' % [@name, spawned] # Advertise server into the thread Thread.current.puma_server = @server todo = @todo block = @block mutex = @mutex not_empty = @not_empty not_full = @not_full while true work = nil mutex.synchronize do if processor.marked_as_io_thread? if @processors.count { |t| !t.marked_as_io_thread? } < @max # We're not at max processor threads, so the io thread can rejoin the normal population. processor.marked_as_io_thread = false else # We're already at max threads, so we exit the extra io thread. @processors.delete(processor) trigger_before_thread_exit_hooks Thread.exit end end while todo.empty? if @trim_requested > 0 @trim_requested -= 1 @spawned -= 1 @processors.delete(processor) not_full.signal trigger_before_thread_exit_hooks Thread.exit end @waiting += 1 if @out_of_band_pending && trigger_out_of_band_hook @out_of_band_pending = false end not_full.signal begin not_empty.wait mutex ensure @waiting -= 1 end end work = todo.shift end begin @out_of_band_pending = true if block.call(processor, work) rescue Exception => e STDERR.puts "Error reached top of thread-pool: #{e.} (#{e.class})" end end end @processors << processor processor end
#spawn_thread_if_needed
# File 'lib/puma/thread_pool.rb', line 354
def spawn_thread_if_needed # :nodoc: with_mutex do if @waiting < @todo.size and can_spawn_processor? spawn_thread end end end
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks
# File 'lib/puma/thread_pool.rb', line 143
def stats with_mutex do temp = @backlog_max @backlog_max = 0 { backlog: @todo.size, running: @spawned, pool_capacity: pool_capacity, busy_threads: @spawned - @waiting + @todo.size, io_threads: @processors.count(&:marked_as_io_thread?), backlog_max: temp } end end
#trigger_before_thread_exit_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 275
def trigger_before_thread_exit_hooks return unless @before_thread_exit&.any? @before_thread_exit.each do |b| begin b[:block].call rescue Exception => e STDERR.puts "WARNING before_thread_exit hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_before_thread_start_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 260
def trigger_before_thread_start_hooks return unless @before_thread_start&.any? @before_thread_start.each do |b| begin b[:block].call(ServerPluginControl.new(@server)) rescue Exception => e STDERR.puts "WARNING before_thread_start hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_out_of_band_hook (private)
# File 'lib/puma/thread_pool.rb', line 291
def trigger_out_of_band_hook return false unless @out_of_band&.any? # we execute on idle hook when all threads are free return false unless @spawned == @waiting @out_of_band_running = true @out_of_band.each { |b| b[:block].call } true rescue Exception => e STDERR.puts "Exception calling out_of_band_hook: #{e.} (#{e.class})" true ensure @out_of_band_running = false @out_of_band_condvar.broadcast end
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 366
def trim(force=false) with_mutex do free = @waiting - @todo.size if (force or free > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end
#wait_while_out_of_band_running
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 309
def wait_while_out_of_band_running return unless @out_of_band_running with_mutex do @out_of_band_condvar.wait(@mutex) while @out_of_band_running end end
#with_force_shutdown
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.
# File 'lib/puma/thread_pool.rb', line 428
def with_force_shutdown t = Thread.current @shutdown_mutex.synchronize do raise ForceShutdown if @force_shutdown t[:with_force_shutdown] = true end yield ensure t[:with_force_shutdown] = false end
#with_mutex(&block)
# File 'lib/puma/thread_pool.rb', line 318
def with_mutex(&block) @mutex.owned? ? yield : @mutex.synchronize(&block) end