Class: Puma::Cluster
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Runner
|
|
Instance Chain:
self,
Runner
|
|
Inherits: |
Puma::Runner
|
Defined in: | lib/puma/cluster.rb, lib/puma/cluster/worker.rb, lib/puma/cluster/worker_handle.rb |
Overview
This class is instantiated by the Launcher
and used to boot and serve a Ruby application when Puma “workers” are needed, i.e. when running multiple processes. For example: $ puma -w 5
An instance of this class will spawn the number of processes passed in via the #spawn_workers method call. Each worker will have its own instance of a Server
.
Class Method Summary
Instance Attribute Summary
- #all_workers_booted? ⇒ Boolean readonly
- #all_workers_idle_timed_out? ⇒ Boolean readonly
- #all_workers_in_phase? ⇒ Boolean readonly
- #next_worker_index readonly
- #preload? ⇒ Boolean readonly
-
#stats
readonly
Inside of a child process, this will return all zeroes, as @workers is only populated in the master process.
-
#workers ⇒ Array<Puma::Cluster::WorkerHandle>
readonly
Returns the list of cluster worker handles.
Runner
- Inherited
#app, #development?, | |
#options | Returns the hash of configuration options. |
#redirected_io?, #ruby_engine, #test? |
Instance Method Summary
- #check_workers
- #cull_start_index(diff)
- #cull_workers
- #fork_worker!
- #halt
- #phased_restart(refork = false)
- #redirect_io
- #reload_worker_directory
- #restart
- #run
-
#setup_signals
We do this in a separate method to keep the lambda scope of the signal handlers as small as possible.
- #spawn_worker(idx, master)
- #spawn_workers
- #start_phased_restart
- #stop
- #stop_blocked
- #stop_workers
- #worker(index, master)
- #worker_at(idx)
- #workers_to_cull(diff)
- #idle_timed_out_worker_pids private
- #idle_workers private
- #single_worker_warning private
- #timeout_workers private
-
#wait_workers
private
Loops through @workers, removing workers that have exited, and calling
#term
if needed.
Runner
- Inherited
#close_control_listeners, #debug, #error, #load_and_bind, #log, #output_header, #redirect_io, #start_control, #start_server, #stop_control, #wakeup!, | |
#debug_loaded_extensions | this method call should always be guarded by |
#ensure_output_directory_exists, #stats, #utc_iso8601 |
Constructor Details
.new(launcher) ⇒ Cluster
# File 'lib/puma/cluster.rb', line 18
# Builds a cluster runner with no workers yet.
#
# @param launcher the launcher creating this runner; forwarded unchanged to
#   the Runner initializer
def initialize(launcher)
  super(launcher)

  # Worker bookkeeping: handles of spawned workers and the current restart phase.
  @workers = []
  @phase = 0

  # No phased restart has been requested yet.
  @phased_restart = false

  # Earliest time at which #check_workers will do any work.
  @next_check = Time.now
end
Instance Attribute Details
#all_workers_booted? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 153
# @return [Boolean] true when every worker handle reports itself booted
#   (also true when there are no workers at all)
def all_workers_booted?
  @workers.all?(&:booted?)
end
#all_workers_idle_timed_out? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 161
# @return [Boolean] true when no worker PID is left after removing the PIDs
#   that have reported an idle timeout
def all_workers_idle_timed_out?
  still_active = @workers.map(&:pid) - idle_timed_out_worker_pids
  still_active.empty?
end
#all_workers_in_phase? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 157
# @return [Boolean] true when no worker handle is on a phase other than the
#   cluster's current @phase
def all_workers_in_phase?
  @workers.none? { |handle| handle.phase != @phase }
end
#next_worker_index (readonly)
[ GitHub ]# File 'lib/puma/cluster.rb', line 142
# @return [Integer] the smallest non-negative index not used by any current
#   worker handle
def next_worker_index
  taken = @workers.map(&:index)

  candidate = 0
  candidate += 1 while taken.include?(candidate)
  candidate
end
#preload? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 283
# Whether the application is preloaded, as configured by the :preload_app
# option.
#
# @return the raw option value (truthy when preloading is enabled)
def preload?
  @options[:preload_app]
