
Class: Puma::Reactor

Relationships & Source Files
Inherits: Object
Defined in: lib/puma/reactor.rb

Overview

Monitors a collection of ::IO objects, calling a block whenever any monitored object either receives data or times out, or when the Reactor shuts down.

Waiting and waking up are handled by nio4r, which picks the appropriate backend (libev, Java NIO, or plain IO.select). The call to NIO::Selector#select wakes up any ::IO object that receives data.

This class additionally tracks a timeout for every added object, and wakes up any object when its timeout elapses.

The implementation uses a Queue to hand objects added from other threads over to the internal select loop.
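
The hand-off between #add and the select loop can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Puma's implementation: `MiniReactor` is a hypothetical name, `IO.select` plus a self-pipe stand in for nio4r and `NIO::Selector#wakeup`, and timeout tracking is left out entirely.

```ruby
# Stdlib-only sketch; MiniReactor is hypothetical and is not Puma::Reactor.
class MiniReactor
  def initialize(&block)
    @input = Queue.new            # hand-off point for newly added IOs
    @wake_r, @wake_w = IO.pipe    # self-pipe, standing in for Selector#wakeup
    @monitored = []
    @block = block
  end

  # Called from any thread: queue the IO, then interrupt the blocking select.
  def add(io)
    @input << io
    @wake_w.write_nonblock("x")
    true
  rescue ClosedQueueError
    false
  end

  def run
    @thread = Thread.new { select_loop }
  end

  def shutdown
    @input.close
    @wake_w.write_nonblock("x")
    @thread&.join
  end

  private

  def select_loop
    until @input.closed? && @input.empty?
      ready, = IO.select([@wake_r, *@monitored])
      ready.each do |io|
        if io.equal?(@wake_r)
          @wake_r.read_nonblock(64, exception: false) # drain wakeup bytes
        elsif @block.call(io)
          @monitored.delete(io)   # block returned true: stop monitoring
        end
      end
      # Only this thread pops, so new IOs are registered between selects.
      @monitored << @input.pop until @input.empty?
    end
    @monitored.each(&@block)      # wake remaining objects on shutdown
  end
end

r, w = IO.pipe
received = Queue.new
reactor = MiniReactor.new { |io| received << io.read_nonblock(16); true }
reactor.run
reactor.add(r)
w.write("ping")
message = received.pop  # blocks until the reactor thread has called the block
reactor.shutdown
```

Because only the loop thread touches `@monitored`, the queue is the single point of synchronization; callers of `add` never mutate the monitored set directly.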

Constructor Details

.new(&block) ⇒ Reactor

Create a new Reactor to monitor ::IO objects added by #add. The provided block is invoked when an ::IO has data available to read, when its timeout elapses, or when the Reactor shuts down.

# File 'lib/puma/reactor.rb', line 21

def initialize(&block)
  require 'nio'
  @selector = NIO::Selector.new
  @input = Queue.new
  @timeouts = []
  @block = block
end

Instance Method Details

#add(client)

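Add a new client to monitor. The object must respond to #timeout and #timeout_at; the select loop also calls #io_ok? and #to_io on it before registering it. Returns true if the client was queued, or false if the reactor is already shut down.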

# File 'lib/puma/reactor.rb', line 44

def add(client)
  @input << client
  @selector.wakeup
  true
rescue ClosedQueueError
  false
end
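
The false return value relies on standard Queue behavior: pushing to a closed queue raises ClosedQueueError. A small sketch of that pattern follows; `try_enqueue` is a hypothetical helper that mirrors the shape of #add and is not part of Puma.

```ruby
# Hypothetical helper with the same rescue shape as Reactor#add.
def try_enqueue(queue, item)
  queue << item
  true
rescue ClosedQueueError
  false
end

queue = Queue.new
accepted = try_enqueue(queue, :first)   # queue is open, so the push succeeds
queue.close
rejected = try_enqueue(queue, :second)  # push to a closed queue raises and is rescued
leftover = queue.pop                    # items queued before #close can still be popped
```

Items accepted before the queue was closed remain available to the consumer, which is why the select loop keeps running until the queue is both closed and empty.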

#register(client) (private)

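Start monitoring the object for readability and add it to the timeout list. Objects that NIO rejects with an ArgumentError are silently skipped.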

# File 'lib/puma/reactor.rb', line 95

def register(client)
  @selector.register(client.to_io, :r).value = client
  @timeouts << client
rescue ArgumentError
  # unreadable clients raise error when processed by NIO
end

#run(background = true)

Run the internal select loop, using a background thread by default. Pass false to run the loop in the calling thread instead.

# File 'lib/puma/reactor.rb', line 30

def run(background=true)
  if background
    @thread = Thread.new do
      Puma.set_thread_name "reactor"
      select_loop
    end
  else
    select_loop
  end
end

#select_loop (private)

Run the select loop until the input queue is closed and empty. Each iteration wakes up objects that received data, then objects whose timeout has elapsed, and finally registers any newly added objects. A StandardError raised inside the loop is logged to STDERR and the loop is retried. On exit, every remaining object is passed to the block and the selector is closed.

# File 'lib/puma/reactor.rb', line 64

def select_loop
  begin
    until @input.closed? && @input.empty?
      # Wakeup any registered object that receives incoming data.
      # Block until the earliest timeout or Selector#wakeup is called.
      timeout = (earliest = @timeouts.first) && earliest.timeout
      @selector.select(timeout) {|mon| wakeup!(mon.value)}

      # Wakeup all objects that timed out.
      timed_out = @timeouts.take_while {|t| t.timeout == 0}
      timed_out.each(&method(:wakeup!))

      unless @input.empty?
        until @input.empty?
          client = @input.pop
          register(client) if client.io_ok?
        end
        @timeouts.sort_by!(&:timeout_at)
      end
    end
  rescue StandardError => e
    STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
    STDERR.puts e.backtrace
    retry
  end
  # Wakeup all remaining objects on shutdown.
  @timeouts.each(&@block)
  @selector.close
end
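
The timeout bookkeeping in the loop depends on @timeouts staying sorted by deadline: the first entry gives the select timeout, and take_while collects the expired prefix. The sketch below isolates that logic. `FakeClient` is a hypothetical stand-in, its clamped #timeout is an assumption about how a client might compute remaining time, and `now` is passed explicitly to keep the example deterministic (the real loop calls #timeout with no arguments).

```ruby
# Hypothetical stand-in for a monitored client; not a Puma class.
FakeClient = Struct.new(:name, :timeout_at) do
  # Seconds remaining until the deadline, clamped at zero (assumed behavior).
  def timeout(now)
    [timeout_at - now, 0].max
  end
end

now = 100.0
timeouts = [
  FakeClient.new("c", 105.0),
  FakeClient.new("a", 99.0),
  FakeClient.new("b", 100.0),
]

# Keep the list ordered by deadline, as the loop does after registering clients.
timeouts.sort_by!(&:timeout_at)

# The earliest deadline bounds how long select may block; with an empty
# list this expression is nil, and a nil timeout means no time limit.
select_timeout = (earliest = timeouts.first) && earliest.timeout(now)

# Because the list is sorted, expired clients form a prefix of it.
timed_out = timeouts.take_while { |client| client.timeout(now) == 0 }
expired_names = timed_out.map(&:name)  # "a" and "b" here; "c" has 5 seconds left
```

Sorting once per batch of new clients keeps the per-iteration check cheap, since take_while stops at the first client that still has time remaining.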

#shutdown

Shut down the reactor, blocking until the background thread is finished.

# File 'lib/puma/reactor.rb', line 53

def shutdown
  @input.close
  begin
    @selector.wakeup
  rescue IOError # Ignore if selector is already closed
  end
  @thread.join if @thread
end

#wakeup!(client) (private)

'Wake up' a monitored object by calling the provided block. Stop monitoring the object if the block returns a truthy value; otherwise the object stays registered and will be woken again.

# File 'lib/puma/reactor.rb', line 104

def wakeup!(client)
  if @block.call client
    @selector.deregister client.to_io
    @timeouts.delete client
  end
end
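
The block's return value therefore decides how long an object stays monitored. The sketch below illustrates that contract with plain Ruby objects. `ChunkedClient` and `wake` are hypothetical names, and an Array stands in for both the selector registration and the timeout list.

```ruby
# Hypothetical client that needs several wakeups before it is complete.
class ChunkedClient
  attr_reader :buffer

  def initialize(chunks)
    @chunks = chunks.dup
    @buffer = String.new
  end

  # Consume one pending chunk; returns true once nothing is left to read.
  def read_chunk
    @buffer << @chunks.shift
    @chunks.empty?
  end
end

# Same shape as Reactor#wakeup!: drop the client only if the block is truthy.
def wake(monitored, client, block)
  monitored.delete(client) if block.call(client)
end

client = ChunkedClient.new(["GET / ", "HTTP/1.1"])
monitored = [client]
on_wakeup = ->(c) { c.read_chunk }

wake(monitored, client, on_wakeup)  # first chunk only: block is false, still monitored
wake(monitored, client, on_wakeup)  # last chunk: block is true, client is removed
```

A block that returns false keeps the object in the reactor until more data arrives or its timeout elapses, which suits requests that arrive in several pieces.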