123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::CyclicBarrier

Relationships & Source Files
Namespace Children
Classes:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Instance Chain:
Inherits: Concurrent::Synchronization::LockableObject
Defined in: lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb

Overview

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

Examples:

barrier = Concurrent::CyclicBarrier.new(3)
jobs    = Array.new(3) { |i| -> { sleep i; p done: i } }
process = -> (i) do
  # waiting to start at the same time
  barrier.wait
  # execute job
  jobs[i].call
  # wait for others to finish
  barrier.wait
end
threads = 2.times.map do |i|
  Thread.new(i, &process)
end

# use main as well
process.call 2

# here we can be sure that all jobs are processed

Class Method Summary

Instance Attribute Summary

  • #broken? ⇒ Boolean readonly

    A barrier can be broken when: - a thread called the #reset method while at least one other thread was waiting - at least one thread timed out on #wait method.

Instance Method Summary

  • #number_waiting ⇒ Fixnum
  • #parties ⇒ Fixnum
  • #reset ⇒ nil

    resets the barrier to its initial state If there is at least one waiting thread, it will be woken up, the #wait method will return false and the barrier will be broken If the barrier is broken, this method restores it to the original state.

  • #wait(timeout = nil) ⇒ Boolean

    Blocks on the barrier until the number of waiting threads is equal to #parties or until timeout is reached or #reset is called If a block has been passed to the constructor, it will be executed once by the last arrived thread before releasing the others.

Synchronization::LockableObject - Inherited

Constructor Details

.new(parties) { ... } ⇒ CyclicBarrier

Create a new CyclicBarrier that waits for #parties threads

Parameters:

  • parties (Fixnum)

    the number of parties

Yields:

  • an optional block that will be executed that will be executed after the last thread arrives and before the others are released

Raises:

  • (ArgumentError)

    if #parties is not an integer or is less than zero

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 40

def initialize(parties, &block)
  Utility::NativeInteger.ensure_integer_and_bounds parties
  Utility::NativeInteger.ensure_positive_and_no_zero parties

  super(&nil)
  synchronize { ns_initialize parties, &block }
end

Instance Attribute Details

#broken?Boolean (readonly)

A barrier can be broken when:

  • a thread called the #reset method while at least one other thread was waiting

  • at least one thread timed out on #wait method

A broken barrier can be restored using #reset it’s safer to create a new one

Returns:

  • (Boolean)

    true if the barrier is broken otherwise false

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 105

def broken?
  synchronize { @generation.status != :waiting }
end

Instance Method Details

#number_waitingFixnum

Returns:

  • (Fixnum)

    the number of threads currently waiting on the barrier

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 54

def number_waiting
  synchronize { @number_waiting }
end

#partiesFixnum

Returns:

  • (Fixnum)

    the number of threads needed to pass the barrier

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 49

def parties
  synchronize { @parties }
end

#resetnil

resets the barrier to its initial state If there is at least one waiting thread, it will be woken up, the #wait method will return false and the barrier will be broken If the barrier is broken, this method restores it to the original state

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 95

def reset
  synchronize { ns_generation_done @generation, :reset }
end

#wait(timeout = nil) ⇒ Boolean

Blocks on the barrier until the number of waiting threads is equal to #parties or until timeout is reached or #reset is called If a block has been passed to the constructor, it will be executed once by

the last arrived thread before releasing the others

Parameters:

  • timeout (Fixnum) (defaults to: nil)

    the number of seconds to wait for the counter or nil to block indefinitely

Returns:

  • (Boolean)

    true if the count reaches zero else false on timeout or on #reset or if the barrier is broken

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb', line 66

def wait(timeout = nil)
  synchronize do

    return false unless @generation.status == :waiting

    @number_waiting += 1

    if @number_waiting == @parties
      @action.call if @action
      ns_generation_done @generation, :fulfilled
      true
    else
      generation = @generation
      if ns_wait_until(timeout) { generation.status != :waiting }
        generation.status == :fulfilled
      else
        ns_generation_done generation, :broken, false
        false
      end
    end
  end
end