end
#stats (readonly)
Inside of a child process, this will return all zeroes, as @workers is only populated in the master process.
# File 'lib/puma/cluster.rb', line 259
# Cluster-wide statistics, merged with whatever the parent #stats returns.
#
# Inside of a child process, this will return all zeroes, as @workers is only
# populated in the master process.
#
# @return [Hash] summary counters plus one :worker_status entry per worker
def stats
  stale_count = @workers.count { |handle| handle.phase != @phase }

  per_worker = @workers.map do |handle|
    {
      started_at: utc_iso8601(handle.started_at),
      pid: handle.pid,
      index: handle.index,
      phase: handle.phase,
      booted: handle.booted?,
      last_checkin: utc_iso8601(handle.last_checkin),
      last_status: handle.last_status,
    }
  end

  summary = {
    started_at: utc_iso8601(@started_at),
    workers: @workers.size,
    phase: @phase,
    booted_workers: per_worker.count { |entry| entry[:booted] },
    old_workers: stale_count,
    worker_status: per_worker,
  }

  # Keys returned by the parent implementation win on conflict.
  summary.merge(super)
end
#workers ⇒ Array
<Puma::Cluster::WorkerHandle> (readonly)
Returns the list of cluster worker handles.
# File 'lib/puma/cluster.rb', line 30
# @return [Array<Puma::Cluster::WorkerHandle>] the list of cluster worker handles
attr_reader :workers
Instance Method Details
#check_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 165
# Periodic worker maintenance. Does nothing until @next_check has passed;
# otherwise times out, reaps, culls and spawns workers, retires at most one
# worker from a previous phase, and schedules the next check.
def check_workers
  return if @next_check >= Time.now

  @next_check = Time.now + @options[:worker_check_interval]

  timeout_workers
  wait_workers
  cull_workers
  spawn_workers

  if all_workers_booted?
    # If we're running at proper capacity, check to see if
    # we need to phase any workers out (which will restart
    # in the right phase).
    #
    outdated = @workers.find { |x| x.phase != @phase }

    if outdated
      log "- Stopping #{outdated.pid} for phased upgrade..."
      unless outdated.term?
        outdated.term
        log "- #{outdated.signal} sent to #{outdated.pid}..."
      end
    end
  end

  # Wake up no later than the earliest ping deadline of a worker that is not
  # already being terminated.
  deadlines = @workers.reject(&:term?).map(&:ping_timeout)
  @next_check = [deadlines.min, @next_check].compact.min
end
#cull_start_index(diff)
[ GitHub ]# File 'lib/puma/cluster.rb', line 132
# Start offset into the list of cull candidates (sorted oldest first).
#
# @param diff [Integer] number of workers to cull
# @return [Integer] 0 for the :oldest strategy, otherwise -diff so that the
#   slice covers the last (youngest) entries
def cull_start_index(diff)
  return 0 if @options[:worker_culling_strategy] == :oldest

  # :youngest
  -diff
end
#cull_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 108
# Terminates surplus workers when more handles exist than the :workers
# option asks for. Does nothing when there is no surplus.
def cull_workers
  surplus = @workers.size - @options[:workers]
  return if surplus < 1

  debug "Culling #{surplus} workers"

  victims = workers_to_cull(surplus)
  debug "Workers to cull: #{victims.inspect}"

  victims.each do |victim|
    log "- Worker #{victim.index} (PID: #{victim.pid}) terminating"
    victim.term
  end
end
#fork_worker!
# File 'lib/puma/cluster.rb', line 288
# Bumps the phase of worker 0 (when such a worker exists) and then requests
# a phased restart in refork mode.
#
# @return whatever #phased_restart returns
def fork_worker!
  first_worker = worker_at(0)
  first_worker.phase += 1 if first_worker

  phased_restart(true)
end
#halt
[ GitHub ]# File 'lib/puma/cluster.rb', line 245
# Marks the cluster as halted and wakes the master loop. #run skips
# #stop_workers when it exits with this status.
def halt
  @status = :halt

  wakeup!
end
#idle_timed_out_worker_pids (private)
[ GitHub ]# File 'lib/puma/cluster.rb', line 608
# @return [Array] PIDs currently recorded in #idle_workers
def idle_timed_out_worker_pids
  registry = idle_workers
  registry.keys
end
#idle_workers (private)
[ GitHub ]# File 'lib/puma/cluster.rb', line 612
# Lazily created registry of workers that reported an idle timeout,
# keyed by PID.
#
# @return [Hash] the same Hash instance on every call
def idle_workers
  @idle_workers ||= {}
