123456789_123456789_123456789_123456789_123456789_

Class: EventMachine::Queue

Relationships & Source Files
Inherits: Object
Defined in: lib/em/queue.rb

Overview

A cross thread, reactor scheduled, linear queue.

This class provides a simple queue abstraction on top of the reactor scheduler. It services two primary purposes:

  • API sugar for stateful protocols
  • Pushing processing onto the reactor thread

Examples:

q = EM::Queue.new
q.push('one', 'two', 'three')
3.times do
  q.pop { |msg| puts(msg) }
end

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.newQueue

[ GitHub ]

  
# File 'lib/em/queue.rb', line 19

def initialize
  @sink  = []
  @drain = []
  @popq  = []
end

Instance Attribute Details

#empty?Boolean (readonly)

Note:

This is a peek, it's not thread safe, and may only tend toward accuracy.

[ GitHub ]

  
# File 'lib/em/queue.rb', line 63

def empty?
  @drain.empty? && @sink.empty?
end

Instance Method Details

#<<(*items)

Alias for #push.

[ GitHub ]

  
# File 'lib/em/queue.rb', line 59

alias :<< :push

#num_waitingInteger

Note:

This is a peek at the number of jobs that are currently waiting on the Queue

Returns:

  • (Integer)

    Waiting

[ GitHub ]

  
# File 'lib/em/queue.rb', line 75

def num_waiting
  @popq.size
end

#pop(*a, &b) ⇒ NilClass

Pop items off the queue, running the block on the reactor thread. The pop will not happen immediately, but at some point in the future, either in the next tick, if the queue has data, or when the queue is populated.

Returns:

  • (NilClass)

[ GitHub ]

  
# File 'lib/em/queue.rb', line 30

def pop(*a, &b)
  cb = EM::Callback(*a, &b)
  EM.schedule do
    if @drain.empty?
      @drain = @sink
      @sink = []
    end
    if @drain.empty?
      @popq << cb
    else
      cb.call @drain.shift
    end
  end
  nil # Always returns nil
end

#push(*items) Also known as: #<<

Push items onto the queue in the reactor thread. The items will not appear in the queue immediately, but will be scheduled for addition during the next reactor tick.

[ GitHub ]

  
# File 'lib/em/queue.rb', line 49

def push(*items)
  EM.schedule do
    @sink.push(*items)
    unless @popq.empty?
      @drain = @sink
      @sink = []
      @popq.shift.call @drain.shift until @drain.empty? || @popq.empty?
    end
  end
end

#sizeInteger

Note:

This is a peek, it's not thread safe, and may only tend toward accuracy.

Returns:

  • (Integer)

    Queue size

[ GitHub ]

  
# File 'lib/em/queue.rb', line 69

def size
  @drain.size + @sink.size
end