Class: Concurrent::Future
Inherits: Concurrent::IVar
Defined in: lib/concurrent-ruby/concurrent/future.rb
Overview
Future is inspired by Clojure's future function. A future represents a promise to complete an action at some time in the future. The action is atomic and permanent. The idea behind a future is to send an operation for asynchronous completion, do other work, then return and retrieve the result of the async operation at a later time. Futures run on the global thread pool.
Feature:
As a highly responsive Ruby application
I want long-running tasks on a separate thread
So I can perform other tasks without waiting
Futures have several possible states: :unscheduled, :pending, :processing, :rejected, or :fulfilled. These are also aggregated as #incomplete? and #complete?. When a Future is created it is set to :unscheduled. Once the #execute method is called the state becomes :pending. Once a job is pulled from the thread pool's queue and is given to a thread for processing (often immediately upon #post) the state becomes :processing. The future will remain in this state until processing is complete. A future that is in the :unscheduled, :pending, or :processing state is considered #incomplete?. A #complete? Future is either :rejected, indicating that an exception was raised during processing, or :fulfilled, indicating success. If a Future is :fulfilled its #value will be updated to reflect the result of the operation. If it is :rejected its #reason will be updated with a reference to the raised exception. The predicate methods #unscheduled?, #pending?, #rejected?, and #fulfilled? can be called at any time to obtain the state of the Future, as can the #state method, which returns a symbol.
Retrieving the value of a Future is done through the #value (alias: #deref) method. Obtaining the value of a Future is a potentially blocking operation. When a Future is :rejected a call to #value will return nil immediately. When a Future is :fulfilled a call to #value will immediately return the current value. When a Future is :pending a call to #value will block until the Future is either :rejected or :fulfilled. A timeout value can be passed to #value to limit how long the call will block. If the timeout is nil the call will block indefinitely. If it is 0 the call will not block. Any other integer or float value indicates the maximum number of seconds to block.
The constructor can also be given zero or more processing options. Currently the only supported options are those recognized by the Dereferenceable module.
The Future class also includes the behavior of the Ruby standard library Observable module (http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html), but does so in a thread-safe way. On fulfillment or rejection all observers will be notified according to the normal Observable behavior. The observer callback function will be called with three parameters: the Time of fulfillment/rejection, the final value, and the final reason. Observers added after fulfillment/rejection will still be notified as normal. The notification will occur on the same thread that processed the job.
## Examples
A fulfilled example:

    require 'concurrent'
    require 'csv'
    require 'open-uri'

    class Ticker
      def get_year_end_closing(symbol, year, api_key)
        uri = "https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol=#{symbol}&apikey=#{api_key}&datatype=csv"
        data = []
        csv = URI.parse(uri).read
        if csv.include?('call frequency')
          return :rate_limit_exceeded
        end
        CSV.parse(csv, headers: true) do |row|
          data << row['close'].to_f if row['timestamp'].include?(year.to_s)
        end
        year_end = data.first
        year_end
      rescue => e
        p e
      end
    end

    api_key = ENV['ALPHAVANTAGE_KEY']
    abort unless api_key

    # Future
    price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013, api_key) }
    p price.state      #=> :pending
    p price.pending?   #=> true
    p price.value(0)   #=> nil (does not block)

    sleep(1)           # do other stuff

    p price.value      #=> 63.65 (after blocking if necessary)
    p price.state      #=> :fulfilled
    p price.fulfilled? #=> true
    p price.value      #=> 63.65
A rejected example:

    count = Concurrent::Future.execute{ sleep(10); raise StandardError.new("Boom!") }
    count.state     #=> :pending
    count.pending?  #=> true

    count.value     #=> nil (after blocking)
    count.rejected? #=> true
    count.reason    #=> #<StandardError: Boom!>
An example with observation:

    class Ticker
      Stock = Struct.new(:symbol, :name, :exchange)

      def update(time, value, reason)
        ticker = value.collect do |symbol|
          Stock.new(symbol['symbol'], symbol['name'], symbol['exch'])
        end

        output = ticker.join("\n")
        print "#{output}\n"
      end
    end

    yahoo = Ticker.new('YAHOO')
    future = Concurrent::Future.new { yahoo.update.suggested_symbols }
    future.add_observer(Ticker.new)
    future.execute

    # do important stuff...

    #>> #<struct Ticker::Stock symbol="YHOO", name="Yahoo! Inc.", exchange="NMS">
    #>> #<struct Ticker::Stock symbol="YHO.DE", name="Yahoo! Inc.", exchange="GER">
    #>> #<struct Ticker::Stock symbol="YAHOY", name="Yahoo Japan Corporation", exchange="PNK">
    #>> #<struct Ticker::Stock symbol="YAHOF", name="YAHOO JAPAN CORP", exchange="PNK">
    #>> #<struct Ticker::Stock symbol="YOJ.SG", name="YAHOO JAPAN", exchange="STU">
    #>> #<struct Ticker::Stock symbol="YHO.SG", name="YAHOO", exchange="STU">
    #>> #<struct Ticker::Stock symbol="YHOO.BA", name="Yahoo! Inc.", exchange="BUE">
    #>> #<struct Ticker::Stock symbol="YHO.DU", name="YAHOO", exchange="DUS">
    #>> #<struct Ticker::Stock symbol="YHO.HM", name="YAHOO", exchange="HAM">
    #>> #<struct Ticker::Stock symbol="YHO.BE", name="YAHOO", exchange="BER">
