Class: Concurrent::RubyThreadPoolExecutor
| Inherits: | Concurrent::RubyExecutorService |
| Defined in: | lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb |
Overview
Failure to properly shut down a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.
An abstraction composed of one or more threads and a task queue. Tasks (blocks or proc objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received.
A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by min-threads and max-threads. When a new task is submitted and fewer than min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle. If more than min-threads but fewer than max-threads threads are running, a new thread will be created only if the queue is full.
Threads that are idle for too long will be garbage collected, down to the configured minimum. Should a thread crash, it too will be garbage collected.
ThreadPoolExecutor is based on the Java class of the same name. From the official Java documentation:
> Thread pools address two different problems: they usually provide
> improved performance when executing large numbers of asynchronous tasks,
> due to reduced per-task invocation overhead, and they provide a means
> of bounding and managing the resources, including threads, consumed
> when executing a collection of tasks. Each ThreadPoolExecutor also
> maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many
> adjustable parameters and extensibility hooks. However, programmers are
> urged to use the more convenient Executors factory methods
> [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation),
> [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single
> background thread), that preconfigure settings for the most common usage
> scenarios.

**Thread Pool Options**
Thread pools support several configuration options:
- #idletime: The number of seconds that a thread may be idle before being reclaimed.
- name: The name of the executor (optional). Printed in the executor’s #to_s output, and a `<name>-worker-<id>` name is given to its threads if supported by the Ruby implementation in use. `<id>` is unique for each thread.
- #max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches #max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.
- auto_terminate: When true (default), the threads started will be marked as daemon.
- fallback_policy: The policy defining how rejected tasks are handled.
Three fallback policies are supported:
- :abort: Raise a RejectedExecutionError exception and discard the task.
- :discard: Discard the task and return false.
- :caller_runs: Execute the task on the calling thread.
**Shutting Down Thread Pools**
Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed, the impact on those resources cannot be predicted. The best practice is to explicitly shut down all thread pools using the provided methods:
- Call #shutdown to initiate an orderly termination of all in-progress tasks
- Call #wait_for_termination with an appropriate timeout interval and allow the orderly shutdown to complete
- Call #kill *only when* the thread pool fails to shut down in the allotted time
On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shut down. To prevent applications from “hanging” on exit, all threads can be marked as daemon according to the :auto_terminate option.
```ruby
pool1 = Concurrent::FixedThreadPool.new(5) # threads will be marked as daemon
pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon
```
Constant Summary
- DEFAULT_MAX_POOL_SIZE = 2_147_483_647
  # File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 17
  Default maximum number of threads that will be created in the pool.
- DEFAULT_MAX_QUEUE_SIZE = 0
  # File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 23
  Default maximum number of tasks that may be added to the task queue.
- DEFAULT_MIN_POOL_SIZE = 0
  # File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 20
  Default minimum number of threads that will be retained in the pool.
- DEFAULT_SYNCHRONOUS = false
  # File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 29
  Default value of the :synchronous option.
- DEFAULT_THREAD_IDLETIMEOUT = 60
  # File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 26
  Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.
Concern::Logging - Included
AbstractExecutorService - Inherited
Class Method Summary
-
.new(opts = {}) ⇒ RubyThreadPoolExecutor
constructor
Create a new thread pool.
RubyExecutorService - Inherited
AbstractExecutorService - Inherited
| .new | Create a new thread pool. |
Instance Attribute Summary
-
#can_overflow? ⇒ Boolean
readonly
Does the task queue have a maximum size?
-
#idletime ⇒ Integer
readonly
The number of seconds that a thread may be idle before being reclaimed.
-
#max_length ⇒ Integer
readonly
The maximum number of threads that may be created in the pool.
-
#max_queue ⇒ Integer
readonly
The maximum number of tasks that may be waiting in the work queue at any one time.
-
#min_length ⇒ Integer
readonly
The minimum number of threads that may be retained in the pool.
-
#synchronous ⇒ true, false
readonly
Whether a value of 0 for the :max_queue option means the queue must perform direct hand-off rather than act as an unbounded queue.
-
#ns_limited_queue? ⇒ Boolean
readonly private
RubyExecutorService - Inherited
AbstractExecutorService - Inherited
| #auto_terminate= | |
| #auto_terminate? | Is the executor auto-terminate when the application exits? |
| #fallback_policy, #name, | |
| #running? | Is the executor running? |
| #shutdown | Begin an orderly shutdown. |
| #shutdown? | Is the executor shutdown? |
| #shuttingdown? | Is the executor shuttingdown? |
| #ns_auto_terminate? | |
ExecutorService - Included
| #can_overflow? | Does the task queue have a maximum size? |
| #serialized? | Does this executor guarantee serialization of its operations? |
Instance Method Summary
-
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
-
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
-
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
-
#length ⇒ Integer
The number of threads currently in the pool.
-
#prune_pool
Prune the thread pool of unneeded threads.
-
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
-
#remaining_capacity ⇒ Integer
Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks.
-
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
-
#ns_add_busy_worker ⇒ nil, Worker
private
creates new worker which has to receive work to do after it’s added.
-
#ns_assign_worker(*args, &task) ⇒ true, false
private
tries to assign task to a worker, tries to get one from @ready or to create new one.
-
#ns_enqueue(*args, &task) ⇒ true, false
private
tries to enqueue task.
- #ns_execute(*args, &task) private
- #ns_initialize(opts) private
- #ns_kill_execution private
- #ns_prunable_capacity ⇒ Integer private
-
#ns_ready_worker(worker, last_message, success = true)
private
handle ready worker, giving it new job or assigning back to @ready.
-
#ns_remove_busy_worker(worker)
private
removes a worker which is not tracked in @ready.
- #ns_remove_ready_worker(worker) private
- #ns_reset_if_forked private
- #ns_shutdown_execution private
- #ns_worker_died(worker) private
-
#prune_worker(worker) ⇒ true, false
private
removes the worker if it can be pruned.
- #ready_worker(worker, last_message) private
- #remove_worker(worker) private
- #worker_died(worker) private
- #worker_task_completed private
RubyExecutorService - Inherited
| #<< | Submit a task to the executor for asynchronous processing. |
| #auto_terminate= | |
| #auto_terminate? | Is the executor auto-terminate when the application exits? |
| #can_overflow? | Does the task queue have a maximum size? |
| #kill | Begin an immediate shutdown. |
| #post | Submit a task to the executor for asynchronous processing. |
| #running? | Is the executor running? |
| #serialized? | Does this executor guarantee serialization of its operations? |
| #shutdown | Begin an orderly shutdown. |
| #shutdown? | Is the executor shutdown? |
| #shuttingdown? | Is the executor shuttingdown? |
| #wait_for_termination | Block until executor shutdown is complete or until |
| #ns_shutdown_execution, #stop_event, #stopped_event | |
AbstractExecutorService - Inherited
| #<< | Submit a task to the executor for asynchronous processing. |
| #can_overflow? | Does the task queue have a maximum size? |
| #kill | Begin an immediate shutdown. |
| #post | Submit a task to the executor for asynchronous processing. |
| #serialized? | Does this executor guarantee serialization of its operations? |
| #to_s, | |
| #wait_for_termination | Block until executor shutdown is complete or until |
| #fallback_action | Returns an action which executes the |
| #ns_execute, | |
| #ns_kill_execution | Callback method called when the executor has been killed. |
| #ns_shutdown_execution | Callback method called when an orderly shutdown has completed. |
Concern::Deprecation - Included
ExecutorService - Included
| #<< | Submit a task to the executor for asynchronous processing. |
| #post | Submit a task to the executor for asynchronous processing. |
Concern::Logging - Included
| #log | Logs through global_logger, it can be overridden by setting @logger. |
Synchronization::LockableObject - Inherited
Constructor Details
.new(opts = {}) ⇒ RubyThreadPoolExecutor
Create a new thread pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 47
def initialize(opts = {})
  super(opts)
end
Instance Attribute Details
#can_overflow? ⇒ Boolean (readonly)
Does the task queue have a maximum size?
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 74
def can_overflow?
  synchronize { ns_limited_queue? }
end
#idletime ⇒ Integer (readonly)
The number of seconds that a thread may be idle before being reclaimed.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 38
attr_reader :idletime
#max_length ⇒ Integer (readonly)
The maximum number of threads that may be created in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 32
attr_reader :max_length
#max_queue ⇒ Integer (readonly)
The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 41
attr_reader :max_queue
#min_length ⇒ Integer (readonly)
The minimum number of threads that may be retained in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 35
attr_reader :min_length
#ns_limited_queue? ⇒ Boolean (readonly, private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 173
def ns_limited_queue?
  @max_queue != 0
end
#synchronous ⇒ true, false (readonly)
Whether a value of 0 for the :max_queue option means the queue must perform direct hand-off rather than act as an unbounded queue.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 44
attr_reader :synchronous
Instance Method Details
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 62
def completed_task_count
  synchronize { @completed_task_count }
end
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 52
def largest_length
  synchronize { @largest_length }
end
#length ⇒ Integer
The number of threads currently in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 79
def length
  synchronize { @pool.length }
end
#ns_add_busy_worker ⇒ nil, Worker (private)
creates new worker which has to receive work to do after it’s added
#ns_assign_worker(*args, &task) ⇒ true, false (private)
tries to assign task to a worker, tries to get one from @ready or to create new one
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 217
def ns_assign_worker(*args, &task)
  # keep growing if the pool is not at the minimum yet
  worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
  if worker
    worker << [task, args]
    true
  else
    false
  end
rescue ThreadError
  # Raised when the operating system refuses to create the new thread
  return false
end
#ns_enqueue(*args, &task) ⇒ true, false (private)
tries to enqueue task
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 235
def ns_enqueue(*args, &task)
  return false if @synchronous

  if !ns_limited_queue? || @queue.size < @max_queue
    @queue << [task, args]
    true
  else
    false
  end
end
#ns_execute(*args, &task) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 178
def ns_execute(*args, &task)
  ns_reset_if_forked

  if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
    @scheduled_task_count += 1
    nil
  else
    fallback_action(*args, &task)
  end
end
#ns_initialize(opts) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 146
def ns_initialize(opts)
  @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
  @fallback_policy = opts.fetch(:fallback_policy, :abort)

  raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
  raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
  raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length

  @pool = [] # all workers
  @ready = [] # used as a stash (most idle worker is at the start)
  @queue = [] # used as queue
  # @ready or @queue is empty at all times
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length = 0
  @workers_counter = 0
  @ruby_pid = $$ # detects if Ruby has forked
end
#ns_kill_execution (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 205
def ns_kill_execution
  # TODO log out unprocessed tasks in queue
  # TODO try to shutdown first?
  @pool.each(&:kill)
  @pool.clear
  @ready.clear
end
#ns_prunable_capacity ⇒ Integer (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 305
def ns_prunable_capacity
  if running?
    [@pool.size - @min_length, @ready.size].min
  else
    @pool.size
  end
end
#ns_ready_worker(worker, last_message, success = true) (private)
handle ready worker, giving it new job or assigning back to @ready
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 269
def ns_ready_worker(worker, last_message, success = true)
  task_and_args = @queue.shift
  if task_and_args
    worker << task_and_args
  else
    # stop workers when !running?, do not return them to @ready
    if running?
      raise unless last_message
      @ready.push([worker, last_message])
    else
      worker.stop
    end
  end
end
#ns_remove_busy_worker(worker) (private)
removes a worker which is not tracked in @ready
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 287
def ns_remove_busy_worker(worker)
  @pool.delete(worker)
  stopped_event.set if @pool.empty? && !running?
  true
end
#ns_remove_ready_worker(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 294
def ns_remove_ready_worker(worker)
  if index = @ready.index { |rw, _| rw == worker }
    @ready.delete_at(index)
  end
  true
end
#ns_reset_if_forked (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 314
def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @ready.clear
    @pool.clear
    @scheduled_task_count = 0
    @completed_task_count = 0
    @largest_length = 0
    @workers_counter = 0
    @ruby_pid = $$
  end
end
#ns_shutdown_execution (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 190
def ns_shutdown_execution
  ns_reset_if_forked

  if @pool.empty?
    # nothing to do
    stopped_event.set
  end

  if @queue.empty?
    # no more tasks will be accepted, just stop all workers
    @pool.each(&:stop)
  end
end
#ns_worker_died(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 247
def ns_worker_died(worker)
  ns_remove_busy_worker worker
  replacement_worker = ns_add_busy_worker
  ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
end
#prune_pool
Prune the thread pool of unneeded threads.
What is being pruned is controlled by the min_threads and idletime parameters passed at pool creation time.
This is a no-op on all pool implementations as they prune themselves automatically, and has been deprecated.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 139
def prune_pool
  deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
end
#prune_worker(worker) ⇒ true, false (private)
removes the worker if it can be pruned
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 104
def prune_worker(worker)
  synchronize do
    if ns_prunable_capacity > 0
      remove_worker worker
      true
    else
      false
    end
  end
end
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 84
def queue_length
  synchronize { @queue.length }
end
#ready_worker(worker, last_message) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 124
def ready_worker(worker, last_message)
  synchronize { ns_ready_worker worker, last_message }
end
#remaining_capacity ⇒ Integer
Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 89
def remaining_capacity
  synchronize do
    if ns_limited_queue?
      @max_queue - @queue.length
    else
      -1
    end
  end
end
#remove_worker(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 116
def remove_worker(worker)
  synchronize do
    ns_remove_ready_worker worker
    ns_remove_busy_worker worker
  end
end
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 57
def scheduled_task_count
  synchronize { @scheduled_task_count }
end
#worker_died(worker) (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 129
def worker_died(worker)
  synchronize { ns_worker_died worker }
end
#worker_task_completed (private)
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 134
def worker_task_completed
  synchronize { @completed_task_count += 1 }
end