Class: ActiveSupport::Testing::Parallelization::RoundRobinDistributor
Relationships & Source Files
Class Chain: self, TestDistributor
Instance Chain: self, TestDistributor
Inherits: ActiveSupport::Testing::Parallelization::TestDistributor
Defined in: activesupport/lib/active_support/testing/parallelization/test_distributor.rb
Overview
Round-robin distributor - tests are assigned to workers as they arrive.
Tests arrive already shuffled by ::Minitest based on the seed. Since the arrival order is deterministic for a given seed, round-robin assignment produces reproducible test-to-worker distribution.
This is much simpler than buffering and re-shuffling: tests can start executing immediately as they arrive, and we avoid complex synchronization.
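The reproducibility claim above can be illustrated with a small standalone sketch. It does not use the Rails class; `round_robin_assign` is a hypothetical helper, and a seeded `Array#shuffle` stands in for the ordering that Minitest would produce for a given seed.

```ruby
# Hypothetical helper (not part of Rails): assigns items to workers
# strictly in arrival order, wrapping around after the last worker.
def round_robin_assign(tests, worker_count)
  buckets = Array.new(worker_count) { [] }
  tests.each_with_index do |test, index|
    buckets[index % worker_count] << test
  end
  buckets
end

tests = %w[test_a test_b test_c test_d test_e]

# A fixed seed yields the same arrival order on every run.
first_run  = round_robin_assign(tests.shuffle(random: Random.new(42)), 2)
second_run = round_robin_assign(tests.shuffle(random: Random.new(42)), 2)

puts first_run == second_run
```

Because the assignment depends only on arrival position, the same seed gives each worker the same tests on every run.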
Constant Summary
- WORK_WAIT_TIMEOUT = 0.1
  # File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 76
Class Method Summary
- .new(worker_count:) ⇒ RoundRobinDistributor constructor
Instance Attribute Summary
- #pending? ⇒ Boolean readonly
Instance Method Summary
- #add_test(test)
- #close
- #interrupt
- #take(worker_id:)
- #exhausted?(worker_id) ⇒ Boolean private
- #next_job(worker_id) private
- #pop_now(worker_id) private
- #wait(worker_id) private
  Waits for work, rechecking exhausted? inside the mutex to handle the race where close() broadcasts before we start waiting.
TestDistributor - Inherited
| #add_test | Add a test to be distributed to workers. |
| #close | Close the distributor. |
| #interrupt | Clear all pending work (called on interrupt). |
| #take | Retrieve the next test for a specific worker. |
Constructor Details
.new(worker_count:) ⇒ RoundRobinDistributor
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 78
def initialize(worker_count:)
  @worker_count = worker_count
  @queues = Array.new(@worker_count) { Queue.new }
  @next_worker = 0
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @closed = false
end
Instance Attribute Details
#pending? ⇒ Boolean (readonly)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 121
def pending?
  @mutex.synchronize do
    @queues&.any? { |q| !q.empty? }
  end
end
Instance Method Details
#add_test(test)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 87
def add_test(test)
  @mutex.synchronize do
    return if @closed || !@queues
    worker_id = @next_worker
    @next_worker = (@next_worker + 1) % @worker_count
    queue = @queues[worker_id]
    queue << test unless queue.closed?
    @cv.signal # Wake one waiting worker
  end
end
#close
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 127
def close
  @mutex.synchronize do
    @queues&.each(&:close)
    @closed = true
    @cv.broadcast # Wake all waiting workers
  end
end
#exhausted?(worker_id) ⇒ Boolean (private)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 154
def exhausted?(worker_id)
  queue = @queues[worker_id]
  queue.closed? && queue.empty?
end
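The two-part check matters because a closed Queue can still hold items. The following minimal sketch uses only the standard library Queue (no Rails code) to show both states; the variable names are illustrative.

```ruby
queue = Queue.new
queue << :test_one
queue.close

# Closed, but one item remains: the worker still has work to do.
closed_with_work = queue.closed? && queue.empty?

# Popping from a closed queue still returns the remaining item.
queue.pop

# Closed and drained: only now is the worker's queue exhausted.
drained = queue.closed? && queue.empty?

puts closed_with_work # false
puts drained          # true
```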
#interrupt
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 110
def interrupt
  @mutex.synchronize do
    @queues&.each do |q|
      q.clear
      q.close
    end
    @closed = true
    @cv.broadcast # Wake all waiting workers
  end
end
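Unlike #close, which leaves queued tests in place for workers to drain, #interrupt discards them first. A rough sketch of the effect on a single standard library Queue, outside of Rails, might look like this:

```ruby
queue = Queue.new
queue << :test_a << :test_b

# Same order as the listing above: drop pending work, then close.
queue.clear
queue.close

# Pushing to a closed queue raises ClosedQueueError, which is why
# add_test checks queue.closed? before pushing.
rejected = begin
  queue << :test_c
  false
rescue ClosedQueueError
  true
end

puts queue.empty?  # true
puts rejected      # true
```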
#next_job(worker_id) (private)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 136
def next_job(worker_id)
  pop_now(worker_id)
end
#pop_now(worker_id) (private)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 140
def pop_now(worker_id)
  @queues[worker_id].pop(true)
rescue ThreadError, ClosedQueueError
  nil
end
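Passing true to Queue#pop makes it non-blocking: instead of waiting on an empty queue it raises ThreadError, which the method above converts to nil. The sketch below reproduces that pattern with a hypothetical `try_pop` helper so it can be run without Rails.

```ruby
# Hypothetical helper mirroring the rescue pattern above.
def try_pop(queue)
  queue.pop(true) # non-blocking: raises ThreadError if the queue is empty
rescue ThreadError, ClosedQueueError
  nil
end

queue = Queue.new
empty_result = try_pop(queue)   # nothing queued yet

queue << :test_one
item_result = try_pop(queue)    # returns the queued item

queue.close
closed_result = try_pop(queue)  # closed and empty

p [empty_result, item_result, closed_result]
```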
#take(worker_id:)
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 99
def take(worker_id:)
  job = nil
  until job || exhausted?(worker_id)
    job = next_job(worker_id)
    wait(worker_id) unless job || exhausted?(worker_id)
  end
  job
end
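To see how #take interacts with #add_test and #close, here is a simplified standalone sketch. `MiniRoundRobin` is a hypothetical stand-in that copies the structure of the listings on this page but omits the @closed flag, #interrupt and nil-safety; it is not the Rails class. The two-thread consumer loop is an assumed usage pattern.

```ruby
# Hypothetical stand-in for illustration; not the Rails class itself.
class MiniRoundRobin
  WAIT_TIMEOUT = 0.1 # seconds; value assumed for this sketch

  def initialize(worker_count:)
    @worker_count = worker_count
    @queues = Array.new(worker_count) { Queue.new }
    @next_worker = 0
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def add_test(test)
    @mutex.synchronize do
      @queues[@next_worker] << test
      @next_worker = (@next_worker + 1) % @worker_count
      @cv.signal
    end
  end

  def close
    @mutex.synchronize do
      @queues.each(&:close)
      @cv.broadcast
    end
  end

  # Returns the next test, or nil once this worker's queue is closed and drained.
  def take(worker_id:)
    job = nil
    until job || exhausted?(worker_id)
      job = pop_now(worker_id)
      wait(worker_id) unless job || exhausted?(worker_id)
    end
    job
  end

  private

  def exhausted?(worker_id)
    queue = @queues[worker_id]
    queue.closed? && queue.empty?
  end

  def pop_now(worker_id)
    @queues[worker_id].pop(true)
  rescue ThreadError, ClosedQueueError
    nil
  end

  def wait(worker_id)
    @mutex.synchronize do
      @cv.wait(@mutex, WAIT_TIMEOUT) unless exhausted?(worker_id)
    end
  end
end

distributor = MiniRoundRobin.new(worker_count: 2)
results = Array.new(2) { [] }

# Each worker loops until take returns nil, i.e. its queue is exhausted.
workers = 2.times.map do |id|
  Thread.new do
    while (job = distributor.take(worker_id: id))
      results[id] << job
    end
  end
end

%w[t1 t2 t3 t4 t5].each { |t| distributor.add_test(t) }
distributor.close
workers.each(&:join)

p results
```

Since each worker reads only from its own FIFO queue, the per-worker order matches the arrival order regardless of thread scheduling.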
#wait(worker_id) (private)
Waits for work, rechecking exhausted? inside the mutex to handle the race where close() broadcasts before we start waiting.
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 148
def wait(worker_id)
  @mutex.synchronize do
    @cv.wait(@mutex, WORK_WAIT_TIMEOUT) unless exhausted?(worker_id)
  end
end
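The race described above arises because a ConditionVariable does not remember past broadcasts: a thread that starts waiting after the broadcast would sleep until the timeout. The minimal sketch below, with a plain `closed` flag standing in for exhausted? and a deliberately long timeout chosen for this example, shows that rechecking the condition while holding the mutex skips the wait entirely.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
closed = false

# The broadcast happens before anyone is waiting, so it wakes nobody.
mutex.synchronize do
  closed = true
  cv.broadcast
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Rechecking the flag under the same mutex avoids waiting for a
# wake-up that has already been sent.
mutex.synchronize do
  cv.wait(mutex, 5) unless closed
end

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts elapsed < 1.0
```

The timeout passed to wait acts as a second safeguard: even if a wake-up is missed, the worker rechecks its queue after the timeout elapses.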