Class: Concurrent::Channel::Buffer::Buffered
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Subclasses:
|
|
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
|
|
Instance Chain:
|
|
Inherits: |
Concurrent::Channel::Buffer::Base
|
Defined in: | lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb |
Overview
A buffer with a fixed internal capacity. Items can be put onto the buffer without blocking until the internal capacity is reached. Once the buffer is at capacity, subsequent calls to #put will block until an item is removed from the buffer, creating spare capacity.
Class Method Summary
Instance Attribute Summary
-
#ns_empty? ⇒ Boolean
readonly
private
Predicate indicating if the buffer is empty.
-
#ns_full? ⇒ Boolean
readonly
private
Predicate indicating if the buffer is full.
Base
- Inherited
#blocking? | Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity. |
#capacity | The maximum number of values which can be #put onto the buffer it becomes full. |
#closed? | Predicate indicating is this buffer closed. |
#empty? | Predicate indicating if the buffer is empty. |
#full? | Predicate indicating if the buffer is full. |
#size | The number of items currently in the buffer. |
#buffer, #buffer=, #capacity=, #closed=, | |
#ns_closed? | Predicate indicating is this buffer closed. |
#ns_empty? | Predicate indicating if the buffer is empty. |
#ns_full? | Predicate indicating if the buffer is full. |
#size= |
Instance Method Summary
-
#next ⇒ Object, Boolean
Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken.
-
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately.
-
#put(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#take ⇒ Object
Take an item from the buffer if one is available.
-
#ns_initialize(size)
private
Creates a new buffer.
-
#ns_put_onto_buffer(item) ⇒ Boolean
private
Put an item onto the buffer if possible.
-
#ns_size
private
The number of items currently in the buffer.
Base
- Inherited
#close | Close the buffer, preventing new items from being added. |
#next | Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. |
#offer | Put an item onto the buffer if possible. |
#poll | Take the next item from the buffer if one is available else return immediately. |
#put | Put an item onto the buffer if possible. |
#take | Take an item from the buffer if one is available. |
#ns_initialize, | |
#ns_size | The number of items currently in the buffer. |
Synchronization::LockableObject
- Inherited
Constructor Details
This class inherits a constructor from Concurrent::Channel::Buffer::Base
Instance Attribute Details
#ns_empty? ⇒ Boolean
(readonly, private)
Predicate indicating if the buffer is empty.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 100
def ns_empty? ns_size == 0 end
#ns_full? ⇒ Boolean
(readonly, private)
Predicate indicating if the buffer is full.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 105
def ns_full? ns_size == capacity end
Instance Method Details
#next ⇒ Object, Boolean
Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be true
when the buffer is open. The “more” value will be false
when the channel has been closed and all values have already been received. When “more” is false the returned item will be NULL.
Note that when multiple threads access the same channel a race condition can occur when using this method. A call to next
from one thread may return true
for the second return value, but another thread may #take the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 56
def next loop do synchronize do if ns_closed? && ns_empty? return Concurrent::NULL, false elsif !ns_empty? item = buffer.shift return item, true end end Thread.pass end end
#ns_initialize(size) (private)
Creates a new buffer.
#ns_put_onto_buffer(item) ⇒ Boolean
(private)
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 110
def ns_put_onto_buffer(item) buffer.push(item) end
#ns_size (private)
The number of items currently in the buffer.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 95
def ns_size buffer.size end
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of false
does not necessarily indicate that the buffer is closed, just that the item could not be added.
New items can be put onto the buffer until the number of items in the buffer reaches the #size
value specified during initialization.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 38
def offer(item) synchronize do if ns_closed? || ns_full? return false else ns_put_onto_buffer(item) return true end end end
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 71
def poll synchronize do if ns_empty? Concurrent::NULL else buffer.shift end end end
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
New items can be put onto the buffer until the number of items in the buffer reaches the #size
value specified during initialization.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 19
def put(item) loop do synchronize do if ns_closed? return false elsif !ns_full? ns_put_onto_buffer(item) return true end end Thread.pass end end
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 50
def take item, _ = self.next item end