Class: Concurrent::ErlangActor::AbstractActor
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Subclasses:
|
|
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
|
|
Instance Chain:
|
|
Inherits: |
Concurrent::Synchronization::Object
|
Defined in: | lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb |
Constant Summary
Class Attribute Summary
Synchronization::Object
- Inherited
Class Method Summary
Synchronization::Object
- Inherited
.atomic_attribute?, .atomic_attributes, | |
.attr_atomic | Creates methods for reading and writing to a instance variable with volatile (Java) semantic as |
.attr_volatile | Creates methods for reading and writing (as |
.ensure_safe_initialization_when_final_fields_are_present | For testing purposes, quite slow. |
.new | Has to be called by children. |
.safe_initialization!, .define_initialize_atomic_fields |
Synchronization::AbstractObject
- Inherited
Instance Attribute Summary
- #traps? ⇒ Boolean readonly
- #asked? ⇒ Boolean readonly private
Instance Method Summary
- #ask(message, timeout, timeout_value)
- #ask_op(message, probe)
- #demonitor(reference, *options)
- #link(pid)
- #linked?(pid) ⇒ Boolean
- #monitor(pid)
- #monitoring?(reference) ⇒ Boolean
- #on(matcher, value = nil, &block)
- #pid
- #receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
- #reply_resolution(fulfilled, value, reason)
- #spawn(*args, type:, channel:, environment:, name:, link:, monitor:, executor:, &body)
- #tell(message, timeout = nil)
- #tell_op(message)
- #terminate(pid = nil, reason, value: nil)
- #terminated
- #trap(value = true)
- #unlink(pid)
- #after_termination(final_reason) private
- #canonical_rules(rules, timeout, timeout_value, given_block) private
- #clean_reply(reason = NoReply) private
- #consume_exit(exit_message) private
- #consume_signal(message) private
- #eval_task(message, job) private
- #initial_signal_consumption private
- #send_exit_messages(reason) private
- #terminate_self(reason, value) private
Concern::Logging
- Included
#log | Logs through Concurrent.global_logger, it can be overridden by setting @logger. |
Synchronization::Object
- Inherited
Synchronization::Volatile
- Included
Synchronization::AbstractObject
- Inherited
Constructor Details
.new(mailbox, environment, name, executor) ⇒ AbstractActor
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 652
def initialize(mailbox, environment, name, executor) super() @Mailbox = mailbox @Pid = Pid.new self, name @Linked = ::Set.new @Monitors = {} @Monitoring = {} @MonitoringLateDelivery = {} @Terminated = Promises.resolvable_future @trap = false @reply = nil @Environment = if environment.is_a?(Class) && environment <= Environment environment.new self, executor elsif environment.is_a? Module e = Environment.new self, executor e.extend environment e else raise ArgumentError, "environment has to be a class inheriting from Environment or a module" end end
Instance Attribute Details
#asked? ⇒ Boolean
(readonly, private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 930
def asked? !!@reply end
#traps? ⇒ Boolean
(readonly)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 746
def traps? @trap end
Instance Method Details
#after_termination(final_reason) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1031
def after_termination(final_reason) log DEBUG, @Pid, terminated: final_reason clean_reply NoActor.new(@Pid) while true = @Mailbox.try_pop NOTHING break if == NOTHING case when Monitor # The actor is terminated so we must return NoActor, # even though we still know the reason. # Otherwise it would return different reasons non-deterministically. .from.tell DownSignal.new(@Pid, .reference, NoActor.new(@Pid)) when Link # same as for Monitor .from.tell NoActor.new(@Pid) when Ask .probe.reject(NoActor.new(@Pid), false) else # normal messages and other signals are thrown away end end @Mailbox = nil end
#ask(message, timeout, timeout_value)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 695
def ask(, timeout, timeout_value) log DEBUG, @Pid, asked: if @Terminated.resolved? raise NoActor.new(@Pid) else probe = Promises.resolvable_future question = Ask.new(, probe) if timeout start = Concurrent.monotonic_time in_time = tell question, timeout # recheck it could have in the meantime terminated and drained mailbox raise NoActor.new(@Pid) if @Terminated.resolved? # has to be after resolved check, to catch case where it would return timeout_value # when it was actually terminated to_wait = if in_time time = timeout - (Concurrent.monotonic_time - start) time >= 0 ? time : 0 else 0 end # TODO (pitr-ch 06-Feb-2019): allow negative timeout everywhere, interpret as 0 probe.value! to_wait, timeout_value, [true, nil, nil] else raise NoActor.new(@Pid) if @Terminated.resolved? tell question probe.reject NoActor.new(@Pid), false if @Terminated.resolved? probe.value! end end end
#ask_op(message, probe)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 726
def ask_op(, probe) log DEBUG, @Pid, asked: if @Terminated.resolved? probe.reject NoActor.new(@Pid), false else tell_op(Ask.new(, probe)).then do probe.reject NoActor.new(@Pid), false if @Terminated.resolved? probe end.flat end end
#canonical_rules(rules, timeout, timeout_value, given_block) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 880
def canonical_rules(rules, timeout, timeout_value, given_block) block = given_block || -> v { v } case rules.size when 0 rules.push(on(ANY, &block)) when 1 matcher = rules.first if matcher.is_a?(::Array) && matcher.size == 2 return ArgumentError.new 'a block cannot be given if full rules are used' if given_block else rules.replace([on(matcher, &block)]) end else return ArgumentError.new 'a block cannot be given if full rules are used' if given_block end if timeout # TIMEOUT rule has to be first, to prevent any picking it up ANY has_timeout = nil i = rules.size rules.reverse_each do |r, _| i -= 1 if r == TIMEOUT has_timeout = i break end end rules.unshift(has_timeout ? rules[has_timeout] : on(TIMEOUT, timeout_value)) end nil end
#clean_reply(reason = NoReply) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 934
def clean_reply(reason = NoReply) if @reply @reply.reject(reason, false) @reply = nil end end
#consume_exit(exit_message) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 987
def consume_exit( ) from, reason = if ! .link_terminated || @Linked.delete(from) case reason when :normal if @trap Terminated.new from, reason else if from == @Pid terminate :normal else NOTHING # do nothing end end else if @trap Terminated.new from, reason else terminate reason end end else # *link* *exiting* # send Link # terminate # terminated? # drain signals # generates second Terminated which is dropped here # already processed exit message, do nothing NOTHING end end
#consume_signal(message) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 941
def consume_signal( ) if AbstractSignal === case when Ask @reply = .probe . when Link @Linked.add .from NOTHING when UnLink @Linked.delete .from NOTHING when Monitor @Monitors[ .reference] = .from NOTHING when DeMonitor @Monitors.delete .reference NOTHING when Kill terminate :killed when DownSignal if @Monitoring.delete( .reference) || @MonitoringLateDelivery.delete( .reference) # put into a queue return Down.new( .from, .reference, .info) end # ignore down message if no longer monitoring, and following case # # *monitoring* *monitored* # send Monitor # terminate # terminated? # drain signals # generates second DOWN which is dropped here # already reported as :noproc NOTHING when Terminate consume_exit else raise "unknown message #{}" end else # regular message end end
#demonitor(reference, *options)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 812
def demonitor(reference, * ) info = .delete :info flush = .delete :flush raise ArgumentError, "bad options #{}" unless .empty? pid = @Monitoring.delete reference demonitoring = !!pid pid.tell DeMonitor.new @Pid, reference if demonitoring if flush # remove (one) down message having reference from mailbox flushed = demonitoring ? !!@Mailbox.try_pop_matching(And[DownSignal, -> m { m.reference == reference }]) : false return info ? !flushed : true end if info return false unless demonitoring if @Mailbox.peek_matching(And[DownSignal, -> m { m.reference == reference }]) @MonitoringLateDelivery[reference] = pid # allow to deliver the message once return false end end return true end
#eval_task(message, job) (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 913
def eval_task(, job) if job.is_a? Proc @Environment.instance_exec , &job else job end end
#initial_signal_consumption (private)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1019
def initial_signal_consumption while true = @Mailbox.try_pop break unless consume_signal( ) == NOTHING or raise 'it was not consumable signal' end end
#link(pid)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 766
def link(pid) return true if pid == @Pid if @Linked.add? pid pid.tell Link.new(@Pid) if pid.terminated.resolved? # no race since it only could get NoActor if @trap tell Terminate.new pid, NoActor.new(pid) else @Linked.delete pid raise NoActor.new(pid) end end end true end
#linked?(pid) ⇒ Boolean
#monitor(pid)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 792
def monitor(pid) # *monitoring* *monitored* # send Monitor # terminated? # terminate before getting Monitor # drain signals including the Monitor reference = Reference.new @Monitoring[reference] = pid if pid.terminated.resolved? # always return no-proc when terminated tell DownSignal.new(pid, reference, NoActor.new(pid)) else # otherwise let it race pid.tell Monitor.new(@Pid, reference) # no race, it cannot get anything else than NoActor tell DownSignal.new(pid, reference, NoActor.new(pid)) if pid.terminated.resolved? end reference end
#monitoring?(reference) ⇒ Boolean
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 839
def monitoring?(reference) @Monitoring.include? reference end
#on(matcher, value = nil, &block)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 757
def on(matcher, value = nil, &block) raise ArgumentError, 'only one of block or value can be supplied' if block && value [matcher, value || block] end
#pid
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 742
def pid @Pid end
#receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 762
def receive(*rules, timeout: nil, timeout_value: nil, **, &block) raise NotImplementedError end
#reply_resolution(fulfilled, value, reason)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 860
def reply_resolution(fulfilled, value, reason) return false unless @reply return !!@reply.resolve(fulfilled, value, reason, false) end
#send_exit_messages(reason) (private)
[ GitHub ]#spawn(*args, type:, channel:, environment:, name:, link:, monitor:, executor:, &body)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 843
def spawn(*args, type:, channel:, environment:, name:, link:, monitor:, executor:, &body) actor = ErlangActor.create type, channel, environment, name, executor pid = actor.pid link pid if link ref = (monitor pid if monitor) actor.run(*args, &body) monitor ? [pid, ref] : pid end
#tell(message, timeout = nil)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 685
def tell(, timeout = nil) log DEBUG, @Pid, told: if (mailbox = @Mailbox) timed_out = mailbox.push , timeout timeout ? timed_out : @Pid else timeout ? false : @Pid end end
#tell_op(message)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 676
def tell_op( ) log DEBUG, @Pid, told: if (mailbox = @Mailbox) mailbox.push_op( ).then { @Pid } else Promises.fulfilled_future @Pid end end
#terminate(pid = nil, reason, value: nil)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 865
def terminate(pid = nil, reason, value: nil) if pid # has to send it to itself even if pid equals self.pid if reason == :kill pid.tell Kill.new(@Pid) else pid.tell Terminate.new(@Pid, reason, false) end else terminate_self(reason, value) end end
#terminate_self(reason, value) (private)
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 1027
def terminate_self(reason, value) raise NotImplementedError end
#terminated
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 738
def terminated @Terminated.with_hidden_resolvable end
#trap(value = true)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 750
def trap(value = true) old = @trap # noinspection RubySimplifyBooleanInspection @trap = !!value old end