123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::ErlangActor::AbstractActor

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::Synchronization::Object
Defined in: lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb

Constant Summary

EnvironmentConstants - Included

ANY, TIMEOUT

Concern::Logging - Included

SEV_LABEL

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing an instance variable with volatile (Java) semantics, as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) an instance variable with volatile (Java) semantics.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(mailbox, environment, name, executor) ⇒ AbstractActor

Parameters:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 652

# Sets up the actor's internal state and builds the environment object.
#
# @param mailbox stored as the actor's mailbox
# @param environment [Class, Module] either a class that is Environment or
#   inherits from it (instantiated directly), or a module (mixed into a
#   fresh Environment instance)
# @param name handed to Pid.new together with the actor itself
# @param executor handed to the environment constructor
# @raise [ArgumentError] when environment is neither of the accepted kinds
def initialize(mailbox, environment, name, executor)
  super()
  @Mailbox                = mailbox
  @Pid                    = Pid.new(self, name)
  @Linked                 = ::Set.new
  @Monitors               = {}
  @Monitoring             = {}
  @MonitoringLateDelivery = {}
  @Terminated             = Promises.resolvable_future
  @trap                   = false
  @reply                  = nil

  environment_class = environment.is_a?(Class) && environment <= Environment
  unless environment_class || environment.is_a?(Module)
    raise ArgumentError,
          "environment has to be a class inheriting from Environment or a module"
  end

  @Environment = if environment_class
                   environment.new(self, executor)
                 else
                   # extend returns its receiver, so this yields the extended instance
                   Environment.new(self, executor).extend(environment)
                 end
end

Instance Attribute Details

#asked? ⇒ Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 930

# Tells whether a reply probe is currently stored in @reply.
#
# @return [Boolean] true when @reply is set, false otherwise
def asked?
  @reply ? true : false
end

#traps? ⇒ Boolean (readonly)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 746

# Reports the actor's trap flag.
#
# @return the current value of @trap
def traps?
  @trap
end

Instance Method Details

#after_termination(final_reason) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1031

# Cleans up after the actor has terminated: rejects a pending reply with
# NoActor, answers the signals still sitting in the mailbox, then drops
# the mailbox.
#
# @param final_reason the termination reason; only logged here
# @return [nil]
def after_termination(final_reason)
  log DEBUG, @Pid, terminated: final_reason
  clean_reply NoActor.new(@Pid)

  # NOTHING is the sentinel try_pop hands back once the mailbox is empty.
  until (leftover = @Mailbox.try_pop(NOTHING)) == NOTHING
    case leftover
    when Monitor
      # The reason is still known here, but NoActor is reported anyway;
      # otherwise monitors would see different reasons non-deterministically.
      leftover.from.tell DownSignal.new(@Pid, leftover.reference, NoActor.new(@Pid))
    when Link
      # same reasoning as for Monitor
      leftover.from.tell NoActor.new(@Pid)
    when Ask
      leftover.probe.reject(NoActor.new(@Pid), false)
    end
    # normal messages and all other signals are simply discarded
  end

  @Mailbox = nil
end

#ask(message, timeout, timeout_value)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 695

# Sends the message wrapped in an Ask signal and blocks for the answer.
#
# @param message the payload to ask
# @param timeout time limit covering both the send and the wait; nil or
#   false waits without a limit
# @param timeout_value passed to the probe's value! as its timeout value
# @return the result of the probe's value!
# @raise [NoActor] when the actor is found to be terminated
def ask(message, timeout, timeout_value)
  log DEBUG, @Pid, asked: message
  raise NoActor.new(@Pid) if @Terminated.resolved?

  probe    = Promises.resolvable_future
  question = Ask.new(message, probe)

  unless timeout
    raise NoActor.new(@Pid) if @Terminated.resolved?
    tell question
    probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
    return probe.value!
  end

  started_at = Concurrent.monotonic_time
  delivered  = tell question, timeout
  # Recheck: the actor could have terminated and drained its mailbox meanwhile.
  # This must precede the wait, otherwise timeout_value could be returned
  # for an actor that was actually terminated.
  raise NoActor.new(@Pid) if @Terminated.resolved?

  remaining = 0
  if delivered
    left      = timeout - (Concurrent.monotonic_time - started_at)
    remaining = left >= 0 ? left : 0
  end
  # TODO (pitr-ch 06-Feb-2019): allow negative timeout everywhere, interpret as 0
  probe.value! remaining, timeout_value, [true, nil, nil]
end

#ask_op(message, probe)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 726

