123456789_123456789_123456789_123456789_123456789_

Class: EventMachine::Channel

Relationships & Source Files
Inherits: Object
Defined in: lib/em/channel.rb

Overview

Provides a simple thread-safe way to transfer data between (typically) long running tasks in defer and event loop thread.

Examples:

channel = EventMachine::Channel.new
sid     = channel.subscribe { |msg| p [:got, msg] }

channel.push('hello world')
channel.unsubscribe(sid)

Class Method Summary

Instance Method Summary

Constructor Details

.newChannel

[ GitHub ]

  
# File 'lib/em/channel.rb', line 15

def initialize
  @subs = {}
  @uid  = 0
end

Instance Method Details

#<<(*items)

Alias for #push.

[ GitHub ]

  
# File 'lib/em/channel.rb', line 50

alias << push

#gen_id (private)

This method is for internal use only.
[ GitHub ]

  
# File 'lib/em/channel.rb', line 65

def gen_id
  @uid += 1
end

#num_subscribers

Return the number of current subscribers.

[ GitHub ]

  
# File 'lib/em/channel.rb', line 21

def num_subscribers
  return @subs.size
end

#pop(*a, &b)

Fetches one message from the channel.

[ GitHub ]

  
# File 'lib/em/channel.rb', line 53

def pop(*a, &b)
  EM.schedule {
    name = subscribe do |*args|
      unsubscribe(name)
      EM::Callback(*a, &b).call(*args)
    end
  }
end

#push(*items) Also known as: #<<

Add items to the channel, which are pushed out to all subscribers.

[ GitHub ]

  
# File 'lib/em/channel.rb', line 46

def push(*items)
  items = items.dup
  EM.schedule { items.each { |i| @subs.values.each { |s| s.call i } } }
end

#subscribe(*a, &b) ⇒ Integer

Takes any arguments suitable for EM::Callback() and returns a subscriber id for use when unsubscribing.

Returns:

  • (Integer)

    Subscribe identifier

See Also:

[ GitHub ]

  
# File 'lib/em/channel.rb', line 30

def subscribe(*a, &b)
  name = gen_id
  EM.schedule { @subs[name] = EM::Callback(*a, &b) }

  name
end

#unsubscribe(name)

Removes subscriber from the list.

Parameters:

  • Subscriber (Integer)

    identi

See Also:

[ GitHub ]

  
# File 'lib/em/channel.rb', line 41

def unsubscribe(name)
  EM.schedule { @subs.delete name }
end