Class: Concurrent::ErlangActor::Environment

Overview

A class providing environment and methods for actor bodies to run in.

Class Attribute Summary

Class Method Summary

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to an instance variable with volatile (Java) semantics, as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to an instance variable with volatile (Java) semantics.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Attribute Summary

Instance Method Summary

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(actor, executor) ⇒ Environment (private)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 471

def initialize(actor, executor)
  super()
  @Actor           = actor
  @DefaultExecutor = executor
end

Instance Attribute Details

#traps? ⇒ true, false (readonly)

Returns:

  • (true, false)

    does this actor trap exit messages?

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 160

def traps?
  @Actor.traps?
end

Instance Method Details

#default_executor ⇒ ExecutorService

Returns:

  • (ExecutorService)

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 465

def default_executor
  @DefaultExecutor
end

#demonitor(reference, *options) ⇒ true, false

If reference is a reference which the calling actor obtained by calling #monitor, this monitoring is turned off. If the monitoring is already turned off, nothing happens.

Once demonitor has returned, it is guaranteed that no DownSignal message due to the monitor will be placed in the caller’s message queue in the future. A DownSignal message might have been placed in the caller’s message queue prior to the call, though. Therefore, in most cases, it is advisable to remove such a DownSignal message from the message queue after monitoring has been stopped. demonitor(reference, :flush) can be used if this cleanup is wanted.

The behavior of this method can be viewed as two combined operations: asynchronously send a “demonitor signal” to the monitored actor and ignore any future results of the monitor.

Failure: It is an error if reference refers to a monitoring started by another actor. In that case it may raise an ArgumentError or go unnoticed.

Options:

  • :flush - Remove (one) DownSignal message, if there is one, from the caller’s message queue after monitoring has been stopped. Calling `demonitor(reference, :flush)` is equivalent to the following, but more efficient:

        demonitor(reference)
        receive on(And[DownSignal, -> d { d.reference == reference }], true), timeout: 0, timeout_value: true

  • :info - The returned value is one of the following:

    • true - The monitor was found and removed. In this case no DownSignal message due to this monitor has been, nor will be, placed in the message queue of the caller.

    • false - The monitor was not found and could not be removed. This is probably because a DownSignal message corresponding to this monitor has already been placed in the caller’s message queue.

    If the :info option is combined with the :flush option, false is returned if a flush was needed; otherwise, true.

Parameters:

  • reference (Reference)
  • options (:flush, :info)
  
# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 332

def demonitor(reference, *options)
  @Actor.demonitor(reference, *options)
end
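
The :flush and :info semantics described above can be illustrated with a toy, single-threaded model that uses only the Ruby standard library. This is a sketch, not the library's implementation: ToyMonitoringActor, #monitored_died and ToyDownSignal are hypothetical stand-ins, and returning true when :info is absent is an assumption of the sketch.

```ruby
# Toy model of the demonitor options; none of these names exist in the library.
ToyDownSignal = Struct.new(:reference)

class ToyMonitoringActor
  attr_reader :mailbox

  def initialize
    @monitoring = {} # reference => name of the monitored actor
    @mailbox    = []
  end

  def monitor(name)
    reference = Object.new
    @monitoring[reference] = name
    reference
  end

  # Simulates the monitored actor dying: the monitoring is turned off and a
  # down signal is placed in the mailbox of the monitoring actor.
  def monitored_died(reference)
    @mailbox << ToyDownSignal.new(reference) if @monitoring.delete(reference)
  end

  def demonitor(reference, *options)
    found = @monitoring.key?(reference)
    @monitoring.delete(reference)

    flushed = false
    if options.include?(:flush)
      index = @mailbox.index { |m| m.is_a?(ToyDownSignal) && m.reference == reference }
      if index
        @mailbox.delete_at(index) # removes at most one down signal
        flushed = true
      end
    end

    return true unless options.include?(:info) # assumption of this sketch
    options.include?(:flush) ? !flushed : found
  end
end

actor = ToyMonitoringActor.new
live  = actor.monitor(:worker_a)
dead  = actor.monitor(:worker_b)
actor.monitored_died(dead)

actor.demonitor(live, :info)         # true: the monitor was found and removed
actor.demonitor(dead, :flush, :info) # false: a flush was needed
```

After the last call the toy mailbox is empty again, which is the cleanup that :flush is meant to provide.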

#linked?(pid) ⇒ true, false (private)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 268

def linked?(pid)
  @Actor.linked? pid
end

#monitor(pid) ⇒ Reference

The calling actor starts monitoring the actor with the given pid.

A DownSignal message will be sent to the monitoring actor if the actor with given pid dies, or if the actor with given pid does not exist.

The monitoring is turned off either when the DownSignal message is sent, or when #demonitor is called.

Making several calls to monitor for the same pid is not an error; it results in as many, completely independent, monitorings.

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 285

def monitor(pid)
  @Actor.monitor(pid)
end
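
The rules above (one fresh reference per call, a down signal when the monitored actor dies or does not exist, and monitoring ending once the signal is sent) can be sketched with a toy registry built only on the Ruby standard library. ToyRegistry, #kill and ToyDown are hypothetical names for illustration and are not part of the library.

```ruby
# Toy model of monitor bookkeeping; none of these names exist in the library.
ToyDown = Struct.new(:from, :reference)

class ToyRegistry
  attr_reader :mailbox

  def initialize(alive)
    @alive    = alive.dup # names of the actors that currently exist
    @monitors = {}        # reference => name of the monitored actor
    @mailbox  = []        # mailbox of the monitoring actor
  end

  def monitor(name)
    reference = Object.new # every call gets its own reference
    if @alive.include?(name)
      @monitors[reference] = name
    else
      @mailbox << ToyDown.new(name, reference) # no such actor
    end
    reference
  end

  def kill(name)
    @alive.delete(name)
    # select returns a copy, so deleting from @monitors while iterating is safe
    @monitors.select { |_, monitored| monitored == name }.each_key do |reference|
      @monitors.delete(reference) # monitoring ends once the signal is sent
      @mailbox << ToyDown.new(name, reference)
    end
  end
end

registry = ToyRegistry.new([:worker])
first    = registry.monitor(:worker)
second   = registry.monitor(:worker) # independent of the first monitoring
registry.kill(:worker)               # one down signal per monitoring
ghost    = registry.monitor(:nobody) # immediate down signal
```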

#monitoring?(reference) ⇒ Boolean (private)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 337

def monitoring?(reference)
  @Actor.monitoring? reference
end

#name ⇒ #to_s

Returns:

  • (#to_s)

    the name of the actor, if one was provided to the spawn method

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 154

def name
  pid.name
end

#on(matcher, value = nil, &block)

Helper for constructing #receive rules.

Examples:

receive on(Numeric) { |v| v.succ },
        on(ANY) { terminate :bad_message }

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 185

def on(matcher, value = nil, &block)
  @Actor.on matcher, value, &block
end
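
Since #receive accepts matcher-proc pairs and matchers respond to #===, the idea behind such rules can be sketched in plain Ruby. The helpers below (toy_on, toy_dispatch, TOY_ANY) are hypothetical and only mimic the shape of the rules; how the library stores and evaluates rules internally is not shown here.

```ruby
# Toy model of matcher-proc rules; none of these names exist in the library.
TOY_ANY = ->(_message) { true } # Proc#=== calls the proc, so this matches anything

# Builds a [matcher, handler] pair; a plain value is wrapped in a handler.
def toy_on(matcher, value = nil, &block)
  [matcher, block || ->(_message) { value }]
end

# Runs the handler of the first rule whose matcher === the message.
def toy_dispatch(message, *rules)
  _matcher, handler = rules.find { |matcher, _handler| matcher === message }
  handler ? handler.call(message) : nil
end

rules = [
  toy_on(Numeric) { |v| v.succ },
  toy_on(TOY_ANY, :bad_message)
]

toy_dispatch(41, *rules)   # => 42
toy_dispatch('hi', *rules) # => :bad_message
```

Rule order matters in this sketch: the catch-all rule comes last so that it only handles messages no earlier rule matched.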

#pid ⇒ Pid

Returns:

  • (Pid)

    the pid of this actor

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 149

def pid
  @Actor.pid
end

#receive(*rules, timeout: nil, timeout_value: nil, **options) {|message| ... } ⇒ Object, nothing

Receive a message.

Parameters:

  • rules (::Array(), ::Array(#===), ::Array<::Array(#===, Proc)>)
    • No rule - `receive`, `receive {|m| m.to_s}`

    • or single rule which can be combined with the supplied block - `receive(Numeric)`, `receive(Numeric) {|v| v.succ}`

    • or array of matcher-proc pairs - `receive on(Numeric) { |v| v*2 }, on(Symbol) { |c| do_command c }`

  • timeout (Numeric)

    how long it should wait for the message

  • timeout_value (Object)

    the value returned when the timeout elapses and no `on(TIMEOUT) { do_something }` rule is specified.

  • options (Hash)

    other options specific to the type of the actor

Options Hash (**options):

  • :keep (true, false)

    Keep the rules and repeatedly call the associated blocks, until receive is called again.

Yields:

  • (message)

    block to process the message if single matcher is supplied

Yield Parameters:

  • message (Object)

    the received message

Returns:

  • (Object, nothing)

    Depends on the type of the actor. An on-thread actor blocks until a message is available, then returns the message (or the result of the called block). An on-pool actor stops executing and continues with the given block when a message becomes available.

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 218

def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
  @Actor.receive(*rules, timeout: timeout, timeout_value: timeout_value, **options, &block)
end
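
The blocking, on-thread behaviour together with timeout and timeout_value can be approximated with a standard-library Queue. The following is a simplified sketch under stated assumptions: toy_receive is a hypothetical helper with a single consumer, it polls instead of parking the thread, and it ignores matcher rules entirely.

```ruby
# Toy model of an on-thread receive with a timeout; not the library's code.
def toy_receive(mailbox, timeout: nil, timeout_value: nil)
  message =
    if timeout.nil?
      mailbox.pop # block until a message is available
    else
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      loop do
        # single consumer assumed, so a non-blocking pop cannot fail here
        break mailbox.pop(true) unless mailbox.empty?
        return timeout_value if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
        sleep 0.001
      end
    end
  # with a block, the result of the block is returned instead of the message
  block_given? ? yield(message) : message
end

mailbox = Queue.new
mailbox << 20
toy_receive(mailbox) { |v| v * 2 }                   # => 40
toy_receive(mailbox, timeout: 0, timeout_value: :no) # => :no, mailbox is empty
```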

#reply(value) ⇒ true, false

Shortcut for fulfilling the reply, same as reply_resolution true, value, nil.

Examples:

actor = Concurrent::ErlangActor.spawn(type: :on_thread) { reply receive * 2 }
actor.ask 2 #=> 4

Parameters:

  • value (Object)

Returns:

  • (true, false)

    did the sender ask, and was it resolved?

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 405

def reply(value)
  # TODO (pitr-ch 08-Feb-2019): consider adding reply? which returns true,false if success, reply method will always return value
  reply_resolution true, value, nil
end

#reply_resolution(fulfilled = true, value = nil, reason = nil) ⇒ true, false

Reply to the sender of the message currently being processed if the actor was asked instead of told. The reply is stored in a Promises::ResolvableFuture, so the arguments are the same as for the Promises::ResolvableFuture#resolve method.

The reply may time out, in which case this method returns false.

Examples:

actor = Concurrent::ErlangActor.spawn(type: :on_thread) { reply_resolution true, receive * 2, nil }
actor.ask 2 #=> 4

Parameters:

  • fulfilled (true, false) (defaults to: true)
  • value (Object) (defaults to: nil)
  • reason (Object) (defaults to: nil)

Returns:

  • (true, false)

    did the sender ask, and was it resolved before it timed out?

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 426

def reply_resolution(fulfilled = true, value = nil, reason = nil)
  @Actor.reply_resolution(fulfilled, value, reason)
end
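
The three possible outcomes (replied, sender only told, reply arrived too late) can be sketched with a minimal stand-in for the future. ToyFuture and toy_reply_resolution are hypothetical names; in particular, this toy #resolve returns false on a second resolution, which is a simplification and not a statement about how Promises::ResolvableFuture#resolve behaves.

```ruby
# Toy model of replying to an ask; none of these names exist in the library.
class ToyFuture
  attr_reader :state, :result

  def initialize
    @state = :pending
  end

  def resolve(fulfilled = true, value = nil, reason = nil)
    return false unless @state == :pending # already resolved, e.g. timed out
    @state  = fulfilled ? :fulfilled : :rejected
    @result = fulfilled ? value : reason
    true
  end
end

# future is nil when the message was told rather than asked.
def toy_reply_resolution(future, fulfilled = true, value = nil, reason = nil)
  return false if future.nil?
  future.resolve(fulfilled, value, reason)
end

asked = ToyFuture.new
toy_reply_resolution(asked, true, 4, nil) # => true, the asker sees 4
toy_reply_resolution(nil, true, 4, nil)   # => false, nobody asked

timed_out = ToyFuture.new
timed_out.resolve(false, nil, :timeout)       # the asker gave up first
toy_reply_resolution(timed_out, true, 4, nil) # => false, reply came too late
```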

#spawn(*args, type: @Actor.class, channel: Promises::Channel.new, environment: Environment, name: nil, executor: default_executor, link: false, monitor: false) {|*args| ... } ⇒ Pid, ::Array(Pid, Reference)

Creates an actor.

Parameters:

  • args (Object)

    arguments for the actor body

  • type (:on_thread, :on_pool)

    of the actor to be created.

  • channel (Channel)

    The mailbox of the actor; by default it has unlimited capacity. Creating the actor with a bounded queue is useful for creating backpressure. The channel can be shared with other abstractions, but the actor has to be the only consumer, otherwise internal signals could be lost.

  • environment (Environment, Module)

    A class which is used to run the body of the actor in. It can either be a child of Environment or a module. A module is extended into a new instance of the environment; therefore, if there are many actors with this module, it is better to create a class and use it instead.

  • name (#to_s)

    of the actor. Available by Pid#name or #name and part of Pid#to_s.

  • link (true, false)

    the created actor is atomically created and linked with the calling actor

  • monitor (true, false)

    the created actor is atomically created and monitored by the calling actor

  • executor (ExecutorService)

    The executor service to use to execute the actor on. Applies only to :on_pool actor type.

Yields:

  • (*args)

    the body of the actor. When the actor is spawned, this block is evaluated until it terminates. The on-thread actor requires a block. The on-pool actor has a default `-> { start }`; therefore, if no block is given, it executes a #start method, which needs to be provided by the environment.

Returns:

  • (Pid, ::Array(Pid, Reference))

    a pid or a pid-reference pair when monitor is true

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 378

def spawn(*args,
          type: @Actor.class,
          channel: Promises::Channel.new,
          environment: Environment,
          name: nil,
          executor: default_executor,
          link: false,
          monitor: false,
          &body)

  @Actor.spawn(*args,
               type:        type,
               channel:     channel,
               environment: environment,
               name:        name,
               executor:    executor,
               link:        link,
               monitor:     monitor,
               &body)
end

#terminate(pid = nil, reason, value: nil) ⇒ nothing

If pid is not provided, stops the execution of the calling actor with the exit reason.

If pid is provided, it sends an exit signal with exit reason to the actor identified by pid.

The following behavior applies if reason is any object except :normal or :kill. If pid is not trapping exits, pid itself will exit with exit reason. If pid is trapping exits, the exit signal is transformed into a message Terminated and delivered to the message queue of pid.

If reason is the Symbol :normal, pid will not exit. If it is trapping exits, the exit signal is transformed into a message Terminated and delivered to its message queue.

If reason is the Symbol :kill, that is if terminate(pid, :kill) is called, an untrappable exit signal is sent to pid, which will unconditionally exit with exit reason :killed.

Since evaluating this function causes the process to terminate, it has no return value.

Parameters:

  • pid (Pid) (defaults to: nil)
  • reason (Object, :normal, :kill)
  • value (Object)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 460

def terminate(pid = nil, reason, value: nil)
  @Actor.terminate pid, reason, value: value
end
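
The rules for an exit signal sent to another actor amount to a small decision table, sketched below as a pure function. toy_exit_signal_outcome and its return shapes are hypothetical and only summarize the prose above; they are not part of the library.

```ruby
# Toy summary of how the receiving actor handles an exit signal.
# Returns [:exit, reason], [:message, reason] or [:ignore].
def toy_exit_signal_outcome(reason, trapping:)
  case reason
  when :kill
    [:exit, :killed] # untrappable, regardless of the trap flag
  when :normal
    trapping ? [:message, :normal] : [:ignore]
  else
    trapping ? [:message, reason] : [:exit, reason]
  end
end

toy_exit_signal_outcome(:boom, trapping: false)  # => [:exit, :boom]
toy_exit_signal_outcome(:boom, trapping: true)   # => [:message, :boom]
toy_exit_signal_outcome(:normal, trapping: false) # => [:ignore]
toy_exit_signal_outcome(:kill, trapping: true)   # => [:exit, :killed]
```

Here :message stands for the signal being delivered as a Terminated message to the receiving actor's message queue.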

#terminated ⇒ Promises::Future

Returns:

  • (Promises::Future)

    a future which is resolved with the final result of the actor that is either the reason for termination or a value if terminated normally.

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 144

def terminated
  @Actor.terminated
end

#trap(value = true) ⇒ true, false

When trap is set to true, exit signals arriving at an actor are converted to Terminated messages, which can be received as ordinary messages. If trap is set to false, the actor exits if it receives an exit signal other than :normal, and the exit signal is propagated to its linked actors. Application actors should normally not trap exits.

Parameters:

  • value (true, false) (defaults to: true)

Returns:

  • (true, false)

    the old value of the flag

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 176

def trap(value = true)
  @Actor.trap(value)
end
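
Because trap returns the old value of the flag, a caller can change the flag temporarily and restore it afterwards. A minimal sketch of that contract follows; ToyTrapFlag is a hypothetical class, and starting with the flag set to false is an assumption of the sketch.

```ruby
# Toy model of the trap flag; not the library's code.
class ToyTrapFlag
  def initialize
    @traps = false # assumed initial value
  end

  def traps?
    @traps
  end

  # Sets the flag and returns the value it had before the call.
  def trap(value = true)
    old, @traps = @traps, value
    old
  end
end

flag     = ToyTrapFlag.new
previous = flag.trap       # => false, the flag is now true
flag.trap(previous)        # => true, the flag is restored to false
```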