123456789_123456789_123456789_123456789_123456789_

Class: ActiveSupport::Testing::Parallelization::RoundRobinDistributor

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Subclasses:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: ActiveSupport::Testing::Parallelization::TestDistributor
Defined in: activesupport/lib/active_support/testing/parallelization/test_distributor.rb

Overview

Round-robin distributor - tests are assigned to workers as they arrive.

Tests arrive already shuffled by ::Minitest based on the seed. Since the arrival order is deterministic for a given seed, round-robin assignment produces reproducible test-to-worker distribution.

This is much simpler than buffering and re-shuffling: tests can start executing immediately as they arrive, and we avoid complex synchronization.

Constant Summary

Class Method Summary

Instance Attribute Summary

TestDistributor - Inherited

#pending?

Check if there is pending work.

Instance Method Summary

TestDistributor - Inherited

#add_test

Add a test to be distributed to workers.

#close

Close the distributor.

#interrupt

Clear all pending work (called on interrupt).

#take

Retrieve the next test for a specific worker.

Constructor Details

.new(worker_count:) ⇒ RoundRobinDistributor

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 78

def initialize(worker_count:)
  @worker_count = worker_count
  @queues = Array.new(@worker_count) { Queue.new }
  @next_worker = 0
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @closed = false
end

Instance Attribute Details

#pending?Boolean (readonly)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 121

def pending?
  @mutex.synchronize do
    @queues&.any? { |q| !q.empty? }
  end
end

Instance Method Details

#add_test(test)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 87

def add_test(test)
  @mutex.synchronize do
    return if @closed || !@queues

    worker_id = @next_worker
    @next_worker = (@next_worker + 1) % @worker_count
    queue = @queues[worker_id]
    queue << test unless queue.closed?
    @cv.signal  # Wake one waiting worker
  end
end

#close

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 127

def close
  @mutex.synchronize do
    @queues&.each(&:close)
    @closed = true
    @cv.broadcast  # Wake all waiting workers
  end
end

#exhausted?(worker_id) ⇒ Boolean (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 154

def exhausted?(worker_id)
  queue = @queues[worker_id]
  queue.closed? && queue.empty?
end

#interrupt

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 110

def interrupt
  @mutex.synchronize do
    @queues&.each do |q|
      q.clear
      q.close
    end
    @closed = true
    @cv.broadcast  # Wake all waiting workers
  end
end

#next_job(worker_id) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 136

def next_job(worker_id)
  pop_now(worker_id)
end

#pop_now(worker_id) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 140

def pop_now(worker_id)
  @queues[worker_id].pop(true)
rescue ThreadError, ClosedQueueError
  nil
end

#take(worker_id:)

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 99

def take(worker_id:)
  job = nil

  until job || exhausted?(worker_id)
    job = next_job(worker_id)
    wait(worker_id) unless job || exhausted?(worker_id)
  end

  job
end

#wait(worker_id) (private)

Waits for work, rechecking exhausted? inside the mutex to handle the race where close() broadcasts before we start waiting.

[ GitHub ]

  
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 148

def wait(worker_id)
  @mutex.synchronize do
    @cv.wait(@mutex, WORK_WAIT_TIMEOUT) unless exhausted?(worker_id)
  end
end