end
#phased_restart(refork = false)
[ GitHub ]# File 'lib/puma/cluster.rb', line 224
# Requests a phased restart; the master loop picks the request up on its
# next iteration.
#
# @param refork [Boolean] when true, the request is accepted even though the
#   application is preloaded
# @return [Boolean] false when refused (preloaded app and not a refork),
#   true once the request has been recorded
def phased_restart(refork = false)
  return false unless !@options[:preload_app] || refork

  @phased_restart = true
  wakeup!

  true
end
#redirect_io
[ GitHub ]# File 'lib/puma/cluster.rb', line 59
# Runs the parent's IO redirection, then calls #hup on every worker handle.
def redirect_io
  super

  @workers.each(&:hup)
end
#reload_worker_directory
[ GitHub ]# File 'lib/puma/cluster.rb', line 250
# Changes the process working directory to the launcher's restart directory,
# logging the target first.
def reload_worker_directory
  target = @launcher.restart_dir
  log "+ Changing to #{target}"
  Dir.chdir(target)
end
#restart
[ GitHub ]# File 'lib/puma/cluster.rb', line 219
# Flags that a restart was requested and then stops the cluster.
#
# @return whatever #stop returns
def restart
  @restart = true

  stop
end
#run
[ GitHub ]# File 'lib/puma/cluster.rb', line 348
# Master-process entry point. Prepares the application (preloading it when
# configured), creates the pipes shared with the workers, installs signal
# handlers, spawns the workers and then loops, handling messages the workers
# write to the master pipe, until @status is no longer :run.
def run
  @status = :run

  output_header "cluster"

  # This is aligned with the output from Runner, see Runner#output_header
  log "* Workers: #{@options[:workers]}"

  if preload?
    # Threads explicitly marked as fork safe will be ignored. Used in Rails,
    # but may be used by anyone. Note that we need to explicit
    # Process::Waiter check here because there's a bug in Ruby 2.6 and below
    # where calling thread_variable_get on a Process::Waiter will segfault.
    # We can drop that clause once those versions of Ruby are no longer
    # supported.
    fork_safe = ->(t) { !t.is_a?(Process::Waiter) && t.thread_variable_get(:fork_safe) }

    # Snapshot the threads before and after loading the app so that threads
    # started during boot can be reported.
    before = Thread.list.reject(&fork_safe)

    log "* Restarts: (\u2714) hot (\u2716) phased"
    log "* Preloading application"
    load_and_bind

    after = Thread.list.reject(&fork_safe)

    if after.size > before.size
      threads = (after - before)
      if threads.first.respond_to? :backtrace
        log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot:"
        threads.each do |t|
          log "! #{t.inspect} - #{t.backtrace ? t.backtrace.first : ''}"
        end
      else
        log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot"
      end
    end
  else
    log "* Restarts: (\u2714) hot (\u2714) phased"

    unless @config.app_configured?
      error "No application configured, nothing to run"
      exit 1
    end

    @launcher.binder.parse @options[:binds]
  end

  # Pipe the master loop below reads from; its write end is kept in @wakeup
  # and is also handed to the workers as @worker_write.
  read, @wakeup = Puma::Util.pipe

  setup_signals

  # Used by the workers to detect if the master process dies.
  # If select says that @check_pipe is ready, it's because the
  # master has exited and @suicide_pipe has been automatically
  # closed.
  #
  @check_pipe, @suicide_pipe = Puma::Util.pipe

  # Separate pipe used by worker 0 to receive commands to
  # fork new worker processes.
  @fork_pipe, @fork_writer = Puma::Util.pipe

  log "Use Ctrl-C to stop"

  single_worker_warning

  redirect_io

  Plugins.fire_background

  @launcher.write_state

  start_control

  @master_read, @worker_write = read, @wakeup

  @options[:worker_write] = @worker_write

  @config.run_hooks(:before_fork, nil, @log_writer)

  spawn_workers

  Signal.trap "SIGINT" do
    stop
  end

  begin
    # booted: whether the initial on_booted event has been fired.
    # in_phased_restart / workers_not_booted: track a phased restart until
    # as many BOOT messages as configured workers have been received.
    booted = false
    in_phased_restart = false
    workers_not_booted = @options[:workers]

    while @status == :run
      begin
        if @options[:idle_timeout] && all_workers_idle_timed_out?
          log "- All workers reached idle timeout"
          break
        end

        if @phased_restart
          start_phased_restart
          @phased_restart = false
          in_phased_restart = true
          workers_not_booted = @options[:workers]
        end

        check_workers

        # Wait for a message, but no longer than until the next scheduled check.
        if read.wait_readable([0, @next_check - Time.now].max)
          # The first byte identifies the request type.
          req = read.read_nonblock(1)
          next unless req

          if req == Puma::Const::PipeRequest::WAKEUP
            @next_check = Time.now
            next
          end

          # The rest of the line is the payload; it starts with the sender's PID.
          result = read.gets
          pid = result.to_i

          if req == Puma::Const::PipeRequest::BOOT || req == Puma::Const::PipeRequest::FORK
            # For these requests the payload is "pid:index".
            pid, idx = result.split(':').map(&:to_i)
            # NOTE(review): worker_at returns nil when no handle has this
            # index, in which case the next line raises NoMethodError; confirm
            # this cannot happen.
            w = worker_at idx
            w.pid = pid if w.pid.nil?
          end

          if w = @workers.find { |x| x.pid == pid }
            case req
            when Puma::Const::PipeRequest::BOOT
              w.boot!
              log "- Worker #{w.index} (PID: #{pid}) booted in #{w.uptime.round(2)}s, phase: #{w.phase}"
              @next_check = Time.now
              workers_not_booted -= 1
            when Puma::Const::PipeRequest::EXTERNAL_TERM
              # external term, see worker method, Signal.trap "SIGTERM"
              w.term!
            when Puma::Const::PipeRequest::TERM
              w.term unless w.term?
            when Puma::Const::PipeRequest::PING
              # Strip the leading PID digits; the remainder is the status payload.
              status = result.sub(/^\d+/,'').chomp
              w.ping!(status)
              @events.fire(:ping!, w)

              # While a phased restart is still booting workers, the same
              # status is also recorded as a ping for worker 0.
              if in_phased_restart && workers_not_booted.positive? && w0 = worker_at(0)
                w0.ping!(status)
                @events.fire(:ping!, w0)
              end

              # Fire on_booted once, when every worker has a non-empty last_status.
              if !booted && @workers.none? {|worker| worker.last_status.empty?}
                @events.fire_on_booted!
                debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
                booted = true
              end
            when Puma::Const::PipeRequest::IDLE
              # Each IDLE message toggles the worker's entry in idle_workers.
              if idle_workers[pid]
                idle_workers.delete pid
              else
                idle_workers[pid] = true
              end
            end
          else
            log "! Out-of-sync worker list, no #{pid} worker"
          end
        end

        if in_phased_restart && workers_not_booted.zero?
          @events.fire_on_booted!
          debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
          in_phased_restart = false
        end
      rescue Interrupt
        @status = :stop
      end
    end

    stop_workers unless @status == :halt
  ensure
    # NOTE(review): @fork_pipe and @fork_writer are not closed here; confirm
    # whether that is intentional.
    @check_pipe.close
    @suicide_pipe.close
    read.close
    @wakeup.close
  end
