123456789_123456789_123456789_123456789_123456789_

Class: Redis::SubscribedClient

Relationships & Source Files
Inherits: Object
Defined in: lib/redis/subscribe.rb

Class Method Summary

Instance Method Summary

Constructor Details

.new(client) ⇒ SubscribedClient

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 5

# Builds the wrapper around +client+.
#
# Keeps a reference to the given client (the other methods delegate to it)
# and creates the Monitor that #call_v synchronizes on.
#
# @param client the object that #call_v, #close and the event loop talk to
def initialize(client)
  @write_monitor = Monitor.new
  @client = client
end

Instance Method Details

#call_v(command)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 10

# Forwards +command+ to the wrapped client's +call_v+ while holding the
# write monitor, so the call runs inside +@write_monitor.synchronize+.
#
# @param command the command handed unchanged to the client
# @return the value of the synchronized block, i.e. the client's result
def call_v(command)
  @write_monitor.synchronize { @client.call_v(command) }
end

#close

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 52

# Closes the wrapped client by delegating to its +close+ method.
#
# @return whatever the client's +close+ returns
def close
  @client.close
end

#psubscribe(*channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 24

# Runs a subscription that is started with the "psubscribe" command and
# ends on "punsubscribe", using #subscription's default timeout.
#
# @param channels arguments sent along with the "psubscribe" command
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def psubscribe(*channels, &block)
  subscription("psubscribe", "punsubscribe", channels, block)
end

#psubscribe_with_timeout(timeout, *channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 28

# Same as #psubscribe, but passes an explicit +timeout+ to #subscription,
# which uses it for every +next_event+ call on the client.
#
# @param timeout value forwarded as #subscription's timeout argument
# @param channels arguments sent along with the "psubscribe" command
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def psubscribe_with_timeout(timeout, *channels, &block)
  subscription("psubscribe", "punsubscribe", channels, block, timeout)
end

#punsubscribe(*channels)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 44

# Sends a +:punsubscribe+ command for the given channels through #call_v.
#
# @param channels arguments appended after +:punsubscribe+; may be empty
# @return whatever #call_v returns
def punsubscribe(*channels)
  command = [:punsubscribe, *channels]
  call_v(command)
end

#ssubscribe(*channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 32

# Runs a subscription that is started with the "ssubscribe" command and
# ends on "sunsubscribe", using #subscription's default timeout.
#
# @param channels channels to subscribe to; #subscription sends one
#   "ssubscribe" command per channel
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def ssubscribe(*channels, &block)
  subscription("ssubscribe", "sunsubscribe", channels, block)
end

#ssubscribe_with_timeout(timeout, *channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 36

# Same as #ssubscribe, but passes an explicit +timeout+ to #subscription,
# which uses it for every +next_event+ call on the client.
#
# @param timeout value forwarded as #subscription's timeout argument
# @param channels channels to subscribe to; #subscription sends one
#   "ssubscribe" command per channel
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def ssubscribe_with_timeout(timeout, *channels, &block)
  subscription("ssubscribe", "sunsubscribe", channels, block, timeout)
end

#subscribe(*channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 16

# Runs a subscription that is started with the "subscribe" command and
# ends on "unsubscribe", using #subscription's default timeout.
#
# @param channels arguments sent along with the "subscribe" command
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def subscribe(*channels, &block)
  subscription("subscribe", "unsubscribe", channels, block)
end

#subscribe_with_timeout(timeout, *channels, &block)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 20

# Same as #subscribe, but passes an explicit +timeout+ to #subscription,
# which uses it for every +next_event+ call on the client.
#
# @param timeout value forwarded as #subscription's timeout argument
# @param channels arguments sent along with the "subscribe" command
# @param block handed to #subscription, which passes it to Subscription.new
# @return whatever #subscription returns
def subscribe_with_timeout(timeout, *channels, &block)
  subscription("subscribe", "unsubscribe", channels, block, timeout)
end

#subscription(start, stop, channels, block, timeout = 0) (protected)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 58

# Shared event loop behind the subscribe-style methods.
#
# Builds a Subscription from +block+, sends the +start+ command through
# #call_v, then keeps reading events from the client until either
# +next_event+ returns a falsy value or an event of type +stop+ arrives
# whose last element is 0.
#
# For every event, the first element selects a callback from the
# subscription's +callbacks+; if one is registered it is called with the
# remaining elements.
#
# @param start command used to begin the subscription
# @param stop event type that may terminate the loop
# @param channels channels passed with the +start+ command
# @param block passed to Subscription.new as its block (may be nil)
# @param timeout passed to every +next_event+ call (defaults to 0)
# @return [nil]
# @raise the class that Client::ERROR_MAPPING maps the event's class to,
#   when an event is a ::RedisClient::CommandError
def subscription(start, stop, channels, block, timeout = 0)
  sub = Subscription.new(&block)

  case start
  when "ssubscribe"
    # One command per channel, to avoid cross-slot keys.
    channels.each { |channel| call_v([start, channel]) }
  else
    call_v([start, *channels])
  end

  while (event = @client.next_event(timeout))
    if event.is_a?(::RedisClient::CommandError)
      error_class = Client::ERROR_MAPPING.fetch(event.class)
      raise error_class, event.message
    end

    kind, *payload = event
    handler = sub.callbacks[kind]
    handler.call(*payload) if handler

    # Leave the loop once the stop event reports 0 as its last element.
    break if kind == stop && payload.last == 0
  end
  # No need to unsubscribe here. The real client closes the connection
  # whenever an exception is raised (see #ensure_connected).
end

#sunsubscribe(*channels)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 48

# Sends a +:sunsubscribe+ command for the given channels through #call_v.
#
# @param channels arguments appended after +:sunsubscribe+; may be empty
# @return whatever #call_v returns
def sunsubscribe(*channels)
  command = [:sunsubscribe, *channels]
  call_v(command)
end

#unsubscribe(*channels)

[ GitHub ]

  
# File 'lib/redis/subscribe.rb', line 40

# Sends an +:unsubscribe+ command for the given channels through #call_v.
#
# @param channels arguments appended after +:unsubscribe+; may be empty
# @return whatever #call_v returns
def unsubscribe(*channels)
  command = [:unsubscribe, *channels]
  call_v(command)
end