
Class: Concurrent::RubyThreadPoolExecutor

Overview

Note:

Failure to properly shut down a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or proc objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received.

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by min-threads and max-threads. When a new task is submitted and fewer than min-threads threads are running, a new thread is created to handle the request, even if other worker threads are idle. If at least min-threads but fewer than max-threads threads are running, a new thread will be created only if the queue is full.

Threads that are idle for too long will be garbage collected, down to the configured minimum pool size. Should a thread crash, it, too, will be garbage collected.
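For illustration only, a minimal sketch of this sizing behavior using Concurrent::ThreadPoolExecutor (which is backed by this class on MRI); the option values here are arbitrary:

require 'concurrent'

# A pool that keeps at least 2 threads, grows to at most 5, and queues
# up to 10 pending tasks. Idle threads are reclaimed after 60 seconds.
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: 5,
  max_queue:   10,
  idletime:    60
)

10.times { |i| pool.post { sleep(0.1); i * 2 } }

pool.shutdown              # no new tasks accepted; queued tasks still run
pool.wait_for_termination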

ThreadPoolExecutor is based on the Java class of the same name. From the official Java documentation:

> Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single background thread), that preconfigure settings for the most common usage scenarios.

**Thread Pool Options**

Thread pools support several configuration options:

  • #idletime: The number of seconds that a thread may be idle before being reclaimed.

  • name: The name of the executor (optional). It is printed in the executor’s #to_s output and, if supported by the Ruby implementation in use, each of its threads is given a name of the form <name>-worker-<id>, where <id> is unique for each thread.

  • #max_queue: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches #max_queue and no new threads can be created, subsequent tasks will be rejected in accordance with the configured fallback_policy.

  • auto_terminate: When true (default), the threads started will be marked as daemon.

  • fallback_policy: The policy defining how rejected tasks are handled.

Three fallback policies are supported (a configuration sketch follows this list):

  • :abort: Raise a RejectedExecutionError exception and discard the task.

  • :discard: Discard the task and return false.

  • :caller_runs: Execute the task on the calling thread.
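As an illustration only (not taken from the library documentation), the options above might be combined as follows; the pool name and all numeric values are arbitrary, and :discard silently drops tasks once the pool and queue are saturated:

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  name:            'report-workers', # arbitrary; used in #to_s and worker thread names
  min_threads:     1,
  max_threads:     2,
  max_queue:       1,
  idletime:        60,
  fallback_policy: :discard          # silently drop tasks once saturated
)

# With :discard, #post returns false for tasks that are rejected.
results = Array.new(10) { pool.post { sleep(1) } }
results.count(false)  # most of the ten posts are likely rejected

pool.shutdown
pool.wait_for_termination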

**Shutting Down Thread Pools**

Killing a thread pool while tasks are still being processed, either by calling the #kill method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed the impact on those resources cannot be predicted. The best practice is to explicitly shut down all thread pools using the provided methods (a sketch of the sequence follows this list):

  • Call #shutdown to initiate an orderly termination of all in-progress tasks

  • Call #wait_for_termination with an appropriate timeout interval and allow the orderly shutdown to complete

  • Call #kill *only when* the thread pool fails to shutdown in the allotted time
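A minimal sketch of that sequence (do_work is a hypothetical stand-in for application code):

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(5)
100.times { |i| pool.post { do_work(i) } }

pool.shutdown                        # 1. stop accepting new tasks; queued tasks still run
unless pool.wait_for_termination(10) # 2. wait up to 10 seconds for an orderly shutdown
  pool.kill                          # 3. forceful stop, only as a last resort
end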

On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shut down. To prevent applications from “hanging” on exit, all threads can be marked as daemon according to the :auto_terminate option.

pool1 = FixedThreadPool.new(5)                        # threads will be marked as daemon
pool2 = FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon

Constant Summary

Concern::Logging - Included

SEV_LABEL

AbstractExecutorService - Inherited

FALLBACK_POLICIES

Class Method Summary

RubyExecutorService - Inherited

AbstractExecutorService - Inherited

.new

Create a new thread pool.

Instance Attribute Summary

RubyExecutorService - Inherited

AbstractExecutorService - Inherited

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Does the executor auto-terminate when the application exits?

#fallback_policy, #name,
#running?

Is the executor running?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shutdown?

#shuttingdown?

Is the executor shutting down?

#ns_auto_terminate?

ExecutorService - Included

#can_overflow?

Does the task queue have a maximum size?

#serialized?

Does this executor guarantee serialization of its operations?

Instance Method Summary

RubyExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#auto_terminate=

Set the auto-terminate behavior for this executor.

#auto_terminate?

Does the executor auto-terminate when the application exits?

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#running?

Is the executor running?

#serialized?

Does this executor guarantee serialization of its operations?

#shutdown

Begin an orderly shutdown.

#shutdown?

Is the executor shutdown?

#shuttingdown?

Is the executor shutting down?

#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

#ns_shutdown_execution, #stop_event, #stopped_event

AbstractExecutorService - Inherited

#<<

Submit a task to the executor for asynchronous processing.

#can_overflow?

Does the task queue have a maximum size?

#kill

Begin an immediate shutdown.

#post

Submit a task to the executor for asynchronous processing.

#serialized?

Does this executor guarantee serialization of its operations?

#to_s,
#wait_for_termination

Block until executor shutdown is complete or until timeout seconds have passed.

#fallback_action

Returns an action which executes the fallback_policy once the queue size reaches #max_queue.

#ns_execute,
#ns_kill_execution

Callback method called when the executor has been killed.

#ns_shutdown_execution

Callback method called when an orderly shutdown has completed.

Concern::Deprecation - Included

ExecutorService - Included

#<<

Submit a task to the executor for asynchronous processing.

#post

Submit a task to the executor for asynchronous processing.

Concern::Logging - Included

#log

Logs through global_logger; it can be overridden by setting @logger.

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}) ⇒ RubyThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool.

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    When a new task is submitted and fewer than min_threads are running, a new thread is created

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound

  • :fallback_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached #max_queue or the executor has shut down

  • :synchronous (Boolean) — default: DEFAULT_SYNCHRONOUS

    whether or not a value of 0 for :max_queue means the queue must perform direct hand-off rather than be unbounded.

Raises:

  • (ArgumentError)

    if :max_threads is less than one

  • (ArgumentError)

    if :min_threads is less than zero

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 45

def initialize(opts = {})
  super(opts)
end
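For illustration (a hedged sketch; the exact error messages may differ between versions):

require 'concurrent'

pool = Concurrent::RubyThreadPoolExecutor.new(min_threads: 2, max_threads: 4)

begin
  Concurrent::RubyThreadPoolExecutor.new(min_threads: 8, max_threads: 4)
rescue ArgumentError => e
  e.message  # e.g. "`min_threads` cannot be more than `max_threads`"
end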

Instance Attribute Details

#can_overflow? ⇒ Boolean (readonly)

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 72

def can_overflow?
  synchronize { ns_limited_queue? }
end

#idletime ⇒ Integer (readonly)

The number of seconds that a thread may be idle before being reclaimed.

Returns:

  • (Integer)

    The number of seconds that a thread may be idle before being reclaimed.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 36

attr_reader :idletime

#max_length ⇒ Integer (readonly)

The maximum number of threads that may be created in the pool.

Returns:

  • (Integer)

    The maximum number of threads that may be created in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 30

attr_reader :max_length

#max_queue ⇒ Integer (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.

Returns:

  • (Integer)

    The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches max_queue subsequent tasks will be rejected in accordance with the configured fallback_policy.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 39

attr_reader :max_queue

#min_length ⇒ Integer (readonly)

The minimum number of threads that may be retained in the pool.

Returns:

  • (Integer)

    The minimum number of threads that may be retained in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 33

attr_reader :min_length

#ns_limited_queue? ⇒ Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 155

def ns_limited_queue?
  @max_queue != 0
end

#synchronous ⇒ true, false (readonly)

Whether or not a value of 0 for the :max_queue option means the queue must perform direct hand-off rather than be unbounded.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 42

attr_reader :synchronous
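A sketch of the direct hand-off behavior; the exact number of accepted tasks depends on timing:

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  max_threads:     2,
  max_queue:       0,
  synchronous:     true,     # direct hand-off: nothing is ever queued
  fallback_policy: :discard
)

accepted = Array.new(4) { pool.post { sleep(0.5) } }
accepted.count(true)  # likely 2, one per thread; the rest are discarded

pool.shutdown
pool.wait_for_termination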

Instance Method Details

#active_count ⇒ Integer

The number of threads that are actively executing tasks.

Returns:

  • (Integer)

    The number of threads that are actively executing tasks.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 65

def active_count
  synchronize do
    @pool.length - @ready.length
  end
end

#completed_task_count ⇒ Integer

The number of tasks that have been completed by the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been completed by the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 60

def completed_task_count
  synchronize { @completed_task_count }
end

#largest_length ⇒ Integer

The largest number of threads that have been created in the pool since construction.

Returns:

  • (Integer)

    The largest number of threads that have been created in the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 50

def largest_length
  synchronize { @largest_length }
end

#length ⇒ Integer

The number of threads currently in the pool.

Returns:

  • (Integer)

    The number of threads currently in the pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 77

def length
  synchronize { @pool.length }
end

#ns_add_busy_worker ⇒ nil, Worker (private)

Creates a new worker, which must be given work after it is added.

Returns:

  • (nil, Worker)

    nil if max capacity is reached

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 241

def ns_add_busy_worker
  return if @pool.size >= @max_length

  @workers_counter += 1
  @pool << (worker = Worker.new(self, @workers_counter))
  @largest_length = @pool.length if @pool.length > @largest_length
  worker
end

#ns_assign_worker(*args, &task) ⇒ true, false (private)

Tries to assign the task to a worker, taking one from @ready or creating a new one.

Returns:

  • (true, false)

    if task is assigned to a worker

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 201

def ns_assign_worker(*args, &task)
  # keep growing if the pool is not at the minimum yet
  worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
  if worker
    worker << [task, args]
    true
  else
    false
  end
rescue ThreadError
  # Raised when the operating system refuses to create the new thread
  return false
end

#ns_enqueue(*args, &task) ⇒ true, false (private)

Tries to enqueue the task.

Returns:

  • (true, false)

    if enqueued

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 219

def ns_enqueue(*args, &task)
  return false if @synchronous
  
  if !ns_limited_queue? || @queue.size < @max_queue
    @queue << [task, args]
    true
  else
    false
  end
end

#ns_execute(*args, &task) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 160

def ns_execute(*args, &task)
  ns_reset_if_forked

  if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
    @scheduled_task_count += 1
  else
    return fallback_action(*args, &task)
  end

  ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
  nil
end

#ns_initialize(opts) (private)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 125

def ns_initialize(opts)
  @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
  @fallback_policy = opts.fetch(:fallback_policy, :abort)

  raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
  raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
  raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
  raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length

  @pool                 = [] # all workers
  @ready                = [] # used as a stash (most idle worker is at the start)
  @queue                = [] # used as queue
  # @ready or @queue is empty at all times
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter      = 0
  @ruby_pid             = $$ # detects if Ruby has forked

  @gc_interval  = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
  @next_gc_time = Concurrent.monotonic_time + @gc_interval
end

#ns_kill_execution (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 189

def ns_kill_execution
  # TODO log out unprocessed tasks in queue
  # TODO try to shutdown first?
  @pool.each(&:kill)
  @pool.clear
  @ready.clear
end

#ns_prune_pool (private)

Stops the most idle workers (taken from the front of @ready) that have been idle for longer than idletime, while keeping at least min_length threads.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 280

def ns_prune_pool
  now = Concurrent.monotonic_time
  stopped_workers = 0
  while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
    worker, last_message = @ready.first
    if now - last_message > self.idletime
      stopped_workers += 1
      @ready.shift
      worker << :stop
    else break
    end
  end

  @next_gc_time = Concurrent.monotonic_time + @gc_interval
end

#ns_ready_worker(worker, last_message, success = true) (private)

Handles a worker that has become ready, giving it a new job from the queue or returning it to @ready.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 253

def ns_ready_worker(worker, last_message, success = true)
  task_and_args = @queue.shift
  if task_and_args
    worker << task_and_args
  else
    # stop workers when !running?, do not return them to @ready
    if running?
      raise unless last_message
      @ready.push([worker, last_message])
    else
      worker.stop
    end
  end
end

#ns_remove_busy_worker(worker) (private)

Removes a worker which is not tracked in @ready.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 271

def ns_remove_busy_worker(worker)
  @pool.delete(worker)
  stopped_event.set if @pool.empty? && !running?
  true
end

#ns_reset_if_forked (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 296

def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @ready.clear
    @pool.clear
    @scheduled_task_count = 0
    @completed_task_count = 0
    @largest_length       = 0
    @workers_counter      = 0
    @ruby_pid             = $$
  end
end

#ns_shutdown_execution (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 174

def ns_shutdown_execution
  ns_reset_if_forked

  if @pool.empty?
    # nothing to do
    stopped_event.set
  end

  if @queue.empty?
    # no more tasks will be accepted, just stop all workers
    @pool.each(&:stop)
  end
end

#ns_worker_died(worker) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 231

def ns_worker_died(worker)
  ns_remove_busy_worker worker
  replacement_worker = ns_add_busy_worker
  ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
end

#prune_pool

Prune the thread pool of unneeded threads

What is being pruned is controlled by the min_threads and idletime parameters passed at pool creation time.

This is a no-op on some pool implementations (e.g. the Java one). The Ruby pool will auto-prune each time a new job is posted. You will need to call this method explicitly in case your application posts jobs in bursts (a lot of jobs and then nothing for long periods).

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 118

def prune_pool
  synchronize { ns_prune_pool }
end
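A usage sketch for a bursty workload (the option values are arbitrary; workers stop asynchronously, so the length drops shortly after pruning):

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 10, idletime: 5)

100.times { pool.post { sleep(0.01) } }  # burst of jobs
sleep(10)                                # long quiet period, nothing new is posted

pool.length      # may still report threads that have outlived idletime
pool.prune_pool  # reclaim idle threads explicitly
pool.length      # trends back toward min_threads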

#queue_length ⇒ Integer

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    The number of tasks in the queue awaiting execution.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 82

def queue_length
  synchronize { @queue.length }
end

#ready_worker(worker, last_message) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 103

def ready_worker(worker, last_message)
  synchronize { ns_ready_worker worker, last_message }
end

#remaining_capacity ⇒ Integer

Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    Number of tasks that may be enqueued before reaching #max_queue and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 87

def remaining_capacity
  synchronize do
    if ns_limited_queue?
      @max_queue - @queue.length
    else
      -1
    end
  end
end
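For instance (a small sketch):

require 'concurrent'

bounded   = Concurrent::ThreadPoolExecutor.new(max_threads: 2, max_queue: 5)
unbounded = Concurrent::ThreadPoolExecutor.new(max_threads: 2, max_queue: 0)

bounded.remaining_capacity    # 5 while the queue is empty
unbounded.remaining_capacity  # -1, the queue may grow without bound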

#remove_busy_worker(worker) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 98

def remove_busy_worker(worker)
  synchronize { ns_remove_busy_worker worker }
end

#scheduled_task_count ⇒ Integer

The number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been scheduled for execution on the pool since construction.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 55

def scheduled_task_count
  synchronize { @scheduled_task_count }
end

#worker_died(worker) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 108

def worker_died(worker)
  synchronize { ns_worker_died worker }
end

#worker_task_completed (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb', line 113

def worker_task_completed
  synchronize { @completed_task_count += 1 }
end