Class: Concurrent::JavaThreadPoolExecutor
Relationships & Source Files
Inherits: Concurrent::JavaExecutorService
Defined in: lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb
Overview
Note: Failure to properly shut down a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.
An abstraction composed of one or more threads and a task queue. Tasks (blocks or proc objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received.
A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by min-threads and max-threads. When a new task is submitted and fewer than min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than min-threads but fewer than max-threads threads running, a new thread will be created only if the queue is full.
Threads that are idle for too long will be garbage collected, down to the configured minimum. Should a thread crash, it too will be garbage collected.
ThreadPoolExecutor is based on the Java class of the same name. From the official Java documentation:
> Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single background thread), that preconfigure settings for the most common usage scenarios.

**Thread Pool Options**
Thread pools support several configuration options:
- idletime: The number of seconds that a thread may be idle before being reclaimed.
- name: The name of the executor (optional). Printed in the executor's #to_s output, and a <name>-worker-<id> name is given to its threads if supported by the Ruby implementation in use. <id> is unique for each thread.
- max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.
- auto_terminate: When true (default), the threads started will be marked as daemon.
- fallback_policy: The policy defining how rejected tasks are handled.
Three fallback policies are supported:
- :abort: Raise a RejectedExecutionError exception and discard the task.
- :discard: Discard the task and return false.
- :caller_runs: Execute the task on the calling thread.
**Shutting Down Thread Pools**
Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed, the impact on those resources cannot be predicted. The best practice is to explicitly shut down all thread pools using the provided methods:
- Call #shutdown to initiate an orderly termination of all in-progress tasks
- Call #wait_for_termination with an appropriate timeout interval and allow the orderly shutdown to complete
- Call #kill *only when* the thread pool fails to shut down in the allotted time
On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shut down. To prevent applications from "hanging" on exit, all threads can be marked as daemon according to the :auto_terminate option.
```ruby
pool1 = FixedThreadPool.new(5) # threads will be marked as daemon
pool2 = FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon
```
Constant Summary
- DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE
  Default maximum number of threads that will be created in the pool.
- DEFAULT_MAX_QUEUE_SIZE = 0
  Default maximum number of tasks that may be added to the task queue.
- DEFAULT_MIN_POOL_SIZE = 0
  Default minimum number of threads that will be retained in the pool.
- DEFAULT_SYNCHRONOUS = false
  Default value of the :synchronous option.
- DEFAULT_THREAD_IDLETIMEOUT = 60
  Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.
Concern::Logging
- Included
AbstractExecutorService
- Inherited
JavaExecutorService
- Inherited
Class Method Summary
-
.new(opts = {}) ⇒ JavaThreadPoolExecutor
constructor
Create a new thread pool.
AbstractExecutorService
- Inherited
.new | Create a new thread pool. |
Instance Attribute Summary
-
#can_overflow? ⇒ Boolean
readonly
Does the task queue have a maximum size?
-
#max_length ⇒ Integer
readonly
The maximum number of threads that may be created in the pool.
-
#max_queue ⇒ Integer
readonly
The maximum number of tasks that may be waiting in the work queue at any one time.
-
#running? ⇒ Boolean
readonly
Is the executor running?
-
#synchronous ⇒ true, false
readonly
Whether a value of 0 for the :max_queue option means the queue performs direct hand-off (true) rather than being unbounded (false).
JavaExecutorService
- Inherited
AbstractExecutorService
- Inherited
#auto_terminate= | |
#auto_terminate? | Does the executor auto-terminate when the application exits? |
#fallback_policy, #name, | |
#running? | Is the executor running? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#ns_auto_terminate? |
ExecutorService
- Included
#can_overflow? | Does the task queue have a maximum size? |
#serialized? | Does this executor guarantee serialization of its operations? |
Instance Method Summary
-
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
-
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
-
#idletime ⇒ Integer
The number of seconds that a thread may be idle before being reclaimed.
-
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
-
#length ⇒ Integer
The number of threads currently in the pool.
-
#min_length ⇒ Integer
The minimum number of threads that may be retained in the pool.
-
#prune_pool
Prune the thread pool of unneeded threads.
-
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
-
#remaining_capacity ⇒ Integer
Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks.
-
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
- #ns_initialize(opts) private
JavaExecutorService
- Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#auto_terminate= | |
#auto_terminate? | Does the executor auto-terminate when the application exits? |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#running? | Is the executor running? |
#serialized? | Does this executor guarantee serialization of its operations? |
#shutdown | Begin an orderly shutdown. |
#shutdown? | Is the executor shutdown? |
#shuttingdown? | Is the executor shuttingdown? |
#wait_for_termination | Block until executor shutdown is complete or until |
AbstractExecutorService
- Inherited
#<< | Submit a task to the executor for asynchronous processing. |
#can_overflow? | Does the task queue have a maximum size? |
#kill | Begin an immediate shutdown. |
#post | Submit a task to the executor for asynchronous processing. |
#serialized? | Does this executor guarantee serialization of its operations? |
#to_s, | |
#wait_for_termination | Block until executor shutdown is complete or until |
#fallback_action | Returns an action which executes the |
#ns_execute, | |
#ns_kill_execution | Callback method called when the executor has been killed. |
#ns_shutdown_execution | Callback method called when an orderly shutdown has completed. |
Concern::Deprecation
- Included
ExecutorService
- Included
#<< | Submit a task to the executor for asynchronous processing. |
#post | Submit a task to the executor for asynchronous processing. |
Concern::Logging
- Included
#log | Logs through global_logger, it can be overridden by setting @logger. |
Synchronization::LockableObject
- Inherited
Constructor Details
.new(opts = {}) ⇒ JavaThreadPoolExecutor
Create a new thread pool.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 37
def initialize(opts = {})
  super(opts)