# Non-blocking variant of asking: pushes an Ask signal carrying the given
# probe via tell_op.
#
# @param message the payload to ask
# @param probe the resolvable object that receives the answer
# @return the result of rejecting the probe when already terminated,
#   otherwise the flattened chain that ends in the probe
def ask_op(message, probe)
  log DEBUG, @Pid, asked: message
  return probe.reject(NoActor.new(@Pid), false) if @Terminated.resolved?

  delivery = tell_op(Ask.new(message, probe))
  answer   = delivery.then do
    # the actor may have terminated while the question was being delivered
    probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
    probe
  end
  answer.flat
end

#canonical_rules(rules, timeout, timeout_value, given_block) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 880

# Normalizes the rules array in place into a list of [matcher, job] pairs.
#
# @param rules [Array] rules as supplied by the caller; mutated
# @param timeout when truthy a TIMEOUT rule is placed first
# @param timeout_value job of the TIMEOUT rule created when none is present
# @param given_block [Proc, nil] job applied to a bare matcher (or to ANY)
# @return [ArgumentError, nil] an error object (returned, not raised) when
#   a block is combined with full rules, nil otherwise
def canonical_rules(rules, timeout, timeout_value, given_block)
  job = given_block || -> v { v }

  if rules.empty?
    rules.push(on(ANY, &job))
  elsif rules.size == 1
    single = rules.first
    if single.is_a?(::Array) && single.size == 2
      return ArgumentError.new 'a block cannot be given if full rules are used' if given_block
    else
      rules.replace([on(single, &job)])
    end
  elsif given_block
    return ArgumentError.new 'a block cannot be given if full rules are used'
  end

  if timeout
    # The TIMEOUT rule has to come first so that an ANY rule cannot pick it up.
    # The last TIMEOUT rule present wins; it is copied to the front.
    position = rules.rindex { |matcher, _| matcher == TIMEOUT }
    rules.unshift(position ? rules[position] : on(TIMEOUT, timeout_value))
  end
  nil
end

#clean_reply(reason = NoReply) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 934

# Rejects the pending reply probe, if any, and forgets it.
#
# @param reason rejection reason handed to the probe (defaults to NoReply)
# @return [nil]
def clean_reply(reason = NoReply)
  return unless @reply

  @reply.reject reason, false
  @reply = nil
end

#consume_exit(exit_message) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 987

# Decides what an incoming exit signal means for this actor.
#
# @param exit_message the exit signal; it is destructured into its sender
#   and reason, and asked for link_terminated
# @return NOTHING when the signal is ignored, a Terminated message when
#   exits are trapped, otherwise whatever terminate returns
def consume_exit(exit_message)
  from, reason = exit_message

  # A link-originated exit only counts while the sender is still linked.
  # Set#delete always returns the set itself (truthy), so it cannot tell
  # whether the sender was present; Set#delete? returns nil when it was not.
  relevant = !exit_message.link_terminated || @Linked.delete?(from)

  # *link*          *exiting*
  # send Link
  #                 terminate
  # terminated?
  #                 drain signals # generates second Terminated which is dropped here
  # already processed exit message, do nothing
  return NOTHING unless relevant

  # When trapping, every exit is turned into a regular Terminated message.
  return Terminated.new(from, reason) if @trap
  return terminate(reason) unless :normal == reason

  # A :normal exit only stops this actor when it sent the signal to itself.
  from == @Pid ? terminate(:normal) : NOTHING
end

#consume_signal(message) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 941

# Applies a mailbox item to the actor's bookkeeping when it is a signal.
#
# @param message a regular message or an AbstractSignal
# @return the message itself when it is not a signal; the asked payload
#   for Ask; a Down message for a DownSignal still being waited for; the
#   result of terminate / consume_exit for Kill / Terminate; NOTHING for
#   signals that only update internal state or are ignored
# @raise [RuntimeError] for an AbstractSignal of an unknown kind
def consume_signal(message)
  # regular messages pass straight through
  return message unless AbstractSignal === message

  case message
  when Ask
    @reply = message.probe
    message.message
  when Link
    @Linked.add message.from
    NOTHING
  when UnLink
    @Linked.delete message.from
    NOTHING
  when Monitor
    @Monitors[message.reference] = message.from
    NOTHING
  when DeMonitor
    @Monitors.delete message.reference
    NOTHING
  when Kill
    terminate :killed
  when DownSignal
    expected = @Monitoring.delete(message.reference) ||
      @MonitoringLateDelivery.delete(message.reference)
    if expected
      # becomes a regular Down message to be put into the queue
      Down.new(message.from, message.reference, message.info)
    else
      # Ignored when no longer monitoring, and in the following case:
      #
      # *monitoring*    *monitored*
      # send Monitor
      #                 terminate
      # terminated?
      #                 drain signals # generates second DOWN which is dropped here
      # already reported as :noproc
      NOTHING
    end
  when Terminate
    consume_exit message
  else
    raise "unknown message #{message}"
  end
end

#demonitor(reference, *options)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 812

