Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals.

Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
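As an illustration of interval measurement with a monotonic clock, here is a minimal sketch that uses only Ruby core. `Process.clock_gettime` with `Process::CLOCK_MONOTONIC` is a core API on MRI 2.1 and later. The helper names `monotonic_now` and `measure` are hypothetical and are not part of this library.

```ruby
# Read the operating system's monotonic clock (seconds as a Float).
# The absolute value is meaningless; only differences are useful.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: runs the block and returns [block_result, elapsed_seconds].
def measure
  started = monotonic_now
  result  = yield
  [result, monotonic_now - started]
end

result, elapsed = measure do
  sleep 0.05
  :done
end

puts format('finished with %p after %.3f seconds', result, elapsed)
```

Because the reading is taken from the monotonic clock, the measured interval should not jump if the wall-clock time is adjusted while the block runs.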

Executes a collection of tasks, each after a given delay. A master task monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global thread pool or on the supplied executor. Each task is represented as a ScheduledTask.

Create a new thread pool.

.new(opts = {}) ⇒ TimerSet

Create a new set of timed tasks.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 30

def initialize(opts = {})

#kill

Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 62

def kill

#ns_initialize(opts) (private)

Initialize the object.


Parameters:

  • opts (Hash)

    the options to create the object with.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 74

def ns_initialize(opts)
  @queue              = Collection::NonConcurrentPriorityQueue.new(order: :min)
  @task_executor      = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @timer_executor     = SingleThreadExecutor.new
  @condition          = Event.new
  @ruby_pid           = $$ # detects if Ruby has forked

#ns_post_task(task) (private)

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 94

def ns_post_task(task)
  return false unless ns_running?
  if (task.initial_delay) <= 0.01
    task.executor.post { task.process_task }
  else
    @queue.push(task)
    # only post the process method when the queue is empty
    @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
    @condition.set
  end
  true
end

#ns_reset_if_forked (private)

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 129

def ns_reset_if_forked
  if $$ != @ruby_pid
    @ruby_pid = $$
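The visible lines compare the current process id (`$$`) with the one recorded at initialization. The following is a minimal sketch of that fork-detection pattern in isolation. It assumes that state inherited from the parent process should simply be discarded in the child; `ForkAwareBuffer` is a hypothetical class, not part of concurrent-ruby, and it does not show what the library's method actually resets.

```ruby
# Hypothetical illustration of pid-based fork detection.
class ForkAwareBuffer
  def initialize
    @items     = []
    @owner_pid = Process.pid # same value as $$
  end

  def push(item)
    reset_if_forked
    @items << item
    self
  end

  def size
    reset_if_forked
    @items.size
  end

  private

  # A pid mismatch means this object was copied into a forked child.
  # Drop the inherited items and adopt the child's pid.
  def reset_if_forked
    return if Process.pid == @owner_pid

    @items.clear
    @owner_pid = Process.pid
  end
end

buffer = ForkAwareBuffer.new
buffer.push(:a).push(:b)
puts buffer.size
```

Checking the pid lazily, on each access, avoids the need for an explicit after-fork hook.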

#ns_shutdown_execution (private)

ExecutorService callback called during shutdown.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 122

def ns_shutdown_execution

#post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false

Post a task to be executed after a given delay (in seconds). If the delay is less than 1/100th of a second the task will be posted to the executor immediately.


Parameters:

  • delay (Float)

    the number of seconds to wait before executing the task.

  • args (Array<Object>)

    the arguments passed to the task on execution.

Yields:

  • the task to be performed.

Raises:

  • (ArgumentError)

    if the intended execution time is not in the future.

  • (ArgumentError)

    if no block is given.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 48

def post(delay, *args, &task)
  raise ArgumentError.new('no block given') unless block_given?
  return false unless running?
  opts = { executor:  @task_executor,
           args:      args,
           timer_set: self }
  task = ScheduledTask.execute(delay, opts, &task) # may raise exception
  task.unscheduled? ? false : task
end
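The routing rule described for #post can be sketched on its own, without the library. This is an illustration only: `route_for` and `IMMEDIATE_THRESHOLD` are hypothetical names, the `<=` comparison mirrors the check in the #ns_post_task listing, and rejecting only negative delays is an assumption about what "not in the future" means.

```ruby
# The 1/100th-of-a-second cutoff mentioned in the description.
IMMEDIATE_THRESHOLD = 0.01

# Hypothetical helper: reports how a task with the given delay would be routed.
def route_for(delay)
  seconds = Float(delay)
  # Assumption: only negative delays are treated as invalid.
  raise ArgumentError, 'delay must not be negative' if seconds < 0

  seconds <= IMMEDIATE_THRESHOLD ? :immediate : :queued
end

routes = [0, 0.005, 0.01, 0.5].map { |delay| route_for(delay) }
puts routes.inspect
```

Tasks routed as `:immediate` would go straight to the task executor, while `:queued` tasks would wait in the timer queue until they are due.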

#post_task(task) (private)


This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Post a task by using the ScheduledTask#execute method.

Post the task to the internal queue.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 89

def post_task(task)
  synchronize { ns_post_task(task) }
end

#process_tasks (private)

Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. If no tasks remain the thread will exit gracefully so that garbage collection can occur. If there are no ready tasks it will sleep for up to 60 seconds waiting for the next scheduled task.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 143

def process_tasks
  loop do
    task = synchronize { @condition.reset; @queue.peek }
    break unless task

    now  = Concurrent.monotonic_time
    diff = task.schedule_time - now

    if diff <= 0
      # We need to remove the task from the queue before passing
      # it to the executor, to avoid race conditions where we pass
      # the peek'ed task to the executor and then pop a different
      # one that's been added in the meantime.
      # Note that there's no race condition between the peek and
      # this pop - this pop could retrieve a different task from
      # the peek, but that task would be due to fire now anyway
      # (because @queue is a priority queue, and this thread is
      # the only reader, so whatever timer is at the head of the
      # queue now must have the same pop time, or a closer one, as
      # when we peeked).
      task = synchronize { @queue.pop }
      begin
        task.executor.post { task.process_task }
      rescue RejectedExecutionError
        # ignore and continue
      end
    else
      @condition.wait([diff, 60].min)
    end
  end
end
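The loop above can be illustrated with a much smaller, stdlib-only model. This is a simplified sketch, not the library's implementation: `MiniTimerSet` and its methods are hypothetical, a sorted Array stands in for the priority queue, `Mutex` and `ConditionVariable` stand in for `synchronize` and the `Event`, and tasks run on the timer thread itself instead of being handed to an executor.

```ruby
# Hypothetical, simplified model of a timer loop (not the library class).
class MiniTimerSet
  Entry = Struct.new(:run_at, :block)

  def initialize
    @queue  = [] # kept sorted by run_at, earliest first
    @lock   = Mutex.new
    @wakeup = ConditionVariable.new
    @worker = nil
  end

  # Schedule the block to run roughly `delay` seconds from now.
  def post(delay, &block)
    raise ArgumentError, 'no block given' unless block

    @lock.synchronize do
      @queue << Entry.new(now + delay, block)
      @queue.sort_by!(&:run_at)
      # Start a timer thread only when none is currently active.
      @worker = Thread.new { process } unless @worker&.alive?
      @wakeup.signal # the head of the queue may have changed
    end
    true
  end

  # Block until the timer thread has drained the queue and exited.
  def wait_until_idle
    loop do
      worker = @lock.synchronize { @worker }
      break unless worker

      worker.join
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Run due entries in schedule order; exit once the queue is empty.
  def process
    while (entry = next_due_entry)
      entry.block.call
    end
  end

  # Waits until the earliest entry is due, then removes and returns it.
  # Returns nil (and clears @worker) when nothing is left to run.
  def next_due_entry
    @lock.synchronize do
      loop do
        entry = @queue.first
        if entry.nil?
          @worker = nil
          return nil
        end
        wait = entry.run_at - now
        return @queue.shift if wait <= 0

        # Sleep until the entry is due, a new entry arrives, or 60 seconds pass.
        @wakeup.wait(@lock, [wait, 60].min)
      end
    end
  end
end

fired  = []
timers = MiniTimerSet.new
timers.post(0.10) { fired << :second }
timers.post(0.02) { fired << :first }
timers.wait_until_idle
puts fired.inspect
```

As in the listing above, the wait is capped at 60 seconds and is cut short whenever a new entry is posted, so an entry that becomes the new head of the queue is noticed promptly.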

#remove_task(task) (private)


This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Cancel a task by using the ScheduledTask#cancel method.

Remove the given task from the queue.

# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 115

def remove_task(task)
  synchronize { @queue.delete(task) }
end