Class: Concurrent::ErlangActor::OnPool
Relationships & Source Files
Inherits: Concurrent::ErlangActor::AbstractActor
Defined in: lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb
Constant Summary
Class Attribute Summary
Synchronization::Object
- Inherited
Class Method Summary
AbstractActor
- Inherited
Synchronization::Object
- Inherited
.atomic_attribute?, .atomic_attributes,
.attr_atomic | Creates methods for reading and writing to an instance variable with volatile (Java) semantics as …
.attr_volatile | Creates methods for reading and writing (as …
.ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow.
.new | Has to be called by children.
.safe_initialization!, .define_initialize_atomic_fields
Synchronization::AbstractObject
- Inherited
Instance Attribute Summary
AbstractActor
- Inherited
Instance Method Summary
- #receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
- #run(*args, &body)
- #apply_behaviour(message) private
- #inner_run(*args, &body) private
- #internal_receive private
- #terminate_self(reason, value) private
AbstractActor
- Inherited
Concern::Logging
- Included
#log | Logs through Concurrent.global_logger; this can be overridden by setting @logger.
Synchronization::Object
- Inherited
Synchronization::Volatile
- Included
Synchronization::AbstractObject
- Inherited
Constructor Details
.new(channel, environment, name, executor) ⇒ OnPool
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1060
def initialize(channel, environment, name, executor)
  super channel, environment, name, executor
  @Executor       = executor
  @behaviour      = []
  @keep_behaviour = false
end
Instance Method Details
#apply_behaviour(message) (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1178
def apply_behaviour(message)
  @behaviour.each do |rule, job|
    if rule === message
      # Rules are one-shot unless the actor asked to keep them.
      @behaviour = [] unless @keep_behaviour
      return eval_task(message, job)
    end
  end
  raise 'should not reach'
end
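The lookup in #apply_behaviour relies on Ruby's case-equality operator (`===`), so a rule can be a class, a regexp, a proc, or a plain literal, and the first matching rule wins. The following is a minimal standalone sketch of that dispatch. It does not use concurrent-ruby; `first_match` and the sample rules are hypothetical names chosen only for illustration.

```ruby
# Standalone sketch of first-match-wins dispatch over [rule, job] pairs.
# `first_match` is a hypothetical helper, not part of concurrent-ruby.
def first_match(behaviour, message)
  behaviour.each do |rule, job|
    # `===` lets classes, regexps, procs and literals all act as rules.
    return job.call(message) if rule === message
  end
  raise ArgumentError, "no rule matched #{message.inspect}"
end

behaviour = [
  [Integer,                        ->(m) { [:integer, m] }],
  [/\Aping/,                       ->(m) { [:ping, m] }],
  [->(m) { m.respond_to?(:each) }, ->(m) { [:collection, m.to_a] }]
]

results = [
  first_match(behaviour, 42),       # matched by the class rule
  first_match(behaviour, 'ping 1'), # matched by the regexp rule
  first_match(behaviour, 1..3)      # matched by the proc rule
]
```

Order matters in such a list: a broad rule placed first shadows every narrower rule after it.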
#inner_run(*args, &body) (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1094
def inner_run(*args, &body)
  first       = !!body
  future_body = -> message, _actor do
    kind, reason, value =
      if message.is_a?(::Array) && message.first == TERMINATE
        message
      else
        begin
          catch(JUMP) do
            [NOTHING,
             :normal,
             first ? @Environment.instance_exec(*args, &body) : apply_behaviour(message)]
          end
        rescue => e
          [TERMINATE, e, nil]
        end
      end

    case kind
    when TERMINATE
      send_exit_messages reason
      @Terminated.resolve(reason == :normal, value, reason)
      reason
    when RECEIVE
      Run[inner_run]
    when NOTHING
      if @behaviour.empty?
        send_exit_messages reason
        @Terminated.resolve(reason == :normal, value, reason)
        reason
      else
        Run[inner_run]
      end
    else
      raise "bad kind: #{kind.inspect}"
    end
  end

  if first
    Promises.future_on(@Executor, nil, self, &future_body)
  else
    internal_receive.run(Run::TEST).then(self, &future_body)
  end
end
#internal_receive (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1139
def internal_receive
  raise if @behaviour.empty?
  rules_matcher  = Or[*@behaviour.map(&:first)]
  matcher        = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
  start          = nil
  message_future = case @timeout
                   when 0
                     Promises.fulfilled_future @Mailbox.try_pop_matching(matcher, TIMEOUT)
                   when Numeric
                     pop   = @Mailbox.pop_op_matching(matcher)
                     start = Concurrent.monotonic_time
                     # FIXME (pitr-ch 30-Jan-2019): the scheduled future should be cancelled
                     (Promises.schedule(@timeout) { TIMEOUT } | pop).then(pop) do |message, p|
                       if message == TIMEOUT && !p.resolve(true, TIMEOUT, nil, false)
                         # timeout raced with probe resolution, take the value instead
                         p.value
                       else
                         message
                       end
                     end
                   when nil
                     @Mailbox.pop_op_matching(matcher)
                   else
                     raise
                   end

  message_future.then(start, self) do |message, s, _actor|
    log DEBUG, pid, got: message
    catch(JUMP) do
      if (message = consume_signal(message)) == NOTHING
        @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s
        Run[internal_receive]
      else
        message
      end
    end
  end
end
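When a popped message turns out to be a signal that consume_signal absorbs, #internal_receive waits again, but only for the part of the timeout that has not yet elapsed. The sketch below isolates that arithmetic under the assumption that times are monotonic seconds. `remaining_timeout` and `monotonic_now` are hypothetical helpers, and the standard-library clock stands in for Concurrent.monotonic_time.

```ruby
# Standalone sketch: shrink a timeout by the time already spent waiting.
# `remaining_timeout` is a hypothetical helper; the clock value is passed in
# so the arithmetic can be checked without sleeping.
def remaining_timeout(timeout, started_at, now)
  # Same shape as [@timeout + s - Concurrent.monotonic_time, 0].max:
  # the result is clamped so it never goes negative.
  [timeout + started_at - now, 0].max
end

# Stand-in for Concurrent.monotonic_time, using only the standard library.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = monotonic_now
left    = remaining_timeout(0.5, started, monotonic_now)
```

Clamping at zero means an overdue wait falls into the `when 0` branch on the next pass, which polls the mailbox once instead of scheduling another timer.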
#receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1077
def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
  clean_reply
  err = canonical_rules rules, timeout, timeout_value, given_block
  raise err if err

  @keep_behaviour = keep
  @timeout        = timeout
  @behaviour      = rules
  throw JUMP, [RECEIVE]
end
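Note that #receive never returns to its caller: it records the rules and then throws JUMP, which unwinds to the catch(JUMP) in #inner_run so the pool thread is released instead of blocking. The following standalone sketch shows that hand-off in isolation. `ToyActor`, `step`, and the constants inside it are stand-ins defined here for illustration and are not the library's classes.

```ruby
# Standalone sketch of the catch/throw hand-off used by #receive.
# ToyActor and its constants are illustrative stand-ins, not concurrent-ruby code.
class ToyActor
  JUMP    = Object.new.freeze
  RECEIVE = :receive
  NOTHING = :nothing

  attr_reader :behaviour

  def initialize
    @behaviour = []
  end

  # Stores the rules, then unwinds the stack to the enclosing catch(JUMP).
  def receive(*rules)
    @behaviour = rules
    throw JUMP, [RECEIVE]
  end

  # Runs the body. Yields [NOTHING, value] if the body finishes on its own,
  # or [RECEIVE] if the body called #receive.
  def step(&body)
    catch(JUMP) { [NOTHING, instance_exec(&body)] }
  end
end

actor    = ToyActor.new
finished = actor.step { 1 + 1 }
waiting  = actor.step do
  receive Integer, String
  :never_reached # skipped, because throw unwinds before this line runs
end
stored_rules = actor.behaviour
```

Because code after the receive call in the same body is skipped, any follow-up work in this style belongs in the jobs attached to the rules.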