Class: Concurrent::Throttle
| Relationships & Source Files | |
| Inherits: | Concurrent::Synchronization::Object |
| Defined in: | lib/concurrent-ruby-edge/concurrent/edge/throttle.rb |
Overview
**Edge Features** are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each acquire lowers the available capacity and each release increases it. When there is no available capacity, the current thread is either blocked, or an event is returned which will be resolved when capacity becomes available.
The more common usage of the Throttle is through a proxy executor, e.g. a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor is throttled and executes on the given executor. There can be more than one proxy executor. All abstractions which execute tasks have an option to specify the executor, therefore the proxy executor can be injected into any abstraction to throttle its concurrency level.
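As a minimal sketch of both usage styles (not taken from the examples below; work is a hypothetical placeholder for any task):

require 'concurrent-edge'

def work
  sleep 0.01 # stand-in for real work
end

throttle = Concurrent::Throttle.new 2

# block form of acquire: capacity is released automatically after the block
throttle.acquire { work }

# the same limit applied through a proxy executor
proxy = throttle.on Concurrent.global_io_executor
10.times { proxy.post { work } }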
Examples
Limiting concurrency level of a concurrently executed block to two
max_two = Concurrent::Throttle.new 2
# => #<Concurrent::Throttle:0x000002 capacity available 2 of 2>
# used to track concurrency level
concurrency_level = Concurrent::AtomicFixnum.new
# => #<Concurrent::AtomicFixnum:0x000003 value:0>
job = -> do
  # increase the current level at the beginning of the throttled block
  concurrency_level.increment
  # work, takes some time
  do_stuff
  # read the current concurrency level
  current_concurrency_level = concurrency_level.value
  # decrement the concurrency level back at the end of the block
  concurrency_level.decrement
  # return the observed concurrency level
  current_concurrency_level
end
# create 10 threads running the jobs concurrently
Array.new(10) do
  Thread.new do
    max_two.acquire(&job)
  end
  # wait for all the threads to finish and read the observed
  # concurrency level in each of them
end.map(&:value)                         # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1]

Notice that the returned array has no number bigger than 2, therefore the concurrency level of the block with do_stuff was never bigger than 2.
# runs a block, and returns the observed concurrency level during the execution
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  # return the observed concurrency level
  current_concurrency_level
end
throttle = Concurrent::Throttle.new 3
# => #<Concurrent::Throttle:0x000004 capacity available 3 of 3>
concurrency_level = Concurrent::AtomicFixnum.new
# => #<Concurrent::AtomicFixnum:0x000005 value:0>
Array.new(10) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
    # fulfill with the observed concurrency level
  end
  # collect observed concurrency levels
end.map(&:value!)                        # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1]

The concurrency level does not rise above 3.
It works by setting the executor of the future created from the throttle. The executor is a proxy executor for Concurrent::Promises.default_executor, which can be obtained using the #on method. Therefore the above example could instead be written more explicitly as follows:
# ...
Array.new(10) do |i|
  # create throttled future
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do
    # ...
  end
end.map(&:value!)

Anything executed on the proxy executor is throttled. A throttle can have more proxy executors for different executors; all of their jobs share the same capacity provided by the throttle.
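For instance, a minimal sketch of the shared capacity (not from the original documentation; do_stuff and monitor_concurrency_level are the helpers defined above):

fast_proxy = throttle.on Concurrent.global_fast_executor
io_proxy   = throttle.on Concurrent.global_io_executor

level   = Concurrent::AtomicFixnum.new
futures =
  Array.new(5) { Concurrent::Promises.future_on(fast_proxy) { monitor_concurrency_level(level) { do_stuff } } } +
  Array.new(5) { Concurrent::Promises.future_on(io_proxy) { monitor_concurrency_level(level) { do_stuff } } }
# the maximum observed level never exceeds throttle.max_capacity
futures.map(&:value!).max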
Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. This can be changed by using a different executor. In the following example the first 2 futures in the chain are throttled, the last one is not.
concurrency_level_throttled   = Concurrent::AtomicFixnum.new
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new
Array.new(10) do |i|
  throttle.future(i) do
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff }
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!)
# => [[3, 3, 7],
#     [3, 2, 9],
#     [3, 3, 10],
#     [3, 3, 6],
#     [3, 3, 5],
#     [3, 3, 8],
#     [3, 3, 3],
#     [3, 3, 4],
#     [3, 2, 2],
#     [3, 1, 1]]

In the output you can see that the first 2 columns do not cross the capacity limit of 3, while the last, unthrottled column does.
TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.
Other abstractions
The proxy executor created by the throttle can be used with other abstractions as well, and they can be combined.
concurrency_level = Concurrent::AtomicFixnum.new
futures = Array.new(5) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
    # fulfill with the observed concurrency level
  end
end
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # execute the agent update on the throttled executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level) { do_stuff } }
  agent
end
futures.map(&:value!)                    # => [3, 3, 3, 2, 1]
agents.each { |a| a.await }.map(&:value)
# => [3, 2, 3, 3, 1]

There is no observed concurrency level above 3.
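The proxy executor also works for plain task posting; a brief sketch (not from the original documentation, do_stuff assumed as before):

proxy = throttle.on :io
done  = Concurrent::CountDownLatch.new 5
5.times do
  # at most 3 of these blocks run at the same time
  proxy.post { do_stuff; done.count_down }
end
done.wait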
Class Attribute Summary
Synchronization::Object - Inherited
Class Method Summary
- .new(capacity) ⇒ Throttle (constructor): Create throttle.
Synchronization::Object - Inherited
| .atomic_attribute?, .atomic_attributes | |
| .attr_atomic | Creates methods for reading and writing to an instance variable with volatile (Java) semantic, as .attr_volatile does. |
| .attr_volatile | Creates methods for reading and writing (as attr_accessor does) to an instance variable with volatile (Java) semantic. |
| .ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow. |
| .new | Has to be called by children. |
| .safe_initialization!, .define_initialize_atomic_fields | |
Synchronization::AbstractObject - Inherited
Instance Method Summary
- #acquire(timeout = nil) { ... } ⇒ Object, ...: Blocks current thread until there is capacity available in the throttle.
- #available_capacity ⇒ Integer
- #default_executor ⇒ ExecutorService: Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled.
- #inspect: Alias for #to_s.
- #max_capacity ⇒ Integer
- #on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
- #release ⇒ self: Releases previously acquired capacity back to Throttle.
- #to_s ⇒ String (also: #inspect)
- #try_acquire ⇒ true, false: Tries to acquire capacity from the throttle.
- #acquire_or_event (private)
Promises::FactoryMethods - Included
| #any | Alias for Promises::FactoryMethods#any_resolved_future. |
| #any_event | Shortcut of #any_event_on with default :io executor supplied. |
| #any_event_on | Creates a new event which becomes resolved after the first futures_and_or_events resolves. |
| #any_fulfilled_future | Shortcut of #any_fulfilled_future_on with default :io executor supplied. |
| #any_fulfilled_future_on | Creates a new future which is resolved after the first futures_and_or_events is fulfilled. |
| #any_resolved_future | Shortcut of #any_resolved_future_on with default :io executor supplied. |
| #any_resolved_future_on | Creates a new future which is resolved after the first futures_and_or_events is resolved. |
| #delay | Shortcut of #delay_on with default :io executor supplied. |
| #delay_on | Creates a new event or future which is resolved only after it is touched. |
| #fulfilled_future | Creates a resolved future which will be fulfilled with the given value. |
| #future | Shortcut of #future_on with default :io executor supplied. |
| #future_on | Constructs a new future which will be resolved after the block is evaluated on the given executor. |
| #make_future | General constructor. |
| #rejected_future | Creates a resolved future which will be rejected with the given reason. |
| #resolvable_event | Shortcut of #resolvable_event_on with default :io executor supplied. |
| #resolvable_event_on | Creates a resolvable event, user is responsible for resolving the event once by calling Promises::ResolvableEvent#resolve. |
| #resolvable_future | Shortcut of #resolvable_future_on with default :io executor supplied. |
| #resolvable_future_on | Creates a resolvable future, user is responsible for resolving the future once by Promises::ResolvableFuture#resolve, Promises::ResolvableFuture#fulfill, or Promises::ResolvableFuture#reject. |
| #resolved_event | Creates a resolved event. |
| #resolved_future | Creates a resolved future which will be either fulfilled with the given value or rejected with the given reason. |
| #schedule | Shortcut of #schedule_on with default :io executor supplied. |
| #schedule_on | Creates a new event or future which is resolved in intended_time. |
| #zip | Alias for Promises::FactoryMethods#zip_futures. |
| #zip_events | Shortcut of #zip_events_on with default :io executor supplied. |
| #zip_events_on | Creates a new event which is resolved after all futures_and_or_events are resolved. |
| #zip_futures | Shortcut of #zip_futures_on with default :io executor supplied. |
| #zip_futures_on | Creates a new future which is resolved after all futures_and_or_events are resolved. |
| #zip_futures_over | Shortcut of #zip_futures_over_on with default :io executor supplied. |
| #zip_futures_over_on | Creates a new future which is resolved after all the futures created by future_factory from enumerable elements are resolved. |
Promises::FactoryMethods::OldChannelIntegration - Included
| #select | only proof of concept. | 
Promises::FactoryMethods::Configuration - Included
Synchronization::Object - Inherited
Synchronization::Volatile - Included
Synchronization::AbstractObject - Inherited
Constructor Details
.new(capacity) ⇒ Throttle
Create throttle.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37
def initialize(capacity)
  super()
  @MaxCapacity    = capacity
  @Queue          = LockFreeQueue.new
  @executor_cache = [nil, nil]
  self.capacity = capacity
end
Instance Method Details
#acquire(timeout = nil) { ... } ⇒ Object, ...
Blocks the current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If a block is passed, it is called after the capacity is acquired and the capacity is automatically released after the block is executed.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63
def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
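A brief usage sketch (not part of the library's documented examples; work is a hypothetical placeholder):

throttle = Concurrent::Throttle.new 2

# block form: capacity is released automatically
throttle.acquire { work }

# with a timeout and a block: returns the block's result,
# or nil when capacity could not be acquired within 0.1s
result = throttle.acquire(0.1) { work }

# without a block: the capacity must be released manually
if throttle.acquire(0.1)
  begin
    work
  ensure
    throttle.release
  end
end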
#acquire_or_event (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 140
def acquire_or_event
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity - 1
      if current_capacity > 0
        return nil
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
#available_capacity ⇒ Integer
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30
def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end
#default_executor ⇒ ExecutorService
Uses the executor provided by #on, therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods#default_executor.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183
def default_executor
  on(super)
end
#inspect
Alias for #to_s.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 137
alias_method :inspect, :to_s
#max_capacity ⇒ Integer
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46
def max_capacity
  @MaxCapacity
end
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162
def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
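As an illustration (a sketch, not taken from the source docs), the returned proxy executor can be passed anywhere an executor is accepted:

proxy = throttle.on Concurrent.global_io_executor
# tasks posted here run on the global IO executor,
# but never more than the throttle's capacity at once
proxy.post { do_stuff }
Concurrent::Promises.future_on(proxy) { do_stuff }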
#release ⇒ self
Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118
def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
#to_s ⇒ String
Also known as: #inspect
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133
def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
#try_acquire ⇒ true, false
Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102
def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
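A short usage sketch (not from the original docs); note that a successful try_acquire must be paired with #release:

if throttle.try_acquire
  begin
    do_stuff
  ensure
    throttle.release
  end
else
  # no capacity available right now, skip or retry later
end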