123456789_123456789_123456789_123456789_123456789_

Examples

Limiting concurrency level of a concurrently executed block to two

max_two = Concurrent::Throttle.new 2
# => #<Concurrent::Throttle:0x000002 capacity available 2 of 2>

# used to track concurrency level
concurrency_level = Concurrent::AtomicFixnum.new
# => #<Concurrent::AtomicFixnum:0x000003 value:0>
job = -> do
  # increase the current level at the beginning of the throttled block    
  concurrency_level.increment
  # work, takes some time
  do_stuff
  # read the current concurrency level 
  current_concurrency_level = concurrency_level.value
  # decrement the concurrency level back at the end of the block            
  concurrency_level.decrement
  # return the observed concurrency level 
  current_concurrency_level
end 

# create 10 threads running concurrently the jobs
Array.new(10) do  
  Thread.new do
    max_two.acquire(&job)   
  end
# wait for all the threads to finish and read the observed 
# concurrency level in each of them   
end.map(&:value)                         # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1]

Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the do_stuff was never bigger than 2.

# runs a block, and returns the observed concurrency level during the execution
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  # return the observed concurrency level
  return current_concurrency_level 
end 

throttle = Concurrent::Throttle.new 3
# => #<Concurrent::Throttle:0x000004 capacity available 3 of 3>
concurrency_level = Concurrent::AtomicFixnum.new 
# => #<Concurrent::AtomicFixnum:0x000005 value:0>

Array.new(10) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
# collect observed concurrency levels   
end.map(&:value!)                        # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1]

The concurrency level does not rise above 3.

It works by setting the executor of the future created from the throttle. The executor is a proxy executor for the Concurrent::Promises.default_executor which can be obtained using Concurrent::Throttle#on method. Therefore the above example could be instead more explicitly written as follows

# ...
Array.new(10) do |i|
  # create throttled future
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do
    # ...
  end
end.map(&:value!)

Anything executed on the proxy executor is throttled. A throttle can have more proxy executors for different executors, all jobs share the same capacity provided by the throttle.

Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. It can be changed by using different executor. It the following example the first 2 futures in the chain are throttled, the last is not.

concurrency_level_throttled   = Concurrent::AtomicFixnum.new 
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new 
Array.new(10) do |i|
  throttle.future(i) do 
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff } 
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!) 
# => [[3, 3, 7],
#     [3, 2, 9],
#     [3, 3, 10],
#     [3, 3, 6],
#     [3, 3, 5],
#     [3, 3, 8],
#     [3, 3, 3],
#     [3, 3, 4],
#     [3, 2, 2],
#     [3, 1, 1]]

In the output you can see that the first 2 columns do not cross the 3 capacity limit and the last column which is untroubled does.

TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.

Other abstraction

The proxy executor created with throttle can be used with other abstractions as well and combined.

concurrency_level = Concurrent::AtomicFixnum.new 
futures = Array.new(5) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
end 
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # execute agent update on throttled executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } }
  agent 
end 
futures.map(&:value!)                    # => [3, 3, 3, 2, 1]
agents.each { |a| a.await }.map(&:value) 
# => [3, 2, 3, 3, 1]

There is no observed concurrency level above 3.