123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Cluster::CursorReaper Private

Do not use. This class is for internal use only.
Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
Inherits: Object
Defined in: lib/mongo/cluster/reapers/cursor_reaper.rb

Overview

A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.

Since:

  • 2.3.0

Constant Summary

Class Method Summary

Instance Attribute Summary

Instance Method Summary

::Mongo::Retryable - Included

#read_worker

Returns the read worker for handling retryable reads.

#select_server

This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.

#write_worker

Returns the write worker for handling retryable writes.

Instance Attribute Details

#cluster (readonly)

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 50

attr_reader :cluster

Instance Method Details

#execute

Alias for #kill_cursors.

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 211

alias :execute :kill_cursors

#flush

Alias for #kill_cursors.

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 212

alias :flush :kill_cursors

#kill_cursors Also known as: #execute, #flush

Execute all pending kill cursors operations.

Examples:

Execute pending kill cursors operations.

cursor_reaper.kill_cursors

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 133

def kill_cursors
  # TODO optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.
  loop do
    server_address = nil

    kill_spec = @mutex.synchronize do
      read_scheduled_kill_specs
      # Find a server that has any cursors scheduled for destruction.
      server_address, specs =
        @to_kill.detect { |_, specs| specs.any? }

      if specs.nil?
        # All servers have empty specs, nothing to do.
        return
      end

      # Note that this mutates the spec in the queue.
      # If the kill cursor operation fails, we don't attempt to
      # kill that cursor again.
      spec = specs.take(1).tap do |arr|
        specs.subtract(arr)
      end.first

      unless @active_cursor_ids.include?(spec.cursor_id)
        # The cursor was already killed, typically because it has
        # been iterated to completion. Remove the kill spec from
        # our records without doing any more work.
        spec = nil
      end

      spec
    end

    # If there was a spec to kill but its cursor was already killed,
    # look for another spec.
    next unless kill_spec

    # We could also pass kill_spec directly into the KillCursors
    # operation, though this would make that operation have a
    # different API from all of the other ones which accept hashes.
    spec = {
      cursor_ids: [kill_spec.cursor_id],
      coll_name: kill_spec.coll_name,
      db_name: kill_spec.db_name,
    }
    op = Operation::KillCursors.new(spec)

    server = cluster.servers.detect do |server|
      server.address == server_address
    end

    unless server
      # TODO We currently don't have a server for the address that the
      # cursor is associated with. We should leave the cursor in the
      # queue to be killed at a later time (when the server comes back).
      next
    end

    options = {
      server_api: server.options[:server_api],
      connection_global_id: kill_spec.connection_global_id,
    }
    if connection = kill_spec.connection
      op.execute_with_connection(connection, context: Operation::Context.new(options: options))
      connection.connection_pool.check_in(connection)
    else
      op.execute(server, context: Operation::Context.new(options: options))
    end

    if session = kill_spec.session
      if session.implicit?
        session.end_session
      end
    end
  end
end

#read_scheduled_kill_specs

Read and decode scheduled kill cursors operations.

This method mutates instance variables without locking, so is is not thread safe. Generally, it should not be called itself, this is a helper for kill_cursor method.

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 114

def read_scheduled_kill_specs
  while kill_spec = @kill_spec_queue.pop(true)
    if @active_cursor_ids.include?(kill_spec.cursor_id)
      @to_kill[kill_spec.server_address] ||= Set.new
      @to_kill[kill_spec.server_address] << kill_spec
    end
  end
rescue ThreadError
  # Empty queue, nothing to do.
end

#register_cursor(id)

Register a cursor id as active.

Examples:

Register a cursor as active.

cursor_reaper.register_cursor(id)

Parameters:

  • id (Integer)

    The id of the cursor to register as active.

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 71

def register_cursor(id)
  if id.nil?
    raise ArgumentError, 'register_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'register_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids << id
  end
end

#schedule_kill_cursor(kill_spec)

Schedule a kill cursors operation to be eventually executed.

Parameters:

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 57

def schedule_kill_cursor(kill_spec)
  @kill_spec_queue << kill_spec
end

#unregister_cursor(id)

Unregister a cursor id, indicating that it’s no longer active.

Examples:

Unregister a cursor.

cursor_reaper.unregister_cursor(id)

Parameters:

  • id (Integer)

    The id of the cursor to unregister.

Since:

  • 2.3.0

[ GitHub ]

  
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 94

def unregister_cursor(id)
  if id.nil?
    raise ArgumentError, 'unregister_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'unregister_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids.delete(id)
  end
end