123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::JavaThreadPoolExecutor

Overview

Note:

Failure to properly shutdown a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or proc objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received.

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by min-threads and max-threads. When a new task is submitted and fewer than min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than min-threads but less than max-threads threads running, a new thread will be created only if the queue is full.

Threads that are idle for too long will be garbage collected, down to the configured minimum options. Should a thread crash it, too, will be garbage collected.

ThreadPoolExecutor is based on the Java class of the same name. From the official Java documentation;

> Thread pools address two different problems: they usually provide > improved performance when executing large numbers of asynchronous tasks, > due to reduced per-task invocation overhead, and they provide a means > of bounding and managing the resources, including threads, consumed > when executing a collection of tasks. Each ThreadPoolExecutor also > maintains some basic statistics, such as the number of completed tasks. > > To be useful across a wide range of contexts, this class provides many > adjustable parameters and extensibility hooks. However, programmers are > urged to use the more convenient Executors factory methods > [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), > [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single > background thread), that preconfigure settings for the most common usage > scenarios. **Thread Pool Options**

Thread pools support several configuration options:

  • #idletime: The number of seconds that a thread may be idle before being reclaimed.

  • name: The name of the executor (optional). Printed in the executor’s #to_s output and a -worker- name is given to its threads if supported by used Ruby implementation. is uniq for each thread.

  • #max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches #max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.

  • auto_terminate: When true (default), the threads started will be marked as daemon.

  • fallback_policy: The policy defining how rejected tasks are handled.

Three fallback policies are supported:

  • :abort: Raise a RejectedExecutionError exception and discard the task.

  • :discard: Discard the task and return false.

  • :caller_runs: Execute the task on the calling thread.

**Shutting Down Thread Pools**

Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed the impact on those resources cannot be predicted. The best practice is to explicitly shutdown all thread pools using the provided methods:

  • Call #shutdown to initiate an orderly termination of all in-progress tasks

  • Call #wait_for_termination with an appropriate timeout interval an allow the orderly shutdown to complete

  • Call #kill *only when* the thread pool fails to shutdown in the allotted time

On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shutdown. To prevent applications from “hanging” on exit, all threads can be marked as daemon according to the :auto_terminate option.

“‘ruby pool1 = FixedThreadPool.new(5) # threads will be marked as daemon pool2 = FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon “`

Constant Summary

Concern::Logging - Included

SEV_LABEL

AbstractExecutorService - Inherited

FALLBACK_POLICIES

JavaExecutorService - Inherited

FALLBACK_POLICY_CLASSES

Class Method Summary

AbstractExecutorService - Inherited

.new

Create a new thread pool.

Instance Attribute Summary

JavaExecutorService - Inherited

AbstractExecutorService - Inherited

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Is the executor auto-terminate when the application exits?

#fallback_policy, #name,
#running?

Is the executor running?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shutdown?

#shuttingdown?

Is the executor shuttingdown?

#ns_auto_terminate?

ExecutorService - Included

#can_overflow?

Does the task queue have a maximum size?

#serialized?

Does this executor guarantee serialization of its operations?

Instance Method Summary

JavaExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Is the executor auto-terminate when the application exits?

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#running?

Is the executor running?

#serialized?

Does this executor guarantee serialization of its operations?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shutdown?

#shuttingdown?

Is the executor shuttingdown?

#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

AbstractExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#serialized?

Does this executor guarantee serialization of its operations?

#to_s,
#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

#fallback_action

Returns an action which executes the fallback_policy once the queue size reaches #max_queue.

#ns_execute,
#ns_kill_execution

Callback method called when the executor has been killed.

#ns_shutdown_execution

Callback method called when an orderly shutdown has completed.

Concern::Deprecation - Included

ExecutorService - Included

#<<

Submit a task to the executor for asynchronous processing.

#post

Submit a task to the executor for asynchronous processing.

Concern::Logging - Included

#log

