Class: Puma::ThreadPool
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Exceptions:
| |
Inherits: | Object |
Defined in: | lib/puma/thread_pool.rb |
Overview
Internal Docs for A simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in Server
. It is wrapped in a Client
instance and then passed to the Reactor
to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the #<< operator where it is stored in a @todo
array.
Each thread in the pool has an internal loop where it pulls a request from the @todo
array and processes it.
Constant Summary
-
SHUTDOWN_GRACE_TIME =
How long, after raising the
ThreadPool::ForceShutdown
of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.5
Class Method Summary
- .clean_thread_locals
-
.new(name, options = {}, &block) ⇒ ThreadPool
constructor
Maintain a minimum of
min
and maximum ofmax
threads in the pool.
Instance Attribute Summary
- #busy_threads readonly
-
#out_of_band_running
readonly
seconds.
- #pool_capacity readonly
- #spawned readonly
- #trim_requested readonly
- #waiting readonly
Instance Method Summary
-
#<<(work)
Add
work
to the todo list for a Thread to pickup and process. - #auto_reap!(timeout = @reaping_time)
- #auto_trim!(timeout = @auto_trim_time)
-
#backlog
How many objects have yet to be processed by the pool?
-
#backlog_max
The maximum size of the backlog.
-
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
- #reset_max
-
#shutdown(timeout = -1))
Tell all threads in the pool to exit and wait for them to finish.
-
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks.
-
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit.
-
#with_force_shutdown
Allows
ForceShutdown
to be raised within the provided block if the thread is forced to shutdown during execution. - #with_mutex(&block)
- #trigger_before_thread_exit_hooks private
- #trigger_before_thread_start_hooks private
- #trigger_out_of_band_hook private
-
#spawn_thread
private
Internal use only
Must be called with @mutex held!
Constructor Details
.new(name, options = {}, &block) ⇒ ThreadPool
Maintain a minimum of min
and maximum of max
threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 36
def initialize(name, = {}, &block) @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = Queue.new @backlog_max = 0 @spawned = 0 @waiting = 0 @name = name @min = Integer( [:min_threads]) @max = Integer( [:max_threads]) # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI # makes stubbing constants difficult. @shutdown_grace_time = Float( [:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME) @block = block @out_of_band = [:out_of_band] @out_of_band_running = false @clean_thread_locals = [:clean_thread_locals] @before_thread_start = [:before_thread_start] @before_thread_exit = [:before_thread_exit] @reaping_time = [:reaping_time] @auto_trim_time = [:auto_trim_time] @shutdown = false @trim_requested = 0 @out_of_band_pending = false @workers = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times do spawn_thread @not_full.wait(@mutex) end end @force_shutdown = false @shutdown_mutex = Mutex.new end
Class Method Details
.clean_thread_locals
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 85
def self.clean_thread_locals Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods Thread.current[key] = nil unless key == :__recursive_key__ end end
Instance Attribute Details
#busy_threads (readonly)
# File 'lib/puma/thread_pool.rb', line 129
def busy_threads with_mutex { @spawned - @waiting + @todo.size } end
#out_of_band_running (readonly)
seconds
# File 'lib/puma/thread_pool.rb', line 28
attr_reader :out_of_band_running
#pool_capacity (readonly)
[ GitHub ]#spawned (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 83
attr_reader :spawned, :trim_requested, :waiting
#trim_requested (readonly)
[ GitHub ]#waiting (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 83
attr_reader :spawned, :trim_requested, :waiting
Instance Method Details
#<<(work)
Add work
to the todo list for a Thread to pickup and process.
# File 'lib/puma/thread_pool.rb', line 253
def <<(work) with_mutex do if @shutdown raise "Unable to add work while shutting down" end @todo << work t = @todo.size @backlog_max = t if t > @backlog_max if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal end end
#auto_reap!(timeout = @reaping_time)
[ GitHub ]#auto_trim!(timeout = @auto_trim_time)
[ GitHub ]#backlog
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 112
def backlog with_mutex { @todo.size } end
#backlog_max
The maximum size of the backlog
# File 'lib/puma/thread_pool.rb', line 118
def backlog_max with_mutex { @backlog_max } end
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
# File 'lib/puma/thread_pool.rb', line 287
def reap with_mutex do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| worker.kill @spawned -= 1 end @workers.delete_if do |w| dead_workers.include?(w) end end end
#reset_max
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 106
def reset_max with_mutex { @backlog_max = 0 } end
#shutdown(timeout = -1))
Tell all threads in the pool to exit and wait for them to finish. Wait timeout
seconds then raise ThreadPool::ForceShutdown
in remaining threads. Next, wait an extra @shutdown_grace_time
seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
# File 'lib/puma/thread_pool.rb', line 357
def shutdown(timeout=-1) threads = with_mutex do @shutdown = true @trim_requested = @spawned @not_empty.broadcast @not_full.broadcast @auto_trim&.stop @reaper&.stop # dup workers so that we join them all safely @workers.dup end if timeout == -1 # Wait for threads to finish without force shutdown. threads.each(&:join) else join = ->(inner_timeout) do start = Process.clock_gettime(Process::CLOCK_MONOTONIC) threads.reject! do |t| elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start t.join inner_timeout - elapsed end end # Wait timeout seconds for threads to finish. join.call(timeout) # If threads are still running, raise ForceShutdown and wait to finish. @shutdown_mutex.synchronize do @force_shutdown = true threads.each do |t| t.raise ForceShutdown if t[:with_force_shutdown] end end join.call(@shutdown_grace_time) # If threads are _still_ running, forcefully kill them and wait to finish. threads.each(&:kill) join.call(1) end @spawned = 0 @workers = [] end
#spawn_thread (private)
Must be called with @mutex held!
# File 'lib/puma/thread_pool.rb', line 137
def spawn_thread @spawned += 1 trigger_before_thread_start_hooks th = Thread.new(@spawned) do |spawned| Puma.set_thread_name '%s tp %03i' % [@name, spawned] todo = @todo block = @block mutex = @mutex not_empty = @not_empty not_full = @not_full while true work = nil mutex.synchronize do while todo.empty? if @trim_requested > 0 @trim_requested -= 1 @spawned -= 1 @workers.delete th not_full.signal trigger_before_thread_exit_hooks Thread.exit end @waiting += 1 if @out_of_band_pending && trigger_out_of_band_hook @out_of_band_pending = false end not_full.signal begin not_empty.wait mutex ensure @waiting -= 1 end end work = todo.shift end if @clean_thread_locals ThreadPool.clean_thread_locals end begin @out_of_band_pending = true if block.call(work) rescue Exception => e STDERR.puts "Error reached top of thread-pool: #{e.} (#{e.class})" end end end @workers << th th end
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks
# File 'lib/puma/thread_pool.rb', line 93
def stats with_mutex do temp = @backlog_max @backlog_max = 0 { backlog: @todo.size, running: @spawned, pool_capacity: @waiting + (@max - @spawned), busy_threads: @spawned - @waiting + @todo.size, backlog_max: temp } end end
#trigger_before_thread_exit_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 212
def trigger_before_thread_exit_hooks return unless @before_thread_exit&.any? @before_thread_exit.each do |b| begin b.call rescue Exception => e STDERR.puts "WARNING before_thread_exit hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_before_thread_start_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 197
def trigger_before_thread_start_hooks return unless @before_thread_start&.any? @before_thread_start.each do |b| begin b.call rescue Exception => e STDERR.puts "WARNING before_thread_start hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_out_of_band_hook (private)
# File 'lib/puma/thread_pool.rb', line 228
def trigger_out_of_band_hook return false unless @out_of_band&.any? # we execute on idle hook when all threads are free return false unless @spawned == @waiting @out_of_band_running = true @out_of_band.each(&:call) true rescue Exception => e STDERR.puts "Exception calling out_of_band_hook: #{e.} (#{e.class})" true ensure @out_of_band_running = false end
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit. If force
is true, then a trim request is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 275
def trim(force=false) with_mutex do free = @waiting - @todo.size if (force or free > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end
#with_force_shutdown
Allows ThreadPool::ForceShutdown
to be raised within the provided block if the thread is forced to shutdown during execution.
# File 'lib/puma/thread_pool.rb', line 341
def with_force_shutdown t = Thread.current @shutdown_mutex.synchronize do raise ForceShutdown if @force_shutdown t[:with_force_shutdown] = true end yield ensure t[:with_force_shutdown] = false end
#with_mutex(&block)
# File 'lib/puma/thread_pool.rb', line 246
def with_mutex(&block) @mutex.owned? ? yield : @mutex.synchronize(&block) end