Class: Concurrent::Promises::AbstractEventFuture
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Subclasses:
|
|
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
|
|
Instance Chain:
|
|
Inherits: |
Concurrent::Synchronization::Object
|
Defined in: | lib/concurrent-ruby/concurrent/promises.rb |
Overview
Constant Summary
InternalStates
- Included
Class Attribute Summary
Synchronization::Object
- Inherited
Class Method Summary
Synchronization::Object
- Inherited
.atomic_attribute?, .atomic_attributes, | |
.attr_atomic | Creates methods for reading and writing to a instance variable with volatile (Java) semantic as |
.attr_volatile | Creates methods for reading and writing (as |
.ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow. |
.new | Has to be called by children. |
.safe_initialization!, .define_initialize_atomic_fields |
Synchronization::AbstractObject
- Inherited
Instance Attribute Summary
-
#pending? ⇒ Boolean
readonly
Is it in pending state?
-
#resolved? ⇒ Boolean
readonly
Is it in resolved state?
-
#touched? ⇒ Boolean
readonly
private
For inspection.
Instance Method Summary
-
#chain(*args, &task) ⇒ Future
(also: #then)
Shortcut of #chain_on with default
:io
executor supplied. -
#an_event.chain_on(executor, *args) {|*args| ... } ⇒ Future
Chains the task to be executed asynchronously on executor after it is resolved.
-
#chain_resolvable(resolvable) ⇒ self
(also: #tangle)
Resolves the resolvable when receiver is resolved.
-
#default_executor ⇒ Executor
Returns default executor.
-
#inspect
Alias for #to_s.
- #internal_state ⇒ Object
-
#on_resolution(*args, &callback) ⇒ self
Shortcut of #on_resolution_using with default
:io
executor supplied. -
#an_event.on_resolution!(*args) {|*args| ... } ⇒ self
Stores the callback to be executed synchronously on resolving thread after it is resolved.
-
#an_event.on_resolution_using(executor, *args) {|*args| ... } ⇒ self
Stores the callback to be executed asynchronously on executor after it is resolved.
-
#an_event.state ⇒ :pending, :resolved
Returns its state.
-
#tangle(resolvable)
Alias for #chain_resolvable.
- #to_s ⇒ String (also: #inspect)
-
#touch ⇒ self
Propagates touch.
-
#wait(timeout = nil) ⇒ self, ...
Wait (block the Thread) until receiver is #resolved?.
-
#with_default_executor(executor) ⇒ AbstractEventFuture
abstract
Crates new object with same class with the executor set as its new default executor.
- #add_callback(method, *args) private
- #add_callback_clear_delayed_node(node) private
- #add_callback_notify_blocked(promise, index) private
- #async_callback_on_resolution(state, executor, args, callback) private
-
#blocks ⇒ Array<AbstractPromise>
private
For inspection.
- #call_callback(method, state, args) private
- #call_callbacks(state) private
- #callback_clear_delayed_node(state, node) private
- #callback_notify_blocked(state, promise, index) private
-
#callbacks
private
For inspection.
-
#compare_and_set_internal_state(expected_internal_state, new_internal_state) ⇒ true, false
private
Sets the internal_state to new_internal_state if the current internal_state is expected_internal_state.
-
#internal_state=(new_internal_state) ⇒ Object
private
Set
the internal_state. -
#promise
private
For inspection.
- #resolve_with(state, raise_on_reassign = true, reserved = false) private
-
#swap_internal_state(new_internal_state) ⇒ Object
private
Set
the internal_state to new_internal_state and return the old internal_state. -
#update_internal_state {|Object| ... } ⇒ Object
private
Updates the internal_state using the block.
- #wait_until_resolved(timeout) ⇒ Boolean private
-
#waiting_threads
private
For inspection.
- #with_async(executor, *args, &block) private
- #with_hidden_resolvable private
Synchronization::Object
- Inherited
Synchronization::Volatile
- Included
Synchronization::AbstractObject
- Inherited
Constructor Details
.new(promise, default_executor) ⇒ AbstractEventFuture
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 522
def initialize(promise, default_executor) super() @Lock = Mutex.new @Condition = ConditionVariable.new @Promise = promise @DefaultExecutor = default_executor @Callbacks = LockFreeStack.new @Waiters = AtomicFixnum.new 0 self.internal_state = PENDING end
Instance Attribute Details
#pending? ⇒ Boolean
(readonly)
Is it in pending state?
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 549
def pending? !internal_state.resolved? end
#resolved? ⇒ Boolean
(readonly)
Is it in resolved state?
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 555
def resolved? internal_state.resolved? end
#touched? ⇒ Boolean
(readonly, private)
For inspection.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 722
def touched? promise.touched? end
Instance Method Details
#add_callback(method, *args) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 750
def add_callback(method, *args) state = internal_state if state.resolved? call_callback method, state, args else @Callbacks.push [method, args] state = internal_state # take back if it was resolved in the meanwhile call_callbacks state if state.resolved? end self end
#add_callback_clear_delayed_node(node) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 738
def add_callback_clear_delayed_node(node) add_callback(:callback_clear_delayed_node, node) end
#add_callback_notify_blocked(promise, index) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 733
def add_callback_notify_blocked(promise, index) add_callback :callback_notify_blocked, promise, index end
#async_callback_on_resolution(state, executor, args, callback) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 812
def async_callback_on_resolution(state, executor, args, callback) with_async(executor, state, args, callback) do |st, ar, cb| callback_on_resolution st, ar, cb end end
#blocks ⇒ Array<AbstractPromise> (private)
For inspection.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 702
def blocks @Callbacks.each_with_object([]) do |(method, args), promises| promises.push(args[0]) if method == :callback_notify_blocked end end
#call_callback(method, state, args) (private)
[ GitHub ]#call_callbacks(state) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 800
def call_callbacks(state) method, args = @Callbacks.pop while method call_callback method, state, args method, args = @Callbacks.pop end end
#callback_clear_delayed_node(state, node) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 763
def callback_clear_delayed_node(state, node) node.value = nil end
#callback_notify_blocked(state, promise, index) (private)
[ GitHub ]#callbacks (private)
For inspection.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 710
def callbacks @Callbacks.each.to_a end
#chain(*args, &task) ⇒ Future Also known as: #then
Shortcut of #chain_on with default :io
executor supplied.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 596
def chain(*args, &task) chain_on @DefaultExecutor, *args, &task end
Chains the task to be executed asynchronously on executor after it is resolved.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 614
def chain_on(executor, *args, &task) ChainPromise.new_blocked_by1(self, executor, executor, args, &task).future end
#chain_resolvable(resolvable) ⇒ self
Also known as: #tangle
Resolves the resolvable when receiver is resolved.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 629
def chain_resolvable(resolvable) on_resolution! { resolvable.resolve_with internal_state } end
#compare_and_set_internal_state(expected_internal_state, new_internal_state) ⇒ true
, false
(private)
Sets the internal_state to new_internal_state if the current internal_state is expected_internal_state
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515
attr_atomic(:internal_state)
#default_executor ⇒ Executor
Returns default executor.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 590
def default_executor @DefaultExecutor end
#inspect
Alias for #to_s.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 623
alias_method :inspect, :to_s
#internal_state ⇒ Object
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515
attr_atomic(:internal_state)
#internal_state=(new_internal_state) ⇒ Object (private)
Set
the internal_state.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515
attr_atomic(:internal_state)
#on_resolution(*args, &callback) ⇒ self
Shortcut of #on_resolution_using with default :io
executor supplied.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 637
def on_resolution(*args, &callback) on_resolution_using @DefaultExecutor, *args, &callback end
#an_event.on_resolution!(*args) {|*args| ... } ⇒ self
#a_future.on_resolution!(*args) {|fulfilled, value, reason, *args| ... } ⇒ self
self
#a_future.on_resolution!(*args) {|fulfilled, value, reason, *args| ... } ⇒ self
Stores the callback to be executed synchronously on resolving thread after it is resolved.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 655
def on_resolution!(*args, &callback) add_callback :callback_on_resolution, args, callback end
#an_event.on_resolution_using(executor, *args) {|*args| ... } ⇒ self
#a_future.on_resolution_using(executor, *args) {|fulfilled, value, reason, *args| ... } ⇒ self
self
#a_future.on_resolution_using(executor, *args) {|fulfilled, value, reason, *args| ... } ⇒ self
Stores the callback to be executed asynchronously on executor after it is resolved.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 673
def on_resolution_using(executor, *args, &callback) add_callback :async_callback_on_resolution, executor, args, callback end
#promise (private)
For inspection.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 716
def promise @Promise end
#resolve_with(state, raise_on_reassign = true, reserved = false) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 688
def resolve_with(state, raise_on_reassign = true, reserved = false) if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state) # go to synchronized block only if there were waiting threads @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0 call_callbacks state else return rejected_resolution(raise_on_reassign, state) end self end
#an_event.state ⇒ :pending
, :resolved
#a_future.state ⇒ :pending
, ...
:pending
, :resolved
#a_future.state ⇒ :pending
, ...
Returns its state.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 543
def state internal_state.to_sym end
#swap_internal_state(new_internal_state) ⇒ Object (private)
Set
the internal_state to new_internal_state and return the old internal_state.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515
attr_atomic(:internal_state)
#tangle(resolvable)
Alias for #chain_resolvable.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 633
alias_method :tangle, :chain_resolvable
#to_s ⇒ String
Also known as: #inspect
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 619
def to_s format '%s %s>', super[0..-2], state end
#touch ⇒ self
Propagates touch. Requests all the delayed futures, which it depends on, to be executed. This method is called by any other method requiring resolved state, like #wait.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 562
def touch @Promise.touch self end
#update_internal_state {|Object| ... } ⇒ Object (private)
Updates the internal_state using the block.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515
attr_atomic(:internal_state)
#wait(timeout = nil) ⇒ self
, ...
This function potentially blocks current thread until the Future
is resolved. Be careful it can deadlock. Try to chain instead.
Wait (block the Thread) until receiver is #resolved?. Calls Concurrent::AbstractEventFuture#touch
.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 578
def wait(timeout = nil) result = wait_until_resolved(timeout) timeout ? result : self end
#wait_until_resolved(timeout) ⇒ Boolean
(private)
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 768
def wait_until_resolved(timeout) return true if resolved? touch @Lock.synchronize do @Waiters.increment begin if timeout start = Concurrent.monotonic_time until resolved? break if @Condition.wait(@Lock, timeout) == nil # nil means timeout timeout -= (Concurrent.monotonic_time - start) break if timeout <= 0 end else until resolved? @Condition.wait(@Lock, timeout) end end ensure # JRuby may raise ConcurrencyError @Waiters.decrement end end resolved? end
#waiting_threads (private)
For inspection.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 728
def waiting_threads @Waiters.each.to_a end
#with_async(executor, *args, &block) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/promises.rb', line 808
def with_async(executor, *args, &block) Concurrent.executor(executor).post(*args, &block) end
#with_default_executor(executor) ⇒ AbstractEventFuture
Crates new object with same class with the executor set as its new default executor. Any futures depending on it will use the new default executor.
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 683
def with_default_executor(executor) raise NotImplementedError end