Class: Rinda::RingServer

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, DRbUndumped
Inherits: Object
Defined in: lib/rinda/ring.rb

Overview

A RingServer allows a TupleSpace to be located via UDP broadcasts. Default service location uses the following steps:

  1. A RingServer begins listening on the network broadcast UDP address.

  2. A RingFinger sends a UDP packet containing the DRb URI where it will listen for a reply.

  3. The RingServer receives the UDP packet and connects back to the provided DRb URI with the DRb service.

A RingServer requires a TupleSpace:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts

RingServer can also listen on multicast addresses for announcements. This allows multiple RingServers to run on the same host. To use network broadcast and multicast:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts, [Socket::INADDR_ANY, '239.0.0.1', 'ff02::1']

Constructor Details

.new(ts, addresses = [Socket::INADDR_ANY], port = Ring_PORT) ⇒ RingServer

Advertises ts on the given addresses at port.

If addresses is omitted, only the UDP broadcast address is used.

addresses can contain multiple addresses. If a multicast address is given in addresses then the RingServer will listen for multicast queries.

If you use IPv4 multicast, you may need to specify the address of the inbound interface that joins the multicast group.

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])

Each entry in addresses can itself be an Array. The first element of that Array is a multicast address and the second is an inbound interface address. If the second is omitted then '0.0.0.0' is used.

If you use IPv6 multicast you may need to set both the local interface address and the inbound interface index:

rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])

The first element is a multicast address and the second is an inbound interface address. The third is an inbound interface index.

At this time there is no easy way to get an interface index by name.

If the second is omitted then '::1' is used. If the third is omitted then 0 (default interface) is used.
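The accepted argument shapes can be illustrated with a small stand-alone sketch. normalize_ring_args is a hypothetical helper, not part of Rinda; it only mirrors the dispatch visible in the constructor source (an Integer in the addresses position is treated as the port, and Array entries are splatted into make_socket). The default port 7647 is assumed here as the value of Ring_PORT.

```ruby
require 'socket'

# Hypothetical helper (not part of Rinda). Returns the effective port and
# the argument list that would be passed to make_socket for each address.
def normalize_ring_args(addresses = [Socket::INADDR_ANY], port = 7647)
  # An Integer in the addresses position is interpreted as the port.
  if Integer === addresses
    addresses, port = [Socket::INADDR_ANY], addresses
  end

  # Array entries carry [multicast_address, interface_address, interface_index];
  # anything else is a single plain address.
  calls = addresses.map { |address| Array === address ? address : [address] }
  [port, calls]
end

port, calls = normalize_ring_args([['239.0.0.1', '9.5.1.1'], 'ff02::1'])
puts port
puts calls.inspect
```

Under these assumptions, calling normalize_ring_args(8000) yields port 8000 with the single broadcast address, which corresponds to RingServer.new(ts, 8000).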

# File 'lib/rinda/ring.rb', line 94

def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  if Integer === addresses then
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new

  @ts = ts
  @sockets = []
  addresses.each do |address|
    if Array === address
      make_socket(*address)
    else
      make_socket(address)
    end
  end

  @w_services = write_services
  @r_service  = reply_service
end

Instance Method Details

#do_reply

Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.
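The take-and-call-back pattern can be tried locally without any network traffic. The following is a minimal sketch, assuming the rinda library is installed; a plain lambda stands in for the remote DRb object that a RingFinger would normally supply, and the names answers and callback are illustrative only.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
answers = Queue.new

# Stand-in for the DRb object a client would send: it receives the TupleSpace.
callback = ->(space) { answers << space }

# A lookup request as it would sit in the TupleSpace.
ts.write([:lookup_ring, callback])

# nil in the template matches any value in that position.
tuple = ts.take([:lookup_ring, nil])

# Hand the local TupleSpace to the waiting client.
tuple[1].call(ts)

found = answers.pop
puts found.equal?(ts)
```

In the real server the callback runs in its own thread and errors are swallowed, so one unreachable client does not stall the reply loop.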

# File 'lib/rinda/ring.rb', line 214

def do_reply
  tuple = @ts.take([:lookup_ring, nil], @renewer)
  Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end

#do_write(msg)

Extracts the response URI from msg and adds it to the TupleSpace, where it will be picked up by #reply_service for notification.
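The packet format implied by the source is a Marshal dump of a [tuple, seconds] pair. The sketch below shows the round trip with a plain String standing in for the DRb reference; decode_lookup is a hypothetical helper and the URI is made up for illustration.

```ruby
# Hypothetical helper (not part of Rinda): decodes a packet the way
# do_write does, returning nil instead of raising on malformed input.
def decode_lookup(msg)
  tuple, sec = Marshal.load(msg)
  [tuple, sec]
rescue StandardError
  nil
end

# Stand-in for the DRb reference a RingFinger would send.
reply_to = 'druby://localhost:12345'
packet = Marshal.dump([[:lookup_ring, reply_to], 5])

tuple, sec = decode_lookup(packet)
puts tuple.inspect
puts sec

# Malformed datagrams are ignored rather than crashing the listener.
puts decode_lookup('garbage').inspect
```

Because Marshal.load can instantiate arbitrary objects, this kind of decoding is only appropriate on a trusted network.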

# File 'lib/rinda/ring.rb', line 189

def do_write(msg)
  Thread.new do
    begin
      tuple, sec = Marshal.load(msg)
      @ts.write(tuple, sec)
    rescue
    end
  end
end

#make_socket(address, interface_address = nil, multicast_interface = 0)

Creates a socket at address.

If address is a multicast address then interface_address and multicast_interface can optionally be set.

For a multicast address, the created socket is bound to interface_address. If you use IPv4 multicast then the interface of interface_address is used as the inbound interface. If interface_address is omitted or nil then '0.0.0.0' (IPv4) or '::1' (IPv6) is used. For any other address, the socket is bound to address itself.

If you use IPv6 multicast then multicast_interface is used as the inbound interface. multicast_interface is a network interface index. If multicast_interface is omitted then 0 (default interface) is used.
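The membership requests built for the two address families differ in layout, which the following sketch makes visible without opening any socket. The group addresses are the ones used in the examples above, and port 7647 is assumed as the value of Ring_PORT.

```ruby
require 'socket'
require 'ipaddr'

group4 = Addrinfo.udp('239.0.0.1', 7647)
group6 = Addrinfo.udp('ff02::1', 7647)

puts group4.ipv4_multicast?
puts group6.ipv6_multicast?

# IPv4: 4-byte group address followed by the 4-byte interface address.
mreq4 = IPAddr.new(group4.ip_address).hton + IPAddr.new('0.0.0.0').hton

# IPv6: 16-byte group address followed by the interface index packed as a
# native unsigned int (0 selects the default interface).
mreq6 = IPAddr.new(group6.ip_address).hton + [0].pack('I')

puts mreq4.bytesize
puts mreq6.bytesize
```

These byte strings are what the source passes to setsockopt with IP_ADD_MEMBERSHIP and IPV6_JOIN_GROUP respectively.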

# File 'lib/rinda/ring.rb', line 132

def make_socket(address, interface_address=nil, multicast_interface=0)
  addrinfo = Addrinfo.udp(address, @port)

  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
                      addrinfo.protocol)
  @sockets << socket

  if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
    if Socket.const_defined?(:SO_REUSEPORT) then
      socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
    else
      socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
    end

    if addrinfo.ipv4_multicast? then
      interface_address = '0.0.0.0' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        IPAddr.new(interface_address).hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      interface_address = '::1' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        [multicast_interface].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  else
    socket.bind(addrinfo)
  end

  socket
end

#reply_service

Creates a thread that notifies waiting clients from the TupleSpace.

# File 'lib/rinda/ring.rb', line 202

def reply_service
  Thread.new do
    loop do
      do_reply
    end
  end
end

#shutdown

Shuts down the RingServer.

# File 'lib/rinda/ring.rb', line 223

def shutdown
  @renewer.renew = false

  @w_services.each do |thread|
    thread.kill
    thread.join
  end

  @sockets.each do |socket|
    socket.close
  end

  @r_service.kill
  @r_service.join
end

#write_services

Creates threads that pick up UDP packets and pass them to do_write for decoding.
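The one-thread-per-socket receive loop can be exercised on the loopback interface. This is a minimal sketch rather than the server's own code: it binds to an ephemeral port on 127.0.0.1 instead of the broadcast address, and a Queue stands in for do_write.

```ruby
require 'socket'
require 'timeout'

received = Queue.new

# Bind to an ephemeral loopback port so the sketch does not need Ring_PORT.
server = UDPSocket.new
server.bind('127.0.0.1', 0)
port = server.local_address.ip_port

# One listener thread per socket, mirroring write_services.
listener = Thread.new(server) do |socket|
  loop do
    msg = socket.recv(1024) # blocks until a datagram arrives
    received << msg         # stand-in for do_write(msg)
  end
end

client = UDPSocket.new
client.send('hello ring', 0, '127.0.0.1', port)

first = Timeout.timeout(5) { received.pop }
puts first

# Stop the thread before closing the socket, as shutdown does.
listener.kill
listener.join
server.close
client.close
```

Note that recv(1024) truncates larger datagrams, so lookup packets are expected to stay below that size.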

# File 'lib/rinda/ring.rb', line 174

def write_services
  @sockets.map do |s|
    Thread.new(s) do |socket|
      loop do
        msg = socket.recv(1024)
        do_write(msg)
      end
    end
  end
end