Class: Thread::Queue
| Relationships & Source Files | |
| Extension / Inclusion / Inheritance Descendants | |
|
Subclasses:
|
|
| Inherits: | Object |
| Defined in: | thread_sync.rb |
Overview
The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.
The class implements FIFO (first in, first out) type of queue. In a FIFO queue, the first tasks added are the first retrieved.
Example:
queue = Thread::Queue.new
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
consumer.join
Class Method Summary
-
.new ⇒ Queue
constructor
Document-method: .new
Instance Attribute Summary
-
#closed? ⇒ Boolean
readonly
Returns
trueif the queue is closed. -
#empty? ⇒ Boolean
readonly
Returns
trueif the queue is empty.
Instance Method Summary
-
#<<(object)
Alias for #push.
-
#clear
Removes all objects from the queue.
-
#close
Closes the queue.
-
#deq(non_block = false, timeout: nil)
Alias for #pop.
-
#enq(object)
Alias for #push.
-
#freeze
The queue can’t be frozen, so this method raises an exception:
-
#length
(also: #size)
Returns the length of the queue.
-
#num_waiting
Returns the number of threads waiting on the queue.
-
#pop(non_block = false, timeout: nil)
(also: #deq, #shift)
Retrieves data from the queue.
-
#push(object)
(also: #enq, #<<)
Pushes the given
objectto the queue. -
#shift(non_block = false, timeout: nil)
Alias for #pop.
-
#size
Alias for #length.
- #marshal_dump Internal use only
Constructor Details
.new ⇒ Queue
.new(enumerable) ⇒ Queue
Queue
.new(enumerable) ⇒ Queue
Document-method: .new
Creates a new queue instance, optionally using the contents of an enumerable for its initial state.
Example:
q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true
q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
# File 'thread_sync.rb', line 56
def initialize(enumerable = nil) Primitive.queue_initialize(enumerable) end
Instance Attribute Details
#closed? ⇒ Boolean (readonly)
Returns true if the queue is closed.
# File 'thread_sync.rb', line 138
def closed? Primitive.cexpr!('RBOOL(FL_TEST_RAW(self, QUEUE_CLOSED))') end
#empty? ⇒ Boolean (readonly)
Returns true if the queue is empty.
# File 'thread_sync.rb', line 155
def empty? Primitive.cexpr!('RBOOL(queue_ptr(self)->len == 0)') end
Instance Method Details
#<<(object)
Alias for #push.
# File 'thread_sync.rb', line 92
alias_method :<<, :push
#clear
Removes all objects from the queue.
# File 'thread_sync.rb', line 160
def clear Primitive.cstmt! %{ queue_clear(queue_ptr(self)); return self; } end
#close
Closes the queue. A closed queue cannot be re-opened.
After the call to close completes, the following are true:
-
#closed? will return true
-
closewill be ignored. -
calling enq/push/<< will raise a
::ClosedQueueError. -
when #empty? is false, calling deq/pop/shift will return an object from the queue as usual.
-
when #empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a
::ThreadError.
::ClosedQueueError is inherited from ::StopIteration, so that you can break loop block.
Example:
q = Thread::Queue.new
Thread.new{
while e = q.deq # wait for nil to break loop
# ...
end
}
q.close
# File 'thread_sync.rb', line 123
def close Primitive.cstmt! %{ if (!queue_closed_p(self)) { FL_SET_RAW(self, QUEUE_CLOSED); wakeup_all(&queue_ptr(self)->waitq); } return self; } end
#deq(non_block = false, timeout: nil)
Alias for #pop.
# File 'thread_sync.rb', line 77
alias_method :deq, :pop
#enq(object)
Alias for #push.
# File 'thread_sync.rb', line 91
alias_method :enq, :push
#freeze
#length
#size
Also known as: #size
Returns the length of the queue.
# File 'thread_sync.rb', line 147
def length Primitive.cexpr!('LONG2NUM(queue_ptr(self)->len)') end
#marshal_dump
#num_waiting
Returns the number of threads waiting on the queue.
# File 'thread_sync.rb', line 171
def num_waiting Primitive.cexpr!('INT2NUM(queue_ptr(self)->num_waiting)') end
#pop(non_block = false, timeout: nil) Also known as: #deq, #shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.
If timeout seconds have passed and no data is available nil is returned. If timeout is 0 it returns immediately.
# File 'thread_sync.rb', line 71
def pop(non_block = false, timeout: nil) if non_block && timeout raise ArgumentError, "can't set a timeout if non_block is enabled" end Primitive.rb_queue_pop(non_block, timeout) end
#push(object)
#enq(object)
#<<(object)
Also known as: #enq, #<<
Pushes the given object to the queue.
# File 'thread_sync.rb', line 88
def push(object) Primitive.cexpr!('queue_do_push(self, queue_ptr(self), object)') end
#shift(non_block = false, timeout: nil)
Alias for #pop.
# File 'thread_sync.rb', line 78
alias_method :shift, :pop
#size
Alias for #length.
# File 'thread_sync.rb', line 150
alias_method :size, :length