end
#setup_signals
We do this in a separate method to keep the lambda scope of the signal handlers as small as possible.
# File 'lib/puma/cluster.rb', line 297
# Installs the master process signal handlers (SIGURG when fork_worker is
# enabled, SIGCHLD, TTIN, TTOU and SIGTERM).
#
# We do this in a separate method to keep the lambda scope
# of the signal handlers as small as possible.
def setup_signals
  if @options[:fork_worker]
    # SIGURG triggers a manual refork.
    Signal.trap "SIGURG" do
      fork_worker!
    end

    # Auto-fork after the specified number of requests.
    if (fork_requests = @options[:fork_worker].to_i) > 0
      @events.register(:ping!) do |w|
        # Only worker 0 in phase 0 triggers the automatic refork.
        fork_worker! if w.index == 0 &&
          w.phase == 0 &&
          w.last_status[:requests_count] >= fork_requests
      end
    end
  end

  # A child changed state: wake the master loop.
  Signal.trap "SIGCHLD" do
    wakeup!
  end

  # TTIN raises the configured worker count by one.
  Signal.trap "TTIN" do
    @options[:workers] += 1
    wakeup!
  end

  # TTOU lowers the configured worker count by one, but never below 1.
  Signal.trap "TTOU" do
    @options[:workers] -= 1 if @options[:workers] >= 2
    wakeup!
  end

  # Captured now so the SIGTERM handler can tell the master from a forked child.
  master_pid = Process.pid

  Signal.trap "SIGTERM" do
    # The worker installs their own SIGTERM when booted.
    # Until then, this is run by the worker and the worker
    # should just exit if they get it.
    if Process.pid != master_pid
      log "Early termination of worker"
      exit! 0
    else
      @launcher.close_binder_listeners

      stop_workers
      stop
      @events.fire_on_stopped!
      raise(SignalException, "SIGTERM") if @options[:raise_exception_on_sigterm]
      exit 0 # Clean exit, workers were stopped
    end
  end
