123456789_123456789_123456789_123456789_123456789_

Class: Thread::Queue

Relationships & Source Files
Inherits: Object
Defined in: thread_sync.c,
thread_sync.rb

Overview

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

The class implements a FIFO (first in, first out) queue: the first objects added are the first ones retrieved.

Example:

# Shared queue used to hand values from the producer thread to the consumer.
queue = Thread::Queue.new

# Producer: pushes the integers 0 through 4, pausing a random amount of time
# before each push.
producer = Thread.new do
  5.times do |i|
     sleep rand(i) # simulate time-consuming work before producing a value
     queue << i
     puts "#{i} produced"
  end
end

# Consumer: pops five values; queue.pop suspends this thread while the queue
# is empty, so no extra synchronization is needed here.
consumer = Thread.new do
  5.times do |i|
     value = queue.pop
     sleep rand(i/2) # simulate time-consuming work on the consumed value
     puts "consumed #{value}"
  end
end

# Block the main thread until the consumer has handled all five values.
consumer.join

Class Method Summary

  • .new ⇒ Queue constructor

    Creates a new queue instance, optionally using the contents of an enumerable for its initial state.

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new ⇒ Queue
.new(enumerable) ⇒ Queue

Creates a new queue instance, optionally using the contents of an enumerable for its initial state.

Example:

# Without an argument the new queue starts out empty.
q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true

# With an enumerable, its elements become the initial contents, so the first
# element is the first one popped.
q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
[ GitHub ]

  
# File 'thread_sync.c', line 900

/*
 * Initializer for Thread::Queue.
 *
 * Accepts zero or one argument (rb_scan_args format "01").  When an argument
 * is supplied it is converted with rb_to_array() and its elements are
 * appended to the newly created backing array.
 *
 * argc, argv - the raw Ruby call arguments.
 * self       - the queue object being initialized.
 *
 * Returns self.
 */
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);
    VALUE seed;
    const int has_seed = rb_scan_args(argc, argv, "01", &seed) == 1;

    /* Convert the optional argument before any queue state is set up. */
    if (has_seed) {
        seed = rb_to_array(seed);
    }

    /* Install fresh storage and an empty wait list. */
    RB_OBJ_WRITE(self, &queue->que, ary_buf_new());
    ccan_list_head_init(queue_waitq(queue));

    if (has_seed) {
        rb_ary_concat(queue->que, seed);
    }

    return self;
}

Instance Attribute Details

#closed? ⇒ Boolean (readonly)

Returns true if the queue is closed.

[ GitHub ]

  
# File 'thread_sync.c', line 981

/*
 * Implements Thread::Queue#closed?.
 *
 * self - the queue object.
 *
 * Returns the result of queue_closed_p(self) converted to a Ruby boolean
 * via RBOOL.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    VALUE closed = RBOOL(queue_closed_p(self));

    return closed;
}

#empty? ⇒ Boolean (readonly)

Returns true if the queue is empty.

[ GitHub ]

  
# File 'thread_sync.c', line 1104

/*
 * Implements Thread::Queue#empty?.
 *
 * self - the queue object.
 *
 * Returns a Ruby boolean that is true exactly when queue_length() reports
 * zero for this queue.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return RBOOL(queue_length(self, queue) == 0);
}

Instance Method Details

#push(object) #enq(object) #<<(object)
Also known as: #push, #enq

Pushes the given object to the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 997

/*
 * Implements Thread::Queue#push (also exposed as #enq and #<<).
 *
 * self - the queue object.
 * obj  - the object to add.
 *
 * Delegates to queue_do_push() and returns whatever it returns.
 */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_push(self, queue, obj);
}

#clear

Removes all objects from the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1116

/*
 * Implements Thread::Queue#clear: empties the queue's backing array.
 *
 * self - the queue object.
 *
 * Returns self.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);
    /* NOTE(review): check_array presumably validates the backing array
     * before it is used -- confirm against its definition. */
    VALUE items = check_array(self, queue->que);

    rb_ary_clear(items);
    return self;
}

