Class: Concurrent::RubyExchanger

Overview

Note:

**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.

Constant Summary

AbstractExchanger - Inherited

CANCEL

Class Attribute Summary

Class Method Summary

AbstractExchanger - Inherited

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to an instance variable with volatile (Java) semantics, as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to an instance variable with volatile (Java) semantics.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Method Summary

AbstractExchanger - Inherited

#exchange

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.

#exchange!

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.

#try_exchange

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.

#do_exchange

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new ⇒ RubyExchanger

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 159

def initialize
  super
end

Instance Method Details

#do_exchange(value, timeout) ⇒ Object, CANCEL (private)

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object, CANCEL)

    the value exchanged by the other thread; CANCEL on timeout
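The timeout semantics described above (a deadline fixed once, with the remaining time recomputed on each pass) can be sketched with Ruby's standard monotonic clock. This is a minimal illustration, not the library's code: `monotonic_now` and `remaining_time` are hypothetical helpers, `Process.clock_gettime(Process::CLOCK_MONOTONIC)` stands in for `Concurrent.monotonic_time`, and clamping the result at zero is an assumption made here for readability.

```ruby
# Hypothetical helper: a monotonic timestamp in seconds (stdlib only).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: seconds left until end_at.
# Returns nil when no timeout was given, meaning "block indefinitely".
# The clamp to 0.0 is an assumption of this sketch.
def remaining_time(timeout, end_at, now = monotonic_now)
  return nil if timeout.nil?
  [end_at - now, 0.0].max
end

timeout = 0.5
end_at  = monotonic_now + timeout   # computed once, before any waiting

remaining_time(timeout, end_at)     # shrinks toward 0.0 as time passes
remaining_time(nil, end_at)         # nil: no deadline at all
```

A monotonic clock is used rather than `Time.now` so that wall-clock adjustments cannot lengthen or shorten the wait.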

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 170

def do_exchange(value, timeout)

  # ALGORITHM
  #
  # From the original Java version:
  #
  # > The basic idea is to maintain a "slot", which is a reference to
  # > a Node containing both an Item to offer and a "hole" waiting to
  # > get filled in.  If an incoming "occupying" thread sees that the
  # > slot is null, it CAS'es (compareAndSets) a Node there and waits
  # > for another to invoke exchange.  That second "fulfilling" thread
  # > sees that the slot is non-null, and so CASes it back to null,
  # > also exchanging items by CASing the hole, plus waking up the
  # > occupying thread if it is blocked.  In each case CAS'es may
  # > fail because a slot at first appears non-null but is null upon
  # > CAS, or vice-versa.  So threads may need to retry these
  # > actions.
  #
  # This version:
  #
  # An exchange occurs between an "occupier" thread and a "fulfiller" thread.
  # The "slot" is used to set up this interaction. The first thread in the
  # exchange puts itself into the slot (occupies) and waits for a fulfiller.
  # The second thread removes the occupier from the slot and attempts to
  # perform the exchange. Removing the occupier also frees the slot for
  # another occupier/fulfiller pair.
  #
  # Because the occupier and the fulfiller are operating independently and
  # because there may be contention with other threads, any failed operation
  # indicates contention. Both the occupier and the fulfiller operate within
  # spin loops. Any failed actions along the happy path will cause the thread
  # to repeat the loop and try again.
  #
  # When a timeout value is given the thread must be cognizant of time spent
  # in the spin loop. The remaining time is checked every loop. When the time
  # runs out the thread will exit.
  #
  # A "node" is the data structure used to perform the exchange. Only the
  # occupier's node is necessary. It's the node used for the exchange.
  # Each node has an "item," a "hole" (self), and a "latch." The item is the
  # node's initial value. It never changes. It's what the fulfiller returns on
  # success. The occupier's hole is where the fulfiller puts its item. It's the
  # item that the occupier returns on success. The latch is used for synchronization.
  # Because a thread may act as either an occupier or fulfiller (or possibly
  # both in periods of high contention) every thread creates a node when
  # the exchange method is first called.
  #
  # The following steps occur within the spin loop. If any actions fail
  # the thread will loop and try again, so long as there is time remaining.
  # If time runs out the thread will return CANCEL.
  #
  # Check the slot for an occupier:
  #
  #   * If the slot is empty try to occupy
  #   * If the slot is full try to fulfill
  #
  # Attempt to occupy:
  #
  #   * Attempt to CAS myself into the slot
  #   * Go to sleep and wait to be woken by a fulfiller
  #   * If the sleep is successful then the fulfiller completed its happy path
  #     - Return the value from my hole (the value given by the fulfiller)
  #   * When the sleep fails (time ran out) attempt to cancel the operation
  #     - Attempt to CAS myself out of the slot
  #     - If successful there is no contention
  #       - Return CANCEL
  #     - On failure, I am competing with a fulfiller
  #       - Attempt to CAS my hole to CANCEL
  #       - On success
  #         - Let the fulfiller deal with my cancel
  #         - Return CANCEL
  #       - On failure the fulfiller has completed its happy path
  #         - Return the value from my hole (the fulfiller's value)
  #
  # Attempt to fulfill:
  #
  #   * Attempt to CAS the occupier out of the slot
  #     - On failure loop again
  #   * Attempt to CAS my item into the occupier's hole
  #     - On failure the occupier is trying to cancel
  #       - Loop again
  #     - On success we are on the happy path
  #       - Wake the sleeping occupier
  #       - Return the occupier's item

  value  = NULL if value.nil? # The sentinel allows nil to be a valid value
  me     = Node.new(value) # create my node in case I need to occupy
  end_at = Concurrent.monotonic_time + timeout.to_f # The time to give up

  result = loop do
    other = slot
    if other && compare_and_set_slot(other, nil)
      # try to fulfill
      if other.compare_and_set_value(nil, value)
        # happy path
        other.latch.count_down
        break other.item
      end
    elsif other.nil? && compare_and_set_slot(nil, me)
      # try to occupy
      timeout = end_at - Concurrent.monotonic_time if timeout
      if me.latch.wait(timeout)
        # happy path
        break me.value
      else
        # attempt to remove myself from the slot
        if compare_and_set_slot(me, nil)
          break CANCEL
        elsif !me.compare_and_set_value(nil, CANCEL)
          # I've failed to block the fulfiller
          break me.value
        end
      end
    end
    break CANCEL if timeout && Concurrent.monotonic_time >= end_at
  end

  result == NULL ? nil : result
end
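The occupy/fulfill protocol above can be tried outside the library with a stdlib-only sketch. Everything below is hypothetical and simplified: `SketchClock`, `CasCell`, `OneShotLatch`, and `SketchExchanger` are names invented for this illustration, a `Mutex` stands in for a true atomic compare-and-set, and an `EMPTY` marker replaces the `NULL` sentinel so that `nil` remains a valid exchanged value. It is not how `RubyExchanger` is implemented internally.

```ruby
# Illustrative sketch only; every name here is hypothetical.
module SketchClock
  def self.now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# A compare-and-set cell. The Mutex stands in for an atomic CAS.
class CasCell
  def initialize(value = nil)
    @value = value
    @lock  = Mutex.new
  end

  def get
    @lock.synchronize { @value }
  end

  # Stores new_value only if the cell still holds `expected` (by identity).
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)
      @value = new_value
      true
    end
  end
end

# A one-shot latch: wait blocks until count_down is called or time runs out.
class OneShotLatch
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @open = false
  end

  def count_down
    @lock.synchronize do
      @open = true
      @cond.broadcast
    end
  end

  # Returns true if the latch opened, false on timeout (nil waits forever).
  def wait(timeout = nil)
    deadline = timeout && SketchClock.now + timeout
    @lock.synchronize do
      until @open
        remaining = deadline && deadline - SketchClock.now
        return false if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      true
    end
  end
end

class SketchExchanger
  CANCEL = Object.new # returned on timeout
  EMPTY  = Object.new # marks an unfilled hole, so nil can be exchanged
  Node   = Struct.new(:item, :hole, :latch)

  def initialize
    @slot = CasCell.new
  end

  def exchange(value, timeout = nil)
    me     = Node.new(value, CasCell.new(EMPTY), OneShotLatch.new)
    end_at = timeout && SketchClock.now + timeout
    loop do
      other = @slot.get
      if other
        # Fulfill: take the occupier out of the slot, then fill its hole.
        if @slot.compare_and_set(other, nil) &&
           other.hole.compare_and_set(EMPTY, value)
          other.latch.count_down
          return other.item
        end
      elsif @slot.compare_and_set(nil, me)
        # Occupy: sleep until a fulfiller wakes me or time runs out.
        remaining = end_at && end_at - SketchClock.now
        return me.hole.get if me.latch.wait(remaining)
        return CANCEL if @slot.compare_and_set(me, nil)         # nobody took me
        return CANCEL if me.hole.compare_and_set(EMPTY, CANCEL) # beat the fulfiller
        return me.hole.get                                      # the fulfiller won
      end
      return CANCEL if end_at && SketchClock.now >= end_at
    end
  end
end

# Usage: two threads meet at the exchanger and swap values
# (assuming both arrive within the 2 second timeout).
exchanger = SketchExchanger.new
partner   = Thread.new { exchanger.exchange(:from_partner, 2) }
mine      = exchanger.exchange(:from_main, 2)
```

Whichever thread arrives first becomes the occupier and the other the fulfiller; the result is the same either way, which is why every call builds a node up front.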