123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Promises::Future

Overview

Represents a value which will become available in future. May reject with a reason instead, e.g. when the tasks raises an exception.

Constant Summary

InternalStates - Included

PENDING, RESERVED, RESOLVED

Class Attribute Summary

Class Method Summary

AbstractEventFuture - Inherited

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

AbstractEventFuture - Inherited

#pending?

Is it in pending state?

#resolved?

Is it in resolved state?

#touched?

For inspection.

Instance Method Summary

OldChannelIntegration - Included

#then_put,
#then_select

Zips with selected value form the suplied channels.

NewChannelIntegration - Included

FlatShortcuts - Included

ActorIntegration - Included

#then_ask

Asks the actor with its value.

AbstractEventFuture - Inherited

#chain

Shortcut of #chain_on with default :io executor supplied.

#chain_on

Chains the task to be executed asynchronously on executor after it is resolved.

#chain_resolvable

Resolves the resolvable when receiver is resolved.

#default_executor

Returns default executor.

#inspect
#internal_state,
#on_resolution

Shortcut of #on_resolution_using with default :io executor supplied.

#on_resolution!

Stores the callback to be executed synchronously on resolving thread after it is resolved.

#on_resolution_using

Stores the callback to be executed asynchronously on executor after it is resolved.

#state

Returns its state.

#tangle
#to_s,
#touch

Propagates touch.

#wait

Wait (block the Thread) until receiver is #resolved?.

#with_default_executor

Crates new object with same class with the executor set as its new default executor.

#add_callback, #add_callback_clear_delayed_node, #add_callback_notify_blocked, #async_callback_on_resolution,
#blocks

For inspection.

#call_callback, #call_callbacks, #callback_clear_delayed_node, #callback_notify_blocked,
#callbacks

For inspection.

#compare_and_set_internal_state

Sets the internal_state to new_internal_state if the current internal_state is expected_internal_state.

#internal_state=

Set the internal_state.

#promise

For inspection.

#resolve_with,
#swap_internal_state

Set the internal_state to new_internal_state and return the old internal_state.

#update_internal_state

Updates the internal_state using the block.

#wait_until_resolved,
#waiting_threads

For inspection.

#with_async, #with_hidden_resolvable

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

This class inherits a constructor from Concurrent::Promises::AbstractEventFuture

Instance Attribute Details

#fulfilled?Boolean (readonly)

Is it in fulfilled state?

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 921

def fulfilled?
  state = internal_state
  state.resolved? && state.fulfilled?
end

#rejected?Boolean (readonly)

Is it in rejected state?

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 928

def rejected?
  state = internal_state
  state.resolved? && !state.fulfilled?
end

Instance Method Details

#&(other)

Alias for #zip.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1078

alias_method :&, :zip

#any(event_or_future) ⇒ Future Also known as: #|

Creates a new event which will be resolved when the first of receiver, event_or_future resolves. Returning future will have value nil if event_or_future is event and resolves first.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1085

def any(event_or_future)
  AnyResolvedFuturePromise.new_blocked_by2(self, event_or_future, @DefaultExecutor).future
end

#apply(args, block) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1215

def apply(args, block)
  internal_state.apply args, block
end

#async_callback_on_fulfillment(state, executor, args, callback) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1272

def async_callback_on_fulfillment(state, executor, args, callback)
  with_async(executor, state, args, callback) do |st, ar, cb|
    callback_on_fulfillment st, ar, cb
  end
end

#async_callback_on_rejection(state, executor, args, callback) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1278

def async_callback_on_rejection(state, executor, args, callback)
  with_async(executor, state, args, callback) do |st, ar, cb|
    callback_on_rejection st, ar, cb
  end
end

#callback_on_fulfillment(state, args, callback) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1284

def callback_on_fulfillment(state, args, callback)
  state.apply args, callback if state.fulfilled?
end

#callback_on_rejection(state, args, callback) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1288

def callback_on_rejection(state, args, callback)
  state.apply args, callback unless state.fulfilled?
end

#callback_on_resolution(state, args, callback) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1292

def callback_on_resolution(state, args, callback)
  callback.call(*state.result, *args)
end

#delayFuture

Creates new future dependent on receiver which will not evaluate until touched, see #touch. In other words, it inserts delay into the chain of Futures making rest of it lazy evaluated.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1095

def delay
  event = DelayPromise.new(@DefaultExecutor).event
  ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
end

#exception(*args) ⇒ Exception

Allows rejected Future to be risen with raise method. If the reason is not an exception Runtime.new(reason) is returned.

Examples:

raise Promises.rejected_future(StandardError.new("boom"))
raise Promises.rejected_future("or just boom")

Raises:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1013

