123456789_123456789_123456789_123456789_123456789_

Class: Rake::ThreadPool

Do not use. This class is for internal use only.
Relationships & Source Files
Inherits: Object
Defined in: lib/rake/thread_pool.rb

Class Method Summary

Instance Method Summary

Constructor Details

.new(thread_count) ⇒ ThreadPool

Creates a ThreadPool object. The thread_count parameter is the size of the pool.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 11

# Sets up an empty pool whose size limit is +thread_count+.
#
# thread_count:: requested maximum number of worker threads; anything
#                below zero is stored as 0.
def initialize(thread_count)
  require "set"

  # Upper bound on live workers and the queue of pending work.
  @max_active_threads = [thread_count, 0].max
  @queue = Queue.new

  # Live worker threads, the monitor guarding them, and a condition
  # variable created from that same monitor.
  @threads_mon = Monitor.new
  @join_cond = @threads_mon.new_cond
  @threads = Set.new

  # History bookkeeping; no start time has been recorded yet.
  @history_mon = Monitor.new
  @history = []
  @history_start_time = nil

  # High-water mark reported by #statistics.
  @total_threads_in_play = 0
end

Instance Method Details

#__queue__ (private)

For testing only.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 158

# Returns the pool's internal work queue (@queue) itself, not a copy.
# Exists so tests can inspect the queue; not meant for regular callers.
def __queue__ # :nodoc:
  @queue
end

#future(*args, &block)

Creates a future executed by the ThreadPool.

The args are passed to the block when executing (similarly to Thread#new). The return value is an object representing a future which has been created and added to the queue in the pool. Sending #value to the object will sleep the current thread until the future is finished and will return the result (or raise an exception thrown from the future).

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 33

# Wraps +block+ (and the +args+ to call it with) in a Promise, puts that
# promise on the work queue, and asks the pool to start a worker for it.
#
# Returns the queued Promise.
def future(*args, &block)
  Promise.new(args, &block).tap do |promise|
    # Let the promise report its own events through this pool's #stat.
    promise.recorder = ->(*stats) { stat(*stats) }

    @queue.enq(promise)
    stat :queued, item_id: promise.object_id
    start_thread
  end
end

#gather_history

Enable the gathering of history events.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 68

# Turns on history recording by stamping the start time. Calls made after
# the start time has been set leave it untouched and return nil.
def gather_history          #:nodoc:
  return unless @history_start_time.nil?

  @history_start_time = Time.now
end

#history

Returns an array of history events for the thread pool.

History gathering must be enabled to be able to see the events (see #gather_history). Best to call this when the job is complete (i.e. after #join is called).

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 77

# Returns the recorded events ordered by time, each with its :time
# expressed as seconds elapsed since history gathering was started.
#
# The returned hashes are copies: the events stored in @history keep their
# absolute timestamps, so this method can be called repeatedly and always
# yields the same result. (Adjusting the stored hashes in place would
# subtract the start time again on every call.)
def history                 # :nodoc:
  # Copy the list under the lock; sorting and converting happen outside it.
  events = @history_mon.synchronize { @history.dup }

  events.
    sort_by { |event| event[:time] }.
    map { |event| event.merge(time: event[:time] - @history_start_time) }
end

#join

Waits until the queue of futures is empty and all threads have exited.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 44

# Blocks the caller until the pool has no live worker threads.
#
# If anything is raised while waiting (including non-StandardError
# exceptions such as Interrupt), a diagnostic dump of the queue size and
# every worker's status and backtrace is written to $stderr and the same
# exception is then re-raised.
def join
  @threads_mon.synchronize do
    begin
      stat :joining
      # Re-check the predicate after every wakeup: a single wait may return
      # spuriously, or a new worker may have been spawned between the
      # broadcast and this thread re-acquiring the monitor.
      @join_cond.wait_until { @threads.empty? }
      stat :joined
    rescue Exception => e
      # Deliberately broad: this only reports and re-raises, never swallows.
      stat :joined
      $stderr.puts e
      $stderr.print "Queue contains #{@queue.size} items. " +
        "Thread pool contains #{@threads.count} threads\n"
      $stderr.print "Current Thread #{Thread.current} status = " +
        "#{Thread.current.status}\n"
      $stderr.puts e.backtrace.join("\n")
      @threads.each do |t|
        $stderr.print "Thread #{t} status = #{t.status}\n"
        # Thread#backtrace is nil for a dead thread; do not let that mask e.
        $stderr.puts Array(t.backtrace).join("\n")
      end
      raise e
    end
  end
end

#process_queue_item (private)

Processes one item on the queue. Returns true if there was an item to process, false if there was no item.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 95

# Takes one promise off the queue, if any, and runs it.
#
# Returns true when an item was dequeued and worked, false when the queue
# was empty. A ThreadError raised anywhere in the method is also reported
# as false.
def process_queue_item      #:nodoc:
  if @queue.empty?
    false
  else
    # The emptiness check above is only a hint: another worker may take the
    # last item before we get here. A non-blocking dequeue raises
    # ThreadError in that case rather than parking this thread forever.
    item = @queue.deq(true)
    stat :dequeued, item_id: item.object_id
    item.work
    true
  end
rescue ThreadError # this means the queue is empty
  false
end

#safe_thread_count (private)

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 111

# Number of threads currently registered in the pool, read while holding
# the threads monitor.
def safe_thread_count
  @threads_mon.synchronize { @threads.count }
end

#start_thread (private)

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 117

# Spawns one more worker thread unless the pool is already at its limit.
#
# The worker keeps processing queue items until an attempt reports that
# nothing was processed, then removes itself from the pool and wakes up
# anyone waiting on the join condition if it was the last one.
def start_thread # :nodoc:
  @threads_mon.synchronize do
    if @threads.count < @max_active_threads
      worker = Thread.new do
        begin
          while safe_thread_count <= @max_active_threads
            processed = process_queue_item
            break unless processed
          end
        ensure
          # Always deregister, even if the loop above raised.
          @threads_mon.synchronize do
            @threads.delete(Thread.current)
            stat :ended, thread_count: @threads.count
            @join_cond.broadcast if @threads.empty?
          end
        end
      end

      # The worker cannot deregister before this point: it needs the
      # monitor we are still holding.
      @threads.add(worker)
      stat :spawned, new_thread: worker.object_id, thread_count: @threads.count

      # Track the largest number of simultaneously registered workers.
      if @threads.count > @total_threads_in_play
        @total_threads_in_play = @threads.count
      end
    end
  end
end

#stat(event, data = nil) (private)

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 145

# Appends one history entry (event name, optional data, timestamp and the
# calling thread's object id). Does nothing while no history start time
# has been set.
def stat(event, data=nil) # :nodoc:
  return if @history_start_time.nil?

  # Take the timestamp before contending for the history lock.
  timestamp = Time.now
  entry = {
    event: event,
    data: data,
    time: timestamp,
    thread: Thread.current.object_id,
  }
  @history_mon.synchronize { @history.push(entry) }
end

#statistics

Returns a hash of always-collected statistics for the thread pool.

[ GitHub ]

  
# File 'lib/rake/thread_pool.rb', line 84

# Snapshot of the two counters the pool keeps: the largest number of
# threads it has had registered at once, and its configured limit.
def statistics              #  :nodoc:
  { total_threads_in_play: @total_threads_in_play,
    max_active_threads: @max_active_threads }
end