Class: ActionCable::Connection::StreamEventLoop
Relationships & Source Files | |
Inherits: | Object |
Defined in: | actioncable/lib/action_cable/connection/stream_event_loop.rb |
Class Method Summary
- .new ⇒ StreamEventLoop constructor
Instance Method Summary
- #attach(io, stream)
- #detach(io, stream)
- #post(task = nil, &block)
- #stop
- #timer(interval, &block)
- #writes_pending(io)
- #run private
- #spawn private
- #wakeup private
Constructor Details
.new ⇒ StreamEventLoop
# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 10
def initialize @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @spawn_mutex = Mutex.new end
Instance Method Details
#attach(io, stream)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 30
def attach(io, stream) @todo << lambda do @map[io] = @nio.register(io, :r) @map[io].value = stream end wakeup end
#detach(io, stream)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 38
def detach(io, stream) @todo << lambda do @nio.deregister io @map.delete io io.close end wakeup end
#post(task = nil, &block)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 23
def post(task = nil, &block) task ||= block spawn @executor << task end
#run (private)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 86
def run loop do if @stopping @nio.close break end until @todo.empty? @todo.pop(true).call end next unless monitors = @nio.select monitors.each do |monitor| io = monitor.io stream = monitor.value begin if monitor.writable? if stream.flush_write_buffer monitor.interests = :r end next unless monitor.readable? end incoming = io.read_nonblock(4096, exception: false) case incoming when :wait_readable next when nil stream.close else stream.receive incoming end rescue # We expect one of EOFError or Errno::ECONNRESET in normal operation (when the # client goes away). But if anything else goes wrong, this is still the best way # to handle it. begin stream.close rescue @nio.deregister io @map.delete io end end end end end
#spawn (private)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 62
def spawn return if @thread && @thread.status @spawn_mutex.synchronize do return if @thread && @thread.status @nio ||= NIO::Selector.new @executor ||= Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 10, max_queue: 0, ) @thread = Thread.new { run } return true end end
#stop
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 56
def stop @stopping = true wakeup if @nio end
#timer(interval, &block)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 19
def timer(interval, &block) Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) end
#wakeup (private)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 82
def wakeup spawn || @nio.wakeup end
#writes_pending(io)
[ GitHub ]# File 'actioncable/lib/action_cable/connection/stream_event_loop.rb', line 47
def writes_pending(io) @todo << lambda do if monitor = @map[io] monitor.interests = :rw end end wakeup end