
Class: Concurrent::ScheduledTask

Relationships & Source Files
Inherits: Concurrent::IVar
Defined in: lib/concurrent-ruby/concurrent/scheduled_task.rb

Overview

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
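The difference between wall-clock time and a monotonic clock can be seen with Ruby's standard `Process.clock_gettime`. The following is a minimal sketch, not the library's own implementation; the helper name `elapsed_seconds` is hypothetical.

```ruby
# Hypothetical helper (not part of concurrent-ruby): time a block with the
# operating system's monotonic clock, which is unaffected by wall-clock changes.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

interval = elapsed_seconds { sleep(0.05) }

# A monotonic reading is only meaningful as a difference between two readings;
# Time.now remains the tool for human-friendly clock times.
puts format('slept for about %.3f seconds', interval)
```

The absolute value of a single monotonic reading carries no calendar meaning, which is why it suits interval arithmetic such as "run this task N seconds from now".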

ScheduledTask is a close relative of Future but with one important difference: A Future is set to execute as soon as possible whereas a ScheduledTask is set to execute after a specified delay. This implementation is loosely based on Java’s ScheduledExecutorService (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html). It is a more feature-rich variant of Concurrent.timer.

The intended schedule time of task execution is set on object construction with the delay argument. The delay is a numeric (floating point or integer) representing a number of seconds in the future. The constructor converts it with #to_f, and a negative value raises an ArgumentError. The actual schedule time of task execution is set when the #execute method is called.

The constructor can also be given zero or more processing options. These include the options recognized by the Dereferenceable module, plus the :executor and :args options described under the constructor details.

The final constructor argument is a block representing the task to be performed. If no block is given an ArgumentError will be raised.

States

ScheduledTask mixes in the Obligation module, thus giving it “future” behavior. This includes the expected lifecycle states. ScheduledTask has one additional state, however. While the task (block) is being executed the state of the object will be :processing. This additional state is necessary because it has implications for task cancellation.

Cancellation

A task that is :pending or :unscheduled can be cancelled using the #cancel method. A task in any other state, including :processing, cannot be cancelled. The #cancel method returns a boolean indicating the success of the cancellation attempt. A cancelled ScheduledTask cannot be restarted. It is immutable.

Obligation and Observation

The result of a ScheduledTask can be obtained either synchronously or asynchronously. ScheduledTask mixes in both the Obligation module and the Observable module (http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html) from the Ruby standard library. With one exception ScheduledTask behaves identically to Future with regard to these modules.

Copy Options

Object references in Ruby are mutable. This can lead to serious problems when the #value of an object is a mutable reference, which is always the case unless the value is a Fixnum, Symbol, or similar “primitive” data type. Each instance can be configured with a few options that help protect the program from potentially dangerous operations. Each of these options can be set when the object instance is created:

  • :dup_on_deref When true the object will call the #dup method on the value object every time the #value method is called (default: false)

  • :freeze_on_deref When true the object will call the #freeze method on the value object every time the #value method is called (default: false)

  • :copy_on_deref When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • :copy_on_deref

  • :dup_on_deref

  • :freeze_on_deref

Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.
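The ordering described above can be sketched in plain Ruby. The method `deref_sketch` below is a hypothetical stand-in written only to illustrate the documented order of operations; it is not the library's implementation.

```ruby
# Hypothetical stand-in for the dereferencing step, written only to show the
# documented order: :copy_on_deref, then :dup_on_deref, then :freeze_on_deref.
def deref_sketch(value, copy_on_deref: nil, dup_on_deref: false, freeze_on_deref: false)
  value = copy_on_deref.call(value) if copy_on_deref
  value = value.dup if dup_on_deref
  value = value.freeze if freeze_on_deref
  value
end

internal = String.new('secret')   # an unfrozen, mutable String

seen = deref_sketch(internal,
                    copy_on_deref: ->(v) { v.upcase },
                    dup_on_deref: true,
                    freeze_on_deref: true)

# The caller receives a frozen copy; the internal value stays untouched.
puts seen
puts internal.frozen?
```

Since freezing happens last, the object handed to the caller is frozen regardless of what the copy Proc returned, while the internally held value is left as it was.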

Examples:

Basic usage

require 'concurrent/scheduled_task'
require 'csv'
require 'open-uri'

class Ticker
  # Returns the first matching monthly close for the given year,
  # or :rate_limit_exceeded when the API refuses the request.
  def get_year_end_closing(symbol, year, api_key)
    uri = "https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol=#{symbol}&apikey=#{api_key}&datatype=csv"
    data = []
    csv = URI.parse(uri).read
    return :rate_limit_exceeded if csv.include?('call frequency')

    CSV.parse(csv, headers: true) do |row|
      data << row['close'].to_f if row['timestamp'].include?(year.to_s)
    end
    data.first
  rescue => e
    p e
  end
end

api_key = ENV['ALPHAVANTAGE_KEY']
abort('Set the ALPHAVANTAGE_KEY environment variable') unless api_key

# Future
price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013, api_key) }
price.state #=> :pending
price.pending? #=> true
price.value(0) #=> nil (does not block)

sleep(1)    # do other stuff

price.value #=> 63.65 (after blocking if necessary)
price.state #=> :fulfilled
price.fulfilled? #=> true
price.value #=> 63.65

Successful task execution

task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
task.state         #=> :unscheduled
task.execute
task.state         #=> :pending

# wait for it...
sleep(3)

task.unscheduled? #=> false
task.pending?     #=> false
task.fulfilled?   #=> true
task.rejected?    #=> false
task.value        #=> 'What does the fox say?'

One line creation and execution

task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute
task.state         #=> :pending

task = Concurrent::ScheduledTask.execute(2){ 'What do you get when you multiply 6 by 9?' }
task.state         #=> :pending

Failed task execution

task = Concurrent::ScheduledTask.execute(2){ raise StandardError.new('Call me maybe?') }
task.pending?      #=> true

# wait for it...
sleep(3)

task.unscheduled? #=> false
task.pending?     #=> false
task.fulfilled?   #=> false
task.rejected?    #=> true
task.value        #=> nil
task.reason       #=> #<StandardError: Call me maybe?>

Task execution with observation

observer = Class.new{
  def update(time, value, reason)
    puts "The task completed at #{time} with value '#{value}'"
  end
}.new

task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
task.add_observer(observer)
task.execute
task.pending?      #=> true

# wait for it...
sleep(3)

#>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'


Class Method Summary

IVar - Inherited

.new

Create a new IVar in the :pending state with the (optional) initial value.

Instance Attribute Summary

Concern::Obligation - Included

#complete?

Has the obligation completed processing?

#fulfilled?

Has the obligation been fulfilled?

#incomplete?

Is the obligation still awaiting completion of processing?

#pending?

Is obligation completion still pending?

#realized?
#rejected?

Has the obligation been rejected?

#state

The current state of the obligation.

#unscheduled?

Is the obligation still unscheduled?

#state=

Concern::Dereferenceable - Included

#value

Return the value this object represents after applying the options specified by the #set_deref_options method.

Instance Method Summary

IVar - Inherited

#add_observer

Add an observer on this object that will receive notification on update.

#fail

Set the IVar to failed due to some error and wake or notify all threads waiting on it.

#set

Set the IVar to a value and wake or notify all threads waiting on it.

#try_set

Attempt to set the IVar with the given value or block.

#complete, #complete_without_notification, #notify_observers, #ns_complete_without_notification, #ns_initialize, #safe_execute, #check_for_block_or_value!

Concern::Observable - Included

#add_observer

Adds an observer to this set.

#count_observers

Return the number of observers associated with this object.

#delete_observer

Remove observer as an observer on this object so that it will no longer receive notifications.

#delete_observers

Remove all observers associated with this object.

#with_observer

As #add_observer but can be used for chaining.

Concern::Obligation - Included

#exception,
#no_error!
#reason

If an exception was raised during processing this will return the exception object.

#value

The current value of the obligation.

#value!

The current value of the obligation.

#wait

Wait until obligation is complete or the timeout has been reached.

#wait!

Wait until obligation is complete or the timeout is reached.

#compare_and_set_state

Atomic compare and set operation. State is set to next_state only if current state == expected_current.

#event, #get_arguments_from,
#if_state

Executes the block within mutex if current state is included in expected_states.

#init_obligation,
#ns_check_state?

Am I in the current state?

#ns_set_state, #set_state

Concern::Dereferenceable - Included

#deref
#apply_deref_options,
#ns_set_deref_options

Set the options which define the operations #value performs before returning data to the caller (dereferencing).

Synchronization::LockableObject - Inherited

Constructor Details

.new(delay, opts = {}) { ... } ⇒ ScheduledTask

Schedule a task for execution at a specified future time.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :io returns the global pool for long, blocking (IO) tasks, :fast returns the global pool for short, fast operations, and :immediate returns the global ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the task to be performed

Raises:

  • (ArgumentError)

    When no block is given

  • (ArgumentError)

    When given a time that is in the past

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 178

def initialize(delay, opts = {}, &task)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0

  super(NULL, opts, &nil)

  synchronize do
    ns_set_state(:unscheduled)
    @parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
    @args = get_arguments_from(opts)
    @delay = delay.to_f
    @task = task
    @time = nil
    @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
    self.observers = Collection::CopyOnNotifyObserverSet.new
  end
end

Class Method Details

.execute(delay, opts = {}, &task) ⇒ ScheduledTask

Create a new ScheduledTask object with the given block, execute it, and return the :pending object.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :io returns the global pool for long, blocking (IO) tasks, :fast returns the global pool for short, fast operations, and :immediate returns the global ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

Returns:

  • (ScheduledTask)

    the newly created ScheduledTask in the :pending state

Raises:

  • (ArgumentError)

    if no block is given

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 290

def self.execute(delay, opts = {}, &task)
  new(delay, opts, &task).execute
end

Instance Attribute Details

#cancelled? ⇒ Boolean (readonly)

Has the task been cancelled?

Returns:

  • (Boolean)

    true if the task is in the given state else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 220

def cancelled?
  synchronize { ns_check_state?(:cancelled) }
end

#executor (readonly, private)

The executor on which to execute the task.

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 163

attr_reader :executor

#processing? ⇒ Boolean (readonly)

Is the task execution in progress?

Returns:

  • (Boolean)

    true if the task is in the given state else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 227

def processing?
  synchronize { ns_check_state?(:processing) }
end

Instance Method Details

#<=>(other) (private)

Comparator which orders by schedule time.

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 213

def <=>(other)
  schedule_time <=> other.schedule_time
end

#cancel ⇒ Boolean

Cancel this task and prevent it from executing. A task can only be cancelled if it is pending or unscheduled.

Returns:

  • (Boolean)

    true if successfully cancelled else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 235

def cancel
  if compare_and_set_state(:cancelled, :pending, :unscheduled)
    complete(false, nil, CancelledOperationError.new)
    # To avoid deadlocks this call must occur outside of #synchronize
    # Changing the state above should prevent redundant calls
    @parent.send(:remove_task, self)
  else
    false
  end
end

#execute ⇒ ScheduledTask

Execute an :unscheduled ScheduledTask. Immediately sets the state to :pending and starts counting down toward execution. Does nothing if the ScheduledTask is in any state other than :unscheduled.

Returns:

  • (ScheduledTask)

    a reference to self

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 273

def execute
  if compare_and_set_state(:pending, :unscheduled)
    synchronize{ ns_schedule(@delay) }
  end
  self
end

#initial_delay ⇒ Float

The delay value given at instantiation.

Returns:

  • (Float)

    the initial delay.

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 199

def initial_delay
  synchronize { @delay }
end

#ns_reschedule(delay) ⇒ Boolean (private)

Reschedule the task using the given delay and the current time. A task can only be rescheduled while it is :pending.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

Returns:

  • (Boolean)

    true if successfully rescheduled else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 326

def ns_reschedule(delay)
  return false unless ns_check_state?(:pending)
  @parent.send(:remove_task, self) && ns_schedule(delay)
end

#ns_schedule(delay) ⇒ Boolean (private)

Schedule the task using the given delay and the current time.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

Returns:

  • (Boolean)

    true if successfully scheduled else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 312

def ns_schedule(delay)
  @delay = delay
  @time = Concurrent.monotonic_time + @delay
  @parent.send(:post_task, self)
end

#process_task (private)

Execute the task.

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 297

def process_task
  safe_execute(@task, @args)
end

#reschedule(delay) ⇒ Boolean

Reschedule the task using the given delay and the current time. A task can only be rescheduled while it is :pending.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

Returns:

  • (Boolean)

    true if successfully rescheduled else false

Raises:

  • (ArgumentError)

    When given a time that is in the past

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 262

def reschedule(delay)
  delay = delay.to_f
  raise ArgumentError.new('seconds must be greater than zero') if delay < 0.0
  synchronize{ ns_reschedule(delay) }
end

#reset ⇒ Boolean

Reschedule the task using the original delay and the current time. A task can only be reset while it is :pending.

Returns:

  • (Boolean)

    true if successfully rescheduled else false

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 250

def reset
  synchronize{ ns_reschedule(@delay) }
end

#schedule_time ⇒ Float

The monotonic time at which the task is scheduled to be executed.

Returns:

  • (Float)

    the schedule time or nil if unscheduled

# File 'lib/concurrent-ruby/concurrent/scheduled_task.rb', line 206

def schedule_time
  synchronize { @time }
end