Class: Concurrent::Collection::RubyNonConcurrentPriorityQueue

Overview

Note:

This implementation is not thread safe.

Note:

**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.

A queue collection in which the elements are sorted based on their comparison (spaceship) operator <=>. Items are added to the queue at a position relative to their priority. On removal the element with the “highest” priority is removed. By default the sort order is from highest to lowest, but a lowest-to-highest sort order can be set on construction.

The API is based on the Queue class from the Ruby standard library.

The pure Ruby implementation, RubyNonConcurrentPriorityQueue, uses a binary heap stored in an array. The algorithm is based on the work of Robert Sedgewick and Kevin Wayne.
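
As a rough illustration of that array layout (a standalone sketch, not the library's code), a binary heap kept in a 1-based array puts the parent of slot k at k / 2 and its children at 2k and 2k + 1. Slot 0 is an unused placeholder, which matches the [nil] that #clear assigns to @queue. The helper names parent, children, and max_heap? are hypothetical and exist only for this example.

```ruby
# Standalone sketch of a 1-based array heap layout.
# The helper names are hypothetical, not part of the library.
def parent(k)
  k / 2
end

def children(k)
  [2 * k, 2 * k + 1]
end

# True when no element is smaller than either of its children (max-first order).
def max_heap?(heap)
  last = heap.length - 1 # slot 0 is only a placeholder
  (2..last).all? { |k| (heap[parent(k)] <=> heap[k]) >= 0 }
end

heap = [nil, 9, 7, 8, 3, 5]
puts parent(5).inspect    # slot 5 hangs under slot 2
puts children(2).inspect  # slots 4 and 5
puts max_heap?(heap)
```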

The JRuby native implementation is a thin wrapper around the Java standard library class java.util.PriorityQueue.

When running under JRuby the class NonConcurrentPriorityQueue extends JavaNonConcurrentPriorityQueue. When running under all other interpreters it extends RubyNonConcurrentPriorityQueue.

Constructor Details

.new(opts = {}) ⇒ RubyNonConcurrentPriorityQueue

Create a new priority queue with no items.

Parameters:

  • opts (Hash) (defaults to: {})

    the options for creating the queue

Options Hash (opts):

  • :order (Symbol) — default: :max

    dictates the order in which items are stored: from highest to lowest when :max or :high; from lowest to highest when :min or :low

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 11

def initialize(opts = {})
  order = opts.fetch(:order, :max)
  @comparator = [:min, :low].include?(order) ? -1 : 1
  clear
end
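
The option handling above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper named comparator_for that copies the two relevant lines from initialize. It suggests that only :min and :low select lowest-first order, while any other value, including an unrecognized one, behaves like :max.

```ruby
# Standalone sketch mirroring the option handling in initialize.
# comparator_for is a hypothetical name used only for this example.
def comparator_for(opts = {})
  order = opts.fetch(:order, :max)
  [:min, :low].include?(order) ? -1 : 1
end

puts comparator_for                  # default, highest first
puts comparator_for(order: :low)     # lowest first
puts comparator_for(order: :other)   # unrecognized values fall back to highest first
```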

Class Method Details

.from_list(list, opts = {}) ⇒ NonConcurrentPriorityQueue

Create a new priority queue from the given list.

Parameters:

  • list (Enumerable)

    the list to build the queue from

  • opts (Hash) (defaults to: {})

    the options for creating the queue

Returns:

  • (NonConcurrentPriorityQueue)

    a new queue containing the items from list

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 89

def self.from_list(list, opts = {})
  queue = new(opts)
  list.each{|item| queue << item }
  queue
end

Instance Attribute Details

#empty? ⇒ Boolean (readonly)

Returns true if self contains no elements.

Returns:

  • (Boolean)

    true if there are no items in the queue else false

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 43

def empty?
  size == 0
end

Instance Method Details

#<<(item)

Alias for #push.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 85

alias_method :<<, :push

#clear

Removes all of the elements from this priority queue.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 18

def clear
  @queue = [nil]
  @length = 0
  true
end

#delete(item) ⇒ Object

Deletes all items from self that are equal to item.

Parameters:

  • item (Object)

    the item to be removed from the queue

Returns:

  • (Object)

    true if the item is found else false

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 25

def delete(item)
  return false if empty?
  original_length = @length
  k = 1
  while k <= @length
    if @queue[k] == item
      swap(k, @length)
      @length -= 1
      sink(k) || swim(k)
      @queue.pop
    else
      k += 1
    end
  end
  @length != original_length
end

#deq

Alias for #pop.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 74

alias_method :deq, :pop

#enq(item)

Alias for #push.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 86

alias_method :enq, :push

#has_priority?(item)

Alias for #include?.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 51

alias_method :has_priority?, :include?

#include?(item) ⇒ Boolean Also known as: #has_priority?

Returns true if the given item is present in self (that is, if any element == item), otherwise returns false.

Parameters:

  • item (Object)

    the item to search for

Returns:

  • (Boolean)

    true if the item is found else false

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 48

def include?(item)
  @queue.include?(item)
end

#length ⇒ Fixnum Also known as: #size

The current length of the queue.

Returns:

  • (Fixnum)

    the number of items in the queue

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 54

def length
  @length
end

#ordered?(x, y) ⇒ Boolean (private)

Are the items at the given indexes ordered based on the priority order specified at construction?

Parameters:

  • x (Integer)

    the first index from which to retrieve a comparable value

  • y (Integer)

    the second index from which to retrieve a comparable value

Returns:

  • (Boolean)

    true if the two elements are in the correct priority order else false

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 119

def ordered?(x, y)
  (@queue[x] <=> @queue[y]) == @comparator
end
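
To make the comparison concrete, here is a small standalone sketch, assuming a hypothetical helper in_order? that takes the two values and the comparator directly instead of reading them from @queue and @comparator. One consequence worth noticing is that equal items compare as 0, which matches neither 1 nor -1, so ties are reported as not ordered in both modes.

```ruby
# Standalone sketch of the test performed by ordered?.
# in_order? is a hypothetical name; 1 means highest first, -1 lowest first.
def in_order?(a, b, comparator)
  (a <=> b) == comparator
end

puts in_order?(5, 3, 1)    # larger item before smaller, highest first
puts in_order?(3, 5, 1)    # wrong way round for highest first
puts in_order?(3, 5, -1)   # correct for lowest first
puts in_order?(4, 4, 1)    # ties never equal 1 or -1
```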

#peek ⇒ Object

Retrieves, but does not remove, the head of this queue, or returns nil if this queue is empty.

Returns:

  • (Object)

    the head of the queue or nil when empty

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 60

def peek
  empty? ? nil : @queue[1]
end

#pop ⇒ Object Also known as: #deq, #shift

Retrieves and removes the head of this queue, or returns nil if this queue is empty.

Returns:

  • (Object)

    the head of the queue or nil when empty

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 65

def pop
  return nil if empty?
  max = @queue[1]
  swap(1, @length)
  @length -= 1
  sink(1)
  @queue.pop
  max
end
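
The removal step can be sketched on a plain array. This is a simplified standalone version, not the library's code: it assumes a highest-first heap of numbers, compares with > and >= directly instead of going through ordered?, and uses the hypothetical names sink! and pop_max!. The idea is the same: move the last leaf to the root, shrink the heap, then percolate the new root down.

```ruby
# Standalone max-heap sketch on a 1-based array (slot 0 is a placeholder).
# sink! and pop_max! are hypothetical names for this example.
def sink!(heap, k, length)
  while (j = 2 * k) <= length
    j += 1 if j < length && heap[j + 1] > heap[j] # pick the larger child
    break if heap[k] >= heap[j]                   # parent already dominates
    heap[k], heap[j] = heap[j], heap[k]
    k = j
  end
end

def pop_max!(heap)
  length = heap.length - 1
  return nil if length.zero?
  top = heap[1]
  heap[1], heap[length] = heap[length], heap[1]   # last leaf becomes the root
  heap.pop                                        # discard the old root
  sink!(heap, 1, length - 1)
  top
end

heap = [nil, 9, 7, 8, 3, 5]
drained = []
while (item = pop_max!(heap))
  drained << item
end
puts drained.inspect # items come out from highest to lowest
```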

#push(item) Also known as: #<<, #enq

Inserts the specified element into this priority queue.

Parameters:

  • item (Object)

    the item to insert onto the queue

Raises:

  • (ArgumentError)

    if item is nil
  
# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 78

def push(item)
  raise ArgumentError.new('cannot enqueue nil') if item.nil?
  @length += 1
  @queue << item
  swim(@length)
  true
end

#shift

Alias for #pop.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 75

alias_method :shift, :pop

#sink(k) (private)

Percolate down to maintain heap invariant.

Parameters:

  • k (Integer)

    the index at which to start the percolation

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 128

def sink(k)
  success = false

  while (j = (2 * k)) <= @length do
    j += 1 if j < @length && ! ordered?(j, j+1)
    break if ordered?(k, j)
    swap(k, j)
    success = true
    k = j
  end

  success
end

#size

Alias for #length.

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 57

alias_method :size, :length

#swap(x, y) (private)

Exchange the values at the given indexes within the internal array.

Parameters:

  • x (Integer)

    the first index to swap

  • y (Integer)

    the second index to swap

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 103

def swap(x, y)
  temp = @queue[x]
  @queue[x] = @queue[y]
  @queue[y] = temp
end

#swim(k) (private)

Percolate up to maintain heap invariant.

Parameters:

  • k (Integer)

    the index at which to start the percolation

# File 'lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb', line 147

def swim(k)
  success = false

  while k > 1 && ! ordered?(k/2, k) do
    swap(k, k/2)
    k = k/2
    success = true
  end

  success
end
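
Insertion is the mirror image of removal: the new item is appended as the last leaf and percolated up. The following standalone sketch assumes a highest-first heap of numbers, compares with < directly rather than through ordered?, and uses the hypothetical names swim! and push!; it is an illustration of the technique, not the library's code.

```ruby
# Standalone max-heap insertion sketch on a 1-based array (slot 0 is a placeholder).
# swim! and push! are hypothetical names for this example.
def swim!(heap, k)
  while k > 1 && heap[k / 2] < heap[k]
    heap[k / 2], heap[k] = heap[k], heap[k / 2] # lift the item above its parent
    k /= 2
  end
end

def push!(heap, item)
  heap << item                 # new item starts as the last leaf
  swim!(heap, heap.length - 1)
end

heap = [nil]
[3, 9, 5, 7, 8].each { |item| push!(heap, item) }
puts heap.inspect # the largest item ends up in slot 1
```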