
Class: Concurrent::IVar

Relationships & Source Files
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby/concurrent/ivar.rb

Overview

An IVar is like a future that you can assign. Where a future is a value that is being computed and that you can wait on, an IVar is a value that is waiting to be assigned and that you can wait on. IVars are single-assignment and deterministic.

Futures can then be expressed as an asynchronous computation that assigns an IVar. The IVar thus becomes the primitive on which [futures](Future) and [dataflow](Dataflow) are built.

An IVar is a single-element container that is normally created empty, and can only be set once. The I in IVar stands for immutable. Reading an IVar normally blocks until it is set. It is safe to set and read an IVar from different threads.

If you want a parallel task to set the value of an IVar, you want a Future. If you want to create a graph of parallel tasks that each run once the values they depend on are ready, you want dataflow. IVar is generally a low-level primitive.
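The single-assignment, blocking-read behavior described above can be illustrated without the gem. What follows is a minimal sketch using only Ruby's standard library. `SketchIVar`, its `UNSET` sentinel, and its nested error class are hypothetical names chosen for illustration, and the real class is implemented differently.

```ruby
# SketchIVar is a hypothetical, stdlib-only stand-in; it is not Concurrent::IVar.
class SketchIVar
  class MultipleAssignmentError < StandardError; end

  UNSET = Object.new # sentinel meaning "no value assigned yet"

  def initialize
    @lock  = Mutex.new
    @ready = ConditionVariable.new
    @value = UNSET
  end

  # Assign the value exactly once and wake every waiting reader.
  def set(value)
    @lock.synchronize do
      raise MultipleAssignmentError unless @value.equal?(UNSET)
      @value = value
      @ready.broadcast
    end
    self
  end

  # Block until a value has been assigned, then return it.
  def value
    @lock.synchronize do
      @ready.wait(@lock) while @value.equal?(UNSET)
      @value
    end
  end
end

ivar   = SketchIVar.new
reader = Thread.new { ivar.value } # blocks until the writer runs
ivar.set(14)
result = reader.value              # Thread#value joins and returns the block's result

# A second assignment is refused.
second_set_rejected =
  begin
    ivar.set(2)
    false
  rescue SketchIVar::MultipleAssignmentError
    true
  end
```

The reader thread may start before or after the call to `set`; either way it observes the same value, which is what makes a single-assignment variable deterministic.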

Examples

Create, set and get an IVar

```ruby
ivar = Concurrent::IVar.new
ivar.set 14
ivar.value #=> 14
ivar.set 2 # would now be an error
```

See Also

  1. For the theory: Arvind, R. Nikhil, and K. Pingali. [I-Structures: Data structures for parallel computing](dl.acm.org/citation.cfm?id=69562). In Proceedings of Workshop on Graph Reduction, 1986.

  2. For recent application: [DataDrivenFuture in Habanero Java from Rice](www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html).

Class Method Summary

Instance Attribute Summary

Concern::Obligation - Included

#complete?

Has the obligation completed processing?

#fulfilled?

Has the obligation been fulfilled?

#incomplete?

Is the obligation still awaiting completion of processing?

#pending?

Is obligation completion still pending?

#realized?
#rejected?

Has the obligation been rejected?

#state

The current state of the obligation.

#unscheduled?

Is the obligation still unscheduled?

#state=

Concern::Dereferenceable - Included

#value

Return the value this object represents after applying the options specified by the #set_deref_options method.

Instance Method Summary

Concern::Observable - Included

#add_observer

Adds an observer to this set.

#count_observers

Return the number of observers associated with this object.

#delete_observer

Remove observer as an observer on this object so that it will no longer receive notifications.

#delete_observers

Remove all observers associated with this object.

#with_observer

As #add_observer but can be used for chaining.

Concern::Obligation - Included

#exception, #no_error!

#reason

If an exception was raised during processing this will return the exception object.

#value

The current value of the obligation.

#value!

The current value of the obligation. Unlike #value, raises if the obligation was rejected.

#wait

Wait until obligation is complete or the timeout has been reached.

#wait!

Wait until obligation is complete or the timeout is reached. Unlike #wait, raises if the obligation was rejected.

#compare_and_set_state

Atomic compare and set operation. State is set to next_state only if current state == expected_current.

#event, #get_arguments_from

#if_state

Executes the block within mutex if current state is included in expected_states.

#init_obligation

#ns_check_state?

Am I in the current state?

#ns_set_state, #set_state

Concern::Dereferenceable - Included

#deref, #apply_deref_options

#ns_set_deref_options

Set the options which define the operations #value performs before returning data to the caller (dereferencing).

Synchronization::LockableObject - Inherited

Constructor Details

.new(value = NULL, opts = {}, &block) ⇒ IVar

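Create a new IVar. It starts in the :pending state unless an initial value (or a block that produces one) is given, in which case it is fulfilled immediately. Passing both a value and a block raises ArgumentError.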

Parameters:

  • value (Object) (defaults to: NULL)

    the initial value

  • opts (Hash) (defaults to: {})

    the options controlling how the value is dereferenced

Options Hash (opts):

  • :dup_on_deref (Boolean) (defaults to: false)

    call #dup before returning the data

  • :freeze_on_deref (Boolean) (defaults to: false)

    call #freeze before returning the data

  • :copy_on_deref (Proc) (defaults to: nil)

    call the given Proc passing the internal value and returning the value returned from the proc
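As an illustration of how the three options might combine, here is a small stdlib-only sketch. `apply_deref_sketch` is a hypothetical helper, not the library's method, and the order in which it applies the options (copy, then dup, then freeze) is an assumption made for this example.

```ruby
# apply_deref_sketch is a hypothetical helper, not part of the library.
# Assumed order: copy_on_deref first, then dup_on_deref, then freeze_on_deref.
def apply_deref_sketch(value, opts = {})
  return nil if value.nil?
  value = opts[:copy_on_deref].call(value) if opts[:copy_on_deref]
  value = value.dup    if opts[:dup_on_deref]
  value = value.freeze if opts[:freeze_on_deref]
  value
end

stored = +"hello" # unary plus yields a mutable String

# Readers get an equal but distinct object, so mutating it leaves `stored` alone.
duped  = apply_deref_sketch(stored, dup_on_deref: true)

# Readers get a frozen copy; the stored object itself stays unfrozen.
frozen = apply_deref_sketch(stored, dup_on_deref: true, freeze_on_deref: true)

# Readers get whatever the Proc returns for the stored value.
copied = apply_deref_sketch(stored, copy_on_deref: ->(v) { v.upcase })
```

Combining :dup_on_deref with :freeze_on_deref is shown deliberately: freezing without duplicating first would freeze the stored object itself.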

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 62

def initialize(value = NULL, opts = {}, &block)
  if value != NULL && block_given?
    raise ArgumentError.new('provide only a value or a block')
  end
  super(&nil)
  synchronize { ns_initialize(value, opts, &block) }
end

Instance Method Details

#add_observer(observer = nil, func = :update, &block)

Add an observer on this object that will receive notification on update.

Upon completion the IVar will notify all observers in a thread-safe way. The func method of the observer will be called with three arguments: the Time at which the IVar completed, the final value (or nil on rejection), and the final reason (or nil on fulfillment). If the IVar is already complete when the observer is added, the observer is notified immediately instead of being registered.

Parameters:

  • observer (Object) (defaults to: nil)

    the object that will be notified of changes

  • func (Symbol) (defaults to: :update)

    symbol naming the method to call when this Observable has changes

Raises:

  • (ArgumentError)

    if both an observer and a block are provided
  
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 81

def add_observer(observer = nil, func = :update, &block)
  raise ArgumentError.new('cannot provide both an observer and a block') if observer && block
  direct_notification = false

  if block
    observer = block
    func = :call
  end

  synchronize do
    if event.set?
      direct_notification = true
    else
      observers.add_observer(observer, func)
    end
  end

  observer.send(func, Time.now, self.value, reason) if direct_notification
  observer
end
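The register-or-notify-now decision in the listing above can be tried out in isolation. The following stdlib-only sketch mirrors just that decision. `ObservableSketch` and its `complete` method are hypothetical stand-ins, not the library's classes.

```ruby
# ObservableSketch is a hypothetical stand-in that mirrors only the
# "register, or notify directly if already complete" decision.
class ObservableSketch
  def initialize
    @lock      = Mutex.new
    @observers = [] # pairs of [observer, method name]
    @complete  = false
    @value     = nil
    @reason    = nil
  end

  def add_observer(observer = nil, func = :update, &block)
    raise ArgumentError, 'cannot provide both an observer and a block' if observer && block

    if block
      observer = block
      func = :call
    end

    notify_now = @lock.synchronize do
      @observers << [observer, func] unless @complete
      @complete
    end

    # Late observers are called directly, outside the lock.
    observer.send(func, Time.now, @value, @reason) if notify_now
    observer
  end

  # Record the outcome, then notify and forget the registered observers.
  def complete(value, reason = nil)
    pending = @lock.synchronize do
      @complete = true
      @value = value
      @reason = reason
      taken = @observers
      @observers = []
      taken
    end
    pending.each { |obs, func| obs.send(func, Time.now, value, reason) }
    self
  end
end

seen = []
subject = ObservableSketch.new
subject.add_observer { |_time, value, reason| seen << [:early, value, reason] }
subject.complete(42)
subject.add_observer { |_time, value, reason| seen << [:late, value, reason] }
```

Both observers receive the same value and reason; only the path differs. The early one is called from `complete`, the late one from `add_observer` itself.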

#check_for_block_or_value!(block_given, value) (private)

This method is for internal use only.
  
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 202

def check_for_block_or_value!(block_given, value) # :nodoc:
  if (block_given && value != NULL) || (! block_given && value == NULL)
    raise ArgumentError.new('must set with either a value or a block')
  end
end
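The condition in this check is easier to read as a truth table. The sketch below restates it as a boolean-returning helper; `valid_set_arguments?` is a hypothetical name, and the top-level `NULL` defined here is a stand-in for the library's sentinel constant.

```ruby
NULL = Object.new # hypothetical sentinel standing in for the library's NULL constant

# Same condition as the method above, returned as a boolean instead of raising.
def valid_set_arguments?(block_given, value)
  !((block_given && value != NULL) || (!block_given && value == NULL))
end

cases = {
  'value only'      => valid_set_arguments?(false, 14),
  'block only'      => valid_set_arguments?(true, NULL),
  'value and block' => valid_set_arguments?(true, 14),
  'neither'         => valid_set_arguments?(false, NULL),
  'explicit nil'    => valid_set_arguments?(false, nil)
}
```

The last case shows why a dedicated sentinel is used rather than nil: an explicit nil counts as a value, so it can be stored like any other object.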

#complete(success, value, reason) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 177

def complete(success, value, reason)
  complete_without_notification(success, value, reason)
  notify_observers(self.value, reason)
  self
end

#complete_without_notification(success, value, reason) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 184

def complete_without_notification(success, value, reason)
  synchronize { ns_complete_without_notification(success, value, reason) }
  self
end

#fail(reason = StandardError.new) ⇒ IVar

Set the IVar to failed due to some error and wake or notify all threads waiting on it.

Parameters:

  • reason (Object) (defaults to: StandardError.new)

    the reason for the failure

Returns:

  • (IVar)

    self

Raises:

  • (MultipleAssignmentError)

    if the IVar has already been set or otherwise completed

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 135

def fail(reason = StandardError.new)
  complete(false, nil, reason)
end

#notify_observers(value, reason) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 190

def notify_observers(value, reason)
  observers.notify_and_delete_observers{ [Time.now, value, reason] }
end

#ns_complete_without_notification(success, value, reason) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 195

def ns_complete_without_notification(success, value, reason)
  raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
  set_state(success, value, reason)
  event.set
end

#ns_initialize(value, opts) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 155

def ns_initialize(value, opts)
  value = yield if block_given?
  init_obligation
  self.observers = Collection::CopyOnWriteObserverSet.new
  set_deref_options(opts)

  @state = :pending
  if value != NULL
    ns_complete_without_notification(true, value, nil)
  end
end

#safe_execute(task, args = []) (private)

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 168

def safe_execute(task, args = [])
  if compare_and_set_state(:processing, :pending)
    success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
    complete(success, val, reason)
    yield(success, val, reason) if block_given?
  end
end

#set(value = NULL) { ... } ⇒ IVar

Set the IVar to a value and wake or notify all threads waiting on it.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the IVar

Yields:

  • A block operation to use for setting the value

Returns:

  • (IVar)

    self

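Raises:

  • (ArgumentError)

    if both a value and a block are given, or neither

  • (MultipleAssignmentError)

    if the IVar has already been set or otherwise completed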

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 113

def set(value = NULL)
  check_for_block_or_value!(block_given?, value)
  raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending)

  begin
    value = yield if block_given?
    complete_without_notification(true, value, nil)
  rescue => ex
    complete_without_notification(false, nil, ex)
  end

  notify_observers(self.value, reason)
  self
end

#try_set(value = NULL) { ... } ⇒ Boolean

Attempt to set the IVar with the given value or block. Return a boolean indicating the success or failure of the set operation.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the IVar

Yields:

  • A block operation to use for setting the value

Returns:

  • (Boolean)

    true if the value was set else false

Raises:

  • (ArgumentError)

    if both a value and a block are given, or neither

# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 145

def try_set(value = NULL, &block)
  set(value, &block)
  true
rescue MultipleAssignmentError
  false
end
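The pattern above (call the raising setter, translate the multiple-assignment error into false) is useful when several threads race to supply a value and only the first should win. The following stdlib-only sketch demonstrates that use. `OnceCell` and `AlreadySetError` are hypothetical names, not part of the library.

```ruby
# OnceCell is a hypothetical stand-in showing the try_set pattern:
# a raising setter wrapped so that losing the race returns false.
class OnceCell
  class AlreadySetError < StandardError; end

  attr_reader :value

  def initialize
    @lock  = Mutex.new
    @set   = false
    @value = nil
  end

  # Store the value, or raise if a value was already stored.
  def set(value)
    @lock.synchronize do
      raise AlreadySetError if @set
      @set   = true
      @value = value
    end
    self
  end

  # Same as #set, but report the outcome as a boolean.
  def try_set(value)
    set(value)
    true
  rescue AlreadySetError
    false
  end
end

cell    = OnceCell.new
threads = 8.times.map { |i| Thread.new { cell.try_set(i) } }
results = threads.map(&:value) # join each thread and collect its boolean
winners = results.count(true)
```

Which thread wins is not determined, but exactly one `try_set` call returns true and the stored value never changes afterwards.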