end
Instance Attribute Details
#can_overflow? ⇒ Boolean
(readonly)
Does the task queue have a maximum size?
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 42
def can_overflow?
  @max_queue != 0
end
#max_length ⇒ Integer
(readonly)
The maximum number of threads that may be created in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 52
attr_reader :max_length
#max_queue ⇒ Integer
(readonly)
The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue, subsequent tasks will be rejected in accordance with the configured fallback_policy.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 31
attr_reader :max_queue
#running? ⇒ Boolean
(readonly)
Is the executor running?
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 97
def running?
  super && !@executor.isTerminating
end
#synchronous ⇒ true, false
(readonly)
Whether a value of 0 for the :max_queue option means the queue performs direct hand-off (true) rather than being unbounded (false).
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 34
attr_reader :synchronous
Instance Method Details
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 77
def active_count
  @executor.getActiveCount
end
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 72
def completed_task_count
  @executor.getCompletedTaskCount
end
#idletime ⇒ Integer
The number of seconds that a thread may be idle before being reclaimed.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 82
def idletime
  @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
end
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 62
def largest_length
  @executor.getLargestPoolSize
end
#length ⇒ Integer
The number of threads currently in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 57
def length
  @executor.getPoolSize
end
#min_length ⇒ Integer
The minimum number of threads that may be retained in the pool.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 47
def min_length
  @executor.getCorePoolSize
end
#ns_initialize(opts) (private)
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 107
def ns_initialize(opts)
  min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
  @fallback_policy = opts.fetch(:fallback_policy, :abort)
  raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
  raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
  raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
  if @max_queue == 0
    if @synchronous
      queue = java.util.concurrent.SynchronousQueue.new
    else
      queue = java.util.concurrent.LinkedBlockingQueue.new
    end
  else
    queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
  end
  @executor = java.util.concurrent.ThreadPoolExecutor.new(
    min_length,
    max_length,
    idletime,
    java.util.concurrent.TimeUnit::SECONDS,
    queue,
    DaemonThreadFactory.new(ns_auto_terminate?),
    FALLBACK_POLICY_CLASSES[@fallback_policy].new)
end
#prune_pool
Prune the thread pool of unneeded threads.
What is being pruned is controlled by the min_threads and idletime parameters passed at pool creation time.
This is a no-op on some pool implementations (e.g. the Java one). The Ruby pool will auto-prune each time a new job is posted. You will need to call this method explicitly if your application posts jobs in bursts (a lot of jobs and then nothing for long periods).
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 102
def prune_pool
end
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 87
def queue_length
  @executor.getQueue.size
end
#remaining_capacity ⇒ Integer
Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 92
def remaining_capacity
  @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
end
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 67
def scheduled_task_count
  @executor.getTaskCount
end