123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::ErlangActor::OnThread

Constant Summary

EnvironmentConstants - Included

ANY, TIMEOUT

Concern::Logging - Included

SEV_LABEL

Class Attribute Summary

Class Method Summary

AbstractActor - Inherited

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(channel, environment, name, executor) ⇒ OnThread

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1192

def initialize(channel, environment, name, executor)
  super channel, environment, name, executor
  @Thread = nil
end

Instance Method Details

#receive(*rules, timeout: nil, timeout_value: nil, &given_block)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1223

def receive(*rules, timeout: nil, timeout_value: nil, &given_block)
  clean_reply

  err = canonical_rules rules, timeout, timeout_value, given_block
  raise err if err

  rules_matcher = Or[*rules.map(&:first)]
  matcher       = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
  while true
    message = @Mailbox.pop_matching(matcher, timeout, TIMEOUT)
    log DEBUG, pid, got: message
    unless (message = consume_signal(message)) == NOTHING
      rules.each do |rule, job|
        return eval_task(message, job) if rule === message
      end
    end
  end
end

#run(*args, &body)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1200

def run(*args, &body)
  initial_signal_consumption
  @Thread = Thread.new(@Terminated, self) do |terminated, _actor| # sync point
    Thread.abort_on_exception = true

    final_reason = begin
      reason, value = catch(TERMINATE) do
        [:normal, @Environment.instance_exec(*args, &body)]
      end
      send_exit_messages reason
      terminated.resolve(reason == :normal, value, reason)
      reason
    rescue => e
      send_exit_messages e
      terminated.reject e
      e
    end

    after_termination final_reason
    @Thread = nil
  end
end

#terminate_self(reason, value) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1244

def terminate_self(reason, value)
  throw TERMINATE, [reason, value]
end