Class: Concurrent::Channel::Selector
Class Method Summary
- .new ⇒ Selector constructor
Instance Method Summary
- #after(seconds, &block) (also: #timeout)
- #case(channel, action, message = nil, &block)
- #default(&block)
- #error(&block)
- #execute
- #put(channel, message, &block) (also: #send)
-
#receive(channel, &block)
Alias for #take.
-
#send(channel, message, &block)
Alias for #put.
- #take(channel, &block) (also: #receive)
-
#timeout(seconds, &block)
Alias for #after.
Constructor Details
.new ⇒ Selector
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 14
def initialize @clauses = [] @error_handler = nil end
Instance Method Details
#after(seconds, &block) Also known as: #timeout
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 40
def after(seconds, &block) @clauses << AfterClause.new(seconds, block) end
#case(channel, action, message = nil, &block)
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 19
def case(channel, action, = nil, &block) if [:take, :poll, :receive, :~].include?(action) take(channel, &block) elsif [:put, :offer, :send, :<<].include?(action) put(channel, , &block) else raise ArgumentError.new('invalid action') end end
#default(&block)
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 45
def default(&block) raise ArgumentError.new('no block given') unless block_given? @clauses << DefaultClause.new(block) end
#error(&block)
#execute
[ GitHub ]# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 56
def execute raise Channel::Error.new('no clauses given') if @clauses.empty? loop do done = @clauses.each do |clause| result = clause.execute break result if result.just? end break done.value if done.is_a?(Concurrent::Maybe) Thread.pass end rescue => ex if @error_handler @error_handler.call(ex) else raise ex end end
#put(channel, message, &block) Also known as: #send
[ GitHub ]#receive(channel, &block)
Alias for #take.
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 33
alias_method :receive, :take
#send(channel, message, &block)
Alias for #put.
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 38
alias_method :send, :put
#take(channel, &block) Also known as: #receive
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 29
def take(channel, &block) raise ArgumentError.new('no block given') unless block_given? @clauses << TakeClause.new(channel, block) end
#timeout(seconds, &block)
Alias for #after.
# File 'lib/concurrent-ruby-edge/concurrent/channel/selector.rb', line 43
alias_method :timeout, :after