## Copy Options
Object references in Ruby are mutable. This can lead to serious problems when the #value of an object is a mutable reference, which is always the case unless the value is a Fixnum, Symbol, or similar “primitive” data type. Each instance can be configured with a few options that help protect the program from potentially dangerous operations. Each option can be set when the object instance is created:
- :dup_on_deref
  When true the object will call the #dup method on the value object every time the #value method is called (default: false)
- :freeze_on_deref
  When true the object will call the #freeze method on the value object every time the #value method is called (default: false)
- :copy_on_deref
  When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)
When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:
- :copy_on_deref
- :dup_on_deref
- :freeze_on_deref
Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.
Class Method Summary
- .execute(opts = {}) { ... } ⇒ Future
  Create a new Future object with the given block, execute it, and return the :pending object.
- .new(opts = {}) { ... } ⇒ Future (constructor)
  Create a new Future in the :unscheduled state.
IVar
- Inherited
Instance Attribute Summary
- #cancelled? ⇒ Boolean (readonly)
  Has the operation been successfully cancelled?
Concern::Obligation
- Included
#complete? | Has the obligation completed processing? |
#fulfilled? | Has the obligation been fulfilled? |
#incomplete? | Is the obligation still awaiting completion of processing? |
#pending? | Is obligation completion still pending? |
#realized? | Alias for Concern::Obligation#fulfilled?. |
#rejected? | Has the obligation been rejected? |
#state | The current state of the obligation. |
#unscheduled? | Is the obligation still unscheduled? |
#state= |
Concern::Dereferenceable
- Included
#value | Return the value this object represents after applying the options specified by the |
Instance Method Summary
- #cancel ⇒ Boolean
  Attempt to cancel the operation if it has not already processed.
- #execute ⇒ Future
  Execute an :unscheduled Future.
- #set(value = NULL) { ... } ⇒ IVar
- #wait_or_cancel(timeout) ⇒ Boolean
  Wait the given number of seconds for the operation to complete.
IVar
- Inherited
#add_observer | Add an observer on this object that will receive notification on update. |
#fail | |
#set | |
#try_set | Attempt to set the |
#complete, #complete_without_notification, #notify_observers, #ns_complete_without_notification, #ns_initialize, #safe_execute, #check_for_block_or_value! |
Concern::Observable
- Included
#add_observer | Adds an observer to this set. |
#count_observers | Return the number of observers associated with this object. |
#delete_observer | Remove |
#delete_observers | Remove all observers associated with this object. |
#with_observer | As |
Concern::Obligation
- Included
#exception, | |
#no_error! | Alias for Concern::Obligation#wait!. |
#reason | If an exception was raised during processing this will return the exception object. |
#value | The current value of the obligation. |
#value! | The current value of the obligation. |
#wait | Wait until obligation is complete or the timeout has been reached. |
#wait! | Wait until obligation is complete or the timeout is reached. |
#compare_and_set_state | Atomic compare and set operation State is set to |
#event, #get_arguments_from, | |
#if_state | Executes the block within mutex if current state is included in expected_states. |
#init_obligation, | |
#ns_check_state? | Am I in the current state? |
#ns_set_state, #set_state |
Concern::Dereferenceable
- Included
#deref | Alias for Concern::Dereferenceable#value. |
#apply_deref_options, | |
#ns_set_deref_options |
|
Synchronization::LockableObject
- Inherited
Constructor Details
.new(opts = {}) { ... } ⇒ Future
Create a new Future in the :unscheduled state.

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 33
    def initialize(opts = {}, &block)
      raise ArgumentError.new('no block given') unless block_given?
      super(NULL, opts.merge(__task_from_block__: block), &nil)
    end
Class Method Details
.execute(opts = {}) { ... } ⇒ Future
Create a new Future object with the given block, execute it, and return the :pending object.

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 77
    def self.execute(opts = {}, &block)
      Future.new(opts, &block).execute
    end
Instance Attribute Details
#cancelled? ⇒ Boolean (readonly)
Has the operation been successfully cancelled?

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 111
    def cancelled?
      state == :cancelled
    end
Instance Method Details
#cancel ⇒ Boolean
Attempt to cancel the operation if it has not already processed. The operation can only be cancelled while still pending. It cannot be cancelled once it has begun processing or has completed.

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 99
    def cancel
      if compare_and_set_state(:cancelled, :pending)
        complete(false, nil, CancelledOperationError.new)
        true
      else
        false
      end
    end
#execute ⇒ Future
Execute an :unscheduled Future. Immediately sets the state to :pending and passes the block to a new thread/thread pool for eventual execution. Does nothing if the Future is in any state other than :unscheduled.

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 53
    def execute
      if compare_and_set_state(:pending, :unscheduled)
        @executor.post{ safe_execute(@task, @args) }
        self
      end
    end
#set(value = NULL) { ... } ⇒ IVar

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 82
    def set(value = NULL, &block)
      check_for_block_or_value!(block_given?, value)
      synchronize do
        if @state != :unscheduled
          raise MultipleAssignmentError
        else
          @task = block || Proc.new { value }
        end
      end
      execute
    end
#wait_or_cancel(timeout) ⇒ Boolean
Wait the given number of seconds for the operation to complete. On timeout attempt to cancel the operation.

    # File 'lib/concurrent-ruby/concurrent/future.rb', line 121
    def wait_or_cancel(timeout)
      wait(timeout)
      if complete?
        true
      else
        cancel
        false
      end
    end