#close

Closes the queue. A closed queue cannot be re-opened.

After the call to close completes, the following are true:

  • #closed? will return true

  • further calls to close will be ignored.

  • calling enq/push/<< will raise a ::ClosedQueueError.

  • when #empty? is false, calling deq/pop/shift will return an object from the queue as usual.

  • when #empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ::ThreadError.

::ClosedQueueError inherits from ::StopIteration, so you can use it to break out of a loop block.

Example:

q = Thread::Queue.new
Thread.new{
  while e = q.deq # deq returns nil once q is closed and empty, which ends the loop
    # ...
  end
}
# Closing the queue lets the consumer loop above terminate.
q.close
[ GitHub ]

  
# File 'thread_sync.c', line 960

/*
 * Implements Thread::Queue#close.
 *
 * The first call sets the QUEUE_CLOSED flag on the object and calls
 * wakeup_all() on the queue's wait list.  Calls made after the queue is
 * already closed change nothing.
 *
 * self - the queue object.
 *
 * Returns self.
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* Already closed: nothing left to do. */
    if (queue_closed_p(self)) {
        return self;
    }

    FL_SET(self, QUEUE_CLOSED);
    /* NOTE(review): presumably resumes every thread parked on the wait
     * list so it can observe the closed state -- confirm in wakeup_all. */
    wakeup_all(queue_waitq(queue));

    return self;
}

#deq(non_block = false, timeout: nil)

Alias for #pop.

[ GitHub ]

  
# File 'thread_sync.rb', line 20

# Expose #pop under the additional name #deq.
alias_method :deq, :pop

#push(object) #enq(object) #<<(object)

Alias for #<<.

#length #size
Also known as: #size

Returns the length of the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1134

/*
 * Implements Thread::Queue#length (also exposed as #size).
 *
 * self - the queue object.
 *
 * Returns the value of queue_length() for this queue as a Ruby Integer.
 */
static VALUE
rb_queue_length(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return LONG2NUM(queue_length(self, queue));
}

#marshal_dump

This method is for internal use only.
[ GitHub ]

  
# File 'thread_sync.c', line 1544

/*
 * marshal_dump implementation for objects that must not be serialized.
 *
 * obj - the object whose dump was requested.
 *
 * Always raises TypeError naming the object's class; it never returns
 * normally.
 */
static VALUE
undumpable(VALUE obj)
{
    VALUE klass = rb_obj_class(obj);

    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, klass);
    /* Only here to satisfy the declared return type. */
    UNREACHABLE_RETURN(Qnil);
}

#num_waiting

Returns the number of threads waiting on the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1146

/*
 * Implements Thread::Queue#num_waiting.
 *
 * self - the queue object.
 *
 * Returns the queue's num_waiting counter as a Ruby Integer.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return INT2NUM(queue->num_waiting);
}

#pop(non_block = false, timeout: nil)
Also known as: #deq, #shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.

If timeout seconds have passed and no data is available, nil is returned. If timeout is 0, it returns immediately.

[ GitHub ]

  
# File 'thread_sync.rb', line 14

# Retrieves data from the queue by delegating to the rb_queue_pop primitive.
#
# non_block:: passed through to the primitive; may not be combined with
#             +timeout+.
# timeout::   passed through to the primitive unchanged.
#
# Raises ArgumentError when +non_block+ is truthy and a +timeout+ is also
# given.
def pop(non_block = false, timeout: nil)
  raise ArgumentError, "can't set a timeout if non_block is enabled" if non_block && timeout

  Primitive.rb_queue_pop(non_block, timeout)
end

#push(object) #enq(object) #<<(object)

Alias for #<<.

#shift(non_block = false, timeout: nil)

Alias for #pop.

[ GitHub ]

  
# File 'thread_sync.rb', line 21

# Expose #pop under the additional name #shift.
alias_method :shift, :pop

#length #size

Alias for #length.