123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Channel::Selector

Class Method Summary

Instance Method Summary

Constructor Details

.newSelector

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 14

def initialize
  @clauses = []
  @error_handler = nil
end

Instance Method Details

#after(seconds, &block) Also known as: #timeout

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 40

def after(seconds, &block)
  @clauses << AfterClause.new(seconds, block)
end

#case(channel, action, message = nil, &block)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 19

def case(channel, action, message = nil, &block)
  if [:take, :poll, :receive, :~].include?(action)
    take(channel, &block)
  elsif [:put, :offer, :send, :<<].include?(action)
    put(channel, message, &block)
  else
    raise ArgumentError.new('invalid action')
  end
end

#default(&block)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 45

def default(&block)
  raise ArgumentError.new('no block given') unless block_given?
  @clauses << DefaultClause.new(block)
end

#error(&block)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 50

def error(&block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('only one error handler allowed') if @error_handler
  @error_handler = block
end

#execute

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 56

def execute
  raise Channel::Error.new('no clauses given') if @clauses.empty?
  loop do
    done = @clauses.each do |clause|
      result = clause.execute
      break result if result.just?
    end
    break done.value if done.is_a?(Concurrent::Maybe)
    Thread.pass
  end
rescue => ex
  if @error_handler
    @error_handler.call(ex)
  else
    raise ex
  end
end

#put(channel, message, &block) Also known as: #send

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 35

def put(channel, message, &block)
  @clauses << PutClause.new(channel, message, block)
end

#receive(channel, &block)

Alias for #take.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 33

alias_method :receive, :take

#send(channel, message, &block)

Alias for #put.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 38

alias_method :send, :put

#take(channel, &block) Also known as: #receive

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 29

def take(channel, &block)
  raise ArgumentError.new('no block given') unless block_given?
  @clauses << TakeClause.new(channel, block)
end

#timeout(seconds, &block)

Alias for #after.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 43

alias_method :timeout, :after