Class: Mongo::Cluster::CursorReaper Private
Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain: | self, ::Mongo::Retryable |
Inherits: | Object |
Defined in: | lib/mongo/cluster/reapers/cursor_reaper.rb |
Overview
A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.
Constant Summary
-
FREQUENCY = 1.freeze
The default time interval for the cursor reaper to send pending kill cursors operations.
Class Method Summary
-
.new(cluster) ⇒ CursorReaper
constructor
Internal use only
Create a cursor reaper.
Instance Attribute Summary
- #cluster readonly Internal use only
Instance Method Summary
-
#execute
Alias for #kill_cursors.
-
#flush
Alias for #kill_cursors.
-
#kill_cursors
(also: #execute, #flush)
Internal use only
Execute all pending kill cursors operations.
-
#read_scheduled_kill_specs
Internal use only
Read and decode scheduled kill cursors operations.
-
#register_cursor(id)
Internal use only
Register a cursor id as active.
-
#schedule_kill_cursor(kill_spec)
Internal use only
Schedule a kill cursors operation to be eventually executed.
-
#unregister_cursor(id)
Internal use only
Unregister a cursor id, indicating that it’s no longer active.
::Mongo::Retryable
- Included
#read_worker | Returns the read worker for handling retryable reads. |
#select_server | This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts. |
#write_worker | Returns the write worker for handling retryable writes. |
Instance Attribute Details
#cluster (readonly)
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 50
attr_reader :cluster
Instance Method Details
#execute
Alias for #kill_cursors.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 211
alias :execute :kill_cursors
#flush
Alias for #kill_cursors.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 212
alias :flush :kill_cursors
#kill_cursors Also known as: #execute, #flush
Execute all pending kill cursors operations.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 133
def kill_cursors
  # TODO optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.
  loop do
    server_address = nil

    kill_spec = @mutex.synchronize do
      read_scheduled_kill_specs
      # Find a server that has any cursors scheduled for destruction.
      server_address, specs =
        @to_kill.detect { |_, specs| specs.any? }

      if specs.nil?
        # All servers have empty specs, nothing to do.
        return
      end

      # Note that this mutates the spec in the queue.
      # If the kill cursor operation fails, we don't attempt to
      # kill that cursor again.
      spec = specs.take(1).tap do |arr|
        specs.subtract(arr)
      end.first

      unless @active_cursor_ids.include?(spec.cursor_id)
        # The cursor was already killed, typically because it has
        # been iterated to completion. Remove the kill spec from
        # our records without doing any more work.
        spec = nil
      end

      spec
    end

    # If there was a spec to kill but its cursor was already killed,
    # look for another spec.
    next unless kill_spec

    # We could also pass kill_spec directly into the KillCursors
    # operation, though this would make that operation have a
    # different API from all of the other ones which accept hashes.
    spec = {
      cursor_ids: [kill_spec.cursor_id],
      coll_name: kill_spec.coll_name,
      db_name: kill_spec.db_name,
    }
    op = Operation::KillCursors.new(spec)

    server = cluster.servers.detect do |server|
      server.address == server_address
    end

    unless server
      # TODO We currently don't have a server for the address that the
      # cursor is associated with. We should leave the cursor in the
      # queue to be killed at a later time (when the server comes back).
      next
    end

    options = {
      server_api: server.options[:server_api],
      connection_global_id: kill_spec.connection_global_id,
    }
    if connection = kill_spec.connection
      op.execute_with_connection(connection, context: Operation::Context.new(options: options))
      connection.connection_pool.check_in(connection)
    else
      op.execute(server, context: Operation::Context.new(options: options))
    end

    if session = kill_spec.session
      if session.implicit?
        session.end_session
      end
    end
  end
end
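The step in kill_cursors that removes a single spec from a server's set relies on a small Set idiom: take one element as an array, subtract that array from the set, and return the element. The sketch below isolates that idiom using only the Ruby standard library. The method name take_one and the symbols in the set are hypothetical stand-ins, not part of the driver.

```ruby
require 'set'

# Hypothetical helper: removes and returns one element from the set,
# or returns nil when the set is empty.
def take_one(specs)
  specs.take(1).tap { |arr| specs.subtract(arr) }.first
end

# Stand-in values; the driver stores kill spec objects here instead.
pending = Set.new([:spec_a, :spec_b, :spec_c])
taken = take_one(pending)

puts "took #{taken.inspect}, #{pending.size} left"
```

Because the element leaves the set before the kill operation runs, a failed operation is not retried, which matches the comment in the listing above.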
#read_scheduled_kill_specs
Read and decode scheduled kill cursors operations.
This method mutates instance variables without locking, so it is not thread safe. Generally, it should not be called directly; it is a helper for the kill_cursors method.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 114
def read_scheduled_kill_specs
  while kill_spec = @kill_spec_queue.pop(true)
    if @active_cursor_ids.include?(kill_spec.cursor_id)
      @to_kill[kill_spec.server_address] ||= Set.new
      @to_kill[kill_spec.server_address] << kill_spec
    end
  end
rescue ThreadError
  # Empty queue, nothing to do.
end
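The drain loop above depends on the non-blocking form of Queue#pop: called with true, it raises ThreadError on an empty queue instead of waiting. A minimal standalone sketch of the same pattern follows. The KillSpecSketch struct and the drain_by_address method are hypothetical names for illustration, and the addresses are made-up values.

```ruby
require 'set'

# Hypothetical stand-in for a kill spec; the real object carries more fields.
KillSpecSketch = Struct.new(:cursor_id, :server_address)

# Drains the queue without blocking and groups still-active specs by address.
def drain_by_address(queue, active_ids)
  grouped = {}
  begin
    # pop(true) is non-blocking and raises ThreadError once the queue is empty.
    while (spec = queue.pop(true))
      next unless active_ids.include?(spec.cursor_id)
      (grouped[spec.server_address] ||= Set.new) << spec
    end
  rescue ThreadError
    # Queue is empty; nothing left to read.
  end
  grouped
end

queue = Queue.new
queue << KillSpecSketch.new(1, 'a:27017')
queue << KillSpecSketch.new(2, 'b:27017')
queue << KillSpecSketch.new(3, 'a:27017')

# Cursor 2 is not registered as active, so its spec is dropped.
grouped = drain_by_address(queue, Set[1, 3])
puts grouped.keys.inspect
```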
#register_cursor(id)
Register a cursor id as active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 71
def register_cursor(id)
  if id.nil?
    raise ArgumentError, 'register_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'register_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids << id
  end
end
#schedule_kill_cursor(kill_spec)
Schedule a kill cursors operation to be eventually executed.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 57
def schedule_kill_cursor(kill_spec)
  @kill_spec_queue << kill_spec
end
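Unlike register_cursor and unregister_cursor, this method takes no mutex. One plausible reading, assuming @kill_spec_queue is a standard Ruby Queue (which its use of pop(true) and ThreadError in read_scheduled_kill_specs suggests), is that Queue is already safe for concurrent producers. The sketch below demonstrates that property with the standard library only; push_concurrently is a hypothetical helper, not a driver method.

```ruby
# Hypothetical helper: several threads push onto one Queue with no extra locking.
def push_concurrently(queue, threads:, per_thread:)
  workers = Array.new(threads) do |t|
    Thread.new do
      per_thread.times { |i| queue << [t, i] }
    end
  end
  workers.each(&:join)
  queue
end

queue = push_concurrently(Queue.new, threads: 4, per_thread: 100)
puts queue.size
```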
#unregister_cursor(id)
Unregister a cursor id, indicating that it’s no longer active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 94
def unregister_cursor(id)
  if id.nil?
    raise ArgumentError, 'unregister_cursor called with nil cursor_id'
  end
  if id == 0
    raise ArgumentError, 'unregister_cursor called with cursor_id=0'
  end

  @mutex.synchronize do
    @active_cursor_ids.delete(id)
  end
end
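Taken together, register_cursor and unregister_cursor maintain a mutex-guarded set of active cursor ids and reject nil and 0. The following is a minimal sketch of that lifecycle, not the driver's implementation: the class ActiveCursorRegistrySketch and its method names are hypothetical, and it assumes the active ids are held in a Set.

```ruby
require 'set'

# Hypothetical stand-in for the reaper's active-cursor bookkeeping.
class ActiveCursorRegistrySketch
  def initialize
    @mutex = Mutex.new
    @active_cursor_ids = Set.new
  end

  def register(id)
    validate!(id, 'register')
    @mutex.synchronize { @active_cursor_ids << id }
  end

  def unregister(id)
    validate!(id, 'unregister')
    @mutex.synchronize { @active_cursor_ids.delete(id) }
  end

  def active?(id)
    @mutex.synchronize { @active_cursor_ids.include?(id) }
  end

  private

  # Mirrors the argument checks in the listings above.
  def validate!(id, action)
    raise ArgumentError, "#{action} called with nil cursor_id" if id.nil?
    raise ArgumentError, "#{action} called with cursor_id=0" if id == 0
  end
end

registry = ActiveCursorRegistrySketch.new
registry.register(42)
puts registry.active?(42)
registry.unregister(42)
puts registry.active?(42)
```

A kill spec whose cursor id is no longer in this set is skipped by kill_cursors, so unregistering an exhausted cursor is enough to cancel any kill that was scheduled for it.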