Class: Concurrent::Channel::Buffer::Base

Relationships & Source Files
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb

Overview

Abstract base class for all Channel buffers.

Channel objects maintain an internal, queue-like object called a buffer. It’s the storage bin for values put onto or taken from the channel. Different buffer types have different characteristics. Consequently, the behavior of any given channel depends heavily on the type of its buffer. This is the base class which defines the common buffer interface. Any class intended to be used as a channel buffer should extend this class.

Constructor Details

.new(*args) ⇒ Base

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 27

def initialize(*args)
  super()
  synchronize do
    @closed = false
    @size = 0
    @capacity = 0
    @buffer = nil
    ns_initialize(*args)
  end
end
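
The constructor follows a template-method pattern: it sets default state while holding the lock, then passes its arguments to #ns_initialize, which subclasses override. The sketch below illustrates that flow, together with the synchronized public predicates delegating to their ns_ hooks. It is only an approximation: SketchBase and BoundedSketch are hypothetical names, and Ruby's standard MonitorMixin stands in for Concurrent::Synchronization::LockableObject so the example runs without the gem.

```ruby
require 'monitor'

# Hypothetical stand-in for the base class. MonitorMixin replaces
# Concurrent::Synchronization::LockableObject (an assumption of this sketch).
class SketchBase
  include MonitorMixin

  attr_reader :capacity

  def initialize(*args)
    super()                 # sets up the monitor
    synchronize do
      @closed = false
      @size = 0
      @capacity = 0
      @buffer = nil
      ns_initialize(*args)  # subclass hook, called while the lock is held
    end
  end

  # Public methods take the lock, then delegate to the ns_ hooks.
  def size;   synchronize { ns_size };   end
  def empty?; synchronize { ns_empty? }; end
  def full?;  synchronize { ns_full? };  end

  private

  def ns_initialize(*args); end
  def ns_size;   raise NotImplementedError; end
  def ns_empty?; raise NotImplementedError; end
  def ns_full?;  raise NotImplementedError; end
end

# Hypothetical subclass: only the hooks are overridden.
class BoundedSketch < SketchBase
  private

  def ns_initialize(capacity)
    @capacity = capacity
    @buffer = []
  end

  def ns_size;   @buffer.size;              end
  def ns_empty?; @buffer.empty?;            end
  def ns_full?;  @buffer.size >= @capacity; end
end

buf = BoundedSketch.new(2)
```

Because the hooks run only while the lock is held, a subclass in this style does not need its own locking for the state it touches inside them.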

Instance Attribute Details

#blocking? ⇒ Boolean (readonly)

Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity.

Returns:

  • (Boolean)

    true if this buffer blocks else false

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 44

def blocking?
  true
end

#buffer (rw, private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 193

def buffer
  @buffer
end

#buffer=(value) (rw, private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 197

def buffer=(value)
  @buffer = value
end

#capacity (rw)

The maximum number of values which can be #put onto the buffer before it becomes full.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 22

attr_reader :capacity

#capacity=(value) (rw, private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 205

def capacity=(value)
  @capacity = value
end

#closed=(value) (rw, private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 201

def closed=(value)
  @closed = value
end

#closed? ⇒ Boolean (rw)

Predicate indicating whether this buffer is closed.

Returns:

  • (Boolean)

    true when closed else false.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 187

def closed?
  synchronize { ns_closed? }
end

#empty? ⇒ Boolean (readonly)

Predicate indicating if the buffer is empty.

Returns:

  • (Boolean)

    true if this buffer is empty else false

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 62

def empty?
  synchronize { ns_empty? }
end

#full? ⇒ Boolean (readonly)

Predicate indicating if the buffer is full.

Returns:

  • (Boolean)

    true if this buffer is full else false

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 73

def full?
  synchronize { ns_full? }
end

#ns_closed? ⇒ Boolean (readonly, private)

Predicate indicating whether this buffer is closed.

Returns:

  • (Boolean)

    true when closed else false.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 232

def ns_closed?
  @closed
end

#ns_empty? ⇒ Boolean (readonly, private)

Predicate indicating if the buffer is empty.

Returns:

  • (Boolean)

    true if this buffer is empty else false

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 222

def ns_empty?
  raise NotImplementedError
end

#ns_full? ⇒ Boolean (readonly, private)

Predicate indicating if the buffer is full.

Returns:

  • (Boolean)

    true if this buffer is full else false

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 227

def ns_full?
  raise NotImplementedError
end

#size (rw)

The number of items currently in the buffer.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 51

def size
  synchronize { ns_size }
end

#size=(value) (rw, private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 209

def size=(value)
  @size = value
end

Instance Method Details

#close ⇒ Boolean

Close the buffer, preventing new items from being added. Once a buffer is closed it cannot be opened again.

Returns:

  • (Boolean)

    true if the buffer was open and successfully closed else false.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 176

def close
  synchronize do
    @closed ? false : @closed = true
  end
end
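
The return value of #close distinguishes the call that actually closed the buffer from later calls that found it already closed. A minimal sketch of just that logic follows; ClosableSketch is a hypothetical class that uses a plain Mutex instead of the library's lock.

```ruby
# Hypothetical class reproducing only the close / closed? logic.
class ClosableSketch
  def initialize
    @lock = Mutex.new
    @closed = false
  end

  def close
    @lock.synchronize do
      # Already closed: false. Otherwise the assignment's value (true) is returned.
      @closed ? false : @closed = true
    end
  end

  def closed?
    @lock.synchronize { @closed }
  end
end

c = ClosableSketch.new
first  = c.close   # the buffer was open and is now closed
second = c.close   # the buffer was already closed
```

When several threads race to close the same buffer, only one of them sees true, which makes the return value usable as a "run cleanup once" signal.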

#next ⇒ Object, Boolean

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.

If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be true when the buffer is open. The “more” value will be false when the channel has been closed and all values have already been received. When “more” is false the returned item will be NULL.

Note that when multiple threads access the same channel a race condition can occur when using this method. A call to next from one thread may return true for the second return value, but another thread may #take the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.

Returns:

  • (Object, Boolean)

    the first return value will be the item taken from the buffer and the second return value will be a boolean indicating whether or not more items remain.

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 151

def next
  raise NotImplementedError
end
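
The iteration contract described for #next can be sketched as a loop that stops as soon as "more" is false. NextSketch below is a hypothetical, unbounded buffer built on Mutex and ConditionVariable, and its NULL constant is a stand-in for the library's sentinel; it shows the item/more pairing only, not how any real subclass implements it.

```ruby
# Hypothetical unbounded buffer illustrating the (item, more) contract.
class NextSketch
  NULL = Object.new   # stand-in for the library's NULL sentinel

  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @items = []
    @closed = false
  end

  def put(item)
    @lock.synchronize do
      return false if @closed
      @items << item
      @cond.signal
      true
    end
  end

  def close
    @lock.synchronize do
      return false if @closed
      @closed = true
      @cond.broadcast   # wake waiters so they can observe the close
      true
    end
  end

  def next
    @lock.synchronize do
      # Block while the buffer is open but empty.
      @cond.wait(@lock) while @items.empty? && !@closed
      return NULL, false if @items.empty?   # closed and drained
      [@items.shift, true]
    end
  end
end

buf = NextSketch.new
buf.put(1)
buf.put(2)
buf.close

seen = []
loop do
  item, more = buf.next
  break unless more   # item is NULL here and must not be used
  seen << item
end
```

Checking "more" on every call, rather than caching an earlier answer, is what keeps the loop correct when another thread takes the last value in between.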

#ns_initialize(*args) (private)

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 213

def ns_initialize(*args)
end

#ns_size (private)

The number of items currently in the buffer.

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 217

def ns_size
  raise NotImplementedError
end

#offer(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of false does not necessarily indicate that the buffer is closed, just that the item could not be added.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 106

def offer(item)
  raise NotImplementedError
end

#poll ⇒ Object

Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.

Returns:

  • (Object)

    the next item from the buffer or NULL if the buffer is empty.

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 165

def poll
  raise NotImplementedError
end
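
#offer and #poll form the non-blocking pair of the interface: both return immediately, and a failure result alone does not say whether the buffer is closed. The following sketch shows one way those semantics could look. OfferPollSketch and its NULL constant are hypothetical and use a plain Mutex, so this is an illustration of the documented behavior rather than the library's code.

```ruby
# Hypothetical bounded buffer with non-blocking offer and poll.
class OfferPollSketch
  NULL = Object.new   # stand-in for the library's NULL sentinel

  def initialize(capacity)
    @lock = Mutex.new
    @capacity = capacity
    @items = []
    @closed = false
  end

  def close
    @lock.synchronize { @closed ? false : @closed = true }
  end

  # Never blocks: false when the buffer is closed or full.
  def offer(item)
    @lock.synchronize do
      next false if @closed || @items.size >= @capacity
      @items << item
      true
    end
  end

  # Never blocks: NULL when nothing is available.
  def poll
    @lock.synchronize { @items.empty? ? NULL : @items.shift }
  end
end

buf = OfferPollSketch.new(1)
a = buf.offer(:a)   # accepted
b = buf.offer(:b)   # rejected because the single slot is taken
x = buf.poll        # the item offered first
y = buf.poll        # NULL: empty, although the buffer is still open
buf.close
c = buf.offer(:c)   # rejected because the buffer is closed
```

A caller that needs to tell "full" from "closed" after a false result has to consult #closed? separately.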

#put(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 88

def put(item)
  raise NotImplementedError
end

#take ⇒ Object

Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer is closed and empty, no further items can be taken.

Returns:

  • (Object)

    the item removed from the buffer; NULL once the buffer is closed and empty.

Raises:

  • (NotImplementedError)

    until overridden in a subclass.

# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb', line 122

def take
  raise NotImplementedError
end
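
The blocking behavior of #put and #take can be illustrated with a producer thread and a consuming loop. BlockingSketch is a hypothetical bounded buffer built on Mutex and ConditionVariable, with a stand-in NULL constant; it assumes that items remaining after a close can still be drained, as described above.

```ruby
# Hypothetical bounded buffer with blocking put and take.
class BlockingSketch
  NULL = Object.new   # stand-in for the library's NULL sentinel

  def initialize(capacity)
    @lock = Mutex.new
    @changed = ConditionVariable.new
    @capacity = capacity
    @items = []
    @closed = false
  end

  def close
    @lock.synchronize do
      return false if @closed
      @closed = true
      @changed.broadcast   # wake every waiter so it can observe the close
      true
    end
  end

  def put(item)
    @lock.synchronize do
      # Block while the buffer is open but has no free slot.
      @changed.wait(@lock) while !@closed && @items.size >= @capacity
      return false if @closed
      @items << item
      @changed.broadcast
      true
    end
  end

  def take
    @lock.synchronize do
      # Block while the buffer is open but empty.
      @changed.wait(@lock) while !@closed && @items.empty?
      return NULL if @items.empty?   # closed and drained
      item = @items.shift
      @changed.broadcast
      item
    end
  end
end

buf = BlockingSketch.new(1)
producer = Thread.new do
  3.times { |i| buf.put(i) }   # blocks whenever the single slot is occupied
  buf.close
end

taken = []
while (item = buf.take) != BlockingSketch::NULL
  taken << item
end
producer.join
```

A single condition variable with broadcast keeps the sketch short; separate "not full" and "not empty" conditions would wake fewer threads at the cost of more code.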