123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Channel::Buffer::Unbuffered

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::Channel::Buffer::Base
Defined in: lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb

Overview

A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be put onto the buffer when a thread is waiting to put. When either #put or #take is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made an exchange will occur and the first blocked call will return.

Class Method Summary

Base - Inherited

Instance Attribute Summary

Base - Inherited

#blocking?

Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity.

#capacity

The maximum number of values which can be #put onto the buffer it becomes full.

#closed?

Predicate indicating is this buffer closed.

#empty?

Predicate indicating if the buffer is empty.

#full?

Predicate indicating if the buffer is full.

#size

The number of items currently in the buffer.

#buffer, #buffer=, #capacity=, #closed=,
#ns_closed?

Predicate indicating is this buffer closed.

#ns_empty?

Predicate indicating if the buffer is empty.

#ns_full?

Predicate indicating if the buffer is full.

#size=

Instance Method Summary

Base - Inherited

#close

Close the buffer, preventing new items from being added.

#next

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken.

#offer

Put an item onto the buffer if possible.

#poll

Take the next item from the buffer if one is available else return immediately.

#put

Put an item onto the buffer if possible.

#take

Take an item from the buffer if one is available.

#ns_initialize,
#ns_size

The number of items currently in the buffer.

Synchronization::LockableObject - Inherited

Constructor Details

This class inherits a constructor from Concurrent::Channel::Buffer::Base

Instance Attribute Details

#empty?Boolean (readonly)

Predicate indicating if the buffer is empty.

Returns:

  • (Boolean)

    true if this buffer is empty else false

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 27

def empty?
  size == 0
end

#full?Boolean (readonly)

Predicate indicating if the buffer is full.

Returns:

  • (Boolean)

    true if this buffer is full else false

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 32

def full?
  !empty?
end

Instance Method Details

#nextObject, Boolean

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.

If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be true when the buffer is open. The “more” value will be false when the channel has been closed and all values have already been received. When “more” is false the returned item will be NULL.

Note that when multiple threads access the same channel a race condition can occur when using this method. A call to next from one thread may return true for the second return value, but another thread may #take the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.

Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. This method exhibits the same blocking behavior as #take.

Returns:

  • (Object, Boolean)

    the first return value will be the item taken from the buffer and the second return value will be a boolean indicating whether or not more items remain.

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 135

def next
  item = take
  more = (item != Concurrent::NULL)
  return item, more
end

#ns_initialize (private)

Creates a new buffer.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 148

def ns_initialize
  # one will always be empty
  @putting = []
  @taking = []
  self.closed = false
  self.capacity = 1
end

#offer(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of false does not necessarily indicate that the buffer is closed, just that the item could not be added.

Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return true immediately. When there are no threads waiting to take or the buffer is closed, this method will return false immediately.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 71

def offer(item)
  synchronize do
    return false if ns_closed? || taking.empty?

    taken = taking.shift
    taken.value = item
    true
  end
end

#pollObject

Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.

Items can only be taken off the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return NULL immediately.

Returns:

  • (Object)

    the next item from the buffer or NULL if the buffer is empty.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 117

def poll
  synchronize do
    return Concurrent::NULL if putting.empty?

    put = putting.shift
    value = put.value
    put.value = nil
    value
  end
end

#put(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.

Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls #take the exchange will occur and this method will return.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 44

def put(item)
  mine = synchronize do
    return false if ns_closed?

    ref = Concurrent::AtomicReference.new(item)
    if taking.empty?
      putting.push(ref)
    else
      taken = taking.shift
      taken.value = item
      ref.value = nil
    end
    ref
  end
  loop do
    return true if mine.value.nil?
    Thread.pass
  end
end

#putting (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 143

def putting() @putting; end

#size

The number of items currently in the buffer.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 20

def size
  synchronize do
    putting.empty? ? 0 : 1
  end
end

#takeObject

Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.

Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls pur the exchange will occur and this method will return.

Returns:

  • (Object)

    the item removed from the buffer; NULL once the buffer has closed.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 89

def take
  mine = synchronize do
    return Concurrent::NULL if ns_closed? && putting.empty?

    ref = Concurrent::AtomicReference.new(nil)
    if putting.empty?
      taking.push(ref)
    else
      put = putting.shift
      ref.value = put.value
      put.value = nil
    end
    ref
  end
  loop do
    item = mine.value
    return item if item
    Thread.pass
  end
end

#taking (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 145

def taking() @taking; end