Class: Rake::ThreadPool
Relationships & Source Files | |
Inherits: | Object |
Defined in: | lib/rake/thread_pool.rb |
Class Method Summary
-
.new(thread_count) ⇒ ThreadPool
constructor
Creates a
ThreadPool
object.
Instance Method Summary
-
#future(*args, &block)
Creates a future executed by the
ThreadPool
. -
#gather_history
Enable the gathering of history events.
-
#history
Return a array of history events for the thread pool.
-
#join
Waits until the queue of futures is empty and all threads have exited.
-
#statistics
Return a hash of always collected statistics for the thread pool.
-
#__queue__
private
for testing only.
-
#process_queue_item
private
processes one item on the queue.
- #safe_thread_count private
- #start_thread private
- #stat(event, data = nil) private
Constructor Details
.new(thread_count) ⇒ ThreadPool
Creates a ThreadPool
object. The thread_count
parameter is the size of the pool.
# File 'lib/rake/thread_pool.rb', line 12
def initialize(thread_count) @max_active_threads = [thread_count, 0].max @threads = Set.new @threads_mon = Monitor.new @queue = Queue.new @join_cond = @threads_mon.new_cond @history_start_time = nil @history = [] @history_mon = Monitor.new @total_threads_in_play = 0 end
Instance Method Details
#__queue__ (private)
for testing only
# File 'lib/rake/thread_pool.rb', line 158
def __queue__ # :nodoc: @queue end
#future(*args, &block)
Creates a future executed by the ThreadPool
.
The args are passed to the block when executing (similarly to Thread#new
) The return value is an object representing a future which has been created and added to the queue in the pool. Sending #value
to the object will sleep the current thread until the future is finished and will return the result (or raise an exception thrown from the future)
# File 'lib/rake/thread_pool.rb', line 33
def future(*args, &block) promise = Promise.new(args, &block) promise.recorder = lambda { |*stats| stat(*stats) } @queue.enq promise stat :queued, item_id: promise.object_id start_thread promise end
#gather_history
Enable the gathering of history events.
# File 'lib/rake/thread_pool.rb', line 68
def gather_history #:nodoc: @history_start_time = Time.now if @history_start_time.nil? end
#history
Return a array of history events for the thread pool.
History gathering must be enabled to be able to see the events (see #gather_history). Best to call this when the job is complete (i.e. after #join is called).
# File 'lib/rake/thread_pool.rb', line 77
def history # :nodoc: @history_mon.synchronize { @history.dup }. sort_by { |i| i[:time] }. each { |i| i[:time] -= @history_start_time } end
#join
Waits until the queue of futures is empty and all threads have exited.
# File 'lib/rake/thread_pool.rb', line 44
def join @threads_mon.synchronize do begin stat :joining @join_cond.wait unless @threads.empty? stat :joined rescue Exception => e stat :joined $stderr.puts e $stderr.print "Queue contains #{@queue.size} items. " + "Thread pool contains #{@threads.count} threads\n" $stderr.print "Current Thread #{Thread.current} status = " + "#{Thread.current.status}\n" $stderr.puts e.backtrace.join("\n") @threads.each do |t| $stderr.print "Thread #{t} status = #{t.status}\n" $stderr.puts t.backtrace.join("\n") end raise e end end end
#process_queue_item (private)
processes one item on the queue. Returns true if there was an item to process, false if there was no item
# File 'lib/rake/thread_pool.rb', line 95
def process_queue_item #:nodoc: return false if @queue.empty? # Even though we just asked if the queue was empty, it # still could have had an item which by this statement # is now gone. For this reason we pass true to Queue#deq # because we will sleep indefinitely if it is empty. promise = @queue.deq(true) stat :dequeued, item_id: promise.object_id promise.work return true rescue ThreadError # this means the queue is empty false end
#safe_thread_count (private)
[ GitHub ]# File 'lib/rake/thread_pool.rb', line 111
def safe_thread_count @threads_mon.synchronize do @threads.count end end
#start_thread (private)
[ GitHub ]# File 'lib/rake/thread_pool.rb', line 117
def start_thread # :nodoc: @threads_mon.synchronize do next unless @threads.count < @max_active_threads t = Thread.new do begin while safe_thread_count <= @max_active_threads break unless process_queue_item end ensure @threads_mon.synchronize do @threads.delete Thread.current stat :ended, thread_count: @threads.count @join_cond.broadcast if @threads.empty? end end end @threads << t stat( :spawned, new_thread: t.object_id, thread_count: @threads.count) @total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play end end
#stat(event, data = nil) (private)
[ GitHub ]# File 'lib/rake/thread_pool.rb', line 145
def stat(event, data=nil) # :nodoc: return if @history_start_time.nil? info = { event: event, data: data, time: Time.now, thread: Thread.current.object_id, } @history_mon.synchronize { @history << info } end
#statistics
Return a hash of always collected statistics for the thread pool.
# File 'lib/rake/thread_pool.rb', line 84
def statistics # :nodoc: { total_threads_in_play: @total_threads_in_play, max_active_threads: @max_active_threads, } end