123456789_123456789_123456789_123456789_123456789_

Class: Puma::ThreadPool

Relationships & Source Files
Namespace Children
Classes:
Exceptions:
Inherits: Object
Defined in: lib/puma/thread_pool.rb

Overview

Internal Docs for A simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First a connection to a client is made in Server. It is wrapped in a Client instance and then passed to the Reactor to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the #<< operator where it is stored in a @todo array.

Each thread in the pool has an internal loop where it pulls a request from the @todo array and processes it.

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(name, options = {}, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 36

def initialize(name, options = {}, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new
  @todo = Queue.new

  @backlog_max = 0
  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @block = block
  @out_of_band = options[:out_of_band]
  @out_of_band_running = false
  @clean_thread_locals = options[:clean_thread_locals]
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

Class Method Details

.clean_thread_locals

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 85

def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

Instance Attribute Details

#busy_threads (readonly)

Version:

  • 5.0.0

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 129

def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end

#out_of_band_running (readonly)

seconds

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 28

attr_reader :out_of_band_running

#pool_capacity (readonly)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 123

def pool_capacity
  waiting + (@max - spawned)
end

#spawned (readonly)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 83

attr_reader :spawned, :trim_requested, :waiting

#trim_requested (readonly)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 83

attr_reader :spawned, :trim_requested, :waiting

#waiting (readonly)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 83

attr_reader :spawned, :trim_requested, :waiting

Instance Method Details

#<<(work)

Add work to the todo list for a Thread to pickup and process.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 253

def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work
    t = @todo.size
    @backlog_max = t if t > @backlog_max

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

#auto_reap!(timeout = @reaping_time)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 334

def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap)
  @reaper.start!
end

#auto_trim!(timeout = @auto_trim_time)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 329

def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim)
  @auto_trim.start!
end

#backlog

How many objects have yet to be processed by the pool?

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 112

def backlog
  with_mutex { @todo.size }
end

#backlog_max

The maximum size of the backlog

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 118

def backlog_max
  with_mutex { @backlog_max }
end

#reap

If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 287

def reap
  with_mutex do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end

#reset_max

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 106

def reset_max
  with_mutex { @backlog_max = 0 }
end

#shutdown(timeout = -1))

Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ThreadPool::ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 357

def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end

    # Wait timeout seconds for threads to finish.
    join.call(timeout)

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @workers = []
end

#spawn_thread (private)

This method is for internal use only.

Must be called with @mutex held!

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 137

def spawn_thread
  @spawned += 1

  trigger_before_thread_start_hooks
  th = Thread.new(@spawned) do |spawned|
    Puma.set_thread_name '%s tp %03i' % [@name, spawned]
    todo  = @todo
    block = @block
    mutex = @mutex
    not_empty = @not_empty
    not_full = @not_full

    while true
      work = nil

      mutex.synchronize do
        while todo.empty?
          if @trim_requested > 0
            @trim_requested -= 1
            @spawned -= 1
            @workers.delete th
            not_full.signal
            trigger_before_thread_exit_hooks
            Thread.exit
          end

          @waiting += 1
          if @out_of_band_pending && trigger_out_of_band_hook
            @out_of_band_pending = false
          end
          not_full.signal
          begin
            not_empty.wait mutex
          ensure
            @waiting -= 1
          end
        end

        work = todo.shift
      end

      if @clean_thread_locals
        ThreadPool.clean_thread_locals
      end

      begin
        @out_of_band_pending = true if block.call(work)
      rescue Exception => e
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end
  end

  @workers << th

  th
end

#statsHash

generate stats hash so as not to perform multiple locks

Returns:

  • (Hash)

    hash containing stat info from ThreadPool

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 93

def stats
  with_mutex do
    temp = @backlog_max
    @backlog_max = 0
    { backlog: @todo.size,
      running: @spawned,
      pool_capacity: @waiting + (@max - @spawned),
      busy_threads: @spawned - @waiting + @todo.size,
      backlog_max: temp
    }
  end
end

#trigger_before_thread_exit_hooks (private)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 212

def trigger_before_thread_exit_hooks
  return unless @before_thread_exit&.any?

  @before_thread_exit.each do |b|
    begin
      b.call
    rescue Exception => e
      STDERR.puts "WARNING before_thread_exit hook failed with exception (#{e.class}) #{e.message}"
    end
  end
  nil
end

#trigger_before_thread_start_hooks (private)

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 197

def trigger_before_thread_start_hooks
  return unless @before_thread_start&.any?

  @before_thread_start.each do |b|
    begin
      b.call
    rescue Exception => e
      STDERR.puts "WARNING before_thread_start hook failed with exception (#{e.class}) #{e.message}"
    end
  end
  nil
end

#trigger_out_of_band_hook (private)

Version:

  • 5.0.0

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 228

def trigger_out_of_band_hook
  return false unless @out_of_band&.any?

  # we execute on idle hook when all threads are free
  return false unless @spawned == @waiting
  @out_of_band_running = true
  @out_of_band.each(&:call)
  true
rescue Exception => e
  STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
  true
ensure
  @out_of_band_running = false
end

#trim(force = false)

If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 275

def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

#with_force_shutdown

Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 341

def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end

#with_mutex(&block)

Version:

  • 5.0.0

[ GitHub ]

  
# File 'lib/puma/thread_pool.rb', line 246

def with_mutex(&block)
  @mutex.owned? ?
    yield :
    @mutex.synchronize(&block)
end