Class: Concurrent::MVar
| Relationships & Source Files | |
| Super Chains via Extension / Inclusion / Inheritance | |
| Class Chain: | |
| Instance Chain: | |
| Inherits: | Concurrent::Synchronization::Object | 
| Defined in: | lib/concurrent-ruby/concurrent/mvar.rb | 
Overview
An MVar is a synchronized single element container. They are empty or contain one item. Taking a value from an empty MVar blocks, as does putting a value into a full one. You can either think of them as blocking queue of length one, or a special kind of mutable variable.
On top of the fundamental #put and #take operations, we also provide a #modify that is atomic with respect to operations on the same instance. These operations all support timeouts.
We also support non-blocking operations #try_put! and #try_take!, a #set! that ignores existing values, a #value that returns the value without removing it or returns EMPTY, and a #modify! that yields EMPTY if the MVar is empty and can be used to set EMPTY. You shouldn’t use these operations in the first instance.
MVar is a [Dereferenceable](Dereferenceable).
MVar is related to M-structures in Id, MVar in Haskell and SyncVar in Scala.
Note that unlike the original Haskell paper, our #take is blocking. This is how Haskell and Scala do it today.
## Copy Options
::Object references in Ruby are mutable. This can lead to serious problems when the #value of an object is a mutable reference. Which is always the case unless the value is a Fixnum, Symbol, or similar “primitive” data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the object instance is created:
- 
:dup_on_derefWhen true the object will call the#dupmethod on thevalueobject every time the#valuemethod is called (default: false)
- 
:freeze_on_derefWhen true the object will call the#freezemethod on thevalueobject every time the#valuemethod is called (default: false)
- 
:copy_on_derefWhen given aProcobject theProcwill be run every time the#valuemethod is called. TheProcwill be given the currentvalueas its only argument and the result returned by the block will be the return value of the#valuecall. Whennilthis option will be ignored (default: nil)
When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:
- 
:copy_on_deref
- 
:dup_on_deref
- 
:freeze_on_deref
Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.
See Also
- 
Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non- strict, functional language with state](dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th 
 - ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991. 
- 
- 
Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](dl.acm.org/citation.cfm?id=237794). 
 - In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996. 
- 
Constant Summary
- 
    EMPTY =
    # File 'lib/concurrent-ruby/concurrent/mvar.rb', line 43Unique value that represents that an MVarwas empty::Object.new 
- 
    TIMEOUT =
    # File 'lib/concurrent-ruby/concurrent/mvar.rb', line 47Unique value that represents that an MVartimed out before it was able to produce a value.::Object.new 
Class Attribute Summary
Synchronization::Object - Inherited
Class Method Summary
- 
    
      .new(value = EMPTY, opts = {})  ⇒ MVar 
    
    constructor
    Create a new MVar, either empty or with an initial value.
