123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::ErlangActor::OnPool

Constant Summary

EnvironmentConstants - Included

ANY, TIMEOUT

Concern::Logging - Included

SEV_LABEL

Class Attribute Summary

Class Method Summary

AbstractActor - Inherited

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(channel, environment, name, executor) ⇒ OnPool

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1060

def initialize(channel, environment, name, executor)
  super channel, environment, name, executor
  @Executor       = executor
  @behaviour      = []
  @keep_behaviour = false
end

Instance Method Details

#apply_behaviour(message) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1178

def apply_behaviour(message)
  @behaviour.each do |rule, job|
    if rule === message
      @behaviour = [] unless @keep_behaviour
      return eval_task(message, job)
    end
  end
  raise 'should not reach'
end

#inner_run(*args, &body) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1094

def inner_run(*args, &body)
  first       = !!body
  future_body = -> message, _actor do
    kind, reason, value =
        if message.is_a?(::Array) && message.first == TERMINATE
          message
        else
          begin
            catch(JUMP) do
              [NOTHING,
               :normal,
               first ? @Environment.instance_exec(*args, &body) : apply_behaviour(message)]
            end
          rescue => e
            [TERMINATE, e, nil]
          end
        end

    case kind
    when TERMINATE
      send_exit_messages reason
      @Terminated.resolve(reason == :normal, value, reason)
      reason
    when RECEIVE
      Run[inner_run]
    when NOTHING
      if @behaviour.empty?
        send_exit_messages reason
        @Terminated.resolve(reason == :normal, value, reason)
        reason
      else
        Run[inner_run]
      end
    else
      raise "bad kind: #{kind.inspect}"
    end
  end

  if first
    Promises.future_on(@Executor, nil, self, &future_body)
  else
    internal_receive.run(Run::TEST).then(self, &future_body)
  end
end

#internal_receive (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1139

def internal_receive
  raise if @behaviour.empty?
  rules_matcher  = Or[*@behaviour.map(&:first)]
  matcher        = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
  start          = nil
  message_future = case @timeout
                   when 0
                     Promises.fulfilled_future @Mailbox.try_pop_matching(matcher, TIMEOUT)
                   when Numeric
                     pop   = @Mailbox.pop_op_matching(matcher)
                     start = Concurrent.monotonic_time
                     # FIXME (pitr-ch 30-Jan-2019): the scheduled future should be cancelled
                     (Promises.schedule(@timeout) { TIMEOUT } | pop).then(pop) do |message, p|
                       if message == TIMEOUT && !p.resolve(true, TIMEOUT, nil, false)
                         # timeout raced with probe resolution, take the value instead
                         p.value
                       else
                         message
                       end
                     end
                   when nil
                     @Mailbox.pop_op_matching(matcher)
                   else
                     raise
                   end

  message_future.then(start, self) do |message, s, _actor|
    log DEBUG, pid, got: message
    catch(JUMP) do
      if (message = consume_signal(message)) == NOTHING
        @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s
        Run[internal_receive]
      else
        message
      end
    end
  end
end

#receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1077

def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
  clean_reply
  err = canonical_rules rules, timeout, timeout_value, given_block
  raise err if err

  @keep_behaviour = keep
  @timeout        = timeout
  @behaviour      = rules
  throw JUMP, [RECEIVE]
end

#run(*args, &body)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1067

def run(*args, &body)
  body ||= -> { start }

  initial_signal_consumption
  inner_run(*args, &body).
      run(Run::TEST).
      then(&method(:after_termination)).
      rescue { |e| log ERROR, e }
end

#terminate_self(reason, value) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1090

def terminate_self(reason, value)
  throw JUMP, [TERMINATE, reason, value]
end