def exception(*args)
  raise Concurrent::Error, 'it is not rejected' unless rejected?
  raise ArgumentError unless args.size <= 1
  reason = Array(internal_state.reason).flatten.compact
  if reason.size > 1
    ex = Concurrent::MultipleErrors.new reason
    ex.set_backtrace(caller)
    ex
  else
    ex = if reason[0].respond_to? :exception
           reason[0].exception(*args)
         else
           RuntimeError.new(reason[0]).exception(*args)
         end
    ex.set_backtrace Array(ex.backtrace) + caller
    ex
  end
end

#flat(level = 1)

Alias for #flat_future.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1124

alias_method :flat, :flat_future

#flat_eventEvent

Creates new event which will be resolved when the returned event by receiver is. Be careful if the receiver rejects it will just resolve since Event does not hold reason.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1130

def flat_event
  FlatEventPromise.new_blocked_by1(self, @DefaultExecutor).event
end

#flat_future(level = 1) ⇒ Future Also known as: #flat

Creates new future which will have result of the future returned by receiver. If receiver rejects it will have its rejection.

Parameters:

  • level (Integer) (defaults to: 1)

    how many levels of futures should flatten

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1120

def flat_future(level = 1)
  FlatFuturePromise.new_blocked_by1(self, level, @DefaultExecutor).future
end

#inspect

Alias for #to_s.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1243

alias_method :inspect, :to_s

#on_fulfillment(*args, &callback) ⇒ self

Shortcut of #on_fulfillment_using with default :io executor supplied.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1136

def on_fulfillment(*args, &callback)
  on_fulfillment_using @DefaultExecutor, *args, &callback
end

#on_fulfillment!(*args) {|value, *args| ... } ⇒ self

Stores the callback to be executed synchronously on resolving thread after it is fulfilled. Does nothing on rejection.

Parameters:

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • is forgotten.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1147

def on_fulfillment!(*args, &callback)
  add_callback :callback_on_fulfillment, args, callback
end

#on_fulfillment_using(executor, *args) {|value, *args| ... } ⇒ self

Stores the callback to be executed asynchronously on executor after it is fulfilled. Does nothing on rejection.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • is forgotten.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1159

def on_fulfillment_using(executor, *args, &callback)
  add_callback :async_callback_on_fulfillment, executor, args, callback
end

#on_rejection(*args, &callback) ⇒ self

Shortcut of #on_rejection_using with default :io executor supplied.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1165

def on_rejection(*args, &callback)
  on_rejection_using @DefaultExecutor, *args, &callback
end

#on_rejection!(*args) {|reason, *args| ... } ⇒ self

Stores the callback to be executed synchronously on resolving thread after it is rejected. Does nothing on fulfillment.

Parameters:

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • is forgotten.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1176

def on_rejection!(*args, &callback)
  add_callback :callback_on_rejection, args, callback
end

#on_rejection_using(executor, *args) {|reason, *args| ... } ⇒ self

Stores the callback to be executed asynchronously on executor after it is rejected. Does nothing on fulfillment.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • is forgotten.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1188

def on_rejection_using(executor, *args, &callback)
  add_callback :async_callback_on_rejection, executor, args, callback
end

#reason(timeout = nil, timeout_value = nil) ⇒ Object, timeout_value

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned nil is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like #wait, #value!, #result, etc.

Returns reason of future’s rejection. Calls Concurrent::AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, timeout_value)

    the reason, or timeout_value on timeout, or nil on fulfillment.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 966

def reason(timeout = nil, timeout_value = nil)
  if wait_until_resolved timeout
    internal_state.reason
  else
    timeout_value
  end
end

#rejected_resolution(raise_on_reassign, state) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1251

def rejected_resolution(raise_on_reassign, state)
  if raise_on_reassign
    if internal_state == RESERVED
      raise Concurrent::MultipleAssignmentError.new(
          "Future can be resolved only once. It is already reserved.")
    else
      raise Concurrent::MultipleAssignmentError.new(
          "Future can be resolved only once. It's #{result}, trying to set #{state.result}.",
          current_result: result,
          new_result:     state.result)
    end
  end
  return false
end

#rescue(*args, &task) ⇒ Future

Shortcut of #rescue_on with default :io executor supplied.

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1052

def rescue(*args, &task)
  rescue_on @DefaultExecutor, *args, &task
end

#rescue_on(executor, *args) {|reason, *args| ... } ⇒ Future

Chains the task to be executed asynchronously on executor after it rejects. Does not run the task if it fulfills. It will resolve though, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (reason, *args)

    to the task.

Yield Returns:

  • will become result of the returned Future. Its returned value becomes #value fulfilling it, raised exception becomes #reason rejecting it.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1064

def rescue_on(executor, *args, &task)
  RescuePromise.new_blocked_by1(self, executor, executor, args, &task).future
end

#result(timeout = nil) ⇒ Array(Boolean, Object, Object)?

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Returns triplet fulfilled?, value, reason. Calls Concurrent::AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Array(Boolean, Object, Object), nil)

    triplet of fulfilled?, value, reason, or nil on timeout.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 981

def result(timeout = nil)
  internal_state.result if wait_until_resolved timeout
