Class: Concurrent::TimerSet
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
|
|
Instance Chain:
|
|
Inherits: | Concurrent::RubyExecutorService |
Defined in: | lib/concurrent-ruby/concurrent/executor/timer_set.rb |
Overview
Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
Executes a collection of tasks, each after a given delay. A master task monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global thread pool or on the supplied executor. Each task is represented as a ScheduledTask
.
Constant Summary
Class Method Summary
-
.new(opts = {}) ⇒ TimerSet
constructor
Create a new set of timed tasks.
RubyExecutorService
- Inherited
AbstractExecutorService
- Inherited
.new | Create a new thread pool. |
Instance Attribute Summary
RubyExecutorService
- Inherited
AbstractExecutorService
- Inherited
#auto_terminate= |
|
#auto_terminate? | Is the executor auto-terminate when the application exits? |
#fallback_policy, #name, | |
#running? | Is the executor running? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#ns_auto_terminate? |
ExecutorService
- Included
#can_overflow? | Does the task queue have a maximum size? |
#serialized? | Does this executor guarantee serialization of its operations? |
Instance Method Summary
-
#kill
Begin an immediate shutdown.
-
#post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false
Post a task to be execute run after a given delay (in seconds).
-
#ns_initialize(opts)
private
Initialize the object.
- #ns_post_task(task) private
- #ns_reset_if_forked private
-
#ns_shutdown_execution
private
ExecutorService
callback called during shutdown. -
#post_task(task)
private
Post the task to the internal queue.
-
#process_tasks
private
Run a loop and execute tasks in the scheduled order and at the approximate scheduled time.
-
#remove_task(task)
private
Remove the given task from the queue.
RubyExecutorService
- Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#auto_terminate= |
|
#auto_terminate? | Is the executor auto-terminate when the application exits? |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#running? | Is the executor running? |
#serialized? | Does this executor guarantee serialization of its operations? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#wait_for_termination | Block until executor shutdown is complete or until |
#ns_shutdown_execution, #stop_event, #stopped_event |
AbstractExecutorService
- Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#serialized? | Does this executor guarantee serialization of its operations? |
#to_s, | |
#wait_for_termination | Block until executor shutdown is complete or until |
#fallback_action | Returns an action which executes the |
#ns_execute, | |
#ns_kill_execution | Callback method called when the executor has been killed. |
#ns_shutdown_execution | Callback method called when an orderly shutdown has completed. |
Concern::Deprecation
- Included
ExecutorService
- Included
#<< | Submit a task to the executor for asynchronous processing. |
#post | Submit a task to the executor for asynchronous processing. |
Concern::Logging
- Included
#log | Logs through global_logger, it can be overridden by setting @logger. |
Synchronization::LockableObject
- Inherited
Constructor Details
.new(opts = {}) ⇒ TimerSet
Create a new set of timed tasks.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 30
def initialize(opts = {}) super(opts) end
Instance Method Details
#kill
Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 62
def kill shutdown end
#ns_initialize(opts) (private)
Initialize the object.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 74
def ns_initialize(opts) @queue = Collection::NonConcurrentPriorityQueue.new(order: :min) @task_executor = Options. (opts) || Concurrent.global_io_executor @timer_executor = SingleThreadExecutor.new @condition = Event.new @ruby_pid = $$ # detects if Ruby has forked end
#ns_post_task(task) (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 94
def ns_post_task(task) return false unless ns_running? ns_reset_if_forked if (task.initial_delay) <= 0.01 task.executor.post { task.process_task } else @queue.push(task) # only post the process method when the queue is empty @timer_executor.post(&method(:process_tasks)) if @queue.length == 1 @condition.set end true end
#ns_reset_if_forked (private)
[ GitHub ]# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 129
def ns_reset_if_forked if $$ != @ruby_pid @queue.clear @condition.reset @ruby_pid = $$ end end
#ns_shutdown_execution (private)
ExecutorService
callback called during shutdown.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 122
def ns_shutdown_execution ns_reset_if_forked @queue.clear @timer_executor.kill stopped_event.set end
#post(delay, *args) { ... } ⇒ Concurrent::ScheduledTask, false
Post a task to be execute run after a given delay (in seconds). If the delay is less than 1/100th of a second the task will be immediately post to the executor.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 48
def post(delay, *args, &task) raise ArgumentError.new('no block given') unless block_given? return false unless running? opts = { executor: @task_executor, args: args, timer_set: self } task = ScheduledTask.execute(delay, opts, &task) # may raise exception task.unscheduled? ? false : task end
#post_task(task) (private)
This is intended as a callback method from ScheduledTask
only. It is not intended to be used directly. Post a task by using the SchedulesTask#execute
method.
Post the task to the internal queue.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 89
def post_task(task) synchronize { ns_post_task(task) } end
#process_tasks (private)
Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. If no tasks remain the thread will exit gracefully so that garbage collection can occur. If there are no ready tasks it will sleep for up to 60 seconds waiting for the next scheduled task.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 143
def process_tasks loop do task = synchronize { @condition.reset; @queue.peek } break unless task now = Concurrent.monotonic_time diff = task.schedule_time - now if diff <= 0 # We need to remove the task from the queue before passing # it to the executor, to avoid race conditions where we pass # the peek'ed task to the executor and then pop a different # one that's been added in the meantime. # # Note that there's no race condition between the peek and # this pop - this pop could retrieve a different task from # the peek, but that task would be due to fire now anyway # (because @queue is a priority queue, and this thread is # the only reader, so whatever timer is at the head of the # queue now must have the same pop time, or a closer one, as # when we peeked). task = synchronize { @queue.pop } begin task.executor.post { task.process_task } rescue RejectedExecutionError # ignore and continue end else @condition.wait([diff, 60].min) end end end
#remove_task(task) (private)
This is intended as a callback method from ScheduledTask
only. It is not intended to be used directly. Cancel a task by using the ScheduledTask#cancel method.
Remove the given task from the queue.
# File 'lib/concurrent-ruby/concurrent/executor/timer_set.rb', line 115
def remove_task(task) synchronize { @queue.delete(task) } end