123456789_123456789_123456789_123456789_123456789_

Class: Concurrent::Promises::Channel

Overview

Note:

**Edge Features** are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.

  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.

  • Edge features may also lack tests and documentation.

  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure, use capacity option in Concurrent#initialize. #pop method blocks ans #pop_op returns pending future if there is no message in the channel. If the capacity is limited the #push method blocks and #push_op returns pending future.

==== Examples

Let's start by creating a channel with a capacity of 2 messages.

ch = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>

We push 3 messages, then it can be observed that the last thread pushing is sleeping since the channel is full.

threads = {Array.new}(3) { |i| {Thread.new} { ch.push message: i } } 
sleep 0.01 # let the threads run
threads
=== => [#<Thread:0x000003@channel.in.md:14 dead>,
===     #<Thread:0x000004@channel.in.md:14 dead>,
===     #<Thread:0x000005@channel.in.md:14 sleep_forever>]

When message is popped the last thread continues and finishes as well.

ch.pop                                   # => {:message=>1}
threads.map(&:join)
=== => [#<Thread:0x000003@channel.in.md:14 dead>,
===     #<Thread:0x000004@channel.in.md:14 dead>,
===     #<Thread:0x000005@channel.in.md:14 dead>]

Same principle applies to popping as well. There are now 2 messages int he channel. Lets create 3 threads trying to pop a message, one will be blocked until new messages is pushed.

threads = {Array.new}(3) { |i| {Thread.new} { ch.pop } } 
sleep 0.01 # let the threads run
threads 
=== => [#<Thread:0x000006@channel.in.md:32 dead>,
===     #<Thread:0x000007@channel.in.md:32 dead>,
===     #<Thread:0x000008@channel.in.md:32 sleep_forever>]
ch.push message: 3
=== => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2>
threads.map(&:value)
=== => [{:message=>0}, {:message=>2}, {:message=>3}]

===== Promises integration

However this channel is implemented to integrate with promises therefore all operations can be represented as futures.

ch = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
push_operations = {Array.new}(3) { |i| ch.push_op message: i }
=== => [#<Concurrent::Promises::Future:0x00000a fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
===     #<Concurrent::Promises::Future:0x00000b fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>,
===     #<Concurrent::Promises::ResolvableFuture:0x00000c pending>]

We do not have to sleep here letting the futures execute as Threads. Since there is capacity for 2 messages the Promises are immediately resolved without ever allocating a Thread to execute. Push and pop operations are often more efficient. The remaining pending push operation will also never require another thread, instead it will resolve when a message is popped from the channel making a space for a new message.

ch.pop_op.value!                         # => {:message=>0}
push_operations.map(&:value!)
=== => [#<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
===     #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>,
===     #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>]

pop_operations = {Array.new}(3) { |i| ch.pop_op }
=== => [#<Concurrent::Promises::ResolvableFuture:0x00000d fulfilled with {:message=>1}>,
===     #<Concurrent::Promises::ResolvableFuture:0x00000e fulfilled with {:message=>2}>,
===     #<Concurrent::Promises::ResolvableFuture:0x00000f pending>]
ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op)
pop_operations.map(&:value) 
=== => [{:message=>1}, {:message=>2}, {:message=>3}]

===== Selecting over channels

A selection over channels can be created with the .select_channel factory method. It will be fulfilled with a first message available in any of the channels. It returns a pair to be able to find out which channel had the message available.

ch1    = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>
ch2    = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>
ch1.push 1 
=== => #<Concurrent::Promises::Channel:0x000010 capacity taken 1 of 2>
ch2.push 2 
=== => #<Concurrent::Promises::Channel:0x000011 capacity taken 1 of 2>

<code>Channel</code>.select([ch1, ch2])
=== => [#<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>, 1]
ch1.select(ch2)
=== => [#<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>, 2]

{Concurrent::Promises.future} { 3 + 4 }.then_channel_push(ch1)
=== => #<Concurrent::Promises::Future:0x000012 pending>
<code>Channel</code>. 
    # or `ch1.select_op(ch2)` would be equivalent
    select_op([ch1, ch2]).
    then('got number %03d from ch%d') { |(channel, value), format| 
      format format, value, [ch1, ch2].index(channel).succ
    }.value!                             # => "got number 007 from ch1"

===== try_ variants

All blocking operations (#pop, #push, #select) have non-blocking variant with try_ prefix. They always return immediately and indicate either success or failure.

ch
=== => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
ch.try_push 1                            # => true
ch.try_push 2                            # => true
ch.try_push 3                            # => false
ch.try_pop                               # => 1
ch.try_pop                               # => 2
ch.try_pop                               # => nil

===== Timeouts

All blocking operations (#pop, #push, #select) have a timeout option. Similar to try_ variants it will indicate success or timing out, when the timeout option is used.

ch
=== => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2>
ch.push 1, 0.01                          # => true
ch.push 2, 0.01                          # => true
ch.push 3, 0.01                          # => false
ch.pop 0.01                              # => 1
ch.pop 0.01                              # => 2
ch.pop 0.01                              # => nil

===== Backpressure

Most importantly the channel can be used to create systems with backpressure. A self adjusting system where the producers will slow down if the consumers are not keeping up.

channel = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x000013 capacity taken 0 of 2>
log     = {Concurrent::Array.new}          # => []

producers = {Array.new} 2 do |i|
  Thread.new(i) do |i|
    4.times do |j|
      log.push format "producer %d pushing %d", i, j      
      channel.push [i, j]      
    end
  end
end
=== => [#<Thread:0x000014@channel.in.md:133 run>,
===     #<Thread:0x000015@channel.in.md:133 run>]

consumers = {Array.new} 4 do |i|
  Thread.new(i) do |consumer|
    2.times do |j|
      from, message = channel.pop
      log.push format "consumer %d got %d. payload %d from producer %d", 
                      consumer, j, message, from       
      do_stuff      
    end
  end
end
=== => [#<Thread:0x000016@channel.in.md:142 run>,
===     #<Thread:0x000017@channel.in.md:142 run>,
===     #<Thread:0x000018@channel.in.md:142 run>,
===     #<Thread:0x000019@channel.in.md:142 run>]

=== wait for all to finish
producers.map(&:join)
=== => [#<Thread:0x000014@channel.in.md:133 dead>,
===     #<Thread:0x000015@channel.in.md:133 dead>]
consumers.map(&:join)
=== => [#<Thread:0x000016@channel.in.md:142 dead>,
===     #<Thread:0x000017@channel.in.md:142 dead>,
===     #<Thread:0x000018@channel.in.md:142 dead>,
===     #<Thread:0x000019@channel.in.md:142 dead>]
=== investigate log
log
=== => ["producer 0 pushing 0",
===     "producer 0 pushing 1",
===     "producer 0 pushing 2",
===     "producer 1 pushing 0",
===     "consumer 0 got 0. payload 0 from producer 0",
===     "producer 0 pushing 3",
===     "consumer 1 got 0. payload 1 from producer 0",
===     "consumer 2 got 0. payload 2 from producer 0",
===     "consumer 3 got 0. payload 0 from producer 1",
===     "producer 1 pushing 1",
===     "producer 1 pushing 2",
===     "consumer 1 got 1. payload 3 from producer 0",
===     "producer 1 pushing 3",
===     "consumer 2 got 1. payload 1 from producer 1",
===     "consumer 3 got 1. payload 2 from producer 1",
===     "consumer 0 got 1. payload 3 from producer 1"]

The producers are much faster than consumers (since they do_stuff which takes some time)
but as it can be seen from the log they fill the channel and then they slow down until there is space available in the channel.

If permanent allocation of threads to the producers and consumers has to be avoided, the threads can be replaced with promises that run a thread pool.

channel = {Concurrent::Promises::Channel.new} 2
=== => #<Concurrent::Promises::Channel:0x00001a capacity taken 0 of 2>
log     = {Concurrent::Array.new}          # => []

def produce(channel, log, producer, i)
  log.push format "producer %d pushing %d", producer, i      
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done    
  end      
end                                      # => :produce

def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format "consumer %d got %d. payload %d from producer %d", 
                    consumer, i, message, from       
    do_stuff
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done       
  end
end                                      # => :consume

producers = {Array.new} 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run
end
=== => [#<Concurrent::Promises::Future:0x00001b pending>,
===     #<Concurrent::Promises::Future:0x00001c pending>]

consumers = {Array.new} 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run
end
=== => [#<Concurrent::Promises::Future:0x00001d pending>,
===     #<Concurrent::Promises::Future:0x00001e pending>,
===     #<Concurrent::Promises::Future:0x00001f pending>,
===     #<Concurrent::Promises::Future:0x000020 pending>]

=== wait for all to finish
producers.map(&:value!)                  # => [:done, :done]
consumers.map(&:value!)                  # => [:done, :done, :done, :done]
=== investigate log
log
=== => ["producer 0 pushing 0",
===     "producer 1 pushing 0",
===     "producer 1 pushing 1",
===     "consumer 1 got 0. payload 0 from producer 1",
===     "consumer 2 got 0. payload 1 from producer 1",
===     "producer 0 pushing 1",
===     "producer 0 pushing 2",
===     "producer 0 pushing 3",
===     "producer 1 pushing 2",
===     "consumer 0 got 0. payload 0 from producer 0",
===     "consumer 3 got 0. payload 1 from producer 0",
===     "producer 1 pushing 3",
===     "consumer 2 got 1. payload 2 from producer 0",
===     "consumer 1 got 1. payload 3 from producer 0",
===     "consumer 3 got 1. payload 3 from producer 1",
===     "consumer 0 got 1. payload 2 from producer 1"]

===== Synchronization of workers by passing a value

If the capacity of the channel is zero then any push operation will succeed only when there is a matching pop operation which can take the message. The operations have to be paired to succeed.

channel = {Concurrent::Promises::Channel.new} 0
=== => #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>
thread = {Thread.new} { channel.pop }; sleep 0.01 
=== allow the thread to go to sleep
thread
=== => #<Thread:0x000022@channel.in.md:214 sleep_forever>
=== succeeds because there is matching pop operation waiting in the thread 
channel.try_push(:v1)                    # => true
=== remains pending, since there is no matching operation 
push = channel.push_op(:v2)
=== => #<Concurrent::Promises::ResolvableFuture:0x000023 pending>
thread.value                             # => :v1
=== the push operation resolves as a pairing pop is called
channel.pop                              # => :v2
push
=== => #<Concurrent::Promises::ResolvableFuture:0x000023 fulfilled with #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>>

Constant Summary

Class Attribute Summary

Class Method Summary

Instance Method Summary

Synchronization::Object - Inherited

Synchronization::Volatile - Included

Synchronization::AbstractObject - Inherited

Constructor Details

.new(capacity = UNLIMITED_CAPACITY) ⇒ Channel

Create channel.

Parameters:

  • capacity (Integer, UNLIMITED_CAPACITY) (defaults to: UNLIMITED_CAPACITY)

    the maximum number of messages which can be stored in the channel.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 64

def initialize(capacity = UNLIMITED_CAPACITY)
  super()
  @Capacity = capacity
  @Mutex    = Mutex.new
  # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Class Method Details

.select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 322

def select(channels, timeout = nil)
  channels.first.select(channels[1..-1], timeout)
end

.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 340

def select_matching(matcher, channels, timeout = nil)
  channels.first.select_matching(matcher, channels[1..-1], timeout)
end

.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 316

def select_op(channels, probe = Promises.resolvable_future)
  channels.first.select_op(channels[1..-1], probe)
end

.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 334

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  channels.first.select_op_matching(matcher, channels[1..-1], probe)
end

.try_select(channels) ⇒ ::Array(Channel, Object)

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 310

def try_select(channels)
  channels.first.try_select(channels[1..-1])
end

.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 328

def try_select_matching(matcher, channels)
  channels.first.try_select_matching(matcher, channels[1..-1])
end

Instance Method Details

#capacityInteger

Returns:

  • (Integer)

    Maximum capacity of the Channel.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 295

def capacity
  @Capacity
end

#inspect

Alias for #to_s.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 304

alias_method :inspect, :to_s

#ns_consume_pending_push(matcher, remove = true) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 397

def ns_consume_pending_push(matcher, remove = true)
  i = 0
  while true
    message, pushed = @PendingPush[i, 2]
    return NOTHING unless pushed

    if matcher === message
      resolved           = pushed.resolved?
      @PendingPush[i, 2] = [] if remove || resolved
      # can fail if timed-out, so try without error
      if remove ? pushed.fulfill(self, false) : !resolved
        # pushed fulfilled so actually push the message
        return message
      end
    end

    i += 2
  end
end

#ns_pop_op(matcher, probe, include_channel) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 352

def ns_pop_op(matcher, probe, include_channel)
  message = ns_shift_message matcher

  # got message from buffer
  if message != NOTHING
    if probe.fulfill(include_channel ? [self, message] : message, false)
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
    else
      @Messages.unshift message
    end
    return probe
  end

  # no message in buffer, try to pair with a pending push
  i = 0
  while true
    message, pushed = @PendingPush[i, 2]
    break if pushed.nil?

    if matcher === message
      value = include_channel ? [self, message] : message
      if Promises::Resolvable.atomic_resolution(probe  => [true, value, nil],
                                                pushed => [true, self, nil])
        @PendingPush[i, 2] = []
        return probe
      end

      if probe.resolved?
        return probe
      end

      # so pushed.resolved? has to be true, remove the push
      @PendingPush[i, 2] = []
    end

    i += 2
  end

  # no push to pair with
  # TODO (pitr-ch 11-Jan-2019): clear up probes when timed out, use callback
  @Probes.push probe, include_channel, matcher if probe.pending?
  return probe
end

#ns_shift_message(matcher, remove = true) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 437

def ns_shift_message(matcher, remove = true)
  i = 0
  while true
    message = @Messages.fetch(i, NOTHING)
    return NOTHING if message == NOTHING

    if matcher === message
      @Messages.delete_at i if remove
      return message
    end

    i += 1
  end
end

#ns_try_push(message) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 417

def ns_try_push(message)
  i = 0
  while true
    probe, include_channel, matcher = @Probes[i, 3]
    break unless probe
    if matcher === message && probe.fulfill(include_channel ? [self, message] : message, false)
      @Probes[i, 3] = []
      return true
    end
    i += 3
  end

  if @Capacity > @Messages.size
    @Messages.push message
    true
  else
    false
  end
end

#partial_select_op(matcher, probe) (private)

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 346

def partial_select_op(matcher, probe)
  @Mutex.synchronize { ns_pop_op(matcher, probe, true) }
end

#peek(no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    message or nil when there is no message

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 209

def peek(no_value = nil)
  peek_matching ANY, no_value
end

#peek_matching(matcher, no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, no_value)

    message or nil when there is no message

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 215

def peek_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher, false
    return message if message != NOTHING
    message = ns_consume_pending_push matcher, false
    return message != NOTHING ? message : no_value
  end
end

#pop(timeout = nil, timeout_value = nil) ⇒ Object?

Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until a message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, nil)

    message or nil when timed out

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 177

def pop(timeout = nil, timeout_value = nil)
  pop_matching ANY, timeout, timeout_value
end

#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object?

Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until a message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, nil)

    message or nil when timed out

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 183

def pop_matching(matcher, timeout = nil, timeout_value = nil)
  # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer
  # that it returns even if the buffer is full. User might expect that it has to be in the buffer first.
  probe = @Mutex.synchronize do
    message = ns_shift_message matcher
    if message == NOTHING
      message = ns_consume_pending_push matcher
      return message if message != NOTHING
    else
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
      return message
    end

    probe = Promises.resolvable_future
    @Probes.push probe, false, matcher
    probe
  end

  probe.value!(timeout, timeout_value, [true, timeout_value, nil])
end

#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)

process_message pop_op.value

else

pop_op.then { |message| log_unprocessed_message message }

end “‘ or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 160

def pop_op(probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(ANY, probe, false) }
end

#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)

process_message pop_op.value

else

pop_op.then { |message| log_unprocessed_message message }

end “‘ or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 166

def pop_op_matching(matcher, probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(matcher, probe, false) }
end

#push(message, timeout = nil) ⇒ self, ...

Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until the message is pushed into the channel.

Parameters:

  • message (Object)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (self, true, false)

    self implies timeout was not used, true implies timeout was used and it was pushed, false implies it was not pushed within timeout.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 120

def push(message, timeout = nil)
  pushed_op = @Mutex.synchronize do
    return timeout ? true : self if ns_try_push(message)

    pushed = Promises.resolvable_future
    # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages
    @PendingPush.push message, pushed
    pushed
  end

  result = pushed_op.wait!(timeout, [true, self, nil])
  result == pushed_op ? self : result
end

#push_op(message) ⇒ ResolvableFuture(self)

Returns future which will fulfill when the message is pushed to the channel. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)

process_message pop_op.value

else

pop_op.then { |message| log_unprocessed_message message }

end “‘ or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

Parameters:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 101

def push_op(message)
  @Mutex.synchronize do
    if ns_try_push(message)
      Promises.fulfilled_future self
    else
      pushed = Promises.resolvable_future
      @PendingPush.push message, pushed
      return pushed
    end
  end
end

#select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (::Array(Channel, Object), nil)

    message or nil when timed out

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 278

def select(channels, timeout = nil)
  select_matching ANY, channels, timeout
end

#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (::Array(Channel, Object), nil)

    message or nil when timed out

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 284

def select_matching(matcher, channels, timeout = nil)
  probe = select_op_matching(matcher, channels)
  probe.value!(timeout, nil, [true, nil, nil])
end

#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)

process_message pop_op.value

else

pop_op.then { |message| log_unprocessed_message message }

end “‘ or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 257

def select_op(channels, probe = Promises.resolvable_future)
  select_op_matching ANY, channels, probe
end

#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)

process_message pop_op.value

else

pop_op.then { |message| log_unprocessed_message message }

end “‘ or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 263

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  [self, *channels].each { |ch| ch.partial_select_op matcher, probe }
  probe
end

#sizeInteger

Returns:

  • (Integer)

    The number of messages currently stored in the channel.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 290

def size
  @Mutex.synchronize { @Messages.size }
end

#to_sString Also known as: #inspect

Returns:

  • (String)

    Short string representation.

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 300

def to_s
  format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity
end

#try_pop(no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    message or nil when there is no message

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 138

def try_pop(no_value = nil)
  try_pop_matching ANY, no_value
end

#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, no_value)

    message or nil when there is no message

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 145

def try_pop_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher
    return message if message != NOTHING
    message = ns_consume_pending_push matcher
    return message != NOTHING ? message : no_value
  end
end

#try_push(message) ⇒ true, false

Push the message into the channel if there is space available.

Parameters:

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 77

def try_push(message)
  @Mutex.synchronize { ns_try_push(message) }
end

#try_select(channels) ⇒ ::Array(Channel, Object)?

If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 232

def try_select(channels)
  try_select_matching ANY, channels
end

#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)?

If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading

[ GitHub ]

  
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 238

def try_select_matching(matcher, channels)
  message = nil
  channel = [self, *channels].find do |ch|
    message = ch.try_pop_matching(matcher, NOTHING)
    message != NOTHING
  end
  channel ? [channel, message] : nil
end