Class: ActionCable::SubscriptionAdapter::Redis::Listener
Inherits: ActionCable::SubscriptionAdapter::SubscriberMap::Async
Defined in: actioncable/lib/action_cable/subscription_adapter/redis.rb
Constant Summary
- INTERNAL_CHANNEL = "_action_cable_internal"
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 84
A permanent internal subscription keeps the ::ActionCable::SubscriptionAdapter::Redis subscription count above zero while user channels come and go, so removing the last user channel doesn't end the listen loop. Only an explicit shutdown (which unsubscribes from everything) drives the count to zero and stops the listener.
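The effect of the sentinel channel on the subscription count can be illustrated with a small simulation. This is only a sketch: `SubscriptionCounter` is a hypothetical stand-in for the count Redis reports in its subscribe and unsubscribe replies, and it is not part of Action Cable.

```ruby
# Hypothetical stand-in for the subscription count that Redis reports back
# after each subscribe/unsubscribe. Not part of Action Cable.
class SubscriptionCounter
  def initialize
    @channels = []
  end

  # Returns the number of subscribed channels after the change.
  def subscribe(channel)
    @channels << channel unless @channels.include?(channel)
    @channels.size
  end

  def unsubscribe(channel)
    @channels.delete(channel)
    @channels.size
  end

  # Models an unsubscribe with no channel argument: everything is dropped.
  def unsubscribe_all
    @channels.clear
    @channels.size
  end
end

internal = "_action_cable_internal"

counter = SubscriptionCounter.new
counter.subscribe(internal)   # the sentinel is subscribed first
counter.subscribe("chat_1")   # a user channel arrives

# Removing the last user channel leaves the sentinel, so the count is not zero.
after_last_user_channel = counter.unsubscribe("chat_1")

# Only unsubscribing from everything brings the count down to zero.
after_shutdown = counter.unsubscribe_all
```

In this model the listen loop would keep running after `chat_1` is removed and would stop only after the final call.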
Class Method Summary
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Instance Attribute Summary
- #logger readonly
- #retry_connecting? ⇒ Boolean readonly private
Instance Method Summary
- #add_channel(channel, on_success)
- #listen(conn)
- #remove_channel(channel)
- #shutdown
- #ensure_listener_running private
- #reset private
- #resubscribe private
- #when_connected(&block) private
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Constructor Details
.new(adapter, config_options, executor) ⇒ Listener
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 86
def initialize(adapter, config_options, executor)
  super(executor)

  @adapter = adapter

  @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
  @subscription_lock = Mutex.new

  @reconnect_attempt = 0
  # Use the same config as used by Redis conn
  @reconnect_attempts = config_options.fetch(:reconnect_attempts, 1)
  @reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer)

  @subscribed_client = nil

  @when_connected = []

  @thread = nil
end
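The constructor accepts `:reconnect_attempts` either as an Integer or as a list of delays and normalizes both into an array. A minimal sketch of that step, assuming a hypothetical helper named `normalize_reconnect_attempts` (it does not exist in Action Cable and only mirrors the two lines above):

```ruby
# Hypothetical helper that mirrors the constructor's normalization step.
def normalize_reconnect_attempts(config_options)
  attempts = config_options.fetch(:reconnect_attempts, 1)
  # An Integer n becomes n attempts with no delay between them.
  attempts = Array.new(attempts, 0) if attempts.is_a?(Integer)
  attempts
end

default_schedule  = normalize_reconnect_attempts({})
integer_schedule  = normalize_reconnect_attempts(reconnect_attempts: 3)
explicit_schedule = normalize_reconnect_attempts(reconnect_attempts: [0, 0.5, 2])
```

Each element of the resulting array is the number of seconds to sleep before the corresponding retry, and the array length is the retry budget.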
Instance Attribute Details
#logger (readonly)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 77
delegate :logger, to: :@adapter
#retry_connecting? ⇒ Boolean (readonly, private)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 207
def retry_connecting?
  @reconnect_attempt += 1

  return false if @reconnect_attempt > @reconnect_attempts.size

  sleep_t = @reconnect_attempts[@reconnect_attempt - 1]

  sleep(sleep_t) if sleep_t > 0

  true
end
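To see how the retry budget is consumed, the same logic can be run in isolation. The sketch below uses a hypothetical `RetryBudget` class and records the delays instead of sleeping so that it finishes instantly. Neither the class nor its `slept` accessor exists in Action Cable.

```ruby
# Hypothetical stand-alone version of the retry budget. Delays are recorded
# rather than slept so the example finishes instantly.
class RetryBudget
  attr_reader :slept

  def initialize(delays)
    @delays = delays
    @attempt = 0
    @slept = []
  end

  def retry?
    @attempt += 1
    return false if @attempt > @delays.size

    delay = @delays[@attempt - 1]
    @slept << delay if delay > 0 # the listener calls sleep(delay) here
    true
  end
end

budget = RetryBudget.new([0, 0.5, 2])
answers = Array.new(4) { budget.retry? }
```

With three configured delays, the first three calls allow a retry and the fourth refuses. Only the non-zero delays would cause a sleep.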
Instance Method Details
#add_channel(channel, on_success)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 153
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running
    @subscribe_callbacks[channel] << on_success
    when_connected { @subscribed_client.call("subscribe", channel) }
  end
end
#ensure_listener_running (private)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 168
def ensure_listener_running
  # The internal sentinel subscription keeps the listener alive in
  # normal operation, but the thread can still die for other reasons
  # (e.g. exhausting the Redis reconnect attempts). Since this method
  # memoizes with `||=`, a dead thread would never be replaced and
  # every later subscribe would queue forever. Drop a dead thread so
  # the next subscribe spawns a fresh listener.
  if @thread && !@thread.alive?
    @thread = nil
    @reconnect_attempt = 0
  end

  @thread ||= Thread.new do
    Thread.current.abort_on_exception = true

    begin
      conn = @adapter.redis_connection_for_subscriptions
      listen conn
    rescue RedisClient::ConnectionError => e
      reset
      if retry_connecting?
        logger&.warn "Redis connection failed: #{e.message}. Trying to reconnect..."
        when_connected { resubscribe }
        retry
      else
        logger&.error "Failed to reconnect to Redis after #{@reconnect_attempt} attempts."
      end
    end
  end
end
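The interaction between `||=` and a dead thread, described in the comment above, can be reproduced with a few lines of plain Ruby. This is a sketch only: `Worker` and `spawn_count` are hypothetical names introduced to make the behavior observable.

```ruby
# Hypothetical minimal owner of a worker thread. `spawn_count` exists only so
# the example can observe how many threads were created.
class Worker
  attr_reader :spawn_count

  def initialize
    @thread = nil
    @spawn_count = 0
  end

  def ensure_running
    # Without this check, `||=` would keep returning the dead thread.
    @thread = nil if @thread && !@thread.alive?

    @thread ||= begin
      @spawn_count += 1
      Thread.new { :done } # finishes immediately, so the thread dies at once
    end
  end
end

worker = Worker.new
first = worker.ensure_running
first.join                     # wait until the first thread has finished
second = worker.ensure_running # the dead thread is dropped, a new one spawns
second.join
```

If the liveness check were removed, the second call would return the same finished `Thread` object and no new work would ever start.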
#listen(conn)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 106
def listen(conn)
  pubsub_client = conn.pubsub
  @reconnect_attempt = 0
  @subscribed_client = pubsub_client

  pubsub_client.call("subscribe", INTERNAL_CHANNEL)

  until @when_connected.empty?
    @when_connected.shift.call
  end

  loop do
    type, chan, message = pubsub_client.next_event(60)

    case type
    when "subscribe", "psubscribe"
      if callbacks = @subscribe_callbacks[chan]
        next_callback = callbacks.shift
        @executor.post(&next_callback) if next_callback
        @subscribe_callbacks.delete(chan) if callbacks.empty?
      end
    when "message", "pmessage"
      broadcast(chan, message)
    when "unsubscribe", "punsubscribe"
      if message == 0
        @subscription_lock.synchronize do
          @subscribed_client = nil
        end
        break
      end
    end
  end
end
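The dispatch logic of the loop can be followed with a scripted list of events in place of a live connection. The sketch below assumes each event is a three-element array of type, channel and payload, where the payload of a subscribe or unsubscribe reply is the remaining subscription count. The event list and the `confirmed` and `delivered` arrays are made up for illustration.

```ruby
# Scripted events in the [type, channel, payload] shape that the loop
# destructures. The values are invented for this example.
events = [
  ["subscribe",   "_action_cable_internal", 1],
  ["subscribe",   "chat_1",                 2],
  ["message",     "chat_1",                 "hello"],
  ["unsubscribe", "chat_1",                 1], # count > 0: keep listening
  ["unsubscribe", "_action_cable_internal", 0], # count == 0: stop
  ["message",     "chat_1",                 "never delivered"],
]

confirmed = [] # channels whose subscription was acknowledged
delivered = [] # [channel, payload] pairs that would be broadcast

events.each do |type, chan, payload|
  case type
  when "subscribe", "psubscribe"
    confirmed << chan
  when "message", "pmessage"
    delivered << [chan, payload]
  when "unsubscribe", "punsubscribe"
    break if payload == 0
  end
end
```

The unsubscribe for `chat_1` leaves a count of 1 because of the sentinel channel, so processing continues. The final unsubscribe reports 0 and ends the loop before the last message is seen.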
#remove_channel(channel)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 161
def remove_channel(channel)
  @subscription_lock.synchronize do
    when_connected { @subscribed_client.call("unsubscribe", channel) }
  end
end
#reset (private)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 226
def reset
  @subscription_lock.synchronize do
    @subscribed_client = nil
    @when_connected.clear
  end
end
#resubscribe (private)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 219
def resubscribe
  channels = @sync.synchronize do
    @subscribers.keys
  end
  @subscribed_client.call("subscribe", *channels) unless channels.empty?
end
#shutdown
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 140
def shutdown
  @subscription_lock.synchronize do
    return if @thread.nil?

    when_connected do
      @subscribed_client.call("unsubscribe")
      @subscribed_client = nil
    end
  end

  Thread.pass while @thread.alive?
end
#when_connected(&block) (private)
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 199
def when_connected(&block)
  if @subscribed_client
    block.call
  else
    @when_connected << block
  end
end
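The deferral pattern used by `#when_connected`, together with the drain at the top of `#listen`, can be sketched on its own. `DeferredCommands` and its `connect` method are hypothetical names for this illustration, and the "commands" are plain strings appended to a log.

```ruby
# Hypothetical stand-alone version of the run-now-or-queue pattern.
class DeferredCommands
  def initialize
    @client = nil
    @pending = []
  end

  # Runs the block immediately when a client exists, otherwise queues it.
  def when_connected(&block)
    if @client
      block.call
    else
      @pending << block
    end
  end

  # Mirrors the drain at the top of #listen: set the client, then run
  # queued blocks in the order they were added.
  def connect(client)
    @client = client
    @pending.shift.call until @pending.empty?
  end
end

log = []
commands = DeferredCommands.new

commands.when_connected { log << "subscribe chat_1" } # queued, no client yet
log_before_connect = log.dup

commands.connect(:fake_client)                        # drains the queue
commands.when_connected { log << "subscribe chat_2" } # runs at once
```

Queued blocks run in first-in, first-out order, so subscriptions requested before the connection is ready are sent in the order they were made.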