123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Promises::BlockedPromise Abstract

Overview

This class is abstract.

Constant Summary

InternalStates - Included

PENDING, RESERVED, RESOLVED

Class Attribute Summary

Class Method Summary

AbstractPromise - Inherited

Synchronization::Object - Inherited

.atomic_attribute?, .atomic_attributes,
.attr_atomic

Creates methods for reading and writing to a instance variable with volatile (Java) semantic as .attr_volatile does.

.attr_volatile

Creates methods for reading and writing (as attr_accessor does) to a instance variable with volatile (Java) semantic.

.ensure_safe_initialization_when_final_fields_are_present

For testing purposes, quite slow.

.new

Has to be called by children.

.safe_initialization!, .define_initialize_atomic_fields

Synchronization::AbstractObject - Inherited

Instance Method Summary

Constructor Details

.new(delayed, blockers_count, future) ⇒ BlockedPromise

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1661

def initialize(delayed, blockers_count, future)
  super(future)
  @Delayed   = delayed
  @Countdown = AtomicFixnum.new blockers_count
end

Class Method Details

.add_delayed(delayed1, delayed2)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1652

def self.add_delayed(delayed1, delayed2)
  if delayed1 && delayed2
    delayed1.push delayed2
    delayed1
  else
    delayed1 || delayed2
  end
end

.new_blocked_by(blockers, *args, &block)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1645

def self.new_blocked_by(blockers, *args, &block)
  delayed = blockers.reduce(nil) { |d, f| add_delayed d, f.promise.delayed_because }
  promise = new(delayed, blockers.size, *args, &block)
  blockers.each_with_index { |f, i| f.add_callback_notify_blocked promise, i }
  promise
end

.new_blocked_by1(blocker, *args, &block)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1623

def self.new_blocked_by1(blocker, *args, &block)
  blocker_delayed = blocker.promise.delayed_because
  promise         = new(blocker_delayed, 1, *args, &block)
  blocker.add_callback_notify_blocked promise, 0
  promise
end

.new_blocked_by2(blocker1, blocker2, *args, &block)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1630

def self.new_blocked_by2(blocker1, blocker2, *args, &block)
  blocker_delayed1 = blocker1.promise.delayed_because
  blocker_delayed2 = blocker2.promise.delayed_because
  delayed          = if blocker_delayed1 && blocker_delayed2
                       # TODO (pitr-ch 23-Dec-2016): use arrays when we know it will not grow (only flat adds delay)
                       LockFreeStack.of2(blocker_delayed1, blocker_delayed2)
                     else
                       blocker_delayed1 || blocker_delayed2
                     end
  promise          = new(delayed, 2, *args, &block)
  blocker1.add_callback_notify_blocked promise, 0
  blocker2.add_callback_notify_blocked promise, 1
  promise
end

Instance Method Details

#blocked_by

for inspection only

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1683

def blocked_by
  blocked_by = []
  ObjectSpace.each_object(AbstractEventFuture) { |o| blocked_by.push o if o.blocks.include? self }
  blocked_by
end

#clear_and_propagate_touch(stack_or_element = @Delayed) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1691

def clear_and_propagate_touch(stack_or_element = @Delayed)
  return if stack_or_element.nil?

  if stack_or_element.is_a? LockFreeStack
    stack_or_element.clear_each { |element| clear_and_propagate_touch element }
  else
    stack_or_element.touch unless stack_or_element.nil? # if still present
  end
end

#delayed_because

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1674

def delayed_because
  @Delayed
end

#on_blocker_resolution(future, index)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1667

def on_blocker_resolution(future, index)
  countdown  = process_on_blocker_resolution(future, index)
  resolvable = resolvable?(countdown, future, index)

  on_resolvable(future, index) if resolvable
end

#on_resolvable(resolved_future, index) (private)

Raises:

  • (NotImplementedError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1710

def on_resolvable(resolved_future, index)
  raise NotImplementedError
end

#process_on_blocker_resolution(future, index) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1706

def process_on_blocker_resolution(future, index)
  @Countdown.decrement
end

#resolvable?(countdown, future, index) ⇒ true, false (private)

Returns:

  • (true, false)

    if resolvable

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1702

def resolvable?(countdown, future, index)
  countdown.zero?
end

#touch

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/promises.rb', line 1678

def touch
  clear_and_propagate_touch
end