# Stops monitoring identified by the given reference.
#
# @param reference the reference under which the monitor was registered
# @param options [Array<Symbol>] any of :info and :flush
# @return [Boolean] true in the plain case; with :flush and :info, false
#   when a down message was removed from the mailbox; with :info alone,
#   false when nothing was being monitored or a down message is already
#   waiting in the mailbox
# @raise [ArgumentError] when an option other than :info / :flush is given
def demonitor(reference, *options)
  info  = options.delete :info
  flush = options.delete :flush
  raise ArgumentError, "bad options #{options}" unless options.empty?

  monitored = @Monitoring.delete reference
  monitored.tell DeMonitor.new(@Pid, reference) if monitored

  # built lazily, only on the paths that actually inspect the mailbox
  down_matcher = -> { And[DownSignal, -> m { m.reference == reference }] }

  if flush
    # remove (one) down message having reference from mailbox
    flushed = monitored ? !!@Mailbox.try_pop_matching(down_matcher.call) : false
    return info ? !flushed : true
  end

  return true unless info
  return false unless monitored
  return true unless @Mailbox.peek_matching(down_matcher.call)

  # allow to deliver the already queued message once
  @MonitoringLateDelivery[reference] = monitored
  false
end

#eval_task(message, job) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 913

# Produces the result of a rule's job for the given message.
#
# @param message passed to the job when the job is a Proc
# @param job a Proc (executed via instance_exec on @Environment) or any
#   other object, which is returned as-is
# @return the Proc's result or the job itself
def eval_task(message, job)
  return job unless job.is_a? Proc

  @Environment.instance_exec(message, &job)
end

#initial_signal_consumption (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1019

# Pops everything currently in the mailbox and consumes it as signals.
#
# @return [nil]
# @raise [RuntimeError] when consuming an item yields anything but NOTHING
def initial_signal_consumption
  # try_pop without a default is expected to give a falsy value once empty
  while (signal = @Mailbox.try_pop)
    raise 'it was not consumable signal' unless consume_signal(signal) == NOTHING
  end
end

#linked?(pid) ⇒ Boolean

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 788

# Tells whether the given pid is in this actor's set of linked pids.
#
# @param pid the pid to look up
# @return [Boolean]
def linked?(pid)
  @Linked.member?(pid)
end

#monitor(pid)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 792

# Starts monitoring the given pid under a fresh reference.
#
# The reference is registered in @Monitoring before anything is sent.
#
# @param pid the pid to monitor; must respond to terminated and tell
# @return the newly created Reference
def monitor(pid)
  # *monitoring*    *monitored*
  # send Monitor
  # terminated?
  #                 terminate before getting Monitor
  #                 drain signals including the Monitor
  reference              = Reference.new
  @Monitoring[reference] = pid

  unless pid.terminated.resolved?
    # still running as far as we can see, so let the Monitor signal race
    pid.tell Monitor.new(@Pid, reference)
    return reference unless pid.terminated.resolved?
  end

  # Terminated either before or right after sending Monitor: always report
  # NoActor to ourselves, it cannot get anything else.
  tell DownSignal.new(pid, reference, NoActor.new(pid))
  reference
end

#monitoring?(reference) ⇒ Boolean

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 839

# Tells whether the reference is currently registered in @Monitoring.
#
# @param reference the monitor reference to look up
# @return [Boolean]
def monitoring?(reference)
  @Monitoring.key?(reference)
end

#on(matcher, value = nil, &block)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 757

# Builds a rule: a two-element array pairing a matcher with its job.
#
# @param matcher what the rule matches on
# @param value a plain value used as the rule's job; may be false
# @yield optional block used as the rule's job instead of a value
# @return [Array] [matcher, job] where job is the block when one was given,
#   otherwise the value
# @raise [ArgumentError] if both a block and a truthy value are supplied
def on(matcher, value = nil, &block)
  raise ArgumentError, 'only one of block or value can be supplied' if block && value

  # Pick the block first so that an explicit `false` value is kept as it is;
  # `value || block` would silently turn it into nil.
  [matcher, block || value]
end

#pid

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 742

# Returns the pid stored in @Pid.
#
# @return the actor's pid object
def pid
  @Pid
end

#receive(*rules, timeout: nil, timeout_value: nil, **options, &block)

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 762

# Abstract receive operation; this implementation only raises.
# NOTE(review): presumably overridden by concrete actor subclasses — confirm.
#
# @param rules rules to receive by
# @param timeout optional timeout
# @param timeout_value optional value associated with the timeout
# @param options further keyword options
# @raise [NotImplementedError] always
def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
  raise NotImplementedError
end

#reply_resolution(fulfilled, value, reason)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 860

