Class: Concurrent::RubyThreadPoolExecutor
Inherits: Concurrent::RubyExecutorService
Defined in: lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb
Overview
Failure to properly shutdown a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.
An abstraction composed of one or more threads and a task queue. Tasks (blocks or proc objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received.

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by min-threads and max-threads. When a new task is submitted and fewer than min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than min-threads but fewer than max-threads threads running, a new thread will be created only if the queue is full.

Threads that are idle for too long will be garbage collected, down to the configured minimum options. Should a thread crash, it, too, will be garbage collected.
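Below is a minimal usage sketch. It assumes the public Concurrent::ThreadPoolExecutor wrapper (backed by this class on MRI); the thread counts and queue size are illustrative values only, not recommendations.

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,   # keep at least two workers alive
  max_threads: 8,   # grow up to eight workers once the queue is full
  max_queue:   100  # hold up to 100 pending tasks
)

10.times { |i| pool.post { puts "task #{i} ran on #{Thread.current}" } }

pool.shutdown               # stop accepting new work
pool.wait_for_termination   # block until in-progress tasks finish
```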
ThreadPoolExecutor is based on the Java class of the same name. From the official Java documentation:

> Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single background thread), that preconfigure settings for the most common usage scenarios.

**Thread Pool Options**
Thread pools support several configuration options:

- #idletime: The number of seconds that a thread may be idle before being reclaimed.
- name: The name of the executor (optional). Printed in the executor's #to_s output, and a <name>-worker-<id> name is given to its threads if supported by the Ruby implementation in use; <id> is unique for each thread.
- #max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches #max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.
- auto_terminate: When true (default), the threads started will be marked as daemon.
- fallback_policy: The policy defining how rejected tasks are handled.
Three fallback policies are supported (see the example after this list):

- :abort: Raise a RejectedExecutionError exception and discard the task.
- :discard: Discard the task and return false.
- :caller_runs: Execute the task on the calling thread.
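A minimal sketch of the policies above; the pool and queue sizes are deliberately tiny illustrative values so that rejections actually occur.

```ruby
require 'concurrent'

# With :abort, submissions beyond the queue's capacity raise; with :discard they
# would be dropped silently, and with :caller_runs they would run on the posting thread.
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads:     1,
  max_threads:     1,
  max_queue:       1,
  fallback_policy: :abort
)

begin
  10.times { pool.post { sleep 1 } }
rescue Concurrent::RejectedExecutionError => e
  puts "task rejected: #{e.class}"
end
```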
**Shutting Down Thread Pools**
Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed the impact on those resources cannot be predicted. The best practice is to explicitly shutdown all thread pools using the provided methods, as sketched in the example after this list:

- Call #shutdown to initiate an orderly termination of all in-progress tasks
- Call #wait_for_termination with an appropriate timeout interval to allow the orderly shutdown to complete
- Call #kill *only when* the thread pool fails to shutdown in the allotted time
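A minimal sketch of that sequence; the pool sizes, the 10-second timeout, and the half-second tasks are illustrative values only.

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 2, max_threads: 5)
10.times { pool.post { sleep 0.5 } }   # some in-progress work

pool.shutdown                          # 1. initiate an orderly shutdown
unless pool.wait_for_termination(10)   # 2. wait up to 10 seconds for it to complete
  pool.kill                            # 3. only if the pool failed to shut down in time
end
```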
On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shutdown. To prevent applications from "hanging" on exit, all threads can be marked as daemon according to the :auto_terminate option.
```ruby
pool1 = Concurrent::FixedThreadPool.new(5)                        # threads will be marked as daemon
pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon
```
Constant Summary

- DEFAULT_MAX_POOL_SIZE = 2_147_483_647
  Default maximum number of threads that will be created in the pool.
- DEFAULT_MAX_QUEUE_SIZE = 0
  Default maximum number of tasks that may be added to the task queue.
- DEFAULT_MIN_POOL_SIZE = 0
  Default minimum number of threads that will be retained in the pool.
- DEFAULT_SYNCHRONOUS = false
  Default value of the :synchronous option.
- DEFAULT_THREAD_IDLETIMEOUT = 60
  Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.
Concern::Logging - Included

AbstractExecutorService - Inherited
Class Method Summary

- .new(opts = {}) ⇒ RubyThreadPoolExecutor (constructor)
  Create a new thread pool.

RubyExecutorService - Inherited

AbstractExecutorService - Inherited
.new | Create a new thread pool. |
Instance Attribute Summary

- #can_overflow? ⇒ Boolean (readonly)
  Does the task queue have a maximum size?
- #idletime ⇒ Integer (readonly)
  The number of seconds that a thread may be idle before being reclaimed.
- #max_length ⇒ Integer (readonly)
  The maximum number of threads that may be created in the pool.
- #max_queue ⇒ Integer (readonly)
  The maximum number of tasks that may be waiting in the work queue at any one time.
- #min_length ⇒ Integer (readonly)
  The minimum number of threads that may be retained in the pool.
- #synchronous ⇒ true, false (readonly)
  Whether a value of 0 for the :max_queue option means the queue must perform direct hand-off rather than be unbounded.
- #ns_limited_queue? ⇒ Boolean (readonly, private)
RubyExecutorService - Inherited

AbstractExecutorService - Inherited

#auto_terminate=
#auto_terminate? | Is the executor auto-terminate when the application exits? |
#fallback_policy, #name
#running? | Is the executor running? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#ns_auto_terminate?

ExecutorService - Included
#can_overflow? | Does the task queue have a maximum size? |
#serialized? | Does this executor guarantee serialization of its operations? |
Instance Method Summary

- #active_count ⇒ Integer
  The number of threads that are actively executing tasks.
- #completed_task_count ⇒ Integer
  The number of tasks that have been completed by the pool since construction.
- #largest_length ⇒ Integer
  The largest number of threads that have been created in the pool since construction.
- #length ⇒ Integer
  The number of threads currently in the pool.
- #prune_pool
  Prune the thread pool of unneeded threads.
- #queue_length ⇒ Integer
  The number of tasks in the queue awaiting execution.
- #remaining_capacity ⇒ Integer
  Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks.
- #scheduled_task_count ⇒ Integer
  The number of tasks that have been scheduled for execution on the pool since construction.
- #ns_add_busy_worker ⇒ nil, Worker (private)
  Creates a new worker, which has to receive work to do after it is added.
- #ns_assign_worker(*args, &task) ⇒ true, false (private)
  Tries to assign the task to a worker, taking one from @ready or creating a new one.
- #ns_enqueue(*args, &task) ⇒ true, false (private)
  Tries to enqueue the task.
- #ns_execute(*args, &task) (private)
- #ns_initialize(opts) (private)
- #ns_kill_execution (private)
- #ns_prune_pool (private)
  Stops ready workers that have been idle longer than the configured idletime, keeping at least min_length threads.
- #ns_ready_worker(worker, last_message, success = true) (private)
  Handles a ready worker, giving it a new job or assigning it back to @ready.
- #ns_remove_busy_worker(worker) (private)
  Removes a worker which is not tracked in @ready.
- #ns_reset_if_forked (private)
- #ns_shutdown_execution (private)
- #ns_worker_died(worker) (private)
- #ready_worker(worker, last_message) (private)
- #remove_busy_worker(worker) (private)
- #worker_died(worker) (private)
- #worker_task_completed (private)
RubyExecutorService - Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#auto_terminate= |
|
#auto_terminate? | Is the executor auto-terminate when the application exits? |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#running? | Is the executor running? |
#serialized? | Does this executor guarantee serialization of its operations? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#wait_for_termination | Block until executor shutdown is complete or until timeout seconds have passed. |
#ns_shutdown_execution, #stop_event, #stopped_event
AbstractExecutorService - Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#serialized? | Does this executor guarantee serialization of its operations? |
#to_s
#wait_for_termination | Block until executor shutdown is complete or until timeout seconds have passed. |
#fallback_action | Returns an action which executes the fallback_policy once the queue possibilities are exhausted. |
#ns_execute
#ns_kill_execution | Callback method called when the executor has been killed. |
#ns_shutdown_execution | Callback method called when an orderly shutdown has completed. |
Concern::Deprecation - Included

ExecutorService - Included
#<< | Submit a task to the executor for asynchronous processing. |
#post | Submit a task to the executor for asynchronous processing. |
Concern::Logging - Included
#log | Logs through global_logger; it can be overridden by setting @logger. |
Synchronization::LockableObject - Inherited
Constructor Details
.new(opts = {}) ⇒ RubyThreadPoolExecutor
Create a new thread pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 45
def initialize(opts = {})
  super(opts)
end
Instance Attribute Details
#can_overflow? ⇒ Boolean (readonly)
Does the task queue have a maximum size?
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 72
def can_overflow?
  synchronize { ns_limited_queue? }
end
#idletime ⇒ Integer (readonly)
The number of seconds that a thread may be idle before being reclaimed.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 36
attr_reader :idletime
#max_length ⇒ Integer (readonly)
The maximum number of threads that may be created in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 30
attr_reader :max_length
#max_queue ⇒ Integer (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 39
attr_reader :max_queue
#min_length ⇒ Integer (readonly)
The minimum number of threads that may be retained in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 33
attr_reader :min_length
#ns_limited_queue? ⇒ Boolean (readonly, private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 155
def ns_limited_queue?
  @max_queue != 0
end
#synchronous ⇒ true, false (readonly)

Whether a value of 0 for the :max_queue option means the queue must perform direct hand-off rather than be unbounded.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 42
attr_reader :synchronous
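For reference, a minimal sketch of the configuration this flag reflects, assuming the option names documented under Thread Pool Options above:

```ruby
require 'concurrent'

# max_queue: 0 combined with synchronous: true requests direct hand-off: tasks are
# passed straight to a worker rather than queued. Without :synchronous, a max_queue
# of 0 means an unbounded queue instead.
pool = Concurrent::ThreadPoolExecutor.new(max_queue: 0, synchronous: true)

pool.synchronous  # => true
pool.max_queue    # => 0
```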
Instance Method Details
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 60
def completed_task_count
  synchronize { @completed_task_count }
end
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 50
def largest_length
  synchronize { @largest_length }
end
#length ⇒ Integer
The number of threads currently in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 77
def length
  synchronize { @pool.length }
end
#ns_add_busy_worker ⇒ nil, Worker (private)

Creates a new worker, which has to receive work to do after it is added.
#ns_assign_worker(*args, &task) ⇒ true, false (private)

Tries to assign the task to a worker, taking one from @ready or creating a new one.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 201
def ns_assign_worker(*args, &task)
  # keep growing if the pool is not at the minimum yet
  worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
  if worker
    worker << [task, args]
    true
  else
    false
  end
rescue ThreadError
  # Raised when the operating system refuses to create the new thread
  return false
end
#ns_enqueue(*args, &task) ⇒ true, false (private)

Tries to enqueue the task.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 219
def ns_enqueue(*args, &task)
  return false if @synchronous

  if !ns_limited_queue? || @queue.size < @max_queue
    @queue << [task, args]
    true
  else
    false
  end
end
#ns_execute(*args, &task) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 160

def ns_execute(*args, &task)
  ns_reset_if_forked

  if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
    @scheduled_task_count += 1
  else
    return fallback_action(*args, &task)
  end

  ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
  nil
end
#ns_initialize(opts) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 125
def ns_initialize(opts)
  @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
  @fallback_policy = opts.fetch(:fallback_policy, :abort)

  raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
  raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
  raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length

  @pool                 = [] # all workers
  @ready                = [] # used as a stash (most idle worker is at the start)
  @queue                = [] # used as queue
  # @ready or @queue is empty at all times
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter      = 0
  @ruby_pid             = $$ # detects if Ruby has forked

  @gc_interval  = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
  @next_gc_time = Concurrent.monotonic_time + @gc_interval
end
#ns_kill_execution (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 189

def ns_kill_execution
  # TODO log out unprocessed tasks in queue
  # TODO try to shutdown first?
  @pool.each(&:kill)
  @pool.clear
  @ready.clear
end
#ns_prune_pool (private)
Stops ready workers that have been idle longer than the configured idletime, keeping at least min_length threads.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 280
def ns_prune_pool
  now = Concurrent.monotonic_time
  stopped_workers = 0
  while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
    worker, last_message = @ready.first
    if now - last_message > self.idletime
      stopped_workers += 1
      @ready.shift
      worker << :stop
    else
      break
    end
  end

  @next_gc_time = Concurrent.monotonic_time + @gc_interval
end
#ns_ready_worker(worker, last_message, success = true) (private)
Handles a ready worker, giving it a new job or assigning it back to @ready.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 253
def ns_ready_worker(worker, last_message, success = true)
  task_and_args = @queue.shift
  if task_and_args
    worker << task_and_args
  else
    # stop workers when !running?, do not return them to @ready
    if running?
      raise unless @ready.push([worker, last_message])
    else
      worker.stop
    end
  end
end
#ns_remove_busy_worker(worker) (private)
Removes a worker which is not tracked in @ready.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 271
def ns_remove_busy_worker(worker)
  @pool.delete(worker)
  stopped_event.set if @pool.empty? && !running?
  true
end
#ns_reset_if_forked (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 296

def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @ready.clear
    @pool.clear
    @scheduled_task_count = 0
    @completed_task_count = 0
    @largest_length       = 0
    @workers_counter      = 0
    @ruby_pid             = $$
  end
end
#ns_shutdown_execution (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 174

def ns_shutdown_execution
  ns_reset_if_forked

  if @pool.empty?
    # nothing to do
    stopped_event.set
  end

  if @queue.empty?
    # no more tasks will be accepted, just stop all workers
    @pool.each(&:stop)
  end
end
#ns_worker_died(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 231

def ns_worker_died(worker)
  ns_remove_busy_worker worker
  replacement_worker = ns_add_busy_worker
  ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
end
#prune_pool
Prune the thread pool of unneeded threads.

What is being pruned is controlled by the min_threads and idletime parameters passed at pool creation time.

This is a no-op on some pool implementations (e.g. the Java one). The Ruby pool will auto-prune each time a new job is posted. You will need to call this method explicitly in case your application posts jobs in bursts (a lot of jobs and then nothing for long periods).
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 118
def prune_pool
  synchronize { ns_prune_pool }
end
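A minimal sketch of the bursty-workload case described above; the pool sizes, idletime, sleeps, and task bodies are illustrative placeholders only.

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 2, max_threads: 20, idletime: 10)

100.times { pool.post { sleep 0.1 } }  # a burst of jobs grows the pool toward max_threads
sleep 30                               # quiet period longer than idletime; nothing is posted,
                                       # so no auto-pruning is triggered

pool.prune_pool                        # explicitly reclaim idle workers down to min_threads
pool.length                            # => 2 in this sketch (back at min_threads)
```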
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 82
def queue_length
  synchronize { @queue.length }
end
#ready_worker(worker, last_message) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 103

def ready_worker(worker, last_message)
  synchronize { ns_ready_worker worker, last_message }
end
#remaining_capacity ⇒ Integer
Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 87
def remaining_capacity
  synchronize do
    if ns_limited_queue?
      @max_queue - @queue.length
    else
      -1
    end
  end
end
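A small illustration of the two cases; the sizes are example values only.

```ruby
require 'concurrent'

bounded   = Concurrent::ThreadPoolExecutor.new(max_threads: 1, max_queue: 5)
unbounded = Concurrent::ThreadPoolExecutor.new(max_threads: 1, max_queue: 0)

bounded.remaining_capacity    # => 5 while its queue is empty
unbounded.remaining_capacity  # => -1 (the queue may grow without bound)
```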
#remove_busy_worker(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 98

def remove_busy_worker(worker)
  synchronize { ns_remove_busy_worker worker }
end
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 55
def scheduled_task_count
  synchronize { @scheduled_task_count }
end
#worker_died(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 108

def worker_died(worker)
  synchronize { ns_worker_died worker }
end
#worker_task_completed (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 113

def worker_task_completed
  synchronize { @completed_task_count += 1 }
end