
Class: Concurrent::Actor::Core

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby-edge/concurrent/actor/core.rb

Overview

Note:

The whole class should be considered private. A user should use Contexts and References only.

Note:

devel: core should not block on anything. For example, it cannot wait on children to terminate, because that would eat up all threads in the task pool and deadlock.

Core of the actor.

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Concern::Logging - Included

#log

Logs through Concurrent.global_logger. It can be overridden by setting @logger.

TypeCheck - Included

Synchronization::LockableObject - Inherited

Constructor Details

.new(opts = {}, &block) ⇒ Core

Parameters:

  • block (Proc)

    for class instantiation

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • name (String)
  • class (Context)

    a class to be instantiated defining the Actor’s behaviour (read in the source as opts.fetch(:class))

  • args (Array<Object>)

    arguments for actor_class instantiation

  • executor (Executor)
  • link (true, false)

    atomically link the actor to its parent (default: true)

  • reference_class (Class)

    a custom descendant of Reference to use (read in the source as opts[:reference_class])

  • behaviour_definition (Array<Array(Behaviour::Abstract, Array<Object>)>)

    array of pairs where each pair is a behaviour class and its args, see Behaviour.basic_behaviour_definition

  • initialized (ResolvableFuture, nil)

    if present, it will be fulfilled or rejected after Context initialization

  • parent (Reference, nil)

    **private api** parent of the actor (the one spawning it)

  • logger (Proc, nil)

    a proc accepting (level, progname, message = nil, &block) params; it can be used to hook an actor instance to any logging system, see Concern::Logging

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 50

def initialize(opts = {}, &block)
  super(&nil)
  synchronize { ns_initialize(opts, &block) }
end
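
The logger option described above takes a proc with the parameters (level, progname, message = nil, &block). The following is a minimal sketch of such a proc, assuming a plain stdlib Logger as the target. The names io, target and actor_logger are illustrative only and are not part of the library.

```ruby
require 'logger'
require 'stringio'

# Hypothetical sink: anything that responds like Logger#add would do.
io     = StringIO.new
target = Logger.new(io)

# A proc with the signature described for the :logger option.
# The message is either passed directly or produced lazily by the block.
actor_logger = lambda do |level, progname, message = nil, &block|
  target.add(level, message, progname, &block)
end

actor_logger.call(Logger::INFO, '/an_actor', 'spawned')
actor_logger.call(Logger::DEBUG, '/an_actor') { 'lazily built message' }
```

Passing the message as a block defers building the string until the logger decides the level is enabled, which is presumably why the log calls in this class mostly use the block form.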

Instance Attribute Details

#actor_class ⇒ Context (readonly)

A subclass of AbstractContext representing Actor’s behaviour.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#behaviour_definition (readonly)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#context (readonly)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#context_class (readonly)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#executor ⇒ Executor (readonly)

Executor which is used to process messages.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#name ⇒ String (readonly)

The name of the actor instance. It should be unique (not enforced). It allows easier orientation between actor instances.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#path ⇒ String (readonly)

Path of this actor. It is used for easier orientation and logging. The path is constructed recursively as parent.path + self.name up to Concurrent::Actor.root, e.g. /an_actor/its_child.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition
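
The recursive path rule for #path can be sketched in isolation. This is a simplified illustration, assuming a hypothetical PathNode struct in place of the real core; it only borrows the File.join rule that appears in ns_initialize.

```ruby
# Hypothetical stand-in for an actor core; only name and parent matter here.
PathNode = Struct.new(:name, :parent) do
  # Same rule as described above: parent path joined with own name,
  # or just the name when there is no parent (the root).
  def path
    parent ? File.join(parent.path, name) : name
  end
end

root  = PathNode.new('/', nil)
actor = PathNode.new('an_actor', root)
child = PathNode.new('its_child', actor)

puts child.path # prints "/an_actor/its_child"
```

File.join avoids a doubled separator when the parent is the root named '/'.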

#reference ⇒ Reference (readonly)

Reference to this actor which can be safely passed around.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

Instance Method Details

#add_child(child)

This method is for internal use only.

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 74

def add_child(child)
  guard!
  Type! child, Reference
  @children.add child
  nil
end

#allocate_context

This method is for internal use only.

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 150

def allocate_context
  @context = @context_class.allocate
end

#behaviour(behaviour_class) ⇒ Behaviour::Abstract?

Parameters:

  • behaviour_class (Class)

Returns:

  • (Behaviour::Abstract, nil)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 138

def behaviour(behaviour_class)
  @behaviours[behaviour_class]
end

#behaviour!(behaviour_class) ⇒ Behaviour::Abstract

Parameters:

  • behaviour_class (Class)

Returns:

  • (Behaviour::Abstract)

Raises:

  • (KeyError)

    when no behaviour

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 145

def behaviour!(behaviour_class)
  @behaviours.fetch behaviour_class
end
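
The difference between #behaviour and #behaviour! comes down to Hash#[] versus Hash#fetch. The sketch below uses a plain Hash and two hypothetical classes (Buffering, Pausing) standing in for real behaviour classes.

```ruby
# Hypothetical behaviour classes, standing in for Behaviour::Abstract descendants.
class Buffering; end
class Pausing; end

behaviours = { Buffering => Buffering.new }

found   = behaviours[Buffering] # lookup as in #behaviour
missing = behaviours[Pausing]   # nil when the behaviour is absent

error = nil
begin
  behaviours.fetch(Pausing)     # lookup as in #behaviour!
rescue KeyError => e
  error = e                     # raised when the behaviour is absent
end
```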

#broadcast(public, event)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 131

def broadcast(public, event)
  log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
  @first_behaviour.on_event(public, event)
end

#build_context

This method is for internal use only.
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 155

def build_context
  @context.send :initialize_core, self
  @context.send :initialize, *@args, &@block
end

#children ⇒ Array<Reference>

Returns:

  • (Array<Reference>)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 68

def children
  guard!
  @children.to_a
end
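
Because children are kept in a Set and #children returns @children.to_a, callers receive a detached snapshot. A small sketch with plain symbols standing in for Reference objects (an assumption made only for illustration):

```ruby
require 'set'

children = Set.new
children.add(:child_ref)
children.add(:child_ref)   # adding the same reference twice has no effect

snapshot = children.to_a   # a new Array, detached from the Set
snapshot << :not_a_child   # mutating the snapshot leaves the Set untouched
```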

#dead_letter_routing

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 63

def dead_letter_routing
  @context.dead_letter_routing
end

#guard!

Ensures that the caller is running inside this actor, i.e. inside of the executor.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 102

def guard!
  unless Actor.current == reference
    raise "can be called only inside actor #{reference} but was #{Actor.current}"
  end
end
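
The guard can be illustrated without the library. The sketch below assumes a hypothetical GuardedCore class that marks the current thread the same way schedule_execution does (via Thread.current[:__current_actor__]) and compares that mark with its own reference.

```ruby
# Hypothetical miniature of the guard: the object remembers its own reference
# and compares it with whatever the current thread is marked as running.
class GuardedCore
  attr_reader :reference

  def initialize(reference)
    @reference = reference
  end

  def current
    Thread.current[:__current_actor__]
  end

  def guard!
    unless current == reference
      raise "can be called only inside actor #{reference} but was #{current.inspect}"
    end
  end

  # Marks the thread for the duration of the block, then clears the mark.
  def run_inside
    Thread.current[:__current_actor__] = reference
    yield
  ensure
    Thread.current[:__current_actor__] = nil
  end
end

core    = GuardedCore.new(:an_actor)
inside  = core.run_inside { core.guard!; :ok } # passes: the mark matches
outside = begin
  core.guard!                                  # fails: no mark on this thread
rescue RuntimeError => e
  e.message
end
```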

#initialize_behaviours(opts) (private)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 210

def initialize_behaviours(opts)
  @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, ::Array).each do |(behaviour, _)|
    Child! behaviour, Behaviour::Abstract
  end
  @behaviours           = {}
  @first_behaviour      = @behaviour_definition.reverse.
      reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) }
end
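
The reverse.reduce idiom in initialize_behaviours builds a chain in which each behaviour is constructed with the one that follows it. The sketch below shows the same idiom, assuming a hypothetical Link class and symbol labels in place of real behaviour classes.

```ruby
# Hypothetical behaviour: each link knows only the next link (its subsequent).
class Link
  attr_reader :label, :subsequent

  def initialize(label, subsequent)
    @label      = label
    @subsequent = subsequent
  end

  # Walks the chain from this link to the end, collecting labels.
  def labels
    [label, *(subsequent ? subsequent.labels : [])]
  end
end

definition = [[:first, []], [:second, []], [:third, []]]

# Reversing first means the last definition is built with no successor,
# and every earlier one receives the link built just before it.
first_link = definition.reverse.reduce(nil) do |last, (label, _args)|
  Link.new(label, last)
end

p first_link.labels # prints [:first, :second, :third]
```

The value left over from reduce is the head of the chain, which is what the class stores as @first_behaviour.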

#log(level, message = nil, &block)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 108

def log(level, message = nil, &block)
  super level, @path, message, &block
end

#ns_initialize(opts, &block) (private)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 167

def ns_initialize(opts, &block)
  @mailbox              = ::Array.new
  @serialized_execution = SerializedExecution.new
  @children             = Set.new

  @context_class = Child! opts.fetch(:class), AbstractContext
  allocate_context

  @executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService

  @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
  @name      = (Type! opts.fetch(:name), String, Symbol).to_s

  parent       = opts[:parent]
  @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
  if @parent_core.nil? && @name != '/'
    raise 'only root has no parent'
  end

  @path   = @parent_core ? File.join(@parent_core.path, @name) : @name
  @logger = opts[:logger]

  @parent_core.add_child reference if @parent_core

  initialize_behaviours opts

  @args       = opts.fetch(:args, [])
  @block      = block
  initialized = Type! opts[:initialized], Promises::ResolvableFuture, NilClass

  schedule_execution do
    begin
      build_context
      initialized.fulfill reference if initialized
      log DEBUG, 'spawned'
    rescue => ex
      log ERROR, ex
      @first_behaviour.terminate!
      initialized.reject ex if initialized
    end
  end
end

#on_envelope(envelope)

Executed by Reference to schedule processing of a new message. It can also be called from alternative Reference implementations.

Parameters:

  • envelope (Envelope)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 92

def on_envelope(envelope)
  log(DEBUG) { "is  #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
  schedule_execution do
    log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender} - processing" }
    process_envelope envelope
  end
  nil
end

#parent ⇒ Reference?

The parent Actor. When an actor is spawned, Concurrent::Actor.current becomes its parent. When an actor is spawned from a thread outside of an actor (Concurrent::Actor.current is nil), Concurrent::Actor.root is assigned.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 58

def parent
  @parent_core && @parent_core.reference
end

#process_envelope(envelope)

This method is for internal use only.

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 161

def process_envelope(envelope)
  @first_behaviour.on_envelope envelope
end

#remove_child(child)

This method is for internal use only.

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 82

def remove_child(child)
  guard!
  Type! child, Reference
  @children.delete child
  nil
end

#schedule_execution

Schedules blocks to be executed on the executor sequentially and sets Actor.current for the duration of each block.

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 114

def schedule_execution
  @serialized_execution.post(@executor) do
    synchronize do
      begin
        Thread.current[:__current_actor__] = reference
        yield
      rescue => e
        log FATAL, e
      ensure
        Thread.current[:__current_actor__] = nil
      end
    end
  end

  nil
end
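
The pattern in schedule_execution (run blocks one at a time, mark the thread, log errors, always clear the mark) can be sketched with the standard library alone. This is a simplification, not the library's SerializedExecution: the hypothetical TinySerializer below uses one dedicated worker thread, whereas the real class posts to a shared executor.

```ruby
# Hypothetical stand-in for serialized execution: one worker thread drains a
# queue, so posted blocks never overlap and run in the order they were posted.
class TinySerializer
  def initialize
    @queue  = Queue.new
    @worker = Thread.new do
      while (job = @queue.pop)
        job.call
      end
    end
  end

  def post(&job)
    @queue << job
    nil
  end

  # Stops the worker after all previously posted jobs have run.
  def shutdown
    @queue << nil
    @worker.join
  end
end

serializer = TinySerializer.new
seen       = []

3.times do |i|
  serializer.post do
    begin
      Thread.current[:__current_actor__] = :an_actor # mark the thread
      seen << [i, Thread.current[:__current_actor__]]
      raise 'boom' if i == 1                         # an error must not stop later jobs
    rescue => e
      seen << [:error, e.message]                    # stands in for log FATAL, e
    ensure
      Thread.current[:__current_actor__] = nil       # always clear the mark
    end
  end
end

serializer.shutdown
```

Rescuing inside each job keeps one failing block from preventing the blocks queued after it from running.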