end

#run(run_test = method(:run_test)) ⇒ Future

Allows to use futures as green threads. The receiver has to evaluate to a future which represents what should be done next. It basically flattens indefinitely until non Future values is returned which becomes result of the returned future. Any encountered exception will become reason of the returned future.

Examples:

body = lambda do |v|
  v += 1
  v < 5 ? Promises.future(v, &body) : v
end
Promises.future(0, &body).run.value! # => 5

Parameters:

  • run_test (#call(value)) (defaults to: method(:run_test))

    an object which when called returns either Future to keep running with or nil, then the run completes with the value. The run_test can be used to extract the Future from deeper structure, or to distinguish Future which is a resulting value from a future which is suppose to continue running.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1210

def run(run_test = method(:run_test))
  RunFuturePromise.new_blocked_by1(self, @DefaultExecutor, run_test).future
end

#run_test(v) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1247

def run_test(v)
  v if v.is_a?(Future)
end

#schedule(intended_time) ⇒ Future

Creates new event dependent on receiver scheduled to execute on/in intended_time. In time is interpreted from the moment the receiver is resolved, therefore it inserts delay into the chain.

Parameters:

  • intended_time (Numeric, Time)

    Numeric means to run in intended_time seconds. Time means to run on intended_time.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1102

def schedule(intended_time)
  chain do
    event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
    ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
  end.flat
end

#then(*args, &task) ⇒ Future

Shortcut of #then_on with default :io executor supplied.

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1034

def then(*args, &task)
  then_on @DefaultExecutor, *args, &task
end

#then_on(executor, *args) {|value, *args| ... } ⇒ Future

Chains the task to be executed asynchronously on executor after it fulfills. Does not run the task if it rejects. It will resolve though, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (value, *args)

    to the task.

Yield Returns:

  • will become result of the returned Future. Its returned value becomes #value fulfilling it, raised exception becomes #reason rejecting it.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1046

def then_on(executor, *args, &task)
  ThenPromise.new_blocked_by1(self, executor, executor, args, &task).future
end

#to_eventEvent

Converts future to event which is resolved when future is resolved by fulfillment or rejection.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1222

def to_event
  event = Promises.resolvable_event
ensure
  chain_resolvable(event)
end

#to_futureFuture

Returns self, since this is a future

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1230

def to_future
  self
end

#to_sString Also known as: #inspect

Returns:

  • (String)

    Short string representation.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1235

def to_s
  if resolved?
    format '%s with %s>', super[0..-2], (fulfilled? ? value : reason).inspect
  else
    super
  end
end

#value(timeout = nil, timeout_value = nil) ⇒ Object, ...

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned nil is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like #wait, #value!, #result, etc.

Return value of the future. Calls Concurrent::AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, nil, timeout_value)

    the value of the Future when fulfilled, timeout_value on timeout, nil on rejection.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 950

def value(timeout = nil, timeout_value = nil)
  if wait_until_resolved timeout
    internal_state.value
  else
    timeout_value
  end
end

#value!(timeout = nil, timeout_value = nil) ⇒ Object, ...

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned nil is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like #wait, #value!, #result, etc.

Return value of the future. Calls Concurrent::AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, nil, timeout_value)

    the value of the Future when fulfilled, or nil on rejection, or timeout_value on timeout.

Raises:

  • (Exception)

    #reason on rejection

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 997

def value!(timeout = nil, timeout_value = nil)
  if wait_until_resolved! timeout
    internal_state.value
  else
    timeout_value
  end
end

#wait!(timeout = nil) ⇒ self, ...

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Wait (block the Thread) until receiver is #resolved?. Calls Concurrent::AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (self, true, false)

    self implies timeout was not used, true implies timeout was used and it was resolved, false implies it was not resolved within timeout.

Raises:

  • (Exception)

    #reason on rejection

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 987

def wait!(timeout = nil)
  result = wait_until_resolved!(timeout)
  timeout ? result : self
end

#wait_until_resolved!(timeout = nil) (private)

Raises:

  • (self)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1266

def wait_until_resolved!(timeout = nil)
  result = wait_until_resolved(timeout)
  raise self if rejected?
  result
end

#with_default_executor(executor) ⇒ Future

Crates new object with same class with the executor set as its new default executor. Any futures depending on it will use the new default executor.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1111

def with_default_executor(executor)
  FutureWrapperPromise.new_blocked_by1(self, executor).future
end

#zip(other) ⇒ Future Also known as: #&

Creates a new event or a future which will be resolved when receiver and other are. Returns an event if receiver and other are events, otherwise returns a future. If just one of the parties is Future then the result of the returned future is equal to the result of the supplied future. If both are futures then the result is as described in FactoryMethods#zip_futures_on.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1070

def zip(other)
  if other.is_a?(Future)
    ZipFuturesPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  else
    ZipFutureEventPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  end
end

#|(event_or_future)

Alias for #any.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1089

alias_method :|, :any