Module: Concurrent::Actor
Overview
**Edge Features** are under active development and may change frequently.
-
Deprecations are not added before incompatible changes.
-
Edge
version: major is always 0, minor bump means incompatible change, patch bump means compatible change. -
Edge
features may also lack tests and documentation. -
Features developed in
concurrent-ruby-edge
are expected to move toconcurrent-ruby
when finalised.
=== Actor model
- Light-weighted running on thread-pool.
- Inspired by Akka and Erlang.
- Modular.
This Actor model implementation makes actors very cheap to create and discard. Thousands of actors can be created, allowing you to break the program into smaller maintainable pieces, without violating the single responsibility principle.
==== What is an actor model?
Actor-based concurrency is all the rage in some circles. Originally described in 1973, the actor model is a paradigm for creating asynchronous, concurrent objects that is becoming increasingly popular. Much has changed since actors were first written about four decades ago, which has led to a serious fragmentation within the actor community. There is no universally accepted, strict definition of "actor" and actor implementations differ widely between languages and libraries.
Wiki definition is pretty good: The actor model in computer science is a mathematical model of concurrent computation that treats actors as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
==== Why?
Concurrency is hard to get right, actors are one of many ways how to simplify the problem.
==== Quick example
An example:
class Counter < {Concurrent::Actor::Context}
# Include context of an actor which gives this class access to reference
# and other information about the actor
# use initialize as you wish
def initialize(initial_value)
@count = initial_value
end
# override on_message to define actor's behaviour
def ( )
if Integer ===
@count +=
end
end
end #
=== Create new actor naming the instance 'first'.
=== Return value is a reference to the actor, the actual actor is never returned.
counter = {Counter.spawn}(:first, 5)
=== Tell a and forget returning self.
counter.tell(1)
counter << 1
=== (First counter now contains 7.)
=== Send a asking for a result.
counter.ask(0).class
counter.ask(0).value
class Adder < {Concurrent::Actor::RestartingContext} def initialize(init) @count = init end def ( ) case when :add @count += 1 else # pass to ErrorsOnUnknownMessage behaviour, which will just fail pass end end end === <code>link: true</code> makes the actor linked to root actor and supervised === which is default behavior adder = {Adder.spawn}(name: :adder, link: true, args: [1]) # => #<Concurrent::Actor::Reference:0x7fbedd8e3d40 /adder (Adder)> adder.parent # => #<Concurrent::Actor::Reference:0x7fbedbaa1e90 / (Concurrent::Actor::Root)> === tell and forget adder.tell(:add).tell(:add) # => #<Concurrent::Actor::Reference:0x7fbedd8e3d40 /adder (Adder)> === ask to get result adder.ask!(:add) # => 4 === fail the actor adder.ask!(:bad) rescue $! # => #<Concurrent::Actor::UnknownMessage: :bad from #<Thread:0x007fbedb8809b8>> === actor is restarted with initial values adder.ask!(:add) # => 2 adder.ask!(:terminate!) # => true
==== Spawning actors
==== Sending messages
- Reference#tell
Sends the message asynchronously to the actor and immediately returns
self
(the reference) allowing to chain message telling. - Reference#ask
- Reference#ask! Sends the message synchronously and blocks until the message is processed. Raises on error.
Messages are processed in same order as they are sent by a sender. It may interleaved with messages from other senders though.
===== Immutability
Messages sent between actors should be immutable. Gems like
- Algebrick - Typed struct on steroids based on algebraic types and pattern matching
- Hamster - Efficient, Immutable, Thread-Safe Collection classes for Ruby
are very helpful.
require 'algebrick' # => true === Actor protocol definition with Algebrick Protocol = {Algebrick.type} do variants Add = type { fields! a: Numeric, b: Numeric }, Subtract = type { fields! a: Numeric, b: Numeric } end # => Protocol(Add | Subtract) class Calculator < {Concurrent::Actor::RestartingContext} include Algebrick::Matching def ( ) # pattern matching on the message with deconstruction # ~ marks values which are passed to the block match , (on Add.(~any, ~any) do |a, b| a + b end), # or using multi-assignment (on ~Subtract do |(a, b)| a - b end) end end calculator = {Calculator.spawn}('calculator') # => #<Concurrent::Actor::Reference:0x7fbedba52d90 /calculator (Calculator)> addition = calculator.ask Add[1, 2] # => <#Concurrent::Promises::Future:0x7fbedc05f7b0 pending> subtraction = calculator.ask Subtract[1, 0.5] # => <#Concurrent::Promises::Future:0x7fbedd891388 pending> results = (addition & subtraction) # => <#Concurrent::Promises::Future:0x7fbedc04eeb0 pending> results.value! # => [3, 0.5] calculator.ask! :terminate! # => true
==== Actor definition
New actor is defined by subclassing RestartingContext
, Context
and defining its abstract methods. AbstractContext
can be subclassed directly to implement more specific behaviour see Root
implementation.
-
> Basic Context of an
Actor
. It supports only linking and it simply terminates on error. Uses Behaviour.basic_behaviour_definition: -
>
Context
of anActor
for robust systems. It supports supervision, linking, pauses on error. Uses Behaviour.restarting_behaviour_definition
Example of ac actor definition:
Message = {Struct.new} :action, :value class AnActor < {Concurrent::Actor::RestartingContext} def initialize(init) @counter = init end # override #on_message to define actor's behaviour on message received def ( ) case .action when :add @counter = @counter + .value when :subtract @counter = @counter - .value when :value @counter else pass end end # set counter to zero when there is an error def on_event(event) if event == :reset @counter = 0 # ignore initial value end end end an_actor = {AnActor.spawn} name: 'an_actor', args: 10 an_actor << {Message.new}(:add, 1) << {Message.new}(:subtract, 2) an_actor.ask!(Message.new(:value, nil)) # => 9 an_actor << :boo << {Message.new}(:add, 1) an_actor.ask!(Message.new(:value, nil)) # => 1 an_actor << :terminate! # => #<Concurrent::Actor::Reference:0x7fbedc137688 /an_actor (AnActor)>
See methods of AbstractContext
what else can be tweaked, e.g AbstractContext#default_reference_class
==== Reference
Reference
is public interface of Actor
instances. It is used for sending messages and can be freely passed around the application. It also provides some basic information about the actor, see PublicDelegations
.
AdHoc.spawn('printer') { -> { puts } }
# => #<Concurrent::Actor::Reference:0x7fd0d2883218 /printer (Concurrent::Actor::Utils::AdHoc)>
# ^object_id ^path ^context class
==== Garbage collection
Spawned actor cannot be garbage-collected until it's terminated. There is a reference held in the parent actor.
==== Parent-child relationship, name, and path
- Core#name The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances.
- Core#path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with:
parent.path + self.name
up to a .root, e.g./an_actor/its_child
. - Core#parent
A parent
Actor
. When actor is spawned the .current becomes its parent. When actor is spawned from a thread outside of an actor (.current is nil) .root is assigned.
==== Behaviour
Actors have modular architecture, which is achieved by combining a light core with chain of behaviours. Each message or internal event propagates through the chain allowing the behaviours react based on their responsibility.
-
> Links the actor to other actors and sends actor’s events to them, like:
:terminated
,:paused
,:resumed
, errors, etc. Linked actor needs to handle those messages.listener = AdHoc.spawn name: :listener do lambda do || case when Reference if .ask!(:linked?) << :unlink else << :link end else puts "got event #{ .inspect} from #{envelope.sender}" end end end an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do lambda { || raise 'failed'} end # link the actor listener.ask(an_actor).wait an_actor.ask(:fail).wait # unlink the actor listener.ask(an_actor).wait an_actor.ask(:fail).wait an_actor << :terminate!
produces only two events, other events happened after unlinking
got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)> got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
-
> Accepts
:await
messages. Which allows to wait onActor
to process all previously send messages.actor << :a << :b actor.ask(:await).wait # blocks until :a and :b are processed
-
> Allows to pause actors on errors. When paused all arriving messages are collected and processed after the actor is resumed or reset. Resume will simply continue with next message. Reset also reinitialized context.
-
Actor::Behaviour::Supervising
:> Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets
:one_for_one
(restarts just failed actor) or:one_for_all
(restarts all child actors). -
Actor::Behaviour::Supervising
:> Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets
:one_for_one
(restarts just failed actor) or:one_for_all
(restarts all child actors). -
Actor::Behaviour::ExecutesContext
:> Delegates messages and events to
AbstractContext
instance. -
Actor::Behaviour::ErrorsOnUnknownMessage
:> Simply fails when message arrives here. It’s usually the last behaviour.
-
Actor::Behaviour::Termination
:> Handles actor termination. Waits until all its children are terminated, can be configured on behaviour initialization.
-
Actor::Behaviour::RemovesChild
:> Removes terminated children.
If needed new behaviours can be added, or old one removed to get required behaviour.
-
Context
usesArray
of behaviours and their construction parameters.[[Behaviour::SetResults, :terminate!], [Behaviour::RemovesChild], [Behaviour::Termination], [Behaviour::Linking], [Behaviour::Awaits], [Behaviour::ExecutesContext], [Behaviour::ErrorsOnUnknownMessage]]
-
RestartingContext
usesArray
of behaviours and their construction parameters.[[Behaviour::SetResults, :pause!], [Behaviour::RemovesChild], [Behaviour::Termination], [Behaviour::Linking], [Behaviour::Pausing], [Behaviour::Supervising, :reset!, :one_for_one], [Behaviour::Awaits], [Behaviour::ExecutesContext], [Behaviour::ErrorsOnUnknownMessage]]
==== IO cooperation
Actors are running on shared thread poll which allows user to create many actors cheaply. Downside is that these actors cannot be directly used to do IO or other blocking operations. Blocking operations could starve the global_fast_executor. However there are two options:
- Create an regular actor which will schedule blocking operations in global_io_executor (which is intended for blocking operations) sending results back to self in messages.
- Create an actor using global_io_executor instead of global_fast_executor, e.g.
AnIOActor.spawn name: :blocking, executor: Concurrent.global_io_executor
.
===== Example
require 'concurrent' # => false === {Concurrent.use_simple_logger}(:WARN, STDOUT) === First option is to use operation pool class ActorDoingIO < {Concurrent::Actor::RestartingContext} def ( ) # do IO operation end def default_executor Concurrent.global_io_executor end end actor_doing_io = {ActorDoingIO.spawn} :actor_doing_io # => #<Concurrent::Actor::Reference:0x7fbedc146e80 /actor_doing_io (ActorDoingIO)> actor_doing_io.executor == {Concurrent.global_io_executor} # => true === It can be also built into a pool so there is not too many IO operations class IOWorker < {Concurrent::Actor::Context} def (io_job) # do IO work sleep 0.1 puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}" end def default_executor Concurrent.global_io_executor end end pool = {Concurrent::Actor::Utils::Pool}.spawn('pool', 2) do |index| IOWorker.spawn(name: "worker-#{index}") end # => #<Concurrent::Actor::Reference:0x7fbedba83378 /pool (Concurrent::Actor::Utils::Pool)> pool << 1 << 2 << 3 << 4 << 5 << 6 # => #<Concurrent::Actor::Reference:0x7fbedba83378 /pool (Concurrent::Actor::Utils::Pool)> === prints two lines each second === /pool/worker-0 second:1414677666 :1 === /pool/worker-1 second:1414677666 :2 === /pool/worker-0 second:1414677667 :3 === /pool/worker-1 second:1414677667 :4 === /pool/worker-0 second:1414677668 :5 === /pool/worker-1 second:1414677668 :6 sleep 1 # => 1
==== Dead letter routing
see AbstractContext#dead_letter_routing description:
Defines an actor responsible for dead letters. Any rejected message send with Reference#tell is sent there, a message with future is considered already monitored for failures. Default behaviour is to use AbstractContext#dead_letter_routing of the parent, so if no AbstractContext#dead_letter_routing method is overridden in parent-chain the message ends up in
Actor.root.dead_letter_routing
agent which will log warning.
==== FAQ
===== What happens if I try to supervise using a normal Context?
Alleged supervisor will receive errors from its supervised actors. They'll have to be handled manually.
===== How to change supervision strategy?
Use option behaviour_definition: {Behaviour.restarting_behaviour_definition}(:resume!)
or
behaviour_definition: {Behaviour.restarting_behaviour_definition}(:reset!, :one_for_all)
===== How to change behaviors?
Any existing behavior can be subclassed
===== How to implement custom restarting?
By subclassing Actor::Behaviour::Pausing
and overriding Behaviour::Pausing#restart!. Implementing
AbstractContext#on_event could be also considered.
We'll be happy to answer any other questions, just open an Issue or find us on https://gitter.im/ruby-concurrency/concurrent-ruby.
==== Speed
Simple benchmark Actor vs Celluloid, the numbers are looking good
but you know how it is with benchmarks. Source code is in
examples/actor/celluloid_benchmark.rb
. It sends numbers between x actors
and adding 1 until certain limit is reached.
Benchmark legend:
- mes. - number of messages send between the actors
- act. - number of actors exchanging the messages
- impl. - which gem is used
===== JRUBY
Rehearsal ---------------------------------------------------------
50000 2 concurrent 26.140000 0.610000 26.750000 ( 7.761000)
50000 2 celluloid 41.440000 5.270000 46.710000 ( 17.535000)
50000 500 concurrent 11.340000 0.180000 11.520000 ( 3.498000)
50000 500 celluloid 19.310000 10.680000 29.990000 ( 14.619000)
50000 1000 concurrent 10.640000 0.180000 10.820000 ( 3.563000)
50000 1000 celluloid 17.840000 19.850000 37.690000 ( 18.892000)
50000 1500 concurrent 14.120000 0.290000 14.410000 ( 4.618000)
50000 1500 celluloid 19.060000 28.920000 47.980000 ( 25.185000)
---------------------------------------------- total: 225.870000sec
mes. act. impl. user system total real
50000 2 concurrent 7.320000 0.530000 7.850000 ( 3.637000)
50000 2 celluloid 13.780000 4.730000 18.510000 ( 10.756000)
50000 500 concurrent 9.270000 0.140000 9.410000 ( 3.020000)
50000 500 celluloid 16.540000 10.920000 27.460000 ( 14.308000)
50000 1000 concurrent 9.970000 0.160000 10.130000 ( 3.445000)
50000 1000 celluloid 15.930000 20.840000 36.770000 ( 18.272000)
50000 1500 concurrent 11.580000 0.240000 11.820000 ( 3.723000)
50000 1500 celluloid 19.440000 29.060000 48.500000 ( 25.227000) (1)
===== MRI 2.1.0
Rehearsal ---------------------------------------------------------
50000 2 concurrent 4.180000 0.080000 4.260000 ( 4.269435)
50000 2 celluloid 7.740000 3.100000 10.840000 ( 10.043875)
50000 500 concurrent 5.900000 1.310000 7.210000 ( 6.565067)
50000 500 celluloid 12.820000 5.810000 18.630000 ( 17.320765)
50000 1000 concurrent 6.080000 1.640000 7.720000 ( 6.931294)
50000 1000 celluloid 17.130000 8.320000 25.450000 ( 23.786146)
50000 1500 concurrent 6.940000 2.030000 8.970000 ( 7.927330)
50000 1500 celluloid 20.980000 12.040000 33.020000 ( 30.849578)
---------------------------------------------- total: 116.100000sec
mes. act. impl. user system total real
50000 2 concurrent 3.730000 0.100000 3.830000 ( 3.822688)
50000 2 celluloid 7.900000 2.910000 10.810000 ( 9.924014)
50000 500 concurrent 5.420000 1.230000 6.650000 ( 6.025579)
50000 500 celluloid 12.720000 5.540000 18.260000 ( 16.889517)
50000 1000 concurrent 5.420000 0.910000 6.330000 ( 5.896689)
50000 1000 celluloid 16.090000 8.040000 24.130000 ( 22.347102)
50000 1500 concurrent 5.580000 0.760000 6.340000 ( 6.038535)
50000 1500 celluloid 20.000000 11.680000 31.680000 ( 29.590774) (1)
Note (1): Celluloid is using thread per actor so this bench is creating about 1500 native threads. Actor is using constant number of threads.
Constant Summary
-
Error =
# File 'lib/concurrent-ruby-edge/concurrent/actor/errors.rb', line 5Class.new(StandardError)
Class Method Summary
- .current ⇒ Reference?
-
.root
A root actor, a default parent of all actors spawned outside an actor.
-
.spawn(*args, &block) ⇒ Reference
Spawns a new actor.
-
.spawn!(*args, &block)
as .spawn but it’ll block until actor is initialized or it’ll raise exception on error.
- .to_spawn_options(context_class, name, *args)
Class Method Details
.current ⇒ Reference?
# File 'lib/concurrent-ruby-edge/concurrent/actor.rb', line 34
def self.current Thread.current[:__current_actor__] end
.root
A root actor, a default parent of all actors spawned outside an actor
# File 'lib/concurrent-ruby-edge/concurrent/actor.rb', line 49
def self.root @root.value! end
.spawn(*args, &block) ⇒ Reference
Spawns a new actor. AbstractContext.spawn allows to omit class parameter. To see the list of available options see Core#initialize
# File 'lib/concurrent-ruby-edge/concurrent/actor.rb', line 72
def self.spawn(*args, &block) = (*args) if [:executor] && [:executor].is_a?(ImmediateExecutor) raise ArgumentError, 'ImmediateExecutor is not supported' end if Actor.current Core.new( .merge(parent: Actor.current), &block).reference else root.ask([:spawn, , block]).value! end end
.spawn!(*args, &block)
as .spawn but it’ll block until actor is initialized or it’ll raise exception on error
# File 'lib/concurrent-ruby-edge/concurrent/actor.rb', line 85
def self.spawn!(*args, &block) spawn( (*args).merge(initialized: future = Concurrent::Promises.resolvable_future), &block).tap { future.wait! } end
.to_spawn_options(context_class, name, *args)
.to_spawn_options(opts)
# File 'lib/concurrent-ruby-edge/concurrent/actor.rb', line 96
def self. (*args) if args.size == 1 && args.first.is_a?(::Hash) args.first else { class: args[0], name: args[1], args: args[2..-1] } end end