Class: Thread::Queue
| Relationships & Source Files | |
| Inherits: | Object | 
| Defined in: | thread_sync.c, thread_sync.c, thread_sync.rb | 
Overview
The Queue class implements multi-producer, multi-consumer queues.  It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.
The class implements FIFO (first in, first out) type of queue. In a FIFO queue, the first tasks added are the first retrieved.
Example:
queue = Thread::Queue.new
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end
consumer.joinClass Method Summary
- 
    
      .new  ⇒ Queue 
    
    constructor
    Creates a new queue instance, optionally using the contents of an enumerablefor its initial state.
Instance Attribute Summary
- 
    
      #closed?  ⇒ Boolean 
    
    readonly
    Returns trueif the queue is closed.
- 
    
      #empty?  ⇒ Boolean 
    
    readonly
    Returns trueif the queue is empty.
Instance Method Summary
- 
    
      #<<(object)  
      (also: #push, #enq)
    
    Pushes the given objectto the queue.
- 
    
      #clear  
    
    Removes all objects from the queue. 
- 
    
      #close  
    
    Closes the queue. 
- 
    
      #deq(non_block = false, timeout: nil)  
    
    Alias for #pop. 
- 
    
      #enq(object)  
    
    Alias for #<<. 
- 
    
      #freeze  
    
    The queue can’t be frozen, so this method raises an exception: 
- 
    
      #length  
      (also: #size)
    
    Returns the length of the queue. 
- 
    
      #num_waiting  
    
    Returns the number of threads waiting on the queue. 
- 
    
      #pop(non_block = false, timeout: nil)  
      (also: #deq, #shift)
    
    Retrieves data from the queue. 
- 
    
      #push(object)  
    
    Alias for #<<. 
- 
    
      #shift(non_block = false, timeout: nil)  
    
    Alias for #pop. 
- 
    
      #size  
    
    Alias for #length. 
- #marshal_dump Internal use only
Constructor Details
    
      .new  ⇒ Queue 
      .new(enumerable)  ⇒ Queue 
    
  
Queue 
      .new(enumerable)  ⇒ Queue 
    # File 'thread_sync.c', line 970
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
    ccan_list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}
  Instance Attribute Details
    #closed?  ⇒ Boolean  (readonly)  
Returns true if the queue is closed.
# File 'thread_sync.c', line 1051
static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}
  
    #empty?  ⇒ Boolean  (readonly)  
Returns true if the queue is empty.
# File 'thread_sync.c', line 1174
static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}
  Instance Method Details
    
      #push(object)  
      #enq(object)  
      #<<(object)  
    
    Also known as: #push, #enq
  
Pushes the given object to the queue.
# File 'thread_sync.c', line 1067
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
  #clear
Removes all objects from the queue.
# File 'thread_sync.c', line 1186
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    rb_ary_clear(check_array(self, q->que));
    return self;
}
  #close
Closes the queue. A closed queue cannot be re-opened.
After the call to close completes, the following are true:
- 
#closed? will return true 
- 
closewill be ignored.
- 
calling enq/push/<< will raise a ::ClosedQueueError.
- 
when #empty? is false, calling deq/pop/shift will return an object from the queue as usual. 
- 
when #empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ::ThreadError.
::ClosedQueueError is inherited from ::StopIteration, so that you can break loop block.
Example:
q = Thread::Queue.new
Thread.new{
  while e = q.deq # wait for nil to break loop
    # ...
  end
}
q.close# File 'thread_sync.c', line 1030
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(queue_waitq(q));
    }
    return self;
}
  #deq(non_block = false, timeout: nil)
Alias for #pop.
# File 'thread_sync.rb', line 20
alias_method :deq, :pop
    
      #push(object)  
      #enq(object)  
      #<<(object)  
    
  
Alias for #<<.
#freeze
# File 'thread_sync.c', line 1219
static VALUE
rb_queue_freeze(VALUE self)
{
    rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
    UNREACHABLE_RETURN(self);
}
  
    
      #length  
      #size  
    
    Also known as: #size
  
Returns the length of the queue.
# File 'thread_sync.c', line 1204
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}
  #marshal_dump
# File 'thread_sync.c', line 1630
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}
  #num_waiting
Returns the number of threads waiting on the queue.
# File 'thread_sync.c', line 1232
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    return INT2NUM(q->num_waiting);
}
  #pop(non_block = false, timeout: nil) Also known as: #deq, #shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.
If timeout seconds have passed and no data is available nil is returned. If timeout is 0 it returns immediately.
# File 'thread_sync.rb', line 14
def pop(non_block = false, timeout: nil) if non_block && timeout raise ArgumentError, "can't set a timeout if non_block is enabled" end Primitive.rb_queue_pop(non_block, timeout) end
    
      #push(object)  
      #enq(object)  
      #<<(object)  
    
  
Alias for #<<.
#shift(non_block = false, timeout: nil)
Alias for #pop.
# File 'thread_sync.rb', line 21
alias_method :shift, :pop
    
      #length  
      #size  
    
  
Alias for #length.