Class: Concurrent::Actor::Utils::Broadcast

Overview

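Makes it easy to build publish/subscribe (pub/sub) messaging: actors subscribe to a broadcast actor, which forwards every other message it receives to all current subscribers.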

Examples:

news

news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news

2.times do |i|
  Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
    news_channel << :subscribe
    -> message { puts message }
  end
end

news_channel << 'Ruby rocks!'
# prints: 'Ruby rocks!' twice

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Actor::AbstractContext - Inherited

.spawn

Behaves as Concurrent::Actor.spawn but :class is auto-inserted based on receiver so it can be omitted.

.spawn!

Behaves as Concurrent::Actor.spawn! but :class is auto-inserted based on receiver so it can be omitted.

.to_spawn_options

Instance Attribute Summary

Instance Method Summary

Actor::RestartingContext - Inherited

Actor::AbstractContext - Inherited

#<<, #ask, #ask!, #behaviour_definition

#dead_letter_routing

Defines an actor responsible for dead letters.

#default_executor

Override to set a different default executor.

#default_reference_class

Override if a different class for the reference is needed.

#envelope

#on_event

Override to run custom code on internal events such as :terminated, :resumed, or an error (anError).

#on_message

#pass

Call this to pass the message to the next behaviour, usually Actor::Behaviour::ErrorsOnUnknownMessage.

#tell

Tells self a message.

#initialize_core, #on_envelope

Actor::InternalDelegations - Included

Concern::Logging - Included

#log

Logs through Concurrent.global_logger; this can be overridden by setting @logger.

Actor::PublicDelegations - Included

Actor::TypeCheck - Included

Constructor Details

.new ⇒ Broadcast

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 23

def initialize
  @receivers = Set.new
end

Instance Method Details

#filtered_receivers

Override to define different behaviour, such as filtering which receivers a message is forwarded to.

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 46

def filtered_receivers
  @receivers
end
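
To illustrate how overriding this hook changes delivery, here is a minimal plain-Ruby sketch that runs without the actor library. Listener, MiniBroadcast, VipBroadcast, subscribe and publish are hypothetical stand-ins introduced only for this example; they are not part of concurrent-ruby. Only the @receivers set, the filtered_receivers hook and the fan-out line mirror the documented class.

```ruby
require 'set'

# Hypothetical listener standing in for an actor reference.
class Listener
  attr_reader :name, :inbox

  def initialize(name)
    @name = name
    @inbox = []
  end

  # Mirrors the `reference << message` call used when broadcasting.
  def <<(message)
    @inbox << message
    self
  end
end

# Hypothetical stand-in for Broadcast: same @receivers set, same hook.
class MiniBroadcast
  def initialize
    @receivers = Set.new
  end

  def subscribe(listener)
    @receivers.add listener
  end

  # Default hook: every receiver gets every message.
  def filtered_receivers
    @receivers
  end

  def publish(message)
    filtered_receivers.each { |r| r << message }
  end
end

# Overrides only the hook; the publishing logic is inherited unchanged.
class VipBroadcast < MiniBroadcast
  def filtered_receivers
    @receivers.select { |r| r.name.start_with?('vip-') }
  end
end

channel = VipBroadcast.new
vip = Listener.new('vip-1')
guest = Listener.new('guest-1')
channel.subscribe vip
channel.subscribe guest
channel.publish 'Ruby rocks!'

p vip.inbox   # ["Ruby rocks!"]
p guest.inbox # []
```

With the real class, the same idea would presumably be a subclass of Broadcast that redefines filtered_receivers and is started with .spawn; that variant is not shown here because it depends on the actor runtime.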

#on_message(message)

# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 27

def on_message(message)
  case message
  when :subscribe
    if envelope.sender.is_a? Reference
      @receivers.add envelope.sender
      true
    else
      false
    end
  when :unsubscribe
    !!@receivers.delete(envelope.sender)
  when :subscribed?
    @receivers.include? envelope.sender
  else
    filtered_receivers.each { |r| r << message }
  end
end
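
The replies produced by each branch above can be traced with a small plain-Ruby model. This is only a sketch under stated assumptions: FakeReference and BroadcastSketch are hypothetical names, the sender is passed as an explicit argument instead of being read from envelope.sender, and no actors or threads are involved.

```ruby
require 'set'

# Hypothetical stand-in for an actor reference: it only records what it is sent.
class FakeReference
  attr_reader :inbox

  def initialize
    @inbox = []
  end

  def <<(message)
    @inbox << message
    self
  end
end

# Plain-Ruby model of the dispatch logic shown above.
class BroadcastSketch
  def initialize
    @receivers = Set.new
  end

  def filtered_receivers
    @receivers
  end

  def on_message(message, sender)
    case message
    when :subscribe
      # Only reference-like senders may subscribe.
      if sender.is_a? FakeReference
        @receivers.add sender
        true
      else
        false
      end
    when :unsubscribe
      # Set#delete returns the set itself, so this reply is true
      # whether or not the sender was subscribed.
      !!@receivers.delete(sender)
    when :subscribed?
      @receivers.include? sender
    else
      # Any other message is forwarded to every receiver.
      filtered_receivers.each { |r| r << message }
    end
  end
end

channel = BroadcastSketch.new
alice = FakeReference.new
bob = FakeReference.new

p channel.on_message(:subscribe, alice)      # true
p channel.on_message(:subscribe, :not_a_ref) # false
channel.on_message(:subscribe, bob)
channel.on_message('Ruby rocks!', nil)
channel.on_message(:unsubscribe, bob)
p channel.on_message(:subscribed?, bob)      # false
channel.on_message('Second edition', nil)

p alice.inbox # ["Ruby rocks!", "Second edition"]
p bob.inbox   # ["Ruby rocks!"]
```

In the real actor, these return values are what a caller would get back as the reply when using an ask-style call, while messages sent with << are fire-and-forget.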