Synchronization::Object - Inherited
| .atomic_attribute?, .atomic_attributes, | |
| .attr_atomic | Creates methods for reading and writing to a instance variable with volatile (Java) semantic as  | 
| .attr_volatile | Creates methods for reading and writing (as  | 
| .ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow. | 
| .new | Has to be called by children. | 
| .safe_initialization!, .define_initialize_atomic_fields | |
Synchronization::AbstractObject - Inherited
Instance Attribute Summary
- 
    
      #empty?  ⇒ Boolean 
    
    readonly
    Returns if the MVaris currently empty.
- 
    
      #full?  ⇒ Boolean 
    
    readonly
    Returns if the MVarcurrently contains a value.
- #unlocked_empty? ⇒ Boolean readonly private
- #unlocked_full? ⇒ Boolean readonly private
Concern::Dereferenceable - Included
| #value | Return the value this object represents after applying the options specified by the  | 
Instance Method Summary
- 
    
      #borrow(timeout = nil)  ⇒ Object 
    
    acquires lock on the from an MVAR, yields the value to provided block, and release lock.
- #modify(timeout = nil) ⇒ Object
- #modify!
- 
    
      #put(value, timeout = nil)  ⇒ Object 
    
    Put a value into an MVar, blocking if there is already a value until it is empty.
- 
    
      #set!(value)  
    
    Non-blocking version of #put that will overwrite an existing value. 
- 
    
      #take(timeout = nil)  ⇒ Object 
    
    Remove the value from an MVar, leaving it empty, and blocking if there isn’t a value.
- 
    
      #try_put!(value)  
    
    Non-blocking version of #put, that returns whether or not it was successful. 
- #try_take!
- #wait_for_empty(timeout) private
- #wait_for_full(timeout) private
- #wait_while(condition, timeout) private
Concern::Dereferenceable - Included
| #deref | Alias for Concern::Dereferenceable#value. | 
| #apply_deref_options, | |
| #ns_set_deref_options | 
 | 
Synchronization::Object - Inherited
Synchronization::Volatile - Included
Synchronization::AbstractObject - Inherited
Constructor Details
    .new(value = EMPTY, opts = {})  ⇒ MVar 
  
Create a new MVar, either empty or with an initial value.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 54
def initialize(value = EMPTY, opts = {}) @value = value @mutex = Mutex.new @empty_condition = ConditionVariable.new @full_condition = ConditionVariable.new (opts) end
Instance Attribute Details
    #empty?  ⇒ Boolean  (readonly)
  
Returns if the MVar is currently empty.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 195
def empty? @mutex.synchronize { @value == EMPTY } end
    #full?  ⇒ Boolean  (readonly)
  
Returns if the MVar currently contains a value.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 200
def full? !empty? end
    #unlocked_empty?  ⇒ Boolean  (readonly, private)
  
  [ GitHub ]
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 212
def unlocked_empty? @value == EMPTY end
    #unlocked_full?  ⇒ Boolean  (readonly, private)
  
  [ GitHub ]
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 216
def unlocked_full? ! unlocked_empty? end
Instance Method Details
#borrow(timeout = nil) ⇒ Object
acquires lock on the from an MVAR, yields the value to provided block, and release lock. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 86
def borrow(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty if unlocked_full? yield @value else TIMEOUT end end end
#modify(timeout = nil) ⇒ Object
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 123
def modify(timeout = nil) raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty if unlocked_full? value = @value @value = yield value @full_condition.signal (value) else TIMEOUT end end end
#modify!
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 179
def modify! raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do value = @value @value = yield value if unlocked_empty? @empty_condition.signal else @full_condition.signal end (value) end end
#put(value, timeout = nil) ⇒ Object
Put a value into an MVar, blocking if there is already a value until it is empty. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 103
def put(value, timeout = nil) @mutex.synchronize do wait_for_empty(timeout) # If we timed out we won't be empty if unlocked_empty? @value = value @full_condition.signal (value) else TIMEOUT end end end
#set!(value)
Non-blocking version of #put that will overwrite an existing value.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 169
def set!(value) @mutex.synchronize do old_value = @value @value = value @full_condition.signal (old_value) end end
#take(timeout = nil) ⇒ Object
Remove the value from an MVar, leaving it empty, and blocking if there isn’t a value. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 66
def take(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty if unlocked_full? value = @value @value = EMPTY @empty_condition.signal (value) else TIMEOUT end end end
#try_put!(value)
Non-blocking version of #put, that returns whether or not it was successful.
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 156
def try_put!(value) @mutex.synchronize do if unlocked_empty? @value = value @full_condition.signal true else false end end end
#try_take!
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 142
def try_take! @mutex.synchronize do if unlocked_full? value = @value @value = EMPTY @empty_condition.signal (value) else EMPTY end end end
#wait_for_empty(timeout) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 224
def wait_for_empty(timeout) wait_while(@empty_condition, timeout) { unlocked_full? } end
#wait_for_full(timeout) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 220
def wait_for_full(timeout) wait_while(@full_condition, timeout) { unlocked_empty? } end
#wait_while(condition, timeout) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 228
def wait_while(condition, timeout) if timeout.nil? while yield condition.wait(@mutex) end else stop = Concurrent.monotonic_time + timeout while yield && timeout > 0.0 condition.wait(@mutex, timeout) timeout = stop - Concurrent.monotonic_time end end end