Class: EventMachine::Iterator
Relationships & Source Files
Inherits: Object
Defined in: lib/em/iterator.rb
Overview
A simple iterator for concurrent asynchronous work.
Unlike Ruby's built-in iterators, the end of the current iteration cycle is signaled manually instead of happening automatically after the yielded block finishes executing. For example:
(0..10).each{ |num| }
becomes:
EM::Iterator.new(0..10).each{ |num,iter| iter.next }
This is especially useful when doing asynchronous work via reactor libraries and functions. For example, given a synchronous and an asynchronous HTTP API:
response = sync_http_get(url); ...
async_http_get(url){ |response| ... }
a synchronous iterator such as:
responses = urls.map{ |url| sync_http_get(url) }
...
puts 'all done!'
could be written as:
EM::Iterator.new(urls).map(proc{ |url,iter|
  async_http_get(url){ |res|
    iter.return(res)
  }
}, proc{ |responses|
  ...
  puts 'all done!'
})
Now you can take advantage of the asynchronous API to issue requests in parallel. For example, to fetch 10 URLs at a time, pass in a concurrency of 10:
EM::Iterator.new(urls, 10).each do |url,iter|
  async_http_get(url){ iter.next }
end
Constant Summary
- Stop =
  # File 'lib/em/iterator.rb', line 44
  "EM::Stop"
Class Method Summary
- .new(list, concurrency = 1) ⇒ Iterator (constructor)
  Create a new parallel async iterator with specified concurrency.
Instance Attribute Summary
- #concurrency (rw)
- #concurrency=(val) (rw)
  Change the concurrency of this iterator.
Instance Method Summary
- #each(foreach = nil, after = nil, &blk)
  Iterate over a set of items using the specified block or proc.
- #inject(obj, foreach, after)
  Inject the results of an asynchronous iteration onto a given object.
- #map(foreach, after)
  Collect the results of an asynchronous iteration into an array.
- #next_item (private)
  Return the next item from @list or @list_proc.
- #spawn_workers (private)
  Spawn workers to consume items from the iterator's enumerator based on the current concurrency level.
Constructor Details
.new(list, concurrency = 1) ⇒ Iterator
Create a new parallel async iterator with specified concurrency.
i = EM::Iterator.new(1..100, 10)
will create an iterator over the range that processes 10 items at a time. Iteration is started via #each, #map or #inject.
The list may either be an array-like object, or a proc that returns a new object to be processed each time it is called. If a proc is used, it must return EventMachine::Iterator::Stop to signal the end of the iterations.
# File 'lib/em/iterator.rb', line 56
def initialize(list, concurrency = 1)
  raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0)
  if list.respond_to?(:call)
    @list = nil
    @list_proc = list
  elsif list.respond_to?(:to_a)
    @list = list.to_a.dup
    @list_proc = nil
  else
    raise ArgumentError, 'argument must be a proc or an array'
  end
  @concurrency = concurrency

  @started = false
  @ended = false
end
Instance Attribute Details
#concurrency (rw)
# File 'lib/em/iterator.rb', line 82
attr_reader :concurrency
#concurrency=(val) (rw)
Change the concurrency of this iterator. Workers will automatically be spawned or destroyed to accommodate the new concurrency level.
# File 'lib/em/iterator.rb', line 76
def concurrency=(val)
  old = @concurrency
  @concurrency = val

  spawn_workers if val > old and @started and !@ended
end
Instance Method Details
#each(foreach = nil, after = nil, &blk)
Iterate over a set of items using the specified block or proc.
EM::Iterator.new(1..100).each do |num, iter|
  puts num
  iter.next
end
An optional second proc is invoked after the iteration is complete.
EM::Iterator.new(1..100).each(
  proc{ |num,iter| iter.next },
  proc{ puts 'all done' }
)
# File 'lib/em/iterator.rb', line 98
def each(foreach=nil, after=nil, &blk)
  raise ArgumentError, 'proc or block required for iteration' unless foreach ||= blk
  raise RuntimeError, 'cannot iterate over an iterator more than once' if @started or @ended

  @started = true
  @pending = 0
  @workers = 0

  all_done = proc{
    after.call if after and @ended and @pending == 0
  }

  @process_next = proc{
    # p [:process_next, :pending=, @pending, :workers=, @workers, :ended=, @ended, :concurrency=, @concurrency, :list=, @list]
    unless @ended or @workers > @concurrency
      item = next_item()
      if item.equal?(Stop)
        @ended = true
        @workers -= 1
        all_done.call
      else
        @pending += 1

        is_done = false
        on_done = proc{
          raise RuntimeError, 'already completed this iteration' if is_done
          is_done = true

          @pending -= 1

          if @ended
            all_done.call
          else
            EM.next_tick(@process_next)
          end
        }
        class << on_done
          alias :next :call
        end

        foreach.call(item, on_done)
      end
    else
      @workers -= 1
    end
  }

  spawn_workers

  self
end
#inject(obj, foreach, after)
Inject the results of an asynchronous iteration onto a given object.
EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|
  EM.system(cmd){ |output,status|
    hash[cmd] = status.exitstatus == 0 ? output.strip : nil
    iter.return(hash)
  }
}, proc{ |results|
  p results
})
# File 'lib/em/iterator.rb', line 199
def inject(obj, foreach, after)
  each(proc{ |item,iter|
    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true

      obj = res
      iter.next
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on an inject iterator'
      end
    end

    foreach.call(obj, item, on_done)
  }, proc{
    after.call(obj)
  })
end
#map(foreach, after)
Collect the results of an asynchronous iteration into an array.
EM::Iterator.new(%w[ pwd uptime uname date ], 2).map(proc{ |cmd,iter|
  EM.system(cmd){ |output,status|
    iter.return(output)
  }
}, proc{ |results|
  p results
})
# File 'lib/em/iterator.rb', line 160
def map(foreach, after)
  index = 0

  inject([], proc{ |results,item,iter|
    i = index
    index += 1

    is_done = false
    on_done = proc{ |res|
      raise RuntimeError, 'already returned a value for this iteration' if is_done
      is_done = true

      results[i] = res
      iter.return(results)
    }
    class << on_done
      alias :return :call
      def next
        raise NoMethodError, 'must call #return on a map iterator'
      end
    end

    foreach.call(item, on_done)
  }, proc{ |results|
    after.call(results)
  })
end
#next_item (private)
Return the next item from @list or @list_proc. Once the items have run out, returns EM::Iterator::Stop. A proc used as the list must return this value itself.
# File 'lib/em/iterator.rb', line 240
def next_item
  if @list_proc
    @list_proc.call
  else
    @list.empty? ? Stop : @list.shift
  end
end
#spawn_workers (private)
Spawn workers to consume items from the iterator's enumerator based on the current concurrency level.