123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Future

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::IVar
Defined in: lib/concurrent-ruby/concurrent/future.rb

Overview

Future is inspired by Clojure's future function. A future represents a promise to complete an action at some time in the future. The action is atomic and permanent. The idea behind a future is to send an operation for asynchronous completion, do other stuff, then return and retrieve the result of the async operation at a later time. Futures run on the global thread pool.

Feature:
  As a highly responsive Ruby application
  I want long-running tasks on a separate thread
  So I can perform other tasks without waiting

Futures have several possible states: :unscheduled, :pending, :processing, :rejected, or :fulfilled. These are also aggregated as #incomplete? and #complete?. When a Future is created it is set to :unscheduled. Once the #execute method is called the state becomes :pending. Once a job is pulled from the thread pool's queue and is given to a thread for processing (often immediately upon #post) the state becomes :processing. The future will remain in this state until processing is complete. A future that is in the :unscheduled, :pending, or :processing is considered #incomplete?. A #complete? Future is either :rejected, indicating that an exception was thrown during processing, or :fulfilled, indicating success. If a Future is :fulfilled its #value will be updated to reflect the result of the operation. If :rejected the reason will be updated with a reference to the thrown exception. The predicate methods #unscheduled?, #pending?, #rejected?, and #fulfilled? can be called at any time to obtain the state of the Future, as can the #state method, which returns a symbol.

Retrieving the value of a Future is done through the #value (alias: #deref) method. Obtaining the value of a Future is a potentially blocking operation. When a Future is :rejected a call to #value will return nil immediately. When a Future is :fulfilled a call to #value will immediately return the current value. When a Future is :pending a call to #value will block until the Future is either :rejected or :fulfilled. A timeout value can be passed to #value to limit how long the call will block. If nil the call will block indefinitely. If 0 the call will not block. Any other integer or float value will indicate the maximum number of seconds to block.

The constructor can also be given zero or more processing options. Currently the only supported options are those recognized by the Dereferenceable module.

The Future class also includes the behavior of the Ruby standard library href="http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html">http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable module, but does so in a thread-safe way. On fulfillment or rejection all observers will be notified according to the normal Observable behavior. The observer callback function will be called with three parameters: the Time of fulfillment/rejection, the final value, and the final reason. Observers added after fulfillment/rejection will still be notified as normal. The notification will occur on the same thread that processed the job.

===== Examples

A fulfilled example:

require 'concurrent'
require 'csv'
require 'open-uri'

class Ticker
  def get_year_end_closing(symbol, year, api_key)
    uri = "https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol=#{symbol}&apikey=#{api_key}&datatype=csv"
    data = []
    csv = URI.parse(uri).read
    if csv.include?('call frequency')
      return :rate_limit_exceeded
    end
    CSV.parse(csv, headers: true) do |row|
      data << row['close'].to_f if row['timestamp'].include?(year.to_s)
    end
    year_end = data.first
    year_end
  rescue => e
    p e
  end
end

api_key = ENV['ALPHAVANTAGE_KEY']
abort(error_message) unless api_key

=== Future
price = <code>Future</code>.execute{ {Ticker.new}.get_year_end_closing('TWTR', 2013, api_key) }
p price.state #=> :pending
p price.pending? #=> true
p price.value(0) #=> nil (does not block)

sleep(1)    # do other stuff

p price.value #=> 63.65 (after blocking if necessary)
p price.state #=> :fulfilled
p price.fulfilled? #=> true
p price.value #=> 63.65

A rejected example:

count = <code>Future</code>.execute{ sleep(10); raise {StandardError.new}("Boom!") }
count.state #=> :pending
count.pending? #=> true

count.value #=> nil (after blocking)
count.rejected? #=> true
count.reason #=> #<StandardError: Boom!>

An example with observation:

class Ticker
  Stock = Struct.new(:symbol, :name, :exchange)

  def update(time, value, reason)
    ticker = value.collect do |symbol|
      Stock.new(symbol['symbol'], symbol['name'], symbol['exch'])
    end

    output = ticker.join("\n")
    print "#{output}\n"
  end
end

yahoo = {Ticker.new}('YAHOO')
future = {Concurrent::Future.new} { yahoo.update.suggested_symbols }
future.add_observer(Ticker.new)
future.execute

=== do important stuff...

