123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::WrappingExecutor

Overview

A delegating executor which modifies each task with arguments before the task is given to the target executor it delegates to.

Examples:

Count task executions

counter          = AtomicFixnum.new
count_executions = WrappingExecutor.new Concurrent.global_io_executor do |*args, &task|
  [*args, -> *args { counter.increment; task.call *args }]
end
10.times { count_executions.post { :do_something } }
sleep 0.01
counter.value #=> 10

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

ExecutorService - Included

#can_overflow?

Does the task queue have a maximum size?

#serialized?

Does this executor guarantee serialization of its operations?

Instance Method Summary

ExecutorService - Included

#<<

Submit a task to the executor for asynchronous processing.

#post

Submit a task to the executor for asynchronous processing.

Concern::Logging - Included

#log

Logs through global_logger, it can be overridden by setting @logger.

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(executor) {|*args, &task| ... } ⇒ WrappingExecutor

Parameters:

  • executor (Executor)

    an executor to delegate the tasks to

Yields:

  • (*args, &task)

    A function which can modify the task with arguments

Yield Parameters:

  • *args (Array<Object>)

    the arguments submitted with the tasks

  • &task (block)

    the task submitted to the executor to be modified

Yield Returns:

  • (Array<Object>)

    a new arguments and task [*args, task] which are submitted to the target executor

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb', line 26

def initialize(executor, &wrapper)
  super()
  @Wrapper  = wrapper
  @Executor = executor
end

Instance Attribute Details

#can_overflow?Boolean (readonly)

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb', line 41

def can_overflow?
  @Executor.can_overflow?
end

#serialized?Boolean (readonly)

Does this executor guarantee serialization of its operations?

Returns:

  • (Boolean)

    True if the executor guarantees that all operations will be post in the order they are received and no two operations may occur simultaneously. Else false.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb', line 46

def serialized?
  @Executor.serialized?
end

Instance Method Details

#post(*args) { ... } ⇒ Boolean

Submit a task to the executor for asynchronous processing.

Parameters:

  • args (Array)

    zero or more arguments to be passed to the task

Yields:

  • the asynchronous task to perform

Returns:

  • (Boolean)

    true if the task is queued, false if the executor is not running

Raises:

  • (ArgumentError)

    if no task is given

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb', line 35

def post(*args, &task)
  *args, task = @Wrapper.call(*args, &task)
  @Executor.post(*args, &task)
end