Class: Concurrent::Actor::Behaviour::Buffer
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Abstract
|
|
Instance Chain:
|
|
Inherits: |
Concurrent::Actor::Behaviour::Abstract
|
Defined in: | lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb |
Overview
Any message reaching this behaviour is buffered. Only one message is is scheduled at any given time. Others are kept in buffer until another one can be scheduled. This effectively means that messages handled by behaviours before buffer have higher priority and they can be processed before messages arriving into buffer. This allows for the processing of internal actor messages like (‘:link`, :supervise
) first.
Constant Summary
Concern::Logging
- Included
Class Method Summary
Instance Attribute Summary
-
#process_envelopes? ⇒ Boolean
readonly
Ensures that only one envelope processing is scheduled with
#schedule_execution
, this allows other scheduled blocks to be executed before next envelope processing.
Abstract
- Inherited
Actor::InternalDelegations
- Included
Instance Method Summary
Abstract
- Inherited
#broadcast | broadcasts event to all behaviours and context. |
#on_envelope | override to add extra behaviour. |
#on_event | override to add extra behaviour. |
#pass, #reject_envelope |
Actor::InternalDelegations
- Included
#behaviour | see Core#behaviour |
#behaviour! | see Core#behaviour! |
#children, #context, #dead_letter_routing, | |
#log | delegates to core.log. |
#redirect, #terminate! |
Concern::Logging
- Included
#log | Logs through Concurrent.global_logger, it can be overridden by setting @logger. |
Actor::PublicDelegations
- Included
#actor_class | Alias for PublicDelegations#context_class. |
#context_class, #executor, #name, #parent, #path, | |
#ref | Alias for PublicDelegations#reference. |
#reference |
Actor::TypeCheck
- Included
Constructor Details
.new(core, subsequent, core_options) ⇒ Buffer
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 14
def initialize(core, subsequent, ) super core, subsequent, @buffer = [] @receive_envelope_scheduled = false end
Instance Attribute Details
#process_envelopes? ⇒ Boolean
(readonly)
Ensures that only one envelope processing is scheduled with #schedule_execution
, this allows other scheduled blocks to be executed before next envelope processing. Simply put this ensures that Actor::Core
is still responsive to internal calls (like add_child) even though the Actor
is flooded with messages.
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 30
def process_envelopes? unless @buffer.empty? || @receive_envelope_scheduled @receive_envelope_scheduled = true process_envelope end end
Instance Method Details
#on_envelope(envelope)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 20
def on_envelope(envelope) @buffer.push envelope process_envelopes? MESSAGE_PROCESSED end
#on_event(public, event)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 46
def on_event(public, event) event_name, _ = event case event_name when :terminated, :restarted @buffer.each { |envelope| reject_envelope envelope } @buffer.clear end super public, event_name end
#process_envelope
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb', line 37
def process_envelope envelope = @buffer.shift return nil unless envelope pass envelope ensure @receive_envelope_scheduled = false core.schedule_execution { process_envelopes? } end