Class: Concurrent::TimerSet

Overview

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
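The difference between the two clocks can be illustrated with Ruby's standard library alone. The sketch below measures an interval with Process.clock_gettime(Process::CLOCK_MONOTONIC). The helper name elapsed_seconds is hypothetical and is not part of concurrent-ruby; it only shows why a monotonic reading suits interval arithmetic.

```ruby
# Measure an interval with the operating system's monotonic clock.
# `elapsed_seconds` is a hypothetical helper, not a concurrent-ruby API.
def elapsed_seconds
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  # The difference of two monotonic readings is unaffected by changes
  # to the system (wall) clock made while the block was running.
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

interval = elapsed_seconds { sleep(0.05) }
```

The individual readings have no calendar meaning; only their difference is useful.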

Executes a collection of tasks, each after a given delay. A master task monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global thread pool or on the supplied executor. Each task is represented as a ScheduledTask.

Constant Summary

Concern::Logging - Included

SEV_LABEL

AbstractExecutorService - Inherited

FALLBACK_POLICIES

Class Method Summary

RubyExecutorService - Inherited

AbstractExecutorService - Inherited

.new

Create a new thread pool.

Instance Attribute Summary

RubyExecutorService - Inherited

AbstractExecutorService - Inherited

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Does the executor auto-terminate when the application exits?

#fallback_policy, #name,
#running?

Is the executor running?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shut down?

#shuttingdown?

Is the executor shutting down?

#ns_auto_terminate?

ExecutorService - Included

#can_overflow?

Does the task queue have a maximum size?

#serialized?

Does this executor guarantee serialization of its operations?

Instance Method Summary

RubyExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Does the executor auto-terminate when the application exits?

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#running?

Is the executor running?

#serialized?

Does this executor guarantee serialization of its operations?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shut down?

#shuttingdown?

Is the executor shutting down?

#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

#ns_shutdown_execution, #stop_event, #stopped_event

AbstractExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#serialized?

Does this executor guarantee serialization of its operations?

#to_s,
#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

#fallback_action

Returns an action which executes the fallback_policy once the queue size reaches max_queue.

#ns_execute,
#ns_kill_execution

Callback method called when the executor has been killed.

#ns_shutdown_execution

Callback method called when an orderly shutdown has completed.

Concern::Deprecation - Included

ExecutorService - Included

#<<

Submit a task to the executor for asynchronous processing.

#post

Submit a task to the executor for asynchronous processing.

Concern::Logging - Included

#log

Logs through global_logger; this can be overridden by setting @logger.

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}) ⇒ TimerSet

Create a new set of timed tasks.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 30

def initialize(opts = {})
  super(opts)
end

Instance Method Details

#kill

Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 62

def kill
  shutdown
end

#ns_initialize(opts) (private)

Initialize the object.

Parameters:

  • opts (Hash)

    the options to create the object with.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 74

def ns_initialize(opts)
  @queue              = Collection::NonConcurrentPriorityQueue.new(order: :min)
  @task_executor      = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @timer_executor     = SingleThreadExecutor.new
  @condition          = Event.new
  @ruby_pid           = $$ # detects if Ruby has forked
end

#ns_post_task(task) (private)

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 94

def ns_post_task(task)
  return false unless ns_running?
  ns_reset_if_forked
  if (task.initial_delay) <= 0.01
    task.executor.post { task.process_task }
  else
    @queue.push(task)
    # only post the process method when the queue is empty
    @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
    @condition.set
  end
  true
end
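The branch in ns_post_task can be read as a simple dispatch rule: delays at or below the threshold skip the queue, and longer delays are queued for the timer thread. The following is a minimal sketch of that rule only. The dispatch method and the IMMEDIATE_THRESHOLD constant are hypothetical names, and the block is called inline here where the library posts it to the task's executor.

```ruby
# Hypothetical stand-in for the dispatch rule in ns_post_task.
IMMEDIATE_THRESHOLD = 0.01 # seconds, the value used in the listing above

def dispatch(delay, queue, &task)
  if delay <= IMMEDIATE_THRESHOLD
    task.call                # the real code posts to an executor instead
    :immediate
  else
    queue << [delay, task]   # left for the timer thread to fire later
    :queued
  end
end

queue   = []
results = []
first   = dispatch(0.005, queue) { results << :ran }
second  = dispatch(2.0, queue)   { results << :later }
```

Only the first block has run at this point; the second stays in the queue until something processes it.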

#ns_reset_if_forked (private)

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 129

def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @condition.reset
    @ruby_pid = $$
  end
end
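The PID comparison above is a common way to notice that the current process is a forked child: the stored PID still belongs to the parent, so state inherited from it can be discarded. Below is a minimal sketch of the same idea, assuming a hypothetical ForkAwareQueue class. The PID source is injectable so the behaviour can be shown without actually forking.

```ruby
# Hypothetical sketch: remember the PID at construction, compare on use.
class ForkAwareQueue
  def initialize(pid_source = -> { Process.pid })
    @pid_source = pid_source
    @owner_pid  = @pid_source.call
    @items      = []
  end

  def push(item)
    reset_if_forked
    @items << item
    self
  end

  def size
    @items.size
  end

  private

  # A changed PID means this object was copied into a forked child,
  # so entries inherited from the parent are dropped.
  def reset_if_forked
    current = @pid_source.call
    return if current == @owner_pid
    @items.clear
    @owner_pid = current
  end
end

fake_pid = 100
queue = ForkAwareQueue.new(-> { fake_pid })
queue.push(:a).push(:b)
size_before = queue.size
fake_pid = 200              # simulate running in a forked child
queue.push(:c)
size_after = queue.size
```

After the simulated fork only the entry pushed by the "child" remains.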

#ns_shutdown_execution (private)

ExecutorService callback called during shutdown.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 122

def ns_shutdown_execution
  ns_reset_if_forked
  @queue.clear
  @timer_executor.kill
  stopped_event.set
end

#post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false

Post a task to be executed after a given delay (in seconds). If the delay is 1/100th of a second or less, the task is posted to the executor immediately.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task.

  • args (Array<Object>)

    the arguments passed to the task on execution.

Yields:

  • the task to be performed.

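Returns:

  • (Concurrent::ScheduledTask, false)

    the scheduled task if it was accepted; false if the executor is not running or the task could not be scheduled.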

Raises:

  • (ArgumentError)

    if the intended execution time is not in the future.

  • (ArgumentError)

    if no block is given.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 48

def post(delay, *args, &task)
  raise ArgumentError.new('no block given') unless block_given?
  return false unless running?
  opts = { executor:  @task_executor,
           args:      args,
           timer_set: self }
  task = ScheduledTask.execute(delay, opts, &task) # may raise exception
  task.unscheduled? ? false : task
end

#post_task(task) (private)

Note:

This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Post a task by using the ScheduledTask#execute method.

Post the task to the internal queue.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 89

def post_task(task)
  synchronize { ns_post_task(task) }
end

#process_tasks (private)

Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. If no tasks remain the thread will exit gracefully so that garbage collection can occur. If there are no ready tasks it will sleep for up to 60 seconds waiting for the next scheduled task.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 143

def process_tasks
  loop do
    task = synchronize { @condition.reset; @queue.peek }
    break unless task

    now  = Concurrent.monotonic_time
    diff = task.schedule_time - now

    if diff <= 0
      # We need to remove the task from the queue before passing
      # it to the executor, to avoid race conditions where we pass
      # the peek'ed task to the executor and then pop a different
      # one that's been added in the meantime.
      #
      # Note that there's no race condition between the peek and
      # this pop - this pop could retrieve a different task from
      # the peek, but that task would be due to fire now anyway
      # (because @queue is a priority queue, and this thread is
      # the only reader, so whatever timer is at the head of the
      # queue now must have the same pop time, or a closer one, as
      # when we peeked).
      task = synchronize { @queue.pop }
      begin
        task.executor.post { task.process_task }
      rescue RejectedExecutionError
        # ignore and continue
      end
    else
      @condition.wait([diff, 60].min)
    end
  end
end
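The loop above can be approximated with the standard library to show its shape: peek at the earliest task, fire it if it is due, otherwise wait for the remaining time (capped at 60 seconds) or until a new task arrives. This is a sketch under stated assumptions, not the library's implementation. MiniTimer and all of its methods are hypothetical, a sorted Array stands in for the priority queue, a Mutex and ConditionVariable stand in for the Event, and tasks run on the timer thread itself where the library hands them to an executor.

```ruby
# Hypothetical miniature of the process_tasks loop (standard library only).
class MiniTimer
  def initialize
    @tasks   = []                    # [fire_at, block] pairs, earliest first
    @lock    = Mutex.new
    @wake    = ConditionVariable.new
    @running = false
    @thread  = nil
  end

  def post(delay, &block)
    @lock.synchronize do
      @tasks << [now + delay, block]
      @tasks.sort_by!(&:first)
      unless @running                # start the timer thread on demand
        @running = true
        @thread  = Thread.new { run }
      end
      @wake.signal                   # the head of the queue may have changed
    end
  end

  def join
    @thread&.join
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def run
    loop do
      due = @lock.synchronize do
        if @tasks.empty?
          @running = false           # exit so the thread can be collected
          return
        end
        diff = @tasks.first[0] - now
        if diff <= 0
          @tasks.shift[1]            # remove before running, as above
        else
          @wake.wait(@lock, [diff, 60].min)
          nil
        end
      end
      due&.call
    end
  end
end

order = []
timer = MiniTimer.new
timer.post(0.2)  { order << :second }
timer.post(0.05) { order << :first }
timer.join
```

The task posted later but due earlier fires first, because the wait is interrupted whenever a new task is queued and the head of the queue is re-read on each pass.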

#remove_task(task) (private)

Note:

This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Cancel a task by using the ScheduledTask#cancel method.

Remove the given task from the queue.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 115

def remove_task(task)
  synchronize { @queue.delete(task) }
end