123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::CachedThreadPool

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::ThreadPoolExecutor
Defined in: lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb

Overview

Note:

Failure to properly shutdown a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.

A thread pool that dynamically grows and shrinks to fit the current workload. New threads are created as needed, existing threads are reused, and threads that remain idle for too long are killed and removed from the pool. These pools are particularly suited to applications that perform a high volume of short-lived tasks.

On creation a CachedThreadPool has zero running threads. New threads are created on the pool as new operations are #post. The size of the pool will grow until #max_length threads are in the pool or until the number of threads exceeds the number of running and pending operations. When a new operation is post to the pool the first available idle thread will be tasked with the new operation.

Should a thread crash for any reason the thread will immediately be removed from the pool. Similarly, threads which remain idle for an extended period of time will be killed and reclaimed. Thus these thread pools are very efficient at reclaiming unused resources.

The API and behavior of this class are based on Java’s CachedThreadPool

**Thread Pool Options**

Thread pools support several configuration options:

  • idletime: The number of seconds that a thread may be idle before being reclaimed.

  • name: The name of the executor (optional). Printed in the executor’s #to_s output and a -worker- name is given to its threads if supported by used Ruby implementation. is uniq for each thread.

  • max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.

  • auto_terminate: When true (default), the threads started will be marked as daemon.

  • fallback_policy: The policy defining how rejected tasks are handled.

Three fallback policies are supported:

  • :abort: Raise a RejectedExecutionError exception and discard the task.

  • :discard: Discard the task and return false.

  • :caller_runs: Execute the task on the calling thread.

**Shutting Down Thread Pools**

Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed the impact on those resources cannot be predicted. The best practice is to explicitly shutdown all thread pools using the provided methods:

  • Call #shutdown to initiate an orderly termination of all in-progress tasks

  • Call #wait_for_termination with an appropriate timeout interval an allow the orderly shutdown to complete

  • Call #kill *only when* the thread pool fails to shutdown in the allotted time

On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shutdown. To prevent applications from “hanging” on exit, all threads can be marked as daemon according to the :auto_terminate option.

“‘ruby pool1 = FixedThreadPool.new(5) # threads will be marked as daemon pool2 = FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon “`

Class Method Summary

Instance Attribute Summary

ThreadPoolExecutor - Inherited

#completed_task_count

The number of tasks that have been completed by the pool since construction.

#fallback_policy,
#idletime

The number of seconds that a thread may be idle before being reclaimed.

#largest_length

The largest number of threads that have been created in the pool since construction.

#length

The number of threads currently in the pool.

#max_length

The maximum number of threads that may be created in the pool.

#max_queue

The maximum number of tasks that may be waiting in the work queue at any one time.

#min_length

The minimum number of threads that may be retained in the pool.

#queue_length

The number of tasks in the queue awaiting execution.

#remaining_capacity

Number of tasks that may be enqueued before reaching max_queue and rejecting new tasks.

#scheduled_task_count

The number of tasks that have been scheduled for execution on the pool since construction.

Instance Method Summary

ThreadPoolExecutor - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Is the executor auto-terminate when the application exits?

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#prune_pool

Prune the thread pool of unneeded threads.

#running?

Is the executor running?

#serialized?

Does this executor guarantee serialization of its operations?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shutdown?

#shuttingdown?

Is the executor shuttingdown?

#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

Constructor Details

.new(opts = {}) ⇒ CachedThreadPool

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb', line 39

def initialize(opts = {})
  defaults  = { idletime: DEFAULT_THREAD_IDLETIMEOUT }
  overrides = { min_threads: 0,
                max_threads: DEFAULT_MAX_POOL_SIZE,
                max_queue:   DEFAULT_MAX_QUEUE_SIZE }
  super(defaults.merge(opts).merge(overrides))
end

Instance Method Details

#ns_initialize(opts) (private)

Create a new thread pool.

Parameters:

  • opts (Hash)

    the options defining pool behavior.

Options Hash (opts):

  • :fallback_policy (Symbol) — default: `:abort`

    the fallback policy

Raises:

  • (ArgumentError)

    if fallback_policy is not a known policy

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb', line 51

def ns_initialize(opts)
  super(opts)
  if Concurrent.on_jruby?
    @max_queue          = 0
    @executor           = java.util.concurrent.Executors.newCachedThreadPool(
        DaemonThreadFactory.new(ns_auto_terminate?))
    @executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
    @executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
  end
end