Class: Concurrent::Channel::Buffer::Unbuffered
Inherits: Concurrent::Channel::Buffer::Base
Defined in: lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb
Overview
A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be taken from the buffer when a thread is waiting to put. When either #put or #take is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made, an exchange will occur and the first blocked call will return.
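The handoff described above can be modeled with the standard library alone. The sketch below is not the library class: HandoffModel and Slot are hypothetical names, a Mutex stands in for the inherited lock, and an explicit done flag replaces the AtomicReference used in the real source. It only illustrates that a #put cannot return until a matching #take arrives.

```ruby
# Stdlib-only model of a zero-capacity handoff.
# HandoffModel and Slot are hypothetical names, not part of concurrent-ruby.
class HandoffModel
  Slot = Struct.new(:value, :done)

  def initialize
    @lock    = Mutex.new
    @putting = [] # slots of blocked putters
    @taking  = [] # slots of blocked takers
  end

  # Blocks until a taker has received the item.
  def put(item)
    mine = @lock.synchronize do
      slot = Slot.new(item, false)
      if @taking.empty?
        @putting.push(slot)   # no taker yet: queue behind earlier putters
      else
        taker = @taking.shift # hand the item to the oldest waiting taker
        taker.value = item
        taker.done  = true
        slot.done   = true
      end
      slot
    end
    Thread.pass until @lock.synchronize { mine.done }
    true
  end

  # Blocks until a putter has supplied an item.
  def take
    mine = @lock.synchronize do
      slot = Slot.new(nil, false)
      if @putting.empty?
        @taking.push(slot)      # no putter yet: queue behind earlier takers
      else
        putter = @putting.shift # receive from the oldest waiting putter
        slot.value  = putter.value
        slot.done   = true
        putter.done = true
      end
      slot
    end
    Thread.pass until @lock.synchronize { mine.done }
    mine.value
  end
end

buffer = HandoffModel.new
log    = Queue.new

producer = Thread.new do
  buffer.put(:hello)
  log << :put_returned
end

sleep 0.05               # give the producer time to block in put
blocked  = log.empty?    # put has not returned because nobody has taken yet
received = buffer.take   # the exchange happens here
producer.join
```

Using a separate done flag is a design choice of this sketch; it lets nil or false travel through the model, which the value-based polling in the real source does not distinguish.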
Instance Attribute Summary
-
#empty? ⇒ Boolean
readonly
Predicate indicating if the buffer is empty.
-
#full? ⇒ Boolean
readonly
Predicate indicating if the buffer is full.
Base
- Inherited
#blocking? | Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity. |
#capacity | The maximum number of values which can be #put onto the buffer before it becomes full. |
#closed? | Predicate indicating if this buffer is closed. |
#empty? | Predicate indicating if the buffer is empty. |
#full? | Predicate indicating if the buffer is full. |
#size | The number of items currently in the buffer. |
#buffer, #buffer=, #capacity=, #closed=, | |
#ns_closed? | Predicate indicating if this buffer is closed. |
#ns_empty? | Predicate indicating if the buffer is empty. |
#ns_full? | Predicate indicating if the buffer is full. |
#size= |
Instance Method Summary
-
#next ⇒ Object, Boolean
Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken.
-
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately.
-
#put(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#size
The number of items currently in the buffer.
-
#take ⇒ Object
Take an item from the buffer if one is available.
-
#ns_initialize
private
Creates a new buffer.
- #putting private
- #taking private
Base
- Inherited
#close | Close the buffer, preventing new items from being added. |
#next | Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. |
#offer | Put an item onto the buffer if possible. |
#poll | Take the next item from the buffer if one is available else return immediately. |
#put | Put an item onto the buffer if possible. |
#take | Take an item from the buffer if one is available. |
#ns_initialize, | |
#ns_size | The number of items currently in the buffer. |
Synchronization::LockableObject
- Inherited
Constructor Details
This class inherits a constructor from Concurrent::Channel::Buffer::Base
Instance Attribute Details
#empty? ⇒ Boolean
(readonly)
Predicate indicating if the buffer is empty.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 27
def empty?
  size == 0
end
#full? ⇒ Boolean
(readonly)
Predicate indicating if the buffer is full.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 32
def full?
  !empty?
end
Instance Method Details
#next ⇒ Object, Boolean
Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be true when the buffer is open. The “more” value will be false when the channel has been closed and all values have already been received. When “more” is false the returned item will be NULL.
Note that when multiple threads access the same channel a race condition can occur when using this method. A call to #next from one thread may return true for the second return value, but another thread may #take the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.
Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. This method exhibits the same blocking behavior as #take.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 135
def next
  item = take
  more = (item != Concurrent::NULL)
  return item, more
end
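A consumer loop built on the #next contract might look like the sketch below. To keep it runnable without the gem, NULL and FakeBuffer are hypothetical stand-ins: FakeBuffer only imitates the [item, more] return shape, and a real buffer would return Concurrent::NULL and could block. The point is that the loop trusts only the “more” flag returned by the same call, which is one way to stay safe when other threads may take the last value.

```ruby
# NULL and FakeBuffer are hypothetical stand-ins so the loop runs without
# the gem; a real buffer would return Concurrent::NULL and may block.
NULL = Object.new

class FakeBuffer
  def initialize(items)
    @items = items.dup
  end

  # Same return shape as #next: [item, more].
  def next
    return [NULL, false] if @items.empty? # models "closed and drained"
    [@items.shift, true]
  end
end

buffer = FakeBuffer.new([1, 2, 3])
seen   = []

loop do
  item, more = buffer.next
  break unless more # check the flag first; item is NULL when more is false
  seen << item
end
```

The loop never asks "is there another item?" ahead of time. It only reacts to what the latest call returned, so a value taken by another thread in between cannot produce a stale answer.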
#ns_initialize (private)
Creates a new buffer.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 148
def ns_initialize
  # one will always be empty
  @putting = []
  @taking = []
  self.closed = false
  self.capacity = 1
end
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of false does not necessarily indicate that the buffer is closed, just that the item could not be added.
Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return true immediately. When there are no threads waiting to take or the buffer is closed, this method will return false immediately.
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
Items can only be taken off the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return NULL immediately.
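The non-blocking rules for #offer and #poll can be illustrated with a small stdlib-only model. RendezvousModel, NULL, and waiting_takers are hypothetical names introduced for this sketch, and the blocking calls here wait on Thread::Queue objects rather than polling as the real source does. Closing is not modeled.

```ruby
# Stdlib-only model of the offer/poll rules.
# RendezvousModel, NULL and waiting_takers are hypothetical, not library API.
NULL = Object.new

class RendezvousModel
  def initialize
    @lock    = Mutex.new
    @putters = [] # [item, ack_queue] pairs from blocked put calls
    @takers  = [] # inbox queues from blocked take calls
  end

  # Blocking put: returns once some taker has the item.
  def put(item)
    ack = Queue.new
    @lock.synchronize do
      if @takers.empty?
        @putters.push([item, ack])
      else
        @takers.shift << item
        ack << true
      end
    end
    ack.pop
  end

  # Blocking take: returns once some putter supplied an item.
  def take
    inbox = Queue.new
    @lock.synchronize do
      if @putters.empty?
        @takers.push(inbox)
      else
        item, ack = @putters.shift
        inbox << item
        ack << true
      end
    end
    inbox.pop
  end

  # Non-blocking: succeeds only if a taker is already waiting.
  def offer(item)
    @lock.synchronize do
      return false if @takers.empty?
      @takers.shift << item
      true
    end
  end

  # Non-blocking: succeeds only if a putter is already waiting.
  def poll
    @lock.synchronize do
      return NULL if @putters.empty?
      item, ack = @putters.shift
      ack << true
      item
    end
  end

  def waiting_takers
    @lock.synchronize { @takers.size }
  end
end

buffer   = RendezvousModel.new
no_taker = buffer.offer(:a) # nobody is taking, so the offer is refused
nothing  = buffer.poll      # nobody is putting, so poll yields NULL

taker = Thread.new { buffer.take }
Thread.pass until buffer.waiting_takers == 1 # wait until the taker is queued
accepted = buffer.offer(:b) # a taker is waiting, so the offer succeeds
got      = taker.value
```

A refused offer here leaves no trace in the model: the item is simply not handed over, matching the description that false only means the item could not be added.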
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls #take the exchange will occur and this method will return.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 44
def put(item)
  mine = synchronize do
    return false if ns_closed?

    ref = Concurrent::AtomicReference.new(item)
    if taking.empty?
      putting.push(ref)
    else
      taken = taking.shift
      taken.value = item
      ref.value = nil
    end
    ref
  end

  loop do
    return true if mine.value.nil?
    Thread.pass
  end
end
#putting (private)
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 143
def putting() @putting; end
#size
The number of items currently in the buffer.
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but threads are still blocked in #put, their items can still be taken. Once the buffer is closed and no such items remain, NULL is returned immediately.
Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls #put the exchange will occur and this method will return.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 89
def take
  mine = synchronize do
    return Concurrent::NULL if ns_closed? && putting.empty?

    ref = Concurrent::AtomicReference.new(nil)
    if putting.empty?
      taking.push(ref)
    else
      put = putting.shift
      ref.value = put.value
      put.value = nil
    end
    ref
  end

  loop do
    item = mine.value
    return item if item
    Thread.pass
  end
end
#taking (private)
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 145
def taking() @taking; end