Class: ActionCable::SubscriptionAdapter::Redis::Listener
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
|
|
Instance Chain:
|
|
Inherits: |
ActionCable::SubscriptionAdapter::SubscriberMap
|
Defined in: | actioncable/lib/action_cable/subscription_adapter/redis.rb |
Class Method Summary
- .new(adapter, event_loop) ⇒ Listener constructor
::ActionCable::SubscriptionAdapter::SubscriberMap
- Inherited
Instance Method Summary
Constructor Details
.new(adapter, event_loop) ⇒ Listener
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 63
def initialize(adapter, event_loop) super() @adapter = adapter @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @raw_client = nil @when_connected = [] @thread = nil end
Instance Method Details
#add_channel(channel, on_success)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 130
def add_channel(channel, on_success) @subscription_lock.synchronize do ensure_listener_running @subscribe_callbacks[channel] << on_success when_connected { send_command("subscribe", channel) } end end
#invoke_callback
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 144
def invoke_callback(*) @event_loop.post { super } end
#listen(conn)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 79
def listen(conn) conn.without_reconnect do original_client = conn.respond_to?(:_client) ? conn._client : conn.client conn.subscribe("_action_cable_internal") do |on| on.subscribe do |chan, count| @subscription_lock.synchronize do if count == 1 @raw_client = original_client until @when_connected.empty? @when_connected.shift.call end end if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end end on. do |chan, | broadcast(chan, ) end on.unsubscribe do |chan, count| if count == 0 @subscription_lock.synchronize do @raw_client = nil end end end end end end
#remove_channel(channel)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 138
def remove_channel(channel) @subscription_lock.synchronize do when_connected { send_command("unsubscribe", channel) } end end
#shutdown
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 117
def shutdown @subscription_lock.synchronize do return if @thread.nil? when_connected do send_command("unsubscribe") @raw_client = nil end end Thread.pass while @thread.alive? end