123456789_123456789_123456789_123456789_123456789_

Class: ActionCable::SubscriptionAdapter::Redis::Listener

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(adapter, config_options, executor) ⇒ Listener

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 86

# Sets up the listener's bookkeeping. @thread and @subscribed_client both
# start out as nil.
#
# adapter        - kept in @adapter.
# config_options - consulted for :reconnect_attempts (defaults to 1).
# executor       - handed unchanged to the superclass initializer.
def initialize(adapter, config_options, executor)
  super(executor)

  @adapter = adapter

  # Pending callbacks per channel; an unknown channel gets its own fresh array.
  @subscribe_callbacks = Hash.new { |callbacks, channel| callbacks[channel] = [] }
  @subscription_lock = Mutex.new

  # Use the same config as used by Redis conn. An Integer count becomes
  # that many zero entries; any other value is stored as given.
  attempts = config_options.fetch(:reconnect_attempts, 1)
  @reconnect_attempts = attempts.is_a?(Integer) ? Array.new(attempts, 0) : attempts
  @reconnect_attempt = 0

  @subscribed_client = nil
  @when_connected = []
  @thread = nil
end

Instance Attribute Details

#logger (readonly)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 77

# Forward #logger to the object held in @adapter.
delegate :logger, to: :@adapter

#retry_connecting? ⇒ Boolean (readonly, private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 207

# Records one more reconnect attempt and reports whether it is allowed.
#
# Returns false once the attempt number exceeds the number of entries in
# @reconnect_attempts. Otherwise sleeps for the entry belonging to this
# attempt (when it is greater than zero) and returns true.
def retry_connecting?
  @reconnect_attempt += 1
  attempt = @reconnect_attempt
  return false unless attempt <= @reconnect_attempts.size

  delay = @reconnect_attempts[attempt - 1]
  sleep(delay) if delay.positive?

  true
end

Instance Method Details

#add_channel(channel, on_success)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 153

# Registers +on_success+ for +channel+ and requests a "subscribe" for it.
#
# Runs under @subscription_lock: first makes sure a listener thread
# exists, then appends the callback to the channel's pending list, then
# sends the subscribe through the connected client (or queues it until a
# client is available, see #when_connected).
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running
    @subscribe_callbacks[channel].push(on_success)
    when_connected do
      @subscribed_client.call("subscribe", channel)
    end
  end
end

#ensure_listener_running (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 168

# Returns the listener thread, starting one when none is usable.
#
# In normal operation the internal sentinel subscription keeps the
# listener alive, yet the thread can still finish for other reasons (for
# example once the Redis reconnect attempts are used up). A finished
# thread is forgotten here and the attempt counter zeroed, so the next
# call starts a fresh listener instead of leaving every later subscribe
# queued forever.
def ensure_listener_running
  unless @thread.nil? || @thread.alive?
    @thread = nil
    @reconnect_attempt = 0
  end
  return @thread if @thread

  @thread = Thread.new do
    Thread.current.abort_on_exception = true

    begin
      listen(@adapter.redis_connection_for_subscriptions)
    rescue RedisClient::ConnectionError => error
      reset
      if retry_connecting?
        logger&.warn "Redis connection failed: #{error.message}. Trying to reconnect..."
        # Queued now, executed once the next connection is established.
        when_connected { resubscribe }
        retry
      end

      # Only reached when no further attempt is allowed; the thread then ends.
      logger&.error "Failed to reconnect to Redis after #{@reconnect_attempt} attempts."
    end
  end
end

#listen(conn)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 106

# Runs the subscription loop on the pub/sub client obtained from +conn+
# and returns once an unsubscribe event reports zero remaining
# subscriptions. Errors raised by the client are not rescued here.
#
# conn - must respond to #pubsub; the returned client is driven through
#        #call and #next_event.
#
# @subscribed_client, @when_connected and @subscribe_callbacks are shared
# with add_channel, remove_channel, shutdown and reset, which all access
# them under @subscription_lock. This method therefore takes the same lock
# for every access, but never while running queued blocks or posting
# callbacks, so code that acquires other locks cannot deadlock against it.
def listen(conn)
  pubsub_client = conn.pubsub

  # Publish the client and take over the queued work in one critical
  # section: a caller holding the lock has either queued its block already
  # (it is part of +pending+) or will see the client and run the block
  # itself. Nothing can slip in between and be left behind in the queue.
  pending = @subscription_lock.synchronize do
    @reconnect_attempt = 0
    @subscribed_client = pubsub_client
    @when_connected.shift(@when_connected.size)
  end

  pubsub_client.call("subscribe", INTERNAL_CHANNEL)

  pending.each(&:call)

  loop do
    # NOTE(review): 60 is presumably a timeout in seconds — confirm against
    # the client library. An event matching no branch just polls again.
    type, chan, message = pubsub_client.next_event(60)
    case type
    when "subscribe", "psubscribe"
      # Pop the oldest pending callback under the lock so a concurrent
      # add_channel cannot append to a list that is about to be deleted.
      next_callback = @subscription_lock.synchronize do
        callbacks = @subscribe_callbacks[chan]
        callback = callbacks&.shift
        @subscribe_callbacks.delete(chan) if callbacks&.empty?
        callback
      end
      @executor.post(&next_callback) if next_callback
    when "message", "pmessage"
      broadcast(chan, message)
    when "unsubscribe", "punsubscribe"
      # A remaining-subscription count of zero ends the loop.
      if message == 0
        @subscription_lock.synchronize { @subscribed_client = nil }
        break
      end
    end
  end
end

#remove_channel(channel)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 161

# Requests an "unsubscribe" for +channel+.
#
# Runs under @subscription_lock; the request is sent through the
# connected client right away, or queued until a client is available
# (see #when_connected).
def remove_channel(channel)
  @subscription_lock.synchronize do
    when_connected do
      @subscribed_client.call("unsubscribe", channel)
    end
  end
end

#reset (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 226

# Forgets the current connection state: under @subscription_lock, empties
# the queue of blocks waiting for a connection and clears
# @subscribed_client. Evaluates to the (now empty) queue.
def reset
  @subscription_lock.synchronize do
    emptied = @when_connected.clear
    @subscribed_client = nil
    emptied
  end
end

#resubscribe (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 219

# Re-issues a single "subscribe" for every channel currently present in
# @subscribers. The channel list is read under @sync; nothing is sent
# when the list is empty.
def resubscribe
  channels = @sync.synchronize { @subscribers.keys }
  return if channels.empty?

  @subscribed_client.call("subscribe", *channels)
end

#shutdown

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 140

# Stops the listener and waits for its thread to finish.
#
# Under @subscription_lock a blanket "unsubscribe" is sent through the
# connected client (or queued until a client is available, see
# #when_connected) and @subscribed_client is cleared. Afterwards the
# caller yields the CPU until the listener thread is no longer alive.
#
# Does nothing when no listener thread exists or the thread has already
# finished. Queuing the request for a finished thread would leave it in
# @when_connected with nothing to run it, and a listener started later
# would execute it while draining that queue and shut itself down.
def shutdown
  @subscription_lock.synchronize do
    return unless @thread&.alive?

    when_connected do
      @subscribed_client.call("unsubscribe")
      @subscribed_client = nil
    end
  end

  Thread.pass while @thread.alive?
end

#when_connected(&block) (private)

[ GitHub ]

  
# File 'actioncable/lib/action_cable/subscription_adapter/redis.rb', line 199

# Runs +block+ immediately when @subscribed_client is set; otherwise
# appends it to @when_connected for later execution.
def when_connected(&block)
  return block.call if @subscribed_client

  @when_connected << block
end