Module: Concurrent::Actor::Behaviour
Overview
Actors have a modular architecture, achieved by combining a light core with a chain of behaviours. Each message or internal event propagates through the chain, allowing each behaviour to react according to its responsibility.
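The propagation described above can be pictured with a small plain-Ruby toy. This is only a sketch of the chain idea, not the library's implementation: the names `ToyBehaviour`, `Logging`, `Pings`, `FailsOnUnknown` and `build_chain` are hypothetical, and the real behaviours work with envelopes and events rather than bare messages.

```ruby
# Toy model of a behaviour chain; NOT the concurrent-ruby implementation.
# Every name in this block is hypothetical.
class ToyBehaviour
  def initialize(subsequent)
    @subsequent = subsequent # next behaviour in the chain, nil for the last
  end

  # Default: hand the message to the next behaviour in the chain.
  def on_message(message)
    @subsequent.on_message(message)
  end
end

# Records every message, then lets it continue down the chain.
class Logging < ToyBehaviour
  attr_reader :seen

  def initialize(subsequent)
    super
    @seen = []
  end

  def on_message(message)
    @seen << message
    super
  end
end

# Handles :ping itself; anything else is passed on.
class Pings < ToyBehaviour
  def on_message(message)
    message == :ping ? :pong : super
  end
end

# Last link: fails on anything that reached it unhandled.
class FailsOnUnknown < ToyBehaviour
  def on_message(message)
    raise ArgumentError, "unknown message #{message.inspect}"
  end
end

# Build the chain back to front from an array of [class, *args] entries.
def build_chain(definition)
  definition.reverse.inject(nil) do |subsequent, (klass, *args)|
    klass.new(subsequent, *args)
  end
end

chain = build_chain([[Logging], [Pings], [FailsOnUnknown]])
reply = chain.on_message(:ping)
```

Here `:ping` is logged and then answered by `Pings`, while any other message travels to the end of the chain and raises, mirroring the role the last behaviour usually plays.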
-
> Links the actor to other actors and sends the actor’s events to them, such as :terminated, :paused, :resumed, errors, etc. The linked actor needs to handle those messages.
    listener = AdHoc.spawn name: :listener do
      lambda do |message|
        case message
        when Reference
          if message.ask!(:linked?)
            message << :unlink
          else
            message << :link
          end
        else
          puts "got event #{message.inspect} from #{envelope.sender}"
        end
      end
    end

    an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
      lambda { |message| raise 'failed' }
    end

    # link the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    # unlink the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    an_actor << :terminate!
This produces only two events; the other events happened after unlinking:
    got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
    got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
-
> Accepts :await messages, which allow waiting for the Actor to process all previously sent messages.
    actor << :a << :b
    actor.ask(:await).wait # blocks until :a and :b are processed
-
> Allows actors to be paused on errors. While an actor is paused, all arriving messages are collected and processed after the actor is resumed or reset. Resume simply continues with the next message. Reset also reinitializes the context.
-
> Handles supervised actors. Handle configures what to do with a failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy is either :one_for_one (restarts just the failed actor) or :one_for_all (restarts all child actors).
-
> Delegates messages and events to the AbstractContext instance.
-
> Simply fails when a message arrives here. It’s usually the last behaviour.
-
> Handles actor termination. Waits until all its children are terminated; this can be configured on behaviour initialization.
-
> Removes terminated children.
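The pausing semantics in the list above (messages collected while paused, then processed after resume or reset, with reset also reinitializing the context) can be sketched with a toy mailbox. This is a plain-Ruby illustration under simplifying assumptions, not library code: `ToyPausable` and its methods are hypothetical, and an integer counter stands in for the actor's context.

```ruby
# Toy mailbox illustrating pause/resume/reset; hypothetical, not library code.
class ToyPausable
  attr_reader :processed, :state

  def initialize
    @state = 0       # stands in for the actor's context
    @paused = false
    @buffer = []     # messages that arrived while paused
    @processed = []
  end

  def pause!
    @paused = true
  end

  # Buffer the message while paused, otherwise handle it immediately.
  def tell(message)
    @paused ? @buffer << message : handle(message)
    self
  end

  # Resume: keep the current state and continue with the buffered messages.
  def resume!
    @paused = false
    drain
  end

  # Reset: reinitialize the state first, then process the buffered messages.
  def reset!
    @state = 0
    @paused = false
    drain
  end

  private

  def handle(message)
    @state += 1
    @processed << message
  end

  def drain
    handle(@buffer.shift) until @buffer.empty?
  end
end

actor = ToyPausable.new
actor.tell(:a)
actor.pause!
actor.tell(:b).tell(:c)
processed_while_paused = actor.processed.dup
actor.reset!
```

After `reset!` the buffered messages are processed in arrival order, but the counter only reflects work done since the reset; calling `resume!` instead would have kept the earlier count.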
If needed, new behaviours can be added, or existing ones removed, to get the required behaviour.
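Because a behaviour definition is an Array of behaviour classes with their construction parameters, adding or removing a behaviour comes down to ordinary array manipulation. The sketch below uses empty stand-in classes so that it runs without the gem; `Auditing` is a hypothetical custom behaviour, and with concurrent-ruby-edge loaded one would presumably start from `Behaviour.basic_behaviour_definition` and pass the result through the `behaviour_definition:` spawn option.

```ruby
# Stand-ins so the sketch runs without the gem; with concurrent-ruby-edge
# loaded, the real classes live under Concurrent::Actor::Behaviour.
module Behaviour
  %i[SetResults RemovesChild Termination Linking Awaits
     ExecutesContext ErrorsOnUnknownMessage].each do |name|
    const_set(name, Class.new)
  end
end

# Hypothetical custom behaviour, not part of the library.
class Auditing; end

basic = [[Behaviour::SetResults, :terminate!],
         [Behaviour::RemovesChild],
         [Behaviour::Termination],
         [Behaviour::Linking],
         [Behaviour::Awaits],
         [Behaviour::ExecutesContext],
         [Behaviour::ErrorsOnUnknownMessage]]

# Remove a behaviour: drop every entry whose class is Linking.
without_linking = basic.reject { |entry| entry.first == Behaviour::Linking }

# Add a behaviour: insert Auditing just before ExecutesContext.
# dup keeps the original definition untouched.
index = basic.index { |entry| entry.first == Behaviour::ExecutesContext }
with_auditing = basic.dup.insert(index, [Auditing])
```

Position matters, since messages travel through the chain in array order; a behaviour placed after `ExecutesContext` would only see what the context did not handle.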
-
Context uses an Array of behaviours and their construction parameters.
    [[Behaviour::SetResults, :terminate!],
     [Behaviour::RemovesChild],
     [Behaviour::Termination],
     [Behaviour::Linking],
     [Behaviour::Awaits],
     [Behaviour::ExecutesContext],
     [Behaviour::ErrorsOnUnknownMessage]]
-
RestartingContext uses an Array of behaviours and their construction parameters.
    [[Behaviour::SetResults, :pause!],
     [Behaviour::RemovesChild],
     [Behaviour::Termination],
     [Behaviour::Linking],
     [Behaviour::Pausing],
     [Behaviour::Supervising, :reset!, :one_for_one],
     [Behaviour::Awaits],
     [Behaviour::ExecutesContext],
     [Behaviour::ErrorsOnUnknownMessage]]
Constant Summary
-
MESSAGE_PROCESSED =
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 52
::Object.new
Class Method Summary
- .base(on_error)
- .basic_behaviour_definition
  Array of behaviours and their construction parameters.
- .linking
- .restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
  Array of behaviours and their construction parameters.
- .supervised
- .supervising(handle = :reset!, strategy = :one_for_one)
- .user_messages
Class Method Details
.base(on_error)
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 105
def self.base(on_error)
  [[SetResults, on_error],
   # has to be before Termination to be able to remove children from terminated actor
   RemovesChild,
   Termination]
end
.basic_behaviour_definition
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :terminate!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
.linking
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 113
def self.linking
  [Linking]
end
.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :pause!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Pausing],
[Behaviour::Supervising, :reset!, :one_for_one],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 96
def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
  [*base(:pause!),
   *linking,
   *supervised,
   *supervising(handle, strategy),
   *user_messages]
end
.supervised
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 118
def self.supervised
  [Pausing]
end
.supervising(handle = :reset!, strategy = :one_for_one)
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 123
def self.supervising(handle = :reset!, strategy = :one_for_one)
  [[Behaviour::Supervising, handle, strategy]]
end
.user_messages
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 128
def self.user_messages
  [Awaits, ExecutesContext, ErrorsOnUnknownMessage]
end
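The helpers above compose by splatting their results into one array. The following stand-in replica shows how the pieces line up; it is a sketch that runs without the gem, using symbols in place of the behaviour classes, and the names `ToyDefinition` and `restarting` are hypothetical. The documented listings wrap every entry in its own array while the helper sources show some entries bare, so the sketch normalizes with `Array()` purely to make the comparison easy.

```ruby
# Stand-in replica of the definition helpers; symbols replace the real
# behaviour classes so the sketch runs without concurrent-ruby-edge.
module ToyDefinition
  module_function

  def base(on_error)
    [[:SetResults, on_error], :RemovesChild, :Termination]
  end

  def linking
    [:Linking]
  end

  def supervised
    [:Pausing]
  end

  def supervising(handle = :reset!, strategy = :one_for_one)
    [[:Supervising, handle, strategy]]
  end

  def user_messages
    [:Awaits, :ExecutesContext, :ErrorsOnUnknownMessage]
  end

  # Each helper returns an array; the splats flatten them by one level.
  def restarting(handle = :reset!, strategy = :one_for_one)
    [*base(:pause!), *linking, *supervised,
     *supervising(handle, strategy), *user_messages]
  end
end

definition = ToyDefinition.restarting(:restart!, :one_for_all)

# Wrap bare entries so every element has the [behaviour, *params] shape.
normalized = definition.map { |entry| Array(entry) }
```

Passing a different handle or strategy only changes the `Supervising` entry; the order of the remaining behaviours stays fixed.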