123456789_123456789_123456789_123456789_123456789_

Class: ActiveSupport::Testing::Parallelization::ThreadPoolExecutor

Overview

Thread pool executor using a test distributor strategy. Provides the same interface as Minitest::Parallel::Executor but with configurable distribution (round robin vs work stealing).

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(size:, distributor:) ⇒ ThreadPoolExecutor

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 14

# Builds an executor around a fixed-size thread pool.
#
# +size+ is the thread count handed to the pool and is also exposed
# through #size. +distributor+ is the object the other methods of this
# class call +add_test+, +take+ and +close+ on.
def initialize(size:, distributor:)
  @distributor = distributor
  @size = size
  # The pool is only created here; worker loops are posted later by #start.
  @pool = Concurrent::FixedThreadPool.new(@size, fallback_policy: :abort)
end

Instance Attribute Details

#size (readonly)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 12

# Worker count given to the constructor; #start posts this many worker loops.
attr_reader :size

Instance Method Details

#<<(work)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 26

# Queues +work+ by passing it to the distributor's +add_test+.
# Returns whatever +add_test+ returns.
def <<(work)
  @distributor.add_test work
end

#shutdown

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 30

# Closes the distributor first, then asks the pool to shut down and waits
# for it to terminate. Returns the result of +wait_for_termination+.
def shutdown
  @distributor.close
  # tap keeps the original order: shutdown is sent before the wait begins.
  @pool.tap(&:shutdown).wait_for_termination
end

#start

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 20

# Posts one worker loop to the pool per worker, passing each loop its
# zero-based index as the worker id.
def start
  size.times { |index| @pool.post { worker_loop(index) } }
end

#worker_loop(worker_id) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/thread_pool_executor.rb', line 37

# Body of a single worker: keeps asking the distributor for jobs on behalf
# of +worker_id+ and runs each one, until +take+ returns nil or false.
#
# Each job is destructured into a test class, a test method name and a
# reporter. Both reporter calls are made inside +reporter.synchronize+.
def worker_loop(worker_id)
  while (job = @distributor.take(worker_id: worker_id))
    test_class, test_name, reporter = job

    reporter.synchronize { reporter.prerecord(test_class, test_name) }
    outcome = Minitest.run_one_method(test_class, test_name)
    reporter.synchronize { reporter.record(outcome) }
  end
end