end
#single_worker_warning (private)
[ GitHub ]# File 'lib/puma/cluster.rb', line 533
# Logs a four-line warning when the cluster is configured with exactly one
# worker, unless the :silence_single_worker_warning option is set.
def single_worker_warning
  return unless @options[:workers] == 1
  return if @options[:silence_single_worker_warning]

  log "! WARNING: Detected running cluster mode with 1 worker."
  log "! Running Puma in cluster mode with a single worker is often a misconfiguration."
  log "! Consider running Puma in single-mode (workers = 0) in order to reduce memory overhead."
  log "! Set the `silence_single_worker_warning` option to silence this warning message."
end
#spawn_worker(idx, master)
# File 'lib/puma/cluster.rb', line 94
# Forks a single worker process, running the before/after worker-fork hooks
# around the fork.
#
# @param idx [Integer] index of the worker to start
# @param master [Integer] PID of the master process, handed on to #worker
# @return [Integer] PID of the forked child
def spawn_worker(idx, master)
  @config.run_hooks(:before_worker_fork, idx, @log_writer)

  child_pid = fork { worker(idx, master) }
  unless child_pid
    # Without a PID there is nothing to supervise; give up immediately.
    log "! Complete inability to spawn new workers detected"
    log "! Seppuku is the only choice."
    exit! 1
  end

  @config.run_hooks(:after_worker_fork, idx, @log_writer)
  child_pid
end
#spawn_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 65
# Starts as many workers as are missing to reach the configured :workers
# count and records a WorkerHandle for each. In fork_worker mode every
# worker except index 0 is requested through @fork_writer instead of being
# forked directly, so its PID is not known yet.
def spawn_workers
  missing = @options[:workers] - @workers.size
  return if missing < 1

  master = Process.pid

  # Written to the fork pipe before any worker is requested.
  @fork_writer << "-1\n" if @options[:fork_worker]

  missing.times do
    idx = next_worker_index

    pid =
      if @options[:fork_worker] && idx != 0
        @fork_writer << "#{idx}\n"
        nil
      else
        spawn_worker(idx, master)
      end

    debug "Spawned worker: #{pid}"
    @workers << WorkerHandle.new(idx, pid, @phase, @options)
  end

  # Written to the fork pipe once every worker is on the current phase.
  @fork_writer << "0\n" if @options[:fork_worker] && all_workers_in_phase?
end
#start_phased_restart
[ GitHub ]# File 'lib/puma/cluster.rb', line 47
# Begins a phased restart: fires the restart event, advances @phase and
# switches to the launcher's restart directory.
def start_phased_restart
  @events.fire_on_restart!

  @phase += 1
  log "- Starting phased worker restart, phase: #{@phase}"

  # Be sure to change the directory again before loading
  # the app. This way we can pick up new code.
  restart_dir = @launcher.restart_dir
  log "+ Changing to #{restart_dir}"
  Dir.chdir(restart_dir)
end
#stop
[ GitHub ]# File 'lib/puma/cluster.rb', line 233
# Marks the cluster as stopping and wakes the master loop so it notices the
# new status.
def stop
  @status = :stop

  wakeup!
end
#stop_blocked
[ GitHub ]#stop_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 32
# Calls #term on every worker and then polls (every 0.2s) until no handle
# with a PID is left. An Interrupt cancels the wait.
def stop_workers
  log "- Gracefully shutting down workers..."
  @workers.each(&:term)

  begin
    loop do
      wait_workers
      # Handles without a PID are not waited for.
      break if @workers.all? { |handle| handle.pid.nil? }
      sleep 0.2
    end
  rescue Interrupt
    log "! Cancelled waiting for workers"
  end
end
#timeout_workers (private)
# File 'lib/puma/cluster.rb', line 594
# Kills every worker whose ping deadline has passed and that is not already
# being terminated, logging whether it failed to boot or failed to check in.
def timeout_workers
  @workers.each do |handle|
    next if handle.term?
    next unless handle.ping_timeout <= Time.now

    details =
      if handle.booted?
        "(Worker #{handle.index} failed to check in within #{@options[:worker_timeout]} seconds)"
      else
        "(Worker #{handle.index} failed to boot within #{@options[:worker_boot_timeout]} seconds)"
      end

    log "! Terminating timed out worker #{details}: #{handle.pid}"
    handle.kill
  end
