123456789_123456789_123456789_123456789_123456789_

Class: Rinda::TupleSpace

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, MonitorMixin, DRbUndumped
Inherits: Object
Defined in: lib/rinda/tuplespace.rb

Overview

The Tuplespace manages access to the tuples it contains, ensuring mutual exclusion requirements are met.

The sec option for the write, take, move, read and notify methods may either be a number of seconds or a Renewer object.

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(period = 60) ⇒ TupleSpace

Creates a new TupleSpace. period is used to control how often to look for dead tuples after modifications to the TupleSpace.

If no dead tuples are found period seconds after the last modification, the TupleSpace will stop looking for dead tuples.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 437

def initialize(period=60)
  super()
  @bag = TupleBag.new
  @read_waiter = TupleBag.new
  @take_waiter = TupleBag.new
  @notify_waiter = TupleBag.new
  @period = period
  @keeper = nil
end

Instance Attribute Details

#need_keeper?Boolean (readonly, private)

Checks the tuplespace to see if it needs cleaning.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 631

def need_keeper?
  return true if @bag.has_expires?
  return true if @read_waiter.has_expires?
  return true if @take_waiter.has_expires?
  return true if @notify_waiter.has_expires?
end

Instance Method Details

#create_entry(tuple, sec) (private)

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 577

def create_entry(tuple, sec)
  TupleEntry.new(tuple, sec)
end

#keep_clean (private)

Removes dead tuples.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 584

def keep_clean
  synchronize do
    @read_waiter.delete_unless_alive.each do |e|
      e.signal
    end
    @take_waiter.delete_unless_alive.each do |e|
      e.signal
    end
    @notify_waiter.delete_unless_alive.each do |e|
      e.notify(['close'])
    end
    @bag.delete_unless_alive.each do |e|
      notify_event('delete', e.value)
    end
  end
end

#move(port, tuple, sec = nil) {|template| ... }

Moves tuple to port.

Yields:

  • (template)
[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 484

def move(port, tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    if entry
      port.push(entry.value) if port
      @bag.delete(entry)
      notify_event('take', entry.value)
      return port ? nil : entry.value
    end
    raise RequestExpiredError if template.expired?

    begin
      @take_waiter.push(template)
      start_keeper if template.expires
      while true
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        entry = @bag.find(template)
        if entry
          port.push(entry.value) if port
          @bag.delete(entry)
          notify_event('take', entry.value)
          return port ? nil : entry.value
        end
        template.wait
      end
    ensure
      @take_waiter.delete(template)
    end
  end
end

#notify(event, tuple, sec = nil)

Registers for notifications of event. Returns a NotifyTemplateEntry. See NotifyTemplateEntry for examples of how to listen for notifications.

event can be:

'write'

A tuple was added

'take'

A tuple was taken or moved

'delete'

A tuple was lost after being overwritten or expiring

The TupleSpace will also notify you of the 'close' event when the NotifyTemplateEntry has expired.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 567

def notify(event, tuple, sec=nil)
  template = NotifyTemplateEntry.new(self, event, tuple, sec)
  synchronize do
    @notify_waiter.push(template)
  end
  template
end

#notify_event(event, tuple) (private)

Notifies all registered listeners for event of a status change of tuple.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 605

def notify_event(event, tuple)
  ev = [event, tuple]
  @notify_waiter.find_all_template(ev).each do |template|
    template.notify(ev)
  end
end

#read(tuple, sec = nil) {|template| ... }

Reads tuple, but does not remove it.

Yields:

  • (template)
[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 521

def read(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    return entry.value if entry
    raise RequestExpiredError if template.expired?

    begin
      @read_waiter.push(template)
      start_keeper if template.expires
      template.wait
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      return template.found
    ensure
      @read_waiter.delete(template)
    end
  end
end

#read_all(tuple)

Returns all tuples matching tuple. Does not remove the found tuples.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 545

def read_all(tuple)
  template = WaitTemplateEntry.new(self, tuple, nil)
  synchronize do
    entry = @bag.find_all(template)
    entry.collect do |e|
      e.value
    end
  end
end

#start_keeper (private)

Creates a thread that scans the tuplespace for expired tuples.

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 615

def start_keeper
  return if @keeper && @keeper.alive?
  @keeper = Thread.new do
    while true
      sleep(@period)
      synchronize do
        break unless need_keeper?
        keep_clean
      end
    end
  end
end

#take(tuple, sec = nil, &block)

Removes tuple

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 477

def take(tuple, sec=nil, &block)
  move(nil, tuple, sec, &block)
end

#write(tuple, sec = nil)

Adds tuple

[ GitHub ]

  
# File 'lib/rinda/tuplespace.rb', line 450

def write(tuple, sec=nil)
  entry = create_entry(tuple, sec)
  synchronize do
    if entry.expired?
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      notify_event('write', entry.value)
      notify_event('delete', entry.value)
    else
      @bag.push(entry)
      start_keeper if entry.expires
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      @take_waiter.find_all_template(entry).each do |template|
        template.signal
      end
      notify_event('write', entry.value)
    end
  end
  entry
end