123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Actor::Behaviour::Buffer

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Abstract
Instance Chain:
Inherits: Concurrent::Actor::Behaviour::Abstract
Defined in: lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb

Overview

Any message reaching this behaviour is buffered. Only one message is is scheduled at any given time. Others are kept in buffer until another one can be scheduled. This effectively means that messages handled by behaviours before buffer have higher priority and they can be processed before messages arriving into buffer. This allows for the processing of internal actor messages like (‘:link`, :supervise) first.

Constant Summary

Concern::Logging - Included

SEV_LABEL

Class Method Summary

Instance Attribute Summary

  • #process_envelopes? ⇒ Boolean readonly

    Ensures that only one envelope processing is scheduled with #schedule_execution, this allows other scheduled blocks to be executed before next envelope processing.

Abstract - Inherited

Actor::InternalDelegations - Included

Instance Method Summary

Constructor Details

.new(core, subsequent, core_options) ⇒ Buffer

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 14

def initialize(core, subsequent, core_options)
  super core, subsequent, core_options
  @buffer                     = []
  @receive_envelope_scheduled = false
end

Instance Attribute Details

#process_envelopes?Boolean (readonly)

Ensures that only one envelope processing is scheduled with #schedule_execution, this allows other scheduled blocks to be executed before next envelope processing. Simply put this ensures that Actor::Core is still responsive to internal calls (like add_child) even though the Actor is flooded with messages.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 30

def process_envelopes?
  unless @buffer.empty? || @receive_envelope_scheduled
    @receive_envelope_scheduled = true
    process_envelope
  end
end

Instance Method Details

#on_envelope(envelope)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 20

def on_envelope(envelope)
  @buffer.push envelope
  process_envelopes?
  MESSAGE_PROCESSED
end

#on_event(public, event)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 46

def on_event(public, event)
  event_name, _ = event
  case event_name
  when :terminated, :restarted
    @buffer.each { |envelope| reject_envelope envelope }
    @buffer.clear
  end
  super public, event_name
end

#process_envelope

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 37

def process_envelope
  envelope = @buffer.shift
  return nil unless envelope
  pass envelope
ensure
  @receive_envelope_scheduled = false
  core.schedule_execution { process_envelopes? }
end