123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::AbstractExecutorService

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Instance Attribute Summary

ExecutorService - Included

#can_overflow?

Does the task queue have a maximum size?

#serialized?

Does this executor guarantee serialization of its operations?

Instance Method Summary

Concern::Deprecation - Included

ExecutorService - Included

#<<

Submit a task to the executor for asynchronous processing.

#post

Submit a task to the executor for asynchronous processing.

Concern::Logging - Included

#log

Logs through global_logger, it can be overridden by setting @logger.

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}, &block) ⇒ AbstractExecutorService

Create a new thread pool.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 23

def initialize(opts = {}, &block)
  super(&nil)
  synchronize do
    @auto_terminate = opts.fetch(:auto_terminate, true)
    @name = opts.fetch(:name) if opts.key?(:name)
    ns_initialize(opts, &block)
  end
end

Instance Attribute Details

#auto_terminate=(value) ⇒ Boolean (rw)

Deprecated.

Has no effect

Set the auto-terminate behavior for this executor.

Parameters:

  • value (Boolean)

    The new auto-terminate value to set for this executor.

Returns:

  • (Boolean)

    true when auto-termination is enabled else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 72

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#auto_terminate?Boolean (rw)

Is the executor auto-terminate when the application exits?

Returns:

  • (Boolean)

    true when auto-termination is enabled else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 67

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#fallback_policySymbol (readonly)

Returns:

  • (Symbol)

    The fallback policy in effect. Either :abort, :discard, or :caller_runs.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 10

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#name (readonly)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 20

attr_reader :name

#ns_auto_terminate?Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 126

def ns_auto_terminate?
  @auto_terminate
end

#running?Boolean (readonly)

Is the executor running?

Returns:

  • (Boolean)

    true when running, false when shutting down or shutdown

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 52

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#shutdown (readonly)

Begin an orderly shutdown. Tasks already in the queue will be executed, but no new tasks will be accepted. Has no additional effect if the thread pool is not running.

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 37

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#shutdown?Boolean (readonly)

Is the executor shutdown?

Returns:

  • (Boolean)

    true when shutdown, false when shutting down or running

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 62

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#shuttingdown?Boolean (readonly)

Is the executor shuttingdown?

Returns:

  • (Boolean)

    true when not running and not shutdown, else false

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 57

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

Instance Method Details

#<<(task) ⇒ self

Submit a task to the executor for asynchronous processing.

Parameters:

  • task (Proc)

    the asynchronous task to perform

Returns:

  • (self)

    returns itself

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 10

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#can_overflow?Boolean

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 10

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#fallback_action(*args) (private)

Returns an action which executes the #fallback_policy once the queue size reaches max_queue. The reason for the indirection of an action is so that the work can be deferred outside of synchronization.

Parameters:

  • args (Array)

    the arguments to the task which is being handled.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 85

def fallback_action(*args)
  case fallback_policy
  when :abort
    lambda { raise RejectedExecutionError }
  when :discard
    lambda { false }
  when :caller_runs
    lambda {
      begin
        yield(*args)
      rescue => ex
        # let it fail
        log DEBUG, ex
      end
      true
    }
  else
    lambda { fail "Unknown fallback policy #{fallback_policy}" }
  end
end

#kill

Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 42

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#ns_execute(*args, &task) (private)

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 106

def ns_execute(*args, &task)
  raise NotImplementedError
end

#ns_kill_execution (private)

Callback method called when the executor has been killed. The default behavior is to do nothing.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 122

def ns_kill_execution
  # do nothing
end

#ns_shutdown_execution (private)

Callback method called when an orderly shutdown has completed. The default behavior is to signal all waiting threads.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 114

def ns_shutdown_execution
  # do nothing
end

#post(*args) { ... } ⇒ Boolean

Submit a task to the executor for asynchronous processing.

Parameters:

  • args (Array)

    zero or more arguments to be passed to the task

Yields:

  • the asynchronous task to perform

Returns:

  • (Boolean)

    true if the task is queued, false if the executor is not running

Raises:

  • (ArgumentError)

    if no task is given

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 10

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#serialized?Boolean

Does this executor guarantee serialization of its operations?

Returns:

  • (Boolean)

    True if the executor guarantees that all operations will be post in the order they are received and no two operations may occur simultaneously. Else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 10

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end

#to_s

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 32

def to_s
  name ? "#{super[0..-2]} name: #{name}>" : super
end

#wait_for_termination(timeout = nil) ⇒ Boolean

Note:

Does not initiate shutdown or termination. Either #shutdown or #kill must be called before this method (or on another thread).

Block until executor shutdown is complete or until timeout seconds have passed.

Parameters:

  • timeout (Integer) (defaults to: nil)

    the maximum number of seconds to wait for shutdown to complete

Returns:

  • (Boolean)

    true if shutdown complete or false on timeout

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb', line 47

class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # The set of possible fallback policies that may be set at thread pool creation.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  attr_reader :name

  # Create a new thread pool.
  def initialize(opts = {}, &block)
    super(&nil)
    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      @name = opts.fetch(:name) if opts.key?(:name)
      ns_initialize(opts, &block)
    end
  end

  def to_s
    name ? "#{super[0..-2]} name: #{name}>" : super
  end

  # @!macro executor_service_method_shutdown
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  def running?
    synchronize { ns_running? }
  end

  # @!macro executor_service_method_shuttingdown_question
  def shuttingdown?
    synchronize { ns_shuttingdown? }
  end

  # @!macro executor_service_method_shutdown_question
  def shutdown?
    synchronize { ns_shutdown? }
  end

  # @!macro executor_service_method_auto_terminate_question
  def auto_terminate?
    synchronize { @auto_terminate }
  end

  # @!macro executor_service_method_auto_terminate_setter
  def auto_terminate=(value)
    deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
  end

  private

  # Returns an action which executes the `fallback_policy` once the queue
  # size reaches `max_queue`. The reason for the indirection of an action
  # is so that the work can be deferred outside of synchronization.
  #
  # @param [Array] args the arguments to the task which is being handled.
  #
  # @!visibility private
  def fallback_action(*args)
    case fallback_policy
    when :abort
      lambda { raise RejectedExecutionError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda {
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      }
    else
      lambda { fail "Unknown fallback policy #{fallback_policy}" }
    end
  end

  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  #   Callback method called when an orderly shutdown has completed.
  #   The default behavior is to signal all waiting threads.
  def ns_shutdown_execution
    # do nothing
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  #   Callback method called when the executor has been killed.
  #   The default behavior is to do nothing.
  def ns_kill_execution
    # do nothing
  end

  def ns_auto_terminate?
    @auto_terminate
  end

end