123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::MVar

Overview

An MVar is a synchronized single element container. They are empty or contain one item. Taking a value from an empty MVar blocks, as does putting a value into a full one. You can either think of them as blocking queue of length one, or a special kind of mutable variable.

On top of the fundamental #put and #take operations, we also provide a #mutate that is atomic with respect to operations on the same instance. These operations all support timeouts.

We also support non-blocking operations #try_put! and #try_take!, a #set! that ignores existing values, a #value that returns the value without removing it or returns EMPTY, and a #modify! that yields EMPTY if the MVar is empty and can be used to set EMPTY. You shouldn’t use these operations in the first instance.

MVar is a [Dereferenceable](Dereferenceable).

MVar is related to M-structures in Id, MVar in Haskell and SyncVar in Scala.

Note that unlike the original Haskell paper, our #take is blocking. This is how Haskell and Scala do it today.

## Copy Options

::Object references in Ruby are mutable. This can lead to serious problems when the #value of an object is a mutable reference. Which is always the case unless the value is a Fixnum, Symbol, or similar “primitive” data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the object instance is created:

  • :dup_on_deref When true the object will call the #dup method on the value object every time the #value method is called (default: false)

  • :freeze_on_deref When true the object will call the #freeze method on the value object every time the #value method is called (default: false)

  • :copy_on_deref When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • :copy_on_deref

  • :dup_on_deref

  • :freeze_on_deref

Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.

See Also

    1. Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non- strict, functional language with state](dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th

    ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.

    1. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](dl.acm.org/citation.cfm?id=237794).

    In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996.

Constant Summary

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Concern::Dereferenceable - Included

#value

Return the value this object represents after applying the options specified by the #set_deref_options method.

Instance Method Summary

Concern::Dereferenceable - Included

#deref
#apply_deref_options,
#ns_set_deref_options

Set the options which define the operations #value performs before returning data to the caller (dereferencing).

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(value = EMPTY, opts = {}) ⇒ MVar

Create a new MVar, either empty or with an initial value.

Parameters:

  • opts (Hash) (defaults to: {})

    the options controlling how the future will be processed

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 54

def initialize(value = EMPTY, opts = {})
  @value = value
  @mutex = Mutex.new
  @empty_condition = ConditionVariable.new
  @full_condition = ConditionVariable.new
  set_deref_options(opts)
end

Instance Attribute Details

#empty?Boolean (readonly)

Returns if the MVar is currently empty.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 195

def empty?
  @mutex.synchronize { @value == EMPTY }
end

#full?Boolean (readonly)

Returns if the MVar currently contains a value.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 200

def full?
  !empty?
end

#unlocked_empty?Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 212

def unlocked_empty?
  @value == EMPTY
end

#unlocked_full?Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 216

def unlocked_full?
  ! unlocked_empty?
end

Instance Method Details

#borrow(timeout = nil) ⇒ Object

acquires lock on the from an MVAR, yields the value to provided block, and release lock. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 86

def borrow(timeout = nil)
  @mutex.synchronize do
    wait_for_full(timeout)

    # if we timeoud out we'll still be empty
    if unlocked_full?
      yield @value
    else
      TIMEOUT
    end
  end
end

#modify(timeout = nil) ⇒ Object

Atomically #take, yield the value to a block for transformation, and then #put the transformed value. Returns the transformed value. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 123

def modify(timeout = nil)
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = yield value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#modify!

Non-blocking version of #modify that will yield with EMPTY if there is no value yet.

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 179

def modify!
  raise ArgumentError.new('no block given') unless block_given?

  @mutex.synchronize do
    value = @value
    @value = yield value
    if unlocked_empty?
      @empty_condition.signal
    else
      @full_condition.signal
    end
    apply_deref_options(value)
  end
end

#put(value, timeout = nil) ⇒ Object

Put a value into an MVar, blocking if there is already a value until it is empty. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 103

def put(value, timeout = nil)
  @mutex.synchronize do
    wait_for_empty(timeout)

    # If we timed out we won't be empty
    if unlocked_empty?
      @value = value
      @full_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#set!(value)

Non-blocking version of #put that will overwrite an existing value.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 169

def set!(value)
  @mutex.synchronize do
    old_value = @value
    @value = value
    @full_condition.signal
    apply_deref_options(old_value)
  end
end

#take(timeout = nil) ⇒ Object

Remove the value from an MVar, leaving it empty, and blocking if there isn’t a value. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 66

def take(timeout = nil)
  @mutex.synchronize do
    wait_for_full(timeout)

    # If we timed out we'll still be empty
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      TIMEOUT
    end
  end
end

#try_put!(value)

Non-blocking version of #put, that returns whether or not it was successful.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 156

def try_put!(value)
  @mutex.synchronize do
    if unlocked_empty?
      @value = value
      @full_condition.signal
      true
    else
      false
    end
  end
end

#try_take!

Non-blocking version of #take, that returns EMPTY instead of blocking.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 142

def try_take!
  @mutex.synchronize do
    if unlocked_full?
      value = @value
      @value = EMPTY
      @empty_condition.signal
      apply_deref_options(value)
    else
      EMPTY
    end
  end
end

#wait_for_empty(timeout) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 224

def wait_for_empty(timeout)
  wait_while(@empty_condition, timeout) { unlocked_full? }
end

#wait_for_full(timeout) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 220

def wait_for_full(timeout)
  wait_while(@full_condition, timeout) { unlocked_empty? }
end

#wait_while(condition, timeout) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 228

def wait_while(condition, timeout)
  if timeout.nil?
    while yield
      condition.wait(@mutex)
    end
  else
    stop = Concurrent.monotonic_time + timeout
    while yield && timeout > 0.0
      condition.wait(@mutex, timeout)
      timeout = stop - Concurrent.monotonic_time
    end
  end
end