123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Actor::Utils::Pool

Overview

Allows to create a pool of workers and distribute work between them

Examples:

class Worker < Concurrent::Actor::RestartingContext
  def on_message(message)
    p message * 5
  end
end

pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
  Worker.spawn name: "worker-#{index}", supervise: true, args: []
end

pool << 'asd' << 2
# prints:
# "asdasdasdasdasd"
# 10

Yields:

  • (balancer, index)

    a block spawning an worker instance. called size times. The worker should be descendant of AbstractWorker and supervised, see example.

Yield Parameters:

  • balancer (Balancer)

    to pass to the worker

  • index (Integer)

    of the worker, usually used in its name

Yield Returns:

  • (Reference)

    the reference of newly created worker

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Actor::AbstractContext - Inherited

.spawn

Behaves as Concurrent::Actor.spawn but :class is auto-inserted based on receiver so it can be omitted.

.spawn!

behaves as Concurrent::Actor.spawn! but :class is auto-inserted based on receiver so it can be omitted.

.to_spawn_options

Instance Attribute Summary

Instance Method Summary

Actor::RestartingContext - Inherited

Actor::AbstractContext - Inherited

#<<
#ask,
#ask!
#behaviour_definition,
#dead_letter_routing

Defines an actor responsible for dead letters.

#default_executor

override to se different default executor, e.g.

#default_reference_class

override if different class for reference is needed.

#envelope,
#on_event

override to add custom code invocation on internal events like :terminated, :resumed, anError.

#on_message,
#pass

if you want to pass the message to next behaviour, usually Actor::Behaviour::ErrorsOnUnknownMessage

#tell

tell self a message.

#initialize_core, #on_envelope

Actor::InternalDelegations - Included

Concern::Logging - Included

#log

Logs through Concurrent.global_logger, it can be overridden by setting @logger.

Actor::PublicDelegations - Included

Actor::TypeCheck - Included

Constructor Details

.new(size, &worker_initializer) ⇒ Pool

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb', line 31

def initialize(size, &worker_initializer)
  @balancer = Balancer.spawn name: :balancer, supervise: true
  @workers  = ::Array.new(size, &worker_initializer)
  @workers.each do |worker|
    Type! worker, Reference
    @balancer << [:subscribe, worker]
  end
end

Instance Method Details

#on_message(message)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb', line 40

def on_message(message)
  command, _ = message
  return if [:restarted, :reset, :resumed, :terminated].include? command # ignore events from supervised actors

  envelope_to_redirect = if envelope.future
                           envelope
                         else
                           Envelope.new(envelope.message, Promises.resolvable_future, envelope.sender, envelope.address)
                         end
  envelope_to_redirect.future.on_fulfillment! { @balancer << :subscribe } # TODO check safety of @balancer reading
  redirect @balancer, envelope_to_redirect
end