end
#wait_workers (private)
Loops through @workers, removing workers that have exited, and calling #term
if needed.
# File 'lib/puma/cluster.rb', line 544
# Reaps exited child processes and removes the handles of exited workers from
# @workers. Workers that are still running and already marked for termination
# get #term called again. Reaped children that match no worker are logged.
def wait_workers
  # Reap all children, known workers or otherwise.
  # If puma has PID 1, as it's common in containerized environments,
  # then it's responsible for reaping orphaned processes, so we must reap
  # all our dead children, regardless of whether they are workers we spawned
  # or some reattached processes.
  reaped_children = {}
  loop do
    begin
      # Non-blocking: a nil pid means no further child has exited.
      pid, status = Process.wait2(-1, Process::WNOHANG)
      break unless pid
      reaped_children[pid] = status
    rescue Errno::ECHILD
      # No children at all.
      break
    end
  end

  @workers.reject! do |w|
    # Handles without a PID are kept.
    next false if w.pid.nil?
    begin
      # We may need to check the PID individually because:
      # 1. From Ruby versions 2.6 to 3.2, `Process.detach` can prevent or delay
      #    `Process.wait2(-1)` from detecting a terminated process: https://bugs.ruby-lang.org/issues/19837.
      # 2. When `fork_worker` is enabled, some worker may not be direct children,
      #    but grand children. Because of this they won't be reaped by `Process.wait2(-1)`.
      if reaped_children.delete(w.pid) || Process.wait(w.pid, Process::WNOHANG)
        # The worker has exited: drop its handle.
        true
      else
        # Still running: repeat the termination request if one is pending.
        w.term if w.term?
        nil
      end
    rescue Errno::ECHILD
      begin
        Process.kill(0, w.pid)
        # child still alive but has another parent (e.g., using fork_worker)
        w.term if w.term?
        false
      rescue Errno::ESRCH, Errno::EPERM
        true # child is already terminated
      end
    end
  end

  # Log unknown children
  reaped_children.each do |pid, status|
    log "! reaped unknown child process pid=#{pid} status=#{status}"
  end
end
#worker(index, master)
[ GitHub ]# File 'lib/puma/cluster.rb', line 197
# Body of a freshly forked worker process: drops the master-only state,
# closes the pipe ends this process does not use, then builds and runs a
# Worker.
#
# @param index [Integer] index of this worker
# @param master [Integer] PID of the master process
def worker(index, master)
  # The worker list is only maintained in the master process.
  @workers = []

  [@master_read, @suicide_pipe, @fork_writer].each(&:close)

  pipes = { check_pipe: @check_pipe, worker_write: @worker_write }
  pipes.update(fork_pipe: @fork_pipe, wakeup: @wakeup) if @options[:fork_worker]

  # A server is only created here when the app was preloaded; otherwise nil
  # is passed on to the Worker.
  server = preload? ? start_server : nil

  Worker.new(
    index: index,
    master: master,
    launcher: @launcher,
    pipes: pipes,
    server: server
  ).run
end
#worker_at(idx)
[ GitHub ]# File 'lib/puma/cluster.rb', line 149
# Looks up a worker handle by its index.
#
# @param idx [Integer] worker index
# @return the first handle with that index, or nil when there is none
def worker_at(idx)
  @workers.find { |handle| handle.index == idx }
end
#workers_to_cull(diff)
[ GitHub ]# File 'lib/puma/cluster.rb', line 122
# Picks the workers to terminate when the cluster is over capacity.
#
# Candidates are ordered by start time (oldest first); #cull_start_index
# decides which end of that list the slice is taken from.
#
# @param diff [Integer] number of surplus workers
# @return [Array] up to +diff+ worker handles; never nil. When fewer
#   candidates than +diff+ exist, all candidates are returned.
def workers_to_cull(diff)
  candidates = @workers.sort_by(&:started_at)

  # In fork_worker mode, worker 0 acts as our master process.
  # We should avoid culling it to preserve copy-on-write memory gains.
  candidates.reject! { |w| w.index == 0 } if @options[:fork_worker]

  # Clamp to the number of candidates: with a negative start index,
  # Array#[] returns nil when the start lies before the beginning of the
  # array, which would make callers iterate over nil.
  count = [diff, candidates.size].min
  return [] if count < 1

  candidates[cull_start_index(count), count]
end