123456789_123456789_123456789_123456789_123456789_

Module: EventMachine::Protocols::Memcache

Relationships & Source Files
Namespace Children
Exceptions:
Extension / Inclusion / Inheritance Descendants
Included In:
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, Deferrable
Defined in: lib/em/protocols/memcache.rb

Overview

Implements the Memcache protocol (http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.

Usage example

# All calls are made inside an EM.run block; connect returns the cache connection.
EM.run{ cache = EM::P::Memcache.connect 'localhost', 11211

# Store values without a block: #set then sends the command with "noreply".
cache.set :a, 'hello'
cache.set :b, 'hi'
cache.set :c, 'how are you?'
cache.set :d, ''

# Fetch one key, several keys as a hash, or several keys as block arguments.
cache.get(:a){ |v| p v }
cache.get_hash(:a, :b, :c, :d){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

# Keys that were never stored (:z, :y) are yielded as nil, in argument order.
cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }

# Passing a block to #set / #del requests a reply and runs the block when it arrives.
cache.get(:missing){ |m| p [:missing=, m] }
cache.set(:missing, 'abc'){ p :stored }
cache.get(:missing){ |m| p [:missing=, m] }
cache.del(:missing){ p :deleted }
cache.get(:missing){ |m| p [:missing=, m] }

}

Constant Summary

Deferrable - Included

Pool

Class Method Summary

Instance Method Summary

Deferrable - Included

#callback

Specify a block to be executed if and when the Deferrable object receives a status of :succeeded.

#cancel_callback

Cancels an outstanding callback to &block if any.

#cancel_errback

Cancels an outstanding errback to &block if any.

#cancel_timeout

Cancels an outstanding timeout if any.

#errback

Specify a block to be executed if and when the Deferrable object receives a status of :failed.

#fail

Sugar for set_deferred_status(:failed, ...).

#set_deferred_failure

Alias for Deferrable#fail.

#set_deferred_status

Sets the "disposition" (status) of the Deferrable object.

#set_deferred_success
#succeed

Sugar for set_deferred_status(:succeeded, ...).

#timeout

Setting a timeout on a Deferrable causes it to go into the failed state after the Timeout expires (passing no arguments to the object's errbacks).

Class Method Details

.connect(host = 'localhost', port = 11211)

Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 114

# Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4).
#
# host - server host name (default 'localhost')
# port - server port (default 11211)
#
# The receiver is handed to EM.connect as the connection handler. host and
# port are repeated as trailing arguments (presumably forwarded to
# #initialize, which stores them).
#
# Returns whatever EM.connect returns.
def self.connect(host = 'localhost', port = 11211)
  EM.connect(host, port, self, host, port)
end

Instance Method Details

#connection_completed

This method is for internal use only.
[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 139

# Connection hook (internal use only): resets all per-connection protocol
# state and then marks the Deferrable as succeeded, which lets the commands
# wrapped in `callback{ ... }` run.
def connection_completed
  # Queues of callbacks awaiting a reply, one per command family.
  @get_cbs = []
  @set_cbs = []
  @del_cbs = []

  # Values collected from VALUE lines until the closing END line arrives.
  @values = {}

  # Discard any partial reply buffered from a previous connection. After a
  # reconnect those bytes are not part of the new stream and would corrupt
  # parsing. #receive_data recreates the buffer lazily.
  @buffer = nil

  @reconnecting = false
  @connected = true
  succeed
  # set_delimiter "\r\n"
  # set_line_mode
end

#del(key, expires = 0, &cb)

Alias for #delete.

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 111

# Short name for #delete; both names invoke the same method.
alias del delete

#delete(key, expires = 0, &cb) Also known as: #del

Delete the value associated with a key

cache.del :a
cache.del(:b){ puts "deleted the value!" }

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 105

# Delete the value associated with a key.
#
#   cache.del :a
#   cache.del(:b){ puts "deleted the value!" }
#
# key     - key to delete. It is converted to a string and every whitespace
#           character is replaced with '_', the same normalization #get
#           applies, so both methods address the same entry.
# expires - value sent as the second argument of the delete command (default 0)
# cb      - optional block. When given, the command is sent without "noreply"
#           and the block is queued in @del_cbs to be called when the reply
#           is processed. Without a block, "noreply" is appended.
#
# The command is wrapped in `callback`, so it is sent once the Deferrable
# has succeeded.
def delete(key, expires = 0, &cb)
  callback do
    # A raw space or newline inside the key would split or terminate the
    # command line, so normalize it before interpolating.
    safe_key = key.to_s.gsub(/\s/, '_')
    send_data "delete #{safe_key} #{expires}#{cb ? '' : ' noreply'}\r\n"
    @del_cbs << cb if cb
  end
end

#get(*keys)

Get the value associated with one or multiple keys

cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 61

# Get the value associated with one or multiple keys.
#
#   cache.get(:a){ |v| p v }
#   cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
#
# keys - one or more keys. Each is converted to a string with whitespace
#        replaced by '_' before being sent.
#
# The block receives one argument per requested key, in request order.
# A key absent from the reply hash is yielded as nil.
#
# Raises ArgumentError when no block is given.
def get(*keys)
  raise ArgumentError unless block_given?

  callback do
    wire_keys = keys.map { |key| key.to_s.gsub(/\s/, '_') }
    send_data "get #{wire_keys.join(' ')}\r\n"

    # Queued together with the normalized keys and invoked later with the
    # hash of values collected from the reply.
    deliver = proc do |values|
      yield(*wire_keys.map { |name| values[name] })
    end
    @get_cbs << [wire_keys, deliver]
  end
end

#get_hash(*keys)

Gets multiple values as a hash

cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 92

# Gets multiple values as a hash.
#
#   cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
#
# keys - the keys to fetch; they are used unchanged as the keys of the
#        yielded hash.
#
# Delegates to #get and pairs each requested key with the value yielded at
# the same position.
#
# Raises ArgumentError when no block is given.
def get_hash(*keys)
  raise ArgumentError unless block_given?

  get(*keys) do |*fetched|
    yield Hash[keys.zip(fetched)]
  end
end

#initialize(host, port = 11211)

This method is for internal use only.
[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 134

# Remembers the server address on the connection (internal use only).
# #unbind reads @host and @port when it schedules a reconnect.
#
# host - server host name
# port - server port (default 11211)
def initialize(host, port = 11211)
  @host = host
  @port = port
end

#process_cmd(line)

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 175

# Handles one reply line from the server.
#
# line - a single line as sliced off the receive buffer by #receive_data
#        (it is stripped before matching).
#
# Cend, Cstored, Cdeleted, Cunknown and ParserError are defined outside this
# method; the reply names next to each branch are taken from the original
# inline annotations.
#
# Raises ParserError when a VALUE header arrives before its payload (plus
# the trailing \r\n) is fully buffered. Nothing is consumed in that case.
def process_cmd(line)
  case line.strip
  when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
    name = $1
    length = Integer($3)

    # The payload and its terminating \r\n must both be present in the buffer.
    raise ParserError if @buffer.size < length + 2

    @values[name] = @buffer.slice!(0, length)
    @buffer.slice!(0, 2) # \r\n

  when Cend # END
    # Hand the collected values to the oldest pending get, then start afresh.
    pending = @get_cbs.shift
    pending[1].call(@values) if pending
    @values = {}

  when Cstored # STORED
    stored_cb = @set_cbs.shift
    stored_cb.call(true) if stored_cb

  when Cdeleted # DELETED
    deleted_cb = @del_cbs.shift
    deleted_cb.call(true) if deleted_cb

  when Cunknown # NOT_FOUND
    missing_cb = @del_cbs.shift
    missing_cb.call(false) if missing_cb

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end

#receive_data(data)

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 158

# Appends incoming data to the receive buffer and feeds every complete line
# to #process_cmd.
#
# data - the chunk of data just received.
#
# A line is everything up to and including the delimiter (Cdelimiter,
# defined elsewhere; the +2 assumes it is two characters long). When
# #process_cmd raises ParserError the line is put back at the front of the
# buffer and processing stops until more data arrives.
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  while (eol = @buffer.index(Cdelimiter))
    chunk = @buffer.slice!(0, eol + 2)
    begin
      process_cmd chunk
    rescue ParserError
      # Not enough data yet: restore the line so it is retried next time.
      @buffer.insert(0, chunk)
      break
    end
  end
end

#send_cmd(cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false) (private)

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 118

# Sends a storage command header line of the form
# "<cmd> <key> <flags> <exptime> <bytes>[ noreply]\r\n".
#
# cmd     - command name
# key     - key the command applies to
# flags   - flags field (default 0)
# exptime - expiry field (default 0)
# bytes   - length of the payload that follows (default 0)
# noreply - when truthy, "noreply" is appended to the line
def send_cmd(cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false)
  fields = [cmd, key, flags, exptime, bytes].map { |field| field.to_s }
  fields << 'noreply' if noreply
  send_data "#{fields.join(' ')}\r\n"
end

#set(key, val, exptime = 0, &cb)

Set the value for a given key

cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }

[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 78

# Set the value for a given key.
#
#   cache.set :a, 'hello'
#   cache.set(:missing, 'abc'){ puts "stored the value!" }
#
# key     - key to store under. It is converted to a string and every
#           whitespace character is replaced with '_', the same
#           normalization #get applies, so a stored value can be read back
#           with the same key.
# val     - value to store; converted with to_s
# exptime - expiry field sent with the command (default 0)
# cb      - optional block. When given, a reply is requested and the block is
#           queued in @set_cbs. Without a block the command is sent with
#           "noreply".
#
# The command is wrapped in `callback`, so it is sent once the Deferrable
# has succeeded.
def set(key, val, exptime = 0, &cb)
  callback do
    payload = val.to_s

    # A raw space or newline inside the key would corrupt the command line.
    safe_key = key.to_s.gsub(/\s/, '_')

    # The length field uses bytesize where the string supports it, falling
    # back to size otherwise.
    length = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size

    send_cmd :set, safe_key, 0, exptime, length, cb.nil?
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end

#unbind

This method is for internal use only.
[ GitHub ]

  
# File 'lib/em/protocols/memcache.rb', line 221

# Connection-closed hook (internal use only).
#
# If the connection had been established, or a reconnect is already in
# progress, schedules a reconnect to @host/@port after a 1 second timer and
# flips the state flags. Otherwise the initial connection attempt failed.
#
# Raises RuntimeError ('Unable to connect to memcached server') when the
# connection was never established.
def unbind
  unless @connected || @reconnecting
    raise 'Unable to connect to memcached server'
  end

  EM.add_timer(1) { reconnect @host, @port }
  @connected = false
  @reconnecting = true
  # Clears the Deferrable status (presumably so commands issued while
  # disconnected wait until #connection_completed calls succeed again).
  @deferred_status = nil
end