Class: Puma::ThreadPool
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Exceptions:
| |
Inherits: | Object |
Defined in: | lib/puma/thread_pool.rb |
Overview
Internal Docs for A simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in Server
. It is wrapped in a Client
instance and then passed to the Reactor
to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the #<< operator where it is stored in a @todo
array.
Each thread in the pool has an internal loop where it pulls a request from the @todo
array and processes it.
Constant Summary
-
SHUTDOWN_GRACE_TIME =
How long, after raising the
ThreadPool::ForceShutdown
of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.5
Class Method Summary
- .clean_thread_locals
-
.new(name, options = {}, &block) ⇒ ThreadPool
constructor
Maintain a minimum of
min
and maximum ofmax
threads in the pool.
Instance Attribute Summary
- #busy_threads readonly
- #pool_capacity readonly
- #spawned readonly
- #trim_requested readonly
- #waiting readonly
Instance Method Summary
-
#<<(work)
Add
work
to the todo list for a Thread to pickup and process. - #auto_reap!(timeout = @reaping_time)
- #auto_trim!(timeout = @auto_trim_time)
-
#backlog
How many objects have yet to be processed by the pool?
-
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
-
#shutdown(timeout = -1))
Tell all threads in the pool to exit and wait for them to finish.
-
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks.
-
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit.
- #wait_for_less_busy_worker(delay_s)
-
#wait_until_not_full
This method is used by
Server
to let the server know when the thread pool can pull more requests from the socket and pass to the reactor. -
#with_force_shutdown
Allows
ForceShutdown
to be raised within the provided block if the thread is forced to shutdown during execution. - #with_mutex(&block)
- #trigger_before_thread_exit_hooks private
- #trigger_before_thread_start_hooks private
- #trigger_out_of_band_hook private
-
#spawn_thread
private
Internal use only
Must be called with @mutex held!
Constructor Details
.new(name, options = {}, &block) ⇒ ThreadPool
Maintain a minimum of min
and maximum of max
threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 34
def initialize(name, = {}, &block) @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = [] @spawned = 0 @waiting = 0 @name = name @min = Integer( [:min_threads]) @max = Integer( [:max_threads]) # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI # makes stubbing constants difficult. @shutdown_grace_time = Float( [:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME) @block = block @out_of_band = [:out_of_band] @clean_thread_locals = [:clean_thread_locals] @before_thread_start = [:before_thread_start] @before_thread_exit = [:before_thread_exit] @reaping_time = [:reaping_time] @auto_trim_time = [:auto_trim_time] @shutdown = false @trim_requested = 0 @out_of_band_pending = false @workers = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times do spawn_thread @not_full.wait(@mutex) end end @force_shutdown = false @shutdown_mutex = Mutex.new end
Class Method Details
.clean_thread_locals
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 82
def self.clean_thread_locals Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods Thread.current[key] = nil unless key == :__recursive_key__ end end
Instance Attribute Details
#busy_threads (readonly)
# File 'lib/puma/thread_pool.rb', line 113
def busy_threads with_mutex { @spawned - @waiting + @todo.size } end
#pool_capacity (readonly)
[ GitHub ]#spawned (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 80
attr_reader :spawned, :trim_requested, :waiting
#trim_requested (readonly)
[ GitHub ]#waiting (readonly)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 80
attr_reader :spawned, :trim_requested, :waiting
Instance Method Details
#<<(work)
Add work
to the todo list for a Thread to pickup and process.
# File 'lib/puma/thread_pool.rb', line 235
def <<(work) with_mutex do if @shutdown raise "Unable to add work while shutting down" end @todo << work if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal end end
#auto_reap!(timeout = @reaping_time)
[ GitHub ]#auto_trim!(timeout = @auto_trim_time)
[ GitHub ]#backlog
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 102
def backlog with_mutex { @todo.size } end
#reap
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
# File 'lib/puma/thread_pool.rb', line 330
def reap with_mutex do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| worker.kill @spawned -= 1 end @workers.delete_if do |w| dead_workers.include?(w) end end end
#shutdown(timeout = -1))
Tell all threads in the pool to exit and wait for them to finish. Wait timeout
seconds then raise ThreadPool::ForceShutdown
in remaining threads. Next, wait an extra @shutdown_grace_time
seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
# File 'lib/puma/thread_pool.rb', line 400
def shutdown(timeout=-1) threads = with_mutex do @shutdown = true @trim_requested = @spawned @not_empty.broadcast @not_full.broadcast @auto_trim&.stop @reaper&.stop # dup workers so that we join them all safely @workers.dup end if timeout == -1 # Wait for threads to finish without force shutdown. threads.each(&:join) else join = ->(inner_timeout) do start = Process.clock_gettime(Process::CLOCK_MONOTONIC) threads.reject! do |t| elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start t.join inner_timeout - elapsed end end # Wait timeout seconds for threads to finish. join.call(timeout) # If threads are still running, raise ForceShutdown and wait to finish. @shutdown_mutex.synchronize do @force_shutdown = true threads.each do |t| t.raise ForceShutdown if t[:with_force_shutdown] end end join.call(@shutdown_grace_time) # If threads are _still_ running, forcefully kill them and wait to finish. threads.each(&:kill) join.call(1) end @spawned = 0 @workers = [] end
#spawn_thread (private)
Must be called with @mutex held!
# File 'lib/puma/thread_pool.rb', line 121
def spawn_thread @spawned += 1 trigger_before_thread_start_hooks th = Thread.new(@spawned) do |spawned| Puma.set_thread_name '%s tp %03i' % [@name, spawned] todo = @todo block = @block mutex = @mutex not_empty = @not_empty not_full = @not_full while true work = nil mutex.synchronize do while todo.empty? if @trim_requested > 0 @trim_requested -= 1 @spawned -= 1 @workers.delete th not_full.signal trigger_before_thread_exit_hooks Thread.exit end @waiting += 1 if @out_of_band_pending && trigger_out_of_band_hook @out_of_band_pending = false end not_full.signal begin not_empty.wait mutex ensure @waiting -= 1 end end work = todo.shift end if @clean_thread_locals ThreadPool.clean_thread_locals end begin @out_of_band_pending = true if block.call(work) rescue Exception => e STDERR.puts "Error reached top of thread-pool: #{e.} (#{e.class})" end end end @workers << th th end
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks
# File 'lib/puma/thread_pool.rb', line 90
def stats with_mutex do { backlog: @todo.size, running: @spawned, pool_capacity: @waiting + (@max - @spawned), busy_threads: @spawned - @waiting + @todo.size } end end
#trigger_before_thread_exit_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 196
def trigger_before_thread_exit_hooks return unless @before_thread_exit&.any? @before_thread_exit.each do |b| begin b.call rescue Exception => e STDERR.puts "WARNING before_thread_exit hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_before_thread_start_hooks (private)
[ GitHub ]# File 'lib/puma/thread_pool.rb', line 181
def trigger_before_thread_start_hooks return unless @before_thread_start&.any? @before_thread_start.each do |b| begin b.call rescue Exception => e STDERR.puts "WARNING before_thread_start hook failed with exception (#{e.class}) #{e.}" end end nil end
#trigger_out_of_band_hook (private)
# File 'lib/puma/thread_pool.rb', line 212
def trigger_out_of_band_hook return false unless @out_of_band&.any? # we execute on idle hook when all threads are free return false unless @spawned == @waiting @out_of_band.each(&:call) true rescue Exception => e STDERR.puts "Exception calling out_of_band_hook: #{e.} (#{e.class})" true end
#trim(force = false)
If there are any free threads in the pool, tell one to go ahead and exit. If force
is true, then a trim request is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 318
def trim(force=false) with_mutex do free = @waiting - @todo.size if (force or free > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end
#wait_for_less_busy_worker(delay_s)
# File 'lib/puma/thread_pool.rb', line 294
def wait_for_less_busy_worker(delay_s) return unless delay_s && delay_s > 0 # Ruby MRI does GVL, this can result # in processing contention when multiple threads # (requests) are running concurrently return unless Puma.mri? with_mutex do return if @shutdown # do not delay, if we are not busy return unless busy_threads > 0 # this will be signaled once a request finishes, # which can happen earlier than delay @not_full.wait @mutex, delay_s end end
#wait_until_not_full
This method is used by Server
to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.
The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another ::Puma
process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.
For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the Server
can pull a request right away.
If there are 5 threads and all 5 of them are busy, then it will pause here, and wait until the not_full
condition variable is signaled, usually this indicates that a request has been processed.
It’s important to note that even though the server might accept another request, it might not be added to the @todo
array right away. For example if a slow client has only sent a header, but not a body then the @todo
array would stay the same size as the reactor works to try to buffer the request. In that scenario the next call to this method would not block and another request would be added into the reactor by the server. This would continue until a fully buffered request makes it through the reactor and can then be processed by the thread pool.
# File 'lib/puma/thread_pool.rb', line 277
def wait_until_not_full with_mutex do while true return if @shutdown # If we can still spin up new threads and there # is work queued that cannot be handled by waiting # threads, then accept more work until we would # spin up the max number of threads. return if busy_threads < @max @not_full.wait @mutex end end end
#with_force_shutdown
Allows ThreadPool::ForceShutdown
to be raised within the provided block if the thread is forced to shutdown during execution.
# File 'lib/puma/thread_pool.rb', line 384
def with_force_shutdown t = Thread.current @shutdown_mutex.synchronize do raise ForceShutdown if @force_shutdown t[:with_force_shutdown] = true end yield ensure t[:with_force_shutdown] = false end
#with_mutex(&block)
# File 'lib/puma/thread_pool.rb', line 228
def with_mutex(&block) @mutex.owned? ? yield : @mutex.synchronize(&block) end