Class: Mongo::Cursor Private
Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
  Class Chain: self, Forwardable
  Instance Chain: self, Retryable, Enumerable
Inherits: Object
Defined in: lib/mongo/cursor.rb, lib/mongo/cursor/kill_spec.rb, lib/mongo/cursor/nontailable.rb
Overview
Client-side representation of an iterator over a query result set on the server.
Cursor objects are not directly exposed to application code. Rather, Collection::View exposes the Enumerable interface to applications, and the enumerator is backed by a Cursor instance.
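For orientation, a minimal sketch of how application code reaches a Cursor indirectly (the host, database, and collection names are made up; the Cursor itself is never handled directly):

require 'mongo'

client = Mongo::Client.new(['127.0.0.1:27017'], database: 'test')
view = client[:users].find(active: true)   # Collection::View; no query has been sent yet

# Enumerating the view lazily creates a Mongo::Cursor behind the scenes and
# drains it batch by batch via getMore.
view.each do |doc|
  puts doc['name']
end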
Class Method Summary
- .finalize(kill_spec, cluster) ⇒ Proc (Internal use only): Finalize the cursor for garbage collection.
- .new(view, result, server, options = {}) ⇒ Cursor (constructor, Internal use only): Creates a Cursor object.
Instance Attribute Summary
- #closed? ⇒ true, false (readonly, Internal use only): Is the cursor closed?
- #connection (readonly, Internal use only)
- #context ⇒ Operation::Context (readonly, Internal use only)
- #fully_iterated? ⇒ Boolean (readonly, Internal use only)
- #initial_result (readonly, Internal use only)
- #resume_token ⇒ BSON::Document | nil (readonly, Internal use only): The resume token tracked by the cursor for change stream resuming.
- #server (readonly, Internal use only)
- #view ⇒ Collection::View (readonly, Internal use only)
- #exhausted? ⇒ Boolean (readonly, private, Internal use only)
- #explicitly_closed? ⇒ Boolean (readonly, private, Internal use only)
- #limited? ⇒ Boolean (readonly, private, Internal use only)
- #use_limit? ⇒ Boolean (readonly, private, Internal use only)
Instance Method Summary
- #batch_size ⇒ Integer (Internal use only): Get the batch size.
- #close(opts = {}) ⇒ nil (Internal use only): Closes this cursor, freeing any associated resources on the client and the server.
- #collection_name ⇒ String (Internal use only): Get the parsed collection name.
- #each ⇒ Enumerator (Internal use only): Iterate through documents returned from the query.
- #get_more ⇒ Array<BSON::Document> (Internal use only): Execute a getMore command and return the batch of documents obtained from the server.
- #id ⇒ Integer (Internal use only): Get the cursor id.
- #inspect ⇒ String (Internal use only): Get a human-readable string representation of Cursor.
- #kill_spec(connection_global_id) (Internal use only)
- #to_return ⇒ Integer (Internal use only): Get the number of documents to return.
- #try_next ⇒ BSON::Document | nil (Internal use only): Return one document from the query, if one is available.
- #batch_size_for_get_more (private, Internal use only)
- #cache_batch_resume_token (private, Internal use only)
- #cache_resume_token(doc) (private, Internal use only)
- #check_in_connection (private, Internal use only): Returns the connection that was used to create the cursor back to the corresponding connection pool.
- #connection_global_id_for_context (private, Internal use only): Returns nil if the session is already pinned to a connection_global_id, since a context must not carry one in that case.
- #end_session (private, Internal use only)
- #execute_operation(op, context: nil) (private, Internal use only)
- #fresh_context(opts = {}) (private, Internal use only): Returns a newly instantiated operation context based on the default values from the view.
- #get_more_operation (private, Internal use only)
- #limit (private, Internal use only)
- #possibly_refreshed_context ⇒ Operation::Context (private, Internal use only): Considers the timeout mode and either returns the cursor's context directly or returns a new (refreshed) context.
- #process(result) (private, Internal use only)
- #register (private, Internal use only)
- #set_cursor_id(result) (private, Internal use only): Sets @cursor_id from the operation result.
- #unregister (private, Internal use only)
Retryable (Included)
- #read_worker: Returns the read worker for handling retryable reads.
- #select_server: This is a separate method to make it possible for the test suite to assert that server selection is performed during retry attempts.
- #write_worker: Returns the write worker for handling retryable writes.
Class Method Details
.finalize(kill_spec, cluster) ⇒ Proc
Finalize the cursor for garbage collection. Schedules this cursor to be included in a killCursors operation executed by the Cluster’s CursorReaper.
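A sketch of how the returned Proc is typically wired up; the exact arguments used in the driver's constructor may differ, so treat this as illustrative only:

# Register the finalizer so that a Cursor that is garbage collected without
# being closed still has its server-side cursor reaped via killCursors.
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(connection_global_id), cluster))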
Instance Attribute Details
#closed? ⇒ true, false (readonly)
Is the cursor closed?
# File 'lib/mongo/cursor.rb', line 296
def closed?
  # @cursor_id should in principle never be nil
  @cursor_id.nil? || @cursor_id == 0
end
#connection (readonly)
# File 'lib/mongo/cursor.rb', line 117
attr_reader :connection
#context ⇒ Operation::Context (readonly)
# File 'lib/mongo/cursor.rb', line 53
attr_reader :context
#exhausted? ⇒ Boolean (readonly, private)
# File 'lib/mongo/cursor.rb', line 436
def exhausted?
  limited? ? @remaining <= 0 : false
end
#explicitly_closed? ⇒ Boolean (readonly, private)
# File 'lib/mongo/cursor.rb', line 422
def explicitly_closed?
  @lock.synchronize do
    @explicitly_closed
  end
end
#fully_iterated? ⇒ Boolean (readonly)
# File 'lib/mongo/cursor.rb', line 416
def fully_iterated?
  !!@fully_iterated
end
#initial_result (readonly)
# File 'lib/mongo/cursor.rb', line 114
attr_reader :initial_result
#limited? ⇒ Boolean (readonly, private)
#resume_token ⇒ BSON::Document | nil (readonly)
The resume token tracked by the cursor for change stream resuming.
# File 'lib/mongo/cursor.rb', line 50
attr_reader :resume_token
#server (readonly)
# File 'lib/mongo/cursor.rb', line 111
attr_reader :server
#use_limit? ⇒ Boolean (readonly, private)
# File 'lib/mongo/cursor.rb', line 499
def use_limit?
  limited? && batch_size >= @remaining
end
#view ⇒ Collection::View (readonly)
# File 'lib/mongo/cursor.rb', line 44
attr_reader :view
Instance Method Details
#batch_size ⇒ Integer
Get the batch size.
# File 'lib/mongo/cursor.rb', line 279
def batch_size
  value = @view.batch_size && @view.batch_size > 0 ? @view.batch_size : limit
  if value == 0
    nil
  else
    value
  end
end
#batch_size_for_get_more (private)
# File 'lib/mongo/cursor.rb', line 428
def batch_size_for_get_more
  if batch_size && use_limit?
    [batch_size, @remaining].min
  else
    batch_size
  end
end
#cache_batch_resume_token (private)
# File 'lib/mongo/cursor.rb', line 446
def cache_batch_resume_token
  @resume_token = @post_batch_resume_token if @post_batch_resume_token
end
#cache_resume_token(doc) (private)
# File 'lib/mongo/cursor.rb', line 440
def cache_resume_token(doc)
  if doc[:_id] && doc[:_id].is_a?(Hash)
    @resume_token = doc[:_id] && doc[:_id].dup.freeze
  end
end
#check_in_connection (private)
Returns the connection that was used to create the cursor back to the corresponding connection pool.
In a load balanced topology cursors must use the same connection for the initial and all subsequent operations. Therefore, the connection is not checked into the pool after the initial operation is completed, but only when the cursor is drained.
# File 'lib/mongo/cursor.rb', line 578
def check_in_connection
  # Connection nil means the connection has been already checked in.
  return if @connection.nil?
  return unless @connection.server.load_balancer?

  @connection.connection_pool.check_in(@connection)
  @connection = nil
end
#close(opts = {}) ⇒ nil
Closes this cursor, freeing any associated resources on the client and the server.
# File 'lib/mongo/cursor.rb', line 305
def close(opts = {})
  return if closed?

  ctx = context ? context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts)

  unregister
  read_with_one_retry do
    spec = {
      coll_name: collection_name,
      db_name: database.name,
      cursor_ids: [id],
    }
    op = Operation::KillCursors.new(spec)
    execute_operation(op, context: ctx)
  end

  nil
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable
  # Errors are swallowed since there is nothing that can be done by handling them.
ensure
  end_session
  @cursor_id = 0
  @lock.synchronize do
    @explicitly_closed = true
  end
  check_in_connection
end
#collection_name ⇒ String
Get the parsed collection name.
# File 'lib/mongo/cursor.rb', line 341
def collection_name
  # In most cases, this will be equivalent to the name of the collection
  # object in the driver. However, in some cases (e.g. when connected
  # to an Atlas Data Lake), the namespace returned by the find command
  # may be different, which is why we want to use the collection name based
  # on the namespace in the command result.
  if @namespace
    # Often, the namespace will be in the format "database.collection".
    # However, sometimes the collection name will contain periods, which
    # is why this method joins all the namespace components after the first.
    ns_components = @namespace.split('.')
    ns_components[1...ns_components.length].join('.')
  else
    collection.name
  end
end
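For intuition, the split-and-rejoin keeps any periods that belong to the collection name itself (the namespace string below is made up for illustration):

namespace = 'reports.daily.2024'                    # database "reports", collection "daily.2024"
ns_components = namespace.split('.')                # => ["reports", "daily", "2024"]
ns_components[1...ns_components.length].join('.')   # => "daily.2024"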
#connection_global_id_for_context (private)
Because a context must not have a connection_global_id if the session is already pinned to one, this method checks whether the session has a pinned connection_global_id and returns nil if so.
# File 'lib/mongo/cursor.rb', line 563
def connection_global_id_for_context
  if @session&.pinned_connection_global_id
    nil
  else
    @connection_global_id
  end
end
#each ⇒ Enumerator
Iterate through documents returned from the query.
A cursor may be iterated at most once. Incomplete iteration is also allowed. Attempting to iterate the cursor more than once raises InvalidCursorOperation.
# File 'lib/mongo/cursor.rb', line 163
def each
  # If we already iterated past the first batch (i.e., called get_more
  # at least once), the cursor on the server side has advanced past
  # the first batch and restarting iteration from the beginning by
  # returning initial result would miss documents in the second batch
  # and subsequent batches up to wherever the cursor is. Detect this
  # condition and abort the iteration.
  #
  # In a future driver version, each would either continue from the
  # end of previous iteration or would always restart from the
  # beginning.
  if @get_more_called
    raise Error::InvalidCursorOperation, 'Cannot restart iteration of a cursor which issued a getMore'
  end

  # To maintain compatibility with pre-2.10 driver versions, reset
  # the documents array each time a new iteration is started.
  @documents = nil

  if block_given?
    # StopIteration raised by try_next ends this loop.
    loop do
      document = try_next
      if explicitly_closed?
        raise Error::InvalidCursorOperation, 'Cursor was explicitly closed'
      end
      yield document if document
    end
    self
  else
    documents = []
    # StopIteration raised by try_next ends this loop.
    loop do
      document = try_next
      if explicitly_closed?
        raise Error::InvalidCursorOperation, 'Cursor was explicitly closed'
      end
      documents << document if document
    end
    documents
  end
end
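To make the single-iteration rule concrete, a schematic sketch at the Cursor level (cursor stands for an already-constructed internal Cursor instance whose result set spans more than one batch; application code would normally stay on the Collection::View level):

# First iteration is allowed and, for a multi-batch result set, issues getMore.
first_pass = cursor.to_a

# A second iteration of the same instance is rejected once a getMore has been
# issued, because the server-side cursor has already advanced past batch one.
begin
  cursor.each { |doc| p doc }
rescue Mongo::Error::InvalidCursorOperation => e
  puts e.message
end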
#end_session (private)
# File 'lib/mongo/cursor.rb', line 467
def end_session
  @session.end_session if @session && @session.implicit?
end
#execute_operation(op, context: nil) (private)
# File 'lib/mongo/cursor.rb', line 515
def execute_operation(op, context: nil)
  op_context = context || possibly_refreshed_context
  if @connection.nil?
    op.execute(@server, context: op_context)
  else
    op.execute_with_connection(@connection, context: op_context)
  end
end
#fresh_context(opts = {}) (private)
Returns a newly instantiated operation context based on the default values from the view.
#get_more ⇒ Array<BSON::Document>
Execute a getMore command and return the batch of documents obtained from the server.
# File 'lib/mongo/cursor.rb', line 391
def get_more
  @get_more_called = true

  # Modern retryable reads specification prohibits retrying getMores.
  # Legacy retryable read logic used to retry getMores, but since
  # doing so may result in silent data loss, the driver no longer retries
  # getMore operations in any circumstance.
  # https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md#qa
  process(execute_operation(get_more_operation))
end
#get_more_operation (private)
# File 'lib/mongo/cursor.rb', line 450
def get_more_operation
  spec = {
    session: @session,
    db_name: database.name,
    coll_name: collection_name,
    cursor_id: id,
    # 3.2+ servers use batch_size, 3.0- servers use to_return.
    # TODO should to_return be calculated in the operation layer?
    batch_size: batch_size_for_get_more,
    to_return: to_return
  }
  if view.respond_to?(:options) && view.options.is_a?(Hash)
    spec[:comment] = view.options[:comment] unless view.options[:comment].nil?
  end
  Operation::GetMore.new(spec)
end
#id ⇒ Integer
Get the cursor id.
Note: A cursor id of 0 means the cursor was closed on the server.
# File 'lib/mongo/cursor.rb', line 368
def id
  @cursor_id
end
#inspect ⇒ String
Get a human-readable string representation of Cursor.
# File 'lib/mongo/cursor.rb', line 145
def inspect
  "#<Mongo::Cursor:0x#{object_id} @view=#{@view.inspect}>"
end
#kill_spec(connection_global_id)
# File 'lib/mongo/cursor.rb', line 403
def kill_spec(connection_global_id)
  KillSpec.new(
    cursor_id: id,
    coll_name: collection_name,
    db_name: database.name,
    connection_global_id: connection_global_id,
    server_address: server.address,
    session: @session,
    connection: @connection
  )
end
#limit (private)
# File 'lib/mongo/cursor.rb', line 503
def limit
  @view.send(:limit)
end
#possibly_refreshed_context ⇒ Operation::Context (private)
Considers the timeout mode and will either return the cursor’s context directly, or will return a new (refreshed) context.
#process(result) (private)
# File 'lib/mongo/cursor.rb', line 475
def process(result)
  @remaining -= result.returned_count if limited?
  # #process is called for the first batch of results. In this case
  # the @cursor_id may be zero (all results fit in the first batch).
  # Thus we need to check both @cursor_id and the cursor_id of the result
  # prior to calling unregister here.
  if !closed? && result.cursor_id == 0
    unregister
    check_in_connection
  end
  @cursor_id = set_cursor_id(result)

  if result.respond_to?(:post_batch_resume_token)
    @post_batch_resume_token = result.post_batch_resume_token
  end

  end_session if closed?

  # Since our iteration code mutates the documents array by calling #shift
  # on it, duplicate the documents here to permit restarting iteration
  # from the beginning of the cursor as long as get_more was not called
  result.documents.dup
end
#register (private)
# File 'lib/mongo/cursor.rb', line 507
def register
  cluster.register_cursor(@cursor_id)
end
#set_cursor_id(result) (private)
Sets @cursor_id from the operation result.
In the operation result, the cursor id can be represented either as an Integer or as a BSON::Int64. This method ensures that the instance variable is always of type Integer.
# File 'lib/mongo/cursor.rb', line 542
def set_cursor_id(result)
  @cursor_id = if result.cursor_id.is_a?(BSON::Int64)
    result.cursor_id.value
  else
    result.cursor_id
  end
end
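A minimal illustration of that normalization, using the bson gem directly (the value 42 is arbitrary):

require 'bson'

raw = BSON::Int64.new(42)                  # cursor id as it may appear in a decoded result
raw.is_a?(BSON::Int64) ? raw.value : raw   # => 42, always a plain Integer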
#to_return ⇒ Integer
Get the number of documents to return. Used on 3.0 and lower server versions.
# File 'lib/mongo/cursor.rb', line 381
def to_return
  use_limit? ? @remaining : (batch_size || 0)
end
#try_next ⇒ BSON::Document | nil
This method is experimental and subject to change.
Return one document from the query, if one is available.
This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil. If there are no more documents to return from the server, or if we have exhausted the cursor, it will raise a StopIteration exception.
# File 'lib/mongo/cursor.rb', line 223
def try_next
  if @documents.nil?
    # Since published versions of Mongoid have a copy of old driver cursor
    # code, our dup call in #process isn't invoked when Mongoid query
    # cache is active. Work around that by also calling dup here on
    # the result of #process which might come out of Mongoid's code.
    @documents = process(@initial_result).dup
    # the documents here can be an empty array, hence
    # we may end up issuing a getMore in the first try_next call
  end

  if @documents.empty?
    # On empty batches, we cache the batch resume token
    cache_batch_resume_token

    unless closed?
      if exhausted?
        close
        @fully_iterated = true
        raise StopIteration
      end
      @documents = get_more
    else
      @fully_iterated = true
      raise StopIteration
    end
  else
    # cursor is closed here
    # keep documents as an empty array
  end

  # If there is at least one document, cache its _id
  if @documents[0]
    cache_resume_token(@documents[0])
  end

  # Cache the batch resume token if we are iterating
  # over the last document, or if the batch is empty
  if @documents.size <= 1
    cache_batch_resume_token
    if closed?
      @fully_iterated = true
    end
  end

  return @documents.shift
end
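For orientation, a schematic consumption loop over try_next (cursor again stands for an internal Cursor instance; this mirrors what #each does internally):

docs = []
# Kernel#loop terminates cleanly when try_next raises StopIteration, i.e. when
# the cursor has been exhausted or closed.
loop do
  doc = cursor.try_next   # nil when no document is available yet (e.g. tailable cursors)
  docs << doc if doc
end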
#unregister (private)
# File 'lib/mongo/cursor.rb', line 511
def unregister
  cluster.unregister_cursor(@cursor_id)
end