Module: Concurrent::Actor::Behaviour

Overview

Actors have a modular architecture, achieved by combining a light core with a chain of behaviours. Each message or internal event propagates through the chain, allowing each behaviour to react according to its responsibility.

  • Linking:

    > Links the actor to other actors and sends the actor’s events to them, such as :terminated, :paused, :resumed, and errors. A linked actor needs to handle those messages.

    listener = AdHoc.spawn name: :listener do
      lambda do |message|
        case message
        when Reference
          if message.ask!(:linked?)
            message << :unlink
          else
            message << :link
          end
        else
          puts "got event #{message.inspect} from #{envelope.sender}"
        end
      end
    end
    
    an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
      lambda { |message| raise 'failed'}
    end
    
    # link the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    # unlink the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    an_actor << :terminate!

    This produces only two events; the other events happened after unlinking.

    got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
    got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>

  • Awaits:

    > Accepts :await messages, which allow waiting until the actor has processed all previously sent messages.

    actor << :a << :b
    actor.ask(:await).wait # blocks until :a and :b are processed

  • Pausing:

    > Allows actors to be paused on errors. While an actor is paused, all arriving messages are collected and processed after the actor is resumed or reset. Resume simply continues with the next message. Reset also reinitializes the context.

  • Supervising:

    > Handles supervised actors. The handle configures what to do with a failed child: :terminate!, :resume!, :reset!, or :restart!. The strategy is either :one_for_one (restarts just the failed actor) or :one_for_all (restarts all child actors).

  • ExecutesContext:

    > Delegates messages and events to the AbstractContext instance.

  • ErrorsOnUnknownMessage:

    > Simply fails when a message arrives here. It’s usually the last behaviour.

  • Termination:

    > Handles actor termination. Waits until all of the actor’s children are terminated; this can be configured on behaviour initialization.

  • RemovesChild:

    > Removes terminated children.

If needed, new behaviours can be added or existing ones removed to get the required behaviour.
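The chain idea described in this overview can be illustrated with a small plain-Ruby model. This is only a sketch under stated assumptions: the `Toy*` classes, `build_chain`, and `CHAIN` are hypothetical names invented for illustration and are not the library's behaviour classes or API. Each link either handles a message or passes it to its successor, and the last link fails on anything that reaches it, much like the role described for ErrorsOnUnknownMessage.

```ruby
# Toy model of a behaviour chain. All names here are hypothetical stand-ins,
# not the library's Behaviour classes.
class ToyBehaviour
  def initialize(successor = nil)
    @successor = successor
  end

  # Default: this behaviour has nothing to do, so hand the message on.
  def on_message(message)
    pass(message)
  end

  private

  # Forward the message to the next behaviour, if there is one.
  def pass(message)
    @successor&.on_message(message)
  end
end

# Answers :ping itself and forwards everything else.
class ToyPing < ToyBehaviour
  def on_message(message)
    message == :ping ? :pong : pass(message)
  end
end

# Stands in for the step that runs user code: handles strings only.
class ToyContext < ToyBehaviour
  def on_message(message)
    message.is_a?(String) ? message.upcase : pass(message)
  end
end

# Last link: fails on any message that got this far.
class ToyErrorsOnUnknown < ToyBehaviour
  def on_message(message)
    raise ArgumentError, "unknown message #{message.inspect}"
  end
end

# Build the chain back to front so each link knows its successor.
def build_chain(classes)
  classes.reverse.inject(nil) { |successor, klass| klass.new(successor) }
end

CHAIN = build_chain([ToyPing, ToyContext, ToyErrorsOnUnknown])

CHAIN.on_message(:ping)  # handled by ToyPing
CHAIN.on_message("hi")   # passed by ToyPing, handled by ToyContext
```

In this model, reordering or dropping an entry in the list given to `build_chain` changes which link sees a message first, which is the kind of flexibility the overview attributes to the real behaviour chain.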

Class Method Details

.base(on_error)

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 105

def self.base(on_error)
  [[SetResults, on_error],
   # has to be before Termination to be able to remove children from terminated actor
   RemovesChild,
   Termination]
end

.basic_behaviour_definition

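Array of behaviours and their construction parameters. The listing below wraps every behaviour in an array, while the source further down returns behaviours without parameters as bare classes.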

[[Behaviour::SetResults, :terminate!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 77

def self.basic_behaviour_definition
  [*base(:terminate!),
   *linking,
   *user_messages]
end
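Because a definition is an ordinary array, tailoring it comes down to array manipulation. The following is a minimal sketch, not the library's API: symbols stand in for the behaviour classes so it runs without the gem, `BASIC` mirrors what the source above returns (parameterless behaviours appear as bare entries), and `name_of`, `without`, `insert_before`, and the `:AuditLog` behaviour are hypothetical names introduced for illustration.

```ruby
# Stand-in for the basic definition; symbols replace the Behaviour classes.
# Entries with parameters are arrays, entries without are bare values.
BASIC = [
  [:SetResults, :terminate!],
  :RemovesChild,
  :Termination,
  :Linking,
  :Awaits,
  :ExecutesContext,
  :ErrorsOnUnknownMessage
].freeze

# Name of an entry, whether it is bare or wrapped with parameters.
def name_of(entry)
  Array(entry).first
end

# Return a copy of the definition without the named behaviour.
def without(definition, name)
  definition.reject { |entry| name_of(entry) == name }
end

# Return a copy with `new_entry` placed directly before the named behaviour.
def insert_before(definition, name, new_entry)
  index = definition.index { |entry| name_of(entry) == name }
  raise ArgumentError, "#{name} not in definition" unless index
  definition.dup.insert(index, new_entry)
end

# Drop linking and add a hypothetical :AuditLog behaviour ahead of the
# step that delegates to the context.
CUSTOM = insert_before(without(BASIC, :Linking), :ExecutesContext, [:AuditLog, :info])
CUSTOM_NAMES = CUSTOM.map { |entry| name_of(entry) }
```

Both helpers return new arrays and leave `BASIC` untouched, so one base definition can be shared by several differently tailored actors. How a tailored array is then handed to an actor is shown in the Linking example, which passes a definition through the `behaviour_definition:` option.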

.linking

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 113

def self.linking
  [Linking]
end

.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)

Array of behaviours and their construction parameters.

[[Behaviour::SetResults, :pause!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Pausing],
 [Behaviour::Supervising, :reset!, :one_for_one],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 96

def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
  [*base(:pause!),
   *linking,
   *supervised,
   *supervising(handle, strategy),
   *user_messages]
end
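To see how the `handle` and `strategy` arguments travel through this composition, the class methods documented in this section can be mirrored in a stand-in module. This is a sketch only: `DefinitionSketch` is a hypothetical name, and symbols replace the behaviour classes so the code runs without the gem. The method bodies follow the source listings in this section.

```ruby
# Stand-in mirroring the class methods listed in this section; symbols
# replace the Behaviour classes so the sketch is self-contained.
module DefinitionSketch
  def self.base(on_error)
    [[:SetResults, on_error], :RemovesChild, :Termination]
  end

  def self.linking
    [:Linking]
  end

  def self.supervised
    [:Pausing]
  end

  def self.supervising(handle = :reset!, strategy = :one_for_one)
    [[:Supervising, handle, strategy]]
  end

  def self.user_messages
    [:Awaits, :ExecutesContext, :ErrorsOnUnknownMessage]
  end

  # Each helper returns an array, and the splats flatten them into one list.
  def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
    [*base(:pause!),
     *linking,
     *supervised,
     *supervising(handle, strategy),
     *user_messages]
  end
end

DEFAULT   = DefinitionSketch.restarting_behaviour_definition
RESTARTER = DefinitionSketch.restarting_behaviour_definition(:restart!, :one_for_all)

# Pair up the entries and keep only the positions where the two differ.
DIFF = DEFAULT.zip(RESTARTER).reject { |default_entry, custom_entry| default_entry == custom_entry }
```

In this model the two definitions differ only in the Supervising entry, so changing the arguments affects how failed children are handled without altering the rest of the chain or its order.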

.supervised

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 118

def self.supervised
  [Pausing]
end

.supervising(handle = :reset!, strategy = :one_for_one)

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 123

def self.supervising(handle = :reset!, strategy = :one_for_one)
  [[Behaviour::Supervising, handle, strategy]]
end

.user_messages

See Also:

  • its source code
  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 128

def self.user_messages
  [Awaits,
   ExecutesContext,
   ErrorsOnUnknownMessage]
end