123456789_123456789_123456789_123456789_123456789_

Class: Puma::Cluster::Worker

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Runner
Instance Chain:
self, Runner
Inherits: Puma::Runner
Defined in: lib/puma/cluster/worker.rb

Overview

This class is instantiated by the Cluster and represents a single worker process.

At the core of this class is running an instance of Server which gets created via the start_server method from the Runner class that this inherits from.

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(index:, master:, launcher:, pipes:, server: nil) ⇒ Worker

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 14

def initialize(index:, master:, launcher:, pipes:, server: nil)
  super launcher, launcher.events

  @index = index
  @master = master
  @launcher = launcher
  @options = launcher.options
  @check_pipe = pipes[:check_pipe]
  @worker_write = pipes[:worker_write]
  @fork_pipe = pipes[:fork_pipe]
  @wakeup = pipes[:wakeup]
  @server = server
end

Instance Attribute Details

#index (readonly)

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 12

attr_reader :index, :master

#master (readonly)

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 12

attr_reader :index, :master

Instance Method Details

#run

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 28

def run
  title  = "puma: cluster worker #{index}: #{master}"
  title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
  $0 = title

  Signal.trap "SIGINT", "IGNORE"
  Signal.trap "SIGCHLD", "DEFAULT"

  Thread.new do
    Puma.set_thread_name "worker check pipe"
    IO.select [@check_pipe]
    log "! Detected parent died, dying"
    exit! 1
  end

  # If we're not running under a Bundler context, then
  # report the info about the context we will be using
  if !ENV['BUNDLE_GEMFILE']
    if File.exist?("Gemfile")
      log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
    elsif File.exist?("gems.rb")
      log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
    end
  end

  # Invoke any worker boot hooks so they can get
  # things in shape before booting the app.
  @launcher.config.run_hooks :before_worker_boot, index, @launcher.events

  server = @server ||= start_server
  restart_server = Queue.new << true << false

  fork_worker = @options[:fork_worker] && index == 0

  if fork_worker
    restart_server.clear
    worker_pids = []
    Signal.trap "SIGCHLD" do
      wakeup! if worker_pids.reject! do |p|
        Process.wait(p, Process::WNOHANG) rescue true
      end
    end

    Thread.new do
      Puma.set_thread_name "worker fork pipe"
      while (idx = @fork_pipe.gets)
        idx = idx.to_i
        if idx == -1 # stop server
          if restart_server.length > 0
            restart_server.clear
            server.begin_restart(true)
            @launcher.config.run_hooks :before_refork, nil, @launcher.events
            Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork]
          end
        elsif idx == 0 # restart server
          restart_server << true << false
        else # fork worker
          worker_pids << pid = spawn_worker(idx)
          @worker_write << "f#{pid}:#{idx}\n" rescue nil
        end
      end
    end
  end

  Signal.trap "SIGTERM" do
    @worker_write << "e#{Process.pid}\n" rescue nil
    restart_server.clear
    server.stop
    restart_server << false
  end

  begin
    @worker_write << "b#{Process.pid}:#{index}\n"
  rescue SystemCallError, IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    STDERR.puts "Master seems to have exited, exiting."
    return
  end

  while restart_server.pop
    server_thread = server.run
    stat_thread ||= Thread.new(@worker_write) do |io|
      Puma.set_thread_name "stat payload"
      base_payload = "p#{Process.pid}"

      while true
        begin
          b = server.backlog || 0
          r = server.running || 0
          t = server.pool_capacity || 0
          m = server.max_threads || 0
          rc = server.requests_count || 0
          payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m}, "requests_count": #{rc} }\n!
          io << payload
        rescue IOError
          Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
          break
        end
        sleep Const::WORKER_CHECK_INTERVAL
      end
    end
    server_thread.join
  end

  # Invoke any worker shutdown hooks so they can prevent the worker
  # exiting until any background operations are completed
  @launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events
ensure
  @worker_write << "t#{Process.pid}\n" rescue nil
  @worker_write.close
end

#spawn_worker(idx) (private)

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 142

def spawn_worker(idx)
  @launcher.config.run_hooks :before_worker_fork, idx, @launcher.events

  pid = fork do
    new_worker = Worker.new index: idx,
                            master: master,
                            launcher: @launcher,
                            pipes: { check_pipe: @check_pipe,
                                     worker_write: @worker_write },
                            server: @server
    new_worker.run
  end

  if !pid
    log "! Complete inability to spawn new workers detected"
    log "! Seppuku is the only choice."
    exit! 1
  end

  @launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
  pid
end

#wakeup! (private)

[ GitHub ]

  
# File 'lib/puma/cluster/worker.rb', line 165

def wakeup!
  return unless @wakeup

  begin
    @wakeup.write "!" unless @wakeup.closed?
  rescue SystemCallError, IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end
end