{#>>} #<struct {Ticker::Stock} symbol="YHOO", name="Yahoo! {Inc."}, exchange="NMS">
{#>>} #<struct {Ticker::Stock} symbol="YHO.DE", name="Yahoo! {Inc."}, exchange="GER">
{#>>} #<struct {Ticker::Stock} symbol="YAHOY", name="Yahoo Japan Corporation", exchange="PNK">
{#>>} #<struct {Ticker::Stock} symbol="YAHOF", name="YAHOO JAPAN CORP", exchange="PNK">
{#>>} #<struct {Ticker::Stock} symbol="YOJ.SG", name="YAHOO JAPAN", exchange="STU">
{#>>} #<struct {Ticker::Stock} symbol="YHO.SG", name="YAHOO", exchange="STU">
{#>>} #<struct {Ticker::Stock} symbol="YHOO.BA", name="Yahoo! {Inc."}, exchange="BUE">
{#>>} #<struct {Ticker::Stock} symbol="YHO.DU", name="YAHOO", exchange="DUS">
{#>>} #<struct {Ticker::Stock} symbol="YHO.HM", name="YAHOO", exchange="HAM">
{#>>} #<struct {Ticker::Stock} symbol="YHO.BE", name="YAHOO", exchange="BER">

## Copy Options

::Object references in Ruby are mutable. This can lead to serious problems when the #value of an object is a mutable reference. Which is always the case unless the value is a Fixnum, Symbol, or similar “primitive” data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the object instance is created:

  • :dup_on_deref When true the object will call the #dup method on the value object every time the #value method is called (default: false)

  • :freeze_on_deref When true the object will call the #freeze method on the value object every time the #value method is called (default: false)

  • :copy_on_deref When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • :copy_on_deref

  • :dup_on_deref

  • :freeze_on_deref

Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.

Class Method Summary

IVar - Inherited

.new

Create a new IVar in the :pending state with the (optional) initial value.

Instance Attribute Summary

Concern::Obligation - Included

#complete?

Has the obligation completed processing?

#fulfilled?

Has the obligation been fulfilled?

#incomplete?

Is the obligation still awaiting completion of processing?

#pending?

Is obligation completion still pending?

#realized?
#rejected?

Has the obligation been rejected?

#state

The current state of the obligation.

#unscheduled?

Is the obligation still unscheduled?

#state=

Concern::Dereferenceable - Included

#value

Return the value this object represents after applying the options specified by the #set_deref_options method.

Instance Method Summary

IVar - Inherited

#add_observer

Add an observer on this object that will receive notification on update.

#fail

Set the IVar to failed due to some error and wake or notify all threads waiting on it.

#set

Set the IVar to a value and wake or notify all threads waiting on it.

#try_set

Attempt to set the IVar with the given value or block.

#complete, #complete_without_notification, #notify_observers, #ns_complete_without_notification, #ns_initialize, #safe_execute, #check_for_block_or_value!

Concern::Observable - Included

#add_observer

Adds an observer to this set.

#count_observers

Return the number of observers associated with this object.

#delete_observer

Remove observer as an observer on this object so that it will no longer receive notifications.

#delete_observers

Remove all observers associated with this object.

#with_observer

As #add_observer but can be used for chaining.

Concern::Obligation - Included

#exception,
#no_error!
#reason

If an exception was raised during processing this will return the exception object.

#value

The current value of the obligation.

#value!

The current value of the obligation.

#wait

Wait until obligation is complete or the timeout has been reached.

#wait!

Wait until obligation is complete or the timeout is reached.

#compare_and_set_state

Atomic compare and set operation State is set to next_state only if current state == expected_current.

#event, #get_arguments_from,
#if_state

Executes the block within mutex if current state is included in expected_states.

#init_obligation,
#ns_check_state?

Am I in the current state?

#ns_set_state, #set_state

Concern::Dereferenceable - Included

#deref
#apply_deref_options,
#ns_set_deref_options

Set the options which define the operations #value performs before returning data to the caller (dereferencing).

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}) { ... } ⇒ Future

Create a new Future in the :unscheduled state.

Parameters:

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :io returns the global pool for long, blocking (IO) tasks, :fast returns the global pool for short, fast operations, and :immediate returns the global ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the asynchronous operation to perform

Raises:

  • (ArgumentError)

    if no block is given

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 33

def initialize(opts = {}, &block)
  raise ArgumentError.new('no block given') unless block_given?
  super(NULL, opts.merge(__task_from_block__: block), &nil)
end

Class Method Details

.execute(opts = {}) { ... } ⇒ Future

Create a new Future object with the given block, execute it, and return the :pending object.

Examples:

future = Concurrent::Future.execute{ sleep(1); 42 }
future.state #=> :pending

Parameters:

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :io returns the global pool for long, blocking (IO) tasks, :fast returns the global pool for short, fast operations, and :immediate returns the global ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the asynchronous operation to perform

Returns:

  • (Future)

    the newly created Future in the :pending state

Raises:

  • (ArgumentError)

    if no block is given

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 77

def self.execute(opts = {}, &block)
  Future.new(opts, &block).execute
end

Instance Attribute Details

#cancelled?Boolean (readonly)

Has the operation been successfully cancelled?

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 111

def cancelled?
  state == :cancelled
end

Instance Method Details

#cancelBoolean

Attempt to cancel the operation if it has not already processed. The operation can only be cancelled while still pending. It cannot be cancelled once it has begun processing or has completed.

Returns:

  • (Boolean)

    was the operation successfully cancelled.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 99

def cancel
  if compare_and_set_state(:cancelled, :pending)
    complete(false, nil, CancelledOperationError.new)
    true
  else
    false
  end
end

#executeFuture

Execute an :unscheduled Future. Immediately sets the state to :pending and passes the block to a new thread/thread pool for eventual execution. Does nothing if the Future is in any state other than :unscheduled.

Examples:

Instance and execute in separate steps

future = Concurrent::Future.new{ sleep(1); 42 }
future.state #=> :unscheduled
future.execute
future.state #=> :pending

Instance and execute in one line

future = Concurrent::Future.new{ sleep(1); 42 }.execute
future.state #=> :pending

Returns:

  • (Future)

    a reference to self

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 53

def execute
  if compare_and_set_state(:pending, :unscheduled)
    @executor.post{ safe_execute(@task, @args) }
    self
  end
end

#set(value = NULL) { ... } ⇒ IVar

Set the IVar to a value and wake or notify all threads waiting on it.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the IVar

Yields:

  • A block operation to use for setting the value

Returns:

Raises:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 82

def set(value = NULL, &block)
  check_for_block_or_value!(block_given?, value)
  synchronize do
    if @state != :unscheduled
      raise MultipleAssignmentError
    else
      @task = block || Proc.new { value }
    end
  end
  execute
end

#wait_or_cancel(timeout) ⇒ Boolean

Wait the given number of seconds for the operation to complete. On timeout attempt to cancel the operation.

Parameters:

  • timeout (Numeric)

    the maximum time in seconds to wait.

Returns:

  • (Boolean)

    true if the operation completed before the timeout else false

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/future.rb', line 121

def wait_or_cancel(timeout)
  wait(timeout)
  if complete?
    true
  else
    cancel
    false
  end
end