123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Throttle

Overview

Note:

**Edge Features** are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.

  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.

  • Edge features may also lack tests and documentation.

  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A tool managing concurrency level of tasks. The maximum capacity is set in constructor. Each acquire will lower the available capacity and release will increase it. When there is no available capacity the current thread may either be blocked or an event is returned which will be resolved when capacity becomes available.

The more common usage of the Throttle is with a proxy executor a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor will be throttled and execute on the given executor. There can be more than one proxy executors. All abstractions which execute tasks have option to specify executor, therefore the proxy executor can be injected to any abstraction throttling its concurrency level.

==== Examples

Limiting concurrency level of a concurrently executed block to two

max_two = {Concurrent::Throttle.new} 2
=== => #<Concurrent::Throttle:0x000002 capacity available 2 of 2>

=== used to track concurrency level
concurrency_level = {Concurrent::AtomicFixnum.new}
=== => #<Concurrent::AtomicFixnum:0x000003 value:0>
job = -> do
  # increase the current level at the beginning of the throttled block    
  concurrency_level.increment
  # work, takes some time
  do_stuff
  # read the current concurrency level 
  current_concurrency_level = concurrency_level.value
  # decrement the concurrency level back at the end of the block            
  concurrency_level.decrement
  # return the observed concurrency level 
  current_concurrency_level
end 

=== create 10 threads running concurrently the jobs
{Array.new}(10) do  
  Thread.new do
    max_two.acquire(&job)   
  end
=== wait for all the threads to finish and read the observed 
=== concurrency level in each of them   
end.map(&:value)                         # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1]

Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the do_stuff was never bigger than 2.

=== runs a block, and returns the observed concurrency level during the execution
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  # return the observed concurrency level
  return current_concurrency_level 
end 

throttle = {Concurrent::Throttle.new} 3
=== => #<Concurrent::Throttle:0x000004 capacity available 3 of 3>
concurrency_level = {Concurrent::AtomicFixnum.new} 
=== => #<Concurrent::AtomicFixnum:0x000005 value:0>

{Array.new}(10) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
=== collect observed concurrency levels   
end.map(&:value!)                        # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1]

The concurrency level does not rise above 3.

It works by setting the executor of the future created from the throttle. The executor is a proxy executor for the Concurrent::Promises.default_executor which can be obtained using #on method. Therefore the above example could be instead more explicitly written as follows

=== ...
{Array.new}(10) do |i|
  # create throttled future
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do
    # ...
  end
end.map(&:value!)

Anything executed on the proxy executor is throttled. A throttle can have more proxy executors for different executors, all jobs share the same capacity provided by the throttle.

Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. It can be changed by using different executor. It the following example the first 2 futures in the chain are throttled, the last is not.

concurrency_level_throttled   = {Concurrent::AtomicFixnum.new} 
concurrency_level_unthrottled = {Concurrent::AtomicFixnum.new} 
{Array.new}(10) do |i|
  throttle.future(i) do 
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff } 
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!) 
=== => [[3, 3, 7],
===     [3, 2, 9],
===     [3, 3, 10],
===     [3, 3, 6],
===     [3, 3, 5],
===     [3, 3, 8],
===     [3, 3, 3],
===     [3, 3, 4],
===     [3, 2, 2],
===     [3, 1, 1]]

In the output you can see that the first 2 columns do not cross the 3 capacity limit and the last column which is untroubled does.

TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.

Other abstraction

The proxy executor created with throttle can be used with other abstractions as well and combined.

concurrency_level = {Concurrent::AtomicFixnum.new} 
futures = {Array.new}(5) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
end 
agents = {Array.new}(5) do |i|
  agent = Concurrent::Agent.new 0
  # execute agent update on throttled executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } }
  agent 
end 
futures.map(&:value!)                    # => [3, 3, 3, 2, 1]
agents.each { |a| a.await }.map(&:value) 
=== => [3, 2, 3, 3, 1]

There is no observed concurrency level above 3.

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Method Summary

Promises::FactoryMethods - Included

#any
#any_event

Shortcut of #any_event_on with default :io executor supplied.

#any_event_on

Creates a new event which becomes resolved after the first futures_and_or_events resolves.

#any_fulfilled_future

Shortcut of #any_fulfilled_future_on with default :io executor supplied.