# Resolves the pending reply probe with the given outcome.
#
# @param fulfilled passed through to the probe's resolve
# @param value passed through to the probe's resolve
# @param reason passed through to the probe's resolve
# @return [Boolean] false when there is no pending reply, otherwise the
#   truthiness of what the probe's resolve returned
def reply_resolution(fulfilled, value, reason)
  probe = @reply
  return false unless probe

  probe.resolve(fulfilled, value, reason, false) ? true : false
end

#send_exit_messages(reason) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 921

# Notifies every linked pid and every registered monitor about this
# actor's exit, emptying both collections afterwards.
#
# @param reason the exit reason carried by the sent signals
# @return the emptied @Monitors hash
def send_exit_messages(reason)
  @Linked.each { |linked| linked.tell Terminate.new(@Pid, reason) }
  @Linked.clear

  @Monitors.each_pair { |ref, watcher| watcher.tell DownSignal.new(@Pid, ref, reason) }
  @Monitors.clear
end

#spawn(*args, type:, channel:, environment:, name:, link:, monitor:, executor:, &body)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 843

# Creates another actor, optionally links and/or monitors it, and runs it.
#
# Linking and monitoring are set up before the new actor is run.
#
# @param args arguments forwarded to the new actor's run
# @param type forwarded to ErlangActor.create
# @param channel forwarded to ErlangActor.create
# @param environment forwarded to ErlangActor.create
# @param name forwarded to ErlangActor.create
# @param link when truthy the new pid is linked
# @param monitor when truthy the new pid is monitored
# @param executor forwarded to ErlangActor.create
# @yield body forwarded to the new actor's run
# @return the new pid, or [pid, reference] when monitoring was requested
def spawn(*args,
          type:,
          channel:,
          environment:,
          name:,
          link:,
          monitor:,
          executor:,
          &body)
  child     = ErlangActor.create(type, channel, environment, name, executor)
  child_pid = child.pid

  # The keyword locals shadow the methods of the same name; the explicit
  # argument lists make these method calls.
  link(child_pid) if link
  reference = monitor ? monitor(child_pid) : nil

  child.run(*args, &body)
  return child_pid unless monitor

  [child_pid, reference]
end

#tell(message, timeout = nil)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 685

# Pushes a message into the actor's mailbox.
#
# @param message the message to deliver
# @param timeout optional limit handed to the mailbox push
# @return with a timeout: the result of the push, or false when the
#   mailbox is already gone; without a timeout: always the actor's pid
def tell(message, timeout = nil)
  log DEBUG, @Pid, told: message

  # read once: @Mailbox is set to nil after termination
  mailbox = @Mailbox
  unless mailbox
    return timeout ? false : @Pid
  end

  pushed = mailbox.push(message, timeout)
  timeout ? pushed : @Pid
end

#tell_op(message)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 676

# Non-blocking variant of tell built on the mailbox's push_op.
#
# @param message the message to deliver
# @return the push operation chained to yield the actor's pid, or an
#   already fulfilled future of the pid when the mailbox is gone
def tell_op(message)
  log DEBUG, @Pid, told: message

  # read once: @Mailbox is set to nil after termination
  mailbox = @Mailbox
  return Promises.fulfilled_future(@Pid) unless mailbox

  mailbox.push_op(message).then { @Pid }
end

#terminate(pid = nil, reason, value: nil)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 865

# Terminates this actor, or asks another actor to terminate.
#
# @param pid optional target; when omitted the actor terminates itself
# @param reason termination reason; :kill sends a Kill signal instead of
#   a Terminate signal
# @param value only used when terminating itself, passed to terminate_self
# @return the result of terminate_self, or of the target's tell
def terminate(pid = nil, reason, value: nil)
  return terminate_self(reason, value) unless pid

  # The signal is always sent, even when pid equals this actor's own pid.
  signal = reason == :kill ? Kill.new(@Pid) : Terminate.new(@Pid, reason, false)
  pid.tell signal
end

#terminate_self(reason, value) (private)

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1027

# Abstract hook for terminating this actor; this implementation only raises.
# NOTE(review): presumably overridden by concrete actor subclasses — confirm.
#
# @param reason termination reason
# @param value value accompanying the termination
# @raise [NotImplementedError] always
def terminate_self(reason, value)
  raise NotImplementedError
end

#terminated

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 738

# Exposes the termination future held in @Terminated.
# NOTE(review): with_hidden_resolvable presumably returns a view that callers
# cannot resolve themselves — confirm against the Promises API.
#
# @return the result of @Terminated.with_hidden_resolvable
def terminated
  @Terminated.with_hidden_resolvable
end

#trap(value = true)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 750

# Sets the trap flag and reports what it was before.
#
# @param value the new setting; coerced to true/false by truthiness
# @return the previous value of @trap
def trap(value = true)
  previous = @trap
  @trap    = value ? true : false
  previous
end