Class: ActionCable::SubscriptionAdapter::Redis::Listener
| Relationships & Source Files | |
| Super Chains via Extension / Inclusion / Inheritance | |
|
Class Chain:
|
|
|
Instance Chain:
|
|
| Inherits: |
ActionCable::SubscriptionAdapter::SubscriberMap::Async
|
| Defined in: | actioncable/lib/action_cable/subscription_adapter/redis.rb |
Class Method Summary
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Instance Attribute Summary
- #logger readonly
- #retry_connecting? ⇒ Boolean readonly private
Instance Method Summary
- #add_channel(channel, on_success)
- #listen(conn)
- #remove_channel(channel)
- #shutdown
- #ensure_listener_running private
- #reset private
- #resubscribe private
- #when_connected(&block) private
::ActionCable::SubscriptionAdapter::SubscriberMap::Async - Inherited
::ActionCable::SubscriptionAdapter::SubscriberMap - Inherited
Constructor Details
.new(adapter, config_options, executor) ⇒ Listener
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 79
def initialize(adapter, , executor) super(executor) @adapter = adapter @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @reconnect_attempt = 0 # Use the same config as used by Redis conn @reconnect_attempts = .fetch(:reconnect_attempts, 1) @reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer) @subscribed_client = nil @when_connected = [] @thread = nil end
Instance Attribute Details
#logger (readonly)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 77
delegate :logger, to: :@adapter
#retry_connecting? ⇒ Boolean (readonly, private)
[ GitHub ]
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 187
def retry_connecting? @reconnect_attempt += 1 return false if @reconnect_attempt > @reconnect_attempts.size sleep_t = @reconnect_attempts[@reconnect_attempt - 1] sleep(sleep_t) if sleep_t > 0 true end
Instance Method Details
#add_channel(channel, on_success)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 144
def add_channel(channel, on_success) @subscription_lock.synchronize do ensure_listener_running @subscribe_callbacks[channel] << on_success when_connected { @subscribed_client.call("subscribe", channel) } end end
#ensure_listener_running (private)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 159
def ensure_listener_running @thread ||= Thread.new do Thread.current.abort_on_exception = true begin conn = @adapter.redis_connection_for_subscriptions listen conn rescue RedisClient::ConnectionError => e reset if retry_connecting? logger&.warn "Redis connection failed: #{e.}. Trying to reconnect..." when_connected { resubscribe } retry else logger&.error "Failed to reconnect to Redis after #{@reconnect_attempt} attempts." end end end end
#listen(conn)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 99
def listen(conn) pubsub_client = conn.pubsub @reconnect_attempt = 0 @subscribed_client = pubsub_client until @when_connected.empty? @when_connected.shift.call end loop do type, chan, = pubsub_client.next_event(60) case type when "subscribe", "psubscribe" if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift @executor.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end when "message", "pmessage" broadcast(chan, ) when "unsubscribe", "punsubscribe" if == 0 @subscription_lock.synchronize do @subscribed_client = nil end break end end end end
#remove_channel(channel)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 152
def remove_channel(channel) @subscription_lock.synchronize do when_connected { @subscribed_client.call("unsubscribe", channel) } end end
#reset (private)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 206
def reset @subscription_lock.synchronize do @subscribed_client = nil @subscribe_callbacks.clear @when_connected.clear end end
#resubscribe (private)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 199
def resubscribe channels = @sync.synchronize do @subscribers.keys end @subscribed_client.call("subscribe", *channels) unless channels.empty? end
#shutdown
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 131
def shutdown @subscription_lock.synchronize do return if @thread.nil? when_connected do @subscribed_client.call("unsubscribe") @subscribed_client = nil end end Thread.pass while @thread.alive? end
#when_connected(&block) (private)
[ GitHub ]# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 179
def when_connected(&block) if @subscribed_client block.call else @when_connected << block end end