Class: Concurrent::Promises::AbstractEventFuture

Relationships & Source Files
Inherits: Concurrent::Synchronization::Object
Defined in: lib/concurrent-ruby/concurrent/promises.rb

Overview

Common ancestor of the Event and Future classes. Many shared methods are defined here.

Constant Summary

InternalStates - Included

PENDING, RESERVED, RESOLVED

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing an instance variable with volatile (Java) semantics, as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) an instance variable with volatile (Java) semantics.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Instance Method Summary

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(promise, default_executor) ⇒ AbstractEventFuture

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 522

def initialize(promise, default_executor)
  super()
  @Lock               = Mutex.new
  @Condition          = ConditionVariable.new
  @Promise            = promise
  @DefaultExecutor    = default_executor
  @Callbacks          = LockFreeStack.new
  @Waiters            = AtomicFixnum.new 0
  self.internal_state = PENDING
end

Instance Attribute Details

#pending? ⇒ Boolean (readonly)

Is it in pending state?

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 549

def pending?
  !internal_state.resolved?
end

#resolved? ⇒ Boolean (readonly)

Is it in resolved state?

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 555

def resolved?
  internal_state.resolved?
end

#touched? ⇒ Boolean (readonly, private)

For inspection.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 722

def touched?
  promise.touched?
end

Instance Method Details

#add_callback(method, *args) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 750

def add_callback(method, *args)
  state = internal_state
  if state.resolved?
    call_callback method, state, args
  else
    @Callbacks.push [method, args]
    state = internal_state
    # take back if it was resolved in the meanwhile
    call_callbacks state if state.resolved?
  end
  self
end
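
The push-then-recheck sequence in #add_callback guards against a resolution that lands between the first state check and the push. A minimal sketch of that handshake follows, using only the standard library. MiniEvent and its methods are hypothetical stand-ins, and a Mutex-guarded Array replaces the LockFreeStack, so this illustrates the idea rather than the library's implementation.

```ruby
# Hypothetical stand-in for the register/resolve handshake; not the library's code.
class MiniEvent
  def initialize
    @lock      = Mutex.new
    @callbacks = []        # stands in for LockFreeStack
    @resolved  = false
  end

  def resolved?
    @lock.synchronize { @resolved }
  end

  # Run immediately when already resolved; otherwise store, then re-check.
  def add_callback(&block)
    if resolved?
      block.call
    else
      @lock.synchronize { @callbacks.push(block) }
      # Resolution may have slipped in between the check and the push.
      run_callbacks if resolved?
    end
    self
  end

  def resolve
    @lock.synchronize { @resolved = true }
    run_callbacks
    self
  end

  private

  # Pop one callback at a time so each runs exactly once, whoever drains.
  def run_callbacks
    while (callback = @lock.synchronize { @callbacks.pop })
      callback.call
    end
  end
end

log = []
event = MiniEvent.new
event.add_callback { log << :before }  # stored, runs on resolve
event.resolve
event.add_callback { log << :after }   # already resolved, runs immediately
```

Because every callback is popped before it runs, it does not matter whether the registering thread or the resolving thread ends up draining the stack.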

#add_callback_clear_delayed_node(node) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 738

def add_callback_clear_delayed_node(node)
  add_callback(:callback_clear_delayed_node, node)
end

#add_callback_notify_blocked(promise, index) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 733

def add_callback_notify_blocked(promise, index)
  add_callback :callback_notify_blocked, promise, index
end

#async_callback_on_resolution(state, executor, args, callback) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 812

def async_callback_on_resolution(state, executor, args, callback)
  with_async(executor, state, args, callback) do |st, ar, cb|
    callback_on_resolution st, ar, cb
  end
end

#blocks ⇒ Array<AbstractPromise> (private)

For inspection.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 702

def blocks
  @Callbacks.each_with_object([]) do |(method, args), promises|
    promises.push(args[0]) if method == :callback_notify_blocked
  end
end

#call_callback(method, state, args) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 796

def call_callback(method, state, args)
  self.send method, state, *args
end
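
Callbacks are stored as [method, args] pairs and dispatched by name, with the resolved state always passed first. The sketch below shows only that dispatch shape. Dispatcher and its two handler methods are hypothetical names chosen for illustration.

```ruby
# Hypothetical receiver; only the dispatch shape mirrors call_callback.
class Dispatcher
  attr_reader :log

  def initialize
    @log = []
  end

  # `method` names a (possibly private) handler; state is always passed first.
  def call_callback(method, state, args)
    send method, state, *args
  end

  private

  def callback_notify(state, promise, index)
    @log << [:notify, state, promise, index]
  end

  def callback_clear(state, node)
    @log << [:clear, state, node]
  end
end

dispatcher = Dispatcher.new
stored = [[:callback_notify, [:some_promise, 0]], [:callback_clear, [:some_node]]]
stored.each { |method, args| dispatcher.call_callback(method, :resolved, args) }
```

Using send lets the handlers stay private while still being reachable from the stored method name.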

#call_callbacks(state) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 800

def call_callbacks(state)
  method, args = @Callbacks.pop
  while method
    call_callback method, state, args
    method, args = @Callbacks.pop
  end
end

#callback_clear_delayed_node(state, node) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 763

def callback_clear_delayed_node(state, node)
  node.value = nil
end

#callback_notify_blocked(state, promise, index) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 818

def callback_notify_blocked(state, promise, index)
  promise.on_blocker_resolution self, index
end

#callbacks (private)

For inspection.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 710

def callbacks
  @Callbacks.each.to_a
end

#chain(*args, &task) ⇒ Future Also known as: #then

Shortcut for #chain_on with the receiver's default executor supplied (:io unless overridden).

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 596

def chain(*args, &task)
  chain_on @DefaultExecutor, *args, &task
end

#an_event.chain_on(executor, *args) {|*args| ... } ⇒ Future
#a_future.chain_on(executor, *args) {|fulfilled, value, reason, *args| ... } ⇒ Future

Chains the task to be executed asynchronously on executor after the receiver is resolved.

Parameters:

  • executor (Executor, :io, :fast)

    An executor instance or the name of a global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    Arguments passed to the task when it is executed. Other arguments may be prepended; see the block signatures above.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 614

def chain_on(executor, *args, &task)
  ChainPromise.new_blocked_by1(self, executor, executor, args, &task).future
end

#chain_resolvable(resolvable) ⇒ self Also known as: #tangle

Resolves the resolvable when the receiver is resolved.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 629

def chain_resolvable(resolvable)
  on_resolution! { resolvable.resolve_with internal_state }
end

#compare_and_set_internal_state(expected_internal_state, new_internal_state) ⇒ true, false (private)

Sets the internal_state to new_internal_state if the current internal_state is expected_internal_state.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515

attr_atomic(:internal_state)

#default_executor ⇒ Executor

Returns the default executor.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 590

def default_executor
  @DefaultExecutor
end

#inspect

Alias for #to_s.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 623

alias_method :inspect, :to_s

#internal_state ⇒ Object

Returns:

  • (Object)

    The internal_state.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515

attr_atomic(:internal_state)

#internal_state=(new_internal_state) ⇒ Object (private)

Set the internal_state.

Returns:

  • (Object)

    new_internal_state.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515

attr_atomic(:internal_state)

#on_resolution(*args, &callback) ⇒ self

Shortcut for #on_resolution_using with the receiver's default executor supplied (:io unless overridden).

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 637

def on_resolution(*args, &callback)
  on_resolution_using @DefaultExecutor, *args, &callback
end

#an_event.on_resolution!(*args) {|*args| ... } ⇒ self
#a_future.on_resolution!(*args) {|fulfilled, value, reason, *args| ... } ⇒ self

Stores the callback to be executed synchronously on the resolving thread after the receiver is resolved.

Parameters:

  • args (Object)

    Arguments passed to the task when it is executed. Other arguments may be prepended; see the block signatures above.

Yield Returns:

  • The block's return value is discarded.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 655

def on_resolution!(*args, &callback)
  add_callback :callback_on_resolution, args, callback
end

#an_event.on_resolution_using(executor, *args) {|*args| ... } ⇒ self
#a_future.on_resolution_using(executor, *args) {|fulfilled, value, reason, *args| ... } ⇒ self

Stores the callback to be executed asynchronously on executor after the receiver is resolved.

Parameters:

  • executor (Executor, :io, :fast)

    An executor instance or the name of a global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    Arguments passed to the task when it is executed. Other arguments may be prepended; see the block signatures above.

Yield Returns:

  • The block's return value is discarded.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 673

def on_resolution_using(executor, *args, &callback)
  add_callback :async_callback_on_resolution, executor, args, callback
end
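
The practical difference between #on_resolution! and #on_resolution_using is which thread runs the callback. The following standard-library sketch makes that visible. TinyEvent and ThreadPerTask are hypothetical classes, and the only thing assumed of an executor here is a post(*args, &task) method.

```ruby
# Hypothetical event with a synchronous and an asynchronous callback flavour.
class TinyEvent
  def initialize
    @lock      = Mutex.new
    @callbacks = []
    @resolved  = false
  end

  # Synchronous flavour: runs on whichever thread calls #resolve.
  def on_resolution!(*args, &callback)
    register { callback.call(*args) }
  end

  # Asynchronous flavour: hands the call to `executor` (anything with #post).
  def on_resolution_using(executor, *args, &callback)
    register { executor.post(*args, &callback) }
  end

  def resolve
    pending = @lock.synchronize do
      @resolved = true
      @callbacks.shift(@callbacks.size)  # take every stored job
    end
    pending.each(&:call)
    self
  end

  private

  def register(&job)
    run_now = @lock.synchronize do
      @callbacks << job unless @resolved
      @resolved
    end
    job.call if run_now
    self
  end
end

# Hypothetical executor: one new thread per posted task.
class ThreadPerTask
  attr_reader :threads

  def initialize
    @threads = []
  end

  def post(*args, &task)
    @threads << Thread.new(*args, &task)
    true
  end
end

event    = TinyEvent.new
executor = ThreadPerTask.new
seen     = Queue.new
event.on_resolution!(:sync) { |tag| seen << [tag, Thread.current] }
event.on_resolution_using(executor, :async) { |tag| seen << [tag, Thread.current] }

resolver = Thread.new { event.resolve }
resolver.join
executor.threads.each(&:join)
results = Array.new(2) { seen.pop }.to_h
```

In this sketch the :sync callback records the resolving thread, while the :async callback records the thread created by the executor. A slow synchronous callback therefore delays whoever resolves the event.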

#promise (private)

For inspection.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 716

def promise
  @Promise
end

#resolve_with(state, raise_on_reassign = true, reserved = false) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 688

def resolve_with(state, raise_on_reassign = true, reserved = false)
  if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state)
    # go to synchronized block only if there were waiting threads
    @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
    call_callbacks state
  else
    return rejected_resolution(raise_on_reassign, state)
  end
  self
end
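
#resolve_with relies on a compare-and-set so that only the first resolution takes effect. Here is a minimal sketch of that one-shot transition, with a Mutex standing in for the atomic attribute. OneShotState is a hypothetical class, and returning false when raise_on_reassign is off is an assumption, since rejected_resolution is not shown on this page.

```ruby
# Hypothetical sketch: a Mutex-based compare-and-set standing in for attr_atomic.
class OneShotState
  attr_reader :state

  def initialize
    @lock  = Mutex.new
    @state = :pending
  end

  # Swap only when the current state matches `expected`; report success.
  def compare_and_set(expected, new_state)
    @lock.synchronize do
      return false unless @state == expected
      @state = new_state
      true
    end
  end

  # Same shape as resolve_with: the first caller wins. Later callers raise,
  # or (assumption) get false when raise_on_reassign is false.
  def resolve_with(new_state, raise_on_reassign = true)
    return self if compare_and_set(:pending, new_state)
    raise "already resolved as #{@state}" if raise_on_reassign
    false
  end
end

holder = OneShotState.new
first  = holder.resolve_with(:resolved)
second = holder.resolve_with(:other, false)
raised = begin
  holder.resolve_with(:other)
  false
rescue RuntimeError
  true
end
```

The state stays :resolved after the second and third attempts, which is the property the callbacks and waiters depend on.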

#an_event.state ⇒ :pending, :resolved
#a_future.state ⇒ :pending, ...

Returns its state.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 543

def state
  internal_state.to_sym
end

#swap_internal_state(new_internal_state) ⇒ Object (private)

Sets the internal_state to new_internal_state and returns the old internal_state.

Returns:

  • (Object)

    old internal_state

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515

attr_atomic(:internal_state)

#tangle(resolvable)

Alias for #chain_resolvable.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 633

alias_method :tangle, :chain_resolvable

#to_s ⇒ String Also known as: #inspect

Returns:

  • (String)

    Short string representation.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 619

def to_s
  format '%s %s>', super[0..-2], state
end
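
The format call in #to_s drops the closing ">" of the inherited string and appends the state before closing it again. A small sketch with a hypothetical Demo class whose state is hard-coded:

```ruby
# Hypothetical class; only the to_s body is taken from the listing above.
class Demo
  def state
    :pending
  end

  def to_s
    # super returns something like "#<Demo:0x000...>"; [0..-2] strips the ">".
    format '%s %s>', super[0..-2], state
  end

  alias_method :inspect, :to_s
end

text = Demo.new.to_s
```

The result has the form "#<Demo:0x... pending>", where the hexadecimal object address varies between runs.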

#touch ⇒ self

Propagates touch: requests execution of all delayed futures the receiver depends on. It is called by any method that requires a resolved state, such as #wait.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 562

def touch
  @Promise.touch
  self
end

#update_internal_state {|Object| ... } ⇒ Object (private)

Updates the internal_state using the block.

Yields:

  • (Object)

    Calculates a new internal_state from the given (old) internal_state.

Yield Parameters:

  • old (Object)

    internal_state

Returns:

  • (Object)

    new internal_state

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 515

attr_atomic(:internal_state)

#wait(timeout = nil) ⇒ self, ...

Note:

This method may block the current thread until the Future is resolved. Be careful: it can deadlock. Prefer chaining instead.

Waits (blocks the thread) until the receiver is #resolved?. Calls #touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    The maximum time to wait, in seconds.

Returns:

  • (self, true, false)

    self when no timeout was given, true when a timeout was given and the receiver was resolved within it, false when it was not resolved within the timeout.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 578

def wait(timeout = nil)
  result = wait_until_resolved(timeout)
  timeout ? result : self
end

#wait_until_resolved(timeout) ⇒ Boolean (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 768

def wait_until_resolved(timeout)
  return true if resolved?

  touch

  @Lock.synchronize do
    @Waiters.increment
    begin
      if timeout
        start = Concurrent.monotonic_time
        until resolved?
          break if @Condition.wait(@Lock, timeout) == nil # nil means timeout
          timeout -= (Concurrent.monotonic_time - start)
          break if timeout <= 0
        end
      else
        until resolved?
          @Condition.wait(@Lock, timeout)
        end
      end
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end
  resolved?
end
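
The timed wait and the return contract of #wait can be sketched with a plain Mutex and ConditionVariable. Latch is a hypothetical class, and the remaining time is computed from a fixed deadline here, which is a deliberate simplification rather than a copy of the loop above.

```ruby
# Hypothetical sketch of a timed wait on a Mutex/ConditionVariable pair.
class Latch
  def initialize
    @lock      = Mutex.new
    @condition = ConditionVariable.new
    @resolved  = false
  end

  def resolve
    @lock.synchronize do
      @resolved = true
      @condition.broadcast
    end
    self
  end

  # Returns self without a timeout, true/false with one (as #wait documents).
  def wait(timeout = nil)
    result = wait_until_resolved(timeout)
    timeout ? result : self
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def wait_until_resolved(timeout)
    deadline = timeout && now + timeout
    @lock.synchronize do
      until @resolved
        remaining = deadline && deadline - now
        break if remaining && remaining <= 0
        # The loop guards against spurious wakeups; nil waits indefinitely.
        @condition.wait(@lock, remaining)
      end
      @resolved
    end
  end
end

latch = Latch.new
timed_out = latch.wait(0.05)   # nobody resolves it in time
Thread.new { latch.resolve }
resolved  = latch.wait(1)      # resolved well within the timeout
untimed   = latch.wait         # already resolved, returns at once
```

Here timed_out is false, resolved is true, and untimed is the latch itself, matching the three documented return values.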

#waiting_threads (private)

For inspection.

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 728

def waiting_threads
  @Waiters.each.to_a
end

#with_async(executor, *args, &block) (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 808

def with_async(executor, *args, &block)
  Concurrent.executor(executor).post(*args, &block)
end

#with_default_executor(executor) ⇒ AbstractEventFuture

This method is abstract.

Creates a new object of the same class with executor set as its new default executor. Any futures depending on it will use the new default executor.

Raises:

  • (NotImplementedError)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 683

def with_default_executor(executor)
  raise NotImplementedError
end

#with_hidden_resolvable (private)

# File 'lib/concurrent-ruby/concurrent/promises.rb', line 743

def with_hidden_resolvable
  # TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
  self
end