#any_fulfilled_future_on

Creates a new future which is resolved after the first futures_and_or_events is fulfilled.

#any_resolved_future

Shortcut of #any_resolved_future_on with default :io executor supplied.

#any_resolved_future_on

Creates a new future which is resolved after the first futures_and_or_events is resolved.

#delay

Shortcut of #delay_on with default :io executor supplied.

#delay_on

Creates a new event or future which is resolved only after it is touched, see Concurrent::AbstractEventFuture#touch.

#fulfilled_future

Creates a resolved future which will be fulfilled with the given value.

#future

Shortcut of #future_on with default :io executor supplied.

#future_on

Constructs a new Future which will be resolved after block is evaluated on default executor.

#make_future

General constructor.

#rejected_future

Creates a resolved future which will be rejected with the given reason.

#resolvable_event

Shortcut of #resolvable_event_on with default :io executor supplied.

#resolvable_event_on

Creates a resolvable event, user is responsible for resolving the event once by calling Promises::ResolvableEvent#resolve.

#resolvable_future

Shortcut of #resolvable_future_on with default :io executor supplied.

#resolvable_future_on

Creates resolvable future, user is responsible for resolving the future once by Promises::ResolvableFuture#resolve, Promises::ResolvableFuture#fulfill, or Promises::ResolvableFuture#reject

#resolved_event

Creates resolved event.

#resolved_future

Creates a resolved future with will be either fulfilled with the given value or rejected with the given reason.

#schedule

Shortcut of #schedule_on with default :io executor supplied.

#schedule_on

Creates a new event or future which is resolved in intended_time.

#zip
#zip_events

Shortcut of #zip_events_on with default :io executor supplied.

#zip_events_on

Creates a new event which is resolved after all futures_and_or_events are resolved.

#zip_futures

Shortcut of #zip_futures_on with default :io executor supplied.

#zip_futures_on

Creates a new future which is resolved after all futures_and_or_events are resolved.

#zip_futures_over

Shortcut of #zip_futures_over_on with default :io executor supplied.

#zip_futures_over_on

Creates new future which is resolved after all the futures created by future_factory from enumerable elements are resolved.

Promises::FactoryMethods::OldChannelIntegration - Included

#select

only proof of concept.

Promises::FactoryMethods::Configuration - Included

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(capacity) ⇒ Throttle

Create throttle.

Parameters:

  • capacity (Integer)

    How many tasks using this throttle can run at the same time.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37

def initialize(capacity)
  super()
  @MaxCapacity            = capacity
  @Queue                  = LockFreeQueue.new
  @executor_cache         = [nil, nil]
  self.capacity = capacity
end

Instance Method Details

#acquire(timeout = nil) { ... } ⇒ Object, ...

Blocks current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If block is passed then the block is called after the capacity is acquired and it is automatically released after the block is executed.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Yields:

  • block to execute after the capacity is acquired

Returns:

  • (Object, self, true, false)
    • When no timeout and no block it returns self

    • When no timeout and with block it returns the result of the block

    • When with timeout and no block it returns true when acquired and false when timed out

    • When with timeout and with block it returns the result of the block of nil on timing out

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63

def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end

#acquire_or_event (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 140

def acquire_or_event
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity - 1
      if current_capacity > 0
        return nil
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end

#available_capacityInteger

Returns:

  • (Integer)

    The available capacity.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30

def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end

#default_executorExecutorService

Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods#default_executor.

See Also:

  • Promises::FactoryMethods#default_executor
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183

def default_executor
  on(super)
end

#inspect

Alias for #to_s.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 137

alias_method :inspect, :to_s

#max_capacityInteger

Returns:

  • (Integer)

    The maximum capacity.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46

def max_capacity
  @MaxCapacity
end

#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService

Examples:

throttling future

a_future.then_on(a_throttle.on(:io)) { a_throttled_task }

Parameters:

  • executor (ExecutorService) (defaults to: Promises::FactoryMethods.default_executor)

Returns:

  • (ExecutorService)

    An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162

def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end

#releaseself

Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.

See Also:

  • {#acquire}, {#try_acquire}
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118

def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end

#to_sString Also known as: #inspect

Returns:

  • (String)

    Short string representation.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133

def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end

#try_acquiretrue, false

Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102

def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end