Class: Concurrent::Actor::Utils::Balancer

Overview

Distributes messages between subscribed actors. Each actor receives only one message and is then unsubscribed, so it must resubscribe when it is ready to receive the next message. Messages are buffered while no worker is registered.
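The one-message-per-subscription contract described above can be illustrated without any actor machinery. The following is a minimal plain-Ruby sketch, not the library's implementation: `ToyBalancer`, `subscribe`, `push`, `buffered` and `deliveries` are hypothetical names chosen for illustration, and delivery is recorded in an array instead of being sent to a real actor.

```ruby
# Plain-Ruby model of the Balancer contract (NOT the real actor class).
# ToyBalancer and all of its method names are hypothetical.
class ToyBalancer
  attr_reader :deliveries

  def initialize
    @receivers  = [] # workers waiting for exactly one message
    @buffer     = [] # messages waiting for a worker
    @deliveries = [] # [worker, message] pairs, in delivery order
  end

  # Register a worker for a single message.
  def subscribe(worker)
    @receivers << worker
    distribute
  end

  # Accept a message; it is buffered until a worker is available.
  def push(message)
    @buffer << message
    distribute
  end

  # Copy of the messages still waiting for a worker.
  def buffered
    @buffer.dup
  end

  private

  # Pair waiting workers with waiting messages, oldest first.
  def distribute
    until @receivers.empty? || @buffer.empty?
      @deliveries << [@receivers.shift, @buffer.shift]
    end
  end
end

balancer = ToyBalancer.new
balancer.push(:job1)          # no worker yet, so the message is buffered
balancer.subscribe(:worker_a) # worker_a receives :job1 and is unsubscribed
balancer.push(:job2)          # buffered again: worker_a has not resubscribed
balancer.subscribe(:worker_a) # resubscribing releases :job2
```

In this model a worker that never resubscribes simply stops receiving work, which is why the overview requires workers to resubscribe after each message.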

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Actor::AbstractContext - Inherited

.spawn

Behaves as Concurrent::Actor.spawn but :class is auto-inserted based on receiver so it can be omitted.

.spawn!

Behaves as Concurrent::Actor.spawn! but :class is auto-inserted based on receiver so it can be omitted.

.to_spawn_options

Instance Attribute Summary

Instance Method Summary

Actor::RestartingContext - Inherited

Actor::AbstractContext - Inherited

#<<
#ask
#ask!
#behaviour_definition
#dead_letter_routing

Defines an actor responsible for dead letters.

#default_executor

Override to set a different default executor, e.g.

#default_reference_class

Override if a different class for the reference is needed.

#envelope
#on_event

Override to add custom code invocation on internal events like :terminated, :resumed, anError.

#on_message
#pass

Call this to pass the message to the next behaviour, usually Actor::Behaviour::ErrorsOnUnknownMessage.

#tell

Tells self a message.

#initialize_core, #on_envelope

Actor::InternalDelegations - Included

Concern::Logging - Included

#log

Logs through Concurrent.global_logger; this can be overridden by setting @logger.

Actor::PublicDelegations - Included

Actor::TypeCheck - Included

Constructor Details

.new ⇒ Balancer

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 13

def initialize
  @receivers = []
  @buffer    = []
end

Instance Method Details

#distribute

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 37

def distribute
  while !@receivers.empty? && !@buffer.empty?
    redirect @receivers.shift, @buffer.shift
  end
end
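
The loop above pairs the oldest waiting receiver with the oldest buffered message and stops as soon as either queue runs dry. A stand-alone sketch of that pairing, assuming plain arrays in place of actor references and envelopes (`pair_up` is a hypothetical helper, and collecting pairs stands in for the `redirect` call):

```ruby
# Stand-alone version of the pairing loop; `pair_up` is a hypothetical helper.
# Both arrays are consumed from the front, so ordering is first in, first out.
def pair_up(receivers, buffer)
  pairs = []
  until receivers.empty? || buffer.empty?
    pairs << [receivers.shift, buffer.shift]
  end
  pairs
end

receivers = [:w1, :w2]
buffer    = [:m1, :m2, :m3]
pairs     = pair_up(receivers, buffer)
# With two receivers and three messages, one message is left in the buffer
# until another receiver subscribes.
```

Because both queues are mutated in place, whatever is left over after the loop is exactly the state carried into the next call.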

#on_message(message)

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 18

def on_message(message)
  command, who = message
  case command
  when :subscribe
    @receivers << (who || envelope.sender)
    distribute
    true
  when :unsubscribe
    @receivers.delete(who || envelope.sender)
    true
  when :subscribed?
    @receivers.include?(who || envelope.sender)
  else
    @buffer << envelope
    distribute
    Behaviour::MESSAGE_PROCESSED
  end
end
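
The first line of `on_message` relies on Ruby's multiple assignment, so a message may be either a bare command or a `[command, who]` pair. The sketch below shows how different message shapes are split; `decode` is a hypothetical helper written only to expose the two local values, and the symbols stand in for actor references.

```ruby
# `decode` is a hypothetical helper that mirrors `command, who = message`.
def decode(message)
  command, who = message
  [command, who]
end

bare    = decode(:subscribe)            # who is nil, so the sender would be used
pair    = decode([:subscribe, :worker]) # who is given explicitly
string  = decode("payload")             # not a known command: would be buffered
numbers = decode([1, 2])                # first element is not a command either
```

One consequence, visible in the listing above, is that any payload shaped like `[:subscribe, something]` is interpreted as a command and never buffered for workers.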