Class: Concurrent::IVar
Relationships & Source Files
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby/concurrent/ivar.rb
Overview
An IVar is like a future that you can assign. Where a future is a value that is being computed and that you can wait on, an IVar is a value that is waiting to be assigned and that you can wait on. IVars are single assignment and deterministic.
A future can then be expressed as an asynchronous computation that assigns an IVar. The IVar becomes the primitive on which [futures](Future) and [dataflow](Dataflow) are built.
An IVar is a single-element container that is normally created empty and can only be set once. The I in IVar stands for immutable. Reading an IVar normally blocks until it is set. It is safe to set and read an IVar from different threads.
If you want a parallel task to set the value of an IVar, you want a Future. If you want to create a graph of parallel tasks that all execute once the values they depend on are ready, you want dataflow. IVar is generally a low-level primitive.
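The single-assignment, blocking-read behaviour described above can be sketched with nothing but the Ruby standard library. The class below is a hypothetical illustration (the names TinyIVar and AlreadySetError are invented here) and is not how concurrent-ruby implements IVar; it only shows the idea of a container that is set once and read by waiting.

```ruby
# Hypothetical TinyIVar: a teaching sketch of single assignment plus
# blocking reads. It is NOT the concurrent-ruby implementation.
class TinyIVar
  class AlreadySetError < StandardError; end

  def initialize
    @mutex = Mutex.new
    @assigned = ConditionVariable.new
    @set = false
    @value = nil
  end

  # Store the value once; any later call raises AlreadySetError.
  def set(value)
    @mutex.synchronize do
      raise AlreadySetError, 'value already assigned' if @set
      @value = value
      @set = true
      @assigned.broadcast # wake every reader waiting in #value
    end
    self
  end

  # Block until a value has been assigned, then return it.
  def value
    @mutex.synchronize do
      @assigned.wait(@mutex) until @set
      @value
    end
  end
end

box = TinyIVar.new
reader = Thread.new { box.value } # blocks until the value arrives
box.set(14)
result = reader.value             # Thread#value joins and returns 14
```

The real class adds states, rejection reasons, timeouts and observers on top of this basic shape.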
Examples
Create, set and get an IVar:
```ruby
ivar = Concurrent::IVar.new
ivar.set 14
ivar.value #=> 14
ivar.set 2 # would now be an error
```
See Also
- For the theory: Arvind, R. Nikhil, and K. Pingali. [I-Structures: Data structures for parallel computing](dl.acm.org/citation.cfm?id=69562). In Proceedings of Workshop on Graph Reduction, 1986.
- For recent application: [DataDrivenFuture in Habanero Java from Rice](www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html).
Class Method Summary
- .new(value = NULL, opts = {}, &block) ⇒ IVar constructor
  Create a new IVar in the :pending state with the (optional) initial value.
Instance Attribute Summary
Concern::Obligation
- Included
#complete? | Has the obligation completed processing? |
#fulfilled? | Has the obligation been fulfilled? |
#incomplete? | Is the obligation still awaiting completion of processing? |
#pending? | Is obligation completion still pending? |
#realized? | Alias for Concern::Obligation#fulfilled?. |
#rejected? | Has the obligation been rejected? |
#state | The current state of the obligation. |
#unscheduled? | Is the obligation still unscheduled? |
#state= |
Concern::Dereferenceable
- Included
#value | Return the value this object represents after applying the options specified by the |
Instance Method Summary
- #add_observer(observer = nil, func = :update, &block)
  Add an observer on this object that will receive notification on update.
- #fail(reason = StandardError.new) ⇒ IVar
  Set the IVar to failed due to some error and wake or notify all threads waiting on it.
- #set(value = NULL) { ... } ⇒ IVar
  Set the IVar to a value and wake or notify all threads waiting on it.
- #try_set(value = NULL) { ... } ⇒ Boolean
  Attempt to set the IVar with the given value or block.
- #complete(success, value, reason) private
- #complete_without_notification(success, value, reason) private
- #notify_observers(value, reason) private
- #ns_complete_without_notification(success, value, reason) private
- #ns_initialize(value, opts) private
- #safe_execute(task, args = []) private
- #check_for_block_or_value!(block_given, value) private Internal use only
Concern::Observable
- Included
#add_observer | Adds an observer to this set. |
#count_observers | Return the number of observers associated with this object. |
#delete_observer | Remove |
#delete_observers | Remove all observers associated with this object. |
#with_observer | As #add_observer but can be used for chaining. |
Concern::Obligation
- Included
#exception, | |
#no_error! | Alias for Concern::Obligation#wait!. |
#reason | If an exception was raised during processing this will return the exception object. |
#value | The current value of the obligation. |
#value! | The current value of the obligation. |
#wait | Wait until obligation is complete or the timeout has been reached. |
#wait! | Wait until obligation is complete or the timeout is reached. |
#compare_and_set_state | Atomic compare and set operation State is set to |
#event, #get_arguments_from, | |
#if_state | Executes the block within mutex if current state is included in expected_states. |
#init_obligation, | |
#ns_check_state? | Am I in the current state? |
#ns_set_state, #set_state |
Concern::Dereferenceable
- Included
#deref | Alias for Concern::Dereferenceable#value. |
#apply_deref_options, #ns_set_deref_options |
Synchronization::LockableObject
- Inherited
Constructor Details
.new(value = NULL, opts = {}, &block) ⇒ IVar
Create a new IVar in the :pending state with the (optional) initial value.
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 62
def initialize(value = NULL, opts = {}, &block)
  if value != NULL && block_given?
    raise ArgumentError.new('provide only a value or a block')
  end
  super(&nil)
  synchronize { ns_initialize(value, opts, &block) }
end
```
Instance Method Details
#add_observer(observer = nil, func = :update, &block)
Add an observer on this object that will receive notification on update.
Upon completion the IVar will notify all observers in a thread-safe way. The func method of the observer will be called with three arguments: the Time at which the IVar completed, the final value (or nil on rejection), and the final reason (or nil on fulfillment). If the IVar is already complete when the observer is added, the observer is notified immediately instead of being registered.
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 81
def add_observer(observer = nil, func = :update, &block)
  raise ArgumentError.new('cannot provide both an observer and a block') if observer && block
  direct_notification = false

  if block
    observer = block
    func = :call
  end

  synchronize do
    if event.set?
      direct_notification = true
    else
      observers.add_observer(observer, func)
    end
  end

  observer.send(func, Time.now, self.value, reason) if direct_notification
  observer
end
```
#check_for_block_or_value!(block_given, value) (private)
#complete(success, value, reason) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 177
def complete(success, value, reason)
  complete_without_notification(success, value, reason)
  notify_observers(self.value, reason)
  self
end
```
#complete_without_notification(success, value, reason) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 184
def complete_without_notification(success, value, reason)
  synchronize { ns_complete_without_notification(success, value, reason) }
  self
end
```
#fail(reason = StandardError.new) ⇒ IVar
Set the IVar to failed due to some error and wake or notify all threads waiting on it.
#notify_observers(value, reason) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 190
def notify_observers(value, reason)
  observers.notify_and_delete_observers { [Time.now, value, reason] }
end
```
#ns_complete_without_notification(success, value, reason) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 195
def ns_complete_without_notification(success, value, reason)
  raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
  set_state(success, value, reason)
  event.set
end
```
#ns_initialize(value, opts) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 155
def ns_initialize(value, opts)
  value = yield if block_given?
  init_obligation
  self.observers = Collection::CopyOnWriteObserverSet.new
  set_deref_options(opts)

  @state = :pending
  if value != NULL
    ns_complete_without_notification(true, value, nil)
  end
end
```
#safe_execute(task, args = []) (private)
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 168
def safe_execute(task, args = [])
  if compare_and_set_state(:processing, :pending)
    success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
    complete(success, val, reason)
    yield(success, val, reason) if block_given?
  end
end
```
#set(value = NULL) { ... } ⇒ IVar
Set the IVar to a value and wake or notify all threads waiting on it.
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 113
def set(value = NULL)
  check_for_block_or_value!(block_given?, value)
  raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending)

  begin
    value = yield if block_given?
    complete_without_notification(true, value, nil)
  rescue => ex
    complete_without_notification(false, nil, ex)
  end

  notify_observers(self.value, reason)
  self
end
```
#try_set(value = NULL) { ... } ⇒ Boolean
Attempt to set the IVar with the given value or block. Return a boolean indicating the success or failure of the set operation.
```ruby
# File 'lib/concurrent-ruby/concurrent/ivar.rb', line 145
def try_set(value = NULL, &block)
  set(value, &block)
  true
rescue MultipleAssignmentError
  false
end
```