Class: Concurrent::Channel::Buffer::Buffered

Relationships & Source Files
Inherits: Concurrent::Channel::Buffer::Base
Defined in: lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb

Overview

A buffer with a fixed internal capacity. Items can be put onto the buffer without blocking until the internal capacity is reached. Once the buffer is at capacity, subsequent calls to #put will block until an item is removed from the buffer, creating spare capacity.

Class Method Summary

Base - Inherited

Instance Attribute Summary

Base - Inherited

#blocking?

Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity.

#capacity

The maximum number of values which can be #put onto the buffer before it becomes full.

#closed?

Predicate indicating if this buffer is closed.

#empty?

Predicate indicating if the buffer is empty.

#full?

Predicate indicating if the buffer is full.

#size

The number of items currently in the buffer.

#buffer, #buffer=, #capacity=, #closed=,
#ns_closed?

Predicate indicating if this buffer is closed.

#ns_empty?

Predicate indicating if the buffer is empty.

#ns_full?

Predicate indicating if the buffer is full.

#size=

Instance Method Summary

Base - Inherited

#close

Close the buffer, preventing new items from being added.

#next

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken.

#offer

Put an item onto the buffer if possible.

#poll

Take the next item from the buffer if one is available else return immediately.

#put

Put an item onto the buffer if possible.

#take

Take an item from the buffer if one is available.

#ns_initialize,
#ns_size

The number of items currently in the buffer.

Synchronization::LockableObject - Inherited

Constructor Details

This class inherits a constructor from Concurrent::Channel::Buffer::Base

Instance Attribute Details

#ns_empty? ⇒ Boolean (readonly, private)

Predicate indicating if the buffer is empty.

Returns:

  • (Boolean)

    true if this buffer is empty else false

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 100

def ns_empty?
  ns_size == 0
end

#ns_full? ⇒ Boolean (readonly, private)

Predicate indicating if the buffer is full.

Returns:

  • (Boolean)

    true if this buffer is full else false

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 105

def ns_full?
  ns_size == capacity
end

Instance Method Details

#next ⇒ Object, Boolean

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.

If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be true when the buffer is open. The “more” value will be false when the channel has been closed and all values have already been received. When “more” is false the returned item will be NULL.

Note that when multiple threads access the same channel a race condition can occur when using this method. A call to next from one thread may return true for the second return value, but another thread may #take the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.

Returns:

  • (Object, Boolean)

    the first return value will be the item taken from the buffer and the second return value will be a boolean indicating whether or not more items remain.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 56

def next
  loop do
    synchronize do
      if ns_closed? && ns_empty?
        return Concurrent::NULL, false
      elsif !ns_empty?
        item = buffer.shift
        return item, true
      end
    end
    Thread.pass
  end
end
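
The iteration pattern described above can be sketched with a small stand-in. SketchBuffer and its NULL constant are hypothetical names that copy the logic of the listing using only Ruby's standard library; this is an illustration under those assumptions, not the concurrent-ruby class itself.

```ruby
# Hypothetical stand-in that mirrors the #next listing with a plain Mutex.
class SketchBuffer
  NULL = Object.new.freeze # stands in for Concurrent::NULL

  def initialize(capacity)
    @capacity = capacity
    @items = []
    @closed = false
    @lock = Mutex.new
  end

  def offer(item)
    @lock.synchronize do
      return false if @closed || @items.size == @capacity
      @items.push(item)
      true
    end
  end

  def close
    @lock.synchronize { @closed = true }
  end

  # Returns [item, true] while items can be taken, [NULL, false] once the
  # buffer is closed and empty; spins with Thread.pass while open and empty.
  def next
    loop do
      @lock.synchronize do
        return NULL, false if @closed && @items.empty?
        return @items.shift, true unless @items.empty?
      end
      Thread.pass
    end
  end
end

buffer = SketchBuffer.new(3)

# The producer fills the buffer from another thread, then closes it.
producer = Thread.new do
  [1, 2, 3].each { |n| buffer.offer(n) }
  buffer.close
end

received = []
loop do
  item, more = buffer.next
  break unless more # closed and empty: item is NULL here
  received << item
end
producer.join
```

With a single consumer the loop ends cleanly when "more" turns false. With several consumers each one must still check "more" on every call, since another thread may have taken the last item in between.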

#ns_initialize(size) (private)

Creates a new buffer.

Parameters:

  • size (Integer)

    the maximum capacity of the buffer; must be greater than zero.

Raises:

  • (ArgumentError)

    when the size is zero (0) or less.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 88

def ns_initialize(size)
  raise ArgumentError.new('size must be greater than 0') if size.to_i <= 0
  self.capacity = size.to_i
  self.buffer = []
end

#ns_put_onto_buffer(item) ⇒ Boolean (private)

Appends the item to the internal buffer. As the source below shows, this helper performs no capacity or closed-state check and does not block; callers such as #put and #offer perform those checks while holding the lock.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 110

def ns_put_onto_buffer(item)
  buffer.push(item)
end

#ns_size (private)

The number of items currently in the buffer.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 95

def ns_size
  buffer.size
end

#offer(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of false does not necessarily indicate that the buffer is closed, just that the item could not be added.

New items can be put onto the buffer until the number of items in the buffer reaches the #capacity, which is set from the size value given at initialization.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 38

def offer(item)
  synchronize do
    if ns_closed? || ns_full?
      return false
    else
      ns_put_onto_buffer(item)
      return true
    end
  end
end
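
A minimal sketch of the non-blocking behaviour, assuming a hypothetical stand-in class (SketchBuffer) that copies the #offer logic above with a plain Mutex. It shows the two cases in which false is returned: the buffer is full, or the buffer is closed.

```ruby
# Hypothetical stand-in; only the pieces needed to illustrate #offer.
class SketchBuffer
  def initialize(capacity)
    @capacity = capacity
    @items = []
    @closed = false
    @lock = Mutex.new
  end

  def close
    @lock.synchronize { @closed = true }
  end

  def size
    @lock.synchronize { @items.size }
  end

  # Never waits: rejects the item when closed or full, otherwise stores it.
  def offer(item)
    @lock.synchronize do
      return false if @closed || @items.size == @capacity
      @items.push(item)
      true
    end
  end
end

full = SketchBuffer.new(2)
results = [:a, :b, :c].map { |item| full.offer(item) } # third offer finds the buffer full

closed = SketchBuffer.new(2)
closed.close
after_close = closed.offer(:d) # rejected even though there is spare capacity
```

Because false covers both cases, a caller that needs to tell them apart would check #closed? separately.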

#poll ⇒ Object

Take the next item from the buffer if one is available else return immediately. A NULL return value does not necessarily indicate that the buffer is closed, only that it is empty.

Returns:

  • (Object)

    the next item from the buffer or NULL if the buffer is empty.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 71

def poll
  synchronize do
    if ns_empty?
      Concurrent::NULL
    else
      buffer.shift
    end
  end
end
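
As a rough illustration, the sketch below uses a hypothetical stand-in (SketchBuffer, with its own NULL sentinel in place of Concurrent::NULL) that copies the #poll logic above. It shows that an empty but still open buffer also yields NULL.

```ruby
# Hypothetical stand-in; only the pieces needed to illustrate #poll.
class SketchBuffer
  NULL = Object.new.freeze # stands in for Concurrent::NULL

  def initialize(capacity)
    @capacity = capacity
    @items = []
    @lock = Mutex.new
  end

  def offer(item)
    @lock.synchronize do
      return false if @items.size == @capacity
      @items.push(item)
      true
    end
  end

  # Returns immediately: NULL when empty, otherwise the oldest item.
  def poll
    @lock.synchronize { @items.empty? ? NULL : @items.shift }
  end
end

buffer = SketchBuffer.new(2)
first = buffer.poll   # buffer is open but empty, so NULL comes back
buffer.offer(:job)
second = buffer.poll  # the queued item
third = buffer.poll   # empty again
```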

#put(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.

New items can be put onto the buffer until the number of items in the buffer reaches the #capacity, which is set from the size value given at initialization.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 19

def put(item)
  loop do
    synchronize do
      if ns_closed?
        return false
      elsif !ns_full?
        ns_put_onto_buffer(item)
        return true
      end
    end
    Thread.pass
  end
end
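
The blocking behaviour can be sketched as follows. SketchBuffer is a hypothetical stand-in that copies the #put and #poll logic from the listings using a plain Mutex, and the 0.05 second sleep is an arbitrary pause chosen for the illustration.

```ruby
# Hypothetical stand-in; only the pieces needed to illustrate #put.
class SketchBuffer
  NULL = Object.new.freeze # stands in for Concurrent::NULL

  def initialize(capacity)
    @capacity = capacity
    @items = []
    @closed = false
    @lock = Mutex.new
  end

  def close
    @lock.synchronize { @closed = true }
  end

  # Spins (yielding with Thread.pass) until there is room or the buffer closes.
  def put(item)
    loop do
      @lock.synchronize do
        return false if @closed
        unless @items.size == @capacity
          @items.push(item)
          return true
        end
      end
      Thread.pass
    end
  end

  def poll
    @lock.synchronize { @items.empty? ? NULL : @items.shift }
  end
end

buffer = SketchBuffer.new(1)
first_put = buffer.put(:first)                # room available, returns at once

producer = Thread.new { buffer.put(:second) } # buffer is full, so this waits
sleep 0.05
still_waiting = producer.alive?

taken = buffer.poll                           # frees one slot
second_put = producer.value                   # joins the thread, returns put's result

buffer.close
put_after_close = buffer.put(:third)          # closed buffers reject new items
```

Note that the waiting thread is not parked: as in the listing, it retries in a loop and yields with Thread.pass between attempts.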

#take ⇒ Object

Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer is both closed and empty, no more items can be taken and NULL is returned.

Returns:

  • (Object)

    the item removed from the buffer; NULL once the buffer is closed and empty.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 50

def take
  item, _ = self.next
  item
end
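
A short sketch of draining a closed buffer, assuming a hypothetical stand-in (SketchBuffer, with its own NULL sentinel in place of Concurrent::NULL) that copies the #next and #take logic from the listings:

```ruby
# Hypothetical stand-in; only the pieces needed to illustrate #take.
class SketchBuffer
  NULL = Object.new.freeze # stands in for Concurrent::NULL

  def initialize(capacity)
    @capacity = capacity
    @items = []
    @closed = false
    @lock = Mutex.new
  end

  def offer(item)
    @lock.synchronize do
      return false if @closed || @items.size == @capacity
      @items.push(item)
      true
    end
  end

  def close
    @lock.synchronize { @closed = true }
  end

  def next
    loop do
      @lock.synchronize do
        return NULL, false if @closed && @items.empty?
        return @items.shift, true unless @items.empty?
      end
      Thread.pass
    end
  end

  # Same as #next, but discards the "more" flag.
  def take
    item, _ = self.next
    item
  end
end

buffer = SketchBuffer.new(2)
buffer.offer(:x)
buffer.offer(:y)
buffer.close

drained = [buffer.take, buffer.take] # items queued before close are still delivered
after_drain = buffer.take            # closed and empty, so NULL comes back
```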