Logs through global_logger, it can be overridden by setting @logger.

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}) ⇒ JavaThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool.

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    When a new task is submitted and fewer than min_threads are running, a new thread is created

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound

  • :fallback_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached #max_queue or the executor has shut down

  • :synchronous (Boolean) — default: DEFAULT_SYNCHRONOUS

    whether or not a value of 0 for :max_queue means the queue must perform direct hand-off rather than unbounded.

Raises:

  • (ArgumentError)

    if :max_threads is less than one

  • (ArgumentError)

    if :min_threads is less than zero

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 37

def initialize(opts = {})
  super(opts)
end

Instance Attribute Details

#can_overflow?Boolean (readonly)

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 42

def can_overflow?
  @max_queue != 0
end

#max_lengthInteger (readonly)

The maximum number of threads that may be created in the pool.

Returns:

  • (Integer)

    The maximum number of threads that may be created in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 52

attr_reader :max_length

#max_queueInteger (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.

Returns:

  • (Integer)

    The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 31

attr_reader :max_queue

#running?Boolean (readonly)

Is the executor running?

Returns:

  • (Boolean)

    true when running, false when shutting down or shutdown

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 97

def running?
  super && !@executor.isTerminating
end

#synchronoustrue, false (readonly)

Whether or not a value of 0 for :max_queue option means the queue must perform direct hand-off or rather unbounded queue.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 34

attr_reader :synchronous

Instance Method Details

#active_countInteger

The number of threads that are actively executing tasks.

Returns:

  • (Integer)

    The number of threads that are actively executing tasks.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 77

def active_count
  @executor.getActiveCount
end

#completed_task_countInteger

The number of tasks that have been completed by the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been completed by the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 72

def completed_task_count
  @executor.getCompletedTaskCount
end

#idletimeInteger

The number of seconds that a thread may be idle before being reclaimed.

Returns:

  • (Integer)

    The number of seconds that a thread may be idle before being reclaimed.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 82

def idletime
  @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
end

#largest_lengthInteger

The largest number of threads that have been created in the pool since construction.

Returns:

  • (Integer)

    The largest number of threads that have been created in the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 62

def largest_length
  @executor.getLargestPoolSize
end

#lengthInteger

The number of threads currently in the pool.

Returns:

  • (Integer)

    The number of threads currently in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 57

def length
  @executor.getPoolSize
end

#min_lengthInteger

The minimum number of threads that may be retained in the pool.

Returns:

  • (Integer)

    The minimum number of threads that may be retained in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 47

def min_length
  @executor.getCorePoolSize
end

#ns_initialize(opts) (private)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 107

def ns_initialize(opts)
  min_length       = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  max_length       = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  idletime         = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
  @fallback_policy = opts.fetch(:fallback_policy, :abort)

  raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
  raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
  raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)

  if @max_queue == 0
    if @synchronous
      queue = java.util.concurrent.SynchronousQueue.new
    else
      queue = java.util.concurrent.LinkedBlockingQueue.new
    end
  else
    queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
  end

  @executor = java.util.concurrent.ThreadPoolExecutor.new(
      min_length,
      max_length,
      idletime,
      java.util.concurrent.TimeUnit::SECONDS,
      queue,
      DaemonThreadFactory.new(ns_auto_terminate?),
      FALLBACK_POLICY_CLASSES[@fallback_policy].new)

end

#prune_pool

Prune the thread pool of unneeded threads

What is being pruned is controlled by the min_threads and idletime parameters passed at pool creation time

This is a no-op on some pool implementation (e.g. the Java one). The Ruby pool will auto-prune each time a new job is posted. You will need to call this method explicitly in case your application post jobs in bursts (a lot of jobs and then nothing for long periods)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 102

def prune_pool
end

#queue_lengthInteger

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    The number of tasks in the queue awaiting execution.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 87

def queue_length
  @executor.getQueue.size
end

#remaining_capacityInteger

Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 92

def remaining_capacity
  @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
end

#scheduled_task_countInteger

The number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been scheduled for execution on the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb', line 67

def scheduled_task_count
  @executor.getTaskCount
end