123456789_123456789_123456789_123456789_123456789_

Class: Thread::Queue

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Inherits: Object
Defined in: thread_sync.rb

Overview

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

The class implements FIFO (first in, first out) type of queue. In a FIFO queue, the first tasks added are the first retrieved.

Example:

queue = Thread::Queue.new

    producer = Thread.new do
      5.times do |i|
        sleep rand(i) # simulate expense
        queue << i
        puts "#{i} produced"
      end
    end

    consumer = Thread.new do
      5.times do |i|
        value = queue.pop
        sleep rand(i/2) # simulate expense
        puts "consumed #{value}"
      end
    end

consumer.join

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.newQueue .new(enumerable) ⇒ Queue

Document-method: .new

Creates a new queue instance, optionally using the contents of an enumerable for its initial state.

Example:

q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true

q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
[ GitHub ]

  
# File 'thread_sync.rb', line 56

def initialize(enumerable = nil)
  Primitive.queue_initialize(enumerable)
end

Instance Attribute Details

#closed?Boolean (readonly)

Returns true if the queue is closed.

[ GitHub ]

  
# File 'thread_sync.rb', line 138

def closed?
  Primitive.cexpr!('RBOOL(FL_TEST_RAW(self, QUEUE_CLOSED))')
end

#empty?Boolean (readonly)

Returns true if the queue is empty.

[ GitHub ]

  
# File 'thread_sync.rb', line 155

def empty?
  Primitive.cexpr!('RBOOL(queue_ptr(self)->len == 0)')
end

Instance Method Details

#<<(object)

Alias for #push.

[ GitHub ]

  
# File 'thread_sync.rb', line 92

alias_method :<<, :push

#clear

Removes all objects from the queue.

[ GitHub ]

  
# File 'thread_sync.rb', line 160

def clear
  Primitive.cstmt! %{
    queue_clear(queue_ptr(self));
    return self;
  }
end

#close

Closes the queue. A closed queue cannot be re-opened.

After the call to close completes, the following are true:

  • #closed? will return true

  • close will be ignored.

  • calling enq/push/<< will raise a ::ClosedQueueError.

  • when #empty? is false, calling deq/pop/shift will return an object from the queue as usual.

  • when #empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ::ThreadError.

::ClosedQueueError is inherited from ::StopIteration, so that you can break loop block.

Example:

q = Thread::Queue.new
 Thread.new{
   while e = q.deq # wait for nil to break loop
     # ...
   end
 }
 q.close
[ GitHub ]

  
# File 'thread_sync.rb', line 123

def close
  Primitive.cstmt! %{
    if (!queue_closed_p(self)) {
        FL_SET_RAW(self, QUEUE_CLOSED);

        wakeup_all(&queue_ptr(self)->waitq);
    }

    return self;
  }
end

#deq(non_block = false, timeout: nil)

Alias for #pop.

[ GitHub ]

  
# File 'thread_sync.rb', line 77

alias_method :deq, :pop

#enq(object)

Alias for #push.

[ GitHub ]

  
# File 'thread_sync.rb', line 91

alias_method :enq, :push

#freeze

The queue can’t be frozen, so this method raises an exception:

Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)

Raises:

[ GitHub ]

  
# File 'thread_sync.rb', line 184

def freeze
  raise TypeError, "cannot freeze #{self}"
end

#length #size
Also known as: #size

Returns the length of the queue.

[ GitHub ]

  
# File 'thread_sync.rb', line 147

def length
  Primitive.cexpr!('LONG2NUM(queue_ptr(self)->len)')
end

#marshal_dump

This method is for internal use only.

Raises:

[ GitHub ]

  
# File 'thread_sync.rb', line 175

def marshal_dump # :nodoc:
  raise TypeError, "can't dump #{self.class}"
end

#num_waiting

Returns the number of threads waiting on the queue.

[ GitHub ]

  
# File 'thread_sync.rb', line 171

def num_waiting
  Primitive.cexpr!('INT2NUM(queue_ptr(self)->num_waiting)')
end

#pop(non_block = false, timeout: nil) Also known as: #deq, #shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.

If timeout seconds have passed and no data is available nil is returned. If timeout is 0 it returns immediately.

[ GitHub ]

  
# File 'thread_sync.rb', line 71

def pop(non_block = false, timeout: nil)
  if non_block && timeout
    raise ArgumentError, "can't set a timeout if non_block is enabled"
  end
  Primitive.rb_queue_pop(non_block, timeout)
end

#push(object) #enq(object) #<<(object)
Also known as: #enq, #<<

Pushes the given object to the queue.

[ GitHub ]

  
# File 'thread_sync.rb', line 88

def push(object)
  Primitive.cexpr!('queue_do_push(self, queue_ptr(self), object)')
end

#shift(non_block = false, timeout: nil)

Alias for #pop.

[ GitHub ]

  
# File 'thread_sync.rb', line 78

alias_method :shift, :pop

#size

Alias for #length.

[ GitHub ]

  
# File 'thread_sync.rb', line 150

alias_method :size, :length