Class: Puma::Cluster::Worker
Do not use. This class is for internal use only.
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Runner
|
|
Instance Chain:
self,
Runner ,
Const::PipeRequest
|
|
Inherits: |
Puma::Runner
|
Defined in: | lib/puma/cluster/worker.rb |
Overview
Constant Summary
Const::PipeRequest
- Included
PIPE_BOOT, PIPE_EXTERNAL_TERM, PIPE_FORK, PIPE_IDLE, PIPE_PING, PIPE_TERM, PIPE_WAKEUP
Class Method Summary
Instance Attribute Summary
Runner
- Inherited
#app, #development?, | |
#options | Returns the hash of configuration options. |
#redirected_io?, #ruby_engine, #test? |
Instance Method Summary
- #run
- #spawn_worker(idx) private
Runner
- Inherited
#close_control_listeners, #debug, #error, #load_and_bind, #log, #output_header, #redirect_io, #start_control, #start_server, #stop_control, #wakeup!, | |
#debug_loaded_extensions | this method call should always be guarded by |
#ensure_output_directory_exists, #stats, #utc_iso8601 |
Constructor Details
.new(index:, master:, launcher:, pipes:, server: nil) ⇒ Worker
# File 'lib/puma/cluster/worker.rb', line 17
def initialize(index:, master:, launcher:, pipes:, server: nil) super(launcher) @index = index @master = master @check_pipe = pipes[:check_pipe] @worker_write = pipes[:worker_write] @fork_pipe = pipes[:fork_pipe] @wakeup = pipes[:wakeup] @server = server @hook_data = {} end
Instance Attribute Details
#index (readonly)
[ GitHub ]# File 'lib/puma/cluster/worker.rb', line 15
attr_reader :index, :master
#master (readonly)
[ GitHub ]# File 'lib/puma/cluster/worker.rb', line 15
attr_reader :index, :master
Instance Method Details
#run
[ GitHub ]# File 'lib/puma/cluster/worker.rb', line 30
def run title = "puma: cluster worker #{index}: #{master}" title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty? $0 = title Signal.trap "SIGINT", "IGNORE" Signal.trap "SIGCHLD", "DEFAULT" Thread.new do Puma.set_thread_name "wrkr check" @check_pipe.wait_readable log "! Detected parent died, dying" exit! 1 end # If we're not running under a Bundler context, then # report the info about the context we will be using if !ENV['BUNDLE_GEMFILE'] if File.exist?("Gemfile") log "+ Gemfile in context: #{File. ("Gemfile")}" elsif File.exist?("gems.rb") log "+ Gemfile in context: #{File. ("gems.rb")}" end end # Invoke any worker boot hooks so they can get # things in shape before booting the app. @config.run_hooks(:before_worker_boot, index, @log_writer, @hook_data) begin server = @server ||= start_server rescue Exception => e log "! Unable to start worker" log e log e.backtrace.join("\n ") exit 1 end restart_server = Queue.new << true << false fork_worker = @options[:fork_worker] && index == 0 if fork_worker restart_server.clear worker_pids = [] Signal.trap "SIGCHLD" do wakeup! if worker_pids.reject! do |p| Process.wait(p, Process::WNOHANG) rescue true end end Thread.new do Puma.set_thread_name "wrkr fork" while (idx = @fork_pipe.gets) idx = idx.to_i if idx == -1 # stop server if restart_server.length > 0 restart_server.clear server.begin_restart(true) @config.run_hooks(:before_refork, nil, @log_writer, @hook_data) end elsif idx == -2 # refork cycle is done @config.run_hooks(:after_refork, nil, @log_writer, @hook_data) elsif idx == 0 # restart server restart_server << true << false else # fork worker worker_pids << pid = spawn_worker(idx) @worker_write << "#{PIPE_FORK}#{pid}:#{idx}\n" rescue nil end end end end Signal.trap "SIGTERM" do @worker_write << "#{PIPE_EXTERNAL_TERM}#{Process.pid}\n" rescue nil restart_server.clear server.stop restart_server << false end begin @worker_write << "#{PIPE_BOOT}#{Process.pid}:#{index}\n" rescue SystemCallError, IOError Puma::Util.purge_interrupt_queue STDERR.puts "Master seems to have exited, exiting." return end while restart_server.pop server_thread = server.run if @log_writer.debug? && index == 0 debug_loaded_extensions "Loaded Extensions - worker 0:" end stat_thread ||= Thread.new(@worker_write) do |io| Puma.set_thread_name "stat pld" base_payload = "p#{Process.pid}" while true begin b = server.backlog || 0 r = server.running || 0 t = server.pool_capacity || 0 m = server.max_threads || 0 rc = server.requests_count || 0 bt = server.busy_threads || 0 payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads":#{m}, "requests_count":#{rc}, "busy_threads":#{bt} }\n! io << payload rescue IOError Puma::Util.purge_interrupt_queue break end sleep @options[:worker_check_interval] end end server_thread.join end # Invoke any worker shutdown hooks so they can prevent the worker # exiting until any background operations are completed @config.run_hooks(:before_worker_shutdown, index, @log_writer, @hook_data) ensure @worker_write << "#{PIPE_TERM}#{Process.pid}\n" rescue nil @worker_write.close end
#spawn_worker(idx) (private)
[ GitHub ]# File 'lib/puma/cluster/worker.rb', line 159
def spawn_worker(idx) @config.run_hooks(:before_worker_fork, idx, @log_writer, @hook_data) pid = fork do new_worker = Worker.new index: idx, master: master, launcher: @launcher, pipes: { check_pipe: @check_pipe, worker_write: @worker_write }, server: @server new_worker.run end if !pid log "! Complete inability to spawn new workers detected" log "! Seppuku is the only choice." exit! 1 end @config.run_hooks(:after_worker_fork, idx, @log_writer, @hook_data) pid end