Class: Concurrent::TimerSet
| Relationships & Source Files | |
| Super Chains via Extension / Inclusion / Inheritance | |
| Class Chain: | |
| Instance Chain: | |
| Inherits: | Concurrent::RubyExecutorService | 
| Defined in: | lib/concurrent-ruby/concurrent/executor/timer_set.rb | 
Overview
Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
Executes a collection of tasks, each after a given delay. A master task monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global thread pool or on the supplied executor. Each task is represented as a ScheduledTask.
Constant Summary
Class Method Summary
- 
    
      .new(opts = {})  ⇒ TimerSet 
    
    constructor
    Create a new set of timed tasks. 
RubyExecutorService - Inherited
AbstractExecutorService - Inherited
| .new | Create a new thread pool. | 
Instance Attribute Summary
RubyExecutorService - Inherited
AbstractExecutorService - Inherited
| #auto_terminate= | 
 | 
| #auto_terminate? | Is the executor auto-terminate when the application exits? | 
| #fallback_policy, #name, | |
| #running? | Is the executor running? | 
| #shutdown | Begin an orderly shutdown. | 
| #shutdown? | Is the executor shutdown? | 
| #shuttingdown? | Is the executor shuttingdown? | 
| #ns_auto_terminate? | |
ExecutorService - Included
| #can_overflow? | Does the task queue have a maximum size? | 
| #serialized? | Does this executor guarantee serialization of its operations? | 
Instance Method Summary
- 
    
      #kill  
    
    Begin an immediate shutdown. 
- 
    
      #post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false 
    
    Post a task to be execute run after a given delay (in seconds). 
- 
    
      #ns_initialize(opts)  
    
    private
    Initialize the object. 
- #ns_post_task(task) private
- #ns_reset_if_forked private
- 
    
      #ns_shutdown_execution  
    
    private
    ExecutorServicecallback called during shutdown.
- 
    
      #post_task(task)  
    
    private
    Post the task to the internal queue. 
- 
    
      #process_tasks  
    
    private
    Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. 
- 
    
      #remove_task(task)  
    
    private
    Remove the given task from the queue. 
RubyExecutorService - Inherited
| #<< | Submit a task to the executor for asynchronous processing. | 
| #auto_terminate= | 
 | 
| #auto_terminate? | Is the executor auto-terminate when the application exits? | 
| #can_overflow? | Does the task queue have a maximum size? | 
| #kill | Begin an immediate shutdown. | 
| #post | Submit a task to the executor for asynchronous processing. | 
| #running? | Is the executor running? | 
| #serialized? | Does this executor guarantee serialization of its operations? | 
| #shutdown | Begin an orderly shutdown. | 
| #shutdown? | Is the executor shutdown? | 
| #shuttingdown? | Is the executor shuttingdown? | 
| #wait_for_termination | Block until executor shutdown is complete or until  | 
| #ns_shutdown_execution, #stop_event, #stopped_event | |
AbstractExecutorService - Inherited
| #<< | Submit a task to the executor for asynchronous processing. | 
| #can_overflow? | Does the task queue have a maximum size? | 
| #kill | Begin an immediate shutdown. | 
| #post | Submit a task to the executor for asynchronous processing. | 
| #serialized? | Does this executor guarantee serialization of its operations? | 
| #to_s, | |
| #wait_for_termination | Block until executor shutdown is complete or until  | 
| #fallback_action | Returns an action which executes the  | 
| #ns_execute, | |
| #ns_kill_execution | Callback method called when the executor has been killed. | 
| #ns_shutdown_execution | Callback method called when an orderly shutdown has completed. | 
Concern::Deprecation - Included
ExecutorService - Included
| #<< | Submit a task to the executor for asynchronous processing. | 
| #post | Submit a task to the executor for asynchronous processing. | 
Concern::Logging - Included
| #log | Logs through global_logger, it can be overridden by setting @logger. | 
Synchronization::LockableObject - Inherited
Constructor Details
    .new(opts = {})  ⇒ TimerSet 
  
Create a new set of timed tasks.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 30
def initialize(opts = {}) super(opts) end
Instance Method Details
#kill
Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 62
def kill shutdown end
#ns_initialize(opts) (private)
Initialize the object.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 74
def ns_initialize(opts) @queue = Collection::NonConcurrentPriorityQueue.new(order: :min) @task_executor = Options.(opts) || Concurrent.global_io_executor @timer_executor = SingleThreadExecutor.new @condition = Event.new @ruby_pid = $$ # detects if Ruby has forked end
#ns_post_task(task) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 94
def ns_post_task(task) return false unless ns_running? ns_reset_if_forked if (task.initial_delay) <= 0.01 task.executor.post { task.process_task } else @queue.push(task) # only post the process method when the queue is empty @timer_executor.post(&method(:process_tasks)) if @queue.length == 1 @condition.set end true end
#ns_reset_if_forked (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 129
def ns_reset_if_forked if $$ != @ruby_pid @queue.clear @condition.reset @ruby_pid = $$ end end
#ns_shutdown_execution (private)
ExecutorService callback called during shutdown.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 122
def ns_shutdown_execution ns_reset_if_forked @queue.clear @timer_executor.kill stopped_event.set end
    #post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false 
  
Post a task to be execute run after a given delay (in seconds). If the delay is less than 1/100th of a second the task will be immediately post to the executor.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 48
def post(delay, *args, &task) raise ArgumentError.new('no block given') unless block_given? return false unless running? opts = { executor: @task_executor, args: args, timer_set: self } task = ScheduledTask.execute(delay, opts, &task) # may raise exception task.unscheduled? ? false : task end
#post_task(task) (private)
This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Post a task by using the SchedulesTask#execute method.
Post the task to the internal queue.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 89
def post_task(task) synchronize { ns_post_task(task) } end
#process_tasks (private)
Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. If no tasks remain the thread will exit gracefully so that garbage collection can occur. If there are no ready tasks it will sleep for up to 60 seconds waiting for the next scheduled task.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 143
def process_tasks loop do task = synchronize { @condition.reset; @queue.peek } break unless task now = Concurrent.monotonic_time diff = task.schedule_time - now if diff <= 0 # We need to remove the task from the queue before passing # it to the executor, to avoid race conditions where we pass # the peek'ed task to the executor and then pop a different # one that's been added in the meantime. # # Note that there's no race condition between the peek and # this pop - this pop could retrieve a different task from # the peek, but that task would be due to fire now anyway # (because @queue is a priority queue, and this thread is # the only reader, so whatever timer is at the head of the # queue now must have the same pop time, or a closer one, as # when we peeked). task = synchronize { @queue.pop } begin task.executor.post { task.process_task } rescue RejectedExecutionError # ignore and continue end else @condition.wait([diff, 60].min) end end end
#remove_task(task) (private)
This is intended as a callback method from ScheduledTask only. It is not intended to be used directly. Cancel a task by using the ScheduledTask#cancel method.
Remove the given task from the queue.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 115
def remove_task(task) synchronize { @queue.delete(task) } end