Class: Concurrent::CyclicBarrier

Inherits:
Synchronization::LockableObject
  • Object
Defined in:
lib/concurrent/atomic/cyclic_barrier.rb

Overview

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

Examples:

barrier = Concurrent::CyclicBarrier.new(3)
jobs    = Array.new(3) { |i| -> { sleep i; p done: i } }
process = -> (i) do
  # waiting to start at the same time
  barrier.wait
  # execute job
  jobs[i].call
  # wait for others to finish
  barrier.wait
end
threads = 2.times.map do |i|
  Thread.new(i, &process)
end

# use main as well
process.call 2

# here we can be sure that all jobs are processed

Constructor Details

#initialize(parties) { ... } ⇒ CyclicBarrier

Creates a new CyclicBarrier that waits for the given number of parties (threads).

Parameters:

  • parties (Fixnum)

    the number of parties

Yields:

  • an optional block that will be executed after the last thread arrives and before the others are released

Raises:

  • (ArgumentError)

    if parties is not an integer or is less than one



# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 40

def initialize(parties, &block)
  Utility::NativeInteger.ensure_integer_and_bounds parties
  Utility::NativeInteger.ensure_positive_and_no_zero parties

  super(&nil)
  synchronize { ns_initialize parties, &block }
end

Instance Method Details

#broken? ⇒ Boolean

A barrier can be broken when:

  • a thread called the reset method while at least one other thread was waiting
  • at least one thread timed out in the wait method

A broken barrier can be restored using reset, but it is safer to create a new one.

Returns:

  • (Boolean)

    true if the barrier is broken otherwise false



# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 105

def broken?
  synchronize { @generation.status != :waiting }
end

#number_waiting ⇒ Fixnum

Returns the number of threads currently waiting on the barrier

Returns:

  • (Fixnum)

    the number of threads currently waiting on the barrier



# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 54

def number_waiting
  synchronize { @number_waiting }
end

#parties ⇒ Fixnum

Returns the number of threads needed to pass the barrier

Returns:

  • (Fixnum)

    the number of threads needed to pass the barrier



# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 49

def parties
  synchronize { @parties }
end

#reset ⇒ nil

Resets the barrier to its initial state. If at least one thread is waiting, it will be woken up, its wait call will return false, and the barrier will be broken. If the barrier is already broken, this method restores it to its original state.

Returns:

  • (nil)


# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 95

def reset
  synchronize { ns_generation_done @generation, :reset }
end

#wait(timeout = nil) ⇒ Boolean

Blocks on the barrier until the number of waiting threads equals parties, the timeout is reached, or reset is called. If a block was passed to the constructor, it is executed once by the last thread to arrive, before the others are released.

Parameters:

  • timeout (Fixnum) (defaults to: nil)

    the number of seconds to wait for the remaining parties to arrive, or nil to block indefinitely

Returns:

  • (Boolean)

    true if all parties arrived and the barrier was released; false on timeout, on reset, or if the barrier is broken



# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 66

def wait(timeout = nil)
  synchronize do

    return false unless @generation.status == :waiting

    @number_waiting += 1

    if @number_waiting == @parties
      @action.call if @action
      ns_generation_done @generation, :fulfilled
      true
    else
      generation = @generation
      if ns_wait_until(timeout) { generation.status != :waiting }
        generation.status == :fulfilled
      else
        ns_generation_done generation, :broken, false
        false
      end
    end
  end
end