Class: Concurrent::Throttle
Inherits: Concurrent::Synchronization::Object
Defined in: lib/concurrent-ruby-edge/concurrent/edge/throttle.rb
Overview
**Edge Features** are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each acquire lowers the available capacity and each release increases it. When no capacity is available, the current thread is either blocked or given an event that is resolved once capacity becomes available.
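The capacity accounting described above can be sketched with the Ruby standard library alone. SketchThrottle and peak_concurrency are hypothetical names used only for this illustration, and the sketch blocks on a Mutex and ConditionVariable, which is an assumption made for brevity and not how Concurrent::Throttle is implemented.

```ruby
# Hypothetical stand-in built on the standard library; not Concurrent::Throttle.
class SketchThrottle
  def initialize(capacity)
    @available = capacity
    @lock = Mutex.new
    @capacity_freed = ConditionVariable.new
  end

  def available_capacity
    @lock.synchronize { @available }
  end

  # Takes one unit of capacity, blocking while none is left.
  # With a block, the capacity is released once the block has run.
  def acquire
    @lock.synchronize do
      @capacity_freed.wait(@lock) while @available.zero?
      @available -= 1
    end
    return self unless block_given?

    begin
      yield
    ensure
      release
    end
  end

  # Returns one unit of capacity and wakes a single waiting thread.
  def release
    @lock.synchronize do
      @available += 1
      @capacity_freed.signal
    end
    self
  end
end

# Runs `jobs` threads through a throttle and reports the highest number
# of blocks that were inside `acquire` at the same time.
def peak_concurrency(capacity, jobs)
  throttle = SketchThrottle.new(capacity)
  stats = Mutex.new
  running = 0
  peak = 0
  Array.new(jobs) do
    Thread.new do
      throttle.acquire do
        stats.synchronize do
          running += 1
          peak = running if running > peak
        end
        sleep 0.01 # simulated work
        stats.synchronize { running -= 1 }
      end
    end
  end.each(&:join)
  peak
end

puts peak_concurrency(2, 10) # never prints a number above 2
```

The observed peak depends on thread scheduling, so only the upper bound is guaranteed by the sketch.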
The more common way to use the Throttle is through a proxy executor, a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor is throttled and runs on the given executor. There can be more than one proxy executor. All abstractions that execute tasks accept an executor option, so the proxy executor can be injected into any abstraction to throttle its concurrency level.
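The proxy-executor idea can be illustrated without the library. In the sketch below, ThreadPerTaskExecutor, ThrottledProxy and throttled_run are hypothetical names, and the proxy owns its own limit, which is a simplification: in the documented class the capacity belongs to the throttle and is shared by all of its proxies.

```ruby
# Hypothetical executor for this sketch: one thread per posted task.
class ThreadPerTaskExecutor
  def initialize
    @threads = Queue.new
  end

  def post(&task)
    @threads << Thread.new(&task)
    true
  end

  # Joins every thread started so far, including threads started by
  # tasks that were finishing while we waited.
  def join_all
    @threads.pop.join until @threads.empty?
  end
end

# Hypothetical proxy: same `post` interface, but at most `limit` tasks
# are forwarded to the wrapped executor at any time.
class ThrottledProxy
  def initialize(limit, executor)
    @limit = limit
    @executor = executor
    @running = 0
    @pending = []
    @lock = Mutex.new
  end

  def post(&task)
    start_now = @lock.synchronize do
      if @running < @limit
        @running += 1
        true
      else
        @pending << task
        false
      end
    end
    forward(task) if start_now
    true
  end

  private

  def forward(task)
    @executor.post do
      begin
        task.call
      ensure
        task_finished
      end
    end
  end

  # Hands the freed slot to the oldest pending task, or gives it back.
  def task_finished
    next_task = @lock.synchronize do
      queued = @pending.shift
      @running -= 1 unless queued
      queued
    end
    forward(next_task) if next_task
  end
end

# Posts `jobs` tasks through a proxy and returns [peak, completed].
def throttled_run(limit, jobs)
  executor = ThreadPerTaskExecutor.new
  proxy = ThrottledProxy.new(limit, executor)
  stats = Mutex.new
  running = peak = completed = 0
  jobs.times do
    proxy.post do
      stats.synchronize do
        running += 1
        peak = running if running > peak
      end
      sleep 0.01 # simulated work
      stats.synchronize do
        running -= 1
        completed += 1
      end
    end
  end
  executor.join_all
  [peak, completed]
end

peak, completed = throttled_run(3, 10)
puts "peak #{peak}, completed #{completed}"
```

Because the proxy exposes the same post method as the executor it wraps, code that accepts an executor does not need to know it is being throttled.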
Examples
Limiting concurrency level of a concurrently executed block to two
max_two = Concurrent::Throttle.new 2
# => #<Concurrent::Throttle:0x000002 capacity available 2 of 2>

# used to track concurrency level
concurrency_level = Concurrent::AtomicFixnum.new
# => #<Concurrent::AtomicFixnum:0x000003 value:0>

job = -> do
  # increase the current level at the beginning of the throttled block
  concurrency_level.increment
  # work, takes some time
  do_stuff
  # read the current concurrency level
  current_concurrency_level = concurrency_level.value
  # decrement the concurrency level back at the end of the block
  concurrency_level.decrement
  # return the observed concurrency level
  current_concurrency_level
end

# create 10 threads running the jobs concurrently
Array.new(10) do
  Thread.new do
    max_two.acquire(&job)
  end
# wait for all the threads to finish and read the observed
# concurrency level in each of them
end.map(&:value) # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1]
Notice that the returned array contains no number bigger than 2; therefore the concurrency level of the block calling do_stuff was never bigger than 2.
# runs a block, and returns the observed concurrency level during the execution
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  # return the observed concurrency level
  return current_concurrency_level
end

throttle = Concurrent::Throttle.new 3
# => #<Concurrent::Throttle:0x000004 capacity available 3 of 3>

concurrency_level = Concurrent::AtomicFixnum.new
# => #<Concurrent::AtomicFixnum:0x000005 value:0>

Array.new(10) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
    # fulfill with the observed concurrency level
  end
# collect observed concurrency levels
end.map(&:value!) # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1]
The concurrency level does not rise above 3. This works by setting the executor of the futures created from the throttle. The executor is a proxy executor for Concurrent::Promises.default_executor, which can be obtained using the #on method. The above example could therefore be written more explicitly as follows:
# ...
Array.new(10) do |i|
  # create throttled future
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do
    # ...
  end
end.map(&:value!)
Anything executed on the proxy executor is throttled. A throttle can have several proxy executors for different executors; all jobs share the same capacity provided by the throttle.
Since the proxy executor becomes the executor of the future, any chained futures are throttled as well. This can be changed by using a different executor. In the following example the first 2 futures in the chain are throttled and the last is not.
concurrency_level_throttled = Concurrent::AtomicFixnum.new
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new

Array.new(10) do |i|
  throttle.future(i) do
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff }
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!)
# => [[3, 3, 7],
#     [3, 2, 9],
#     [3, 3, 10],
#     [3, 3, 6],
#     [3, 3, 5],
#     [3, 3, 8],
#     [3, 3, 3],
#     [3, 3, 4],
#     [3, 2, 2],
#     [3, 1, 1]]
In the output you can see that the first 2 columns do not cross the capacity limit of 3, while the last column, which is unthrottled, does.
TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.
Other abstraction
The proxy executor created from a throttle can also be used with other abstractions, and these uses can be combined.
concurrency_level = Concurrent::AtomicFixnum.new
futures = Array.new(5) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
    # fulfill with the observed concurrency level
  end
end
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # execute agent update on throttled executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } }
  agent
end
futures.map(&:value!) # => [3, 3, 3, 2, 1]
agents.each { |a| a.await }.map(&:value)
# => [3, 2, 3, 3, 1]
There is no observed concurrency level above 3.
Class Attribute Summary
Synchronization::Object
- Inherited
Class Method Summary
- .new(capacity) ⇒ Throttle (constructor)
  Create throttle.
Synchronization::Object
- Inherited
.atomic_attribute?, .atomic_attributes, | |
.attr_atomic | Creates methods for reading and writing to an instance variable with volatile (Java) semantic as |
.attr_volatile | Creates methods for reading and writing (as |
.ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow. |
.new | Has to be called by children. |
.safe_initialization!, .define_initialize_atomic_fields |
Synchronization::AbstractObject
- Inherited
Instance Method Summary
- #acquire(timeout = nil) { ... } ⇒ Object, ...
  Blocks current thread until there is capacity available in the throttle.
- #available_capacity ⇒ Integer
- #default_executor ⇒ ExecutorService
  Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled.
- #inspect
  Alias for #to_s.
- #max_capacity ⇒ Integer
- #on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
- #release ⇒ self
  Releases previously acquired capacity back to Throttle.
- #to_s ⇒ String (also: #inspect)
- #try_acquire ⇒ true, false
  Tries to acquire capacity from the throttle.
- #acquire_or_event (private)
Promises::FactoryMethods
- Included
#any | Alias for Promises::FactoryMethods#any_resolved_future. |
#any_event | Shortcut of |
#any_event_on | Creates a new event which becomes resolved after the first futures_and_or_events resolves. |
#any_fulfilled_future | Shortcut of |
#any_fulfilled_future_on | Creates a new future which is resolved after the first futures_and_or_events is fulfilled. |
#any_resolved_future | Shortcut of |
#any_resolved_future_on | Creates a new future which is resolved after the first futures_and_or_events is resolved. |
#delay | Shortcut of |
#delay_on | Creates a new event or future which is resolved only after it is touched, see |
#fulfilled_future | Creates a resolved future which will be fulfilled with the given value. |
#future | Shortcut of |
#future_on | Constructs a new |
#make_future | General constructor. |
#rejected_future | Creates a resolved future which will be rejected with the given reason. |
#resolvable_event | Shortcut of |
#resolvable_event_on | Creates a resolvable event, user is responsible for resolving the event once by calling Promises::ResolvableEvent#resolve. |
#resolvable_future | Shortcut of |
#resolvable_future_on | Creates resolvable future, user is responsible for resolving the future once by Promises::ResolvableFuture#resolve, Promises::ResolvableFuture#fulfill, or Promises::ResolvableFuture#reject |
#resolved_event | Creates resolved event. |
#resolved_future | Creates a resolved future which will be either fulfilled with the given value or rejected with the given reason. |
#schedule | Shortcut of |
#schedule_on | Creates a new event or future which is resolved in intended_time. |
#zip | Alias for Promises::FactoryMethods#zip_futures. |
#zip_events | Shortcut of |
#zip_events_on | Creates a new event which is resolved after all futures_and_or_events are resolved. |
#zip_futures | Shortcut of |
#zip_futures_on | Creates a new future which is resolved after all futures_and_or_events are resolved. |
#zip_futures_over | Shortcut of |
#zip_futures_over_on | Creates new future which is resolved after all the futures created by future_factory from enumerable elements are resolved. |
Promises::FactoryMethods::OldChannelIntegration
- Included
#select | only proof of concept. |
Promises::FactoryMethods::Configuration
- Included
Synchronization::Object
- Inherited
Synchronization::Volatile
- Included
Synchronization::AbstractObject
- Inherited
Constructor Details
.new(capacity) ⇒ Throttle
Create throttle.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37
def initialize(capacity)
  super()
  @MaxCapacity = capacity
  @Queue = LockFreeQueue.new
  @executor_cache = [nil, nil]
  self.capacity = capacity
end
Instance Method Details
#acquire(timeout = nil) { ... } ⇒ Object, ...
Blocks the current thread until capacity is available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If a block is passed, it is called after the capacity is acquired, and the capacity is released automatically after the block has executed.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63
def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
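Reading the method above, the return value of #acquire depends on whether a timeout and a block are given: self with neither, true or false with only a timeout, the block result with a block, and nil when a block is given but the timeout expires first. The stdlib-only sketch below reproduces just those return-value rules. TimedThrottle is a hypothetical name, and waiting on a ConditionVariable replaces the event-based waiting of the real class.

```ruby
# Hypothetical stand-in that mirrors only the return-value rules of #acquire.
class TimedThrottle
  def initialize(capacity)
    @available = capacity
    @lock = Mutex.new
    @freed = ConditionVariable.new
  end

  def acquire(timeout = nil)
    acquired = wait_for_capacity(timeout)
    # Without a block: true/false when a timeout was given, otherwise self.
    return(timeout ? acquired : self) unless block_given?
    # With a block: nil when the timeout expired before capacity was free.
    return nil unless acquired

    begin
      yield
    ensure
      release
    end
  end

  def release
    @lock.synchronize do
      @available += 1
      @freed.signal
    end
    self
  end

  private

  # Returns true once capacity was taken, false when the timeout expired.
  def wait_for_capacity(timeout)
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @available.zero?
        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining && remaining <= 0
        @freed.wait(@lock, remaining)
      end
      @available -= 1
      true
    end
  end
end

throttle = TimedThrottle.new(1)
p throttle.acquire.equal?(throttle) # => true, the only capacity is now held
p throttle.acquire(0.01)            # => false, timed out
p throttle.acquire(0.01) { :ran }   # => nil, block was not called
throttle.release
p throttle.acquire(0.01) { :ran }   # => :ran, capacity released afterwards
p throttle.acquire(0.01)            # => true
```

A block that itself returns nil cannot be told apart from a timeout by the return value alone, so callers who need that distinction may prefer the form without a block.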
#acquire_or_event (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 140
def acquire_or_event
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity - 1
      if current_capacity > 0
        return nil
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
#available_capacity ⇒ Integer
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30
def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end
#default_executor ⇒ ExecutorService
Uses the executor provided by #on; therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods#default_executor.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183
def default_executor
  on(super)
end
#inspect
Alias for #to_s.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 137
alias_method :inspect, :to_s
#max_capacity ⇒ Integer
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46
def max_capacity
  @MaxCapacity
end
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162
def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
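One consequence of the method above is that only the proxy for the first executor passed to #on is cached; every later call with a different executor builds a new proxy. The sketch below isolates that caching rule. ProxyCache and its Proxy struct are hypothetical stand-ins, and symbols stand in for real executors.

```ruby
# Hypothetical model of the caching rule only; no throttling happens here.
class ProxyCache
  Proxy = Struct.new(:executor)

  def initialize
    @executor_cache = [nil, nil]
  end

  def on(executor)
    cached_executor, cached_proxy = @executor_cache
    return cached_proxy if cached_executor == executor && cached_proxy

    proxy = Proxy.new(executor)
    # Only the very first executor gets its proxy remembered.
    @executor_cache = [executor, proxy] if cached_executor.nil?
    proxy
  end
end

cache = ProxyCache.new
first = cache.on(:io)
p first.equal?(cache.on(:io))            # => true, the first proxy is reused
p cache.on(:fast).equal?(cache.on(:fast)) # => false, a new proxy each time
p first.equal?(cache.on(:io))            # => true, the cache was not replaced
```

Callers that use a second executor repeatedly may therefore want to keep a reference to its proxy themselves.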
#release ⇒ self
Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118
def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
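Taken together, #acquire_or_event, #available_capacity and #release treat the capacity as a counter that may go below zero: every acquire decrements it, a negative value counts waiters, and a release that starts from a negative value resolves the oldest queued event. The single-threaded model below traces that arithmetic. CounterModel is a hypothetical name, a plain Integer and Array replace the atomic field and lock-free queue, and a small hash stands in for an event, so the model is not thread-safe.

```ruby
# Single-threaded model of the counter arithmetic; not thread-safe.
class CounterModel
  attr_reader :capacity

  def initialize(max)
    @capacity = max
    @waiters = [] # stands in for the queue of unresolved events
  end

  # Returns nil when capacity was free, otherwise a ticket to wait on.
  def acquire_or_ticket
    before = @capacity
    @capacity -= 1 # always decrement, even below zero
    return nil if before > 0

    ticket = { resolved: false }
    @waiters.push(ticket)
    ticket
  end

  def release
    before = @capacity
    @capacity += 1
    # A negative counter means someone is waiting: wake the oldest waiter.
    @waiters.shift[:resolved] = true if before < 0
    self
  end

  # Negative values are reported as zero.
  def available_capacity
    @capacity >= 0 ? @capacity : 0
  end
end

model = CounterModel.new(2)
p model.acquire_or_ticket         # => nil, capacity 2 -> 1
p model.acquire_or_ticket         # => nil, capacity 1 -> 0
third = model.acquire_or_ticket   # ticket, capacity 0 -> -1
fourth = model.acquire_or_ticket  # ticket, capacity -1 -> -2
p [model.capacity, model.available_capacity] # => [-2, 0]
model.release                     # capacity -2 -> -1, resolves third
p [third[:resolved], fourth[:resolved]]      # => [true, false]
model.release                     # capacity -1 -> 0, resolves fourth
p [model.capacity, fourth[:resolved]]        # => [0, true]
```

The waiters inherit the capacity handed over by each release, which is why the counter returns to zero and not to a positive value after the two releases.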
#to_s ⇒ String
Also known as: #inspect
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133
def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
#try_acquire ⇒ true, false
Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102
def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
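The loop above is a compare-and-set retry: read the counter, give up if it is not positive, otherwise try to swap in the decremented value and retry if another thread changed it first. The sketch below shows the same pattern with the standard library. CasCounter is a hypothetical class whose compare_and_set is emulated with a Mutex, which is an assumption made so the example runs without the gem, and try_acquire here is a free-standing method taking the counter as an argument.

```ruby
# Hypothetical counter; compare-and-set is emulated with a Mutex.
class CasCounter
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Stores new_value only if the counter still holds expected.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value == expected

      @value = new_value
      true
    end
  end
end

# Non-blocking acquire: true when one unit was taken, false when none was left.
def try_acquire(counter)
  loop do
    current = counter.value
    return false unless current > 0
    # Retry when another thread changed the counter between read and swap.
    return true if counter.compare_and_set(current, current - 1)
  end
end

counter = CasCounter.new(10)
results = Array.new(50) { Thread.new { try_acquire(counter) } }.map(&:value)
p results.count(true) # => 10
p counter.value       # => 0
```

However the 50 threads interleave, exactly as many acquisitions succeed as there was capacity, because a decrement is applied only when the swap succeeds.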